Pathological Behaviour and Troubleshooting

Some Dask operations can show surprisingly poor performance. Here are some workarounds that we have found.

Note

This behaviour may not apply to all versions of Dask; it mostly highlights areas where we’ve seen problems.

Indexing using an array

Using an array of indices to index a Dask array can make Dask load the entire array into memory.

# Return only the values at 0000Z
indices = data.time.dt.hour == 0

sample = data[indices, :, :]

Instead, try .where(..., drop=True):

sample = data.where(data.time.dt.hour == 0, drop=True)

Alternatively, make sure the index array has the same chunking as the source data:

indices = numpy.isfinite(data)  # Computed from 'data', so has the same chunks

sample = data[indices]
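
If the mask was created separately from the source data, you can rechunk it to match before indexing. A minimal sketch at the dask array level, assuming 'mask' is a dask array with the same shape as 'data':

# Rechunk a separately-created mask so its chunks line up with the data
mask = mask.rechunk(data.chunks)

sample = data[mask]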

Climatologies & Resampling

The default Xarray climatology and resample operations create a large number of chunks - one for each sample period.

Say we want to resample hourly ERA5 data to daily:

import xarray

msl = xarray.open_mfdataset('/g/data/rt52/era5/single-levels/reanalysis/msl/2010/*.nc', chunks={'latitude': 300, 'longitude': 300}).msl
msl.data
        Array              Chunk
Bytes   36.38 GB           267.84 MB
Shape   (8760, 721, 1440)  (744, 300, 300)
Count   372 Tasks          180 Chunks
Type    float32            numpy.ndarray

The source data has a reasonable chunk size - a month in time, and 300x300 horizontally, with a total of 180 chunks in the whole dataset.

Using the default Xarray resample increases the number of chunks dramatically

msl_daily = msl.resample(time='1D').mean()
msl_daily.data
        Array             Chunk
Bytes   1.52 GB           360.00 kB
Shape   (365, 721, 1440)  (1, 300, 300)
Count   22272 Tasks       5475 Chunks
Type    float32           numpy.ndarray

The number of chunks has increased by 30x, and the task graph size by 60x. Rather than one chunk per month, the time axis is now chunked one day at a time.
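
You can check these counts on any dask array without computing anything (a sketch using the underlying dask array):

# Blocks along each dimension, and total tasks in the graph
print(msl_daily.data.numblocks)
print(len(msl_daily.data.__dask_graph__()))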

This increase in chunk count is necessary if your data’s time axis is irregularly spaced, but if you have regular data it’s possible to optimise the resampling using array reshaping operations, without changing the number of chunks, as sketched below.
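
As a rough sketch of the reshaping idea, assuming exactly one year of hourly data so the time axis divides evenly into days, and that each chunk covers whole days:

# Group the 8760 hourly steps into 365 days of 24 hours each, then
# average within each day. Chunk boundaries at 744 = 31 * 24 hours
# line up with whole days, so the chunk count stays the same.
daily = msl.data.reshape((365, 24, 721, 1440)).mean(axis=1)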

The climtas library provides resample and groupby implementations with these optimisations, keeping the number of chunks the same and without significantly increasing the task graph size.

import climtas

msl_daily = climtas.blocked_resample(msl, time=24).mean()
msl_daily.data
        Array             Chunk
Bytes   1.52 GB           11.16 MB
Shape   (365, 721, 1440)  (31, 300, 300)
Count   552 Tasks         180 Chunks
Type    float32           numpy.ndarray

Lack of Backpressure

The Dask task graph doesn’t currently keep track of how much memory an operation will use. This means that Dask can start processing lots of chunks, but then stall part-way down the graph because there isn’t enough memory available to perform the next set of operations.

The ideal way to solve this is a concept called ‘backpressure’ - the rate at which new chunks start being processed is limited, to make sure that there’s enough memory available to finish processing them. Dask doesn’t currently support this. As a workaround, the climtas library has a function that saves a Dataset to file a few chunks at a time, manually limiting the number of active chunks being processed.

climtas.io.to_netcdf_throttled(msl_daily, 'era5_msl_daily.nc')
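
If you can’t use climtas, a similar effect can be had by computing and saving the output in pieces yourself. A minimal sketch (the monthly grouping and file names are just for illustration):

# Compute and save one month at a time, so only that month's chunks
# need to be in memory at once; combine the files later if needed
for month, group in msl_daily.groupby('time.month'):
    group.to_netcdf(f'era5_msl_daily_{month:02d}.nc')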

Kernel Dying / Out of Memory

The Jupyter kernel dying can be identified either by an explicit error message that pops up, or by executing a cell and seeing its number go back to ‘[1]’.

The most likely cause of the kernel dying is running out of memory. This can be from Dask loading too much, from keeping a lot of arrays around, or from external libraries like netCDF needing extra memory.

To see if Dask is loading too much data into memory, check the memory graph in the distributed dashboard, and make sure your cluster’s memory limit matches the resources you have available. Dask will restart a worker if its memory goes over 90% or so of its limit.
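
For example, when creating a local cluster you can set the per-worker limit explicitly. A sketch only - match the numbers to the resources your job actually has:

import dask.distributed

# Four workers capped at 4 GB each: 16 GB total, so this suits a job
# with 16 GB or a little more of memory available
client = dask.distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='4gb')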

If you use Numpy arrays to hold intermediate data, their sizes can add up to quite a lot, especially if they’re multi-dimensional. You can get the size of an array in bytes with .nbytes:

import numpy

x = numpy.zeros((100, 100, 20))
x.nbytes / 1024**2  # size in MiB
1.52587890625

or by converting to a dask array and checking the dask printout:

import dask.array

dask.array.from_array(x)
        Array           Chunk
Bytes   1.60 MB         1.60 MB
Shape   (100, 100, 20)  (100, 100, 20)
Count   1 Tasks         1 Chunks
Type    float64         numpy.ndarray

If you’re creating intermediate arrays in a loop then it can be helpful to create a function for the loop body, as that will ensure that the temporary arrays are cleaned up.

def process_var(var):
    da = xarray.open_dataarray(f'{var}.nc')
    temp = numpy.random.random(da.shape)
    (da + temp).to_netcdf(f'{var}_rand.nc')
    # 'da' and 'temp' get cleaned up when the function exits

for var in ['t', 'u', 'v']:
    process_var(var)
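
If restructuring the loop into a function isn’t convenient, explicitly deleting large temporaries has a similar effect. A sketch, where 'do_something' is a stand-in for your own processing:

temp = numpy.random.random((1000, 1000, 100))  # ~800 MB
result = do_something(temp)  # 'do_something' is hypothetical
del temp  # drop the reference so the ~800 MB can be freed immediately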

Another issue we have seen comes from the NetCDF library not having enough memory available to decompress the data it reads from a file. NetCDF itself needs some memory in order to open a file, so you need to leave some space for that in your Dask allocation. This needn’t be a massive amount; a hundred megabytes or so should be fine.