Introduction¶
The dask library provides parallel versions of many operations available in numpy and pandas. It does this by breaking up an array into chunks.
The full documentation for the Dask library is available at https://docs.dask.org/en/latest/
import dask.array
dask.array.ones((10,10), chunks=(5,5))
|
Here I’ve made a 10x10 array, that’s made up of 4 5x5 chunks.
Dask will run operations on different chunks in parallel. It evaluates arrays lazily - data is only computed for the chunks needed, if you don’t explicitly load the data (e.g. by using .compute()
, or plotting or saving the data) it builds up a graph of operations that can be run later when the actual values are needed.
data = dask.array.ones((10,10), chunks=(5,5))
(data + 5).visualize()
(data + 5)[4,6].compute()
6.0
Since I’ve only asked for a single element of the array, only that one chunk needs to be calculated. This is less important for this tiny example, but it does get important when you’ve got a huge dataset that you only need a subset of.
Creating Dask arrays manually¶
Something to keep in mind with Dask arrays is that you can’t directly write to an array element
try:
data[4, 6] = 3
except Exception as e:
print('ERROR', e)
ERROR Item assignment with <class 'tuple'> not supported
This keeps us from trying to do calculations with loops, which Dask can’t parallelise and are really inefficient in Python. Instead you should use whole-array operations.
Say we want to calculate the area of grid cells, knowing the latitude and longitude of grid point centres.
For each grid point we need to calculate
Rather than doing this in a loop over each grid point, we can use numpy array operations, computing the width and height of each cell and then performing an outer vector product to create a 2d array
import numpy
from numpy import deg2rad
# Grid centres
lon = dask.array.linspace(0, 360, num=500, endpoint=False, chunks=100)
lat = dask.array.linspace(-90, 90, num=300, endpoint=True, chunks=100)
# Grid spacing
dlon = (lon[1] - lon[0]).compute()
dlat = (lat[1] - lat[0]).compute()
# Grid edges in radians
lon0 = deg2rad(lon - dlon/2)
lon1 = deg2rad(lon + dlon/2)
lat0 = deg2rad((lat - dlat/2).clip(-90, 90))
lat1 = deg2rad((lat + dlat/2).clip(-90, 90))
# Compute cell dimensions
cell_width = lon1 - lon0
cell_height = numpy.sin(lat1) - numpy.sin(lat0)
# Area
R_earth = 6_371_000
A = R_earth**2 * numpy.outer(cell_height, cell_width)
A
|
You can see that even though only numpy operations were used we still ended up with a Dask array at the end. An array much larger than would fit in memory can be created this way, with the values only needing to be computed if they’re actually used.
Plotting Dask arrays works just like numpy arrays:
import matplotlib.pyplot as plt
plt.pcolormesh(A)
plt.colorbar()
<matplotlib.colorbar.Colorbar at 0x7fdac0880c10>
And we can check the total area is about \(5.1 \times 10^{14}\)
'%e'%(A.sum().compute())
'5.100645e+14'
Creating Dask arrays from NetCDF files¶
The most common way of creating a Dask array is to read them from a netcdf file with Xarray. You can give open_dataset()
and open_mfdataset()
a chunks
parameter, which is how large chunks should be in each dimension of the file.
If you use open_mfdataset()
, by default each input file will be its own chunk.
import xarray
path = 'https://dapds00.nci.org.au/thredds/dodsC/fs38/publications/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Amon/tas/gn/v20191108/tas_Amon_ACCESS-CM2_historical_r1i1p1f1_gn_185001-201412.nc'
ds = xarray.open_dataset(path, chunks={'time': 1})
ds.tas
<xarray.DataArray 'tas' (time: 1980, lat: 144, lon: 192)> dask.array<open_dataset-0e410aadf9156f5cf0b6cd8c2849d202tas, shape=(1980, 144, 192), dtype=float32, chunksize=(1, 144, 192), chunktype=numpy.ndarray> Coordinates: * time (time) datetime64[ns] 1850-01-16T12:00:00 ... 2014-12-16T12:00:00 * lat (lat) float64 -89.38 -88.12 -86.88 -85.62 ... 86.88 88.12 89.38 * lon (lon) float64 0.9375 2.812 4.688 6.562 ... 353.4 355.3 357.2 359.1 height float64 ... Attributes: standard_name: air_temperature long_name: Near-Surface Air Temperature comment: near-surface (usually, 2 meter) air temperature units: K cell_methods: area: time: mean cell_measures: area: areacella history: 2019-11-08T06:41:45Z altered by CMOR: Treated scalar dime... _ChunkSizes: [ 1 144 192]
- time: 1980
- lat: 144
- lon: 192
- dask.array<chunksize=(1, 144, 192), meta=np.ndarray>
Array Chunk Bytes 218.97 MB 110.59 kB Shape (1980, 144, 192) (1, 144, 192) Count 1981 Tasks 1980 Chunks Type float32 numpy.ndarray - time(time)datetime64[ns]1850-01-16T12:00:00 ... 2014-12-...
- bounds :
- time_bnds
- axis :
- T
- long_name :
- time
- standard_name :
- time
- _ChunkSizes :
- 1
array(['1850-01-16T12:00:00.000000000', '1850-02-15T00:00:00.000000000', '1850-03-16T12:00:00.000000000', ..., '2014-10-16T12:00:00.000000000', '2014-11-16T00:00:00.000000000', '2014-12-16T12:00:00.000000000'], dtype='datetime64[ns]')
- lat(lat)float64-89.38 -88.12 ... 88.12 89.38
- bounds :
- lat_bnds
- units :
- degrees_north
- axis :
- Y
- long_name :
- Latitude
- standard_name :
- latitude
array([-89.375, -88.125, -86.875, -85.625, -84.375, -83.125, -81.875, -80.625, -79.375, -78.125, -76.875, -75.625, -74.375, -73.125, -71.875, -70.625, -69.375, -68.125, -66.875, -65.625, -64.375, -63.125, -61.875, -60.625, -59.375, -58.125, -56.875, -55.625, -54.375, -53.125, -51.875, -50.625, -49.375, -48.125, -46.875, -45.625, -44.375, -43.125, -41.875, -40.625, -39.375, -38.125, -36.875, -35.625, -34.375, -33.125, -31.875, -30.625, -29.375, -28.125, -26.875, -25.625, -24.375, -23.125, -21.875, -20.625, -19.375, -18.125, -16.875, -15.625, -14.375, -13.125, -11.875, -10.625, -9.375, -8.125, -6.875, -5.625, -4.375, -3.125, -1.875, -0.625, 0.625, 1.875, 3.125, 4.375, 5.625, 6.875, 8.125, 9.375, 10.625, 11.875, 13.125, 14.375, 15.625, 16.875, 18.125, 19.375, 20.625, 21.875, 23.125, 24.375, 25.625, 26.875, 28.125, 29.375, 30.625, 31.875, 33.125, 34.375, 35.625, 36.875, 38.125, 39.375, 40.625, 41.875, 43.125, 44.375, 45.625, 46.875, 48.125, 49.375, 50.625, 51.875, 53.125, 54.375, 55.625, 56.875, 58.125, 59.375, 60.625, 61.875, 63.125, 64.375, 65.625, 66.875, 68.125, 69.375, 70.625, 71.875, 73.125, 74.375, 75.625, 76.875, 78.125, 79.375, 80.625, 81.875, 83.125, 84.375, 85.625, 86.875, 88.125, 89.375])
- lon(lon)float640.9375 2.812 4.688 ... 357.2 359.1
- bounds :
- lon_bnds
- units :
- degrees_east
- axis :
- X
- long_name :
- Longitude
- standard_name :
- longitude
array([ 0.9375, 2.8125, 4.6875, 6.5625, 8.4375, 10.3125, 12.1875, 14.0625, 15.9375, 17.8125, 19.6875, 21.5625, 23.4375, 25.3125, 27.1875, 29.0625, 30.9375, 32.8125, 34.6875, 36.5625, 38.4375, 40.3125, 42.1875, 44.0625, 45.9375, 47.8125, 49.6875, 51.5625, 53.4375, 55.3125, 57.1875, 59.0625, 60.9375, 62.8125, 64.6875, 66.5625, 68.4375, 70.3125, 72.1875, 74.0625, 75.9375, 77.8125, 79.6875, 81.5625, 83.4375, 85.3125, 87.1875, 89.0625, 90.9375, 92.8125, 94.6875, 96.5625, 98.4375, 100.3125, 102.1875, 104.0625, 105.9375, 107.8125, 109.6875, 111.5625, 113.4375, 115.3125, 117.1875, 119.0625, 120.9375, 122.8125, 124.6875, 126.5625, 128.4375, 130.3125, 132.1875, 134.0625, 135.9375, 137.8125, 139.6875, 141.5625, 143.4375, 145.3125, 147.1875, 149.0625, 150.9375, 152.8125, 154.6875, 156.5625, 158.4375, 160.3125, 162.1875, 164.0625, 165.9375, 167.8125, 169.6875, 171.5625, 173.4375, 175.3125, 177.1875, 179.0625, 180.9375, 182.8125, 184.6875, 186.5625, 188.4375, 190.3125, 192.1875, 194.0625, 195.9375, 197.8125, 199.6875, 201.5625, 203.4375, 205.3125, 207.1875, 209.0625, 210.9375, 212.8125, 214.6875, 216.5625, 218.4375, 220.3125, 222.1875, 224.0625, 225.9375, 227.8125, 229.6875, 231.5625, 233.4375, 235.3125, 237.1875, 239.0625, 240.9375, 242.8125, 244.6875, 246.5625, 248.4375, 250.3125, 252.1875, 254.0625, 255.9375, 257.8125, 259.6875, 261.5625, 263.4375, 265.3125, 267.1875, 269.0625, 270.9375, 272.8125, 274.6875, 276.5625, 278.4375, 280.3125, 282.1875, 284.0625, 285.9375, 287.8125, 289.6875, 291.5625, 293.4375, 295.3125, 297.1875, 299.0625, 300.9375, 302.8125, 304.6875, 306.5625, 308.4375, 310.3125, 312.1875, 314.0625, 315.9375, 317.8125, 319.6875, 321.5625, 323.4375, 325.3125, 327.1875, 329.0625, 330.9375, 332.8125, 334.6875, 336.5625, 338.4375, 340.3125, 342.1875, 344.0625, 345.9375, 347.8125, 349.6875, 351.5625, 353.4375, 355.3125, 357.1875, 359.0625])
- height()float64...
- units :
- m
- axis :
- Z
- positive :
- up
- long_name :
- height
- standard_name :
- height
array(2.)
- standard_name :
- air_temperature
- long_name :
- Near-Surface Air Temperature
- comment :
- near-surface (usually, 2 meter) air temperature
- units :
- K
- cell_methods :
- area: time: mean
- cell_measures :
- area: areacella
- history :
- 2019-11-08T06:41:45Z altered by CMOR: Treated scalar dimension: 'height'. 2019-11-08T06:41:45Z altered by CMOR: replaced missing value flag (-1.07374e+09) with standard missing value (1e+20).
- _ChunkSizes :
- [ 1 144 192]
There are a few ways to turn a Dask xarray.DataArray back into a numpy array. .load()
will compute the dask data and retain metadata, .values
will compute the dask data and return a numpy array, and .data
will return the dask array itself.
ds.tas[0,50,99].load()
<xarray.DataArray 'tas' ()> array(295.87695312) Coordinates: time datetime64[ns] 1850-01-16T12:00:00 lat float64 -26.88 lon float64 186.6 height float64 2.0 Attributes: standard_name: air_temperature long_name: Near-Surface Air Temperature comment: near-surface (usually, 2 meter) air temperature units: K cell_methods: area: time: mean cell_measures: area: areacella history: 2019-11-08T06:41:45Z altered by CMOR: Treated scalar dime... _ChunkSizes: [ 1 144 192]
- 295.9
array(295.87695312)
- time()datetime64[ns]1850-01-16T12:00:00
- bounds :
- time_bnds
- axis :
- T
- long_name :
- time
- standard_name :
- time
- _ChunkSizes :
- 1
array('1850-01-16T12:00:00.000000000', dtype='datetime64[ns]')
- lat()float64-26.88
- bounds :
- lat_bnds
- units :
- degrees_north
- axis :
- Y
- long_name :
- Latitude
- standard_name :
- latitude
array(-26.875)
- lon()float64186.6
- bounds :
- lon_bnds
- units :
- degrees_east
- axis :
- X
- long_name :
- Longitude
- standard_name :
- longitude
array(186.5625)
- height()float642.0
- units :
- m
- axis :
- Z
- positive :
- up
- long_name :
- height
- standard_name :
- height
array(2.)
- standard_name :
- air_temperature
- long_name :
- Near-Surface Air Temperature
- comment :
- near-surface (usually, 2 meter) air temperature
- units :
- K
- cell_methods :
- area: time: mean
- cell_measures :
- area: areacella
- history :
- 2019-11-08T06:41:45Z altered by CMOR: Treated scalar dimension: 'height'. 2019-11-08T06:41:45Z altered by CMOR: replaced missing value flag (-1.07374e+09) with standard missing value (1e+20).
- _ChunkSizes :
- [ 1 144 192]
ds.tas[0,50,99].values
array(295.87695312)
ds.tas.data
|
Distributed Dask¶
Without any special setup, Dask will run operations in threaded mode. You can configure it to run in distributed mode instead with
import dask.distributed
import tempfile
try:
client
except NameError:
dask_worker_dir = tempfile.TemporaryDirectory()
client = dask.distributed.Client(
local_directory = dask_worker_dir.name,
)
client
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
Client
|
Cluster
|
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
This will by default ask for all resources available on your computer.
Hint
The try: ... except NameError:
structure is to make sure only on Dask client is created, in case you execute the notebook cell more than once. If you’re writing a python script rather than using a notebook it’s not needed.
Warning
It’s important to set the local_directory
parameter, otherwise Dask will store temporary files in the current working directory which can be a problem if filesystem quotas are enabled.
Other useful options are:
n_workers
: Number of distributed processesthreads_per_worker
: Number of shared memory threads within each processmemory_limit
: Memory available to each process (e.g.'4gb'
)
Warning
If you’re using a shared system, be polite and don’t take over the whole system with your Dask cluster, set reasonable limits. If running on NCI’s Gadi supercomputer, climtas.nci.GadiClient()
will inspect the PBS resources requested by qsub
and set up the cluster using those limits
You can follow the dashboard link displayed by Jupyter to get an interactive view of what the Dask cluster is doing.
To stop the Dask cluster run
client.close()
This isn’t normally needed, Dask will clean itself up at the end of your script automatically, but it can be helpful if you’re experimenting with different cluster sizes.