Introduction
- Consistency and Availability (CA): since the graph data are stored on a distributed system, we cannot guarantee both the availability and the consistency of the dataset across the cluster at every moment.
- Consistency and Partition tolerance (CP): the data stored on each partition must eventually be consistent.
- Availability and Partition tolerance (AP): to ensure fault tolerance, the vertices and/or edges must be replicated on the nodes of the cluster.
Contributions
Organization
Model and formalism
Definitions and notations
- \(\forall i\in \llbracket 1;k \rrbracket\), \(Gr_{i}\ne \emptyset\)
- \(\forall i,j \in \llbracket 1;k \rrbracket\) such that \(i\ne j\), we have \(Gr_{i}\cap Gr_{j}=\emptyset\)
- \(\bigcup \limits _{i=1}^{k} Gr_{i}= Gr\)
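These three conditions can be checked mechanically. A minimal sketch in Python (an illustrative helper, not part of the paper's method), where a partition is represented as a list of vertex sets:

```python
from itertools import combinations

def is_valid_partition(vertices, parts):
    """Check that `parts` is a valid k-partition of `vertices`:
    every block is non-empty, blocks are pairwise disjoint,
    and their union covers the whole vertex set."""
    non_empty = all(len(p) > 0 for p in parts)                      # Gr_i != Ø
    disjoint = all(not (a & b) for a, b in combinations(parts, 2))  # Gr_i ∩ Gr_j = Ø
    covering = set().union(*parts) == set(vertices)                 # ∪ Gr_i = Gr
    return non_empty and disjoint and covering
```

For example, `is_valid_partition({1, 2, 3, 4}, [{1, 2}, {3, 4}])` holds, while overlapping or incomplete blocks are rejected.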
Formalism
- Balancing of the sub-graphs \(Gr'_{i}\), \(\forall i \in \llbracket 1;k \rrbracket\).
- Minimization of the cuts \(cut(Gr'_{i},Gr'_{j})\) between any two sub-graphs \(Gr'_{i}\) and \(Gr'_{j}\).
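Both objectives can be made concrete with two small metrics. The sketch below is illustrative and assumes the weight of a sub-graph is simply its vertex count (the weight function is an assumption, not the paper's definition):

```python
def weight(part):
    # w(Gr'_i): assumed here to be the number of vertices in the sub-graph
    return len(part)

def balance(parts):
    # Ratio of the heaviest partition to the average partition weight;
    # a perfectly balanced partition gives exactly 1.0.
    avg = sum(weight(p) for p in parts) / len(parts)
    return max(weight(p) for p in parts) / avg

def edge_cut(edges, parts):
    # Number of cut-edges, i.e. edges whose two endpoints fall in
    # different sub-graphs Gr'_i and Gr'_j.
    owner = {v: i for i, p in enumerate(parts) for v in p}
    return sum(1 for u, v in edges if owner[u] != owner[v])
```

For instance, with parts `[{1, 2}, {3, 4}]` and edges `[(1, 2), (2, 3), (3, 4)]`, the balance is 1.0 and the edge cut is 1.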
Related works
Methodology
- Partition balancing [41]: the partition weights must be as close to each other as possible, so that each node bears the same computational load.
- Connectivity [37]: the sub-graphs induced in each partition should remain connected as much as possible, as should the cliques. This condition is not a necessity, but it helps preserve the topology of the original graph.
The approach relies on two programs: coordinator() and partitioner().
- \(Gr=(Vr, Ed)\): graph composed of a set of vertices Vr and a set of edges Ed, such that \(n =|Vr|\) and \(m =|Ed|\).
- k: number of sub-graphs of the graph Gr, such that \(k>2\). It is a hyperparameter which impacts both the execution time and the optimality of the result.
- \(Gr'_{i}\): sub-graph i, with \(i\in \llbracket 1; k \rrbracket\); the weight of each partition is denoted \(w(Gr'_{i})\).
- slave[i]: slave node which hosts the partition \(Gr'_{i}\).
- \(s_{i}\): vertex assigned to the partition \(Gr'_{i}\), with \(s_{i} \in Vr\). We denote by \(Vr(s_{i})\) the set of its adjacent vertices and by \(d(s_{i})\) its degree.
- M: set of all the vertices that have already been assigned to one of the k partitions.
- \(Cut_{s_{i}}(Gr'_{i})\): number of cut-edges generated by the assignment of the vertex \(s_{i}\) to the partition \(Gr'_{i}\).
- \(In_{s_{i}}(Gr'_{i})\): number of induced edges generated by the assignment of \(s_{i}\) to the sub-graph \(Gr'_{i}\).
- \(f_{R}(Gr'_i, s_i)\): ratio of the number of induced edges to the number of cut-edges generated by the assignment of \(s_{i}\) to \(Gr'_{i}\). It is calculated as follows:
$$\begin{aligned} f_{R}(Gr'_i,s_i)=\frac{In_{s_{i}}(Gr'_{i})}{Cut_{s_{i}}(Gr'_{i})} \end{aligned}$$(7)
with the convention that \(Cut_{s_{i}}(Gr'_{i}) = 1\) if no cut-edge is generated by the assignment of the vertex \(s_i\) to the sub-graph \(Gr'_{i}\), and \(In_{s_{i}}(Gr'_{i}) = 1\) if no edge is generated inside the sub-graph \(Gr'_{i}\).
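A direct transcription of Eq. (7), including the convention that a zero count is replaced by 1. The sketch assumes an adjacency-set representation and uses the set M of already-assigned vertices so that only edges toward placed vertices are counted; names are illustrative:

```python
def f_ratio(adjacency, part, assigned, s):
    """f_R(Gr'_i, s_i): induced edges over cut-edges for placing vertex s.

    adjacency: dict vertex -> set of neighbours; part: vertices already
    in Gr'_i; assigned: the set M of all already-placed vertices.
    """
    neighbours = adjacency[s]
    induced = len(neighbours & part)           # In_{s_i}(Gr'_i)
    cut = len((neighbours & assigned) - part)  # Cut_{s_i}(Gr'_i)
    # Convention of Eq. (7): a zero count is replaced by 1.
    return max(induced, 1) / max(cut, 1)
```

If all placed neighbours of s are already inside \(Gr'_{i}\), the ratio grows with the number of induced edges; placements that generate many cut-edges score low.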
The load balancer
The coordinator() program is centralized on the master node; it is responsible for monitoring the state of the slave nodes and ensures that the weight of the partitions is equitably balanced across the cluster. Initially, all the nodes in the cluster are in the active state. At each iteration, the coordinator evaluates the value of the partition balance \(B(P_{k})\). If the balance constraint of the partition with respect to the acceptance error \(\epsilon\) is not respected (see section 2), the coordinator checks whether the weight \(w(Gr'_{i})\) of a sub-graph \(Gr'_{i}\) is far from the average or too high compared to the other sub-graphs; if so, it puts the corresponding slave node in the inactive state via the haltNode(true, slave[i]) method.

The distributed partitioning strategy
While coordinator() is centralized on the master node, partitioner() is decentralized on all the slave nodes of the cluster. partitioner() processes and assigns each vertex before proceeding to the next one, keeping in memory the current weight of its partition. In the event of a tie, the following rules are used in the placement decision:
- If the majority of the neighbors of the current vertex already belong to a sub-graph \(Gr'_{i}\), then the vertex is added to this partition;
- If the neighbors have no sub-graph in common, the sub-graph with the most edges incident to this vertex is chosen;
- If the vertex assignment generates the same placement ratio for all sub-graphs, then the vertex is assigned to the smallest sub-graph \(Gr'_{i}\), such that \(w(Gr'_{i}) = \min \{w(Gr'_{1}), w(Gr'_{2}), \ldots, w(Gr'_{k})\}\);
- Otherwise, the vertex is randomly assigned to one of the k sub-graphs.
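The rules above can be sketched as a single decision function. In this illustrative sketch, rules 1 and 2 collapse into picking the partition with the most edges to already-placed neighbours; all names and data structures are assumptions, not the paper's actual code:

```python
import random
from collections import Counter

def place_vertex(v, adjacency, parts, owner):
    """Assign vertex v to one of the k partitions following the rules above.

    parts: list of k vertex sets Gr'_i; owner: dict mapping each
    already-placed vertex to its partition index.
    """
    # Count, per partition, the edges from v to already-placed neighbours.
    neighbour_parts = Counter(owner[u] for u in adjacency[v] if u in owner)
    if neighbour_parts:
        top = neighbour_parts.most_common()
        best, best_count = top[0]
        if len(top) == 1 or top[1][1] < best_count:
            # Rules 1-2: a single partition dominates (majority of
            # neighbours / most incident edges).
            i = best
        else:
            # Rule 3: tie on the placement score -> smallest partition.
            tied = [j for j, c in top if c == best_count]
            i = min(tied, key=lambda j: len(parts[j]))
    else:
        # Rule 4: no placed neighbour -> random partition.
        i = random.randrange(len(parts))
    parts[i].add(v)
    owner[v] = i
    return i
```

The random fallback only fires for vertices with no placed neighbour, which matches the last rule; every other case is resolved deterministically.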
The partitioner() program proceeds as follows. Initially, each partition \(Gr'_{i}\) is empty, as is its associated weight \(w(Gr'_{i})\), and the node slave[i] receives a message from the master node notifying the start of the partitioning task. At each iteration, as long as the node slave[i] has not received a message signaling the end of the job, we evaluate, for each of the k unmarked vertices, the ratio \(f_{R}(Gr'_i, s_i)\) of the number of induced edges to the number of cut-edges generated by the placement of the vertex \(s_{i}\) in the partition \(Gr'_{i}\). The vertex \(s^*_{i}\) with the maximum value of \(f_{R}(Gr'_i,s^*_{i})\) is then chosen. If two slave nodes slave[i] and slave[j] have promising vertices \(s^*_{i}\) and \(s^*_{j}\) and \(w(Gr'_{j}) \ge w(Gr'_{i})\), then the vertex \(s^*_{i}\) is placed in the partition \(Gr'_{i}\), while the vertex \(s^*_{j}\) is replaced by the vertex \(s_{j-1}\) and placed in the partition \(Gr'_{j}\). Each of the k placed vertices is marked. Subsequently, we add all the edges incident to the vertex \(s_{i}\) as well as the cut-edges generated by the assignment of \(s_{i}\). Finally, the slave node slave[i] communicates the new weight of its partition to the master node by message. It is important to emphasize that the partitioning task is parallelized according to the BSP (Bulk Synchronous Parallel) paradigm [41]: when a node finishes placing a vertex, it waits until the rest of the nodes have finished their job. Thus, the time complexity of each node is \(O(\frac{|Vr(s_{i})|n^2}{k}\log (k))\).

Results and discussions
Illustration of DPHV algorithm
The pre-processing phase
\(s_{i}\) | 1 | 3 | 2 | 4 | 7 | 5 | 6 |
---|---|---|---|---|---|---|---|
\(d(s_{i})\) | 5 | 4 | 3 | 3 | 3 | 2 | 2 |
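The ordering in the table above (vertices sorted by non-increasing degree \(d(s_{i})\)) suggests a simple pre-processing step. A minimal sketch, assuming an adjacency-set representation; ties between equal-degree vertices keep their original order because Python's sort is stable:

```python
def preprocess(adjacency):
    # Order vertices by non-increasing degree d(s_i); Python's sort is
    # stable, so vertices of equal degree keep their original order.
    return sorted(adjacency, key=lambda v: len(adjacency[v]), reverse=True)
```

With the degree sequence of the table, the highest-degree vertex (here vertex 1, with \(d(s_{i}) = 5\)) is processed first.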
The partitioning phase
Test environment and dataset
Designation | Type | \(\mathbf{|Vr|}\) | \(\mathbf{|Ed|}\) | ACC | D |
---|---|---|---|---|---|
Biosnap | Undirected | 1018524 | 24735503 | 0.42 | 15 |
Twitter | Directed | 81306 | 1768149 | 0.5653 | 7 |
Usroad | Directed | 126146 | 161950 | 0.0145 | 617 |
Email | Undirected | 36692 | 183831 | 0.4970 | 11 |
Astro | Undirected | 18772 | 198110 | 0.6306 | 14 |
ageRRN | Directed | 860000 | 2360000 | 0.4970 | 56 |
cptTRM | Directed | 1070000 | 6000000 | 0.6235 | 70 |
elsaRR | Directed | 1508000 | 9000000 | 0.5653 | 92 |
osmMA | Directed | 4526700 | 12670000 | 0.0136 | 412 |