Pathological Behaviour and Troubleshooting¶
Some Dask operations can show very poor performance. Here are some workarounds that we have found.
Note
This behaviour may not apply to all versions of Dask; it’s mostly to highlight areas where we’ve seen problems
Indexing using an array¶
Using an array of indices to index a Dask array can make Dask load the entire source array into memory
# Return only the values at 0000Z
indices = data.time.dt.hour == 0
sample = data[indices, :, :]
Instead, try .where(..., drop=True)
sample = data.where(data.time.dt.hour == 0, drop=True)
Alternatively, make sure the index array has the same chunking as the source data
indices = numpy.isfinite(data) # Computed from 'data', so has the same chunks
sample = data[indices]
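As a quick sanity check before indexing (a sketch, assuming both arrays are Dask-backed), you can confirm that the chunk layouts match:

# Matching chunk layouts mean the indexing won't trigger a full load
assert indices.chunks == data.chunks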
Climatologies & Resampling¶
The default Xarray climatology and resample operations create a large number of chunks - one for each sample period.
Say we want to resample hourly ERA5 data to daily.
import xarray
msl = xarray.open_mfdataset('/g/data/rt52/era5/single-levels/reanalysis/msl/2010/*.nc', chunks={'latitude': 300, 'longitude': 300}).msl
msl.data
The source data has a reasonable chunk size - a month in time, and 300x300 horizontally, with a total of 180 chunks in the whole dataset.
Using the default Xarray resample increases the number of chunks dramatically.
msl_daily = msl.resample(time='1D').mean()
msl_daily.data
The number of chunks has increased by 30x, and the task graph size by 60x. Rather than one chunk per month, there is now one chunk per day.
This increase in chunk count is necessary if your data’s time axis is irregularly spaced, but if you have regular data it’s possible to optimise the resampling using array reshaping operations, without changing the number of chunks.
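To illustrate the reshape trick (a minimal sketch with a synthetic array, assuming regularly-spaced hourly data whose time chunks are whole multiples of 24 hours), the time axis can be folded into (day, hour) and reduced over the hour axis:

import dask.array

# A year of hourly data, chunked 30 days at a time (synthetic example)
hourly = dask.array.random.random((8760, 300, 300), chunks=(720, 300, 300))

# Split the time axis into (day, hour) and average over the hours.
# Each time chunk divides evenly into whole days, so no rechunking is
# needed and the number of chunks stays the same.
daily = hourly.reshape((365, 24, 300, 300)).mean(axis=1)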
The climtas library has resample and groupby operations that implement these optimisations, keeping the number of chunks the same and avoiding a significant increase in the task graph size.
import climtas
msl_daily = climtas.blocked_resample(msl, time=24).mean()
msl_daily.data
Lack of Backpressure¶
The Dask task graph doesn’t currently keep track of how much memory an operation will use. This means that Dask can start processing lots of chunks, then stall partway down the graph because there isn’t enough memory available to perform the next set of operations.
The ideal way to solve this is with a concept called ‘backpressure’: the rate at which new chunks start being processed is limited, so that there is always enough memory available to finish processing them. Dask doesn’t currently support this. As a workaround, the climtas library has a function that saves a Dataset to file a few chunks at a time, manually rate-limiting the number of active chunks being processed.
climtas.io.to_netcdf_throttled(msl_daily, 'era5_msl_daily.nc')
Kernel Dying / Out of Memory¶
The Jupyter kernel dying can be identified either by an explicit error message that pops up, or by the cell execution counter going back to ‘[1]’ when you try to execute a cell.
The most likely cause of the kernel dying is running out of memory. This can be from Dask loading too much data, from keeping a lot of large arrays around, or from external libraries like netCDF needing extra memory.
To see if Dask is loading too much, check the memory graph in the distributed dashboard, and make sure your cluster’s memory limit matches the resources you have available. Dask will restart a worker if its memory use goes over 90% or so of its limit.
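For example, you can set the memory limit explicitly when creating a local cluster and print the dashboard address to check on the workers (a minimal sketch; the worker count and memory figures are illustrative, not recommendations):

from dask.distributed import Client, LocalCluster

# Match the cluster to the resources you actually requested
# (four workers with 4 GB each here is just an example)
cluster = LocalCluster(n_workers=4, memory_limit='4GB')
client = Client(cluster)
print(client.dashboard_link)  # URL of the distributed dashboard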
If you use Numpy arrays to hold intermediate data, then it’s possible for the size of these arrays to add up to quite a lot, especially if they’re multi-dimensional. You can get the size of an array in bytes with .nbytes:
import numpy
x = numpy.zeros((100,100,20))
x.nbytes / 1024**2
1.52587890625
or by converting to a dask array and checking the dask printout:
import dask.array
dask.array.from_array(x)
If you’re creating intermediate arrays in a loop then it can be helpful to create a function for the loop body, as that will ensure that the temporary arrays are cleaned up.
def process_var(var):
    da = xarray.open_dataarray(f'{var}.nc')
    temp = numpy.random.random(da.shape)
    (da + temp).to_netcdf(f'{var}_rand.nc')
    # 'da' and 'temp' get cleaned up when the function exits

for var in ['t', 'u', 'v']:
    process_var(var)
Another issue we have seen comes from the NetCDF library not having enough memory available to decompress the data it reads from a file. NetCDF itself needs some memory in order to open a file, so you need to leave some space for it in your Dask allocation. This needn’t be a massive amount; a hundred megabytes or so should be fine.
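One way to leave that headroom (again a sketch; the figures are illustrative and depend on your allocation) is to cap each worker slightly below its full share of the job’s memory:

from dask.distributed import Client, LocalCluster

# A 16 GB job split between 4 workers would give 4 GB each;
# capping workers at 3800MB leaves ~800 MB in total free for NetCDF
cluster = LocalCluster(n_workers=4, memory_limit='3800MB')
client = Client(cluster)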