diff --git a/e2e_test/source_legacy/opendal/data/data1.csv b/e2e_test/source_inline/fs/data/data1.csv similarity index 100% rename from e2e_test/source_legacy/opendal/data/data1.csv rename to e2e_test/source_inline/fs/data/data1.csv diff --git a/e2e_test/source_legacy/opendal/data/data2.csv b/e2e_test/source_inline/fs/data/data2.csv similarity index 100% rename from e2e_test/source_legacy/opendal/data/data2.csv rename to e2e_test/source_inline/fs/data/data2.csv diff --git a/e2e_test/source_legacy/opendal/posix_fs.slt b/e2e_test/source_inline/fs/posix_fs.slt similarity index 90% rename from e2e_test/source_legacy/opendal/posix_fs.slt rename to e2e_test/source_inline/fs/posix_fs.slt index 88cc57df16ccd..79980737a0cc8 100644 --- a/e2e_test/source_legacy/opendal/posix_fs.slt +++ b/e2e_test/source_inline/fs/posix_fs.slt @@ -10,7 +10,7 @@ CREATE TABLE diamonds ( ) WITH ( connector = 'posix_fs', match_pattern = 'data*.csv', - posix_fs.root = 'e2e_test/source_legacy/opendal/data', + posix_fs.root = 'e2e_test/source_inline/fs/data', ) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ','); sleep 10s diff --git a/e2e_test/source_legacy/basic/datagen.slt b/e2e_test/source_inline/kafka/datagen.slt similarity index 100% rename from e2e_test/source_legacy/basic/datagen.slt rename to e2e_test/source_inline/kafka/datagen.slt diff --git a/e2e_test/source_legacy/basic/handling_mode.slt b/e2e_test/source_inline/kafka/handling_mode.slt similarity index 77% rename from e2e_test/source_legacy/basic/handling_mode.slt rename to e2e_test/source_inline/kafka/handling_mode.slt index caf9be9a01fce..40259e130dcb3 100644 --- a/e2e_test/source_legacy/basic/handling_mode.slt +++ b/e2e_test/source_inline/kafka/handling_mode.slt @@ -1,9 +1,24 @@ +control substitution on + +system ok +rpk topic delete json_timestamptz_handling_mode || true + +system ok +rpk topic create json_timestamptz_handling_mode -p 1 + +system ok +cat <>) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format plain encode json (timestamptz.handling.mode = 'mili'); @@ -13,8 +28,7 @@ create table plain_guess ( "case" varchar, payload struct>) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format plain encode json (timestamptz.handling.mod = 'mili'); @@ -23,8 +37,7 @@ create table plain_milli ( "case" varchar, payload struct>) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format plain encode json (timestamptz.handling.mode = 'milli'); @@ -33,8 +46,7 @@ create table plain_micro ( "case" varchar, payload struct>) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format plain encode json (timestamptz.handling.mode = 'micro'); @@ -43,8 +55,7 @@ create table plain_utc ( "case" varchar, payload struct>) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format plain encode json (timestamptz.handling.mode = 'utc_string'); @@ -53,8 +64,7 @@ create table plain_naive ( "case" varchar, payload struct>) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format plain encode json (timestamptz.handling.mode = 'utc_without_suffix'); @@ -62,8 +72,7 @@ statement ok create table debezium_milli ( "case" varchar, at timestamptz, primary key("case")) with ( - connector = 'kafka', - properties.bootstrap.server='message_queue:29092', + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'json_timestamptz_handling_mode') format debezium encode json (timestamptz.handling.mode = 'milli'); diff --git a/e2e_test/source_legacy/basic/kinesis.slt b/e2e_test/source_inline/kafka/kinesis.slt similarity index 100% rename from e2e_test/source_legacy/basic/kinesis.slt rename to e2e_test/source_inline/kafka/kinesis.slt diff --git a/e2e_test/source_legacy/basic/temporary_kafka_batch.slt b/e2e_test/source_inline/kafka/temporary_kafka_batch.slt similarity index 74% rename from e2e_test/source_legacy/basic/temporary_kafka_batch.slt rename to e2e_test/source_inline/kafka/temporary_kafka_batch.slt index 578cc76b47d59..69144a884486f 100644 --- a/e2e_test/source_legacy/basic/temporary_kafka_batch.slt +++ b/e2e_test/source_inline/kafka/temporary_kafka_batch.slt @@ -1,8 +1,23 @@ +control substitution on + +system ok +rpk topic delete test_temporary_kafka_batch || true + +system ok +rpk topic create test_temporary_kafka_batch -p 1 + +system ok +cat < 0 from (select * from nexmark_q3 limit 1); ----- -t - -statement ok -drop materialized view nexmark_q3; - -include ../../../nexmark/drop_sources.slt.part - -include ../../../nexmark/create_sources.slt.part -include ../../../streaming/nexmark/views/q4.slt.part - -sleep 10s - -query I -select count(*) > 0 from (select * from nexmark_q4 limit 1); ----- -t - -statement ok -drop materialized view nexmark_q4; - - -include ../../../nexmark/drop_sources.slt.part - -include ../../../nexmark/create_sources.slt.part -include ../../../streaming/nexmark/views/q5.slt.part - -sleep 20s - -query I -select count(*) > 0 from (select * from nexmark_q5 limit 1); ----- -t - -statement ok -drop materialized view nexmark_q5; - -include ../../../nexmark/drop_sources.slt.part diff --git a/e2e_test/source_legacy/basic/scripts/test_data/json_timestamptz_handling_mode.1 b/e2e_test/source_legacy/basic/scripts/test_data/json_timestamptz_handling_mode.1 deleted file mode 100644 index 4ff1440b47f29..0000000000000 --- a/e2e_test/source_legacy/basic/scripts/test_data/json_timestamptz_handling_mode.1 +++ /dev/null @@ -1,4 +0,0 @@ -{"case":"0 number small","payload":{"after":{"case":"0 number small","at":100},"op":"r"}} -{"case":"1 number recent","payload":{"after":{"case":"1 number recent","at":1712800800123456},"op":"r"}} -{"case":"2 string utc","payload":{"after":{"case":"2 string utc","at":"2024-04-11T02:00:00.654321Z"},"op":"r"}} -{"case":"3 string naive","payload":{"after":{"case":"3 string naive","at":"2024-04-11 02:00:00.234321"},"op":"r"}} diff --git a/e2e_test/source_legacy/basic/scripts/test_data/weiling.1 b/e2e_test/source_legacy/basic/scripts/test_data/weiling.1 deleted file mode 100644 index b0255bb720681..0000000000000 --- a/e2e_test/source_legacy/basic/scripts/test_data/weiling.1 +++ /dev/null @@ -1 +0,0 @@ -{"v1": 1664267577140960} \ No newline at end of file diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 6b9756733f51f..bd166f8082c4c 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -203,7 +203,7 @@ impl BackfillStage { vis } - /// Updates backfill states and returns whether the row from upstream `SourceExecutor` is visible. + /// Updates backfill states and returns whether the row backfilled from external system is visible. fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool { let state = self.states.get_mut(split_id).unwrap(); match state {