
Trino serves as an execution engine that can process data found in almost any data source. The user simply connects Trino to the data source; queries against that source can then be sent to Trino, which communicates with the data source to extract the data, process it, and return the results.

The mechanism and communication protocol that Trino uses to extract and query data in different data sources vary. In some cases, Trino is able to get direct access to the data, pulling it in parallel to the Trino cluster for further processing. In other cases, Trino’s access is less direct. 

When the underlying data source is a traditional database system — especially if it is from a commercial vendor (such as Oracle, IBM DB2, or Microsoft SQL Server) — Trino’s access is usually not direct. Instead, it uses a well-known connection protocol called JDBC (Java Database Connectivity), in which it issues a SQL query to the underlying data source and then extracts the results one tuple at a time via an iterator model.

In this post, I will explain how JDBC works and why accessing data over JDBC can cause enormous performance bottlenecks. Although end users have limited control over Trino’s data access from underlying sources, this post will outline mitigation strategies within their control. 

 

How JDBC was originally designed to be used

JDBC was designed to return the final results of a query after all relevant filters, joins, and aggregations have been performed. These final results are almost always much smaller than the data accessed as part of query processing, since many query operations — especially filters and aggregations — receive large datasets as input and produce a smaller result set. Thus, JDBC as a protocol is designed to send small(ish) results to the application — typically on the order of kilobytes, and rarely more than on the order of megabytes.
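For readers less familiar with JDBC, here is a minimal sketch of the client-side pattern it was designed for: issue a SQL query, let the database do the heavy lifting, and iterate over a modest result set one row at a time. The connection URL, credentials, and column names below are hypothetical placeholders.

import java.sql.*;

public class JdbcClientSketch {
    public static void main(String[] args) throws SQLException {
        // Hypothetical JDBC URL and credentials; any JDBC-accessible database follows the same pattern.
        String url = "jdbc:postgresql://db.example.com:5432/sales";
        try (Connection conn = DriverManager.getConnection(url, "reader", "secret");
             Statement stmt = conn.createStatement();
             // The database performs the filtering and aggregation...
             ResultSet rs = stmt.executeQuery(
                     "SELECT region, COUNT(*) AS cnt FROM customers GROUP BY region")) {
            while (rs.next()) {                          // ...and the client pulls the small
                String region = rs.getString("region");  // result back one row at a time
                long cnt = rs.getLong("cnt");
                System.out.println(region + ": " + cnt);
            }
        }
    }
}

The result set here is a handful of rows, which is exactly the regime JDBC was built for.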

 

Trino uses JDBC in a way that it was not designed for

Trino’s use of JDBC is fundamentally antithetical to this originally intended use case. Given that Trino is itself a high-performance query execution engine, it doesn’t need the underlying system to perform all the query processing work. On the contrary, Trino is typically deployed as a highly parallel cluster of machines. It can often perform query processing with better performance than the underlying system. 

Furthermore, there are several other practical reasons (discussed in a previous post) why Trino prefers not to request the underlying system to perform all the query processing. These reasons include differences in API and SQL dialects across different underlying database systems (making it difficult for Trino to express the query in the appropriate way before sending it to the underlying system), and challenges in query optimization when Trino is unable to predict the cost of different query execution options in the underlying system. 

For all these reasons, Trino typically extracts data from the underlying system at an earlier stage of query processing, when it is often considerably larger than it will be after being fully processed. In many cases, Trino prefers to push down only simple filters and aggregations, extract the rest of the raw data set, and do all of the remaining query processing itself. This may result in datasets on the order of gigabytes (or more) being passed from the underlying system to Trino. JDBC was certainly not designed for this type of use case.

 

Database vendors are not motivated to make data extraction fast

In truth, extracting large amounts of data from commercial database systems has always been a challenge — the bottlenecks are not limited to JDBC. In general, commercial database systems want to do more than just store your data. Rather, they want to “own” it, serving as the primary access point. Thus, database vendors have never been motivated to provide fast data export tools. This phenomenon is now being replicated by cloud vendors, who almost always charge much higher egress prices than ingress prices.

Database researchers have highlighted this problem

A great illustration of this can be found in the VLDB 2017 “Don’t Hold My Data Hostage” paper. The authors ran a series of experiments on several widely used database systems, extracting the entirety of a 7GB table from each system via the ODBC protocol (which is similar in design to JDBC, except that it is not limited to Java). Every single system was more than ten times slower than simply copying the data over a socket using netcat. Table 1 from their paper is reproduced below:

Table 1: Time taken for result set (de)serialization + transfer 

System                     Time (s)   Size of data transferred (GB)
Netcat                       10.25     7.19
MySQL                       101.22     7.44
DB2                         169.48     7.33
DBMS X (probably Oracle)    189.50     6.35
PostgreSQL                  201.89    10.39
MonetDB                     209.02     8.97
MySQL+C                     391.27     2.85
Hive                        627.75     8.69
MongoDB                     686.45    43.6

In this experiment, some of the transfers were compressed prior to sending, resulting in smaller data sizes transferred (as shown in the third column of the table above). But this compression (and decompression when the data arrives at its destination) only increased the overall time to complete the extraction. For example, MySQL without compression took 101 seconds to perform the extraction, but with compression (the MySQL+C row), it took almost 4 times as long (and almost 40 times longer than the netcat baseline), despite the reduced size of the network transfer.

These results indicate that the primary bottleneck in the extraction is the serialization process in which the database converts its raw data (however it is represented internally) into a format that can be transferred over the network, and deserialization when it arrives at its destination. These operations transform a process that should only take a few seconds into one that takes several minutes for this 7GB table. 

My lab at the University of Maryland has run into the same problem

I’ve seen similar results in my own experiments. Last year, in my lab at the University of Maryland, we used Trino to extract a 32GB table stored in PostgreSQL over a JDBC connection (as part of an experiment for a different research project). The total extraction time was 855 seconds (approximately 37 MB/s). Extracting a 1.6GB ClickHouse table with Trino over a JDBC connection took 45 seconds (approximately 36 MB/s).

If each node in Trino is capable of processing data at a rate of hundreds of megabytes (or even single-digit gigabytes) per second, but data can only arrive at a rate of 36 or 37MB per second, that ingress time is going to be the bottleneck. It’s like trying to drive a Ferrari in the jungle — a total waste of the advanced processing capability of Trino.

Adding nodes to the Trino cluster will not help

This issue is only exacerbated if you are using a Trino cluster of many nodes. For example, if you have more than 10 nodes in your Trino cluster, they can potentially process data at an aggregate rate of tens of gigabytes per second. However, the ingress rate over JDBC remains unchanged: data can still be sent to Trino over the JDBC connection at the same 36 or 37MB per second, since the results of a JDBC query are delivered to the application over a single connection, one row at a time. There is now a gap of two to three orders of magnitude between the processing capability of the Trino cluster and the rate of data input over JDBC. The only way to fully utilize the capabilities of the Trino cluster in such a circumstance is to have thousands of queries running concurrently.

Should Trino avoid accessing data in database systems?

The JDBC bottleneck is thus quite problematic, and only gets worse when the downstream system is a high-performance parallel processing system. Nobody cares about this bottleneck when only a few kilobytes or megabytes are being transferred, as in JDBC’s intended use case. However, when Trino attempts to extract raw datasets, the JDBC bottleneck becomes apparent.  

This is a major reason why Trino’s most popular use cases include accessing open format data (such as Parquet files) stored in data lakes. Such data sources can be extracted at high performance without needing to resort to a JDBC connection. Open format data sitting in data lakes allows Trino’s high-performance parallel query processing engine to do its thing without being encumbered by data ingress bottlenecks. 

But does this mean that Trino should not be used to access data in database systems? Absolutely not! One of the main benefits of Trino is the ability to access data anywhere, no matter the input data source. Restricting its usage to non-database systems would severely limit its impact within an organization. 

Rather, we need to mitigate the bottleneck.

 

JDBC bottleneck mitigation in theory

As a basic rule behind the mitigation strategies for this bottleneck, it is critical that Trino create multiple JDBC connections so that it can extract data in parallel from the underlying system. This is a key area where the Starburst distribution of Trino differentiates itself from other distributions.

In other words, instead of having a single JDBC connection that extracts data at 35MB per second, having 10 JDBC connections all extracting data at that same rate would produce an aggregate ingress rate of 350MB per second. Having 100 JDBC connections could theoretically produce an aggregate ingress rate of 3.5GB per second. 

A simple way to increase parallelism

One way to increase JDBC parallelism is to divide a large extraction query into many smaller extraction tasks. To explain, let’s use an example. Let’s say we have a dataset of “customers” or “users” that we want to extract from the underlying system into Trino. The original SQL query might look something like:

SELECT * FROM customers;

Running the above query as a single SQL query would result in a single JDBC connection to the underlying database and would almost certainly become a bottleneck. In order to increase the number of JDBC connections, we could theoretically divide the original query into 100 subqueries, each with a non-overlapping WHERE clause filter.  For example:

SELECT * FROM customers WHERE age = 1;
SELECT * FROM customers WHERE age = 2;
SELECT * FROM customers WHERE age = 3;
…
SELECT * FROM customers WHERE age = 100;

Each query in the above example extracts a different 1/100th of the raw dataset. Each of these 100 queries can be extracted via a separate JDBC connection. Running them in parallel could therefore achieve our goal of parallelizing JDBC connections. In theory, we could extract data 100 times faster (assuming there is approximately an equal number of customers for each age). 
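As a rough illustration of this idea (a sketch of a generic JDBC client, not Trino’s actual connector code), the following issues these subqueries over separate JDBC connections in parallel using a thread pool. The connection URL and credentials are hypothetical placeholders.

import java.sql.*;
import java.util.concurrent.*;

public class ParallelExtractSketch {
    // Hypothetical JDBC URL; the same pattern applies to any JDBC-accessible database.
    static final String URL = "jdbc:postgresql://db.example.com:5432/sales";

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(100);
        for (int age = 1; age <= 100; age++) {
            final int a = age;
            pool.submit(() -> {
                // Each task opens its own connection, so the 100 extractions can proceed in parallel.
                try (Connection conn = DriverManager.getConnection(URL, "reader", "secret");
                     PreparedStatement ps = conn.prepareStatement(
                             "SELECT * FROM customers WHERE age = ?")) {
                    ps.setInt(1, a);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            // hand each row off to downstream processing
                        }
                    }
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }
}

Of course, opening 100 connections does not guarantee that the underlying database will actually run all 100 queries at once, which brings us to the next problem.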

The whack-a-mole problem

Unfortunately, this technique alone is unlikely to yield 100-times-faster extraction rates (the 3.5GB-per-second aggregate ingress rate mentioned above). This is because the JDBC bottleneck would likely be replaced by a different bottleneck: the ability of the underlying database system to handle 100 concurrent input queries and produce their results in parallel.

Most database systems limit the number of input queries they are willing to process in parallel. This is especially the case if these input queries are coming from the same requesting source. This is because too many concurrent queries can cause thrashing for resources, as critical database resources (such as its in-memory buffer pool) get overwhelmed with competing requests from different queries. Therefore, unless the underlying system is unusually capable of handling many separate concurrent requests and processing them all in parallel without getting overwhelmed, there will be a limit to how many concurrent JDBC connections can be formed using this technique. 

The reason why database systems limit concurrency

To explain why the underlying system will not allow these 100 queries to run in parallel, let’s look at these requests from the vantage point of the database system. Assume (for now) that there is no index on age in the underlying system. If so, then each one of the 100 queries listed above would require a table scan of the entire customers table in order to extract all customers with the age specified by that particular query. 

Each scan requires dedicated CPU resources. If data is not stored in memory, it will also require disk I/O resources. Most systems lack the dedicated resources needed to run 100 scans in parallel. And although there have been clever research papers from academia that show how these 100 table scans could be combined into a single table scan for this exact type of use case, in practice, most available systems in industry do not implement this optimization (or at least struggle to get the full benefit of this optimization). 

Therefore, in practice, the underlying system will only run a fraction of these 100 queries in parallel. When the first subset of these 100 queries is complete, it will start working on the next subset. Since each subset runs sequentially, the overall speedup from breaking up a large extraction query into many smaller extraction queries will be far less than a factor of 100.

Secondary indexing will not solve the problem

Even if the underlying system has an index on age, if this index is not primary (i.e., data is not sorted by age), then the situation is the same: most systems will not allow 100 concurrent index lookups on a secondary index. The reason is that if data is not sorted by the attribute being extracted (age in this case), then the tuples required by each query will be scattered across different locations in the table's storage.

If the table is stored on an external storage device, each query will require many random reads to read data at each of these different locations, which puts a lot of stress on the storage device. Generally, 100 concurrent processes, each issuing many random reads, is likely to overwhelm the storage device where the data is located. 

Even if the table is stored in memory, random reads to memory are typically slower than sequential reads for this type of scenario. This is because each cache line sent from RAM to the CPU will generally be polluted with data from other nearby tuples that are irrelevant to the query being processed. 

Parallelism is best achieved through partitioning

The underlying system will be far more capable of executing more of these queries in parallel if it does not need to perform a complete table scan for each of these 100 queries.

For example, if each age is stored in a separate partition in the database, then each of the 100 queries with a WHERE clause predicate on age involves extracting the correct partition for that age, without having to read data from any other irrelevant partition. This consumes far fewer resources per query, and thus more can run in parallel. 

Most database systems support some sort of PARTITION BY command in SQL to declare a one-time partitioning strategy on a table. In our example, if we were to run something along the lines of the following command (the precise syntax will vary across systems):

ALTER TABLE customers
PARTITION BY LIST (age) (
    PARTITION p1 VALUES IN (1),
    PARTITION p2 VALUES IN (2),
    PARTITION p3 VALUES IN (3),
    …
    PARTITION p100 VALUES IN (100)
);

then each of the 100 ages would be stored in a separate partition. The queries in our example would thus run much faster and would generally be more parallelizable.

Can a primary index achieve the same effect as partitioning?

We mentioned earlier that secondary indexes are less effective than partitioning because the data is not sorted by the attribute indexed by the secondary index. Therefore, extracting all data that has a particular value for this attribute will require many slow random lookups through the secondary index, whereas extracting all data in a partition is a simple sequential scan through that partition. 

What about a primary index? Tables are generally sorted (or at least “clustered”) by the primary index attribute. Therefore, all tuples that contain the same value for the primary index attribute will be stored next to each other, and extracting them is a simple sequential scan, similar to a partition read. In our example, each query could simply use the index to quickly find the location on disk for the particular age requested by that query and extract the results sequentially.

Before we go too far down this line of thinking, it is important to point out that it may not be possible to create a primary index on whatever attribute we want. Many systems automatically build the primary index on the attribute that is declared to be the primary key of the table. Hence, creating a primary index on an attribute would require us to declare that attribute to be the primary key. If we were to run the following command:

ALTER TABLE customers
ADD CONSTRAINT pk_age PRIMARY KEY (age);

This would fail in just about every system, since a primary key must be unique, and there will generally be repeated values of the age attribute in our example customers table.

Primary indexing can sometimes yield a similar effect to partitioning

Nonetheless, our choice to use a customer’s age as the attribute to divide our WHERE clause predicates in the example above was arbitrary. Instead of using age, we could have used customerID. Assuming such an attribute exists, it will probably be the primary key and thus usually have a primary index on it. For example, instead of subdividing the original extraction query into the following 100 subqueries:

SELECT * FROM customers WHERE age = 1;
SELECT * FROM customers WHERE age = 2;
SELECT * FROM customers WHERE age = 3;
…
SELECT * FROM customers WHERE age = 100;

We could instead subdivide it into the following alternative subqueries:

SELECT * FROM customers WHERE customerID >= 0 AND customerID < 1000;
SELECT * FROM customers WHERE customerID >= 1000 AND customerID < 2000;
SELECT * FROM customers WHERE customerID >= 2000 AND customerID < 3000;
…
SELECT * FROM customers WHERE customerID >= 99000 AND customerID < 100000;

Figuring out how to divide the customerID attribute into 100 ranges of approximately equal size may require some knowledge of the dataset we are working with, but assuming we can do this, the underlying system will be able to use its primary index to quickly run each of the above 100 queries without performing a complete table scan for each query. 
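As a rough sketch of how such ranges could be computed, assuming customerID is numeric and roughly uniformly distributed, one could look up the minimum and maximum customerID and divide that span into equal-width ranges. The helper below is hypothetical and not part of any Trino connector.

import java.sql.*;
import java.util.*;

public class RangeSplitSketch {
    // Builds numRanges non-overlapping predicates over customerID,
    // e.g. "customerID >= 0 AND customerID < 1000".
    public static List<String> buildRangePredicates(Connection conn, int numRanges) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT MIN(customerID), MAX(customerID) FROM customers")) {
            rs.next();
            long min = rs.getLong(1);
            long max = rs.getLong(2);
            long width = Math.max(1, (max - min + 1) / numRanges);
            List<String> predicates = new ArrayList<>();
            for (int i = 0; i < numRanges; i++) {
                long lo = min + i * width;
                // The last range is left open-ended so that no customerID is missed.
                String upper = (i == numRanges - 1) ? "" : " AND customerID < " + (lo + width);
                predicates.add("customerID >= " + lo + upper);
            }
            return predicates;
        }
    }
}

Each resulting predicate can then be appended to the extraction query and issued over its own JDBC connection, just as in the earlier parallel-extraction sketch.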

Although it is conceivable that the 100 queries above could run as 100 parallel JDBC extraction tasks, in practice even this approach is unlikely to achieve the full 100X extraction speedup. Most underlying systems limit how many queries they are willing to run in parallel, even if each query is lightweight and easy to process. In truth, even partition extractions will still run into this maximum-parallelism constraint. Nonetheless, a simple partition extraction has a very clear and consistent performance cost, whereas index-based reads are more variable; many systems will therefore allow more parallel partition-extraction tasks than parallel index reads.

Furthermore, given that there is more flexibility in what attribute to use for partitioning data than there is in what attribute to use for creating a primary index, this is another reason to prefer partitioning over indexing.

Finally, the database connector in Trino is more likely to be able to automatically create parallel JDBC connections via the partitioning approach than the indexing approach, since partitions are easier to reason about and reference than ranges of the primary index.

For all of these reasons, the partitioning approach is overall more likely to be successful than the indexing approach. 

 

Summary of the above discussion on JDBC bottleneck mitigation in theory

The bottom line from the theoretical discussion I presented above is that the fastest way to get data out of an underlying system is to extract each partition separately and in parallel. Even if JDBC has to be used to access each partition, the underlying system will generally allow for the maximum possible parallelism if each partition is accessed separately. However, if the data in the underlying system is not already partitioned, or the system does not support partitioning at all, then the next best option is to access data via filters on the primary key of the table (on which there usually exists a primary index).

 

JDBC bottleneck mitigation in practice

To summarize what we have said in this post so far, there are three main points.

  1. If Trino is accessing an underlying database system via a JDBC connection, this JDBC connection is extremely likely to be a performance bottleneck.
  2. The best way to avoid this bottleneck is to use many JDBC connections to access data from the underlying system.
  3. This is most likely to be successful if each JDBC connection accesses a different partition in the underlying system. 

Now for some bad news: much of this is out of the control of the Trino end user. The end user simply issues the query, and it is the Trino database connector that does the work of creating one or more JDBC connections and issuing different queries over these connections. Whether it uses partitioning, indexing, or none of the above is a decision made by the database connector — not really the end user.

Nonetheless, we said above that partitioning is by far the most effective approach to avoid the JDBC bottleneck. It is impossible for the Trino connector to take advantage of this partitioning if the data in the underlying system is not already partitioned. Whoever administers the underlying system has control over whether data is partitioned or not. This is a very important step in potentially avoiding this bottleneck, and usually, the request to partition data must be submitted by a human.

 

How Starburst helps solve this problem

However, even if data is already partitioned in the underlying system, the partitioning approach to avoid the bottleneck will not be effective unless the Trino database connector knows how to take advantage of these partitions. There is a lot of variation in how well different Trino distributions take advantage of partitions to avoid the JDBC bottleneck. Indeed, this is a reason why many of Starburst’s customers choose Starburst when using Trino to access data in underlying database systems. Understanding the bottleneck and the mitigation options is a critical part of the process of choosing the right Trino distribution for your needs.

In my next post, I will support much of the theory in this post with real performance numbers, investigating the cost of the JDBC bottleneck when running actual queries over large datasets. We will see the difference between running queries on a distribution that is unable to take advantage of partitioning in the underlying system and on one that can.

 
