
Introducing streaming ingest in Starburst Galaxy — Now in Private Preview




Eric Hwang
Distinguished Engineer
Starburst
Monica Miller
Developer Advocate
Starburst
Brian Zhan
Product Manager
Starburst
Many modern data architectures rely on a batch processing system that moves data into a data lake on an hourly or daily cadence. This is adequate for the many analytics needs that do not require immediate updates, such as drilling into revenue or user trends. However, the ability to analyze near real-time data opens the door to more latency-sensitive use cases, accelerating anomaly detection and decision making.
Today, teams often struggle with real-time data because common setups lack flexibility. When a software engineer deploys a real-time stream processing application, they typically need to pre-allocate infrastructure and resources to run it continuously, which takes time to configure and maintain. If they later want to modify the streaming operation, they have to reconfigure the system and possibly perform data migration steps. For example, if you write a streaming application to compute per-advertiser, per-country breakdowns and later want per-advertiser, per-country, per-age-group breakdowns, you need to reconfigure the system with the new aggregation groups and decide how (if at all) to handle your previously processed data.
A common pattern is for real-time data to be loaded in raw formats which are typically inefficient to query, such as JSON or CSV, and are almost always row-oriented. Furthermore, to maintain low latency bounds, real-time data often needs to be flushed out from IO buffers frequently to allow the data to be quickly accessed. However, this will often generate many small files. Both of these factors will make it expensive to perform computations on such data at scale.
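To get a feel for how quickly small files accumulate, the arithmetic can be sketched as follows. This is only a rough upper bound that assumes every parallel writer emits one file per flush interval; the function name and the example numbers are illustrative and are not taken from the loader.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def files_per_day(flush_interval_seconds: int, parallel_writers: int) -> int:
    """Rough upper bound on data files written per day.

    Assumes each parallel writer flushes exactly one file per interval.
    """
    flushes_per_day = SECONDS_PER_DAY // flush_interval_seconds
    return flushes_per_day * parallel_writers


# A one-minute flush with 4 writers versus an hourly batch with 1 writer.
print(files_per_day(60, 4))    # 5760
print(files_per_day(3600, 1))  # 24
```

Under these assumptions, a single day of streaming produces hundreds of times more files than an hourly batch job, which is why periodic compaction matters.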
Through this blog and the GitHub repository, we hope to show you how to navigate these barriers and enable near real-time use cases by streaming data into a data lake using a widely known language (SQL) and efficient file formats. This approach gives end users a scalable way to query their data lake and explore the flexibility of near real-time analytics. To help you get started faster, we show how to use a fully managed Trino service (Starburst Galaxy), so that you don’t have to start from scratch.
This technology stack uses Apache Flink hosted by Amazon Kinesis Data Analytics to continuously consume data from a Kafka JSON data source and transactionally write to an AWS Glue table in Iceberg format on an S3 data lake. The messages landed in S3 can then be queried using Trino (provided by Starburst Galaxy), essentially allowing you to query your data in near real-time.
The Flink framework provides a checkpointing system that guarantees robust execution. If a failure occurs, the streaming fault-tolerance mechanism uses state snapshots to provide exactly-once guarantees.
In various alternative architectures, the schema is often stored in multiple systems, requiring these disparate systems to be kept in sync with each other. Here, the continuously running Flink job parses the JSON data source directly following the Iceberg table schema definition. This means future schema changes can be facilitated through just one source of truth.
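The idea of letting the table schema drive the parsing can be illustrated with a small sketch. This is not the loader's actual code (the loader is a Flink job); `TABLE_SCHEMA` is a hypothetical stand-in for the column list read from the Iceberg table, and the choice to ignore unknown JSON fields is an assumption made for this example.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical column list standing in for the schema read from the Iceberg table.
TABLE_SCHEMA: Dict[str, Callable[[Any], Any]] = {
    "order_id": int,
    "country": str,
    "amount": float,
}


def parse_record(raw: str, schema: Dict[str, Callable[[Any], Any]]) -> Dict[str, Any]:
    """Project a JSON message onto the table schema.

    Fields that are not in the schema are ignored (an assumption of this sketch);
    columns missing from the message become None, i.e. NULL.
    """
    message = json.loads(raw)
    row = {}
    for column, convert in schema.items():
        value = message.get(column)
        row[column] = None if value is None else convert(value)
    return row


print(parse_record('{"order_id": "7", "country": "US", "extra": true}', TABLE_SCHEMA))
# {'order_id': 7, 'country': 'US', 'amount': None}
```

Because the parser only consults the schema it is given, changing the table definition is enough to change what gets parsed; no second copy of the schema has to be updated.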
Trino serves as the data lake analytics engine, enabling live queries on near real-time data for interactive and batch analytics at scale.
Modern data formats like Iceberg provide high performance data access, while still providing transactional data operations out of the box. The transactional aspect allows for operations such as data compaction (rewriting smaller files into larger files) to be run behind the scenes without requiring coordination with the read workloads.
Apache Flink, the popular stream processing framework, is natively supported by AWS and provides convenient, durable semantics for writing Iceberg tables at scale.
Amazon Kinesis Data Analytics provides managed Apache Flink, AWS Glue acts as metastore, and S3 serves as the object storage for the data lake.
Starburst Galaxy is a fully managed query engine for data lake analytics.
The information in this blog post supplements the step-by-step instructions found in the GitHub repository, specifically the README.md. For more detailed information, visit the repository. Trino treats identifiers as case-insensitive, so all SQL column and table names should be lowercase by convention. The recommended practice is lower snake case (for example, “hello_world” or “order_id”).
For the sake of this technical tutorial, you will be using the Sample Kafka Publisher, a simple data generation tool provided in the repository, to publish messages to your Kafka topic. Within the tutorial instructions, we will assume that you do not already have a topic available and will walk you through the steps to create a topic quickly using Confluent Cloud (which has a free tier).
The tutorial demonstrates running the loader Flink job both via local execution and via a production-like deployment. Local execution is useful for building the project and for verifying the configuration and functionality of the components. To get the full benefit of this tutorial, complete the local execution first and then move on to the production deployment. However, if you would like to skip straight to the production configuration, the Flink jar is available to download from the GitHub repository. Note that the tutorial instructions use a wide permission set so that users can quickly test this solution. We recommend narrowing these permissions if the solution is used outside of a development environment.
Select your region and perform the necessary tasks to configure your AWS S3 data lake and grant Starburst Galaxy access to that data lake. It is strongly recommended that for the purpose of this tutorial, you configure all AWS services in the same region, ideally in the same region as your data source. This will help minimize any AWS cross region network charges.
AWS
Create an IAM user and attach the existing AmazonS3FullAccess and AWSGlueConsoleFullAccess policies to it. Save your Access Key ID and Secret Access Key.
Starburst Galaxy
KAFKA_LOADER_API_KEY, KAFKA_LOADER_API_SECRET, KAFKA_LOADER_BOOSTRAP_SERVER.
Create a new secret in AWS Secrets Manager to store the Kafka API access credentials by selecting the “Other type of secret” secret type. Make sure to create the secret in the same region as the rest of your work. For more information on properly configuring the secret, visit the GitHub repository.
Build the Job
To build the loader you need a Java 11 runtime environment, so make sure Java 11 is installed on your operating system.
AmazonS3FullAccess, AWSGlueConsoleFullAccess, SecretsManagerReadWrite.
Launch the Loader
Launch the loader locally, replacing the default values with those for your own environment. The README instructions give an example of a typical loader configuration.
Run the Sample Kafka Publisher
The Sample Kafka Publisher generates sample data. Each time the tool is run, exactly one new message is published to your Kafka topic. If the loader is running, it should process this new message. The default Flink flush time is one minute, so it may take up to that long for the data to appear in your data lake. After waiting, use Starburst Galaxy to select from the table and view the sample records.
Before proceeding with the production execution, verify that you have terminated the local loader. The local loader uses the same Kafka client ID and may therefore conflict with the production loader.
Launch Flink job with AWS Kinesis Data Analytics
Focusing solely on the initial setup of a new system without considering the longevity of the architecture can lead to errors later. We have compiled important considerations for both maintenance and schema evolution.
Effective and efficient near real-time analytics is a force multiplier, enabling faster time to insight and quicker decision making. However, some periodic maintenance is required to keep the system healthy.
Real-time streaming ingestion tends to flush out data frequently to minimize data query latency. However, this typically results in many small files on the underlying storage system. Small files increase the IO cost of scanning the data and dramatically reduce overall query efficiency. Iceberg format allows for transparent asynchronous compactions, which rewrite the many small files into fewer larger ones, and these compactions must happen periodically to retain healthy analytic query performance. The following Starburst Galaxy SQL query demonstrates how this compaction could be run:
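A minimal sketch of such a compaction statement, rendered from Python so that it can be scheduled from a script. It relies on the `optimize` table procedure of the Trino Iceberg connector; the table name `streaming.events` and the 128MB threshold are placeholders, and submitting the statement to Starburst Galaxy is left to whichever client you use.

```python
def optimize_statement(table: str, file_size_threshold: str = "128MB") -> str:
    """Render a compaction statement that rewrites files smaller than the threshold."""
    return (
        f"ALTER TABLE {table} "
        f"EXECUTE optimize(file_size_threshold => '{file_size_threshold}')"
    )


print(optimize_statement("streaming.events"))
# ALTER TABLE streaming.events EXECUTE optimize(file_size_threshold => '128MB')
```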
The Iceberg storage format creates a new snapshot, or table version, for each write to the table. These snapshots are retained indefinitely and accumulate over time. In order to limit the size of the system metadata, it is recommended to periodically run the expire snapshots command to remove any old snapshots that will not be used anymore.
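As a sketch, the expiration can be expressed with the `expire_snapshots` table procedure of the Trino Iceberg connector. The table name and the seven-day threshold below are placeholders; the connector may reject thresholds shorter than its configured minimum retention, so treat the value as an assumption to adjust.

```python
def expire_snapshots_statement(table: str, retention_threshold: str = "7d") -> str:
    """Render a statement that removes snapshots older than the retention threshold."""
    return (
        f"ALTER TABLE {table} "
        f"EXECUTE expire_snapshots(retention_threshold => '{retention_threshold}')"
    )


print(expire_snapshots_statement("streaming.events"))
# ALTER TABLE streaming.events EXECUTE expire_snapshots(retention_threshold => '7d')
```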
Data Governance and Compliance are critical aspects for any ingestion system, since data is continuously flowing in. A common practice in the industry is for each table to have a data retention period pre-established based on the semantic nature of the data stream. This helps to comply with regulatory requirements as well as limit total storage size. The following Starburst Galaxy SQL query demonstrates how such a retention operation could be run:
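One way such a retention operation could look is a `DELETE` that removes rows older than the retention period. This sketch assumes the table has a timestamp column to filter on; the names `streaming.events` and `event_time` and the 90-day period are hypothetical.

```python
def retention_statement(table: str, timestamp_column: str, retention_days: int) -> str:
    """Render a DELETE that removes rows older than the retention period."""
    if retention_days <= 0:
        raise ValueError("retention_days must be positive")
    return (
        f"DELETE FROM {table} "
        f"WHERE {timestamp_column} < current_timestamp - INTERVAL '{retention_days}' DAY"
    )


print(retention_statement("streaming.events", "event_time", 90))
# DELETE FROM streaming.events WHERE event_time < current_timestamp - INTERVAL '90' DAY
```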
A common issue is malformed messages being accidentally inserted into the ingest pipeline. A typical best practice is to add a Dead Letter Queue (DLQ) topic so that the Flink job can push out any messages that fail to parse. This allows ingestion to continue in the presence of bad messages. If a DLQ topic is not configured, any incoming message that fails to parse will halt the loading process. Another best practice is to validate the values of incoming messages. Examples include sampling account_id fields every ten minutes to confirm that the incoming data is valid, or setting up heartbeat tests that trigger an alert when data stops flowing. In a streaming environment, skipping these continuous checks and failing to catch time-sensitive errors can have disastrous consequences, such as permanent data loss.
A data schema is almost never final from the outset, and a production streaming ingest system must be able to adapt to schema changes. To simplify this process as much as possible, the Apache Flink job uses the Iceberg table schema as the single system of record for the schema, which removes the need to replicate the schema to any other system.
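The dead letter pattern can be sketched in a few lines. This is an in-memory illustration rather than the Flink job's implementation: a real loader would publish the failed messages to a Kafka DLQ topic, whereas here they are simply collected in a list, and the function name is hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple


def route_messages(
    raw_messages: List[str],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
    """Split raw messages into parsed records and dead-letter entries."""
    parsed: List[Dict[str, Any]] = []
    dead_letters: List[Dict[str, str]] = []
    for raw in raw_messages:
        try:
            record = json.loads(raw)
            if not isinstance(record, dict):
                raise ValueError("expected a JSON object")
            parsed.append(record)
        except ValueError as error:  # json.JSONDecodeError subclasses ValueError
            # Keep the original payload and the reason so it can be inspected later.
            dead_letters.append({"raw": raw, "error": str(error)})
    return parsed, dead_letters


good, bad = route_messages(['{"account_id": 42}', "not json", "[1, 2]"])
print(len(good), len(bad))  # 1 2
```

The key property is that a bad message never raises out of the loop, so one malformed payload cannot stall the messages behind it.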
1. Adding a column to an Iceberg table
If a new column is required, this column can easily be added through the following SQL command in Starburst Galaxy.
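A minimal sketch of what such a statement could look like, using the standard Trino `ALTER TABLE ... ADD COLUMN` syntax. The table name, column name, and type are hypothetical; the column follows the lower snake case convention recommended earlier.

```python
def add_column_statement(table: str, column: str, data_type: str) -> str:
    """Render a statement that adds a column to an existing table."""
    return f"ALTER TABLE {table} ADD COLUMN {column} {data_type}"


print(add_column_statement("streaming.events", "age_group", "varchar"))
# ALTER TABLE streaming.events ADD COLUMN age_group varchar
```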
For the changes to take effect, the Kinesis Data Analytics streaming application must be restarted. One way is to press the “Stop” button, wait for the application to end, and then press the “Run” button. A faster method is to click the “Configure” button and then the “Save changes” button at the bottom of the configuration page, which forces the Flink job to restart. Either restart option will cause the job to detect the new schema changes and begin operating with that configuration.
Rows loaded before the column was added will automatically be assigned NULL values for this column. New records will be parsed appropriately, and any records missing this field will be assumed to have a NULL value.
2. Removing a column from an Iceberg table
If a column is required to be removed, this column can easily be dropped through the following SQL command in Starburst Galaxy.
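A minimal sketch of what such a statement could look like, using the standard Trino `ALTER TABLE ... DROP COLUMN` syntax. The table and column names are hypothetical.

```python
def drop_column_statement(table: str, column: str) -> str:
    """Render a statement that drops a column from an existing table."""
    return f"ALTER TABLE {table} DROP COLUMN {column}"


print(drop_column_statement("streaming.events", "age_group"))
# ALTER TABLE streaming.events DROP COLUMN age_group
```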
For the changes to take effect, the Kinesis Data Analytics streaming application must be restarted. One way is to press the “Stop” button, wait for the application to end, and then press the “Run” button. A faster method is to click the “Configure” button and then the “Save changes” button at the bottom of the configuration page, which forces the Flink job to restart. Either restart option will cause the job to detect the new schema changes. Once the loader restarts, the dropped column will no longer be written.
3. Scaling up the loader
If the loader is ever unable to keep up with the ingested data volume, you can add more parallelism. Navigate to the corresponding Kinesis Data Analytics streaming application, click “Configure”, change the “Parallelism” number to something larger, and then click “Save changes” at the bottom of the configuration page. This will force the loader Flink job to stop and restart with more concurrent tasks. Note that the Flink job cannot have parallelism greater than the number of Kafka topic partitions. If you want to set the parallelism higher, you will first have to increase the number of Kafka topic partitions.
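The constraint above amounts to a simple check before changing the setting. The helper below is a hypothetical illustration of that check, not part of the loader or of any AWS or Kafka API.

```python
def partitions_to_add(desired_parallelism: int, current_partitions: int) -> int:
    """Number of Kafka partitions to add before the desired parallelism can be used."""
    return max(0, desired_parallelism - current_partitions)


print(partitions_to_add(8, 6))  # 2
print(partitions_to_add(4, 6))  # 0
```

A result of zero means the topic already has enough partitions, and only the “Parallelism” setting needs to change.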
We hope this tutorial demonstrates an example architecture that offers an efficient and thoughtful solution to some of the common challenges of near real-time analytics systems. Many systems focus strictly on the present configuration; Iceberg’s built-in capabilities, however, provide simple mechanisms to keep the system healthy even after new schema changes are introduced. Visit the GitHub repository for more information and to test out this ingestion process yourself.