Batch Scripts¶
When you’re processing a large amount of data it’s a good idea to first test your analysis in an interactive job (e.g. using Jupyter), where you can refine your code quickly. Once everything is ready you can move to a batch job and submit it to the queue.
Batch jobs are submitted to the queue with a special program; on Gadi this is qsub. Your jobs are put into a queue alongside everyone else’s jobs on the supercomputer, and jobs are selected from the queue and run when CPUs become free, based on a number of priority factors: job size, walltime request, how much time your project has already used, and so on.
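For example, assuming your job script is saved as job.pbs (a name made up for this sketch), submission looks like this:

# Submit the job script to the queue; qsub prints the new job's ID,
# e.g. something like 12345678.gadi-pbs (made-up example)
qsub job.pbs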
Job Scripts¶
A job script is a bash shell script. It lists the resources needed by the job, does any setup such as loading modules, and then runs your program:
#!/bin/bash
#PBS -l ncpus=4
#PBS -l mem=16gb
#PBS -l walltime=1:00:00
#PBS -l jobfs=100gb
#PBS -l storage=gdata/hh5
#PBS -l wd
#PBS -j oe
#PBS -W umask=0022
module use /g/data/hh5/public/modules
module load conda/analysis3
python ./myscript.py
You can run any type of program here, but you should make sure that there’s some sort of parallelisation enabled in it if you request more than one CPU. This could be a model using MPI, in which case you’d run it with mpirun PROGRAM, or a Python script using either Dask or multiprocessing.
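As a rough sketch (model.exe is a placeholder for your own executable), an MPI program would be launched from the job script along these lines, using the PBS_NCPUS variable the queue system sets for the job:

# Run the MPI program across all of the CPUs requested by the job
mpirun -np $PBS_NCPUS ./model.exe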
Note
The #PBS lines need to be at the start of the file, right after #!/bin/bash.
Dask Clients¶
To enable Dask to use the resources requested by the run script you should start a Dask client. The simplest way to do this on Gadi is to use climtas.nci.GadiClient(). Note that when you’re using Dask (or multiprocessing) you need to put your entire script inside an if __name__ == '__main__' check:
import climtas.nci

if __name__ == '__main__':
    climtas.nci.GadiClient()

    # Rest of your script runs inside the 'if' check
    # ...
The climtas function is a shortcut for the following, which starts a Dask client manually using the resources available to the queued job:
import dask.distributed
import os

if __name__ == '__main__':
    dask.distributed.Client(
        n_workers=int(os.environ['PBS_NCPUS']),
        memory_limit=int(os.environ['PBS_VMEM']) / int(os.environ['PBS_NCPUS']),
        local_directory=os.path.join(os.environ['PBS_JOBFS'], 'dask-worker-space'),
    )

    # Rest of your script runs inside the 'if' check
    # ...
It’s important to set the memory limit, as the queue system will automatically kill your job if it uses more memory than it has requested. If you don’t set the local directory then Dask will store temporary files in the current directory. For a large analysis these files can become quite large - putting them on the jobfs disk means they get cleaned up automatically when the job finishes.
Note
This way of starting a Dask cluster only works on a single compute node, so if you’re using Gadi’s normal queue you can’t use more than 48 CPUs. It’s possible to set Dask up to use more than one node, but check how well your problem scales with increasing numbers of CPUs first.
Note
If you’re using the jobfs disk on Gadi, make sure your run script has a #PBS -l jobfs resource request.
Checking on batch jobs¶
When you submit a job with qsub you’ll get a job ID number from the queue system. This ID number can be used to stop the job if needed, with qdel JOB_ID.
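For instance, using a made-up job ID of the form qsub reports:

# Remove a queued or running job from the queue
qdel 12345678.gadi-pbs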
You can check on the status of your jobs with qstat. On Gadi, the script /g/data/hh5/public/apps/nci_scripts/uqstat shows extended information about a job, including how much of its CPU and memory request it is using.
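For example (the -u option limits qstat to your own jobs; the sample table below is uqstat output):

# List your own queued and running jobs
qstat -u $USER

# Show extended information, including CPU and memory usage
/g/data/hh5/public/apps/nci_scripts/uqstat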
                   project    user         name     queue  state  ncpus         walltime  su  mem_pct  cpu_pct            qtime
21672958.gadi-pbs      w35  saw562  jupyter-lab  normalbw      R      1  0 days 00:01:18   0      14%      35%  0 days 00:01:00
This is useful for checking that your job is actually running in parallel: if the CPU percentage is very low, your job may be running in serial.
Example¶
Here’s a small example batch setup. You submit the job with qsub submit_mean.pbs, and the batch script then runs the Python script for you.
submit_mean.pbs
#!/bin/bash
#PBS -l ncpus=4
#PBS -l mem=16gb
#PBS -l walltime=1:00:00
#PBS -l jobfs=100gb
#PBS -l wd
#PBS -l storage=gdata/hh5+gdata/rt52
#PBS -W umask=0022
#PBS -j oe
module use /g/data3/hh5/public/modules
module load conda/analysis3-21.04
set -eu
python ./mean.py
mean.py
import xarray
import climtas.nci

# It's fine to define functions outside of the `if __name__ == '__main__'` statement
def calc_mean(path, variable):
    ds = xarray.open_mfdataset(path, combine='nested', concat_dim='time')
    return ds[variable].mean()

if __name__ == '__main__':
    c = climtas.nci.GadiClient()

    path = "/g/data/rt52/era5/single-levels/reanalysis/2t/2001/2t_era5_oper_sfc_*.nc"
    variable = 't2m'

    mean = calc_mean(path, variable)
    print(mean.load())
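To run the example, submit the batch script and then check on the job while it waits and runs. Because of the #PBS -j oe option the printed mean ends up in a log file named something like submit_mean.pbs.o&lt;jobid&gt; in the submission directory:

# Submit the batch script; note the job ID it prints
qsub submit_mean.pbs

# Check on the job while it is queued or running
qstat -u $USER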