Types of parallelism¶
Embarrassingly Parallel¶
The simplest method of parallelisation is ‘embarrassingly parallel’ - a type of problem whose tasks are completely independent, so parallelising it requires almost no extra effort.
Consider a program that takes in hourly data, producing daily means. Each day’s result is totally independent and requires no knowledge of what happens on the previous or subsequent day. If our input file is split up, say into one file per month, we can process each file independently, say with a loop:
for input in tas_*.nc; do
    output=tas_daily_${input#*_}  # Changes tas_201001.nc to tas_daily_201001.nc
    ./hourly_to_daily.py $input $output
done
GNU Parallel¶
To parallelise this loop automatically you can use GNU parallel (module ‘parallel’ at NCI):
parallel --jobs 4 ./hourly_to_daily.py ::: tas_*.nc
This will run ./hourly_to_daily.py on each of the files tas_*.nc, with at most 4 jobs running in parallel.
There are a couple of things to note. First, parallel can only use the CPUs of a single node if you're running on a supercomputer. Second, we can't do the fancy output file naming trick from the loop - the script itself would need to handle that.
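If the script works out the output name itself, the loop (and parallel) only needs to pass the input file. A minimal sketch of what that could look like inside hourly_to_daily.py, assuming the tas_YYYYMM.nc naming used above (the xarray resample call is just an illustration of the daily-mean step):
#!/usr/bin/env python
# hourly_to_daily.py - sketch only, assumes inputs named like tas_201001.nc
import sys
import xarray

input_file = sys.argv[1]

# Build tas_daily_201001.nc from tas_201001.nc, mirroring the shell trick above
prefix, date = input_file.split('_', 1)
output_file = f'{prefix}_daily_{date}'

# Illustrative processing step: hourly values to daily means
ds = xarray.open_dataset(input_file)
ds.resample(time='1D').mean().to_netcdf(output_file)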
Looped Qsub¶
If you have access to a supercomputer you can also try submitting multiple jobs to its queue, using an environment variable to identify which file to process. Using Gadi’s qsub:
for input in tas_*.nc; do
    qsub -v input=$input hourly_to_daily.pbs
done
with the job script
#!/bin/bash
# hourly_to_daily.pbs
#PBS -l ncpus=1,walltime=0:10:00,mem=4gb,wd
set -eu
output=tas_daily_${input#*_} # Changes tas_201001.nc to tas_daily_201001.nc
./hourly_to_daily.py $input $output
You can also combine this with parallel, say by submitting a year at a time:
for year in {1990..2010}; do
    qsub -v year=$year hourly_to_daily_year.pbs
done
with the job script
#!/bin/bash
# hourly_to_daily_year.pbs
#PBS -l ncpus=4,walltime=0:10:00,mem=16gb,wd
set -eu
module load parallel
parallel --jobs $PBS_NCPUS ./hourly_to_daily.py ::: tas_${year}*.nc
Vectorised operators¶
The next simplest parallelisation method to use is vectorisation. Modern computers have ‘vector instruction sets’ that can compute multiple values in an array at once.
Fortran / C¶
If you're using Fortran or C this is for the most part automatic, assuming you're compiling with the -O2 or -O3 flags to enable optimisations. It can be helpful to also set the -xHost (Intel) or -march=native (GNU) flag - this lets the compiler use the full vector capability of the CPU it's compiled on, but you then can't run the program on a different CPU type without recompiling (at NCI, Gadi has different CPU types available in different queues).
It is also possible to work with the vector instructions directly, using Intel MKL or compiler intrinsic functions. This should only be done by experienced programmers, after detailed profiling of the code to ensure you're working on a part of the program that's actually slow.
Python¶
In Python the easiest way to use vectorisation is to use Numpy, or libraries that depend on it (scipy, pandas, xarray etc.). Numpy uses optimised functions when doing array operations.
Note
As much as possible, avoid looping over the elements of a numpy array - working on the whole array at once is much faster.
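As a rough illustration, here is the same daily-mean calculation written twice over a made-up array of hourly values (the names hourly and daily are hypothetical). The explicit Python loop does the same work as the single Numpy call, but the whole-array version runs in Numpy's optimised vectorised code and is much faster:
import numpy

# A year of made-up hourly values, one row per day
hourly = numpy.random.random((365, 24))

# Slow: looping over the array elements in Python
daily = numpy.empty(365)
for day in range(365):
    total = 0.0
    for hour in range(24):
        total += hourly[day, hour]
    daily[day] = total / 24

# Fast: let Numpy do the whole calculation at once
daily = hourly.mean(axis=1)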
Distributed¶
The next step in parallelisation is to use more than one process. While vectorised operations work within a single process, distributed techniques can make use of an arbitrary number of processors, with each running its own program. The individual programs communicate with each other in some way to organise what part of the work each will do.
Fortran / C¶
The standard way to do distributed programming in Fortran and C is with the MPI (Message Passing Interface) library. This is the system that most supercomputer clusters are optimised to use. With MPI you use a special program, mpiexec, to start up a number of copies of your program; the copies then communicate with each other by passing messages.
In a climate model each processor might handle one square of the global grid, with a ‘halo’ of grid cells at the edges being passed to neighbouring squares over MPI.
program sendrecv
    ! Send an array of data from processor 1 to processor 0
    use mpi_f08
    implicit none

    real :: a(10)
    integer :: rank, tag

    call MPI_Init()
    call MPI_Comm_rank(MPI_COMM_WORLD, rank)

    tag = 0

    if (rank == 0) then
        a = 0
        call MPI_Recv(a, size(a), MPI_REAL, 1, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE)
        write(*,*) a
    else if (rank == 1) then
        a = 5
        call MPI_Send(a, size(a), MPI_REAL, 0, tag, MPI_COMM_WORLD)
    end if

    call MPI_Finalize()
end program
Python¶
Python's multiprocessing library is a built-in way to use distributed processing. It starts up extra processes for you when you make a Pool, although it can only use the processors on a single node. There are a few different ways to use it; I normally use it to run multiple loop iterations in parallel, so rather than
def process_file(f):
    pass # ...

for f in files:
    process_file(f)
you would use
import multiprocessing

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        it = pool.imap_unordered(process_file, files)
        for i in it:
            pass
Here 4 processes are used to run the function process_file() on each element of the list files, in an arbitrary order. There are a couple of bits of boilerplate - if __name__ == '__main__': is needed around the whole script to make sure that child processes start properly, and the loop at the end runs over the function results, making sure that all of the function calls have completed.
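If process_file() returns a value, the same loop can collect the results as they finish. A small sketch using the hypothetical names from above (note the results arrive in an arbitrary order):
import multiprocessing

def process_file(f):
    # Hypothetical worker - return something per file, e.g. a daily mean
    return len(f)

files = ['tas_201001.nc', 'tas_201002.nc', 'tas_201003.nc']

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        results = []
        for r in pool.imap_unordered(process_file, files):
            results.append(r)
        print(results)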
While multiprocessing is good for loops, similar to how you'd run embarrassingly parallel code, Dask is a good way to parallelise your code if you're working with gridded data. Dask provides replacements for numpy arrays and pandas dataframes which can be split into smaller ‘chunks’, with the chunks then able to be computed in parallel.
import dask.array

a = dask.array.random.random((10_000, 10_000), chunks=(1000, 1000))
mean = a.mean()
For the most part Dask arrays work exactly like their numpy counterparts.
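One thing to be aware of is that Dask operations are lazy - mean above only describes the calculation. Calling .compute() actually runs it, with the chunks processed in parallel:
import dask.array

a = dask.array.random.random((10_000, 10_000), chunks=(1000, 1000))
mean = a.mean()

# Nothing has been calculated yet - 'mean' is a graph of per-chunk tasks.
# compute() runs those tasks in parallel and returns a plain number.
print(mean.compute())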
Note
You can find more information about using Dask in a later chapter.