Here we will look at how (distributed) parallel computing can be achieved from within Julia. The nice thing about Julia is that parallel computing is baked into the standard library. This is only a very brief overview -- but should be enough to get everyone started.
Let's check the number of threads:
Threads.nthreads()
4
Note that we can't change the number of threads of a running Julia process. It is controlled either from the command line, by running julia -t n where n is the number of threads, or by the JULIA_NUM_THREADS environment variable.
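For example, either of the following starts Julia with 4 threads (shown here as shell commands):

julia -t 4
JULIA_NUM_THREADS=4 julia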
for loops

The @threads macro spreads a for loop over the available threads:
a = zeros(10)
Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
println(a)
[1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0]
Warning: this placement is not deterministic and can vary between runs!
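If a fixed assignment is needed, @threads also accepts an explicit :static schedule (a quick sketch; requires a reasonably recent Julia version), which maps iterations to threads in fixed contiguous blocks:

a = zeros(10)
Threads.@threads :static for i = 1:10
    a[i] = Threads.threadid()
end
println(a)  # each thread now fills a fixed contiguous block of iterations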
Let's test the performance of multi-threaded code:
function sqrt_array(A)
B = similar(A)
for i in eachindex(A)
@inbounds B[i] = sqrt(A[i])
end
B
end
sqrt_array (generic function with 1 method)
function threaded_sqrt_array(A)
B = similar(A)
Threads.@threads for i in eachindex(A)
@inbounds B[i] = sqrt(A[i])
end
B
end
threaded_sqrt_array (generic function with 1 method)
A = rand(1000, 1000)
using BenchmarkTools

@btime sqrt_array(A);
1.506 ms (2 allocations: 7.63 MiB)
@btime threaded_sqrt_array(A);
515.931 μs (27 allocations: 7.63 MiB)
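With 4 threads we see roughly a 2.9x speedup (1.506 ms vs. 515.931 μs) rather than the ideal 4x, since spawning tasks adds some overhead and the loop is largely memory-bound.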
Multi-threaded code can encounter race conditions. Consider this function, which computes the sum of the square roots of all elements (a map-reduce):
function sqrt_sum(A)
s = zero(eltype(A))
for i in eachindex(A)
@inbounds s += sqrt(A[i])
end
return s
end
sqrt_sum (generic function with 1 method)
The naive threaded version would be:
function threaded_sqrt_sum(A)
s = zero(eltype(A))
Threads.@threads for i in eachindex(A)
@inbounds s += sqrt(A[i])
end
return s
end
threaded_sqrt_sum (generic function with 1 method)
Since multiple threads read and update s at the same time, some updates are lost and we get a race condition. We can see this by comparing the results:
sqrt_sum(A)
666276.2521367806
threaded_sqrt_sum(A)
166545.27314915543
A work-around is to use atomic operations:
function threaded_sqrt_sum_atomic(A)
    # Wrap the accumulator in an Atomic so that concurrent updates are safe
    s = Threads.Atomic{eltype(A)}(zero(eltype(A)))
    Threads.@threads for i in eachindex(A)
        @inbounds Threads.atomic_add!(s, sqrt(A[i]))
    end
    return s[]  # unwrap the Atomic to get the plain value
end
threaded_sqrt_sum_atomic (generic function with 1 method)
But this effectively serializes the code, since only one atomic addition can execute at a time. Instead, we can change the algorithm a bit: let's break up the sum into independent pieces of work, one per thread:
function threaded_sqrt_sum_independent(A)
# Independent work part
partial = zeros(eltype(A), Threads.nthreads())
Threads.@threads for i in eachindex(A)
@inbounds partial[Threads.threadid()] += sqrt(A[i])
end
# Single-threaded part
s = zero(eltype(A))
for i in eachindex(partial)
s += partial[i]
end
return s
end
threaded_sqrt_sum_independent (generic function with 1 method)
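One caveat about the pattern above: indexing the partial sums by Threads.threadid() is discouraged in recent Julia versions, because under the default dynamic scheduler a task may migrate between threads. A safer sketch (threaded_sqrt_sum_chunked is our own name, not part of the original) splits the indices into one contiguous chunk per task and reduces each chunk locally:

function threaded_sqrt_sum_chunked(A)
    n = Threads.nthreads()
    # One contiguous chunk of linear indices per task
    chunks = Iterators.partition(eachindex(A), cld(length(A), n))
    # Each task reduces its own chunk into a task-local accumulator
    tasks = map(chunks) do chunk
        Threads.@spawn begin
            s = zero(eltype(A))
            for i in chunk
                @inbounds s += sqrt(A[i])
            end
            s
        end
    end
    # Combine the per-task partial sums serially
    return sum(fetch, tasks)
end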
And let's see how they compare:
@btime sqrt_sum(A)
1.465 ms (1 allocation: 16 bytes)
666276.2521367806
@btime threaded_sqrt_sum_atomic(A)
42.910 ms (27 allocations: 2.05 KiB)
666276.2521367898
@btime threaded_sqrt_sum_independent(A)
677.865 μs (27 allocations: 2.12 KiB)
666276.2521367706
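So the atomic version is far slower than even the serial baseline (42.910 ms vs. 1.465 ms), while the version with independent partial sums reaches roughly a 2.2x speedup (677.865 μs vs. 1.465 ms).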