Types of parallelism

Embarrassingly Parallel

The simplest method of parallelisation is ‘embarrassingly parallel’ - this is parallelisation that requires essentially no extra effort, because the work splits up into completely independent tasks.

Consider a program that takes in hourly data, producing daily means. Each day’s result is totally independent and requires no knowledge of what happens on the previous or subsequent day. If our input file is split up, say into one file per month, we can process each file independently, say with a loop:

for input in tas_*.nc; do
    output=tas_daily_${input#*_} # Changes tas_201001.nc to tas_daily_201001.nc
    
    ./hourly_to_daily.py $input $output
done

GNU Parallel

To parallelise this loop automatically you can use GNU parallel (module ‘parallel’ at NCI):

parallel --jobs 4 ./hourly_to_daily.py ::: tas_*.nc

This will run ./hourly_to_daily.py on each of the files tas_*.nc, with at most 4 jobs running in parallel.

There are a couple of things to note. First, we can’t do the fancy output file naming trick from the loop above - the script itself would need to work out the output name from its input, as sketched below. Second, parallel can only use the processors of a single node if you’re running on a supercomputer.
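For example, here is a minimal sketch of what hourly_to_daily.py might look like if it works out the output name itself. The xarray calls, the ‘time’ dimension name and the file naming pattern are assumptions for illustration, not part of the original example:

#!/usr/bin/env python
# hourly_to_daily.py (sketch) - derives the output name from the input name,
# so it can be run as './hourly_to_daily.py tas_201001.nc'
import sys
import xarray

input_file = sys.argv[1]
output_file = 'tas_daily_' + input_file.split('_', 1)[1]  # tas_201001.nc -> tas_daily_201001.nc

# Resample the hourly data to daily means and write it out
ds = xarray.open_dataset(input_file)
ds.resample(time='1D').mean().to_netcdf(output_file)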

Looped Qsub

If you have access to a supercomputer you can also try submitting multiple jobs to its queue, using an environment variable to identify which file to process. Using Gadi’s qsub:

for input in tas_*.nc; do
    qsub -v input="$input" hourly_to_daily.pbs
done

with the job script

#!/bin/bash
# hourly_to_daily.pbs
#PBS -l ncpus=1,walltime=0:10:00,mem=4gb,wd

set -eu
output=tas_daily_${input#*_} # Changes tas_201001.nc to tas_daily_201001.nc

./hourly_to_daily.py $input $output

You can also combine this with parallel, say by submitting one job per year:

for year in {1990..2010}; do
    qsub -v year="$year" hourly_to_daily_year.pbs
done

with the job script

#!/bin/bash
# hourly_to_daily_year.pbs
#PBS -l ncpus=4,walltime=0:10:00,mem=16gb,wd

set -eu
module load parallel

parallel --jobs $PBS_NCPUS ./hourly_to_daily.py ::: tas_${year}*.nc

Vectorised operations

The next simplest parallelisation method to use is vectorisation. Modern CPUs have ‘vector instruction sets’ that can apply the same operation to several elements of an array at once.

Fortran / C

If you’re using Fortran or C this is for the most part automatic, assuming you’re compiling with the -O2 or -O3 flags to enable optimisations. It can be helpful to also set the -xHost (Intel) or -march=native (GNU) flag - this lets the compiler use the full vector capability of the CPU it’s compiling on, but the program then can’t be run on a different CPU type without recompiling (at NCI, Gadi has different CPU types available in different queues).

It is also possible to work with the vector instructions directly, using Intel MKL or compiler intrinsic functions. This should only be done by experienced programmers, after detailed profiling of the code to make sure you’re working on a part of the program that’s actually slow.

Python

In Python the easiest way to benefit from vectorisation is to use Numpy, or libraries that depend on it (scipy, pandas, xarray etc.). Numpy’s whole-array operations call optimised, vectorised functions behind the scenes.

Note

As much as possible, avoid looping over the elements of a numpy array - working on the whole array at once is much faster.
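As a quick illustration (the random array here is just example data, not from the original), both snippets below double the values and sum them, but the whole-array version is dramatically faster:

import numpy

a = numpy.random.random(1_000_000)

# Slow: looping over each element in Python
total = 0.0
for x in a:
    total += x * 2

# Fast: a single vectorised operation over the whole array
total = (a * 2).sum()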

Distributed

The next step in parallelisation is to use more than one process. While vectorised operations work within a single process, distributed techniques can make use of an arbitrary number of processors, with each running its own program. The individual programs communicate with each other in some way to organise what part of the work each will do.

Fortran / C

The standard way to do distributed programming in Fortran and C is with the MPI (Message Passing Interface) library. This is the system that most supercomputer clusters are optimised for. With MPI you use a special program, mpiexec, to start up a number of copies of your program; the copies then communicate with each other by passing messages.

In a climate model each processor might be running a square out of the global grid, with a ‘halo’ of grid cells at the edge being passed to neighbouring squares over MPI.

program sendrecv
    use mpi_f08
    implicit none
    ! Send an array of data from processor 1 to processor 0

    real :: a(10)
    integer :: rank, tag
    
    call MPI_Init()
    call MPI_Comm_rank(MPI_COMM_WORLD, rank)
    tag = 0
    
    if (rank == 0) then
        a = 0
        call MPI_Recv(a, size(a), MPI_REAL, 1, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE)
        write(*,*) a
    else if (rank == 1) then
        a = 5
        call MPI_Send(a, size(a), MPI_REAL, 0, tag, MPI_COMM_WORLD)
    end if
    
    call MPI_Finalize()
end program

Python

Python’s multiprocessing library is a built-in way to use distributed processing. It starts up extra Python processes for you when you make a Pool, although it can only use the processors on a single node. There are a few different ways to use it; I normally use it to run loop iterations in parallel, so rather than

def process_file(f):
    pass # ...

for f in files:
    process_file(f)

you would use

import multiprocessing

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        it = pool.imap_unordered(process_file, files)
        for result in it:
            pass  # Results arrive here as each call finishes

Here 4 worker processes are used to run the function process_file() on each element of files, in an arbitrary order. There are a couple of bits of boilerplate - if __name__ == '__main__': is needed around the main part of the script so that the child processes can import it without re-running everything, and the loop at the end runs over the function results, making sure that all of the function calls have completed.

While multiprocessing is good for loops over independent tasks, much like the embarrassingly parallel examples above, if you’re working with gridded data Dask is a good way to parallelise your code. Dask provides replacements for numpy arrays and pandas dataframes which can be split into smaller ‘chunks’, with the chunks then able to be computed in parallel.

import dask.array

a = dask.array.random.random((10_000, 10_000), chunks=(1000, 1000))
mean = a.mean()

For the most part Dask arrays work just like their numpy counterparts, although operations are evaluated lazily - nothing is calculated until you ask for the result, as shown below.
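Continuing the example above, calling .compute() is what triggers Dask to actually do the work:

# 'mean' is so far only a description of the calculation
print(mean.compute())  # compute() evaluates the chunks in parallel and combines them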

Note

You can find more information about using Dask in a later chapter.

Shared Memory

Perhaps the most complex of these options is shared memory parallelisation. In this model a single program splits into multiple ‘threads’, all of which have a shared view of the program’s arrays, and loops can be annotated to run in parallel. This saves memory, as each thread doesn’t need its own copy of the data, but adds the complexity that writes need to be managed to avoid race conditions. Shared memory also only works within a single node; if you want to use more than one node you can combine it with message passing, so that the threads on each node share memory and the nodes pass messages between each other over MPI.

Fortran / C

The most common library for shared memory parallelisation is OpenMP. This lets you annotate loops with special comments (directives), which will then be automatically parallelised by the compiler. A compiler flag - -qopenmp (Intel) or -fopenmp (GNU) - is needed to enable the OpenMP directives, and the environment variable $OMP_NUM_THREADS needs to be set to the number of threads the program should use.

real :: a(10), b(10), c(10)
real :: d
integer :: i

!$omp parallel loop private(d)
do i=1,10
    d = a(i) * 10
    c(i) = b(i) + d
end do

Here the loop iterations are divided between the threads and run in parallel. The variable d is marked as private - each thread gets its own copy of d. The arrays are shared, so every thread reads from and writes to the same arrays.

Note

One thing to watch is to make sure each iteration of the loop is independent, as iterations can be executed in an arbitrary order. In the example below c(4) might be evaluated before c(3), producing unexpected results:

c(1) = 1
!$omp parallel loop
do i=2,10
    c(i) = c(i-1) + 1
end do