A (very) Brief Look at Distributed Computing¶


Here we will look at how (distributed) parallel computing can be achieved from within Julia. The nice thing about Julia is that parallel computing is baked into the standard library. This is only a very brief overview (and I will elaborate more on this in the future), mainly targeted at running MD simulations.

For more details on fine-grained control like @spawn, take a look at: https://docs.julialang.org/en/stable/manual/parallel-computing/

Controlling the number of processes (a.k.a. workers)¶

You can control the number of worker processes from the command line by running julia -p n where $n$ is the number of workers. Alternatively, we can use the addprocs command:
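(One caveat before the cells below: in Julia 1.x these functions live in the Distributed standard library, so if you're following along you'll first need a cell with

using Distributed  # provides workers(), addprocs(), @distributed, @spawnat, @everywhere

which I've omitted here.)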

In [2]:
workers()
Out[2]:
1-element Vector{Int64}:
 1
In [3]:
addprocs(8)
Out[3]:
8-element Vector{Int64}:
 2
 3
 4
 5
 6
 7
 8
 9

which spawned 8 worker processes:

In [4]:
workers()
Out[4]:
8-element Vector{Int64}:
 2
 3
 4
 5
 6
 7
 8
 9
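A quick aside: Distributed has a few related bookkeeping functions worth knowing (the counts in the comments match the session above):

nprocs()    # total number of processes, control process included (9 here)
nworkers()  # number of worker processes (8 here)
rmprocs(9)  # remove worker 9 again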

Parallel for loops and @distributed reduction¶

The easiest way to perform an operation in parallel is to use the @distributed macro. More details can be found here: https://docs.julialang.org/en/stable/manual/parallel-computing/#Parallel-Map-and-Loops-1

In the example below, we flip a "coin" 200000000 times and count how many heads come up by converting the output of the rand command to an integer. So

Int(rand(Bool))

would give $0$ with probability $0.5$, and $1$ the rest of the time. We want to do this in parallel using the @distributed macro together with a for loop. Note that a bare @distributed for i = 1:200000000 would run through the loop in parallel, but would not send the results back to the controller. That's where the (+) argument comes in: it applies the +(.,.) function to the result of each loop iteration and "reduces" the results of all these parallel runs into a single variable (nheads).

In [5]:
function n_heads()
    n = 0
    for i = 1:200000000
        n += Int(rand(Bool))
    end
    n
end

function n_heads_parallel()
    nheads = @distributed (+) for i = 1:200000000
        Int(rand(Bool))
    end
    nheads
end
Out[5]:
n_heads_parallel (generic function with 1 method)
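By the way, the timings below use the @benchmark macro from the BenchmarkTools.jl package, which is not part of the standard library and has to be installed and loaded separately:

using BenchmarkTools  # provides the @benchmark macro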
In [6]:
@benchmark(n_heads())
Out[6]:
BenchmarkTools.Trial: 25 samples with 1 evaluation.
 Range (min … max):  200.602 ms … 215.285 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     203.534 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   204.416 ms ±   3.491 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%

      █                                                          
  ▅▅▁▅█▁▅█▁▁▁▅▅▅▁▁▅█▅▁▅▁▁▅▁▅▁▅▁▁▁▁▁▁▁▁▁▁▁▅▅▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▅ ▁
  201 ms           Histogram: frequency by time          215 ms <

 Memory estimate: 0 bytes, allocs estimate: 0.
In [7]:
@benchmark(n_heads_parallel())
Out[7]:
BenchmarkTools.Trial: 66 samples with 1 evaluation.
 Range (min … max):  57.387 ms … 114.101 ms  ┊ GC (min … max): 0.00% … 0.00%
 Time  (median):     73.049 ms               ┊ GC (median):    0.00%
 Time  (mean ± σ):   76.487 ms ±  11.048 ms  ┊ GC (mean ± σ):  0.00% ± 0.00%

               ▂ █▂ ▂    ▂                                      
  ▄▁▁▁▁▄▆▆▄▁▄▁▄█▆██▆█▆▄▆▄█▄▆█▄▁▄▁▁▄▁▆█▁▁▆▁▁▄▄▁▄▁▄▁▁▄▁▁▁▁▁▁▁▄▁▄ ▁
  57.4 ms         Histogram: frequency by time          106 ms <

 Memory estimate: 25.03 KiB, allocs estimate: 603.
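As an aside, the docs section linked above also covers pmap, which ships whole function calls to the workers. The usual rule of thumb: @distributed is for loops with many cheap iterations, pmap for fewer, more expensive calls. A minimal sketch (not benchmarked here):

# Each task is heavy enough to amortize the communication overhead;
# rand and sum live in Base, so the workers already know them.
totals = pmap(i -> sum(rand(10^6)), 1:20)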

Let's do something more reminiscent of MD simulations: let's fill an array a with some computed values (here, the square of each index). It might be very tempting to do this:

In [8]:
a = zeros(10)
@distributed for i = 1:10
    a[i] = i^2
end
Out[8]:
Task (runnable) @0x0000000115ce4fd0

Which seems not to have done anything?!

In [9]:
a
Out[9]:
10-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

This might seem odd, since the for loop definitely did run. But notice what the @distributed for loop returned: a Task. From the perspective of process 1 (the control process) the work happens asynchronously: it might or might not have completed yet, and it definitely takes place in the "future" from the moment you hit <shift+enter>. More importantly, each worker received its own copy of a to write into, so while the work in the loop does get done, the results are never "collected" back into the control process's copy of the array a.
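If the asynchrony is the part that bothers you, you can block until the loop has finished by wrapping it in @sync. Note, though, that this only cures the timing problem, not the missing data (a quick sketch):

# Block until every worker has finished its chunk of the loop.
@sync @distributed for i = 1:10
    a[i] = i^2
end
# `a` on the control process is still all zeros: each worker wrote into
# its own serialized copy of `a`, not into ours.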

Let's test our hypothesis by spawning a remote call (a Future) on a different worker to retrieve the global a stored there, returning a remote reference Bref. We can then fetch the output back to the control process using the fetch(Bref) command.

https://docs.julialang.org/en/stable/manual/parallel-computing/#Data-Movement-1

In [10]:
Bref = @spawnat 2 a
fetch(Bref)
Out[10]:
10-element Vector{Float64}:
 1.0
 4.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
In [11]:
Bref = @spawnat 3 a
fetch(Bref)
Out[11]:
10-element Vector{Float64}:
  0.0
  0.0
  9.0
 16.0
  0.0
  0.0
  0.0
  0.0
  0.0
  0.0

Aha! So we're doing work... it's just that we need to synchronize the data in the array a...

Use SharedArray for Data Movement¶

Not to be confused with Distributed Arrays (DArray), SharedArrays make all their data available across all (local) processes.
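Like Distributed, SharedArrays is a standard-library module that has to be loaded explicitly; if you're following along, you'll need a cell with:

using SharedArrays  # provides SharedArray / SharedVector

With that in place, let's have a look: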

In [12]:
a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i^2
    a[i] += 1 # implicit comms!
end
Out[12]:
Task (runnable) @0x00000001151387f0
In [13]:
a
Out[13]:
10-element SharedVector{Float64}:
   2.0
   5.0
  10.0
  17.0
  26.0
  37.0
  50.0
  65.0
  82.0
 101.0

... magic! There is a price to pay for this kind of convenience, but we'll see that later. So if you want more fine-grained control over the communication and memory footprint, Distributed Arrays might be a better (if more tedious) choice.

https://github.com/JuliaParallel/DistributedArrays.jl#distributed-arrays
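To give a flavor, here is a minimal sketch of what a DArray looks like, assuming the DistributedArrays.jl package is installed (it is not a standard library) and using the @everywhere macro that we'll meet in the next section:

using DistributedArrays
@everywhere using DistributedArrays  # the workers need the package too

d = dzeros(10)    # a distributed array of zeros, partitioned across the workers
localpart(d)      # each process owns (and sees) only its local chunk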

The @everywhere Macro¶

Before we continue with SharedArray, let's take a brief tangent to the oh-so-useful @everywhere macro. As the name suggests, it runs a command... well... everywhere:

In [14]:
@everywhere id = myid()
In [15]:
@everywhere println(id)
1
      From worker 2:	2
      From worker 5:	5
      From worker 8:	8
      From worker 9:	9
      From worker 4:	4
      From worker 3:	3
      From worker 7:	7
      From worker 6:	6

... which is very useful for making functions and modules available to all the workers.
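For example, a pattern that comes up constantly in MD-style code (a sketch; pair_energy is a made-up helper):

@everywhere using LinearAlgebra    # load a module on every process

@everywhere function pair_energy(r)
    # hypothetical pairwise term (Lennard-Jones-like), now defined on all workers
    4 * (r^-12 - r^-6)
end

pmap(pair_energy, 1.0:0.1:2.0)     # any worker can now evaluate it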