fix: finish source backfill immediately for scan.startup.mode=latest (#20285) #20289

Merged
33 changes: 33 additions & 0 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -99,6 +99,10 @@ SET streaming_use_shared_source TO false;
 statement ok
 create materialized view mv_2 as select * from s0;
 
+statement ok
+SET streaming_use_shared_source TO true;
+
+
 sleep 2s
 
 query ?? rowsort
@@ -370,3 +374,32 @@ drop source s0 cascade;
 
 statement ok
 drop source s_before_produce cascade;
+
+# test: scan.startup.mode=latest should not be blocked when there's no data to backfill
+# https://github.com/risingwavelabs/risingwave/issues/20083#issuecomment-2609422824
+statement ok
+create source s_latest (v1 int, v2 varchar) with (
+    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+    topic = 'shared_source',
+    scan.startup.mode = 'latest'
+) FORMAT PLAIN ENCODE JSON;
+
+# Note: batch kafka scan ignores scan.startup.mode
+query ? rowsort
+select count(*) from s_latest;
+----
+55
+
+statement ok
+create materialized view mv_latest as select * from s_latest;
+
+query ? rowsort
+select count(*) from mv_latest;
+----
+0
+
+statement ok
+drop source s_latest cascade;
+
+system ok
+rpk topic delete shared_source;
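
To make the intent of the new test concrete, here is a minimal, self-contained Rust sketch of the backfill decision it exercises. This is not RisingWave's actual code path: the classify helper and the shape of the HasDataToBackfill variant are assumptions for illustration, as is the premise that scan.startup.mode=latest resolves the split's (exclusive) start offset to the last existing message. Only BackfillInfo::NoDataToBackfill and the offset semantics (low inclusive, high exclusive, start_offset exclusive) come from the reader.rs diff below.

// Hypothetical sketch (not RisingWave's real API) of the backfill decision
// the test above exercises, assuming Kafka watermarks low = 0, high = 55.

#[derive(Debug, PartialEq)]
enum BackfillInfo {
    NoDataToBackfill,
    // The real variant's fields may differ; this shape is assumed.
    HasDataToBackfill { latest_offset: i64 },
}

/// `low` is inclusive, `high` is exclusive, and `start_offset` is exclusive
/// (the last offset already consumed by the streaming job, if any).
fn classify(low: i64, high: i64, start_offset: Option<i64>) -> BackfillInfo {
    if low == high || start_offset.is_some_and(|offset| offset + 1 >= high) {
        BackfillInfo::NoDataToBackfill
    } else {
        // high - 1 is the last offset the backfill would need to reach.
        BackfillInfo::HasDataToBackfill {
            latest_offset: high - 1,
        }
    }
}

fn main() {
    // scan.startup.mode=latest: the split starts at the current end of the
    // topic, so there is nothing to backfill and mv_latest reports 0 rows.
    assert_eq!(classify(0, 55, Some(54)), BackfillInfo::NoDataToBackfill);

    // A reader starting from the beginning still sees all 55 messages, which
    // is why the batch scan (which ignores scan.startup.mode) counts 55.
    assert_eq!(
        classify(0, 55, None),
        BackfillInfo::HasDataToBackfill { latest_offset: 54 }
    );

    // Before this fix, only an empty topic (low == high) finished immediately.
    assert_eq!(classify(10, 10, None), BackfillInfo::NoDataToBackfill);
}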
16 changes: 12 additions & 4 deletions src/connector/src/source/kafka/source/reader.rs
@@ -132,9 +132,9 @@ impl SplitReader for KafkaSplitReader {
                 properties.common.sync_call_timeout,
             )
             .await?;
-            tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
-            // note: low is inclusive, high is exclusive
-            if low == high {
+            tracing::info!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
+            // note: low is inclusive, high is exclusive, start_offset is exclusive
+            if low == high || split.start_offset.is_some_and(|offset| offset + 1 >= high) {
                 backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
             } else {
                 debug_assert!(high > 0);
@@ -146,7 +146,15 @@ impl SplitReader for KafkaSplitReader {
             );
         }
     }
-    tracing::debug!("backfill_info: {:?}", backfill_info);
+    tracing::info!(
+        topic = properties.common.topic,
+        source_name = source_ctx.source_name,
+        fragment_id = source_ctx.fragment_id,
+        source_id = source_ctx.source_id.table_id,
+        actor_id = source_ctx.actor_id,
+        "backfill_info: {:?}",
+        backfill_info
+    );
 
     consumer.assign(&tpl)?;
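
As a side note on the logging change: the tracing::info! call above attaches structured fields rather than interpolating everything into the message string. A minimal sketch of that pattern follows, using the tracing and tracing-subscriber crates; the log_backfill_info function and the values passed to it are made up for illustration, only the field names mirror the diff.

use std::collections::HashMap;

// Sketch of the structured-field logging pattern used in the diff above.
// Fields listed before the format string become key-value pairs that log
// collectors can filter on; the trailing part works like format!.
fn log_backfill_info(
    topic: &str,
    source_name: &str,
    fragment_id: u32,
    info: &HashMap<String, String>,
) {
    tracing::info!(
        topic,       // shorthand: the field name is taken from the variable
        source_name,
        fragment_id,
        "backfill_info: {:?}",
        info
    );
}

fn main() {
    // Plain stdout subscriber so the event is visible when run standalone.
    tracing_subscriber::fmt().init();

    let mut info = HashMap::new();
    info.insert("split-0".to_string(), "NoDataToBackfill".to_string());
    log_backfill_info("shared_source", "s_latest", 1, &info);
}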