Message passing
Questions
How is MPI different from Distributed.jl?
What methods for parallelisation exist in MPI?
Instructor note
20 min teaching
20 min exercises
MPI
MPI.jl is a Julia interface to the Message Passing Interface, which has been the standard workhorse of parallel computing for decades. Like Distributed, MPI belongs to the distributed-memory paradigm.
The idea behind MPI is that:
Tasks have a rank and are numbered 0, 1, 2, 3, …
Each task manages its own memory
Each task can run multiple threads
Tasks communicate and share data by sending messages.
Many higher-level functions exist to distribute information to other tasks and gather information from other tasks.
All tasks typically run the entire code, so we have to take care that they do not all end up doing exactly the same work.
MPI.jl
provides Julia bindings for the Message Passing Interface (MPI) standard.
This is what a hello world MPI program looks like in Julia:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
println("Hello from process $(rank) out of $(size)")
MPI.Barrier(comm)
MPI.COMM_WORLD is the communicator - a group of processes that can talk to each other
Comm_rank returns the individual rank (0, 1, 2, …) for each task that calls it
Comm_size returns the total number of ranks
To run this code with a specific number of processes we use the mpiexecjl command, which can be installed as a wrapper of the MPI command mpiexec (see Setup):
$ mpiexecjl -np 4 julia hello.jl
# Hello from process 1 out of 4
# Hello from process 0 out of 4
# Hello from process 2 out of 4
# Hello from process 3 out of 4
Point-to-point and collective communication
The MPI standard contains a lot of functionality, but in principle one can get away with only point-to-point communication (MPI.send() and MPI.recv()). However, collective communication can sometimes require less effort, as you will learn in an exercise below. In any case, it is good to have a mental model of the different communication patterns in MPI.
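As a rough illustration (a minimal sketch, not one of the lesson examples, assuming the script is launched with mpiexecjl as shown above), the sum of all rank numbers can be computed either with point-to-point MPI.send()/MPI.recv() calls or with a single collective MPI.Reduce() call:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# point-to-point: every rank sends its rank number to rank 0, which sums them itself
if rank != 0
    MPI.send(rank, comm, dest=0, tag=0)
else
    received = [MPI.recv(comm, source=sender, tag=0) for sender in 1:size-1]
    println("point-to-point sum of ranks: $(sum(received, init=0))")
end
# collective: MPI.Reduce performs the communication and the summation in one call
total = MPI.Reduce(rank, +, comm, root=0)
if rank == 0
    println("collective sum of ranks: $total")
end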
Serialised vs buffer-like objects
Lower-case methods (e.g. MPI.send() and MPI.recv()) are used to communicate generic objects between MPI processes. It is also possible to send buffer-like isbits objects, which gives faster communication but requires the memory for the receiving buffer to be allocated before the communication takes place. These methods start with uppercase letters, e.g. MPI.Send(), MPI.Recv(), MPI.Gather() etc.
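As a rough illustration (a sketch of our own, assuming exactly two ranks), a String can be sent with the lower-case methods, while a Vector{Float64} can be sent with the upper-case methods provided the receiver allocates a matching buffer first:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
if rank == 0
    # lower-case: any Julia object can be sent; it is serialised behind the scenes
    MPI.send("a string from rank 0", comm, dest=1, tag=0)
    # upper-case: an isbits buffer is sent directly, which is faster
    MPI.Send(rand(5), comm, dest=1, tag=1)
elseif rank == 1
    obj = MPI.recv(comm, source=0, tag=0)   # the received object is allocated for us
    buf = Vector{Float64}(undef, 5)         # we must pre-allocate the receive buffer
    MPI.Recv!(buf, comm, source=0, tag=1)
    println("rank 1 received: $obj and $buf")
end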
Mutating vs non-mutating
For communication operations which receive data, MPI.jl typically defines two separate functions:
One function in which the output buffer is supplied by the user. As it mutates this value, it adopts the Julia convention of suffixing with ! (e.g. MPI.Recv!(), MPI.Reduce!()).
One function which allocates the buffer for the output (MPI.Recv(), MPI.Reduce()).
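For example, a reduction can be written with either variant. The sketch below (our own illustration, assuming MPI.Reduce!() accepts the same keyword-style root argument as MPI.Reduce()) computes the same sum with the allocating MPI.Reduce() and with the mutating MPI.Reduce!():
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
data = fill(Float64(rank), 3)
# non-mutating: the result buffer is allocated for us (only meaningful on the root rank)
result = MPI.Reduce(data, +, comm, root=0)
# mutating: we supply the output buffer ourselves (its contents matter only on the root rank)
recvbuf = similar(data)
MPI.Reduce!(data, recvbuf, +, comm, root=0)
if rank == 0
    println("MPI.Reduce:  $result")
    println("MPI.Reduce!: $recvbuf")
end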
Examples
send/recv:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
if rank != 0
# All ranks other than 0 should send a message
local message = "Hello World, I'm rank $rank"
MPI.send(message, comm, dest=0, tag=0)
else
# Rank 0 will receive each message and print them
for sender in 1:(size-1)
message = MPI.recv(comm, source=sender, tag=0)
println(message)
end
end
broadcast:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Rank 0 will broadcast message to all other ranks
if rank == 0
send_message = "Hello World from rank 0"
else
send_message = nothing
end
receive_message = MPI.bcast(send_message, comm, root=0)
if rank != 0
println("rank $rank received message: $receive_message")
end
gather:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Only upper-case Gather exists so message must be buffer-like of isbitstype
send_message = rank^3
# data from all ranks are gathered on root rank
receive_message = MPI.Gather(send_message, comm, root=0)
if rank == 0
for i in 1:size
println("Received $(receive_message[i]) from rank $(i-1)")
end
end
scatter:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Only upper-case Scatter exists so message must be buffer-like of isbitstype
if rank == 0
sendbuf = [i^3 for i in 1:size]
else
sendbuf = nothing
end
recvbuf = MPI.Scatter(sendbuf, Int64, comm, root=0)
print("rank $rank received message: $recvbuf\n")
reduce:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Only upper-case Reduce exists so message must be buffer-like of isbitstype
data = rank
recvbuf = MPI.Reduce(data, +, comm, root=0)
if rank == 0
println(recvbuf)
end
Blocking and non-blocking communication
Point-to-point communication can be blocking or non-blocking: MPI.Send() will only return when the program can safely modify the send buffer, and MPI.Recv() will only return once the data has been received and written to the receive buffer.
Consider the following example of a deadlock caused by blocking communication. The problem can be circumvented by introducing sequential sends and receives, but it’s more conveniently solved by using non-blocking send and receive.
Deadlock (both ranks send, then receive):
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Check that there are exactly two ranks
if size != 2
print("This example requires exactly two ranks")
exit(1)
end
# Call the other rank the neighbour
if rank == 0
neighbour = 1
else
neighbour = 0
end
# Send a message to the other rank
send_message = [i for i in 1:100]
MPI.send(send_message, comm, dest=neighbour, tag=0)
# Receive the message from the other rank
recv_message = MPI.recv(comm, source=neighbour, tag=0)
print("Message received by rank $rank")
Ordered send and receive:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Check that there are exactly two ranks
if size != 2
println("This example requires exactly two ranks")
exit(1)
end
# Call the other rank the neighbour
if rank == 0
neighbour = 1
else
neighbour = 0
end
send_message = [i for i in 1:100]
# Send a message to the other rank and then receive
if rank == 0
MPI.send(send_message, comm, dest=neighbour, tag=0)
recv_message = MPI.recv(comm, source=neighbour, tag=0)
print("Message received by rank $rank\n")
end
# Receive the message from the other rank and then send
if rank == 1
recv_message = MPI.recv(comm, source=neighbour, tag=0)
print("Message received by rank $rank\n")
MPI.send(send_message, comm, dest=neighbour, tag=0)
end
Non-blocking send and receive:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# Check that there are exactly two ranks
if size != 2
print("This example requires exactly two ranks")
exit(1)
end
# Call the other rank the neighbour
if rank == 0
neighbour = 1
else
neighbour = 0
end
# Send a message to the other rank
send_message = [i for i in 1:100]
recv_message = similar(send_message)
# non-blocking send
sreq = MPI.Isend(send_message, comm, dest=neighbour, tag=0)
# non-blocking receive into receive buffer
rreq = MPI.Irecv!(recv_message, comm, source=neighbour, tag=0)
stats = MPI.Waitall!([rreq, sreq])
print("Message received by rank $rank\n")
Exercises
Run the examples of point-to-point and collective communication
Take the examples of point-to-point and collective communication methods from the Examples section above, and run them locally or on a cluster.
Try to understand exactly the program flow and what each rank is doing.
From blocking to non-blocking
Consider the following example where data is sent around “in a circle” (0->1, 1->2, …, N->0). Will it work as intended?
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# where to send to
dst = mod(rank+1, size)
# where to receive from
src = mod(rank-1, size)
# uninitialised send and receive buffers
send_mesg = Array{Float64}(undef, 5)
recv_mesg = Array{Float64}(undef, 5)
# fill the send array
fill!(send_mesg, Float64(rank))
print("$rank: Sending $rank -> $dst = $send_mesg\n")
MPI.Send(send_mesg, comm, dest=dst, tag=rank+32)
MPI.Recv!(recv_mesg, comm, source=src, tag=src+32)
print("$rank: Received $src -> $rank = $recv_mesg\n")
MPI.Barrier(comm)
Try running this program. Were the arrays received successfully? Introduce non-blocking communication to solve the problem.
Explanation for why this code might work
This code might work correctly, but it is not guaranteed to work! The outcome depends on the backend MPI library (e.g. Open MPI or MPICH). MPI.Send() can run in one of several modes, and usually standard mode is the default, which means that the library decides for performance reasons whether or not to buffer the outgoing message. If it does buffer it, the send can complete before a matching receive has been posted. The code might stop working if you send larger amounts of data, so you should use non-blocking communication instead.
More details in this StackOverflow post.
Solution
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
# where to send to
dst = mod(rank+1, size)
# where to receive from
src = mod(rank-1, size)
send_mesg = Array{Float64}(undef, 5)
recv_mesg = Array{Float64}(undef, 5)
# fill the send array
fill!(send_mesg, Float64(rank))
print("$rank: Sending $rank -> $dst = $send_mesg\n")
sreq = MPI.Isend(send_mesg, comm, dest=dst, tag=rank+32)
rreq = MPI.Irecv!(recv_mesg, comm, source=src, tag=src+32)
stats = MPI.Waitall!([rreq, sreq])
print("$rank: Received $src -> $rank = $recv_mesg\n")
MPI-parallelise the estimate_pi() function
Consider again the following function which estimates π by "throwing darts", i.e. randomly sampling (x, y) points with both coordinates in the interval [0.0, 1.0] and checking if they fall within the unit circle.
function estimate_pi(num_points)
hits = 0
for _ in 1:num_points
x, y = rand(), rand()
if x^2 + y^2 < 1.0
hits += 1
end
end
fraction = hits / num_points
return 4 * fraction
end
num_points = 100_000_000
estimate_pi(num_points) # 3.14147572...
There are several ways in which this function could be parallelised with MPI. Below you will find two working solutions using collective communication:
One implements an unnecessarily complicated algorithm, which nonetheless is illustrative for more general cases.
The other implements a more compact and efficient solution.
Inspect the complicated solution first and answer the questions!
Study the following fully functional MPI code and then answer the questions below. Feel free
to add print statements to the code and run it with
mpiexecjl -np <N> julia estimate_pi.jl
to understand what’s going on.
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
function estimate_pi(num_points)
hits = 0
for _ in 1:num_points
x, y = rand(), rand()
if x^2 + y^2 < 1.0
hits += 1
end
end
fraction = hits / num_points
return 4 * fraction
end
function main()
t1 = time()
num_points = 10^9
num_jobs = 10
chunks = [num_points / num_jobs for i in 1:num_jobs]
# distribute work among MPI tasks
count = div(num_jobs, size)
remainder = num_jobs % size
if rank < remainder
first = rank * (count + 1) + 1
last = first + count
else
first = rank * count + remainder + 1
last = first + count - 1
end
# each rank computes pi for their vector elements
estimates = []
for i in first:last
push!(estimates, estimate_pi(chunks[i]))
end
# sum up all estimates and average on root rank
pi_sum = MPI.Reduce(sum(estimates), +, comm, root=0)
if rank == 0
println("pi = $(pi_sum/num_jobs)")
end
t2 = time()
println("elapsed time = $(t2 - t1)")
end
main()
For num_jobs = 10 and size = 4, what would be the values of count and remainder?
What is the purpose of the if-else block starting with if rank < remainder?
For num_jobs = 10 and size = 4, what would be the values of first and last for each rank?
Is load-balancing an issue in this solution? (i.e. how evenly is the work split between tasks?)
Would you expect this MPI solution to perform and scale similarly well to the distributed pmap() solution we saw in the Distributed computing episode?
Can you think of any improvements to the MPI algorithm employed?
Now look at the more compact solution!
Solution
div() performs integer division, so count = div(10, 4) = 2. The % operator computes the remainder from integer division, so remainder = 10 % 4 = 2.
This block splits the indices of the chunks vector between ranks. The first remainder ranks get count + 1 chunks each, and the remaining size - remainder ranks get count chunks each.
{rank 0 : [1,2,3], rank 1 : [4,5,6], rank 2 : [7,8], rank 3 : [9,10]}
Yes, load balancing is an issue because all ranks do not get an equal amount of work.
It will depend on the load balancing! With e.g. 2 ranks, both ranks will have equal work and the performance will be very close to the pmap() solution with 2 workers. With 4 ranks, the load balancing will be poorer for this MPI solution and it will perform worse than pmap().
Splitting vector (or array) indices between MPI tasks is a common construct and useful to know well. In this case, however, it's overkill. It is enough to divide num_points evenly between the ranks.
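If the index arithmetic feels opaque, the chunk ranges can be checked with plain Julia (no MPI involved) by looping over hypothetical rank numbers; this is just a sketch for the values used in the question:
# quick check of the index splitting, using the same formulas as the MPI code above
num_jobs, size = 10, 4
count = div(num_jobs, size)
remainder = num_jobs % size
for rank in 0:size-1
    local first, last   # same variable names as in the MPI code above
    if rank < remainder
        first = rank * (count + 1) + 1
        last = first + count
    else
        first = rank * count + remainder + 1
        last = first + count - 1
    end
    println("rank $rank -> chunks $first:$last")
end
Running it prints the mapping listed above: rank 0 gets chunks 1:3, rank 1 gets 4:6, rank 2 gets 7:8 and rank 3 gets 9:10.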
Now study the more compact, fully functional MPI code below and answer the questions that follow. Again, feel free to add print statements and run it with mpiexecjl -np <N> julia estimate_pi.jl to understand what's going on.
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)
function estimate_pi(num_points)
hits = 0
for _ in 1:num_points
x, y = rand(), rand()
if x^2 + y^2 < 1.0
hits += 1
end
end
fraction = hits / num_points
return 4 * fraction
end
function main()
t1 = time()
num_points = 10^9
# divide work evenly between ranks
my_points = floor(Int, num_points / size)
remainder = num_points % size
if rank < remainder
my_points += 1
end
# each rank computes pi for their points
pi = estimate_pi(my_points)
# sum up all estimates and average on root rank
pi_sum = MPI.Reduce(pi, +, comm, root=0)
if rank == 0
println("pi = $(pi_sum / size)")
end
t2 = time()
println("elapsed time = $(t2 - t1)")
end
main()
The algorithm to split the work is significantly simpler here, with num_points divided as evenly as possible between the ranks.
Is load balancing better in this solution? What’s the “worst case” load imbalance?
How does the performance of this MPI version compare to the distributed pmap() version that we saw in the Distributed computing episode?
Solution
Load balancing is in general much better in this version. The worst case is a difference of one single point between ranks.
The performance is statistically equivalent to the pmap() version!
Limitations
MPI.jl has (as of October 2023) not reached v1.0 so future changes could be backwards incompatible.
The MPI.jl documentation has a section on known issues.
Keypoints
MPI is a standard work-horse of parallel computing.
All communication is handled explicitly - not behind the scenes as in Distributed.
Programming with MPI requires a different mental model, since each parallel rank executes the same program and the programmer needs to distribute the work by hand.