Skip to content

Commit

Permalink
feat: add is_shared column to rw_sources (#19027)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Oct 21, 2024
1 parent 1ca2ea7 commit 23bdf31
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
5 changes: 5 additions & 0 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ create source s_before_produce (v1 int, v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

query T
select connector, is_shared from rw_sources where name = 's_before_produce';
----
KAFKA t

statement ok
create materialized view mv_before_produce as select * from s_before_produce;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
│ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared] }
│ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
│ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.key_columns, rw_indexes.include_columns, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] }
│ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct RwSource {
created_at: Option<Timestamptz>,
initialized_at_cluster_version: Option<String>,
created_at_cluster_version: Option<String>,
is_shared: bool,
}

#[system_catalog(table, "rw_catalog.rw_sources")]
Expand Down Expand Up @@ -83,6 +84,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>>
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
created_at_cluster_version: source.created_at_cluster_version.clone(),
is_shared: source.info.is_shared(),
})
})
.collect())
Expand Down

0 comments on commit 23bdf31

Please sign in to comment.