Usage

PartitionedArrays provides a data-oriented programming model that allows one to write distributed algorithms in a generic way, independently of the message-passing back-end used to run them. The basic abstraction of this model is to express distributed data using array containers. The particular container type depends on the back-end used to run the code in parallel. MPI is one of the possible back-ends, used to run large cases on computational clusters. However, one can also use serial arrays to prototype and debug complex codes effectively.

Basic usage

We want each rank in a distributed system to print its rank id and the total number of ranks. The distributed data are the rank ids. If we have an array with all the rank ids, printing the messages is trivial with the map function.

np = 4
ranks = LinearIndices((np,))
map(ranks) do rank
   println("I am proc $rank of $np.")
end

The previous code is not parallel (yet). However, it can be easily parallelized by using a suitable distributed array type that overloads map with a parallel implementation.

# hello_mpi.jl
using PartitionedArrays
np = 4
ranks = distribute_with_mpi(LinearIndices((np,)))
map(ranks) do rank
   println("I am proc $rank of $np.")
end

Now this code is parallel. The function distribute_with_mpi takes an array and distributes it over the different ranks of a given MPI communicator (a duplicate of MPI.COMM_WORLD by default). The result is an array of type MPIArray, which overloads the function map with a parallel implementation. The function distribute_with_mpi assigns exactly one item of the input array to each rank in the communicator. Thus, the resulting array ranks is distributed in such a way that each MPI rank gets the integer corresponding to its (1-based) rank id. If we place this code in a file called "hello_mpi.jl", we can run it like any Julia application that uses the MPI API provided by MPI.jl. For instance,

using MPI
mpiexec(cmd->run(`$cmd -np 4 julia --project=. hello_mpi.jl`))

The construction of the array ranks containing the rank ids is just the first step of a computation using PartitionedArrays. See the Examples for more interesting cases.
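
For illustration, the following sketch goes one step further and builds a distributed vector of ones over a uniform partition of 10 entries, then sums its entries. The helpers uniform_partition and pones are assumed to be available as in recent versions of the package; see the Examples for the authoritative usage. Like hello_mpi.jl, this script is meant to be launched through mpiexec.

# pvector_mpi.jl
# A minimal sketch; uniform_partition and pones are assumed to be available.
using PartitionedArrays
np = 4
n = 10
ranks = distribute_with_mpi(LinearIndices((np,)))
# Split the index range 1:n uniformly among the np ranks
row_partition = uniform_partition(ranks, n)
# Build a distributed vector of ones over this partition
a = pones(row_partition)
# Reductions such as sum run over all ranks
@show sum(a) # expected to be 10.0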

Debugging

One of the main advantages of PartitionedArrays is that it allows one to write and debug parallel code without using MPI. This makes it possible to follow the standard Julia development workflow (e.g., Revise) when implementing distributed applications, which is certainly useful. This ability comes from the fact that one can use standard serial Julia arrays to test an application based on PartitionedArrays. However, the array type MPIArray, obtained after distributing data over MPI processes, is not as flexible as the standard arrays in Julia. Some operations are not allowed for MPIArray, mainly for performance reasons. One of them is indexing the array at arbitrary indices. As a consequence, code that runs with common Julia arrays might fail when switching to MPI. In order to anticipate this type of error, PartitionedArrays provides a special array type called DebugArray for debugging purposes. The type DebugArray mimics the limitations of MPIArray, but it is just a wrapper around a standard Julia array and can therefore be used in a standard Julia session.

using PartitionedArrays
np = 4
ranks = DebugArray(LinearIndices((np,)))
ranks[3] # Error!

The last line of the previous code throws an error stating that scalar indexing is not allowed. This mimics the error you would get in production when using MPI:

using PartitionedArrays
with_mpi() do distribute
    np = 4
    ranks = distribute(LinearIndices((np,)))
    ranks[3] # Error!
end
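
Instead of indexing at arbitrary positions, operate on the items through functions such as map, or collect them on the main process with gather and inspect them there with map_main (both are used later on this page). A minimal sketch with the debug back-end:

using PartitionedArrays
np = 4
ranks = DebugArray(LinearIndices((np,)))
# Allowed: operate on the items through map instead of indexing them
doubled = map(rank -> 2*rank, ranks)
# Allowed: collect all items on the main process and inspect them there
map_main(gather(doubled)) do values
    @show values
end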

We also provide the function with_debug, which makes it easy to switch from one back-end to the other. For instance, if we define the following main function

using PartitionedArrays
function main(distribute)
    np = 4
    ranks = distribute(LinearIndices((np,)))
    map(ranks) do rank
       println("I am proc $rank of $np")
    end
end

then with_debug(main) and with_mpi(main) will run the code using the debug back-end and MPI, respectively. If you want to run it using native Julia arrays, you can simply call main(identity). Make sure that your code works with DebugArray before moving to MPI.
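
For instance, with the main function above, the three options look as follows (the MPI variant is meant to be launched through mpiexec, as shown earlier):

main(identity)   # native Julia arrays
with_debug(main) # debug back-end, mimics the MPI restrictions
with_mpi(main)   # MPI back-end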

Running MPI code safely

MPI applications should call MPI.Abort if they stop prematurely (e.g., due to an error). The Julia error handling system is not aware of this. For this reason, code like the following will crash and stop without calling MPI.Abort.

using PartitionedArrays
np = 3
ranks = distribute_with_mpi(LinearIndices((np,)))
map(ranks) do rank
    if rank == 2
        error("I have crashed")
    end
end

Even worse, the code will crash only in the 2nd MPI process. The other processes are not aware of the error and may keep running. This can lead to zombie MPI processes in the background (probably consuming quota in your cluster account until the queuing system kills them). To fix this, PartitionedArrays provides the function with_mpi. We rewrite the previous example using it.

using PartitionedArrays
with_mpi() do distribute
    np = 3
    ranks = distribute(LinearIndices((np,)))
    map(ranks) do rank
        if rank == 2
            error("I have crashed")
        end
    end
end

Essentially, with_mpi(f) calls f(distribute_with_mpi) in a try-catch block. If an error is caught, MPI.Abort is called, safely finishing all the MPI processes, including the ones that did not experience the error.
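
Conceptually, with_mpi behaves like the following sketch. This is not the actual implementation (communicator handling and error reporting are simplified), but it illustrates the try-catch plus MPI.Abort mechanism described above.

using MPI
using PartitionedArrays

# Simplified sketch of what with_mpi(f) does; the real implementation may differ.
function with_mpi_sketch(f)
    MPI.Initialized() || MPI.Init()
    try
        f(distribute_with_mpi)
    catch err
        # Report the error and abort all ranks so that no zombie processes are left behind
        showerror(stderr, err, catch_backtrace())
        MPI.Abort(MPI.COMM_WORLD, 1)
    end
end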

Benchmarking distributed codes

When using MPI, the computational time to run some code can differ between processes. Usually, one measures the time on each process and computes some statistics of the resulting values. This is done by taking time measurements with the tool of your choice and then gathering the results on the root process for further analysis. Note that this is possible thanks to the changes introduced in version 0.4.1, which allow one to use gather on arbitrary objects.

In the following example, we force different computation times at each of the processes by sleeping for a time proportional to the rank id. We gather all the timings on the main process and compute some statistics:

using PartitionedArrays
using Statistics
with_mpi() do distribute
    np = 3
    ranks = distribute(LinearIndices((np,)))
    t = @elapsed map(ranks) do rank
        sleep(rank)
    end
    ts = gather(map(rank->t,ranks))
    map_main(ts) do ts
        @show ts
        @show maximum(ts)
        @show minimum(ts)
        @show Statistics.mean(ts)
    end
end
ts = [1.001268313, 2.0023204, 3.001216396]
maximum(ts) = 3.001216396
minimum(ts) = 1.001268313
Statistics.mean(ts) = 2.001601703

This mechanism also works for the other back-ends. For sequential ones, it provides the time spent by all parts combined. Note how we define t (outside the call to map) and the object passed to gather.

using PartitionedArrays
using Statistics
with_debug() do distribute
    np = 3
    ranks = distribute(LinearIndices((np,)))
    t = @elapsed map(ranks) do rank
        sleep(rank)
    end
    ts = gather(map(rank->t,ranks))
    map_main(ts) do ts
        @show ts
        @show maximum(ts)
        @show minimum(ts)
        @show Statistics.mean(ts)
    end
end;
ts = [6.009726399, 6.009726399, 6.009726399]
maximum(ts) = 6.009726399
minimum(ts) = 6.009726399
Statistics.mean(ts) = 6.009726398999999

We can also consider more sophisticated ways of measuring the times, e.g., with TimerOutputs.

using PartitionedArrays
using Statistics
using TimerOutputs
with_mpi() do distribute
    np = 3
    ranks = distribute(LinearIndices((np,)))
    to = TimerOutput()
    @timeit to "phase 1" map(ranks) do rank
        sleep(rank)
    end
    @timeit to "phase 2" map(ranks) do rank
        sleep(2*rank)
    end
    tos = gather(map(rank->to,ranks))
    map_main(tos) do tos
        # check the timings on the first rank
        display(tos[1])
        # compute statistics for phase 1
        ts = map(tos) do to
            TimerOutputs.time(to["phase 1"])
        end
        @show ts
        @show maximum(ts)
        @show minimum(ts)
        @show Statistics.mean(ts)
    end
end
 ────────────────────────────────────────────────────────────────────
                            Time                    Allocations      
                   ───────────────────────   ────────────────────────
 Tot / % measured:      10.3s /  29.3%           44.9MiB /   0.0%    

 Section   ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────
 phase 2        1    2.00s   66.6%   2.00s      120B   50.0%     120B
 phase 1        1    1.00s   33.4%   1.00s      120B   50.0%     120B
 ────────────────────────────────────────────────────────────────────
ts = [1002323746, 2001614329, 3004363808]
maximum(ts) = 3004363808
minimum(ts) = 1002323746
Statistics.mean(ts) = 2.0027672943333333e9

In addition, the library provides a special timer type called PTimer.

Note

PTimer has been deprecated. Do time measurements with the tool of your choice and then gather the results on the root for further analysis (see above).

In the following example, we force different computation times at each of the processes by sleeping for a time proportional to the rank id. When displayed, the PTimer instance shows some statistics of the times over the different processes.

using PartitionedArrays
with_mpi() do distribute
    np = 3
    ranks = distribute(LinearIndices((np,)))
    t = PTimer(ranks)
    tic!(t)
    map(ranks) do rank
        sleep(rank)
    end
    toc!(t,"Sleep")
    display(t)
end
───────────────────────────────────────────
Section         max         min         avg
───────────────────────────────────────────
Sleep     3.021e+00   1.021e+00   2.021e+00
───────────────────────────────────────────

This mechanism also works for the other back-ends. For sequential ones, it provides the time spent by all parts combined.

using PartitionedArrays
with_debug() do distribute
    np = 3
    ranks = distribute(LinearIndices((np,)))
    t = PTimer(ranks)
    tic!(t)
    map(ranks) do rank
        sleep(rank)
    end
    toc!(t,"Sleep")
    display(t)
end
───────────────────────────────────────────
Section         max         min         avg
───────────────────────────────────────────
Sleep     6.010e+00   6.010e+00   6.010e+00
───────────────────────────────────────────