Introduction
Background
Aim
Alternative packages
DistArray
Scalapy (ScaLAPACK)
The scalapy.DistributedMatrix class essentially wraps the routines of ScaLAPACK and is therefore limited to that library's functionality: two-dimensional arrays and very specific block-cyclic distribution strategies that optimize numerical efficiency in the context of linear algebra problems. In contrast, we are interested in n-dimensional arrays whose distribution scheme shall be arbitrary in the first place. Therefore scalapy is not general enough for us.

Petsc4py (PETSc)
The petsc4py.PETSc.DMDA class is limited to one, two and three dimensions, as PETSc uses highly problem-fitted distribution schemes. We, in contrast, need n-dimensional arrays with arbitrary distribution schemes. Hence, petsc4py is not suitable for us.

Code architecture
Choosing the right level of parallelization
Vertical and horizontal scaling
High- and low-level parallelization
Downsides
Problem sizes
d2o as layer of abstraction
Choosing a parallelization environment
- Threading and multiprocessing: These two options limit the application to a single machine, which conflicts with the aim of massive scalability.
- (py)Spark [16] and Hadoop [17]: These modern frameworks are very powerful but regrettably too abstract for our purposes, as they hide the location of individual portions of the full data. Building a numpy-like interface on top of them would be disproportionately hard or even unfeasible. In addition, implementing a low-level interface for highly optimized applications that interact with a node's local data is not convenient within pySpark. Lastly, those frameworks are usually not installed as standard dependencies on scientific HPC clusters.
- MPI [5, 6]: The Message Passing Interface is available on virtually every HPC cluster via well-tested implementations like OpenMPI [18], MPICH2 [19] or Intel MPI [20]. The open implementations are also available for commodity multicore hardware like desktops or laptops. A Python interface to MPI is provided by the mpi4py module [21]. Furthermore, MPI offers the package developers the right level of abstraction for hands-on control of distribution strategies; a minimal sketch of its programming model follows below.
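To illustrate the level of abstraction MPI itself provides, the following minimal mpi4py sketch sums process-local results into a global value. It is independent of d2o, and the array contents are purely illustrative.

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD        # communicator spanning all MPI processes
rank = comm.Get_rank()       # index of this process
size = comm.Get_size()       # total number of processes

# Each process holds its own portion of a conceptual global array.
local_chunk = np.arange(4, dtype=np.float64) + 4 * rank

# Explicit collective communication: every process obtains the global sum.
global_sum = comm.allreduce(local_chunk.sum(), op=MPI.SUM)
print("rank %d: local sum = %.1f, global sum = %.1f"
      % (rank, local_chunk.sum(), global_sum))
```

Run, for example, with mpirun -n 4 python example.py. Keeping this kind of bookkeeping out of user code is what d2o's global view interface is meant to provide.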
Internal structure
Composed object
A d2o.distributed_data_object is a composed object; cf. Fig. 1. Its two main components are:

- data: here the MPI processes' local portion of the global array data is stored, even though the distributed_data_object itself never makes any assumptions about its specific content, since the distribution strategy is arbitrary in the first place. The distributed_data_object is the only object of the d2o library that a casual user would interact with.
- a d2o.distributor subclass: this object stores all the distribution-scheme and cluster-related information it needs in order to scatter (gather) data to (from) the nodes and to serve special methods, e.g. the array-cumulative sum. The distributed_data_object builds its rich user interface on top of those abstracted methods of its distributor.

Advantages of a global view interface
Basic usage
- instances of the numpy.ndarray class are labeled a and b,
- instances of d2o.distributed_data_object are labeled obj and p.
Initialization
The initialization of a distributed_data_object closely follows that of a numpy.ndarray. First we import the packages.
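The original import-and-initialization listing is not part of this excerpt; the following sketch shows the intended pattern, assuming that the distributed_data_object constructor accepts a numpy array as the global data (variable names follow the conventions above).

```python
import numpy as np
from d2o import distributed_data_object

a = np.arange(16).reshape((4, 4))   # an ordinary numpy array
obj = distributed_data_object(a)    # distribute it among the MPI processes

print(obj)                          # each process prints only its local piece
```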
The print call on obj shows the local portion of the global data that is available in this process.

Arithmetics
When evaluating a binary operation, Python first tries to call the forward methods (__add__, __sub__, etc.) of the first object. If this fails, i.e. if it throws an exception, Python tries the reverse methods of the second object (__radd__, __rsub__, etc.). Depending on the conjunction's ordering, the return type may therefore vary when combining numpy arrays with distributed_data_objects. If the numpy array comes first, numpy will try to extract the second object's array data using its __array__ method. This invokes the distributed_data_object's get_full_data method, which communicates the full data to every process. For large arrays this is extremely inefficient and should be avoided by all means. Hence, it is crucial for performance to ensure that the distributed_data_object's methods are the ones Python calls. In this case, the locally relevant parts of the array are extracted from the numpy array and then efficiently processed as a whole.
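In practice this means that the distributed_data_object should appear as the left operand in mixed expressions. A sketch, reusing a and obj from above (q is an illustrative name):

```python
# Efficient: Python dispatches to the distributed_data_object's __add__,
# so only the locally relevant part of the numpy array is touched.
p = obj + a

# Inefficient: numpy's method wins, calls obj.__array__() and thereby
# get_full_data(), which communicates the complete array to every process.
q = a + obj
```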
Array indexing
Distribution strategies
The distribution scheme of a distributed_data_object is specified via the “distribution_strategy” keyword:
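The corresponding listing is not reproduced here; a minimal sketch of such a call is shown below. The strategy name 'equal' is the one referred to later in the text; passing it as a keyword argument is an assumption about the call syntax.

```python
# Slice the global array along its first axis into equally sized pieces.
obj = distributed_data_object(a, distribution_strategy='equal')
```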
Distributed arrays
The script is now run within four MPI processes: mpirun -n4 [...]. The data is split along the first axis; the print statement in line 14 yields the four pieces. The second print statement (line 16) illustrates the behavior of data extraction: obj[0:3:2, 1:3] is slicing notation for the entries 1, 2, 9 and 10. This expression returns a distributed_data_object in which every process possesses its individual portion of the requested data; the distribution strategy of the new (sub-)array is determined by and aligned to that of the original array. The result is a distributed_data_object in which processes 1 and 3 do not possess any data, as they had no data to contribute to the slice obj[0:3:2, 1:3]. In line 19 we store a small 2 × 2 block b in the lower middle of obj, which updates the local data of the affected processes. Finally, in line 25 we use obj.get_full_data() in order to consolidate the distributed data, i.e. to communicate the individual pieces between the processes and merge them into a single numpy array.
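The listing referenced by lines 14, 16, 19 and 25 is not part of this excerpt. The sketch below reproduces the described steps; the global data (np.arange(16) reshaped to 4 × 4) is inferred from the quoted slice entries 1, 2, 9 and 10, while the block assignment target and the values of b are illustrative assumptions.

```python
import numpy as np
from d2o import distributed_data_object

a = np.arange(16).reshape((4, 4))
obj = distributed_data_object(a)       # with 4 processes: one row per process

print(obj)                             # the local piece of this process

sub = obj[0:3:2, 1:3]                  # a distributed_data_object holding 1, 2, 9, 10
print(sub)                             # processes 1 and 3 hold no data here

b = np.array([[111, 222],
              [333, 444]])             # a small 2 x 2 block
obj[2:4, 1:3] = b                      # store it in the lower middle of obj

full = obj.get_full_data()             # gather everything into one numpy array
print(full)
```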
Performance and scalability
Scaling the array size
The initialization of a distributed_data_object with the trivial shape (1,) takes about 60 μs when run within one single MPI process. Using this trivial shape makes the costs for memory allocation negligible compared to the other tasks. Hence, those 60 μs represent d2o's constant overhead compared to numpy, since a comparable numpy array requires ≈0.4 μs for initialization. It is therefore advisable to use in-place operations like obj += 1 instead of obj = obj + 1 whenever possible. This is generally a favorable thing to do, also for numpy arrays, as it saves the costs of repeated memory allocation. Nonetheless, non-inplace operations can be improved in many cases, too, as the produced and the initial distributed_data_object often have all of their attributes in common except for their data: they are of the same shape and datatype, and use the same distribution strategy and MPI communicator; cf. p = obj + 1. With obj.copy() and obj.copy_empty() there exist two cloning methods that we implemented to be as fast as pure Python allows. Those methods reuse as many already initialized components as possible and are therefore faster than a fresh initialization: for the distributed_data_object from above, obj.copy() and obj.copy_empty() consume 7.9 and 4.3 μs, respectively.
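A sketch of the kind of single-process micro-benchmark behind these timings; the original benchmarking harness is not reproduced here, and timeit is used purely for illustration.

```python
import timeit
import numpy as np
from d2o import distributed_data_object

a = np.ones((1,))
obj = distributed_data_object(a)

n = 10000
# Constant per-call overhead of creating the objects ...
t_np  = timeit.timeit(lambda: np.ones((1,)), number=n) / n
t_d2o = timeit.timeit(lambda: distributed_data_object(a), number=n) / n

# ... versus cloning, which reuses already initialized components.
t_copy       = timeit.timeit(obj.copy, number=n) / n
t_copy_empty = timeit.timeit(obj.copy_empty, number=n) / n

print(t_np, t_d2o, t_copy, t_copy_empty)   # seconds per call
```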
For operations whose cost is dominated by memory allocation, namely initialization and copy_empty, d2o clearly follows behind numpy. However, increasing the array size from \(2^{20}\) to \(2^{22}\) elements implies a considerable performance drop for numpy's memory allocation. This in turn means that for arrays with more than \(2^{22}\) elements d2o's relative overhead becomes less significant: numpy's equivalent of copy_empty is then only a factor of four faster than obj.copy_empty().

Reductions like max and sum return a scalar number, so no expensive return-array must be created. Hence, d2o's overhead is quite modest: even for size-1 arrays, d2o's relative performance lies above 50 %. Once the size is greater than \(2^{18}\) elements, the relative performance is higher than 95 %.

The expression obj[::-2] is slicing syntax for "take every second element from the array in reverse order". It illustrates the costs of the data-distribution and collection logic, which play a significant role even if no inter-process communication is involved. Again, with a large enough array size, d2o's efficiency becomes comparable to that of numpy. In contrast to obj[::-2], the remaining functions in the table return a distributed_data_object as their result and therefore suffer from its initialization costs. However, for array sizes of \(2^{16}\) elements and larger, d2o's relative performance is at least greater than approximately 65 %.

A special case are the two non-inplace additions obj + 0 and obj + obj: as for the other functions, their relative performance starts to increase significantly once an array size of \(2^{16}\) is reached. However, in contrast to obj += obj, which then immediately scales up above \(95\,\%\), the relative performance of the non-inplace additions temporarily decreases with increasing array size. This may be due to the fact that in our test scenario \(2^{18}\) elements take up almost half of the cache of C2PAP's Intel E5-2680 CPUs. d2o's memory overhead is then responsible for the fact that its non-inplace operations, which need twice the initial memory, cannot profit as strongly from the cache anymore, whereas the numpy array still operates fast. Once the array size is above \(2^{22}\) elements, numpy's array object, just like d2o's, is too large to profit from the cache, and both become comparably fast again: the relative performance is then greater than \(98\,\%\).

Weak scaling: proportional number of processes and size of data
Table 4 shows that the relative performance stays close to 100 % for those operations that do not involve inter-process communication: copy, copy_empty, sum(axis = 1), obj + 0, obj + obj, obj += obj and sqrt.

Comparing sum(axis = 0) with sum(axis = 1) illustrates the performance difference between those operations that involve inter-process communication and those that do not: the equal distribution strategy slices the global array along its first axis in order to distribute the data among the individual MPI processes. Hence, sum(axis = 0), which means taking the sum along the first axis, intrinsically involves inter-process communication, whereas sum(axis = 1) does not. Similarly to sum(axis = 0), the remaining functions in Table 4 are also affected by an increasing number of processes, as they involve inter-process communication.

Even though, in the case of sum(axis = 0), the relative performance drops to 28.2 % when using 256 processes, this means that the operation took only 3.5 times longer than the single-process run, while the array size was increased by a factor of 256. This corresponds to a speedup factor of 72.2 (0.282 × 256 ≈ 72.2).

Strong scaling: varying number of processes with a fixed size of data
The overhead-dominated operations, initialization and copy_empty, show that their performance just does not increase with the number of processes. In contrast, operations without communication effort benefit strongly from the increasing total amount of CPU cache combined with smaller local arrays; above all copy, which is about three times faster than what one would expect from linear scaling to 256 processes.
For a quantitative analysis we choose the sum() method as our test case. During the reduction phase, the n MPI processes exchange their local results with each other. This corresponds to adding n times a fixed amount of communication time to the total computing time needed. Hence, we perform a linear fit to the weak-scaling data; cf. Table 4. Furthermore, we assume that the local computation time is roughly proportional to the local array size. This is true for sufficiently large arrays, since then numpy scales linearly and d2o is in a good efficiency regime; cf. Table 3. Again we perform a linear fit, but now on the size-scaling timing data. Combining those two linear fits yields a run-time formula for applying sum() to an array with shape (4096, 4096). With it, the measured scaling behavior of sum() can be explained as the combination of size and weak scaling to within about 20 % accuracy.
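The fitted formula itself is not preserved in this excerpt. Schematically, combining the two linear fits described above yields a model of the form

\[
t_{\mathrm{sum}}(n) \;\approx\; \underbrace{\alpha + \beta \, \frac{N}{n}}_{\text{size-scaling fit (local computation)}} \;+\; \underbrace{\gamma \, n}_{\text{weak-scaling fit (communication)}}, \qquad N = 4096 \cdot 4096,
\]

where \(n\) is the number of MPI processes, \(N/n\) the local array size, and \(\alpha\), \(\beta\), \(\gamma\) the fit parameters (not quoted here). The estimated relative performance in the following table is derived from this combined model.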
\(\#\mathrm{processes:~} n\) | 1 (%) | 4 (%) | 8 (%) | 16 (%) | 32 (%) | 64 (%) | 128 (%) | 256 (%)
---|---|---|---|---|---|---|---|---
\(p(n)_{\mathrm{estimated}}\) | 100 | 94.2 | 79.3 | 48.7 | 19.1 | 5.58 | 1.45 | 0.37
\(p(n)_{\mathrm{measured}}\) | 100 | 91.7 | 74.1 | 60.9 | 24.4 | 7.05 | 1.82 | 0.45
\(\#\mathrm{nodes}\) | 1 | 1 | 1 | 1 | 2 | 4 | 8 | 16 | 32
---|---|---|---|---|---|---|---|---|---
\(\#\mathrm{processes:~} n\) | 1 | 2 | 3 | 4 | 8 | 16 | 32 | 64 | 128
t [s] | 1618 | 622.0 | 404.2 | 364.2 | 181.7 | 94.50 | 46.79 | 18.74 | 8.56
\(s_n = 2t_2/t_n\) | 0.769 | 2.00 | 3.08 | 3.42 | 6.85 | 13.2 | 26.6 | 66.4 | 145
\(q_n = 1/(1+\log (\frac{n}{s_n}))\) | 0.900 | 1.00 | 1.01 | 0.94 | 0.94 | 0.92 | 0.93 | 1.02 | 1.06
Array size | 2\(^{0}\) (%) | 2\(^{2}\) (%) | 2\(^{4}\) (%) | 2\(^{6}\) (%) | 2\(^{8}\) (2 KiB) (%) | 2\(^{10}\) (%) | 2\(^{12}\) (%) | 2\(^{14}\) (128 KiB) (%) | 2\(^{16}\) (%) | 2\(^{18}\) (%) | 2\(^{20}\) (8 MiB) (%) | 2\(^{22}\) (%) | 2\(^{23}\) (%) | 2\(^{24}\) (128 MiB) (%) | 2\(^{25}\) (%)
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
initialization | 0.65 | 0.64 | 0.69 | 0.69 | 0.71 | 0.71 | 0.74 | 0.72 | 0.75 | 0.74 | 0.75 | 5.41 | 5.58 | 5.83 | 6.00
copy_empty | 3.77 | 3.86 | 4.00 | 4.12 | 4.57 | 3.92 | 3.95 | 3.98 | 4.01 | 4.10 | 4.15 | 25.0 | 24.2 | 25.3 | 26.0
max | 56.2 | 56.0 | 54.4 | 55.5 | 56.0 | 56.9 | 62.6 | 79.2 | 91.7 | 97.5 | 99.4 | 99.4 | 99.9 | 99.8 | 99.9
sum | 59.7 | 59.0 | 57.5 | 60.1 | 58.6 | 59.8 | 62.5 | 74.8 | 88.4 | 95.9 | 99.0 | 99.3 | 99.7 | 99.7 | 100.0
obj[::-2] | 1.18 | 1.17 | 1.20 | 1.24 | 1.34 | 1.50 | 2.14 | 4.77 | 15.0 | 22.2 | 28.7 | 24.1 | 45.8 | 47.3 | 47.7
copy | 8.16 | 8.86 | 9.30 | 9.72 | 9.66 | 10.4 | 14.8 | 26.7 | 65.8 | 95.5 | 98.7 | 99.1 | 99.9 | 99.4 | 98.4
obj \(+\) 0 | 6.58 | 6.48 | 6.67 | 7.08 | 7.37 | 9.51 | 16.2 | 35.0 | 65.2 | 34.7 | 44.7 | 98.7 | 99.7 | 96.5 | 100.0
obj \(+\) obj | 3.07 | 3.10 | 3.24 | 3.59 | 3.98 | 5.70 | 13.3 | 31.9 | 64.0 | 34.6 | 45.0 | 98.7 | 99.9 | 99.5 | 99.8
obj \(+=\) obj | 5.17 | 5.35 | 5.41 | 6.02 | 6.66 | 11.3 | 22.8 | 46.4 | 75.3 | 91.9 | 97.8 | 98.0 | 99.8 | 99.3 | 99.7
sqrt | 3.25 | 3.17 | 3.26 | 3.50 | 4.42 | 7.49 | 18.7 | 45.6 | 75.8 | 65.2 | 83.2 | 98.8 | 99.4 | 99.6 | 99.8
bincount | 3.57 | 3.35 | 3.76 | 4.04 | 4.97 | 7.54 | 16.5 | 35.4 | 58.0 | 75.1 | 78.0 | 76.8 | 78.9 | 82.1 | 83.2
Process count | 1 (%) | 2 (%) | 3 (%) | 4 (%) | 8 (%) | 16 (%) | 32 (%) | 64 (%) | 128 (%) | 256 (%)
---|---|---|---|---|---|---|---|---|---|---
initialization | 100.0 | 90.9 | 87.9 | 87.8 | 74.6 | 67.6 | 54.9 | 45.7 | 34.6 | 19.9
copy_empty | 100.0 | 97.5 | 96.2 | 97.5 | 97.6 | 103.6 | 97.8 | 97.7 | 97.6 | 95.1
max | 100.0 | 97.5 | 96.6 | 95.6 | 90.9 | 84.0 | 72.1 | 56.2 | 39.1 | 24.3
sum | 100.0 | 98.0 | 95.3 | 93.5 | 87.3 | 79.2 | 65.1 | 48.3 | 32.2 | 19.2
sum(axis\(\,=\,\)0) | 100.0 | 100.2 | 96.7 | 96.5 | 90.9 | 78.1 | 74.6 | 58.0 | 42.7 | 28.2
sum(axis\(\,=\,\)1) | 100.0 | 105.2 | 103.2 | 102.2 | 100.6 | 100.0 | 98.3 | 95.8 | 93.2 | 88.6
obj[::-2] | 100.0 | 70.4 | 65.9 | 64.0 | 46.2 | 46.6 | 42.8 | 33.6 | 31.1 | 25.3
copy | 100.0 | 104.7 | 103.1 | 101.3 | 101.3 | 105.3 | 101.4 | 101.2 | 101.3 | 101.5
obj \(+\) 0 | 100.0 | 105.1 | 102.6 | 100.6 | 99.9 | 103.5 | 100.2 | 100.0 | 99.7 | 100.1
obj \(+\) obj | 100.0 | 105.2 | 102.5 | 100.1 | 100.0 | 103.7 | 100.1 | 100.1 | 99.8 | 100.2
obj \(+=\) obj | 100.0 | 102.3 | 99.3 | 98.6 | 98.2 | 101.8 | 98.2 | 98.2 | 98.2 | 98.4
sqrt | 100.0 | 102.0 | 100.6 | 100.1 | 99.6 | 99.1 | 99.2 | 99.2 | 98.6 | 98.0
bincount | 100.0 | 103.0 | 101.2 | 99.9 | 98.8 | 97.6 | 94.1 | 88.3 | 79.4 | 65.8
Processes (local size) | 1 (128 MiB) (%) | 2 (64 MiB) (%) | 3 (42.7 MiB) (%) | 4 (32 MiB) (%) | 8 (16 MiB) (%) | 16 (8 MiB) (%) | 32 (4 MiB) (%) | 64 (2 MiB) (%) | 128 (1 MiB) (%) | 256 (512 KiB) (%)
---|---|---|---|---|---|---|---|---|---|---
initialization | 100.0 | 40.3 | 23.7 | 17.1 | 6.80 | 2.55 | 0.89 | 0.29 | 0.10 | 0.03
copy_empty | 100.0 | 47.8 | 33.5 | 26.3 | 23.2 | 12.1 | 6.16 | 3.09 | 1.55 | 0.79
max | 100.0 | 99.0 | 96.7 | 94.0 | 80.1 | 64.3 | 29.6 | 9.10 | 2.41 | 0.60
sum | 100.0 | 100.4 | 95.4 | 91.7 | 74.1 | 60.9 | 24.4 | 7.05 | 1.82 | 0.45
sum(axis\(\,=\,\)0) | 100.0 | 98.0 | 94.2 | 89.9 | 62.2 | 45.3 | 19.8 | 6.34 | 1.78 | 0.45
sum(axis\(\,=\,\)1) | 100.0 | 100.5 | 92.3 | 92.6 | 79.0 | 77.7 | 47.1 | 20.6 | 4.27 | 1.25
obj[::-2] | 100.0 | 65.4 | 62.4 | 58.5 | 40.7 | 33.1 | 26.6 | 18.3 | 8.78 | 3.25
copy | 100.0 | 103.6 | 105.9 | 98.0 | 145.1 | 156.4 | 155.3 | 152.3 | 157.5 | 306.7
obj \(+\) 0 | 100.0 | 105.9 | 109.1 | 100.4 | 59.0 | 97.3 | 75.7 | 79.2 | 79.4 | 149.8
obj \(+\) obj | 100.0 | 106.2 | 109.2 | 100.3 | 59.0 | 97.6 | 74.9 | 79.0 | 80.1 | 150.2
obj \(+=\) obj | 100.0 | 103.1 | 101.3 | 98.0 | 97.8 | 124.0 | 122.8 | 117.9 | 108.1 | 94.4
sqrt | 100.0 | 101.8 | 99.4 | 98.7 | 95.7 | 88.8 | 76.0 | 56.0 | 37.1 | 16.4
bincount | 100.0 | 102.3 | 99.5 | 98.0 | 111.1 | 107.3 | 84.2 | 40.5 | 13.2 | 3.57
Strong scaling: comparison with DistArray
Processes (local size) | 1 (128 MiB) | 2 (64 MiB) | 3 (42.7 MiB) | 4 (32 MiB) | 8 (16 MiB) | 16 (8 MiB) | 32 (4 MiB) | 64 (2 MiB) | 128 (1 MiB) | 256 (512 KiB)
---|---|---|---|---|---|---|---|---|---|---
copy_empty | 23.49 | 27.70 | 33.99 | 40.03 | 1.11 × 10\(^{2}\) | 1.12 × 10\(^{3}\) | 1.06 × 10\(^{3}\) | 1.91 × 10\(^{3}\) | 5.57 × 10\(^{3}\) | 1.90 × 10\(^{4}\)
max | 1.05 | 1.11 | 1.16 | 1.20 | 1.34 | 5.00 | 4.74 | 3.77 | 3.25 | 4.17
sum | 1.07 | 1.20 | 1.25 | 1.33 | 1.48 | 6.57 | 5.57 | 4.20 | 3.55 | 4.61
sum(axis\(\,=\,\)0) | 1.02 | 1.09 | 1.12 | 1.22 | 1.03 | 3.61 | 3.35 | 2.75 | 2.42 | 3.06
sum(axis\(\,=\,\)1) | 1.03 | 1.15 | 1.15 | 1.28 | 1.63 | 11.35 | 12.56 | 19.29 | 22.55 | 42.48
obj \(+\) 0 | 1.02 | 1.09 | 1.20 | 1.15 | 0.44 | 2.88 | 4.24 | 7.81 | 33.94 | 3.78 × 10\(^{2}\)
obj \(+\) obj | 1.02 | 1.09 | 1.20 | 1.16 | 0.44 | 2.90 | 4.23 | 8.08 | 34.40 | 3.81 × 10\(^{2}\)
obj \(+=\) obj | 2.27 | 2.38 | 2.51 | 2.53 | 1.60 | 8.23 | 14.89 | 26.24 | 1.04 × 10\(^{2}\) | 5.35 × 10\(^{2}\)
sqrt | 1.00 | 1.03 | 1.03 | 1.04 | 0.81 | 1.57 | 2.06 | 2.59 | 6.66 | 16.77
In particular, obj += obj is way slower with DistArray than with d2o, which itself is on a par with numpy in most cases; cf. Tables 3, 4 and 5. The strong growth of the run-time ratios with the number of processes, e.g. for obj + 0 using 256 processes, indicates that d2o's organizational overhead is much smaller than that of DistArray. Supporting evidence for this is that the initialization of an empty DistArray (copy_empty) becomes disproportionately costly when the number of processes is increased.

Strong scaling: real-world application speedup—the Wiener filter
- the true signal we try to reconstruct,
- the data one gets from the hypothetical measurement, and
- the reconstructed signal field that, according to the Wiener filter, has most likely produced the data.
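For context, the reconstructed signal mentioned in the last item is the standard Wiener filter estimate, quoted here in its textbook form rather than from the original section:

\[
m \;=\; \left( S^{-1} + R^{\dagger} N^{-1} R \right)^{-1} R^{\dagger} N^{-1} d ,
\]

where \(d\) is the data, \(R\) the linear response of the measurement, and \(S\) and \(N\) the signal and noise covariances, respectively.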