Distributed computing

Questions

  • How is multiprocessing used?

  • What are SharedArrays?

Instructor note

  • 35 min teaching

  • 25 min exercises

Distributed computing

In distributed computing, different processes often need to communicate with each other, and this is typically done through a method called message passing¹. Julia’s main implementation of message passing for distributed-memory systems is contained in the Distributed module².

Julia’s approach to message passing is different from other frameworks like MPI (Message Passing Interface)³. In many systems, communication is “two-sided”, meaning that both the sender and receiver processes need to participate in the communication⁴. The sender process needs to initiate the send, and the receiver needs to expect and handle the receipt of the message⁵.

However, Julia uses a “one-sided” communication approach. This means that the programmer needs to explicitly manage only one process in a two-process operation. In other words, these operations typically do not look like “message send” and “message receive” but rather resemble higher-level operations like calls to user functions¹.

This approach can simplify the programming model, especially for complex applications where coordinating sends and receives can be challenging. It allows programs to run on multiple processes in separate memory domains at once. This can be particularly useful in environments where you have multiple CPUs or are working across a cluster of machines.
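
As a small illustration of this one-sided style, the master can simply ask a worker to evaluate a function and collect the result, with no explicit receive code on the worker side. The sketch below assumes two local worker processes have been added (adding workers is covered in the next section):

using Distributed
addprocs(2)                        # start two local worker processes

# ask worker 2 to evaluate 1 + 2 and return the result to the master;
# the worker never contains any explicit "message receive" code
result = remotecall_fetch(+, 2, 1, 2)
println(result)                    # prints 3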

References:

  1. Multi-processing and Distributed Computing · The Julia Language. https://docs.julialang.org/en/v1/manual/distributed-computing/

  2. Distributed Computing · The Julia Language. https://docs.julialang.org/en/v1/stdlib/Distributed/

  3. What is message passing interface in distributed system?. https://profoundqa.com/what-is-message-passing-interface-in-distributed-system/

  4. Message Passing Model of Process Communication - GeeksforGeeks. https://www.geeksforgeeks.org/message-passing-model-of-process-communication/

  5. Programming Environments for Massively Parallel Distributed Systems. https://books.google.com/books/about/Programming_Environments_for_Massively_P.html?id=NlR_m9OWgoYC

Julia can be started with a given number of local worker processes using the -p flag:

julia -p 4

The Distributed module is automatically loaded if the -p flag is used. But we can also dynamically add processes in a running Julia session:

using Distributed

println(nprocs())
addprocs(4)         # add 4 workers
println(nprocs())   # total number of processes
println(nworkers()) # only worker processes
rmprocs(workers())  # remove worker processes

Note what happens here: there is one master process which can create additional worker processes, and as we shall see, it can also distribute work to these workers.

For running on a cluster, we instead need to provide the --machine-file option and the name of a file containing a list of machines that are accessible via password-less ssh. Support for running on clusters with various schedulers (including SLURM) can be found in the ClusterManagers.jl package.
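
As a sketch (the hostnames and scheduler options below are hypothetical), a machine file simply lists one host per line, optionally prefixed with the number of workers to start on that host, while ClusterManagers.jl lets you request workers directly from the scheduler:

# contents of hosts.txt (4 workers on each of two hypothetical nodes):
#   4*node001
#   4*node002
# launched with:  julia --machine-file hosts.txt myscript.jl

using Distributed, ClusterManagers
# request 4 workers through SLURM; the keyword arguments are passed on to
# srun, so the available options depend on your cluster configuration
addprocs(SlurmManager(4), partition="devel", t="00:10:00")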

Each process has a unique identifier accessible via the myid() function (the master has myid() = 1). The @spawnat macro can be used to transfer work to a process; it immediately returns a Future, and the result can later be retrieved on the master process with the fetch function:

# execute myid() and rand() on process 2
r = @spawnat 2 (myid(), rand())
# fetch the result
fetch(r)
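
For convenience, the spawn-and-fetch pattern can also be written in a single step using the @fetchfrom macro:

# equivalent to fetch(@spawnat 2 (myid(), rand()))
@fetchfrom 2 (myid(), rand())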

One use case could be to manually distribute expensive function calls between processes, but there are higher-level and simpler constructs than @spawnat:

  • the @distributed macro for parallelising for loops. It can be combined with a reduction operator to gather the results of the work performed by the independent tasks.

  • the pmap function, which maps a given function over an array or range, distributing the evaluations over the workers.

To illustrate the difference between these approaches we revisit the sqrt_sum function from the Multithreading episode. To use pmap we need a version that accepts a range, so we will use this modified version. Note that to make a function available to all processes it needs to be decorated with the @everywhere macro:

@everywhere function sqrt_sum_range(A, r)
    s = zero(eltype(A))
    for i in r
        @inbounds s += sqrt(A[i])
    end
    return s
end

Let us look at and discuss example implementations using each of these techniques:

A = rand(100_000)
batch = Int(length(A) / 100)

@distributed (+) for r in [(1:batch) .+ offset for offset in 0:batch:length(A)-1]
    sqrt_sum_range(A, r)
end
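
For comparison, here are minimal sketches of the same computation written with pmap and with explicit @spawnat calls; both reuse the chunking of the @distributed version above:

ranges = [(1:batch) .+ offset for offset in 0:batch:length(A)-1]

# pmap: apply the partial-sum function to each range (A is copied to the
# workers along with the closure) and add up the partial results
sum(pmap(r -> sqrt_sum_range(A, r), ranges))

# @spawnat: launch one task per range on any available worker, then
# fetch and add up the partial results
futures = [@spawnat :any sqrt_sum_range(A, r) for r in ranges]
sum(fetch.(futures))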

All three versions use the sqrt_sum_range function to compute the sum of square roots over a given range of the array; they differ only in how that work is handed out to the workers.

The main difference between @distributed and pmap is how work is distributed. With @distributed, the iterations are divided evenly among all workers up front, regardless of their computing power. With pmap, tasks are handed out dynamically, so the division adapts to the computing power of each worker.

In conclusion, for small and simple tasks the eager splitting done by @distributed will most likely not cause problems, but for larger or more complex tasks pmap has advantages¹. The @spawnat version looks cumbersome for this particular case, as the algorithm requires explicit partitioning of the array, which is common in MPI, for instance. The @distributed (+) parallel for loop and the pmap mapping are much simpler, but which one is preferable for a given use case?

  • @distributed is appropriate for reductions. It divides the specified range evenly among all workers up front¹, without any load balancing, so it is best in cases where each loop iteration is cheap.

  • pmap, on the other hand, starts each worker on a job and hands out the next available job as soon as a worker finishes its current one². This is similar to queue-based multiprocessing common in Python. pmap can handle reductions as well as other algorithms. It performs load balancing, and since this dynamic scheduling introduces some overhead it is best used for computationally heavy tasks (a small demonstration follows after this list).

  • In the case of @spawnat, because the futures are not immediately using CPU resources, it opens the possibility of using asynchronous and uneven workloads.
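
A quick way to see the dynamic scheduling of pmap in action (assuming a few workers are available) is to let each task report the id of the worker that executed it:

# returns (task index, worker id) pairs; tasks are handed out dynamically
# to whichever worker is free rather than being split evenly in advance
pmap(i -> (i, myid()), 1:8)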

Multiprocessing overhead

Just like with multithreading, multiprocessing with Distributed comes with an overhead because of sending messages and moving data between processes.

The simple sum-of-square-roots example above will not benefit from parallelisation. But if you add a sleep(0.001) call inside the loop, to emulate an expensive calculation, and reduce the array size to e.g. rand(1000), you should observe near-linear scaling. Try it!
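
A sketch of this experiment could look as follows (slow_sqrt_sum_range is just an illustrative name for the modified function):

@everywhere function slow_sqrt_sum_range(A, r)
    s = zero(eltype(A))
    for i in r
        sleep(0.001)             # emulate an expensive calculation
        @inbounds s += sqrt(A[i])
    end
    return s
end

A = rand(1000)
batch = length(A) ÷ 100
ranges = [(1:batch) .+ offset for offset in 0:batch:length(A)-1]

@time sum(map(r -> slow_sqrt_sum_range(A, r), ranges))    # serial
@time sum(pmap(r -> slow_sqrt_sum_range(A, r), ranges))   # distributed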

Finally, it should be emphasized that a common use case of pmap involves heavy computations inside functions defined in imported packages. For example, computing the singular value decomposition of many matrices:

@everywhere using LinearAlgebra
using BenchmarkTools
x = [rand(100, 100) for i in 1:10]
@btime map(LinearAlgebra.svd, x);
@btime pmap(LinearAlgebra.svd, x);

References:

  1. Julia concurrent programming – the difference between @distributed and pmap. https://www.programmersought.com/article/38894559405/

  2. Julia - Difference between parallel map and parallel for-loop. https://stackoverflow.com/questions/55697905/difference-between-parallel-map-and-parallel-for-loop

  3. Julia - What exactly is the difference between @parallel and pmap. https://stackoverflow.com/questions/37846838/what-exactly-is-the-difference-between-parallel-and-pmap

SharedArrays

Shared arrays, supplied by the SharedArrays standard library, are arrays that are shared across multiple processes on the same machine. They can be used to distribute operations on an array across processes.

Let us revisit the sqrt_array function and modify it to mutate the argument passed to it, and then add a method for SharedArrays which uses the @distributed and @sync macros (@sync is needed to wait for all processes to finish):

function sqrt_array!(A)
    for i in eachindex(A)
        @inbounds A[i] = sqrt(A[i])
    end
end
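
Following that description, the SharedArray method could look like this (a sketch; it assumes worker processes have already been added):

using Distributed, SharedArrays
@everywhere using SharedArrays

function sqrt_array!(A::SharedArray)
    # each worker writes into its share of the array; @sync waits for
    # all of them to finish before returning
    @sync @distributed for i in eachindex(A)
        @inbounds A[i] = sqrt(A[i])
    end
end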

The choice between the serial version and the SharedArray version depends on the specific requirements of your computation and the resources available on your machine.

  • Serial: This version is simpler and doesn’t require inter-process communication. However, it may not fully utilize all the available CPU cores¹.

  • SharedArray: This version uses the @distributed and @sync macros to distribute operations across multiple processes on the same machine. It can be faster than the serial version, especially for large computations, but it also requires more communication and data transfer between processes².

Remember, in a SharedArray, each “participating” process has access to the entire array³. This is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine⁴. However, it’s important to note that using SharedArray requires careful consideration of memory usage and process communication⁵.

References:

  1. SharedArrays · ParallelUtilities.jl - JuliaHub. https://docs.juliahub.com/ParallelUtilities/SO4iL/0.8.5/examples/sharedarrays/

  2. Difference between SharedArrays and DistributedArrays. https://discourse.julialang.org/t/difference-between-sharedarrays-and-distributedarrays/60258

  3. Parallel Computing · The Julia Language - MIT. https://web.mit.edu/julia_v0.6.2/julia/share/doc/julia/html/en/manual/parallel-computing.html

  4. Parallel Computing with Julia - University of Illinois Chicago. http://homepages.math.uic.edu/~jan/mcs507/paralleljulia.pdf

  5. Parallel Computing · The Julia Language. https://docs.julialang.org/en/v1/manual/parallel-computing/

Remember that Julia always selects the most specialized method for dispatch based on the argument types. We can now time the two methods, this time using @time instead of @btime:

A = rand(100_000_000);
@time sqrt_array!(A)

SA = SharedArray(A);
@time sqrt_array!(SA)

Bonus questions

  • Should the @time expression be called more than once?

  • How can we check which method is being dispatched for A and SA?

We should keep in mind, however, that distributing work on a SharedArray still requires coordination between processes, and this overhead can affect performance.

DistributedArrays

Another way to approach parallelization over multiple machines is through DArrays from the DistributedArrays.jl package, which implements a Global Array interface. A DArray is distributed across a set of workers. Each worker can read and write its local portion of the array, and has read-only access to the portions held by other workers. This can be particularly useful for large computations that are organized around large arrays of data.

Currently, distributed arrays do not have much functionality and they require significant book-keeping of array indices.
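
As a minimal sketch (it assumes DistributedArrays.jl is installed and a few workers are running), an ordinary array can be distributed and its local pieces inspected like this:

using Distributed
@everywhere using DistributedArrays

# distribute an ordinary array across the available workers
A = rand(100, 100)
D = distribute(A)

# ask worker 2 for the block of D that it stores locally
remotecall_fetch(localpart, 2, D)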

Exercises

Using SharedArrays with the Laplace function

Look again at the double for loop in the lap2d! function and think about how you could use SharedArrays.

  • Create a new script where you import Distributed, SharedArrays and BenchmarkTools and define the lap2d! function.

  • Benchmark the original version:

u, unew = setup()
@btime lap2d!(u, unew)

  • Now create a new method for this function which accepts SharedArrays.

  • Add worker processes with addprocs and benchmark your new method when passing in SharedArrays. Is there any performance gain?

  • The overhead in managing the workers will probably far outweigh the parallelization benefit because the computation in the inner loop is very simple and fast.

  • Try adding sleep(0.001) to the outermost loop to simulate the effect of a more demanding calculation, and rerun the benchmarking. Can you see a speedup now?

  • Remember that you can remove worker processes with rmprocs(workers()).

Distribute the computation of π

[Figure: estimating π by throwing random darts at a unit square]

Consider again the estimate_pi() function:

function estimate_pi(num_points)
    hits = 0
    for _ in 1:num_points
        x, y = rand(), rand()
        if x^2 + y^2 < 1.0
            hits += 1
        end
    end
    fraction = hits / num_points
    return 4 * fraction
end
num_points = 100_000_000
estimate_pi(num_points)  # 3.14147572...

Now try to parallelise this function using both a parallel mapping with pmap() and a @distributed (+) construct. Write your code in a script which you can call with julia -p N estimate_pi.jl.

  • First decorate the function with @everywhere.

  • Call it in serial with p1 = estimate_pi(num_points).

  • Use a list comprehension to split up num_points into evenly sized chunks in a Vector. Hint:

    num_jobs = 100
    chunks = [____ / ____ for i in 1:____]
    
  • For parallel mapping, use p2 = mean(pmap(___, ___)) to get the mean from a parallel mapping.

  • For a distributed for loop, use something like:

    p3 = @distributed (+) for ____ in ____
       estimate_pi(____)
    end
    p3 = p3 / num_jobs
    
  • Print p1, p2 and p3 to make sure that your code is working well.

  • Now do some benchmarking. You'll need to remove the assignments to use @btime (e.g. replace p1 = estimate_pi(num_points) with @btime estimate_pi(num_points)). To benchmark the for loop, you can encapsulate the loop in a @btime begin - end block.

  • Run your script with different numbers of processes and observe the parallel efficiency.

  • Do you see a difference in parallel efficiency from changing the number of jobs?

See also

Keypoints

  • Choose a distributed mechanism that fits the time and memory characteristics of your problem.

  • @distributed is good for reductions and fast inner loops with limited data transfer.

  • pmap is good for expensive inner loops that return a value.

  • SharedArrays can be an easier drop-in replacement for threading-like behaviors on a single machine.