1 Introduction
-
Bitemporal graph model We formally outline the bitemporal property graph model TPGM used in Gradoop, supporting valid and transactional time information for evolving graphs and graph collections.
-
Temporal graph operators We describe the extended set of graph operators that support temporal graph analysis. In particular, we present temporal extensions of the grouping and pattern matching operator with new query language constructs to express and detect time-dependent patterns.
-
Implementation and evaluation We provide implementation details for the new temporal graph operators and evaluate their scalability and performance for different datasets and distributed configurations.
-
Lessons learned We briefly summarize findings from five years of research on distributed graph analysis with Gradoop.
2 Graph system landscape
2.1 Temporal property graphs and query languages
2.2 Graph database and graph processing systems
2.3 Temporal graph processing systems
3 System architecture overview
4 Temporal property graph model
4.1 Graph data model
Person
or knows
). Attributes have the form of key-value pairs (e.g., name:Alice
or classYear:2015
) and are referred to as properties. Properties are set at the instance level without an upfront schema definition. A temporal property graph is a property graph with additional time information on its vertices and edges, which primarily describes the historical development of the structure and attributes of the graph, i.e., when a graph element was available and when it was superseded. Our presented TPGM adds support for two time dimensions, valid and transaction time, to differentiate between the evolution of the graph data with respect to the real-world application (valid time) and with respect to the visibility of changed graph data to the system managing the data (transaction time). This concept of maintaining two orthogonal time domains is known as bitemporality [38]. In addition, the TPGM supports graph collections, which were introduced by the EPGM [40], the non-temporal predecessor of the TPGM. A graph collection contains multiple, possibly overlapping property graphs, which are referred to as logical graphs. Like vertices and edges, logical graphs also have bitemporal information, a type label and an arbitrary number of properties. Before the data model is formally defined, the following preliminaries3 have to be considered:Station
and are further described by their properties (e.g., name: Christ Hospital
). Edges describe the relationships or interactions between vertices and also have a type label (Trip
) and properties. The key set K contains all property keys, for example, bikeId
, userType
and capacity
, while the value set A contains all property values, for example, 21233
, Cust
and 22
. Vertices with the same type label may have different property keys, e.g., \(v_1\) and \(v_2\).
21233
at station \(v_0\) at 10 a.m. and its return to station \(v_1\) at 11 a.m. The bike was then rented again from 11:10 a.m. to 12:10 p.m., which is represented by edge \(e_5\). Completed trips are stored in the graph every full hour, which is made visible by the transaction times.BikeGraph
or Region
) and may have properties, which can be used to describe the graph by annotating it with specific metrics (e.g., stationCount:3
) or general information about that graph (e.g., location: Jersey City
). Logical graphs, like those in our example, are either declared explicitly or are the output of a graph operator or algorithm, e.g., graph pattern matching or community detection. In both cases, they can be used as input for subsequent operators and algorithms.4.2 Operators
4.2.1 Subgraph
Station
having a property capacity
with a value less than 30
and their edges of type Trip
with a property gender
which value is equal to female
:5
4.2.2 Snapshot
’2019-09-06 09:55:00’
, edges \(e_6\) and \(e_8\) would no longer belong to the result set, since the information about these trips was not yet persisted at this point in time.4.2.3 Difference
_diff
on each graph element, whereas the value of the property will be a number indicating that an element is either equal in both snapshots (0) or added (1) or removed (-1) in the second snapshot. This resulting graph can then be used by subsequent operators to, for example, filter for added elements, group removed elements or aggregate specific values of persistent elements. For the given example in Fig. 2, the following operator call calculates the difference between the graph at 9 a.m. and 10 a.m. of the given day:
_diff
is added to K and the values \(\{-1,0,1\}\) are added to A. Since all vertices and the edge \(e_8\) are valid in both snapshots, a property _diff:0
is added to them. The edges \(e_6\) and \(e_1\) are no longer available in the second snapshot; therefore, they are extended by the property _diff:-1
, whereas the edges \(e_2\) and \(e_7\) are annotated by _diff:1
to show that they were created during this time period.4.2.4 Time-dependent Graph Grouping
label()
key function) and property key regionId
(using the property()
key function). Edges are grouped by label and by the start of the valid time interval. The timeStamp
key function was used for the latter to extract the start of the valid time interval and to calculate the hour of the day for this time (lines 10–11). Type labels are added as grouping keys in both cases, since we want to retain this information on supervertices and superedges. In lines 3–9 and 12–15, we declare the vertex and edge aggregate functions, respectively. Both receive a superelement (i.e., superVertex
, superEdge
) and a set of group members (i.e., vertices
, edges
) as inputs. They then calculate values for the group and attach them as properties to the superelement. In our example, a count
property is set storing the number of elements in the group. We also use the avg
function to calculate the average value of a numeric property and the averageDuration
function to get the average length of the valid time interval for elements. Figure. 3 shows the resulting logical graph for this example.4.2.5 Temporal pattern matching
(a)-[e]->(b)
denotes a directed edge e
from vertex a
to vertex b
and can be used in a MATCH
clause. Predicates are either embedded into the pattern by defining type labels and properties or expressed in the WHERE
clause. For a more detailed description of the (non-temporal) language on which TemporalGDL is based, we refer to our previous publication [41]. We extended the language by various syntactic constructs that are partly inspired by Allen’s conventions [9] and the SQL:2011 standard [47], to support the TPGM-specific bitemporal attributes. These extensions enable the construction of time-dependent patterns, e.g., to define a chronological order of elements or to define Boolean relations by accessing the element’s temporal intervals and their bounds. Table 3 gives an overview of a subset of the TemporalGDL syntax including access options for intervals and their bounds of both dimensions (e.g., a.val
to get the valid time interval of the graph element that is represented by variable a
), a set of binary relations for intervals and timestamps (e.g., a.val.overlaps(b.val)
to check whether the valid time intervals of the graph elements assigned to a
and b
overlap), functions to create a duration time constant of a specific time unit (e.g., Seconds(10)
to get a duration constant of 10 seconds) and binary relations between duration constants and interval durations, e.g, a.val.shorterThan(b.val)
to check whether the duration of the valid time interval of a
is shorter than the one of b
or a.val.longerThan(Minutes(5))
to check whether the duration of the interval is longer than five minutes.
a,b,c,d
for vertices and e,f,g
for edges). Variables are optionally followed by a label (e.g., a:Station
) and properties (e.g., {name:’Christ Hospital’}
). More complex predicates can be expressed within the WHERE
clause. Here, the user has access to vertex and edge properties using their variable and property keys (e.g., e.bikeId = f.bikeId
). In addition, bitemporal information of the elements can be accessed in a similar way using predefined identifiers (e.g., e.val
) as described before. A chronological order of the edges is defined by binary relations, for example, e.val.precedes(f.val)
. When called for graph \(G_0\) of Fig. 2, the operator returns a graph collection containing a single logical graph as shown in Fig. 4. Each graph in the result collection contains a new property storing the mapping between query variables and entity identifiers.4.2.6 Graph transformation operators
4.2.7 Additional graph operators
4.2.8 Graph collection operators
4.3 Iterative graph algorithms
callForGraph
and callForCollection
operators. Using the former function, a user has access to the API and complete library of iterative graph algorithms of Apache Flink’s Gelly [3], which is the Apache Flink implementation of Google Pregel [52]. By utilizing Flink’s dataset iteration, co-group and flatMap functions Gelly is able to provide different kinds of iterative graph algorithms. For now, vertex iteration, gather–sum–apply and scatter–gather algorithms are supported. However, since Gelly is based on the property graph model we use a bidirectional translation between Gradoop’s logical graph and Gelly’s property graph, as described in Sect. 5.4. Thus, Gradoop already provides a set of algorithms that can be seamlessly integrated into a graph analytical program (see Table 1), e.g., PageRank, Label Propagation and Connected Components. Besides, we provide TPGM-tailored algorithm implementations, e.g., for frequent subgraph mining (FSM) within a graph collection [61].5 Implementation
5.1 Apache Flink
5.2 Graph representation
props
) and two tuples (val
and tx
) representing time intervals of the valid- and transaction-time dimension. Each tuple consists of two timestamps that define the interval bounds. Each timestamp is a 8-byte long value that stores Unix-epoch milliseconds. Since TPGM elements are self-descriptive, properties are represented by a key-value map, whereas the property key is of type string and the property value is encoded in a byte array. The current implementation supports values of all primitive Java types as well as arrays, sets and maps of those. Vertices and edges maintain their graph membership in a dedicated set of graph identifiers (graphs
). Edges additionally store the identifiers of their incident vertices (i.e., sid
/tid
).
5.2.1 Programming abstractions
yob
that is greater than the value 1980
and all source and target vertices. Based on that subgraph, we call the PageRank algorithm and store the resulting rank as a new vertex property pr
. Using the match operator, we extract triangles of vertices in which the total page rank exceeds a given value. The resulting collection of matching triangles is stored using a specific data sink. Note that the program is executed lazily by either writing to a data sink or by executing specific action methods on the underlying DataSets, e.g., for collecting, printing or counting their elements. In Sect. 6, we will provide more complex examples as part of our evaluation.5.2.2 Graph Layouts
5.3 Graph operators
IN
and produces a DataSet containing elements of type OUT
. Application-specific logic is expressed through a user-defined function (udf: IN -> OUT
) that maps an element of the input DataSet to exactly one element of the output DataSet. Further DataSet transformations are well known from relational systems, e.g., select (filter), join, group-by, project and distinct.T
(e.g., \(\texttt {DataSet<String>}\), \(\texttt {DataSet<Vertex>}\) or \(\texttt {DataSet<Tuple2<Int,Int{>}>}\))Name | Description |
---|---|
Map | The map transformation applies a user-defined map function to each element of the input DataSet. Since the function returns exactly one element, it guarantees a one-to-one relation between the two DataSets. |
\(\texttt {DataSet<IN>.map(udf: IN -> OUT) : DataSet<OUT>}\) | |
FlatMap | The flatMap transformation applies a user-defined flatmap function to each element of the input DataSet. This variant of a map function can return zero, one or arbitrary many result elements for each input element. |
\(\texttt {DataSet<IN>.flatMap(udf: IN -> OUT) : DataSet<OUT>}\) | |
Filter | The filter transformation evaluates a user-defined predicate function to each element of the input DataSet. If the function evaluates to true , the particular element will be contained in the output DataSet. |
\(\texttt {DataSet<IN>.filter(udf: IN -> Boolean) : DataSet<IN>}\) | |
Project | The projection transformation takes a DataSet containing a tuple type as input and forwards a subset of user-defined tuple fields to the output DataSet. |
\(\texttt {DataSet<TupleX>.project(fields) : DataSet<TupleY> (X,Y in [1,25])}\) | |
Equi-Join | The join transformation creates pairs of elements from two input DataSets which have equal values on defined keys (e.g., field positions in a tuple). A user-defined join function is executed for each of these pairs and produces exactly one output element. |
\(\texttt {DataSet<L>.join(DataSet<R>).where(leftKeys).equalTo(rightKeys).}{} \texttt {with(udf: (L,R) -> OUT) : DataSet<OUT>}\) | |
ReduceGroup | DataSet elements can be grouped using custom keys (similar to join keys). The ReduceGroup transformation applies a user-defined function to each group of elements and produces an arbitrary number of output elements. |
\(\texttt {DataSet<IN>.groupBy(keys).reduceGroup(udf: IN[] -> OUT[]) : DataSet<OUT>}\) |
(v => v.capacity>= 40)
). The resulting vertex DataSet \(V_1\) can already be used to construct the output graph. However, we have to ensure that no dangling edges exist, i.e., only those filtered edges are selected where source and target vertex are contained in the output vertex set. To achieve that, the operator performs a join transformation between filtered edges and filtered vertices for both sourceId
and targetId
of an edge. Edges that have a join partner for both transformations are forwarded to the output DataSet. During construction of the output graph, a new graph head is generated and used to update graph membership of vertices and edges.9asOf
or fromTo
. We implemented the operator analogous to the subgraph operator by using two Flink filter transformations. Each transformation applies a temporal predicate to each record of V and E, respectively. Remember that a graph in TPGM is fully evolved and contains the whole history of all changes for both time dimensions. Therefore, the predicate will check the valid or transaction time of each graph element, depending on an optional identifier of the dimension to be used, and thus decides whether to keep the element or to discard it. Just like subgraph, the filter may produce dangling edges, since vertices and edges are handled separately. The subsequent verification step (two join
transformations) is thus performed to remove these dangling edges.
Vertex
instances for each tuple.
Edge
instances, representing the final superedges.(e:Trip)
and e.val.asOf(...)
), we introduce a Filter transformation on the input vertex DataSet \(E_0\) and a subsequent Map transformation to convert the edge object into a tuple containing only the edge id, source id and target id for subsequent joins and the val
interval (that represents the edge’s valid time) for later predicate evaluation (DataSet \(E_1\)). Join transformations compute partial structural matches (DataSet \(M_i\)) and subsequent Filter transformations validate the edge isomorphism semantics that demands a Filter transformation on DataSet \(M_2\). Predicates that span multiple query variables (e.g., a.capacity> b.capacity
) can be evaluated as soon as the required input values are contained in the partial match (DataSet \(M_4\)). Each row in the resulting DataSet represents a mapping between the query variables and the data entities and is converted into a logical graph in a postprocessing step.
5.4 Iterative graph algorithms
GradoopGellyAlgorithm
which includes transformation functions to translate a logical graph to a Gelly graph representation. Various algorithms are already available in Gradoop and can be integrated into an analytical GrALa pipeline using the Call operator (see auxiliary operators in Table 2). Custom or other Gelly algorithms can also be integrated simply by extending the aforementioned base class and using the provided graph transformations. Algorithm results, e.g., the PageRank scores of vertices, the component identifiers or the number of triangles, are typically integrated in the resulting logical graph by adding new properties to the graph (head), vertices or edges.5.5 Graph storage
DataSource
and DataSink
with methods to read and write logical graphs and graph collections, respectively (see the listing in Sect. 5.2.1). Notwithstanding, Gradoop contains a set of embedded storage implementations, including file formats and NoSQL stores.6 Evaluation
Name | SF | |V| | |E| | Disk size |
---|---|---|---|---|
LDBC | 1 | 3.3 M | 17.9 M | 4.2 GB |
LDBC | 10 | 30,4 M | 180.4 M | 42.3 GB |
LDBC | 100 | 282.6 M | 1.77 B | 421.9 GB |
CitiBike | – | 1174 | 97.5 M | 22.6 GB |
cellId
calculated from the geographical coordinates in its properties to each vertex (rental station) with the Transformation operator. The Temporal Pattern Matching operator in lines 6–14 uses the enriched snapshot to match all pairs of rentals with a duration of at least 40 or 90 minutes, each where the first trip starts in a specific cell (2883) in NYC and the second trip starts in the destination of the first trip after the end of the first trip. Since the result of the Temporal Pattern Matching operator is a graph collection we use a Reduce operator (line 15) to combine the results to a single logical graph. We further group the combined graph by the vertex properties name
and cellId
(line 17) and the edge creation per month (line 19) using the time-dependent grouping operator. By applying two aggregation functions on the edges (lines 20–22), we determine both the number of edges represented by a superedge and the average duration of the found rentals. Finally, we apply a Subgraph operator to output only superedges with a count greater than 1 (line 23). The corresponding source code is available online12.