fix(planner): correctly handle hidden columns for SourceBackfill #19578

Merged: 6 commits, Nov 27, 2024
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -6,6 +6,7 @@ repos:
rev: v2.3.0
hooks:
- id: end-of-file-fixer
exclude: 'src/frontend/planner_test/tests/testdata/.*'
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
@@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu
apt-get -y install jq

echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
6 changes: 3 additions & 3 deletions e2e_test/source_inline/kafka/issue_19563.slt.serial
@@ -24,9 +24,9 @@ explain create materialized view mv1 as select v1 from kafkasource where v1 betw
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true }
├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] }
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset], cleaned_by_watermark: true }
│ ├─StreamRowIdGen { row_id_index: 2 }
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
│ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], cleaned_by_watermark: true }
│ ├─StreamRowIdGen { row_id_index: 4 }
│ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow
└─StreamExchange { dist: Broadcast }
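The row_id_index change above (2 to 4) falls out of the new column order: the hidden _rw_kafka_partition and _rw_kafka_offset columns are now appended during binding, before the _row_id column. A small illustrative sketch of that ordering using this test's column names (the assembly code is a stand-in, not the PR's actual implementation):

```rust
// Illustrative only: reproduces the column order implied by the new plan above,
// where the hidden partition/offset columns now precede the generated _row_id.
fn main() {
    // User-declared column plus the implicit Kafka timestamp column.
    let mut columns = vec!["v1", "_rw_kafka_timestamp"];

    // The hidden partition/offset columns are appended during binding,
    // before the _row_id column is added...
    for name in ["_rw_kafka_partition", "_rw_kafka_offset"] {
        if !columns.contains(&name) {
            columns.push(name);
        }
    }
    // ...so _row_id ends up last and its index shifts from 2 to 4.
    columns.push("_row_id");

    let row_id_index = columns.iter().position(|&n| n == "_row_id").unwrap();
    assert_eq!(row_id_index, 4); // matches `StreamRowIdGen { row_id_index: 4 }` above
    println!("{columns:?}, row_id_index = {row_id_index}");
}
```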
14 changes: 12 additions & 2 deletions src/connector/src/parser/additional_columns.rs
@@ -281,9 +281,20 @@ pub fn build_additional_column_desc(
pub fn source_add_partition_offset_cols(
columns: &[ColumnCatalog],
connector_name: &str,
skip_col_id: bool,
) -> ([bool; 2], [ColumnDesc; 2]) {
let mut columns_exist = [false; 2];

let mut last_column_id = max_column_id(columns);
let mut assign_col_id = || {
if skip_col_id {
// col id will be filled outside later. Here just use a placeholder.
ColumnId::placeholder()
} else {
last_column_id = last_column_id.next();
last_column_id
}
};

let additional_columns: Vec<_> = {
let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS
@@ -292,11 +303,10 @@ pub fn source_add_partition_offset_cols(
["partition", "file", "offset"]
.iter()
.filter_map(|col_type| {
last_column_id = last_column_id.next();
if compat_col_types.contains(col_type) {
Some(
build_additional_column_desc(
last_column_id,
assign_col_id(),
connector_name,
col_type,
None,
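The new skip_col_id flag defers column-ID assignment: the source-descriptor path (desc.rs below) keeps assigning IDs immediately, while the frontend binding path passes true and fills in real IDs later via its column-ID generator. A minimal, self-contained sketch of that pattern; ColumnId here is a simplified stand-in for RisingWave's catalog type, not the actual API:

```rust
// Simplified stand-ins; only the control flow mirrors the diff above.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ColumnId(i32);

impl ColumnId {
    const PLACEHOLDER: ColumnId = ColumnId(-1);
    fn next(self) -> ColumnId {
        ColumnId(self.0 + 1)
    }
}

/// Returns IDs for newly built additional columns. With `skip_col_id == true`
/// every column gets a placeholder and the caller assigns real IDs later
/// (e.g. via its column-ID generator during binding).
fn assign_ids(last_used: ColumnId, skip_col_id: bool, count: usize) -> Vec<ColumnId> {
    let mut last_column_id = last_used;
    let mut assign_col_id = || {
        if skip_col_id {
            ColumnId::PLACEHOLDER
        } else {
            last_column_id = last_column_id.next();
            last_column_id
        }
    };
    (0..count).map(|_| assign_col_id()).collect()
}

fn main() {
    // Source-descriptor path (desc.rs): IDs assigned immediately.
    assert_eq!(assign_ids(ColumnId(3), false, 2), vec![ColumnId(4), ColumnId(5)]);
    // Frontend binding path (create_source.rs): placeholders, filled in later.
    assert_eq!(assign_ids(ColumnId(3), true, 2), vec![ColumnId::PLACEHOLDER; 2]);
}
```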
2 changes: 1 addition & 1 deletion src/connector/src/source/reader/desc.rs
@@ -91,7 +91,7 @@ impl SourceDescBuilder {
.map(|s| s.to_lowercase())
.unwrap();
let (columns_exist, additional_columns) =
source_add_partition_offset_cols(&self.columns, &connector_name);
source_add_partition_offset_cols(&self.columns, &connector_name, false);

let mut columns: Vec<_> = self
.columns
@@ -3,11 +3,11 @@
select * from s
logical_plan: |-
LogicalProject { exprs: [id, value] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
└─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) }
create_source:
format: plain
encode: protobuf
@@ -68,12 +68,12 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
with_config_map:
streaming_use_shared_source: 'true'
- before:
@@ -84,11 +84,11 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [x, y] }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) }
└─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [x, y, _row_id] }
└─StreamRowIdGen { row_id_index: 3 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] }
└─StreamRowIdGen { row_id_index: 5 }
└─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] }
with_config_map:
streaming_use_shared_source: 'true'
113 changes: 85 additions & 28 deletions src/frontend/src/handler/create_source.rs
@@ -31,8 +31,10 @@ use risingwave_common::catalog::{
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::additional_columns::{
build_additional_column_desc, get_supported_additional_columns,
source_add_partition_offset_cols,
};
use risingwave_connector::parser::{
fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
@@ -1493,6 +1495,7 @@ pub async fn bind_create_source_or_table_with_connector(
col_id_gen: &mut ColumnIdGenerator,
// `true` for "create source", `false` for "create table with connector"
is_create_source: bool,
is_shared_non_cdc: bool,
source_rate_limit: Option<u32>,
) -> Result<(SourceCatalog, DatabaseId, SchemaId)> {
let session = &handler_args.session;
@@ -1554,6 +1557,21 @@
if is_create_source {
// must behind `handle_addition_columns`
check_and_add_timestamp_column(&with_properties, &mut columns);

// For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor.
// For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS
if is_shared_non_cdc {
let (columns_exist, additional_columns) = source_add_partition_offset_cols(
&columns,
&with_properties.get_connector().unwrap(),
true, // col_id filled below at col_id_gen.generate
);
for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) {
if !existed {
columns.push(ColumnCatalog::hidden(c));
}
}
}
Comment on lines +1561 to +1574 (Member Author):
I'm thinking that previously, due to the logic in the planner, a shared CDC source would also emit the columns _rw_mysql-cdc_offset and _rw_mysql-cdc_partition (besides _rw_table_name and _rw_offset).

Or maybe they were somehow not included for the CDC source, since at the beginning of this PR (i.e., when using is_shared instead of is_shared_non_cdc here), CDC actually failed with an index-out-of-range error on the Java side.

I'm not 100% sure how it works, but the current state seems to be working.

}

// resolve privatelink connection for Kafka
@@ -1670,14 +1688,14 @@ pub async fn handle_create_source(
let with_properties = bind_connector_props(&handler_args, &format_encode, true)?;

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source());
let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector()
&& session
.env()
.streaming_config()
.developer
.enable_shared_source
&& session.config().streaming_use_shared_source();
let is_shared = create_cdc_source_job || is_shared_non_cdc;

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &format_encode)?
@@ -1705,6 +1723,7 @@ pub async fn handle_create_source(
stmt.include_column_options,
&mut col_id_gen,
true,
is_shared_non_cdc,
overwrite_options.source_rate_limit,
)
.await?;
@@ -1777,8 +1796,7 @@ pub mod tests {
use std::sync::Arc;

use risingwave_common::catalog::{
CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME,
ROWID_PREFIX, TABLE_NAME_COLUMN_NAME,
CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX,
};
use risingwave_common::types::{DataType, StructType};

@@ -1932,15 +1950,29 @@
.columns
.iter()
.map(|col| (col.name(), col.data_type().clone()))
.collect::<HashMap<&str, DataType>>();
.collect::<Vec<(&str, DataType)>>();

let expected_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"payload" => DataType::Jsonb,
OFFSET_COLUMN_NAME => DataType::Varchar,
TABLE_NAME_COLUMN_NAME => DataType::Varchar,
};
assert_eq!(columns, expected_columns);
expect_test::expect![[r#"
[
(
"payload",
Jsonb,
),
(
"_rw_offset",
Varchar,
),
(
"_rw_table_name",
Varchar,
),
(
"_row_id",
Serial,
),
]
"#]]
.assert_debug_eq(&columns);
}

#[tokio::test]
@@ -1969,16 +2001,41 @@
.unwrap();
assert_eq!(source.name, "s");

let columns = GET_COLUMN_FROM_CATALOG(source);
let expect_columns = maplit::hashmap! {
ROWID_PREFIX => DataType::Serial,
"v1" => DataType::Int32,
"_rw_kafka_key" => DataType::Bytea,
// todo: kafka connector will automatically derive the column
// will change to a required field in the include clause
"_rw_kafka_timestamp" => DataType::Timestamptz,
};
assert_eq!(columns, expect_columns);
let columns = source
.columns
.iter()
.map(|col| (col.name(), col.data_type().clone()))
.collect::<Vec<(&str, DataType)>>();

expect_test::expect![[r#"
[
(
"v1",
Int32,
),
(
"_rw_kafka_key",
Bytea,
),
(
"_rw_kafka_timestamp",
Timestamptz,
),
(
"_rw_kafka_partition",
Varchar,
),
(
"_rw_kafka_offset",
Varchar,
),
(
"_row_id",
Serial,
),
]
"#]]
.assert_debug_eq(&columns);

let sql =
"CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json"
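The tests above move from order-insensitive HashMap comparisons to expect_test snapshots, because the fix makes the column order (and therefore _row_id's position) part of the expected output. As a reminder of how such inline snapshots are maintained, here is a generic expect_test example (illustrative, not code from this PR); running the test with UPDATE_EXPECT=1 rewrites the inline literal to match the actual value:

```rust
// Generic expect_test usage: the macro holds the expected Debug output inline,
// and `UPDATE_EXPECT=1 cargo test` updates the literal in place when it drifts.
use expect_test::expect;

#[test]
fn column_order_is_part_of_the_contract() {
    let columns = vec![("v1", "Int32"), ("_row_id", "Serial")];
    expect![[r#"
        [
            (
                "v1",
                "Int32",
            ),
            (
                "_row_id",
                "Serial",
            ),
        ]
    "#]]
    .assert_debug_eq(&columns);
}
```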
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_table.rs
@@ -506,6 +506,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
include_column_options,
&mut col_id_gen,
false,
false,
rate_limit,
)
.await?;