change SourceCtrlOpts::rate_limit to split_txn: bool
Signed-off-by: Richard Chien <[email protected]>
stdrc committed Dec 6, 2024
1 parent 185defc commit fc14654
Showing 8 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -152,7 +152,7 @@ impl SourceExecutor {
self.metrics,
SourceCtrlOpts {
chunk_size: self.chunk_size,
- rate_limit: None,
+ split_txn: false,
},
ConnectorProperties::default(),
None,
14 changes: 7 additions & 7 deletions src/connector/src/parser/mod.rs
@@ -205,14 +205,14 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
}

#[try_stream(ok = Vec<SourceMessage>, error = ConnectorError)]
- async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) {
+ async fn ensure_max_chunk_size(stream: BoxSourceStream, max_chunk_size: usize) {
#[for_await]
for batch in stream {
let mut batch = batch?;
let mut start = 0;
let end = batch.len();
while start < end {
- let next = std::cmp::min(start + rate_limit as usize, end);
+ let next = std::cmp::min(start + max_chunk_size, end);
yield std::mem::take(&mut batch[start..next].as_mut()).to_vec();
start = next;
}
@@ -234,12 +234,12 @@ impl<P: ByteStreamSourceParser> P {
let actor_id = self.source_ctx().actor_id;
let source_id = self.source_ctx().source_id.table_id();

// TODO(): remove this later
// Ensure chunk size is smaller than rate limit
- let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
- Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit))
- } else {
- data_stream
- };
+ let data_stream = Box::pin(ensure_max_chunk_size(
+ data_stream,
+ self.source_ctx().source_ctrl_opts.chunk_size,
+ ));

// The parser stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
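
The `ensure_max_chunk_size` helper introduced above is now applied unconditionally, replacing the rate-limit-gated `ensure_largest_at_rate_limit`. Below is a minimal, synchronous sketch of the same splitting idea, with plain `Vec`s standing in for the connector's `BoxSourceStream` and `SourceMessage` types; the function name `split_batches` and the `main` harness are illustrative only, not part of the codebase.

```rust
/// Split every incoming batch into sub-batches of at most `max_chunk_size` items.
/// Mirrors the loop inside `ensure_max_chunk_size`, minus the `try_stream` machinery.
fn split_batches<T>(batches: Vec<Vec<T>>, max_chunk_size: usize) -> Vec<Vec<T>> {
    assert!(max_chunk_size > 0);
    let mut out = Vec::new();
    for mut batch in batches {
        let end = batch.len();
        let mut start = 0;
        while start < end {
            let next = std::cmp::min(start + max_chunk_size, end);
            // Move the [start, next) range into its own sub-batch.
            out.push(batch.drain(..next - start).collect());
            start = next;
        }
    }
    out
}

fn main() {
    let batches = vec![(0..10).collect::<Vec<i32>>(), (10..13).collect()];
    let split = split_batches(batches, 4);
    assert_eq!(
        split.iter().map(|c| c.len()).collect::<Vec<_>>(),
        vec![4, 4, 2, 3]
    );
    println!("{split:?}");
}
```
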
6 changes: 3 additions & 3 deletions src/connector/src/source/base.rs
@@ -145,8 +145,8 @@ pub const MAX_CHUNK_SIZE: usize = 1024;
pub struct SourceCtrlOpts {
/// The max size of a chunk yielded by source stream.
pub chunk_size: usize,
- /// Rate limit of source
- pub rate_limit: Option<u32>,
+ /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`.
+ pub split_txn: bool,
}

// The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it,
@@ -225,7 +225,7 @@ impl SourceContext {
Arc::new(SourceMetrics::default()),
SourceCtrlOpts {
chunk_size: MAX_CHUNK_SIZE,
- rate_limit: None,
+ split_txn: false,
},
ConnectorProperties::default(),
None,
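
For reference, here is a self-contained sketch of what the reworked options struct boils down to and how a caller fills it in. The stand-in struct mirrors the fields shown in the hunk above; the `main` harness is illustrative only.

```rust
pub const MAX_CHUNK_SIZE: usize = 1024;

/// Stand-in for the reworked `SourceCtrlOpts` in base.rs.
pub struct SourceCtrlOpts {
    /// The max size of a chunk yielded by the source stream.
    pub chunk_size: usize,
    /// Whether a transaction may be split into multiple chunks to respect `chunk_size`.
    pub split_txn: bool,
}

fn main() {
    // There is deliberately no `Default` impl (see the comment in base.rs), so every
    // call site has to spell out both fields, as the updated executors do.
    let opts = SourceCtrlOpts {
        chunk_size: MAX_CHUNK_SIZE,
        split_txn: false,
    };
    println!("chunk_size={}, split_txn={}", opts.chunk_size, opts.split_txn);
}
```
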
6 changes: 3 additions & 3 deletions src/stream/src/common/rate_limit.rs
@@ -13,9 +13,9 @@
// limitations under the License.

/// Get the rate-limited max chunk size.
- pub(crate) fn limited_chunk_size(rate_limit: Option<u32>) -> usize {
+ pub(crate) fn limited_chunk_size(rate_limit_burst: Option<u32>) -> usize {
let config_chunk_size = crate::config::chunk_size();
- rate_limit
- .map(|limit| config_chunk_size.min(limit as usize))
+ rate_limit_burst
+ .map(|burst| config_chunk_size.min(burst as usize))
.unwrap_or(config_chunk_size)
}
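
A small, self-contained sketch of how the renamed parameter behaves: the configured chunk size is passed in explicitly here instead of being read from `crate::config::chunk_size()`, and the concrete numbers are made up for illustration.

```rust
/// Standalone copy of `limited_chunk_size`, with the configured chunk size taken
/// as a parameter instead of read from the streaming config.
fn limited_chunk_size(rate_limit_burst: Option<u32>, config_chunk_size: usize) -> usize {
    rate_limit_burst
        .map(|burst| config_chunk_size.min(burst as usize))
        .unwrap_or(config_chunk_size)
}

fn main() {
    let config_chunk_size = 256; // assumed value, not necessarily RisingWave's default
    assert_eq!(limited_chunk_size(None, config_chunk_size), 256);
    assert_eq!(limited_chunk_size(Some(100), config_chunk_size), 100); // burst caps the chunk
    assert_eq!(limited_chunk_size(Some(4096), config_chunk_size), 256); // config still wins
}
```
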
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/fetch_executor.rs
@@ -182,7 +182,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
source_desc.metrics.clone(),
SourceCtrlOpts {
chunk_size: limited_chunk_size(self.rate_limit_rps),
- rate_limit: self.rate_limit_rps,
+ split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
},
source_desc.source.config.clone(),
None,
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/fs_source_executor.rs
@@ -105,7 +105,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
source_desc.metrics.clone(),
SourceCtrlOpts {
chunk_size: limited_chunk_size(self.rate_limit_rps),
- rate_limit: self.rate_limit_rps,
+ split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
},
source_desc.source.config.clone(),
None,
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
@@ -297,7 +297,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
source_desc.metrics.clone(),
SourceCtrlOpts {
chunk_size: limited_chunk_size(self.rate_limit_rps),
- rate_limit: self.rate_limit_rps,
+ split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
},
source_desc.source.config.clone(),
None,
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_executor.rs
@@ -203,7 +203,7 @@ impl<S: StateStore> SourceExecutor<S> {
source_desc.metrics.clone(),
SourceCtrlOpts {
chunk_size: limited_chunk_size(self.rate_limit_rps),
- rate_limit: self.rate_limit_rps,
+ split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn
},
source_desc.source.config.clone(),
schema_change_tx,
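
All four streaming executors above now derive both fields of `SourceCtrlOpts` from the same `rate_limit_rps` value. The sketch below condenses that shared pattern into one hypothetical helper, with a stripped-down copy of the struct so it compiles on its own; the real executors inline this at each call site.

```rust
struct SourceCtrlOpts {
    chunk_size: usize,
    split_txn: bool,
}

/// The pattern shared by the streaming source executors: the rate-limit burst caps
/// the chunk size, and enabling rate limiting also enables `split_txn`, since a
/// rate-limited stream may have to cut a transaction across multiple chunks.
fn ctrl_opts(rate_limit_rps: Option<u32>, config_chunk_size: usize) -> SourceCtrlOpts {
    SourceCtrlOpts {
        chunk_size: rate_limit_rps
            .map(|burst| config_chunk_size.min(burst as usize))
            .unwrap_or(config_chunk_size),
        split_txn: rate_limit_rps.is_some(), // when rate limiting, we may split txn
    }
}

fn main() {
    let opts = ctrl_opts(Some(64), 256);
    assert_eq!((opts.chunk_size, opts.split_txn), (64, true));
    let opts = ctrl_opts(None, 256);
    assert_eq!((opts.chunk_size, opts.split_txn), (256, false));
}
```
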
