ETL: Performant Joins Between Imbalanced Datasets
A problem I have recently faced at work is joining imbalanced datasets. The data you’re interested in may be only a few hundred records, but you need corresponding data from a source with over a million. With a straightforward JOIN technique there is no way around the cost: in order to verify the join is complete, the entirety of both sources must be loaded, so the total runtime will be roughly the time it takes for the slowest of your sources to return.
An Example
Let’s take an example. Say I run a Birthday Card business: someone pays money to send their friends birthday cards on their birthday every year. Every day, our ETL queries a list of all participants whose birthday is today from the BIRTHDAY table. Let’s assume this is ~2K people.
In order to send the physical card, we must send that person’s information and address to a third-party card-mailing company. We get these addresses from the ADDRESS table. This table is much larger than our BIRTHDAY table, holding an address for every person whose birthday we’re tracking. Let’s assume it has >1M records.
Important Note: Assume here that both tables live on different sources; otherwise we could use performant SQL JOINs in our query and solve our problem. Also assume that our ETL is less performant when joining tables from different sources than SQL JOINs between tables on the same data source; this is typically the case.
Here is a diagram of this example:
flowchart LR
db1[(Birthday Table)] -- 2K --> join[JOIN]
db2[(Address Table)] -- 1M --> join[JOIN]
join --> etl[Further Processing]
The simplest implementation would be to query the ADDRESS table in its entirety, then join the results of the two queries (birthday and address). The issue is clear: the ADDRESS table will take quite a while to return, and we will be waiting on it in order to join with our result from BIRTHDAY.
Even in a case where the ETL software allows us to stream data and run our pipeline components in parallel, we still run into the same issue: to make sure that all possible joins have taken place, the entirety of both sources must be loaded.
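To make the cost concrete, here is a minimal Python sketch of that naive approach; the connection objects (birthday_conn, address_conn), column names, and queries are assumptions standing in for whatever your ETL tool actually generates:

# Naive approach: load *everything* from both sources, then join in the ETL layer.
# birthday_conn / address_conn are assumed DB-API connections to two different sources.
def naive_join(birthday_conn, address_conn):
    bcur = birthday_conn.cursor()
    bcur.execute("SELECT PERSON_ID, NAME FROM BIRTHDAY WHERE BIRTH_DATE = CURRENT_DATE")
    birthdays = bcur.fetchall()            # ~2K rows

    acur = address_conn.cursor()
    acur.execute("SELECT PERSON_ID, STREET, CITY, ZIP FROM ADDRESS")
    addresses = acur.fetchall()            # >1M rows -- this is the wait

    # Join on PERSON_ID in memory; ~99.8% of the address rows are discarded.
    address_by_id = {row[0]: row for row in addresses}
    return [(b, address_by_id.get(b[0])) for b in birthdays]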
In our example, we don’t need the ~99.8% of address records that don’t correspond to birthday people. So what can we do?
A Solution
Let’s think of this idea:
Instead of querying the entirety of the larger source, use the smaller source to limit the result inside the query itself, rather than afterwards during the JOIN.
Let’s go back to our example. In our tables, people are represented by a field called PERSON_ID. To join the birthday and address sources, we query the results from both tables and then JOIN on PERSON_ID. Why not instead query the larger table by explicitly specifying the data from the smaller source? This way we’re only pulling the 0.2% we care about, letting the data source optimize the execution plan for us.
There are two ways to do this: Grouping into IN and Temporary Tables. Each option has its pros and cons depending on your ETL tool and development environment.
Grouping into IN
Here is an interesting solution. It may seem hacky to some of you, but I have employed this technique with significant performance improvement (orders of magnitude) in some of my pipelines.
SQL queries have an IN clause. This allows us to write a query that looks like this:
...
AND CONDITION = TRUE
AND PERSON_ID IN ('1234', '5678', ...);
Using a dynamic query, which most ETL tools support, we can essentially “hardcode” the IDs we’re looking for into the query. From there, the SQL execution plan will efficiently select only those addresses matching our given Person IDs.
The flow would look like this:
flowchart TD
op1[[Source]] --> op2[[Copy and Isolate IDs]]
op2 --> op3[[Group IDs into lists]]
op3 --> op4[[Run Dynamic SQL against larger source with ID list]]
op4 --> op5[[Rejoin back to original Source]]
Another way to look at this: instead of making one call and receiving 1M results, we make 20 calls and receive roughly 100 results from each.
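As a rough sketch of what this looks like outside any particular ETL tool, here is the same idea in Python; the connection object, column names, chunk size of 100, and %s placeholder style are assumptions (placeholder syntax varies by driver), not a specific tool’s API:

def chunked(ids, size):
    # Split the ~2K PERSON_IDs into lists of `size`, e.g. 20 chunks of 100.
    for i in range(0, len(ids), size):
        yield ids[i:i + size]

def fetch_addresses(address_conn, person_ids, chunk_size=100):
    # Keep chunk_size below the larger source's IN limit (e.g. 1,000 for Oracle; see table below).
    results = []
    cur = address_conn.cursor()
    for chunk in chunked(person_ids, chunk_size):
        # Build the dynamic query; bind parameters avoid quoting issues,
        # though hardcoding the literals as shown above works just as well.
        placeholders = ", ".join(["%s"] * len(chunk))
        cur.execute(
            "SELECT PERSON_ID, STREET, CITY, ZIP FROM ADDRESS "
            f"WHERE PERSON_ID IN ({placeholders})",
            chunk,
        )
        results.extend(cur.fetchall())
    return results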
This option works well if you have an easy way to concatenate values from different rows into a single row. It is also a quick fix, especially when you have limited permissions on the larger source.
However, when implementing this option you must be aware of the limitations of the larger data source’s query language. Here is a list of maximum values allowed inside the IN clause:
| Query Language | IN Clause Limitation | Source |
| --- | --- | --- |
| T-SQL (SQL Server) | Nondescript “many thousands of values” | IN (Transact-SQL) |
| PL/SQL (Oracle) | 1,000 | ORA-01795 |
| pgSQL (PostgreSQL, JDBC 9.1) | No explicit limit | PostgreSQL - max number of… |
| MySQL | “limited by the max_allowed_packet value” | MySQL 8.0 Reference Manual: IN Function |
In short, this method works well when the datasets are highly imbalanced, when your ETL tooling supports it, and/or when you have limited permissions on the larger data source.
Temporary Tables
On digging deeper into this problem, I found that some professionals suggest another method. The idea: create a TEMP table in the larger source, and load the smaller source into it. The reason we can’t rely on the data source’s SQL query execution plan is that our data sits on different sources; by loading the smaller source into a TEMP table, the larger source’s execution plan can once again join the imbalanced datasets efficiently.
Here is some pseudo-ish code implementing our earlier example with this method:
- Query the Birthday table
- Create a Birthday TEMP table in the Address table’s source

CREATE TABLE #BIRTHDAY_TEMP ( ... )

- Add rows from the ETL into the Birthday TEMP table

INSERT INTO #BIRTHDAY_TEMP (...) VALUES (...)

- Join in the Address table’s source

SELECT ... FROM #BIRTHDAY_TEMP B LEFT JOIN ADDRESS A ON B.PERSON_ID = A.PERSON_ID
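Stitched together outside any particular tool, the flow might look roughly like the Python sketch below; the connection objects, column definitions, and %s placeholders are assumptions, and the # prefix follows SQL Server’s local temporary table convention (other databases spell this differently):

def join_via_temp_table(birthday_conn, address_conn):
    # 1. Query the (small) Birthday table from its own source.
    bcur = birthday_conn.cursor()
    bcur.execute("SELECT PERSON_ID, NAME FROM BIRTHDAY WHERE BIRTH_DATE = CURRENT_DATE")
    birthdays = bcur.fetchall()

    acur = address_conn.cursor()
    # 2. Create the TEMP table on the Address table's source.
    #    In SQL Server, '#' makes it local to this session; '##' would make it global.
    acur.execute("CREATE TABLE #BIRTHDAY_TEMP (PERSON_ID VARCHAR(32), NAME VARCHAR(128))")

    # 3. Load the ~2K rows from the ETL into the temp table.
    acur.executemany(
        "INSERT INTO #BIRTHDAY_TEMP (PERSON_ID, NAME) VALUES (%s, %s)", birthdays
    )

    # 4. Let the Address source's execution plan do the join.
    acur.execute(
        "SELECT B.PERSON_ID, B.NAME, A.STREET, A.CITY, A.ZIP "
        "FROM #BIRTHDAY_TEMP B LEFT JOIN ADDRESS A ON A.PERSON_ID = B.PERSON_ID"
    )
    return acur.fetchall()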
Cases where this may not be possible are when you have restricted access to the larger source, or when data from the smaller source is not allowed to cross into other systems (e.g. HIPAA).
Also, temporary table implementations differ across databases (e.g. Oracle’s stock temporary tables only delete the data, not the table itself). Generally there are two types: local and global. Local temporary tables persist only through the connection/session and are deleted when that connection is closed. Global temporary tables are visible to all connections and are deleted when all connections referencing them have closed.
A wonderful article I found on WebArchive, titled Why Large IN Clauses are Problematic, discusses this method in contrast to the previous one.
Summary
Both of these methods take different routes to the same outcome: more performant joins on imbalanced datasets in different sources. Their caveats are summarized below.
Grouping Into IN
- Scales linearly (the number of chunked queries grows with the size of the smaller source)
- Query isn’t written until runtime
Temporary Tables
- More standard implementation across ETL tools
- Not viable with reduced permissions or data restrictions
- Requires loading the smaller source’s data into the larger source before the join
There may be other solutions to this problem.
Implementation Suggestions Across Different ETL Tools
How would you implement these across different ETL tools? Here are some suggestions.
Informatica
Though legacy, this ETL tool is still widely used. Temporary tables are definitely the easiest solution to implement here, as the Grouping Into IN solution favors tools with stronger stream-control capabilities.
If you were to implement the Grouping Into IN solution, however, you would need to first copy the stream, generate a sequence, associate each sequence number with a row, concatenate values into groups based on whatever chunk size you want to query, put each of these strings into a Dynamic SQL query, then rejoin.
Disclaimer: I have not implemented this, but hopefully it gives you some ideas.
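For what it’s worth, here is a rough Python equivalent of that sequence-plus-grouping step, with the chunk size of 100 again an assumption; contiguous integer division is used here, though a modulus-based round-robin assignment partitions the rows just as well:

from collections import defaultdict

def group_ids(person_ids, chunk_size=100):
    groups = defaultdict(list)
    for seq, person_id in enumerate(person_ids):
        # The generated sequence number decides which group each row lands in.
        groups[seq // chunk_size].append(person_id)
    # Each group becomes one comma-separated string to splice into the Dynamic SQL query.
    return [", ".join(f"'{pid}'" for pid in ids) for ids in groups.values()]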
SnapLogic
SnapLogic is more streaming-oriented, so implementing Grouping Into IN is easier. There are specific Group By N “snaps” (transformations) that let you copy the stream, isolate IDs, group by N, run a dynamic SQL query, then rejoin.
Temporary table implementation is pretty standard across tools.
Talend
For Talend, I believe the tFixedFlowInput transformation could be used, following a similar implementation as the solutions above (split, chunk into groups, dynamic SQL, rejoin).