From d43372ee7047c6cd9a06b6a108652a60f91b352d Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 24 Oct 2024 16:24:06 +0800 Subject: [PATCH] . Signed-off-by: xxchan --- .../add_column_shared.slt} | 2 -- proto/stream_plan.proto | 5 +++-- src/frontend/src/handler/alter_source_column.rs | 3 ++- src/meta/src/barrier/command.rs | 6 +++--- src/meta/src/rpc/ddl_controller.rs | 1 + 5 files changed, 9 insertions(+), 8 deletions(-) rename e2e_test/source_inline/kafka/{shared_source_alter.slt => alter/add_column_shared.slt} (97%) diff --git a/e2e_test/source_inline/kafka/shared_source_alter.slt b/e2e_test/source_inline/kafka/alter/add_column_shared.slt similarity index 97% rename from e2e_test/source_inline/kafka/shared_source_alter.slt rename to e2e_test/source_inline/kafka/alter/add_column_shared.slt index eee79e3168c1e..f6e38a25a4595 100644 --- a/e2e_test/source_inline/kafka/shared_source_alter.slt +++ b/e2e_test/source_inline/kafka/alter/add_column_shared.slt @@ -136,5 +136,3 @@ statement ok drop source s cascade; # TODO: test alter source with schema registry - -# TODO: test alter source rename, change owner, etc. diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index fe3a2c93d11ab..dea89132a565a 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -116,7 +116,7 @@ message BarrierMutation { // Stop a set of actors, used for dropping materialized views. Empty dispatchers will be // automatically removed. StopMutation stop = 4; - // Update outputs and hash mappings for some dispatchers, used for scaling. + // Update outputs and hash mappings for some dispatchers, used for scaling and replace table. UpdateMutation update = 5; // Change the split of some sources. SourceChangeSplitMutation splits = 6; @@ -124,11 +124,12 @@ message BarrierMutation { PauseMutation pause = 7; // Resume the dataflow of the whole streaming graph, only used for scaling. ResumeMutation resume = 8; - // Throttle specific source exec or chain exec. + // Throttle specific source exec or backfill exec. ThrottleMutation throttle = 10; // Drop subscription on mv DropSubscriptionsMutation drop_subscriptions = 12; // Combined mutation. + // Currently, it can only be Add & Update, which is for sink into table. CombinedMutation combined = 100; } } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index 2354bfa822b2b..b4f5b52511da4 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -121,8 +121,9 @@ pub async fn handle_alter_source_column( catalog.version += 1; let catalog_writer = session.catalog_writer()?; + let replace_plan = todo!(); catalog_writer - .alter_source(catalog.to_prost(schema_id, db_id), todo!()) + .alter_source(catalog.to_prost(schema_id, db_id), replace_plan) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE)) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index f1f0bda1bd265..c455e6e2baa16 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -207,8 +207,8 @@ pub enum CreateStreamingJobType { } /// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands, -/// it will build different barriers to send, and may do different stuffs after the barrier is -/// collected. +/// it will [build different barriers to send](Self::to_mutation), +/// and may [do different stuffs after the barrier is collected](CommandContext::post_collect). #[derive(Debug, Clone, strum::Display)] pub enum Command { /// `Plain` command generates a barrier with the mutation it carries. @@ -491,6 +491,7 @@ impl Command { Command::Pause(_) => { // Only pause when the cluster is not already paused. + // XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)?? if current_paused_reason.is_none() { Some(Mutation::Pause(PauseMutation {})) } else { @@ -595,7 +596,6 @@ impl Command { .. }) = job_type { - // TODO: support in v2. let update = Self::generate_update_mutation_for_replace_table( old_table_fragments, merge_updates, diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 361b1400d9a9f..7ee378f2b7485 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1152,6 +1152,7 @@ impl DdlController { } } + /// `target_replace_info`: when dropping a sink into table, we need to replace the table. pub async fn drop_object( &self, object_type: ObjectType,