Multithreading
Questions
How multithreading works in Julia?
Instructor note
30 min teaching
30 min exercises
Linear algebra threading
OpenBLAS is the default linear algebra backend in Julia.
Because it is an external library, it manages threading by itself independent of the Julia threads.
We can manage the thread count for OpenBLAS inside Julia session using the BLAS.set_num_threads()
function or by setting an environment variable as follows:
export OPENBLAS_NUM_THREADS=4
Consider the multiplication of two large arrays and compare the execution time.
using LinearAlgebra
A = rand(2000, 2000)
B = rand(2000, 2000)
# Precompile the matrix multiplication
A*B
# Single thread
begin
BLAS.set_num_threads(1)
@show BLAS.get_num_threads()
@time A*B
end
# All threads on the machine
begin
BLAS.set_num_threads(Sys.CPU_THREADS)
@show BLAS.get_num_threads()
@time A*B
end
We should see a difference between using single and all threads.
Julia threading
We start by walking through how to use multithreading in Julia.
We can set the JULIA_NUM_THREADS
environment variable such that Julia will start the set thread count.
export JULIA_NUM_THREADS=4
Alternatively, we can also use the -t/--threads
option to supply the thread count.
Using an option will override the value in the environment variable.
# Short form
julia -t 4
# Long form
julia --threads=4
In Julia, we can import the Threads module for easier usage.
using Base.Threads
The nthreads
function show us how many threads we have available:
nthreads()
There are three main ways of approaching multithreading:
Using
@threads
to parallelize a for loop to run in multiple threads.Using
@spawn
and@sync
to spawn tasks in threads and synchronize them at the end of the block.Using
@spawn
andfetch
to spawn tasks and fetch their return values once they are complete.
Choosing the most suitable method depends on the problem.
Julia uses dynamic scheduler for multithreading which allows us to do nested multithreading, that is, to nest @spawn
statements.
using Base.Threads
# Using @threads macro with dynamic scheduling
a = zeros(Int, 2*nthreads())
@threads for i in eachindex(a)
a[i] = threadid()
end
println(a)
using Base.Threads
function task(b, chunk)
for i in chunk
b[i] = threadid()
end
end
# Using @sync and @spawn macros (also dynamic scheduling)
b = zeros(Int, 2 * nthreads())
chunks = Iterators.partition(eachindex(b), length(b) ÷ nthreads())
@sync for chunk in chunks
@spawn task(b, chunk)
end
println(b)
using Base.Threads
# Using @spawn and fetch
t = [@spawn threadid() for _ in 1:2*nthreads()]
c = fetch.(t)
Let’s see if we can achieve any speed gain when performing a costly calculation.
function sqrt_array(A)
B = similar(A)
for i in eachindex(A)
@inbounds B[i] = sqrt(A[i])
end
B
end
using Base.Threads
function threaded_sqrt_array(A)
B = similar(A)
@threads for i in eachindex(A)
@inbounds B[i] = sqrt(A[i])
end
B
end
using Base.Threads
function sqrt_array!(A, B, chunk)
for i in chunk
@inbounds B[i] = sqrt(A[i])
end
end
function threaded_sqrt_array(A)
B = similar(A)
chunks = Iterators.partition(eachindex(A), length(A) ÷ nthreads())
@sync for chunk in chunks
@spawn sqrt_array!(A, B, chunk)
end
B
end
We can now compare the performance:
A = rand(1000, 1000)
@btime sqrt_array(A);
@btime threaded_sqrt_array(A);
# make sure we're getting the correct value
sqrt_array(A) ≈ threaded_sqrt_array(A)
With 4 threads, the speedup could be about a factor of 3.
Threading overhead
Julia threading has an overhead of a few microseconds (equivalent to thousands of computations). Multithreading becomes efficient for tasks that are larger than the overhead.
Pitfalls
Just like with multithreading in other languages, one needs to be aware of possible race conditions, i.e. when the order in which threads read from and write to memory can change the result of a computation.
We can illustrate this with an example where we sum up the square root of elements of an array. The serial version provides the correct value and reference execution time. The “race condition” version illustrates how a naive implementation can lead to problems. The “atomic” version shows how we can ensure a correct results by using atomic operations. The “workaround” version shows how we can refactor the code to get both correct result and speedup.
function sqrt_sum(A)
s = zero(eltype(A))
for i in eachindex(A)
@inbounds s += sqrt(A[i])
end
return s
end
using Base.Threads
function threaded_sqrt_sum(A)
s = zero(eltype(A))
@threads for i in eachindex(A)
@inbounds s += sqrt(A[i])
end
return s
end
using Base.Threads
function threaded_sqrt_sum_atomic(A)
s = Atomic{eltype(A)}(zero(eltype(A)))
@threads for i in eachindex(A)
@inbounds atomic_add!(s, sqrt(A[i]))
end
return s[]
end
using Base.Threads
function threaded_sqrt_sum_migration(A)
partial = zeros(eltype(A), nthreads())
@threads for i in eachindex(A)
# We should not use threadid() for indexing
# Threads can migrate during execution.
@inbounds partial[threadid()] += sqrt(A[i])
end
s = zero(eltype(A))
for i in eachindex(partial)
s += partial[i]
end
return s
end
using Base.Threads
function sqrt_sum(A, chunk)
s = zero(eltype(A))
for i in chunk
@inbounds s += sqrt(A[i])
end
return s
end
function threaded_sqrt_sum_workaround(A)
chunks = Iterators.partition(eachindex(A), length(A) ÷ nthreads())
tasks = map(chunks) do chunk
@spawn sqrt_sum(A, chunk)
end
s = mapreduce(fetch, +, tasks; init=zero(eltype(A)))
return s
end
We will observe that:
The serial version is slow but correct.
The race condition version is both slow and wrong.
The atomic version is correct but extremely slow.
The migration version is incorrect because tasks can migrate between Julia threads during execution, thus
threadid()
is not constant.The workaround is fast and correct, but required refactoring.
Bonus questions:
What does
eltype()
do?What does
eachindex()
do?
Threading with Threads.@threads
is quite straightforward,
but one needs to be careful not to introduce race conditions
and sometimes that requires code refactorization. Using atomic operations
adds significant overhead and thus only makes sense if each iteration
of the loop takes significant time to compute.
Exercises
Multithreading the Laplace function
Consider the double for loop in the lap2d!()
function:
Can it safely be threaded, i.e. is there any risk of race conditions?
Is it thread-safe?
Yes, this function is thread-safe since each iteration of the loop accesses a different memory location.
Insert the
Threads.@threads
macro in the right location - note that@threads
currently only works on outermost loops!Measure its effects with
@benchmark
. Since it’s cumbersome to change the “Julia: Num Threads” option in VSCode and relaunch the Julia REPL over and over, create a script instead which imports BenchmarkTools and prints benchmark results:bench_results = @benchmark lap2d!(u, unew) println(minimum(bench_results.times))
Now run with different number of threads from a terminal using
julia -t <N> laplace.jl
and observe the scaling.Try increasing the problem size (e.g.
M=N=8192
). Does it scale better?
Solution
Multithreaded version:
using .Threads
function lap2d!(u, unew)
M, N = size(u)
@threads for j in 2:N-1
for i in 2:M-1
@inbounds unew[i,j] = 0.25 * (u[i+1,j] + u[i-1,j] + u[i,j+1] + u[i,j-1])
end
end
end
Benchmarking:
function setup(N=4096, M=4096)
u = zeros(M, N)
# set boundary conditions
u[1,:] = u[end,:] = u[:,1] = u[:,end] .= 10.0
unew = copy(u);
return u, unew
end
using BenchmarkTools
u, unew = setup()
bench_results = @benchmark lap2d!($u, $unew)
println("time = $(minimum(bench_results.times)/10^6)")
$ julia -t 1 laplace.jl
# time = 7.440875
$ julia -t 2 laplace.jl
# time = 4.559292
$ julia -t 4 laplace.jl
# time = 3.802625
Increasing the problem size will not improve the parallel efficiency as it does not increase the computational cost in the loop.
Multithread the computation of π
Consider the following function which estimates π by “throwing darts”, i.e. randomly sampling (x,y) points in the interval [0.0, 1.0] and checking if they fall within the unit circle.
function estimate_pi(num_points)
hits = 0
for _ in 1:num_points
x, y = rand(), rand()
if x^2 + y^2 < 1.0
hits += 1
end
end
fraction = hits / num_points
return 4 * fraction
end
num_points = 100_000_000
estimate_pi(num_points) # 3.14147572...
Can this function be safely threaded, i.e. is there any risk of race conditions?
Is it thread-safe?
No, this function is not thread-safe! The algorithm needs to be rewritten.
Define a new function
threaded_estimate_pi()
where you implement the necessary changes to multithread the loop.Run some benchmarks to explore the parallel efficiency.
Hint
You need to make sure that the different threads are not incrementing the same memory address.
You should split the input size evenly for available threads, then use @spawn
to compute hits in each thread and finally fetch
the return values, and preceed as in the serial example.
Solution
Here is a threaded version:
using Random
using Base.Threads
function partial_hits(rng, num_points)
hits = zero(Int)
for _ in 1:num_points
x, y = rand(rng), rand(rng)
if x^2 + y^2 < 1.0
hits += 1
end
end
return hits
end
function partition(v::Integer, n::Integer)
d, r = divrem(v, n)
c = vcat(fill(d, n-r), fill(d+1, r))
return c
end
function threaded_estimate_pi(num_points)
n = nthreads()
rngs = [MersenneTwister(seed) for seed in 1:n]
chunks = partition(num_points, n)
tasks = map(zip(rngs, chunks)) do (rng, chunk)
@spawn partial_hits(rng, chunk)
end
hits = mapreduce(fetch, +, tasks; init=zero(Int))
return 4 * hits / num_points
end
To benchmark it:
using BenchmarkTools
num_points = 100_000_000
# make sure we get an accurate estimate:
println("pi = $(threaded_estimate_pi(num_points))")
bench_results = @benchmark threaded_estimate_pi($num_points)
println("time = $(minimum(bench_results.times)/10^6)")
Results:
$ julia -t 1 threaded_estimate_pi.jl
# pi = 3.14147464
# time = 496.935583
$ julia -t 2 threaded_estimate_pi.jl
# pi = 3.1417046
# time = 255.328
$ julia -t 4 threaded_estimate_pi.jl
# pi = 3.14172796
# time = 132.892833
Parallel scaling seems decent, but comparing to the unthreaded version reveals the overhead from creating and managing threads:
$ julia estimate_pi.jl
# pi = 3.14147392
# time = 228.434583