Here we will look at how (distributed) parallel computing can be achieved from within Julia. The nice thing about Julia is that parallel computing is baked into the standard library. This is only a very brief overview (and I will elaborate more on this in the future), mainly targeted at running MD simulations.
For more details on fine-grained control like @spawn, take a look at: https://docs.julialang.org/en/stable/manual/parallel-computing/
You can control the number of worker processes from the command line by running julia -p n, where $n$ is the number of workers. Alternatively, we can use the addprocs function from within a running session.
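Note that on Julia 0.7 and later these functions live in the Distributed standard library, which has to be loaded first:
using Distributed
With that in place, we can check the current set of workers and add a few more: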
workers()
1-element Vector{Int64}: 1
addprocs(8)
8-element Vector{Int64}: 2 3 4 5 6 7 8 9
which spawned 8 worker processes:
workers()
8-element Vector{Int64}: 2 3 4 5 6 7 8 9
for loops and @distributed reduction¶The easiest way to perform an operation in parallel is to use the @distributed macro (called @parallel in older versions of Julia). More details can be found here: https://docs.julialang.org/en/stable/manual/parallel-computing/#Parallel-Map-and-Loops-1
In the example below, we throw a "coin" 200000000 times and count how many heads there are by casting the output of the rand command: Int(rand(Bool)) gives $0$ with probability $0.5$, and $1$ the rest of the time. We want to do this in parallel using the @distributed macro together with a for loop. Note that a bare @distributed for i = 1:200000000 would run through the loop in parallel, but would not send the result back to the control process. That's where the (+) reducer comes in: it applies the +(.,.) function to the result of each loop iteration and "reduces" the results of all these parallel runs into a single variable (nheads).
function n_heads()
    n = 0
    for i = 1:200000000
        n += Int(rand(Bool))
    end
    n
end
function n_heads_parallel()
    nheads = @distributed (+) for i = 1:200000000
        Int(rand(Bool))
    end
    nheads
end
n_heads_parallel (generic function with 1 method)
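The timings below use the @benchmark macro from the BenchmarkTools.jl package, which needs to be installed and loaded first:
using BenchmarkTools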
@benchmark(n_heads())
BenchmarkTools.Trial: 25 samples with 1 evaluation.
 Range (min … max):  200.602 ms … 215.285 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     203.534 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   204.416 ms ±   3.491 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%
 Memory estimate: 0 bytes, allocs estimate: 0.
@benchmark(n_heads_parallel())
BenchmarkTools.Trial: 66 samples with 1 evaluation.
 Range (min … max):  57.387 ms … 114.101 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     73.049 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   76.487 ms ±  11.048 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%
 Memory estimate: 25.03 KiB, allocs estimate: 603.
Let's do something more reminiscent of MD simulations: let's fill an array (a) with a value computed from its index. It might be very tempting to do this:
a = zeros(10)
@distributed for i = 1:10
    a[i] = i^2
end
Task (runnable) @0x0000000115ce4fd0
Which seems to not have done anything?!
a
10-element Vector{Float64}: 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
This might seem odd, since the for loop definitely did do work. But notice what the @distributed for loop returned: a Task. The iterations run asynchronously on the workers: from the perspective of process 1 (the control process) they might or might not have happened yet, and they definitely take place in the "future" relative to the moment you hit <shift+enter>. On top of that, each worker received its own copy of the array. So the work in the loop was done (on the workers' copies), and you just haven't "collected" the data back into the control process's copy of a.
Let's test our hypothesis by spawning a remote call (a Future) on a different worker to retrieve the global a stored there, returning a remote reference Bref. We can then fetch the output back to the control process using the fetch(Bref) command.
https://docs.julialang.org/en/stable/manual/parallel-computing/#Data-Movement-1
Bref = @spawnat 2 a
fetch(Bref)
10-element Vector{Float64}: 1.0 4.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
Bref = @spawnat 3 a
fetch(Bref)
10-element Vector{Float64}: 0.0 0.0 9.0 16.0 0.0 0.0 0.0 0.0 0.0 0.0
Aha! So we're doing work... it's just that we need to synchronize the data in the array a...
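In fact, every worker holds its own, partially filled copy. A small sketch (using only the calls shown above, and assuming the loop's Task has already finished) that gathers all of the copies and adds them up element-wise:
copies = [fetch(@spawnat w a) for w in workers()]
sum(copies)  # each index was written on exactly one worker, so this recovers the i^2 values
Doing this kind of bookkeeping by hand gets tedious quickly, which is where SharedArrays come in.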
SharedArray for Data Movement¶Not to be confused with Distributed Arrays (DArray), SharedArrays make all of their data available across all worker processes (on the same machine).
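Like Distributed, SharedArrays is a standard-library module that has to be loaded before use; loading it on every process (a small precaution, assuming the workers from above are still running) makes sure the workers can handle the array inside the loop:
@everywhere using SharedArrays
Let's have a look: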
a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i^2
    a[i] += 1  # implicit comms!
end
Task (runnable) @0x00000001151387f0
a
10-element SharedVector{Float64}: 2.0 5.0 10.0 17.0 26.0 37.0 50.0 65.0 82.0 101.0
... magic! There is a price to pay for this kind of convenience, but we'll see that later. So if you want more fine-grained control over the communications and memory footprint, Distributed Arrays might be a better (if more tedious) choice.
https://github.com/JuliaParallel/DistributedArrays.jl#distributed-arrays
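As a rough sketch of what that looks like (this assumes the third-party DistributedArrays.jl package is installed; distribute, localpart, and friends are part of its documented API, but check the README for current details):
@everywhere using DistributedArrays  # third-party package, must be installed first

d = distribute(collect(1.0:10.0))  # split a local array into chunks that live on the workers
# On each worker, localpart(d) gives just the chunk stored there;
# collect(d) gathers the full array back onto the control process.
collect(d)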
@everywhere Macro¶Before we continue with SharedArrays, let's take a brief tangent to the oh-so-useful @everywhere macro. As the name suggests, it runs a command... well... everywhere:
@everywhere id = myid()
@everywhere println(id)
1
      From worker 2:    2
      From worker 5:    5
      From worker 8:    8
      From worker 9:    9
      From worker 4:    4
      From worker 3:    3
      From worker 7:    7
      From worker 6:    6
... which is very useful for making functions and modules available to all of the workers.
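For example, an MD-style helper can be defined on every process so that it can be used in @distributed loops and remote calls alike (lj_pair below is just a hypothetical placeholder for whatever your simulation actually needs):
# Hypothetical Lennard-Jones pair energy (ε = σ = 1) as a function of the squared distance r2
@everywhere function lj_pair(r2)
    inv6 = (1.0 / r2)^3
    4.0 * (inv6^2 - inv6)
end

# Source files and packages can be loaded the same way, e.g.
# @everywhere include("md_helpers.jl")  # hypothetical file name
# @everywhere using LinearAlgebra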