Distributed computing
Questions
How is multiprocessing used?
What are SharedArrays?
Instructor note
35 min teaching
25 min exercises
Distributed computing
In distributed computing, different processes often need to communicate with each other, and this is typically done through a method called message passing¹. Julia’s main implementation of message passing for distributed-memory systems is contained in the Distributed module².
Julia’s approach to message passing differs from that of frameworks like MPI (Message Passing Interface)³. In many systems, communication is “two-sided”, meaning that both the sender and receiver processes need to participate in the communication⁴: the sender process initiates the send, and the receiver needs to expect and handle the receipt of the message⁵.
However, Julia uses a “one-sided” communication approach. This means that the programmer needs to explicitly manage only one process in a two-process operation. In other words, these operations typically do not look like “message send” and “message receive” but rather resemble higher-level operations like calls to user functions¹.
This approach can simplify the programming model, especially for complex applications where coordinating sends and receives can be challenging. It allows programs to run on multiple processes in separate memory domains at once. This can be particularly useful in environments where you have multiple CPUs or are working across a cluster of machines.
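As a minimal sketch of what this looks like in practice (assuming Julia has been started with worker processes, e.g. with the -p flag introduced below), a one-sided operation is simply a remote function call initiated by the calling process:
using Distributed

# ask worker 2 to evaluate sum(abs2, 1:1000); remotecall returns a Future immediately,
# and the worker never needs to post an explicit "receive"
f = remotecall(sum, 2, abs2, 1:1000)

# block until the result is available and transfer it back to the calling process
fetch(f)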
References:
Multi-processing and Distributed Computing · The Julia Language. https://docs.julialang.org/en/v1/manual/distributed-computing/
Distributed Computing · The Julia Language. https://docs.julialang.org/en/v1/stdlib/Distributed/
What is message passing interface in distributed system?. https://profoundqa.com/what-is-message-passing-interface-in-distributed-system/
Message Passing Model of Process Communication - GeeksforGeeks. https://www.geeksforgeeks.org/message-passing-model-of-process-communication/
Programming Environments for Massively Parallel Distributed Systems. https://books.google.com/books/about/Programming_Environments_for_Massively_P.html?id=NlR_m9OWgoYC
Julia can be started with a given number of local worker processes using the -p flag:
julia -p 4
The Distributed module is automatically loaded if the -p flag is used.
But we can also dynamically add processes in a running Julia session:
using Distributed
println(nprocs())
addprocs(4) # add 4 workers
println(nprocs()) # total number of processes
println(nworkers()) # only worker processes
rmprocs(workers()) # remove worker processes
Note what happens here: there is one master process which can create additional worker processes, and as we shall see, it can also distribute work to these workers.
For running on a cluster, we instead need to provide the --machine-file option and the name of a file containing a list of machines that are accessible via password-less ssh. Support for running on clusters with various schedulers (including SLURM) can be found in the ClusterManagers.jl package.
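As a sketch (the partition name and time limit below are placeholders that depend on your cluster), ClusterManagers.jl lets you add SLURM-managed workers from within a running session:
using Distributed, ClusterManagers

# request 4 worker processes through SLURM; keyword arguments are passed on to srun
addprocs(SlurmManager(4), partition="debug", t="00:10:00")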
Each process has a unique identifier accessible via the myid() function (the master has myid() = 1). The @spawnat macro can be used to transfer work to a process; it returns a Future whose result the master process can retrieve with the fetch function:
# execute myid() and rand() on process 2
r = @spawnat 2 (myid(), rand())
# fetch the result
fetch(r)
One use case could be to manually distribute expensive function calls between processes, but there are higher-level and simpler constructs than @spawnat:
- the @distributed macro for for loops, which can be used with a reduction operator to gather work performed by the independent tasks.
- the pmap function, which maps an array or range to a given function.
To illustrate the difference between these approaches we revisit the sqrt_sum function from the Multithreading episode. To use pmap we need to modify the function to accept a range, so we will use this modified version.
Note that to make any function available to all processes it needs to be decorated with the @everywhere macro:
@everywhere function sqrt_sum_range(A, r)
s = zero(eltype(A))
for i in r
@inbounds s += sqrt(A[i])
end
return s
end
function sqrt_sum(A)
s = zero(eltype(A))
for i in eachindex(A)
@inbounds s += sqrt(A[i])
end
return s
end
Let us look at and discuss example implementations using each of these techniques:
# version 1: @distributed (+) for loop with a reduction
A = rand(100_000)
batch = Int(length(A) / 100)
@distributed (+) for r in [(1:batch) .+ offset for offset in 0:batch:length(A)-1]
sqrt_sum_range(A, r)
end
# version 2: pmap over the same ranges
A = rand(100_000)
batch = Int(length(A) / 100)
sum(pmap(r -> sqrt_sum_range(A, r), [(1:batch) .+ offset for offset in 0:batch:length(A)-1]))
# version 3: manual partitioning of the array with @spawnat
futures = Array{Future}(undef, nworkers())
A = rand(100_000)
batch = Int(length(A) / 100)
@time begin
for (i, id) in enumerate(workers())
batch = floor(Int, length(A) / nworkers())
remainder = length(A) % nworkers()
if (i-1) < remainder
start = 1 + (i - 1) * (batch + 1)
stop = start + batch
else
start = 1 + (i - 1) * batch + remainder
stop = start + batch - 1
end
futures[i] = @spawnat id sqrt_sum_range(A, start:stop)
end
p = sum(fetch.(futures))
end
Both the @distributed and pmap versions calculate the sum of square roots of an array using the sqrt_sum_range function, which handles one range of indices at a time. The main difference is how the work is distributed among the workers: @distributed divides the iterations evenly between all workers up front, regardless of how long each iteration takes, while pmap hands out jobs dynamically as workers become free. For small, simple tasks the static division of @distributed will most likely not cause problems, but for larger or more uneven tasks pmap has advantages¹.
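To make the scheduling difference concrete, here is a small sketch with a hypothetical uneven_work function (not part of the lesson) whose cost grows with the index. The statically partitioned @distributed loop is limited by the worker that receives the most expensive contiguous chunk, while pmap keeps handing out tasks as workers become free:
@everywhere function uneven_work(i)
    sleep(0.02 * i)   # later tasks are more expensive
    return 1
end

# static partitioning: contiguous chunks, so one worker gets all the slow tasks
@time @distributed (+) for i in 1:20
    uneven_work(i)
end

# dynamic scheduling: each idle worker picks up the next pending task
@time sum(pmap(uneven_work, 1:20))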
The @spawnat version looks cumbersome for this particular case, since the algorithm requires explicit partitioning of the array, as is common in MPI, for instance. The @distributed (+) parallel for loop and the pmap mapping are much simpler, but which one is preferable for a given use case?
- @distributed is appropriate for reductions. It divides the iteration range evenly between all workers up front¹; it does not load-balance. It is best in cases where each loop iteration is cheap.
- pmap starts each worker on a job and hands out the next available job as soon as a worker finishes². This is similar to queue-based multiprocessing common in Python. pmap can handle reductions as well as other algorithms; it performs load balancing, and since dynamic scheduling introduces some overhead it is best to use pmap for computationally heavy tasks.
- @spawnat returns futures that do not block the calling process, which opens the possibility of handling asynchronous and uneven workloads explicitly (see the sketch below).
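As an illustration of that last point (a sketch, not part of the lesson material), a Future returned by @spawnat lets the master process do useful work while a worker computes, and only blocks once fetch is called:
# start a heavy computation on worker 2; @spawnat returns a Future immediately
f = @spawnat 2 sum(sqrt.(rand(10_000_000)))

# the master process can do its own share of the work in the meantime
local_part = sum(sqrt.(rand(10_000_000)))

# only now do we wait for (and transfer) the remote result
total = local_part + fetch(f)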
Multiprocessing overhead
Just like with multithreading, multiprocessing with Distributed comes with an overhead because of sending messages and moving data between processes. The simple example with the sqrt_sum() function will not benefit from parallelisation. But if you add a sleep(0.001) inside the loop, to emulate an expensive calculation, and reduce the array size to e.g. rand(1000), you should observe near-linear scaling. Try it!
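Here is a sketch of that experiment (the name sqrt_sum_range_slow and the split into 10 chunks are arbitrary choices for illustration):
@everywhere function sqrt_sum_range_slow(A, r)
    s = zero(eltype(A))
    for i in r
        @inbounds s += sqrt(A[i])
        sleep(0.001)   # emulate an expensive per-iteration calculation
    end
    return s
end

A = rand(1000)
batch = length(A) ÷ 10
ranges = [(1:batch) .+ offset for offset in 0:batch:length(A)-1]
# serial time is roughly 1 second (1000 iterations × 1 ms); with N workers
# the pmap version should approach 1/N of that
@time sum(pmap(r -> sqrt_sum_range_slow(A, r), ranges))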
Finally, it should be emphasized that a common use case of pmap involves heavy computations inside functions defined in imported packages. For example, computing the singular value decomposition of many matrices:
@everywhere using LinearAlgebra
using BenchmarkTools
x = [rand(100, 100) for i in 1:10]
@btime map(LinearAlgebra.svd, x);
@btime pmap(LinearAlgebra.svd, x);
References:
Julia concurrent programming – the difference between @distributed and pmap. https://www.programmersought.com/article/38894559405/
Julia - Difference between parallel map and parallel for-loop. https://stackoverflow.com/questions/55697905/difference-between-parallel-map-and-parallel-for-loop
Julia - What exactly is the difference between @parallel and pmap. https://stackoverflow.com/questions/37846838/what-exactly-is-the-difference-between-parallel-and-pmap
DistributedArrays
Another way to approach parallelization over multiple machines is through DArrays from the DistributedArrays.jl package, which implements a Global Array interface. A DArray is distributed across a set of workers. Each worker can read and write to its local portion of the array, and each worker has read-only access to the portions of the array held by other workers. This can be particularly useful for large computations that are organized around large arrays of data.
Currently, distributed arrays do not have much functionality and they require significant book-keeping of array indices.
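A minimal sketch of the interface (assuming DistributedArrays.jl is installed and worker processes have been added with addprocs):
@everywhere using DistributedArrays

A = rand(100, 100)
dA = distribute(A)   # split the array across the available workers

sum(dA)              # common reductions work directly on a DArray

# each worker can inspect the indices and data of the part it owns
fetch(@spawnat workers()[1] localindices(dA))
fetch(@spawnat workers()[1] localpart(dA))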
Exercises
Using SharedArrays with the Laplace function
Look again at the double for loop in the lap2d! function and think about how you could use SharedArrays.
Laplace and setup functions
function lap2d!(u, unew)
M, N = size(u)
for j in 2:N-1
for i in 2:M-1
unew[i,j] = 0.25 * (u[i+1,j] + u[i-1,j] + u[i,j+1] + u[i,j-1])
end
end
end
function setup(N=4096, M=4096)
u = zeros(M, N)
# set boundary conditions
u[1,:] = u[end,:] = u[:,1] = u[:,end] .= 10.0
unew = copy(u);
return u, unew
end
1. Create a new script where you import Distributed, SharedArrays and BenchmarkTools and define the lap2d! function.
2. Benchmark the original version:
u, unew = setup()
@btime lap2d!(u, unew)
3. Now create a new method for this function which accepts SharedArrays.
4. Add worker processes with addprocs and benchmark your new method when passing in SharedArrays. Is there any performance gain? The overhead in managing the workers will probably far outweigh the parallelization benefit because the computation in the inner loop is very simple and fast.
5. Try adding sleep(0.001) to the outermost loop to simulate the effect of a more demanding calculation, and rerun the benchmarking. Can you see a speedup now?
Remember that you can remove worker processes with rmprocs(workers()).
Solution
using BenchmarkTools
using Distributed
using SharedArrays
function lap2d!(u, unew)
M, N = size(u)
for j in 2:N-1
for i in 2:M-1
@inbounds unew[i,j] = 0.25 * (u[i+1,j] + u[i-1,j] + u[i,j+1] + u[i,j-1])
end
end
end
function lap2d!(u::SharedArray, unew::SharedArray)
M, N = size(u)
@sync @distributed for j in 2:N-1
for i in 2:M-1
@inbounds unew[i,j] = 0.25 * (u[i+1,j] + u[i-1,j] + u[i,j+1] + u[i,j-1])
end
end
end
u, unew = setup()
u_s = SharedArray(u);
unew_s = SharedArray(unew);
# test for correctness:
lap2d!(u, unew)
lap2d!(u_s, unew_s)
# element-wise comparison, should give "true"
all(u .≈ u_s)
# benchmark
@btime lap2d!(u, unew)
# 10.853 ms (0 allocations: 0 bytes)
@btime lap2d!(u_s, unew_s)
# 38.033 ms (1426 allocations: 68.59 KiB)
The SharedArray version runs slower! What if we add a sleep(0.001) in there?
function lap2d!(u, unew)
M, N = size(u)
for j in 2:N-1
for i in 2:M-1
@inbounds unew[i,j] = 0.25 * (u[i+1,j] + u[i-1,j] + u[i,j+1] + u[i,j-1])
end
sleep(0.001)
end
end
function lap2d!(u::SharedArray, unew::SharedArray)
M, N = size(u)
@sync @distributed for j in 2:N-1
for i in 2:M-1
@inbounds unew[i,j] = 0.25 * (u[i+1,j] + u[i-1,j] + u[i,j+1] + u[i,j-1])
end
sleep(0.001)
end
end
# benchmark
@btime lap2d!(u, unew)
# 8.432 s (20640 allocations: 648.77 KiB)
@btime lap2d!(u_s, unew_s)
# 1.063 s (1428 allocations: 69.17 KiB)
On 8 CPU cores the speedup is now very close to 8!
Distribute the computation of π
Consider again the estimate_pi() function:
function estimate_pi(num_points)
hits = 0
for _ in 1:num_points
x, y = rand(), rand()
if x^2 + y^2 < 1.0
hits += 1
end
end
fraction = hits / num_points
return 4 * fraction
end
num_points = 100_000_000
estimate_pi(num_points) # 3.14147572...
Now try to parallelise this function using both a parallel mapping with pmap() and a @distributed (+) construct. Write your code in a script which you can run with julia -p N estimate_pi.jl.
1. First decorate the function with @everywhere.
2. Call it in serial with p1 = estimate_pi(num_points).
3. Use a list comprehension to split up num_points into evenly sized chunks in a Vector. Hint:
num_jobs = 100
chunks = [____ / ____ for i in 1:____]
4. For parallel mapping, use p2 = mean(pmap(___, ___)) to get the mean from a parallel mapping.
5. For a distributed for loop, use something like:
p3 = @distributed (+) for ____ in ____
estimate_pi(____)
end
p3 = p3 / num_jobs
6. Print p1, p2 and p3 to make sure that your code is working well.
7. Now do some benchmarking. You'll need to remove the assignments to use @btime (e.g. replace p1 = estimate_pi(num_points) with @btime estimate_pi(num_points)). To benchmark the for loop, you can encapsulate the loop in a @btime begin - end block.
8. Run your script with different numbers of processes and observe the parallel efficiency.
Do you see a difference in parallel efficiency from changing the number of jobs?
Solution
using Distributed
@everywhere function estimate_pi(num_points)
hits = 0
for _ in 1:num_points
x, y = rand(), rand()
if x^2 + y^2 < 1.0
hits += 1
end
end
fraction = hits / num_points
return 4 * fraction
end
Set number of points and split into chunks:
num_points = 100_000_000
num_jobs = 100
chunks = [num_points / num_jobs for i in 1:num_jobs]
Call estimate_pi() in serial, with pmap() and with @distributed (+):
using Statistics
p1 = estimate_pi(num_points)
p2 = mean(pmap(estimate_pi, chunks))
p3 = @distributed (+) for c in chunks
estimate_pi(c)
end
p3 = p3 / num_jobs
println("$p1 $p2 $p3")
Benchmark with @btime:
using BenchmarkTools
@btime estimate_pi(num_points)
@btime mean(pmap(estimate_pi, chunks))
@btime begin
@distributed (+) for c in chunks
estimate_pi(c)
end
end
Finally run from a terminal:
$ julia -p 4 estimate_pi.jl
# 227.873 ms (1 allocation: 16 bytes)
# 63.707 ms (4602 allocations: 163.09 KiB)
# 59.410 ms (259 allocations: 15.12 KiB)
Increasing the number of jobs (num_jobs = 1000) reduces efficiency for the parallel mapping because of the increased communication overhead:
$ julia -p 4 estimate_pi.jl
# 228.595 ms (1 allocation: 16 bytes)
# 86.811 ms (45462 allocations: 1.57 MiB)
# 59.480 ms (270 allocations: 43.61 KiB)
See also
The Julia Parallel organization collects packages developed for parallel computing in Julia.
Valentin Churavy, Levels of Parallelism
Keypoints
- One should choose a distributed mechanism that fits with the time and memory parameters of your problem.
- @distributed is good for reductions and fast inner loops with limited data transfer.
- pmap is good for expensive inner loops that return a value.
- SharedArrays can be an easier drop-in replacement for threading-like behaviors on a single machine.