# Converting a function from 1D to ND - apply_along_axis

We have a function that works along the time axis, but only for 1D data. We'd like to convert it to work on a full dataset.

In [1]:
import xarray
import numpy as np
import scipy.integrate
import dask.array

## 1d function

Our initial function, that works only on one dimension, is:

In [2]:
def edof_1d(A):
    N = len(A)
    x = A - A.mean()
    c = np.correlate(x, x, 'full')
    c = c[N-1:]/(N-1-np.arange(0, N, 1))
    n = 0
    while (c[n] > 0) and (n < N/2):
        n = n+1
    T = scipy.integrate.trapz(c[:n])/c[0]
    edof = N/(2*T)
    return edof

## Nd function

In this case the function is calling numpy.correlate, which needs the whole time axis. The best way to parallelise a function is to re-write it to compute the whole grid at once, stepping over time. See the example {doc}`oned_to_nd_rewrite`

A more general way to convert the 1d function to work on a nd dataset is to use [`dask.array.apply_along_axis`](https://docs.dask.org/en/latest/array-api.html#dask.array.apply_along_axis) (a dask-array version of [`numpy.apply_along_axis`](https://numpy.org/doc/stable/reference/generated/numpy.apply_along_axis.html)). This will process the chunks in parallel, but will be a bit slower than the re-write method.

In [3]:
def edof(A):
    axis = 0 # Generally 'time' is axis zero, or use A.get_axis_num('time')
    edof = dask.array.apply_along_axis(edof_1d, axis, A)
    return np.nanmean(edof)

## Sample data

To test performance, I'll use ERA5 data of approximately the target size. Note the horizontal chunks, we're working along the time axis so horizontal chunking should be preferred

In [4]:
path = "/g/data/rt52/era5/single-levels/reanalysis/2t/2001/2t_era5_oper_sfc_200101*.nc"
ds = xarray.open_mfdataset(
    path, combine="nested", concat_dim="time", chunks={'latitude': 91, 'longitude': 180}
)

In [5]:
ds.t2m

Unnamed: 0,Array,Chunk
Bytes,3.09 GB,48.75 MB
Shape,"(744, 721, 1440)","(744, 91, 180)"
Count,65 Tasks,64 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 3.09 GB 48.75 MB Shape (744, 721, 1440) (744, 91, 180) Count 65 Tasks 64 Chunks Type float32 numpy.ndarray",1440  721  744,

Unnamed: 0,Array,Chunk
Bytes,3.09 GB,48.75 MB
Shape,"(744, 721, 1440)","(744, 91, 180)"
Count,65 Tasks,64 Chunks
Type,float32,numpy.ndarray


## Start a dask client

I'm running on Gadi, [`climtas.nci.GadiClient()`](https://climtas.readthedocs.io/en/latest/nci.html#climtas.nci.GadiClient) gets the available resources from PBS, or you can start up a client manually with [`dask.distributed.Client()`](https://distributed.dask.org/en/latest/quickstart.html#setup-dask-distributed-the-easy-way).

In [6]:
import climtas.nci
climtas.nci.GadiClient()

0,1
Client  Scheduler: tcp://127.0.0.1:45573  Dashboard: /proxy/8787/status,Cluster  Workers: 1  Cores: 1  Memory: 4.29 GB


## Running the function

Since we're using a Dask function the values aren't computed automatically. Instead we need to call `.compute()` on the result to get the value

In [7]:
edof(ds.t2m)

  c = c[N-1:]/(N-1-np.arange(0, N, 1))


Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Count,278 Tasks,1 Chunks
Type,float64,numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Count 278 Tasks 1 Chunks Type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Count,278 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [8]:
%%time

edof(ds.t2m).compute()

  c = c[N-1:]/(N-1-np.arange(0, N, 1))


CPU times: user 3.15 s, sys: 360 ms, total: 3.51 s
Wall time: 3min 59s


27.634806731994875