-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 JDBC source: adjust streaming query fetch size dynamically #12400
Conversation
9f81e2b
to
d63e0e1
Compare
/test connector=connectors/source-postgres
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's a few places where I think a comment would be useful, but this looks really good!
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java
Show resolved
Hide resolved
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/BaseSizeEstimator.java
Outdated
Show resolved
Hide resolved
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/TwoStageSizeEstimator.java
Show resolved
Hide resolved
One question @tuliren what happens if a user have 2 connections that run in parallel? The first one is only calculating a limit of usage of total memory? Are the batch size recalculate every time or when the source throw a OOM? |
The batch size is recalculated per streaming query. Even within one connection, queries for each table can have a different batch size based on how large the average row in that table. So it does not matter how many connections a user has.
Not sure what you mean by "the first one". When calculating the batch size, we currently assume that there is a 200 MB buffer size in memory. So the batch size = 200 MB / mean row byte size. |
|
||
public final class FetchSizeConstants { | ||
|
||
public static final long BUFFER_BYTE_SIZE = 200L * 1024L * 1024L; // 200 MB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this configurable, so that if a single row exceeds 200MB we can reconfigure the pod to have more memory and reconfigure the connector to have a larger buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 200 MB buffer size is not enforced. It is only used to calculated the fetch size. Currently, each connector has much more than 200 MB of heap size. The max row size the connector can handle is actually limited by the heap size.
I'd prefer not to expose this as a connector parameter. Users should not worry about this kind of low level details. It will make the setup confusing. For example, currently we let people configure part size in the blob storage connector. People don't always get what it means, and sometimes they can set a wrong value, resulting in failed connections. We are in the process of removing it.
If a row is larger than 200 MB, the user should store the data in the blob storage or something else. I don't think we need to support such edge case. No matter how large the buffer is, there can always be some use case that breaks it.
airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/streaming/AdaptiveStreamingQueryConfig.java
Outdated
Show resolved
Hide resolved
if (rawFetchSize > Integer.MAX_VALUE) { | ||
return maxFetchSize; | ||
} | ||
return Math.max(minFetchSize, Math.min(maxFetchSize, (int) rawFetchSize)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this ever return 0?
What happens when the estimator estimates that even a single row is larger than available buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it will always return a value in the range of [minFetchSize, maxFetchSize]
.
As I mentioned in the above comment, as long as the row can fit in the total heap, the connector can still handle it.
/test connector=connectors/source-postgres
|
/test connector=connectors/source-mysql
|
/test connector=connectors/source-mssql
|
/test connector=connectors/source-snowflake
|
/test connector=connectors/source-snowflake
|
/test connector=connectors/source-cockroachdb
|
/test connector=connectors/source-db2
|
/test connector=connectors/source-oracle
|
/test connector=connectors/source-redshift
|
/test connector=connectors/source-tidb
|
Will publish new connector versions in separate PRs. |
Hi @tuliren , When you say "adjust streaming query fetch size dynamically", does the fetch size depend on the capacity of database sources ? Now if i increase the capacity of my database sources (increase the buffer size and fetch size ) and if i give 20GO of RAM to Airbyte (JOB_MAIN_CONTAINER_MEMORY_REQUEST and JOB_MAIN_CONTAINER_MEMORY_LIMIT ) does this will help me to have better performance ? however the preformance the logs will print every 1000 rows ? |
Did you update the connector version @SamiRiahy ? |
@tuliren I'm late to the party... this is awesome! |
Hi @SamiRiahy,
No, the fetch size depends on how large the average row size is. Internally we allocate roughly 200 MB in memory as the buffer. We first measure the average row size, say it is X MB, and then calculate the fetch size by 200 / X. In this way, we can 1) avoid reading too much data in each fetch and prevent the out-of-memory issue, and 2) read more rows in each fetch if the average row size is small to improve the performance. Currently the max fetch size is set to 100K. In reality, I did not see much performance improvement by reading more rows per fetch. To better improve the performance, we need to investigate the bottleneck more closely. Here is the issue that tracks this topic: #12532
By "20GO", do you mean 20 GB? This is probably unnecessary, and I think it won't improve the performance much, for two reasons:
Internally we use 750 MB for the database source connectors. The database connector should be able to work with only 500 MB for most dataset (i.e. the dataset does not have tables with fat rows like 100MB per row). If you have spare resource, giving it 1GB should be more than enough.
Yes. We log the record count for every 1000 rows, and the fetch size for every 100 rows. I think the latter is too frequent. I will reduce the logging frequency. |
* Merge all streaming configs to one * Implement new streaming query config * Format code * Fix comparison * Use double for mean byte size * Update fetch size only when changed * Calculate mean size by sampling n rows * Add javadoc * Change min fetch size to 1 * Add comment by buffer size * Update java connector template * Perform division first * Add unit test for fetching large rows * Format code * Fix connector compilation error
What
How
Recommended reading order
InitialSizeEstimator.java
SamplingSizeEstimator.java
TwoStageSizeEstimator.java
AdaptiveStreamingQueryConfig.java
StreamingJdbcDatabase.java
🚨 User Impact 🚨