Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): remove chunk splitting logic in apply_rate_limit #19826

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,23 +360,9 @@ impl StreamChunk {
}
}

// Derive the chunk permits based on the provided rate limit
pub fn compute_rate_limit_chunk_permits(&self, limit: usize) -> usize {
let chunk_size = self.capacity();
let ends_with_update = if chunk_size >= 2 {
// Note we have to check if the 2nd last is `U-` to be consistent with `StreamChunkBuilder`.
// If something inconsistent happens in the stream, we may not have `U+` after this `U-`.
self.ops()[chunk_size - 2].is_update_delete()
} else {
false
};
if chunk_size == limit + 1 && ends_with_update {
// If the chunk size exceeds the limit because of the last `Update` operation,
// we should subtract 1 to make sure the permits consumed are within the limit (max burst).
chunk_size - 1
} else {
chunk_size
}
// Compute the required permits of this chunk for rate limiting.
pub fn compute_rate_limit_chunk_permits(&self) -> u64 {
self.capacity() as _
}
Comment on lines +363 to 366
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove it since there is no limit for burst?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I slightly prefer keeping this, because if we don't have this, we may have difficulty deciding between chunk.capacity() and chunk.cardinality() later.

}

Expand Down
10 changes: 4 additions & 6 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,9 @@ impl<R: LogReader> RateLimitedLogReader<R> {
RateLimit::Disabled => split_chunk,
RateLimit::Fixed(limit) => {
let limit = limit.get();
let required_permits: usize = split_chunk
.chunk
.compute_rate_limit_chunk_permits(limit as _);
if required_permits <= limit as _ {
self.rate_limiter.wait(required_permits as _).await;
let required_permits = split_chunk.chunk.compute_rate_limit_chunk_permits();
if required_permits <= limit {
self.rate_limiter.wait(required_permits).await;
split_chunk
} else {
// Cut the chunk into smaller chunks
Expand All @@ -441,7 +439,7 @@ impl<R: LogReader> RateLimitedLogReader<R> {

// Trigger rate limit and return the first chunk
self.rate_limiter
.wait(first_chunk.compute_rate_limit_chunk_permits(limit as _) as _)
.wait(first_chunk.compute_rate_limit_chunk_permits())
.await;
SplitChunk {
chunk: first_chunk,
Expand Down
12 changes: 6 additions & 6 deletions src/stream/src/executor/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,17 +345,17 @@ async fn apply_dml_rate_limit(
continue;
}
RateLimit::Fixed(limit) => {
let max_permits = limit.get() as usize;
let required_permits = chunk.compute_rate_limit_chunk_permits(max_permits);
let max_permits = limit.get();
let required_permits = chunk.compute_rate_limit_chunk_permits();
if required_permits <= max_permits {
rate_limiter.wait(required_permits as _).await;
rate_limiter.wait(required_permits).await;
yield TxnMsg::Data(txn_id, chunk);
} else {
// Split the chunk into smaller chunks.
for small_chunk in chunk.split(max_permits) {
for small_chunk in chunk.split(max_permits as _) {
let required_permits =
small_chunk.compute_rate_limit_chunk_permits(max_permits);
rate_limiter.wait(required_permits as _).await;
small_chunk.compute_rate_limit_chunk_permits();
rate_limiter.wait(required_permits).await;
yield TxnMsg::Data(txn_id, small_chunk);
}
}
Expand Down
28 changes: 14 additions & 14 deletions src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,21 @@ pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Opti
continue;
}

let limit = rate_limit_rps.unwrap() as usize;

let required_permits: usize = chunk.compute_rate_limit_chunk_permits(limit);
if required_permits <= limit {
limiter.wait(required_permits as _).await;
yield chunk;
} else {
// Cut the chunk into smaller chunks
for chunk in chunk.split(limit) {
limiter
.wait(chunk.compute_rate_limit_chunk_permits(limit) as _)
.await;
yield chunk;
}
let limit = rate_limit_rps.unwrap() as u64;
let required_permits = chunk.compute_rate_limit_chunk_permits();
if required_permits > limit {
// This should not happen after https://github.com/risingwavelabs/risingwave/pull/19698.
// But if it does happen, let's not panic and just log an error.
tracing::error!(
chunk_size,
required_permits,
limit,
"unexpected large chunk size"
);
}

limiter.wait(required_permits).await;
yield chunk;
}
}

Expand Down
Loading