Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 6, 2024
1 parent fc14654 commit 46e5995
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
30 changes: 17 additions & 13 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceMessage, SourceMeta,
SourceContextRef, SourceCtrlOpts, SourceMessage, SourceMeta,
};

mod access_builder;
Expand Down Expand Up @@ -221,29 +221,32 @@ async fn ensure_max_chunk_size(stream: BoxSourceStream, max_chunk_size: usize) {

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
/// Parse a data stream of one source split into a stream of [`StreamChunk`].
/// Parse a stream of vectors of [`SourceMessage`] into a stream of [`StreamChunk`].
///
/// # Arguments
/// - `data_stream`: A data stream of one source split.
/// It is not a pure byte stream, so that multiple messages from the MQ can be split apart.
///
/// - `msg_stream`: A stream of vectors of [`SourceMessage`].
///
/// # Returns
///
/// A [`ChunkSourceStream`] which is a stream of parsed messages.
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
/// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks
/// is guaranteed to have at most `source_ctrl_opts.chunk_size` rows, unless
/// there's a large transaction and `source_ctrl_opts.split_txn` is false.
pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream {
let actor_id = self.source_ctx().actor_id;
let source_id = self.source_ctx().source_id.table_id();

// TODO(): remove this later
// Ensure chunk size is smaller than rate limit
let data_stream = Box::pin(ensure_max_chunk_size(
data_stream,
let msg_stream = Box::pin(ensure_max_chunk_size(
msg_stream,
self.source_ctx().source_ctrl_opts.chunk_size,
));

// The parser stream will be long-lived. We use `instrument_with` here to create
// The stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
into_chunk_stream_inner(self, data_stream)
let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
into_chunk_stream_inner(self, msg_stream, source_ctrl_opts)
.instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
}
}
Expand All @@ -257,7 +260,8 @@ const MAX_TRANSACTION_SIZE: usize = 4096;
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
mut parser: P,
data_stream: BoxSourceStream,
msg_stream: BoxSourceStream,
source_ctrl_ops: SourceCtrlOpts,
) {
let columns = parser.columns().to_vec();

Expand All @@ -271,7 +275,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

#[for_await]
for batch in data_stream {
for batch in msg_stream {
// It's possible that the split is not active, which means the next batch may arrive
// very late, so we should prefer emitting all records in the current batch before the end
// of each iteration, instead of merging them with the next batch. An exception is when
Expand Down Expand Up @@ -490,7 +494,7 @@ pub enum ByteStreamSourceParserImpl {
}

impl ByteStreamSourceParserImpl {
/// Converts [`SourceMessage`] stream into [`StreamChunk`] stream.
/// Converts [`SourceMessage`] vec stream into [`StreamChunk`] stream.
pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin {
#[auto_enum(futures03::Stream)]
let stream = match self {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;
/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct SourceCtrlOpts {
/// The max size of a chunk yielded by source stream.
pub chunk_size: usize,
Expand Down

0 comments on commit 46e5995

Please sign in to comment.