Batch Scripts

When you’re processing a large amount of data its a good idea to first test your analysis in an interactive job (e.g. using Jupyter) where you can refine your code quickly. Once you’ve got everything ready you can then move to a batch job and submit to the queue.

Batch jobs are submitted to the queue with a special program, Gadi uses qsub. The jobs are put into a queue alongside everyone else’s jobs on the supercomputer, and the jobs get selected from the queue and run when there are free cpus on the supercomputer based on a number of priority factors - job size, walltime request, amount of time your project has used already etc.

Job Scripts

A job script is a bash shell script. It lists the resources needed by the job, does any setup like loading modules, then runs your program:

#!/bin/bash
#PBS -l ncpus=4
#PBS -l mem=16gb
#PBS -l walltime=1:00:00
#PBS -l jobfs=100gb
#PBS -l storage=gdata/hh5
#PBS -l wd
#PBS -j oe
#PBS -W umask=0022

module use /g/data/hh5/public/modules
module load conda/analysis3

python ./myscript.py

You can run any type of program here, but you should make sure that there’s some sort of parallelisation enabled in it if you request more than one CPU. This could be from a model using MPI, in which case you’d run it with mpirun PROGRAM, or a Python script using either Dask or multiprocessing.

Note

The #PBS lines need to be at the start of the file, right after #!/bin/bash.

Dask Clients

To enable Dask to use the resources requested by the run script you should start a Dask client. The simple way to do this on Gadi is to use climtas.nci.GadiClient(). Note that when you’re using Dask (or multiprocessing) you need to put your entire script inside a if __name__ == '__main__' check:

import climtas.nci

if __name__ == '__main__':
    climtas.nci.GadiClient()
    
    # Rest of your script runs inside the 'if' check
    # ...

The climtas function is a shortcut to starting a Dask client manually with the resources available to the queued job:

import dask.distributed
import os

if __name__ == '__main__':
    dask.distributed.Client(
        n_workers = int(os.environ['PBS_NCPUS']),
        memory_limit = int(os.environ['PBS_VMEM']) / int(os.environ['PBS_NCPUS']),
        local_directory = os.path.join(os.environ['PBS_JOBFS'], 'dask-worker-space')
    )
    
    # Rest of your script runs inside the 'if' check
    # ...

It’s important to set the memory limit, as the queue system will automatically kill your job if it uses more memory than it has requested. If you don’t set the local directory then Dask will store temporary files in the current directory. For a large analysis these files can become quite large - putting them on the jobfs disk means they get cleaned up automatically when the job finishes.

Note

This way of starting a Dask cluster only works on a single compute node, so if you’re using Gadi’s normal queue you can’t use more than 48 CPUs. It’s possible to set Dask up to use more than one node, but check how well your problem scales with increasing numbers of CPUs first.

Note

If you’re using the jobfs disk on Gadi, make sure your run script has a #PBS -l jobfs resource request

Checking on batch jobs

When you submit a job with qsub you’ll get a job ID number from the queue system. This ID number can be used to stop the job running if needed, with qdel JOB_ID.

You can check on the status of your jobs with qstat. On Gadi, the script /g/data/hh5/public/apps/nci_scripts/uqstat shows extended information about a job, including how much of its CPU and memory request it is using.

                  project    user         name     queue state  ncpus        walltime su mem_pct cpu_pct           qtime
21672958.gadi-pbs     w35  saw562  jupyter-lab  normalbw     R      1 0 days 00:01:18  0     14%     35% 0 days 00:01:00

This is useful to check that your job is actually running in parallel - if the CPU percent is very low then it’s possible your job is running in serial mode

Example

Here’s a small example batch setup. You submit the job with qsub submit_mean.pbs, the batch script then runs the python script for you.

submit_mean.pbs

#!/bin/bash
#PBS -l ncpus=4
#PBS -l mem=16gb
#PBS -l walltime=1:00:00
#PBS -l jobfs=100gb
#PBS -l wd
#PBS -l storage=gdata/hh5+gdata/rt52
#PBS -W umask=0022
#PBS -j oe

module use /g/data3/hh5/public/modules
module load conda/analysis3-21.04

set -eu

python ./mean.py

mean.py

import xarray
import climtas.nci

# It's fine to define functions outside of the `if __name__ == '__main__'` statement
def calc_mean(path, variable):
    ds = xarray.open_mfdataset(path, combine='nested', concat_dim='time')
    return ds[variable].mean()

if __name__ == '__main__':
    c = climtas.nci.GadiClient()
    
    path = "/g/data/rt52/era5/single-levels/reanalysis/2t/2001/2t_era5_oper_sfc_*.nc"
    variable = 't2m'
    
    mean = calc_mean(path, variable)
    print(mean.load())