Skip to content

Commit

Permalink
feat: support alter shared source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 21, 2024
1 parent 06bd105 commit 975cc21
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 18 deletions.
140 changes: 140 additions & 0 deletions e2e_test/source_inline/kafka/shared_source_alter.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
control substitution on

system ok
rpk topic create shared_source_alter -p 4

system ok
cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a", "v3": "a1"}
1 {"v1": 2, "v2": "b", "v3": "b1"}
2 {"v1": 3, "v2": "c", "v3": "c1"}
3 {"v1": 4, "v2": "d", "v3": "d1"}
EOF

statement ok
create source s (v1 int, v2 varchar, v3 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source_alter',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;


statement ok
create materialized view mv_before_alter as select * from s;

sleep 2s

query ?? rowsort
select * from s;
----
1 a
2 b
3 c
4 d

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d


statement ok
alter source s add column v3 varchar;

# New MV will have v3.

statement ok
create materialized view mv_after_alter as select * from s;

query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1

# Batch select from source will have v3.

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1

# Old MV is not affected.

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d

# Produce new data.

system ok
cat << EOF | rpk topic produce shared_source_alter -f "%p %v\n" -p 0
0 {"v1": 5, "v2": "e", "v3": "e1"}
1 {"v1": 6, "v2": "f", "v3": "f1"}
2 {"v1": 7, "v2": "g", "v3": "g1"}
3 {"v1": 8, "v2": "h", "v3": "h1"}
EOF

sleep 2s


query ??? rowsort
select * from mv_after_alter;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1


# Batch select from source will have v3.

query ??? rowsort
select * from s;
----
1 a a1
2 b b1
3 c c1
4 d d1
5 e e1
6 f f1
7 g g1
8 h h1

# Old MV is not affected.

query ?? rowsort
select * from mv_before_alter;
----
1 a
2 b
3 c
4 d
5 e
6 f
7 g
8 h


statement ok
drop source s cascade;

# TODO: test alter source with schema registry

# TODO: test alter source rename, change owner, etc.
14 changes: 14 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ message DropSourceResponse {

message AlterSourceRequest {
catalog.Source source = 1;
// for shared source, we need to replace the streaming job
optional ReplaceStreamingJobPlan plan = 2;
}

message AlterSourceResponse {
Expand Down Expand Up @@ -343,6 +345,18 @@ message ReplaceTablePlan {
TableJobType job_type = 5;
}

// Replace a streaming job, but not a table. e.g., alter a shared source.
message ReplaceStreamingJobPlan {
// The new materialization plan, where all schema are updated.
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
// If no column modifications occur (such as for sinking into table), this will be None.
catalog.ColIndexMapping table_col_index_mapping = 3;
// Source catalog of table's associated source
catalog.Source source = 4;
TableJobType job_type = 5;
}

message ReplaceTablePlanRequest {
ReplaceTablePlan plan = 1;
}
Expand Down
20 changes: 16 additions & 4 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::ddl_service::{
alter_name_request, alter_owner_request, alter_set_schema_request, create_connection_request,
PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType, WaitVersion,
PbReplaceStreamingJobPlan, PbReplaceTablePlan, PbTableJobType, ReplaceTablePlan, TableJobType,
WaitVersion,
};
use risingwave_pb::meta::PbTableParallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
Expand Down Expand Up @@ -183,7 +184,11 @@ pub trait CatalogWriter: Send + Sync {
async fn alter_owner(&self, object: alter_owner_request::Object, owner_id: u32) -> Result<()>;

/// Replace the source in the catalog.
async fn alter_source(&self, source: PbSource) -> Result<()>;
async fn alter_source(
&self,
source: PbSource,
replace_streaming_job_plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<()>;

async fn alter_parallelism(
&self,
Expand Down Expand Up @@ -480,8 +485,15 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn alter_source(&self, source: PbSource) -> Result<()> {
let version = self.meta_client.alter_source(source).await?;
async fn alter_source(
&self,
source: PbSource,
replace_streaming_job_plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<()> {
let version = self
.meta_client
.alter_source(source, replace_streaming_job_plan)
.await?;
self.wait_version(version).await
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub async fn handle_alter_source_column(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_source(catalog.to_prost(schema_id, db_id))
.alter_source(catalog.to_prost(schema_id, db_id), todo!())
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ pub async fn handle_alter_source_with_sr(
pb_source.version += 1;

let catalog_writer = session.catalog_writer()?;
catalog_writer.alter_source(pb_source).await?;
catalog_writer.alter_source(pb_source, todo!()).await?;

Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
}
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, create_connection_request, DdlProgress,
PbTableJobType, ReplaceTablePlan, TableJobType,
PbReplaceStreamingJobPlan, PbTableJobType, ReplaceTablePlan, TableJobType,
};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
Expand Down Expand Up @@ -582,7 +582,11 @@ impl CatalogWriter for MockCatalogWriter {
}
}

async fn alter_source(&self, source: PbSource) -> Result<()> {
async fn alter_source(
&self,
source: PbSource,
_replace_streaming_job_plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<()> {
self.catalog.write().update_source(&source);
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,10 +682,10 @@ impl DdlService for DdlServiceImpl {
&self,
request: Request<AlterSourceRequest>,
) -> Result<Response<AlterSourceResponse>, Status> {
let AlterSourceRequest { source } = request.into_inner();
let AlterSourceRequest { source, plan } = request.into_inner();
let version = self
.ddl_controller
.run_command(DdlCommand::AlterSourceColumn(source.unwrap()))
.run_command(DdlCommand::AlterSource(source.unwrap(), plan))
.await?;
Ok(Response::new(AlterSourceResponse {
status: None,
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ impl CatalogController {
Ok(table_id_map)
}

/// Insert
pub async fn prepare_streaming_job(
&self,
table_fragment: PbTableFragments,
Expand Down Expand Up @@ -436,7 +437,7 @@ impl CatalogController {
}

if !for_replace {
// // Update dml fragment id.
// Update dml fragment id.
if let StreamingJob::Table(_, table, ..) = streaming_job {
Table::update(table::ActiveModel {
table_id: Set(table.id as _),
Expand Down
24 changes: 18 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
alter_name_request, alter_set_schema_request, DdlProgress, TableJobType, WaitVersion,
alter_name_request, alter_set_schema_request, DdlProgress, PbReplaceStreamingJobPlan,
TableJobType, WaitVersion,
};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::PbFragment;
Expand Down Expand Up @@ -149,7 +150,7 @@ pub enum DdlCommand {
DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceTableInfo>),
AlterName(alter_name_request::Object, String),
ReplaceTable(ReplaceTableInfo),
AlterSourceColumn(Source),
AlterSource(Source, Option<PbReplaceStreamingJobPlan>),
AlterObjectOwner(Object, UserId),
AlterSetSchema(alter_set_schema_request::Object, SchemaId),
CreateConnection(Connection),
Expand Down Expand Up @@ -341,7 +342,7 @@ impl DdlController {
}
DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
DdlCommand::AlterSourceColumn(source) => ctrl.alter_source(source).await,
DdlCommand::AlterSource(source, plan) => ctrl.alter_source(source, plan).await,
DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
DdlCommand::CreateSubscription(subscription) => {
ctrl.create_subscription(subscription).await
Expand Down Expand Up @@ -458,11 +459,22 @@ impl DdlController {

/// This replaces the source in the catalog.
/// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
async fn alter_source(&self, source: Source) -> MetaResult<NotificationVersion> {
self.metadata_manager
async fn alter_source(
&self,
source: Source,
plan: Option<PbReplaceStreamingJobPlan>,
) -> MetaResult<NotificationVersion> {
let version = self
.metadata_manager
.catalog_controller
.alter_source(source)
.await
.await?;
if let Some(plan) = plan {
// finish_replace_streaming_job
} else {
// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
}
Ok(version)
}

async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
Expand Down
7 changes: 6 additions & 1 deletion src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,14 @@ impl MetaClient {
.ok_or_else(|| anyhow!("wait version not set"))?)
}

pub async fn alter_source(&self, source: PbSource) -> Result<WaitVersion> {
pub async fn alter_source(
&self,
source: PbSource,
plan: Option<PbReplaceStreamingJobPlan>,
) -> Result<WaitVersion> {
let request = AlterSourceRequest {
source: Some(source),
plan,
};
let resp = self.inner.alter_source(request).await?;
Ok(resp
Expand Down

0 comments on commit 975cc21

Please sign in to comment.