PipeGen: A Data Pipe Generator for Hybrid Analytics

By Brandon Haynes, Alvin Cheung, and Magdalena Balazinska, University of Washington

As the number of big data management systems continues to grow, users increasingly seek to leverage multiple systems in the context of a single data analysis task. A critical challenge of such “hybrid analytics” is the seamless and efficient movement of intermediate query results as an analysis crosses system boundaries; this movement is often what determines the overall performance of the analysis.

In this post, we describe our PipeGen tool, which addresses the challenge of transferring data among systems for hybrid analytics.

As an example, consider a scientist who collects image data in an array database management system (DBMS) while storing other metadata in a relational DBMS. To analyze her data, she writes a query that retrieves data from the relational store and ingests it temporarily into the array database to join it with her image data. She then extracts features from her images and moves them to a graph database to leverage its specialized capabilities, such as machine learning algorithms.

This type of hybrid analytics is increasingly common. Most big data system vendors today already support some type of connection between Hadoop and their parallel DBMSs (e.g., DB2, Polybase and Oracle). Several efforts focus on creating a more complete virtualized layer on top of all of an enterprise’s data (e.g., Denodo). Still other efforts build new generation systems designed specifically for hybrid analytics, including Musketeer and our BigDAWG polystore system. The latter is being developed in the Intel Science & Technology Center for Big Data program.

The following example highlights the challenges of data transfer for hybrid analytics. We recently benchmarked an astronomy workflow that analyzes the output of an N-body simulation using a clustering method available in Spark. The data is originally stored in our Myria relational DBMS. To perform this analysis, we first wrote a SQL query to prepare the data and then loaded the result into Spark. Unfortunately, comma-separated values (CSV) is the only format common to both systems, so we had to export the data to a CSV file from Myria and subsequently import it into Spark. Figure 1 shows the time spent running the analysis; surprisingly, a significant fraction of that time goes to data transfer.

PipeGen automatically generates data pipes between DBMSs. These data pipes enable the efficient transfer of data in a way that avoids file system materialization, includes multiple important format optimizations, and transfers data in parallel when possible.

Figure 1. Total time for an astronomy workflow with and without PipeGen data pipes. Distances between particles in an astronomy simulation are computed in Myria and transferred to Spark for clustering. (Source: Brandon Haynes et al., University of Washington.)

Transferring data between systems is typically part of the extract-transform-load (ETL) pipeline, and there are two common approaches to solving the problem.

First, many DBMSs support serializing data between their internal format and an external one. Users can transfer data by exporting it from one DBMS and importing it into another. This works well when a common data format exists between the DBMSs involved, for instance, text-based formats such as CSV or binary formats such as Arrow. Unfortunately, moving data using a text-oriented format is costly: it requires serializing the data from the source DBMS, storing the serialized data on disk, and parsing it back into the internal format of the target DBMS. Using binary formats alleviates some of these overheads, but materializing the data on disk still incurs significant and unnecessary costs in time and storage space. Additionally, while support for text-oriented formats such as CSV is common, shared binary formats remain rare.
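To make these costs concrete, here is a minimal sketch of the disk-based approach written against plain JDBC. The connection strings, table names, and bulk-load syntax are hypothetical stand-ins, and a real engine's export and import operators are considerably more involved:

```java
import java.io.BufferedWriter;
import java.nio.file.*;
import java.sql.*;

public class CsvRoundTrip {
    public static void main(String[] args) throws Exception {
        Path csv = Paths.get("intermediate.csv");   // materialized on disk

        // Export: serialize every value to text and write it to the file.
        try (Connection src = DriverManager.getConnection("jdbc:source://host/db");
             Statement stmt = src.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, x, y FROM particles");
             BufferedWriter out = Files.newBufferedWriter(csv)) {
            while (rs.next()) {
                out.write(rs.getLong(1) + "," + rs.getDouble(2) + "," + rs.getDouble(3));
                out.write("\n");
            }
        }

        // Import: the target DBMS reads the file and parses the text back
        // into its internal binary representation (bulk-load syntax varies).
        try (Connection dst = DriverManager.getConnection("jdbc:target://host/db");
             Statement load = dst.createStatement()) {
            load.execute("COPY particles FROM 'intermediate.csv' WITH (FORMAT csv)");
        }
    }
}
```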

For example, our analysis of five big data systems (our own Myria system, Spark, Hadoop, Giraph, and Derby) revealed that CSV is the only text-oriented data format supported by all five DBMSs. No common binary data format is supported by all of them.

Second, another approach is to implement dedicated data transfer programs, i.e., data pipes, between specific source and target DBMSs, thereby avoiding the disk as an intermediary. Common data transfer protocols such as JDBC and Thrift are usually not implemented to support reading or writing data in parallel, and are therefore impractical for efficiently moving large datasets between systems (see the sketch below). Other dedicated software packages do transfer data in parallel between specific systems, such as spark-sframe for moving data between Spark and GraphLab. Unfortunately, generalizing this approach requires implementing O(n²) data pipes to transfer data among n different DBMSs. Besides being impractical, this approach requires knowledge of the internals of each DBMS, making it inaccessible to non-technical users. Even for technical professionals, implementing dedicated data pipes is often a brittle and error-prone process, and the resulting code quickly becomes outdated.
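For instance, a hand-written pipe over JDBC typically reduces to something like the following sketch (connection strings and schema hypothetical): a single cursor pulls rows one at a time while a single batched INSERT pushes them, which avoids the disk but leaves no room for parallelism:

```java
import java.sql.*;

public class JdbcPipe {
    // Hypothetical single-threaded JDBC transfer: one cursor on the source,
    // one batched INSERT on the target. Correct, but inherently serial.
    public static void main(String[] args) throws Exception {
        try (Connection src = DriverManager.getConnection("jdbc:source://host/db");
             Connection dst = DriverManager.getConnection("jdbc:target://host/db");
             Statement read = src.createStatement();
             ResultSet rs = read.executeQuery("SELECT id, x, y FROM particles");
             PreparedStatement write =
                 dst.prepareStatement("INSERT INTO particles VALUES (?, ?, ?)")) {
            int pending = 0;
            while (rs.next()) {                 // row-at-a-time: the bottleneck
                write.setLong(1, rs.getLong(1));
                write.setDouble(2, rs.getDouble(2));
                write.setDouble(3, rs.getDouble(3));
                write.addBatch();
                if (++pending % 10_000 == 0) write.executeBatch();
            }
            write.executeBatch();               // flush the final partial batch
        }
    }
}
```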

In contrast, our PipeGen tool retains the benefits of dedicated data pipes without the shortcomings of manual data pipe implementation or serialization via physical storage. The key idea is to leverage a DBMS's existing data serialization functionality for commonly used formats (e.g., CSV) to automatically generate data pipes from DBMS binaries.

Figure 2 illustrates how a user or a polystore system leverages the data pipes generated by PipeGen. The user or the optimizer executes two queries: one that exports data from a source DBMS extended by PipeGen, and one that imports data into a target DBMS extended by PipeGen. These queries may be issued in any order; PipeGen automatically blocks until both DBMSs are ready. The queries are written as if the data were moved using the original export and import code, except that they use a special filename (e.g., "db://X") to identify the use of the generated data pipes.

Figure 2. Using the data pipe generated by PipeGen for the hybrid analysis from the astronomy example: 1. User submits a query to the source DBMS (e.g., Myria) to compute distance and export to the target DBMS using data pipe. 2. User issues an import query on the target DBMS (e.g., Spark) to cluster the result. Data is transferred using the generated data pipe in 3. and in 4. a worker directory coordinates the connection process. PipeGen-related components are highlighted in green. (Source: Brandon Haynes et al., University of Washington.)
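Concretely, the interaction in Figure 2 might look like the following sketch. Everything here is illustrative: the JDBC-style connection strings, the export/import statements, and the cluster() function are hypothetical stand-ins; only the special db:// filename convention comes from PipeGen:

```java
import java.sql.*;
import java.util.concurrent.*;

public class PipeGenUsage {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // The two queries may be submitted in either order (or concurrently,
        // as here); the generated pipes block until both endpoints connect.
        Future<?> export = pool.submit(() -> run("jdbc:myria://host/db",
            // Source side: the DBMS's existing export syntax, db:// filename.
            "EXPORT (SELECT * FROM distances) INTO 'db://astronomy'"));
        Future<?> load = pool.submit(() -> run("jdbc:spark://host/db",
            // Target side: existing import syntax; cluster() is hypothetical.
            "CREATE TABLE clusters AS SELECT cluster(*) FROM 'db://astronomy'"));

        export.get();
        load.get();
        pool.shutdown();
    }

    private static void run(String url, String query) {
        try (Connection c = DriverManager.getConnection(url);
             Statement s = c.createStatement()) {
            s.execute(query);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
```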

To automatically generate code for a data pipe, PipeGen uses the workflow shown in Figure 3. PipeGen assumes that a given DBMS already has import and export functionality, along with unit tests that exercise the code associated with these operations: export tests cover the code paths from reading data in the internal representation to serializing it to disk, while import tests read serialized data from disk back into the internal representation. Should existing tests not cover all relevant code paths, PipeGen allows users to provide additional test cases to increase coverage.

Figure 3. Compile-time components of PipeGen. The file IO redirector (IORedirect) generates a data pipe to transfer data via a socket, while the format optimizer (FormOpt) improves efficiency for text-oriented formats. (Source: Brandon Haynes et al., University of Washington.)
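As an illustration, an export test of the kind PipeGen relies on might look like the sketch below. The exportCsv method is a self-contained stand-in for a DBMS's real CSV export operator; what matters is that running the test exercises the full path from in-memory tuples to serialized bytes on disk:

```java
import static org.junit.Assert.assertEquals;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import org.junit.Test;

public class CsvExportTest {
    // Self-contained stand-in for a DBMS's CSV export operator: serialize
    // the rows of an in-memory "relation" to a file on disk.
    static void exportCsv(List<Object[]> rows, Path out) throws IOException {
        try (BufferedWriter w = Files.newBufferedWriter(out)) {
            for (Object[] row : rows) {
                StringJoiner line = new StringJoiner(",");
                for (Object v : row) line.add(String.valueOf(v));
                w.write(line.toString() + "\n");
            }
        }
    }

    // The kind of test PipeGen instruments: it exercises the complete export
    // path, from the internal representation to serialized bytes on disk.
    @Test
    public void exportsTuplesAsCsv() throws Exception {
        Path out = Files.createTempFile("export", ".csv");
        exportCsv(Arrays.asList(new Object[]{1L, 2.0}, new Object[]{2L, 3.5}), out);
        assertEquals("1,2.0\n2,3.5\n", new String(Files.readAllBytes(out)));
    }
}
```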

Data pipes are created in two steps. First, PipeGen executes the data export unit tests and analyzes the DBMS binary to create an export data pipe that transmits data via a network socket rather than the disk. Then, PipeGen uses the import unit tests to create an import data pipe that reads data from a network socket rather than a disk file. The same tests are used to validate the correctness of the generated data pipes. To use the generated pipes, users issue queries that export and import from disk files using a special filename, whereupon PipeGen creates a network socket and connects the generated import and export pipes to transmit data.
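The core of the rewrite is conceptually small. The sketch below shows the idea at the source level, though PipeGen actually performs the transformation on the compiled DBMS binary; the connectViaDirectory helper is a hypothetical stand-in for the worker directory shown in Figure 2:

```java
import java.io.*;
import java.net.Socket;

public class RedirectedStreams {
    static final String PIPE_PREFIX = "db://";

    // Conceptual sketch of IORedirect: wherever the DBMS export code opens a
    // file for writing, the rewritten code checks for the special name and
    // returns a socket stream instead, so bytes never touch the disk.
    static OutputStream openForExport(String name) throws IOException {
        if (name.startsWith(PIPE_PREFIX)) {
            Socket s = connectViaDirectory(name.substring(PIPE_PREFIX.length()));
            return new BufferedOutputStream(s.getOutputStream());
        }
        return new BufferedOutputStream(new FileOutputStream(name));
    }

    // The import side mirrors it: reads come from the socket, not a file.
    static InputStream openForImport(String name) throws IOException {
        if (name.startsWith(PIPE_PREFIX)) {
            Socket s = connectViaDirectory(name.substring(PIPE_PREFIX.length()));
            return new BufferedInputStream(s.getInputStream());
        }
        return new BufferedInputStream(new FileInputStream(name));
    }

    // Hypothetical: in PipeGen, a worker directory coordinates which host
    // and port the two sides should meet on (step 4 in Figure 2).
    static Socket connectViaDirectory(String pipeName) throws IOException {
        return new Socket("directory-assigned-host", 9000 /* assigned port */);
    }
}
```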

To further speed up data transfer, PipeGen comes with a number of optimizations. First, PipeGen attempts to transmit data using binary rather than text-oriented formats. This is done by serializing data using external binary encoders (Apache Arrow in our current implementation) rather than text encoders in the generated pipes. As another optimization, PipeGen analyzes the code in an attempt to identify and eliminate textual delimiters during data transfer. If the source and target DBMSs support multiple threads, then PipeGen will utilize them to transfer data in parallel. Finally, PipeGen compresses the encoded data to further reduce transfer time.
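As a rough illustration of two of these optimizations, binary encoding and compression, consider the sketch below. PipeGen's generated pipes use Apache Arrow buffers for the binary layout; the hand-rolled DataOutputStream framing here is only meant to make the idea concrete:

```java
import java.io.*;
import java.net.Socket;
import java.util.zip.GZIPOutputStream;

public class OptimizedWriter {
    // Sketch of the format optimizations: instead of writing "42,3.14\n" as
    // delimited text, write fixed-width binary values through a compressing
    // stream. No delimiters to emit, and no text to parse on the import side.
    public static void transfer(Socket socket, long[] keys, double[] values)
            throws IOException {
        try (DataOutputStream out = new DataOutputStream(
                new GZIPOutputStream(socket.getOutputStream()))) {
            out.writeInt(keys.length);            // row count instead of newlines
            for (int i = 0; i < keys.length; i++) {
                out.writeLong(keys[i]);           // 8 bytes, no quoting or escaping
                out.writeDouble(values[i]);       // 8 bytes, no delimiter needed
            }
        }
    }
}
```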

Our experiments show that PipeGen can generate data pipes that speed up data transfer between DBMSs by up to 3.8X, both when the source and target DBMSs are colocated on a single node, and when they are located across different data centers. Figure 4 shows the speed-up when using PipeGen’s data pipes and transferring data between five pairs of DBMSs.

Figure 4: Total speedup between file system and PipeGen for 10 billion elements, each with a unique integer key in the range [0,n] followed by three (integer ∈ [0, n], double) pairs. (Source: Brandon Haynes et al., University of Washington.)

PipeGen currently does not address the problem of schema matching and mapping; it focuses instead on the mechanics of data movement. We expect the data pipes to be used directly by an end user or by a query optimizer, which would insert operators to transform data into appropriate types as needed.

In summary, PipeGen makes the following contributions:

(1) It introduces a new approach, based on program analysis, to automatically create data pipes for efficient direct data transfer between DBMSs.

(2) It proposes techniques that improve the performance of the generated pipes, including encoding data using binary rather than text-oriented formats, applying compression, eliminating delimiters, transferring data in parallel, and transferring data in columnar form.

(3) It is publicly available in its prototype form.  We evaluated it by generating data pipes between five different Java-based DBMSs.  The optimized data pipes perform up to 3.8X faster than a naive approach that transfers data via the file system using CSV.

PipeGen will be presented at the Seventh Annual ACM Symposium on Cloud Computing (SoCC '16) in Santa Clara, Calif., today, October 7. Please try it out if you are interested, and join our mailing list to receive updates.

Note: In a previous post on this blog, Professor Aaron Elmore and his collaborators described the Portage system.  Portage also focuses on efficient data transfer between component systems in a polystore. Complementary to PipeGen, Portage investigates optimizations that select data transfer formats based on the source and target engines, query, data and cluster configuration. This line of work could serve to further improve the performance of the data pipes that PipeGen generates (e.g., by using it instead of Apache Arrow buffers).
