Parallel computing

Questions

  • What is the Global Interpreter Lock in Python?

  • How can Python code be parallelised?

Objectives

  • Become familiar with different types of parallelism

  • Learn the basics of parallel workflows, multiprocessing and distributed memory parallelism

Instructor note

  • 40 min teaching/type-along

  • 40 min exercises

The performance of a single CPU core has stagnated over the last ten years and most of the speed-up in modern CPUs is coming from using multiple CPU cores, i.e. parallel processing. Parallel processing is normally based either on multiple threads or multiple processes.

There are three main models of parallel computing:

  • “Embarrassingly” parallel: the code does not need to synchronize/communicate with other instances; you can run multiple instances of the code separately and combine the results later. If you can do this, great!

  • Shared memory parallelism (multithreading):

    • Parallel threads do separate work and communicate via the same memory, e.g. by writing to shared variables.

    • Multiple threads in a single Python program cannot execute Python code at the same time (see the GIL below)

    • Running multiple threads in Python is only effective for certain I/O-bound tasks

    • External libraries in other languages (e.g. C) which are called from Python can still use multithreading

  • Distributed memory parallelism (multiprocessing): Different processes manage their own memory segments and share data by communicating (passing messages) as needed.

    • A process can contain one or more threads

    • Two processes can run on different CPU cores and different computers

    • Processes have more overhead than threads (creating and destroying processes takes more time)

    • Running multiple processes is only effective for CPU-bound tasks

In the next episode we will look at Dask, an array model extension and task scheduler, which combines multiprocessing with (embarrassingly) parallel workflows and “lazy” execution.

In the Python world, the word concurrency is often used to denote any type of simultaneous processing, including threads, tasks and processes.

Warning

Parallel programming requires that we adopt a different mental model compared to serial programming. Many things can go wrong and one can get unexpected results or difficult-to-debug problems. It is important to understand the possible pitfalls before embarking on code parallelisation. For an entertaining take on this, see Raymond Hettinger’s PyCon2016 presentation.

The Global Interpreter Lock

The designers of the Python language made the choice that only one thread in a process can run actual Python code at a time, enforced by the so-called global interpreter lock (GIL). This means that approaches which work well in other languages (C, C++, Fortran) may not work in Python without some extra care. The reason the GIL is needed is that parts of the Python implementation related to memory management are not thread-safe. At first glance this is bad for parallelism (a small demonstration follows the list below), but the GIL can be avoided in the following ways:

  • External libraries (NumPy, SciPy, Pandas, etc), written in C or other languages, can release the lock and run multi-threaded.

  • Most input/output tasks release the GIL.

  • There are several Python libraries that side-step the GIL, e.g. by using multiprocessing instead of threading.
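
To see what the GIL means in practice, you can time a CPU-bound pure-Python function run serially and in two threads. The snippet below is a minimal sketch (the count_down() function and the value of N are arbitrary choices, not part of any library); because of the GIL, the threaded version typically takes about as long as the serial one:

import threading
import time

def count_down(n):
    # pure-Python, CPU-bound work that holds the GIL while it runs
    while n > 0:
        n -= 1

N = 10_000_000

# serial: two calls, one after the other
t0 = time.time()
count_down(N)
count_down(N)
print(f"serial:  {time.time() - t0:.2f} s")

# threaded: two threads doing the same total amount of work
t0 = time.time()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"threads: {time.time() - t0:.2f} s")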

Multithreading

Due to the GIL, only one thread can execute Python code at a time, which makes threading rather useless for compute-bound problems in pure Python. However, multithreading is still relevant in two situations:

  • External libraries written in non-Python languages can take advantage of multithreading

  • Multithreading can be useful for running multiple I/O-bound tasks simultaneously.

Multithreaded libraries

NumPy and SciPy are built on external libraries such as LAPACK, FFTW and BLAS, which provide optimized routines for linear algebra, Fourier transforms etc. These libraries are written in C, C++ or Fortran and are thus not limited by the GIL, so they typically support genuine multithreading during execution. Using multiple threads can therefore pay off in calculations such as matrix operations or frequency analysis.

Depending on configuration, NumPy will often use multiple threads by default, but we can use the environment variable OMP_NUM_THREADS to set the number of threads manually:

$ export OMP_NUM_THREADS=<N>

After setting this environment variable, we run the program as usual and the underlying libraries will use the requested number of threads.

Demo: Multithreading NumPy

Here is an example which inverts a symmetric matrix of size 4000 by 4000. To run it, we can save it in a file named omp_test.py or download it from here.

import numpy as np
import time

A = np.random.random((4000,4000))
A = A * A.T
time_start = time.time()
np.linalg.inv(A)
time_end = time.time()
print("time spent for inverting A is", round(time_end - time_start,2), 's')

Let us test it with 1 and 4 threads:

$ export OMP_NUM_THREADS=1
$ python omp_test.py

$ export OMP_NUM_THREADS=4
$ python omp_test.py

Multithreaded I/O

This is how an I/O-bound application might look:

../_images/IOBound.png

From https://realpython.com/, distributed via a Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported licence

The threading library provides an API for creating and working with threads. The simplest approach to create and manage threads is to use the ThreadPoolExecutor class. An example use case could be to download data from multiple websites using multiple threads:

import concurrent.futures

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(my_download_function, sites)
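
The snippet above is only a skeleton (my_download_function and sites are not defined). For something you can run directly, here is one possible self-contained sketch using only the standard library; the helper name download_one() and the example URLs are made up for illustration:

import concurrent.futures
import urllib.request

def download_one(url):
    # fetch one page and return its size in bytes
    with urllib.request.urlopen(url) as response:
        return len(response.read())

def download_all_sites(sites):
    # the threads spend most of their time waiting for the network,
    # so they can overlap despite the GIL
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(download_one, sites))

if __name__ == "__main__":
    sites = ["https://www.python.org", "https://www.wikipedia.org"] * 4
    print(download_all_sites(sites))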

The speedup gained from multithreading I/O-bound problems can be understood from the following image.

../_images/Threading.png

From https://realpython.com/, distributed via a Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported licence

Further details on threading in Python can be found in the See also section below.

Multiprocessing

The multiprocessing module in Python supports spawning processes using an API similar to the threading module. It effectively side-steps the GIL by using subprocesses instead of threads, where each subprocess is an independent Python process.

One of the simplest ways to use multiprocessing is via Pool objects and the parallel Pool.map() function, similarly to what we saw for multithreading above. In the following code, we define a square() function, call the cpu_count() function to get the number of CPUs on the machine, and then create a Pool object in a context manager and use Pool.map() to parallelize the computation. We can save the code in a file named mp_map.py or download it from here.

import multiprocessing as mp
   
def square(x):
    return x * x
   
if __name__ == '__main__':
    nprocs = mp.cpu_count()
    print(f"Number of CPU cores: {nprocs}")
   
    # use context manager to allocate and release the resources automatically
    with mp.Pool(processes=nprocs) as pool:
        result = pool.map(square, range(20))    
    print(result)
    

For functions that take multiple arguments one can instead use the Pool.starmap() function (save as mp_starmap.py or download here)

import multiprocessing as mp

def power_n(x, n):
    return x ** n

if __name__ == '__main__':
    nprocs = mp.cpu_count()
    print(f"Number of CPU cores: {nprocs}")

    with mp.Pool(processes=nprocs) as pool:
        result = pool.starmap(power_n, [(x, 2) for x in range(20)])
    print(result)
    

Interactive environments

Functionality within multiprocessing requires that the __main__ module be importable by child processes. This means that, for example, multiprocessing.Pool will not work in the interactive interpreter. A fork of multiprocessing, called multiprocess, can be used in interactive environments like Jupyter.

multiprocessing has a number of other methods which can be useful for certain use cases, including Process and Queue which make it possible to have direct control over individual processes. Refer to the See also section below for a list of external resources that cover these methods.
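
As a small taste of these building blocks, the sketch below starts a few worker processes explicitly and collects their results through a Queue. It reuses the square() example from above; the overall structure is just one possible illustration:

import multiprocessing as mp

def square(x, queue):
    # each worker puts its result on the shared queue
    queue.put((x, x * x))

if __name__ == '__main__':
    queue = mp.Queue()
    processes = [mp.Process(target=square, args=(i, queue)) for i in range(4)]

    for p in processes:
        p.start()

    # collect one result per worker; get() blocks until an item is available
    results = [queue.get() for _ in processes]

    for p in processes:
        p.join()

    print(results)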

At the end of this episode you can turn your attention back to the word-count problem and practice using multiprocessing pools of processes.

MPI

The message passing interface (MPI) is a standard workhorse of parallel computing. Nearly all major scientific HPC applications use MPI. Like multiprocessing, MPI belongs to the distributed-memory paradigm.

The idea behind MPI is that:

  • Tasks have a rank and are numbered 0, 1, 2, 3, …

  • Each task manages its own memory

  • Each task can run multiple threads

  • Tasks communicate and share data by sending messages.

  • Many higher-level functions exist to distribute information to other tasks and gather information from other tasks.

  • All tasks typically run the entire code, so we have to be careful that the tasks do not all end up doing the same work.

mpi4py provides Python bindings for the Message Passing Interface (MPI) standard. This is what a hello world MPI program looks like in Python:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print('Hello from process {} out of {}'.format(rank, size))

  • MPI.COMM_WORLD is the communicator - a group of processes that can talk to each other

  • Get_rank returns the individual rank (0, 1, 2, …) for each task that calls it

  • Get_size returns the total number of ranks.

To run this code with a specific number of processes we use the mpirun command which comes with the MPI library:

$ mpirun -np 4 python hello.py

# Hello from process 1 out of 4
# Hello from process 0 out of 4
# Hello from process 2 out of 4
# Hello from process 3 out of 4

Point-to-point and collective communication

The MPI standard contains a lot of functionality, but in principle one can get away with only point-to-point communication (MPI.COMM_WORLD.send and MPI.COMM_WORLD.recv). However, collective communication can sometimes require less effort as you will learn in an exercise below. In any case, it is good to have a mental model of different communication patterns in MPI.

../_images/send-recv.png

send and recv: blocking point-to-point communication between two ranks.

../_images/gather.png

gather: all ranks send data to rank root.

../_images/scatter.png

scatter: data on rank 0 is split into chunks and sent to other ranks

../_images/broadcast.png

bcast: broadcast message to all ranks

../_images/reduction.png

reduce: ranks send data which are reduced on rank root

Examples

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
n_ranks = comm.Get_size()

if rank != 0:
    # All ranks other than 0 should send a message
    message = "Hello World, I'm rank {:d}".format(rank)
    comm.send(message, dest=0, tag=0)
else:
    # Rank 0 will receive each message and print them
    for sender in range(1, n_ranks):
        message = comm.recv(source=sender, tag=0)
        print(message)
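
The same gathering of hello-world messages can be written more compactly with collective communication. The sketch below uses gather(), so that rank 0 receives a list with one message per rank:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

message = "Hello World, I'm rank {:d}".format(rank)

# every rank participates; only the root gets the full list, the others get None
messages = comm.gather(message, root=0)

if rank == 0:
    for m in messages:
        print(m)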

MPI excels for problems that can be divided into subdomains which need to communicate with each other, e.g. between timesteps or iterations. The word-count problem is simpler than that and MPI is somewhat overkill, but in an exercise below you will learn to use point-to-point communication to parallelize it.

In addition to the lower-case methods send(), recv(), bcast() etc., there are also upper-case methods Send(), Recv(), Bcast(). These work with buffer-like objects (such as bytes objects and NumPy arrays) which have a known memory location and size. The upper-case methods are faster and are strongly recommended for large numeric data.
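
As an illustration of the upper-case API, the sketch below sends a NumPy array from rank 0 to rank 1 with Send()/Recv(). Note that the receiving rank must pre-allocate a buffer of matching shape and dtype; run it with at least two ranks:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(10, dtype=np.float64)
    comm.Send(data, dest=1, tag=7)
elif rank == 1:
    # pre-allocate the receive buffer with the expected shape and dtype
    data = np.empty(10, dtype=np.float64)
    comm.Recv(data, source=0, tag=7)
    print("Rank 1 received:", data)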

Exercises

Compute numerical integrals

The primary objective of this exercise is to compute the integral \(\int_0^1 x^{3/2} \, dx\) numerically.

One approach to integration is to establish a grid along the x-axis. Specifically, the integration range is divided into n segments, or bins. Below is a basic serial code.

import math
import time

# Grid size
n = 100000000

def integration_serial(n):
    h = 1.0 / float(n)
    mysum = 0.0
    
    for i in range(n):
        x = h * (i + 0.5)
        mysum += x ** (3/2)

    return h * mysum

if __name__ == "__main__":
    starttime = time.time()
    integral = integration_serial(n)
    endtime = time.time()

    print("Integral value is %e, Error is %e" % (integral, abs(integral - 2/5)))  # The correct integral value is 2/5
    print("Time spent: %.2f sec" % (endtime-starttime))

# 13.63 sec

Think about how to parallelize the code using multithreading and multiprocessing.
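
As a starting point for the multiprocessing part, one possible approach (a sketch, not the only solution; the helper name integration_chunk() is made up for illustration) is to split the n bins into one chunk per process and sum the partial results with Pool.starmap():

import multiprocessing as mp
import time

n = 100000000

def integration_chunk(i_start, i_end, h):
    # midpoint-rule contribution of bins i_start .. i_end-1
    mysum = 0.0
    for i in range(i_start, i_end):
        x = h * (i + 0.5)
        mysum += x ** (3/2)
    return h * mysum

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    h = 1.0 / float(n)
    # split the n bins into nprocs roughly equal chunks
    chunks = [(k * n // nprocs, (k + 1) * n // nprocs, h) for k in range(nprocs)]

    starttime = time.time()
    with mp.Pool(processes=nprocs) as pool:
        partial_sums = pool.starmap(integration_chunk, chunks)
    integral = sum(partial_sums)
    endtime = time.time()

    print("Integral value is %e, Error is %e" % (integral, abs(integral - 2/5)))
    print("Time spent: %.2f sec" % (endtime - starttime))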

Word-autocorrelation example project

Inspired by a study of dynamic correlations of words in written text, we decide to investigate autocorrelation functions (ACFs) of words in our database of book texts in the word-count project. Many of the exercises below are based on working with the following word-autocorrelation code, so let us get familiar with it.

  • The script takes three command-line arguments: the path of a datafile (book text), the path to the processed word-count file, and the output filename for the computed autocorrelation function.

  • The __main__ block calls the setup() function to preprocess the text (remove delimiters etc.) and load the pre-computed word-count results.

  • word_acf() computes the word ACF in a text for a given word using simple for-loops (you will learn to speed it up later).

  • ave_word_acf() loops over a list of words and computes their average ACF.

To run this code for one book e.g. pg99.txt:

$ git clone https://github.com/ENCCS/word-count-hpda.git
$ cd word-count-hpda
$ python source/wordcount.py data/pg99.txt processed_data/pg99.dat
$ python source/autocorrelation.py data/pg99.txt processed_data/pg99.dat results/acf_pg99.dat

It will print out the time it took to calculate the ACF.

Parallelize word-autocorrelation code with multiprocessing

A serial version of the code is available in the source/autocorrelation.py script in the word-count repository. The full script can be viewed above, but we focus on the word_acf() and ave_word_acf() functions:

def word_acf(word, text, timesteps):
    """
    Calculate word-autocorrelation function for given word 
    in a text. Each word in the text corresponds to one "timestep".
    """
    acf = np.zeros((timesteps,))
    mask = [w==word for w in text]
    nwords_chosen = np.sum(mask)
    nwords_total = len(text)
    for t in range(timesteps):
        for i in range(1,nwords_total-t):
            acf[t] += mask[i]*mask[i+t]
        acf[t] /= nwords_chosen      
    return acf

def ave_word_acf(words, text, timesteps=100):
    """
    Calculate an average word-autocorrelation function 
    for a list of words in a text.
    """
    acf = np.zeros((len(words), timesteps))
    for n, word in enumerate(words):
        acf[n, :] = word_acf(word, text, timesteps)
    return np.average(acf, axis=0)

  • Think about what this code is doing and try to find a good place to parallelize it using a pool of processes (one possible sketch is shown after this list).

  • With or without having a look at the hints below, try to parallelize the code using multiprocessing and use time.time() to measure the speedup when running it for one book.

  • Note: You will not be able to use Jupyter for this task due to the above-mentioned limitation of multiprocessing.
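
If you want a nudge, one possible structure (a sketch assuming word_acf() is defined at module level in source/autocorrelation.py, as above; the name ave_word_acf_pool is just illustrative and this is not necessarily the reference solution) replaces the loop in ave_word_acf() with a process pool:

import multiprocessing as mp
import numpy as np

def ave_word_acf_pool(words, text, timesteps=100, nprocs=4):
    # compute one ACF per word in parallel, then average over words;
    # note that the text is pickled and sent to each worker process
    with mp.Pool(processes=nprocs) as pool:
        acfs = pool.starmap(word_acf, [(word, text, timesteps) for word in words])
    return np.average(acfs, axis=0)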

Write an MPI version of word-autocorrelation

Just like with multiprocessing, the most natural MPI solution parallelizes over the words used to compute the word-autocorrelation. For educational purposes, both point-to-point and collective communication implementations will be demonstrated here.

Start by importing mpi4py (from mpi4py import MPI) at the top of the script.

Here is a new function which takes care of managing the MPI tasks. The problem needs to be split up between N ranks, and the method needs to be general enough to handle cases where the number of words is not a multiple of the number of ranks; below we see a standard algorithm to accomplish this. The function also calls one of two helper functions, which implement point-to-point and collective communication respectively, to collect the individual results on one rank that computes the average.

def mpi_acf(book, wc_book, nwords = 16, timesteps = 100):
    # initialize MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    n_ranks = comm.Get_size()

    # load book text and preprocess it
    clean_text, top_words = setup(book, wc_book, nwords)
    
    # distribute words among MPI tasks
    count = nwords // n_ranks
    remainder = nwords % n_ranks
    # first 'remainder' ranks get 'count + 1' tasks each
    if rank < remainder:
        first = rank * (count + 1)
        last = first + count + 1
    # remaining 'nwords - remainder' ranks get 'count' tasks each
    else:
        first = rank * count + remainder
        last = first + count 
    # each rank gets unique words
    my_words = top_words[first:last]
    print(f"My rank number is {rank} and first, last = {first}, {last}")

    # use collective function
    acf_tot = ave_word_acf_gather(comm, my_words, clean_text, timesteps)

    # use p2p function
    #acf_tot = ave_word_acf_p2p(comm, my_words, clean_text, timesteps)

    # only rank 0 has the averaged data
    if rank == 0:
        return acf_tot / nwords

What type of communication can we use?

The end result should be an average of all the word-autocorrelation functions. What type of communication can be used to collect the results on one rank which computes the average and prints it to file?

Study the two “faded” MPI function implementations below, one using point-to-point communication and the other using collective communication. Try to figure out what you should replace the ____ with.

def ave_word_acf_p2p(comm, my_words, text, timesteps=100):
    rank = comm.Get_rank()
    n_ranks = comm.Get_size()
    # each rank computes its own set of acfs
    my_acfs = np.zeros((len(____), timesteps))
    for i, word in enumerate(my_words):
        my_acfs[i,:] = word_acf(word, text, timesteps)

    if ____ == ____:
        results = []
        # append own results
        results.append(my_acfs)
        # receive data from other ranks and append to results
        for sender in range(1, ____):
            results.append(comm.____(source=sender, tag=12))
        # compute total
        acf_tot = np.zeros((timesteps,))
        for i in range(____):
            for j in range(len(results[i])):
                acf_tot += results[i][j]
        return acf_tot
    else:
        # send data
        comm.____(my_acfs, dest=____, tag=12)

After implementing one or both of these functions, run your code and time the result for different numbers of tasks!

$ time mpirun -np <N> python source/autocorrelation.py data/pg58.txt processed_data/pg58.dat results/pg58_acf.csv

See also

Keypoints

  • Be aware of the GIL and its impact on performance

  • Use threads for I/O-bound tasks