Message passing

Questions

  • How is MPI different from Distributed.jl?

  • What methods for parallelisation exist in MPI?

Instructor note

  • 20 min teaching

  • 20 min exercises

MPI

MPI.jl is a Julia interface to the Message Passing Interface, which has been the standard workhorse of parallel computing for decades. Like Distributed, MPI belongs to the distributed-memory paradigm.

The idea behind MPI is that:

  • Tasks have a rank and are numbered 0, 1, 2, 3, …

  • Each task manages its own memory

  • Each task can run multiple threads

  • Tasks communicate and share data by sending messages.

  • Many higher-level functions exist to distribute information to other tasks and gather information from other tasks.

  • All tasks typically run the entire program, so we have to take care that the tasks do not all end up doing the same work.

MPI.jl provides Julia bindings for the Message Passing Interface (MPI) standard. This is what a hello world MPI program looks like in Julia:

using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
println("Hello from process $(rank) out of $(size)")
MPI.Barrier(comm)
  • MPI.COMM_WORLD is the communicator - a group of processes that can talk to each other

  • Comm_rank returns the individual rank (0, 1, 2, …) for each task that calls it

  • Comm_size returns the total number of ranks.

To run this code with a specific number of processes we use the mpiexecjl command, which can be installed as a wrapper around the MPI command mpiexec (see Setup):

$ mpiexecjl -np 4 julia hello.jl

# Hello from process 1 out of 4
# Hello from process 0 out of 4
# Hello from process 2 out of 4
# Hello from process 3 out of 4

Point-to-point and collective communication

The MPI standard contains a lot of functionality, but in principle one can get away with only point-to-point communication (MPI.send() and MPI.recv()). However, collective communication can sometimes require less effort as you will learn in an exercise below. In any case, it is good to have a mental model of different communication patterns in MPI.

../_images/send-recv.png

send and recv: blocking point-to-point communication between two ranks.

../_images/gather.png

gather: all ranks send data to rank root.

../_images/scatter.png

scatter: data on rank 0 is split into chunks and sent to other ranks.

../_images/broadcast.png

bcast: broadcast message to all ranks.

../_images/reduction.png

reduce: ranks send data which are reduced on rank root.
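
To make the broadcast pattern above concrete, here is a minimal sketch (assuming the keyword-based MPI.jl interface used elsewhere in this episode, and launched with mpiexecjl): rank 0 creates an object and bcast distributes a copy of it to every rank:

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

# only rank 0 holds the data initially
parameters = rank == 0 ? Dict("dt" => 0.1, "steps" => 100) : nothing

# after the broadcast every rank holds its own copy
parameters = MPI.bcast(parameters, comm, root=0)
println("rank $rank got $parameters")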

Serialised vs buffer-like objects

Lower-case methods (e.g. MPI.send() and MPI.recv()) are used to communicate generic objects between MPI processes. It is also possible to send buffer-like isbits objects, which provides faster communication but requires the memory for the receiving buffer to be allocated prior to communication. These methods start with uppercase letters, e.g. MPI.Send(), MPI.Recv(), MPI.Gather() etc.
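
A minimal sketch of the difference (the Dict and the buffer length are arbitrary choices for illustration, and at least two ranks are assumed):

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

if rank == 0
    # lowercase: any serialisable Julia object can be sent
    MPI.send(Dict("a" => 1, "b" => 2), comm, dest=1, tag=0)
    # uppercase: a buffer-like isbits array, faster but less flexible
    MPI.Send(rand(Float64, 4), comm, dest=1, tag=1)
elseif rank == 1
    obj = MPI.recv(comm, source=0, tag=0)   # allocates and deserialises the object
    buf = Array{Float64}(undef, 4)          # receive buffer must be allocated in advance
    MPI.Recv!(buf, comm, source=0, tag=1)
    println("rank 1 received $obj and $buf")
end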

Mutating vs non-mutating

For communication operations which receive data, MPI.jl typically defines two separate functions:

  • One function in which the output buffer is supplied by the user. As it mutates this value, it adopts the Julia convention of suffixing with ! (e.g. MPI.Recv!(), MPI.Reduce!()).

  • One function which allocates the buffer for the output (MPI.Recv(), MPI.Reduce()).
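
As a toy example (a sketch, assuming the keyword-based API used elsewhere in this episode), both variants of a reduction look like this:

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

send_buf = fill(Float64(rank), 3)

# non-mutating: MPI.Reduce allocates the result (returned on the root rank only)
result = MPI.Reduce(send_buf, +, comm, root=0)

# mutating: the caller supplies the output buffer, hence the ! suffix
recv_buf = zeros(Float64, 3)
MPI.Reduce!(send_buf, recv_buf, +, comm, root=0)

rank == 0 && println("allocated: $result, preallocated: $recv_buf")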

Examples

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

if rank != 0
    # All ranks other than 0 should send a message
    local message = "Hello World, I'm rank $rank"
    MPI.send(message, comm, dest=0, tag=0)
else
    # Rank 0 will receive each message and print them
    for sender in 1:(size-1)
        message = MPI.recv(comm, source=sender, tag=0)
        println(message)
    end
end   
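
The same many-to-one pattern can be expressed more compactly with a collective call. Here is a minimal sketch using MPI.Gather, where each rank contributes one number that rank 0 collects (the one-element Float64 buffer is an arbitrary choice for illustration):

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

# each rank contributes a one-element buffer; Gather concatenates them on the root rank
send_buf = Float64[rank]
recv_buf = MPI.Gather(send_buf, comm, root=0)

if rank == 0
    println("rank 0 gathered $recv_buf from $size ranks")
end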

Blocking and non-blocking communication

Point-to-point communication can be blocking or non-blocking: MPI.Send() will only return when the program can safely modify the send buffer and MPI.Recv() will only return once the data has been received and written to the receive buffer.

Consider the following example of a deadlock caused by blocking communication. The problem can be circumvented by introducing sequential sends and receives, but it’s more conveniently solved by using non-blocking send and receive.

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

# Check that there are exactly two ranks
if size != 2
    print("This example requires exactly two ranks")
    exit(1)
end

# Call the other rank the neighbour
if rank == 0
    neighbour = 1
else
    neighbour = 0
end

# Send a message to the other rank
send_message = [i for i in 1:100]
MPI.send(send_message, comm, dest=neighbour, tag=0)

# Receive the message from the other rank
recv_message = MPI.recv(comm, source=neighbour, tag=0)

print("Message received by rank $rank")
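
A sketch of how the same exchange could be written with non-blocking calls (assuming MPI.Isend, MPI.Irecv! and MPI.Waitall from the keyword-based MPI.jl API, and again exactly two ranks): both requests are posted before either rank waits, so neither rank can block the other:

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

# the other rank is the neighbour (two ranks assumed)
neighbour = rank == 0 ? 1 : 0

send_message = collect(1.0:100.0)
recv_message = similar(send_message)

# post both operations immediately; neither call blocks
sreq = MPI.Isend(send_message, comm, dest=neighbour, tag=0)
rreq = MPI.Irecv!(recv_message, comm, source=neighbour, tag=0)

# wait until both the send and the receive have completed
MPI.Waitall([sreq, rreq])

println("Message received by rank $rank")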

Exercises

Run the examples of point-to-point and collective communication

Take the examples of point-to-point and collective communication methods from the Examples section above, and run them locally or on a cluster.

Try to understand exactly the program flow and what each rank is doing.

From blocking to non-blocking

Consider the following example where data is sent around “in a circle” (0->1, 1->2, …, (size-1)->0). Will it work as intended?

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

# where to send to
dst = mod(rank+1, size)
# where to receive from
src = mod(rank-1, size)

# uninitialised send and receive buffers
send_mesg = Array{Float64}(undef, 5)
recv_mesg = Array{Float64}(undef, 5)

# fill the send array
fill!(send_mesg, Float64(rank))

print("$rank: Sending   $rank -> $dst = $send_mesg\n")
MPI.Send(send_mesg, comm, dest=dst, tag=rank+32)

MPI.Recv!(recv_mesg, comm, source=src,  tag=src+32)
print("$rank: Received $src -> $rank = $recv_mesg\n")

MPI.Barrier(comm)

Try running this program. Were the arrays received successfully? Introduce non-blocking communication to solve the problem.

MPI-parallelise the estimate_pi() function

../_images/pi_with_darts.png

Consider again the following function which estimates π by “throwing darts”, i.e. randomly sampling (x, y) points in the unit square [0.0, 1.0] × [0.0, 1.0] and checking whether they fall within the unit circle. The fraction of points that do approximates π/4.

function estimate_pi(num_points)
    hits = 0
    for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            hits += 1
        end
    end
    fraction = hits / num_points
    return 4 * fraction
end
num_points = 100_000_000
estimate_pi(num_points)  # 3.14147572...

There are several ways in which this function could be parallelised with MPI. Below you will find two working solutions using collective communication:

  • One implements an unnecessarily complicated algorithm, which nonetheless is illustrative for more general cases.

  • The other implements a more compact and efficient solution.

Inspect the complicated solution first and answer the questions!

Study the following fully functional MPI code and then answer the questions below. Feel free to add print statements to the code and run it with mpiexecjl -np <N> julia estimate_pi.jl to understand what’s going on.

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

function estimate_pi(num_points)
    hits = 0
    for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            hits += 1
        end
    end
    fraction = hits / num_points
    return 4 * fraction
end

function main()
    t1 = time()
    num_points = 10^9
    num_jobs = 10
    chunks = [num_points ÷ num_jobs for i in 1:num_jobs]
    
    # distribute work among MPI tasks
    count = div(num_jobs, size)
    remainder = num_jobs % size

    if rank < remainder
        first = rank * (count + 1) + 1
        last = first + count
    else
        first = rank * count + remainder + 1
        last = first + count - 1
    end

    # each rank computes pi for their vector elements
    estimates = Float64[]
    for i in first:last
        push!(estimates, estimate_pi(chunks[i]))
    end

    # sum up all estimates and average on the root rank
    pi_sum = MPI.Reduce(sum(estimates), +, comm, root=0)
    if rank == 0
        println("pi = $(pi_sum/num_jobs)")
    end
    t2 = time()
    println("elapsed time = $(t2 - t1)")
end

main()
  1. For num_jobs = 10 and size = 4, what would be the values of count and remainder?

  2. What is the purpose of the if-else block starting with if rank < remainder?

  3. For num_jobs = 10 and size = 4, what would be the values of first and last for each rank?

  4. Is load-balancing an issue in this solution? (i.e. how evenly work is split between tasks)

  5. Would you expect this MPI solution to perform and scale similarly well to the distributed pmap() solution we saw in the Distributed computing episode?

  6. Can you think of any improvements to the MPI algorithm employed?

  7. Now compare with a more compact solution, sketched below!
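
Here is a sketch of what a more compact and efficient solution could look like (not necessarily identical to the reference solution): every rank calls estimate_pi() once on an equal share of the points, and a single MPI.Reduce sums the per-rank estimates so that rank 0 can average them. Splitting the points equally and dropping any remainder is a simplification for illustration.

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

function estimate_pi(num_points)
    hits = 0
    for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            hits += 1
        end
    end
    return 4 * hits / num_points
end

t1 = time()
num_points = 10^9
# give every rank an equal share of the points (any remainder is simply dropped)
my_points = num_points ÷ size

# each rank produces one estimate; sum them on rank 0 and average
my_estimate = estimate_pi(my_points)
pi_sum = MPI.Reduce(my_estimate, +, comm, root=0)

if rank == 0
    println("pi = $(pi_sum / size)")
    println("elapsed time = $(time() - t1)")
end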

Limitations

MPI.jl has (as of October 2023) not yet reached v1.0, so future releases could include backwards-incompatible changes.

The MPI.jl documentation has a section on known issues.

Keypoints

  • MPI is a standard workhorse of parallel computing.

  • All communication is handled explicitly - not behind the scenes as in Distributed.

  • Programming with MPI requires a different mental model since each parallel rank is executing the same program and the programmer needs to distribute the work by hand.