
Building Starburst Data Pipelines with SQL or Python

Bringing optionality to ETL workloads for data engineers


More deployment options

It’s easy to see Starburst primarily as a SQL query engine, but did you know it is also a platform for your ELT/ETL workloads? 

Not only is it a capable and competent platform for transformational processing, but it also offers options to cater to different data engineering mindsets. SQL-oriented data engineers can build their pipelines as expected, and programming-based pipelines can be created with Python.

History of Starburst for ETL workloads

Starburst and ETL have a long history. Starburst products are built on top of Trino, which was initially developed for interactive querying. Within a year of Trino (then known as Presto) being released into production at Facebook, users started scheduling batch/ETL queries with Trino instead of Hive.

The usage pattern caught on. A few years later, 50% of existing ETL workloads were running on Trino, as were 85% of all new workloads. 

Soon, others caught on, and companies like Salesforce and Lyft began utilizing Trino for their batch/ETL workloads, too. Today, ETL workloads are one of the most popular ways to use Starburst. 

Architectural concerns and remedies

Part of the story here revolves around Hive and the long-term replacement of Hive workloads.

From its first release, Trino came out of the gate quickly, far outpacing Hive on query execution speed because Trino prioritized performance where Hive prioritized reliability. As a side note, Comparing Foundational Features of Trino, Hive & Spark points out that all of these engines are continually improving. 

To understand all of that, let’s look at the original architecture at the foundation of Starburst products.

Parallel processing fundamentals

Fundamentally, Starburst’s engine uses a distributed processing model similar to Hive and Spark. In other words, it breaks work down into multiple stages organized in a directed acyclic graph (DAG). Within each stage, the same processing can be performed on many splits of data (Spark calls them partitions and Hive calls them blocks) simultaneously.

When it is necessary to move to another stage of processing, the data must be redistributed to enable continued processing in a highly parallel fashion. Examples of when this is needed are everyday SQL operations such as GROUP BY, JOIN, and ORDER BY.
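If you want to see these stages for yourself, you can ask the engine for its distributed plan. Below is a minimal sketch using the Trino Python client; the host and user are hypothetical placeholders, and the query reads from the built-in tpch catalog.

```python
# Minimal sketch: inspect the stage/DAG structure of a query.
# The host and user are hypothetical placeholders; tpch is a built-in catalog.
from trino.dbapi import connect

conn = connect(
    host="your-cluster.example.com",
    port=443,
    user="data_engineer",
    http_scheme="https",
    catalog="tpch",
    schema="tiny",
)
cur = conn.cursor()

# EXPLAIN (TYPE DISTRIBUTED) shows the plan broken into fragments (stages),
# including where rows are repartitioned (shuffled) for the GROUP BY.
cur.execute("""
    EXPLAIN (TYPE DISTRIBUTED)
    SELECT orderstatus, count(*) AS order_count
    FROM orders
    GROUP BY orderstatus
""")
for row in cur.fetchall():
    print(row[0])
```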

If these concepts are new to you, I recommend you watch at least the first video in Trino Query Plan Analysis (video series), which I’m embedding next.

Initial architecture

With that DAG/query plan knowledge fresh in your mind, take a look at the following image, which represents the base architecture and shows why Starburst runs fast.

Data leaving one stage is redistributed to the next stage. This process is often referred to as shuffling. 

Starburst’s speed benefit comes from not persisting that intermediate data to disk. Instead, it acts more like a streaming engine internally and sends the data directly on to the next stage. The consequence is that if the query fails before completion, the initiator simply receives an error message and the whole query must be rerun.

The design goals for interactive querying performance did not provide sufficient support for long-running and memory-intensive queries.

  • Long-running queries are unreliable because the all-or-nothing architecture makes it particularly hard to recover from failures.
  • Distributed memory limits are reached because the streaming shuffle, aggregations, and joins must process all data in memory at once.

Also, with the original architecture, it is really hard to apply classic techniques like adaptive query execution, speculative execution, and skew handling.

Fault-tolerant execution

As Comparing Foundational Features of Trino, Hive & Spark calls out, platforms continue to grow over time to address ever more features and limitations. Starburst has been no different, and it ultimately added fault-tolerant execution (FTE) to the architectural design.

The limitations of the initial architecture were resolved by introducing storage for the intermediate data generated by parallel tasks within each stage. This design enforces a stage-by-stage execution model, similar to what Hive was known for, and provides the reliability that earlier approaches lacked. 

The diagram below illustrates how intermediate data flows into configurable storage.

Something important is happening here. The same work is accomplished within each stage, but since not all stages are active simultaneously and the data for subsequent stages is persisted durably, the following benefits are realized with FTE.

  • Tolerate individual task failures: If a worker fails, then only the work in the affected tasks of the currently executing stage needs to be restarted.
  • Reliably run queries of any size: All tasks within a stage do not have to be executed at the same time, so the all-in-memory constraint is eliminated.
  • More flexible resource management: Starburst can be flexible enough to let higher-priority queries jump to the front of the queue without cancelling others.
  • Adapt query execution dynamically midstream: Starburst has the ability to adjust according to different running conditions and optimize its execution strategy midstream.

FTE is a cluster-wide setting, so in some organizations it makes sense to run one or more FTE clusters alongside one or more classic ‘interactive’ clusters.
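From the client’s point of view, nothing about submitting work changes when FTE is enabled; fault tolerance is configured on the cluster itself (for example, retry-policy=TASK plus a spooling exchange manager in Trino/Starburst Enterprise, or the equivalent cluster option in Starburst Galaxy). The sketch below is hypothetical in its connection details and table names.

```python
# Minimal sketch: the client side of a long-running job looks the same with or
# without FTE; fault tolerance lives in cluster configuration (e.g. retry-policy=TASK
# and a spooling exchange manager), not in this code. Names are hypothetical.
from trino.dbapi import connect
from trino import exceptions as trino_exceptions

cur = connect(
    host="etl-cluster.example.com",
    port=443,
    user="pipeline_svc",
    http_scheme="https",
    catalog="lake",
    schema="silver",
).cursor()

try:
    # A large backfill that could run for hours. On an FTE cluster, a lost worker
    # only forces the affected tasks to rerun from their spooled inputs.
    cur.execute("INSERT INTO orders_history SELECT * FROM bronze.orders")
    cur.fetchall()  # fetching drives the statement to completion
except trino_exceptions.TrinoQueryError as err:
    # On a classic interactive cluster this is the all-or-nothing failure
    # described earlier: the whole statement has to be rerun.
    print(f"query failed: {err}")
```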

Pipelines across the medallion architecture

As the diagram below shows, the medallion architecture is a common pattern we see in data lake environments and is focused on the flow of data from ingestion all the way to consumption for analytical, ML/AI, or other data application needs. 

Data pipeline construction options

The next diagram suggests that this end-to-end flow of data constructs a comprehensive data pipeline. The diagram also shows that the E (extract) & L (load) from ELT is often referred to, in aggregate, as ingestion. Interestingly, the T (transform) output can be input for another transformation, or even more than one.

SQL

The best argument for building your transforms and other data pipeline activities, such as validating, enriching, and aggregating, with SQL is that almost everyone knows SQL.
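As a concrete illustration, a typical SQL-first transform is little more than a CREATE TABLE AS statement that validates, enriches, and aggregates data while promoting it to the next layer. The sketch below uses hypothetical table names and wraps the statement in the Trino Python client, but the same SQL could just as easily be run from the Starburst query editor or the Trino CLI.

```python
# Minimal sketch of a SQL-only transform (validate + enrich + aggregate) that
# promotes data from a "bronze" to a "silver" table. All names are hypothetical.
from trino.dbapi import connect

cur = connect(
    host="your-cluster.example.com",
    port=443,
    user="data_engineer",
    http_scheme="https",
    catalog="lake",
    schema="silver",
).cursor()

cur.execute("""
    CREATE TABLE IF NOT EXISTS daily_sales AS
    SELECT
        CAST(o.orderdate AS date) AS sale_date,
        c.mktsegment              AS segment,
        count(*)                  AS order_count,
        sum(o.totalprice)         AS total_sales
    FROM bronze.orders o
    JOIN bronze.customers c ON o.custkey = c.custkey
    WHERE o.totalprice > 0        -- a simple validation rule
    GROUP BY CAST(o.orderdate AS date), c.mktsegment
""")
cur.fetchall()
```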

Pros for SQL

  • Widespread knowledge
  • Compact
  • Easy to understand for simple logic

Cons for SQL

  • No real-time debugging
  • Can’t perform other actions on results
  • No unit tests
  • Version diffs are often hard to read
  • As complexity grows, SQL gets more and more difficult

Python

Some will want to implement their data pipelines with a programming approach, and currently, Python is the most popular language for data processing. While Starburst supports the simple Trino Python client, a Dataframe API is a better fit for transformation jobs.

Dataframes are two-dimensional data structures that can be conceptually understood as tables. The following example is implemented with the very popular PySpark distributed processing framework.
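Along those lines, here is a minimal PySpark sketch of roughly the same validate/enrich/aggregate transform shown in SQL earlier; the Spark session setup and the bronze/silver table names are hypothetical placeholders rather than the exact example from the original post.

```python
# Minimal PySpark sketch: the same join + aggregate expressed as Dataframe calls.
# Session configuration and table names are hypothetical placeholders.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("daily_sales").getOrCreate()

orders = spark.table("bronze.orders")
customers = spark.table("bronze.customers")

daily_sales = (
    orders
    .filter(F.col("totalprice") > 0)                  # validate
    .join(customers, on="custkey")                    # enrich
    .withColumn("sale_date", F.to_date("orderdate"))
    .groupBy("sale_date", "mktsegment")               # aggregate
    .agg(F.count("*").alias("order_count"),
         F.sum("totalprice").alias("total_sales"))
)

daily_sales.write.mode("overwrite").saveAsTable("silver.daily_sales")
```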

Pros for Python Dataframes

  • Reusable functions
  • Version diffs are easy to read
  • Easily create unit tests
  • Can be combined with Python libraries like NumPy & SciPy
  • Perform real-time debugging
  • Easy to follow complex logic
  • Allows for logging

Cons for Python Dataframes

  • A bit verbose
  • Need to be a Python programmer

For a Python Dataframe application to run on Starburst, the framework has to convert the code into SQL that Starburst can understand. This process can produce SQL that differs from what you might create directly with SQL.

Here is a simple multi-table join query written by a human.
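A representative version of such a query, using tables from the built-in tpch catalog, might look like this (the exact query in the original post may differ).

```python
# A representative hand-written multi-table join; tables come from the built-in
# tpch catalog, and the exact query in the original post may differ.
human_sql = """
SELECT n.name            AS nation,
       count(*)          AS order_count,
       sum(o.totalprice) AS total_sales
FROM tpch.tiny.orders o
JOIN tpch.tiny.customer c ON o.custkey = c.custkey
JOIN tpch.tiny.nation n ON c.nationkey = n.nationkey
GROUP BY n.name
ORDER BY total_sales DESC
"""
```

You could paste that statement into the Starburst query editor or submit it through any Trino client.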

Programming that same functionality in Python, using methods for each operation such as filtering, joining, or aggregating, can end up generating a query that looks far more verbose.

Fortunately, Starburst has a feature-rich, cost-based query optimizer (CBO) that analyzes whatever query it receives and determines the best way to run it quickly and efficiently. Going back to our earlier parallel processing discussion, this means we expect the optimizer to create the same query plan/DAG regardless of how the SQL is written, as long as both versions answer the same question.

Dataframe API alternatives

The primary options for programming to a Dataframe API in Python, with the goal of running the work on a Trino cluster, are PyStarburst and Ibis.

PyStarburst

PyStarburst features a PySpark-like syntax using the same lazy execution model (waits until an input/output ‘action’ is called before actually submitting work to the cluster). At that point, the Python code is converted to SQL, allowing Starburst to handle the heavy lifting.
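Here is a minimal sketch of that flow; the connection details are hypothetical placeholders, and API specifics may vary between PyStarburst versions.

```python
# Minimal PyStarburst sketch; connection details are hypothetical placeholders
# and API specifics may vary between versions.
from pystarburst import Session
from pystarburst.functions import col
from trino.auth import BasicAuthentication

session = Session.builder.configs({
    "host": "your-cluster.galaxy.starburst.io",
    "port": 443,
    "http_scheme": "https",
    "auth": BasicAuthentication("user", "password"),
}).create()

orders = session.table("tpch.tiny.orders")

# Dataframe calls are lazy; nothing is sent to the cluster until an action such
# as show() or collect() runs, at which point the chain is converted to SQL.
big_orders = (
    orders
    .filter(col("totalprice") > 100000)
    .select("orderkey", "custkey", "orderdate", "totalprice")
    .sort(col("totalprice").desc())
)
big_orders.show()
```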

Ibis

Ibis provides a standard Dataframe API for data manipulation in Python, and compiles that API into a variety of compute engines.

With this tooling, Starburst is just one of the options for where your Dataframe code could be executed. Check out Ibis and Trino for code examples running in Starburst Galaxy.
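For flavor, a minimal Ibis sketch against the Trino backend might look like the following; the connection details are hypothetical placeholders, and the linked post has Galaxy-specific examples.

```python
# Minimal Ibis sketch using the Trino backend. Connection details are
# hypothetical placeholders.
import ibis

con = ibis.trino.connect(
    user="data_engineer",
    password="...",              # or whatever auth your cluster expects
    host="your-cluster.example.com",
    port=443,
    database="tpch",             # catalog
    schema="tiny",
)

orders = con.table("orders")

# Expressions are lazy; execute() compiles them to SQL for the chosen backend
# and returns a pandas DataFrame.
summary = (
    orders
    .group_by("orderstatus")
    .aggregate(total_sales=orders.totalprice.sum())
)
print(summary.execute())
```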

Comparing Dataframe APIs

The APIs themselves for these two frameworks are similar, though certainly not identical, as the following shows.

Although the SQL generated by each tool may differ, they aim to solve the same problem. We therefore rely on the CBO to produce a query plan/DAG that is either identical or very similar.

Orchestration and scheduling

Starburst provides limited native scheduling features, so many teams pair it with first-class orchestration and/or scheduling solutions. Tools like Airflow, Dagster, and Prefect are popular and can orchestrate a mix of SQL, Python, and other tooling steps that need to be executed in the right order and with the right dependencies.
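As a sketch of what that looks like in practice, here is a hypothetical Airflow DAG with one SQL ingestion step followed by a dependent transformation step; hosts, tables, and the schedule are placeholders, and teams often use dedicated provider operators instead of a raw client.

```python
# Hypothetical Airflow sketch: two dependent Starburst steps scheduled daily.
# Hosts, catalogs, schemas, and table names are placeholders.
from datetime import datetime
from airflow.decorators import dag, task
from trino.dbapi import connect

def _cursor():
    return connect(
        host="etl-cluster.example.com", port=443, user="airflow",
        http_scheme="https", catalog="lake", schema="silver",
    ).cursor()

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def starburst_pipeline():

    @task
    def load_bronze():
        cur = _cursor()
        cur.execute("INSERT INTO bronze.orders_raw SELECT * FROM staging.orders")
        cur.fetchall()

    @task
    def build_silver():
        cur = _cursor()
        cur.execute("""
            CREATE OR REPLACE VIEW silver.daily_sales AS
            SELECT orderstatus, sum(totalprice) AS total_sales
            FROM bronze.orders_raw
            GROUP BY orderstatus
        """)
        cur.fetchall()

    load_bronze() >> build_silver()

starburst_pipeline()
```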

How ETL is the perfect workload for Starburst

ETL workloads are a natural fit for Starburst, making it an effective cluster-computing technology for transformation processing jobs. 

There are several distinct reasons for this:

  1. The fault-tolerant execution mode makes it even more robust.
  2. Data engineers can leverage SQL and/or Python when constructing their pipelines.

If you absolutely need the ability to switch backend execution engines, Ibis is an appropriate solution. 

For Starburst customers, PyStarburst is recommended due to its similarity with the PySpark Dataframe API (easier to port from, or even back to, Spark). You also get the additional benefits from the core engine and the Dataframe API being built and supported by the same developer community.


Post originally published by Lester Martin as “building trino data pipelines (with sql or python)” at Lester Martin (l11n) Blog.
