[SPARK-46641][SS] Add maxBytesPerTrigger threshold #44636
@viirya |
@MaxNevermind Could you create a Jira ticket and replace |
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
Will do, but I didn't have an ASF Jira account. I've just requested one through the ASF self-service, which says it will take a few days to review my request. |
I think your Jira account was created now. |
@viirya |
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
Most issues have been addressed, so I don't see any remaining ones now. I took a look at the tests and they look okay to me.
cc @HeartSaVioR
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
Sorry for being late. I'm reviewing now, @MaxNevermind . |
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+1, LGTM (except a few minor style issues), @MaxNevermind .
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@viirya @dongjoon-hyun |
To @MaxNevermind , this is still under review technically because the previous approval is not for the last commit. |
@dongjoon-hyun |
We are already on this PR. Do you want to bring in someone else? |
No, I just want to confirm that there is no action point for me here right now. |
To @viirya and @MaxNevermind . What is the conclusion of this discussion? I reviewed this PR once more today. IIUC, this is the last remaining issue on this PR, isn't it? Did you reach a conclusion on it?
Replied the comment. I also replied one issue on truncating |
…-option' into streaming-add-maxBytesPerTrigger-option
Pushed another commit. One issue was resolved. |
+1, LGTM, too.
Feel free to merge if CI passes.
@dongjoon-hyun |
Hi, @MaxNevermind . It was a comment for @viirya . Let me merge this. :) |
Merged to master for Apache Spark 4.0.0. Congratulations on your first commit, @MaxNevermind . I added you to the Apache Spark contributor group and assigned SPARK-46641 to you, @MaxNevermind . |
Thank you @MaxNevermind @dongjoon-hyun |
What changes were proposed in this pull request?
This PR adds an input streaming source option, maxBytesPerTrigger, for limiting the total size of input files in a streaming batch. The semantics of maxBytesPerTrigger are very close to those of the existing maxFilesPerTrigger option.
How was the feature implemented?
Because maxBytesPerTrigger is semantically close to maxFilesPerTrigger, I treated every usage of maxFilesPerTrigger in the repository as a potential place requiring changes. I went over all usages of maxFilesPerTrigger in FileStreamSourceSuite and implemented maxBytesPerTrigger in the same fashion, as the two are close in nature. From the structure and elements of ReadLimit, I concluded that the current design implies only one simple rule per ReadLimit, so I explicitly prohibited setting both maxFilesPerTrigger and maxBytesPerTrigger at the same time.
Why are the changes needed?
This feature is useful for our team and our sister teams, and we expect it to find broad acceptance among Spark users. In a few of the Spark pipelines we support, we use the Available-now trigger for periodic processing with Spark Streaming. We currently use the maxFilesPerTrigger threshold, but this is not ideal: input file sizes may change over time, which requires periodically adjusting maxFilesPerTrigger. The computational complexity of the job depends on the event count and total size of the input, and maxBytesPerTrigger is a better predictor of that than maxFilesPerTrigger.
Does this PR introduce any user-facing change?
Yes
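To make the user-facing behavior concrete, here is a minimal, standalone sketch of the two rules described in this PR: the two limits cannot be combined, and a batch is capped by total input size rather than file count. It is not Spark's code. The class and method names (BytesPerTriggerSketch, validate, filesInNextBatch, planBatches) are hypothetical, and two details are assumptions rather than statements about the merged implementation: files are taken in arrival order, and the first file of a batch is always admitted so that one oversized file cannot stall the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names are Spark classes.
final class BytesPerTriggerSketch {

    // Reject configurations that set both limits, mirroring the rule
    // stated in the PR description.
    static void validate(Map<String, String> options) {
        if (options.containsKey("maxFilesPerTrigger")
                && options.containsKey("maxBytesPerTrigger")) {
            throw new IllegalArgumentException(
                "maxFilesPerTrigger and maxBytesPerTrigger cannot both be set");
        }
    }

    // Number of leading files that fit into one batch under a byte budget.
    // Assumption: the first file is always admitted, even if it alone
    // exceeds the budget, so the stream keeps making progress.
    static int filesInNextBatch(List<Long> fileSizes, long maxBytes) {
        long total = 0L;
        int count = 0;
        for (long size : fileSizes) {
            if (count > 0 && total + size > maxBytes) {
                break;
            }
            total += size;
            count++;
        }
        return count;
    }

    // Split a backlog of file sizes into consecutive batches.
    static List<List<Long>> planBatches(List<Long> fileSizes, long maxBytes) {
        List<List<Long>> batches = new ArrayList<>();
        int start = 0;
        while (start < fileSizes.size()) {
            int n = filesInNextBatch(
                fileSizes.subList(start, fileSizes.size()), maxBytes);
            batches.add(new ArrayList<>(fileSizes.subList(start, start + n)));
            start += n;
        }
        return batches;
    }

    public static void main(String[] args) {
        validate(Map.of("maxBytesPerTrigger", "100"));
        List<Long> sizes = List.of(40L, 50L, 30L, 200L, 10L);
        // prints [[40, 50], [30], [200], [10]]
        System.out.println(planBatches(sizes, 100L));
    }
}
```

With a file-count limit of 2, the same backlog would put the 30-byte and 200-byte files into a single 230-byte batch. The byte budget keeps batch sizes closer to the amount of data actually processed, which is the motivation given above.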
How was this patch tested?
New unit tests were added, or existing maxFilesPerTrigger tests were extended. I searched for maxFilesPerTrigger-related tests and added new tests or extended existing ones, trying to minimize and simplify the changes.
Was this patch authored or co-authored using generative AI tooling?
No.