1 Introduction
2 Rheem background
3 Overview
4 Plan enrichment
4.1 Plan inflation
While some platforms provide fine-grained operators, such as \(\mathsf {Map}\) and \(\mathsf {Reduce}\), special-purpose systems (e.g., graph processing systems) rather provide specialized operators (e.g., for the PageRank algorithm). Due to this diversity, 1-to-1 mappings are often insufficient, and a flexible operator mapping technique is called for to support more complex mappings.

4.1.1 Graph-based operator mappings
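To illustrate the idea of such flexible, non-1-to-1 mappings, here is a minimal sketch in which a mapping matches a small subgraph (simplified here to a sequence) of platform-agnostic operators and rewrites it into platform-specific operators. All operator and mapping names are invented for illustration and do not reflect Rheem's actual API.

```python
# A minimal sketch of graph-based operator mappings: instead of mapping each
# platform-agnostic operator 1-to-1, a mapping matches a small subgraph
# (here simplified to a sequence) and rewrites it into platform-specific
# operators. All names are illustrative, not Rheem's API.

MAPPINGS = [
    # n-to-1: a Map followed by a ReduceBy collapses into one fused operator.
    (("Map", "ReduceBy"), ["SparkMapReduceBy"]),
    # 1-to-1 fallbacks.
    (("Map",), ["SparkMap"]),
    (("ReduceBy",), ["SparkReduceBy"]),
]

def inflate(plan):
    """Greedily rewrite a sequence of platform-agnostic operators into
    platform-specific ones, preferring the larger patterns listed first."""
    result, i = [], 0
    while i < len(plan):
        for pattern, replacement in MAPPINGS:
            if tuple(plan[i:i + len(pattern)]) == pattern:
                result.extend(replacement)
                i += len(pattern)
                break
        else:
            result.append(plan[i])  # no mapping applies: keep the operator
            i += 1
    return result

print(inflate(["Map", "ReduceBy", "Map"]))
# ['SparkMapReduceBy', 'SparkMap']
```

A real mapping catalog would hold such rewrite rules per platform, so one logical subgraph can inflate into several alternative execution subplans.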
4.1.2 Operator inflation
4.2 Operator cost estimation
4.2.1 Cost estimation
4.2.2 Cost learner
4.2.3 Cardinality estimation
5 Data movement
5.1 Channel conversion graph
5.2 Minimum conversion tree problem
5.3 Finding minimum conversion trees
5.3.1 Kernelization
5.3.2 Channel conversion graph exploration
5.3.3 Correctness and complexity
6 Plan enumeration
6.1 Plan enumeration algebra
6.1.1 Data structures
6.1.2 Algebra operations
The connect function connects \(sp_1\) and \(sp_2\) by adding conversion operators between operators of the two subplans. For example, the connect function adds conversion operators to link the two \(\mathsf {Map}\) operators in Subplan 1. We define Prune as follows.

6.1.3 Applying the algebra
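The connect operation described above can be sketched in a few lines: it glues two subplans together and inserts a conversion operator whenever the producer's output channel does not match the consumer's expected input channel. All operator and channel names below are illustrative, not Rheem's actual API.

```python
# Toy sketch of the connect operation from the enumeration algebra.
# Channel types per (illustrative) execution operator:
OUT_CHANNEL = {"SparkMap": "RDD", "JavaMap": "Collection"}
IN_CHANNEL = {"SparkMap": "RDD", "JavaMap": "Collection"}

# Available conversion operators between channel types:
CONVERSIONS = {("RDD", "Collection"): "Collect",
               ("Collection", "RDD"): "Parallelize"}

def connect(sp1, sp2):
    """Concatenate subplans sp1 and sp2 (lists of operators), adding a
    conversion operator at the boundary if the channel types differ."""
    out_ch = OUT_CHANNEL[sp1[-1]]
    in_ch = IN_CHANNEL[sp2[0]]
    if out_ch == in_ch:
        return sp1 + sp2
    return sp1 + [CONVERSIONS[(out_ch, in_ch)]] + sp2

print(connect(["SparkMap"], ["JavaMap"]))
# ['SparkMap', 'Collect', 'JavaMap']
```

In the full algebra, connect would consult the channel conversion graph (Sect. 5) rather than a fixed lookup table, so a boundary may require a chain of conversions instead of a single operator.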
6.2 Lossless pruning
6.3 Enumeration algorithm
7 Dealing with uncertainty
8 Extensibility
9 Experiments
Task | Description | \(\#\) Rheem operators | Dataset (size) | Default store |
---|---|---|---|---|
WordCount (TM) | Count distinct words | 6 | \(\mathsf {Wikipedia abstracts}\) (3 GB) | HDFS |
Word2NVec (TM) | Word neighborhood vectors | 14 | \(\mathsf {Wikipedia abstracts}\) (3 GB) | HDFS |
SimWords (TM) | Word neighborhood clustering | 26 | \(\mathsf {Wikipedia abstracts}\) (3 GB) | HDFS |
Aggregate (RA) | Aggregate query (TPC-H Q1) | 7 | \(\mathsf {TPC{-}H}\) (1–100 GB) | HDFS |
Join (RA) | 2-way join (TPC-H Q3) | 18 | \(\mathsf {TPC{-}H}\) (1–100 GB) | HDFS |
PolyJoin (RA) | n-way join (TPC-H Q5) | 31 | \(\mathsf {TPC{-}H}\) (1–100 GB) | Postgres, HDFS, LFS |
Kmeans (ML) | Clustering | 9 | \(\mathsf {USCensus1990}\) (361 MB) | HDFS |
SGD (ML) | Stochastic gradient descent | 10 | \(\mathsf {HIGGS}\) (7.4 GB) | HDFS |
CrocoPR (GM) | Cross-community PageRank | 22 | \(\mathsf {DBpedia pagelinks}\) (20 GB) | HDFS |
9.1 General setup
PolyJoin); however, such cases are easier to handle as the search space becomes smaller, and we thus omit them from further evaluation.

9.2 Single-platform optimization
We exclude PolyJoin here because it requires using several platforms. For the real-world datasets, we took samples of increasing size from the initial datasets. We also increased the input datasets up to 1 TB for most tasks in order to further stress the optimizer. Note that, due to their complexity, we do not report the 1 TB numbers for Word2NVec and SimWords: None of the platforms managed to finish in a reasonable time. The numbers of iterations for CrocoPR, Kmeans, and SGD are 10, 100, and 1,000, respectively.

Among the worst cases the optimizer avoids are: (i) Word2NVec on Spark for 5% and 100% of its input dataset, where Spark performs worse than Flink because it employs only two compute nodes (one for each input data partition), while Flink uses all of them; (ii) SimWords on Java for 1% of its input dataset (\(\sim 30\) MB): as SimWords performs many CPU-intensive vector operations, using JavaStreams (i.e., a single compute node) simply slows down the entire process; (iii) WordCount on Flink for 800% of its input dataset (i.e., 24 GB) and for 1 TB, where, in contrast to Spark, Flink suffers from a slower data reduce mechanism; (iv) Aggregate on Flink for scale factors higher than 200, because Flink tends to write to disk often when dealing with large groups (formed by the \(\mathsf {GroupBy}\) operator); and (v) CrocoPR on JGraph for more than 10% of its input dataset, as JGraph simply cannot process large datasets efficiently, as well as on Spark and Flink for 1 TB, where performance deteriorates due to the number of created objects.

Thus, our optimizer is capable of discovering non-obvious cases: For example, for Word2NVec and SimWords, a simple rule-based optimizer based on input cardinalities would choose JavaStreams for the small input of 30 MB (i.e., \(1\%\) of the dataset). However, JavaStreams is 7\(\times \) to 12\(\times \) slower than Spark and Flink in these two cases. The optimizer also chooses the best platform for Aggregate and Join, even though the execution times for Spark and Flink are quite similar in most of these cases. Only in a few of these difficult cases does the optimizer fail to choose the best platform, e.g., for Word2NVec and SimWords with 0.1% of the input data: The accuracy of our optimizer is sensitive to uncertainty factors, such as cost and cardinality estimates. Still, all these results allow us to conclude that our optimizer chooses the best platform for almost all tasks and prevents tasks from falling into worst execution cases.
9.3 Multi-platform optimization
Task | Selected platforms | Data transfer/iteration |
---|---|---|
WordCount | Spark, JavaStreams | \(\sim 82\) MB |
Word2NVec | Flink | − |
SimWords | Flink | − |
Aggregate | Flink, Spark | \(\sim 23\)% of the input |
Join | Flink | − |
Kmeans (k=10) | Spark | − |
Kmeans (k=100, k=1000) | Spark, JavaStreams | \(\sim 6\) KB and \(\sim 60\) KB |
SGD | Spark, JavaStreams | \(\sim 0.14\) KB \(\times \) batch size |
CrocoPR | Flink, JGraph, JavaStreams | \(\sim 544\) MB |
We ran (i) Kmeans on 10\(\times \) its entire dataset for a varying number of centroids, (ii) SGD on its entire dataset for increasing batch sizes, and (iii) CrocoPR on 10% of its input dataset for a varying number of iterations.

For SGD, Rheem decided to handle the model parameters, which are typically tiny (\(\sim 0.1\) KB for our input dataset), with JavaStreams, while it processed the data points (typically a large dataset) with Spark. For CrocoPR, surprisingly, our optimizer uses a combination of Flink, JGraph, and JavaStreams, even though Giraph is the fastest baseline platform (for 10 iterations). This is because, after the preparation phase of this task, the input dataset for the \(\mathsf {PageRank}\) operation on JGraph is only \(\sim 544\) MB. For WordCount, Rheem surprisingly detected that moving the result data (\(\sim 82\) MB) from Spark to JavaStreams and afterward shipping it to the driver application is slightly faster than using Spark only. This is because, when moving data to JavaStreams, Rheem uses the action \(\mathsf {Rdd.collect()}\), which is more efficient than the \(\mathsf {Rdd.toLocalIterator()}\) operation that Spark offers to move data to the driver. For Aggregate, our optimizer selects Flink and Spark, which allows it to run this task slightly faster than the fastest baseline platform. Our optimizer achieves this improvement by (i) exploiting the fast stream data processing mechanism native to Flink for the projection and selection operations, and (ii) avoiding the slow data reduce mechanism of Flink by using Spark for the \(\mathsf {ReduceBy}\) operation. In contrast to all previous tasks, Rheem can afford to transfer \(\sim 23\)% of the input data because it uses two big data platforms for processing this task. All these results are surprising per se. They show not only that Rheem outperforms state-of-the-art platforms, but also that it can spot hidden opportunities to improve performance and to be much more robust.
We use a variant (JoinX) of PolyJoin. This task computes the account balance ratio between a supplier and all customers in the same nation and calculates the average ratio per nation. For this, it joins the relations SUPPLIER and CUSTOMER (which are stored on Postgres) on the attribute nationkey and aggregates the join results on the same attribute. For this additional experiment, we compare Rheem with the execution of JoinX on Postgres, which is the obvious platform to run this kind of query. The results are displayed in Fig. 9. Remarkably, we observe that Rheem significantly outperforms Postgres, even though the input data is stored there. In fact, Rheem is 2.5\(\times \) faster than Postgres for a scale factor of 10. This is because it simply pushes down the projection operation into Postgres and then moves the data into Spark to perform the join and aggregation operations, thereby leveraging Spark's parallelism. We thus confirm that our optimizer both identifies hidden opportunities to improve performance and performs more robustly by using multiple platforms.
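The execution plan just described can be pictured as a per-operator platform assignment. The sketch below uses made-up operator names purely to illustrate the pushdown: the projection runs inside Postgres (shrinking the data early), while the join and aggregation run on Spark.

```python
# Illustrative only: the JoinX execution plan as an ordered list of
# (operator, platform) pairs. Operator names are invented for this sketch.
joinx_plan = [
    ("TableScan(SUPPLIER)", "Postgres"),
    ("TableScan(CUSTOMER)", "Postgres"),
    ("Project(nationkey, acctbal)", "Postgres"),  # pushed down into Postgres
    ("Postgres->Spark transfer", "conversion"),   # data movement step
    ("Join(on=nationkey)", "Spark"),              # parallel join on Spark
    ("AvgRatioPerNation", "Spark"),               # parallel aggregation
]

platforms = {p for _, p in joinx_plan if p != "conversion"}
print(sorted(platforms))
# ['Postgres', 'Spark']
```

The key design point is that only the projected columns cross the platform boundary, so the transfer cost is small relative to the parallelism gained for the join and aggregation.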
We use the PolyJoin task, which takes the CUSTOMER, LINEITEM, NATION, ORDERS, REGION, and SUPPLIER TPC-H tables as input. We assume the large LINEITEM and ORDERS tables are stored on HDFS; the medium-size tables CUSTOMER, REGION, and SUPPLIER on Postgres; and the small NATION table on a local file system (LFS). In this scenario, the common practice is either to move the data into a relational database in order to enact the queries inside the database [24, 59] or to move the data entirely to HDFS and use Spark. We consider these two cases as the baselines. We measure the data migration time plus the task execution time as the total runtime for these baselines. Rheem, in contrast, processes the input datasets directly on the data stores where they reside and moves data only if necessary. For a fair comparison in this experiment, we set the parallel query and effective IO concurrency features of Postgres to 4.

We also evaluate the CrocoPR and Kmeans tasks. In contrast to the previous experiments, we assume both input datasets (\(\mathsf {DBpedia}\) and \(\mathsf {USCensus1990}\)) to be on Postgres. As the implementation of these tasks on Postgres would be very impractical and of utterly inferior performance, it is important to move the computation to a different processing platform. In these experiments, we consider the ideal case as the baseline, i.e., the case where the data is already on a platform able to perform the task. As the ideal case, we assume that the data is on HDFS and that Rheem uses either JavaStreams or Spark to run the tasks.

9.4 Progressive optimization
We use the Join task for this experiment. We extended the Join task with a low-selectivity selection predicate on the names of the suppliers and customers. To simulate the usual case where users cannot provide accurate selectivity estimates, we provide a high-selectivity hint to Rheem for this filter operator.
9.5 Optimizer scalability
We first measure the optimization time for Kmeans when increasing the number of supported platforms. The results for the other tasks are similar. As expected, the optimization time increases with the number of platforms. This is because (i) the CCG gets larger, challenging our MCT algorithm, and (ii) our lossless pruning has to retain more alternative subplans. Still, we observe that our optimizer (the no top-k series in Fig. 13a) performs well for a practical number of platforms: It takes less than 10 s for 5 different platforms. Yet, one could leverage our algebraic formulation of the plan enumeration problem to easily augment our optimizer with a simple top-k pruning strategy, which retains only the k best subplans when applied to an enumeration. To do so, we just have to specify an additional rule for the Prune operator (see Sect. 6.1) to obtain a pruning strategy that combines the lossless pruning with a top-k one. While the former keeps intermediate subplans diverse, the latter removes the worst plans. Doing so allows our optimizer to scale gracefully with the number of platforms: e.g., for \(k=8\), it takes less than 10 s for 10 different platforms (the top-8 series in Fig. 13a).

Figure 13b shows the results regarding the scalability of our optimizer in terms of the number of operators in a task. We observe that our optimizer scales to very large plans for both topologies. In practice, we do not expect situations with more than five platforms or plans with more than a hundred operators; in fact, the tasks in our workload contain an average of 15 operators. All these numbers show the high scalability of our optimizer.
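The combined Prune rule described above can be sketched in a few lines. In this toy model, subplans are (boundary, cost) pairs; the lossless step keeps only the cheapest subplan among those with identical boundary operators (which is what preserves optimality), and the top-k step then keeps the k cheapest survivors. All names are illustrative, not Rheem's actual API.

```python
import heapq

def lossless_prune(subplans):
    """Keep only the cheapest subplan per distinct boundary (toy model of
    the lossless pruning: plans with equal boundaries are interchangeable)."""
    best = {}
    for boundary, cost in subplans:
        if boundary not in best or cost < best[boundary]:
            best[boundary] = cost
    return list(best.items())

def top_k_prune(subplans, k):
    """Retain the k cheapest subplans (the additional top-k rule)."""
    return heapq.nsmallest(k, subplans, key=lambda sp: sp[1])

def prune(subplans, k):
    """Combined rule: lossless pruning first, then top-k on the survivors."""
    return top_k_prune(lossless_prune(subplans), k)

enumeration = [("spark|spark", 10.0), ("spark|spark", 7.0),
               ("java|spark", 9.0), ("flink|flink", 20.0)]
print(prune(enumeration, 2))
# [('spark|spark', 7.0), ('java|spark', 9.0)]
```

Note the trade-off this sketch makes visible: the lossless step alone never discards a potentially optimal subplan, while the top-k step caps the enumeration size at the price of that guarantee.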
9.6 Optimizer internals
This is particularly pronounced for SimWords and CrocoPR because of the large number of possible execution operators that these plans have. We also found that the top-1 strategy, which merely selects the best alternative for each inflated operator, prunes too aggressively and fails in 3 out of 7 cases to detect the optimal execution plan. While these numbers might suggest that the remaining lossless and top-10 pruning strategies are of equal value, there is a subtle difference: The lossless strategy guarantees to find the optimal plan (w.r.t. the cost estimates) and is thus superior.
For Kmeans, Rheem can be more than one order of magnitude faster when using the CCG than when using only HDFS files for data movement. For SGD and CrocoPR, it is always more than one order of magnitude faster. This shows the importance of well-planned data movement.
This holds even for the WordCount task, which has only 6 operators. For this reason, in Fig. 16a we plot for SGD and WordCount the following: (i) the real execution times of the first three plans with the minimum estimated runtime; and (ii) the minimum, maximum, and average of the real execution times of 100 randomly chosen plans.

10 Related work
Scope
and Cast
commands. Myria [63] provides a rule-based optimizer which is hard to maintain as the number of underlying platforms increases. In [25], the authors present a cross-platform system intended for optimizing complex pipelines. It allows only for simple one-to-one operator mappings and does not consider optimization at the atomic operator granularity. The authors in [61] focus on ETL workloads making it hard to extend their proposed solution with new operators and other analytic tasks. DBMS+ [46] is limited by the expressiveness of its declarative language, and hence, it is neither adaptive nor extensible. Furthermore, it is unclear how DBMS+ abstracts underlying platforms seamlessly. Other complementary works, such as [31, 52], focus on improving data movement among different platforms, but they do not provide a cross-platform optimizer. Apache Calcite [13] decouples the optimization process from the underlying processing making it suitable for integrating several platforms. However, no cross-platform optimization is provided. Tensorflow [1] follows a similar idea, but for cross-device execution of machine learning tasks, and thus, it is orthogonal to Rheem. Finally, WWHow! envisions a cross-platform optimizer but for data storage [36].