Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Oct 28, 2024
1 parent 6afde2b commit d43372e
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,3 @@ statement ok
drop source s cascade;

# TODO: test alter source with schema registry

# TODO: test alter source rename, change owner, etc.
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,20 @@ message BarrierMutation {
// Stop a set of actors, used for dropping materialized views. Empty dispatchers will be
// automatically removed.
StopMutation stop = 4;
// Update outputs and hash mappings for some dispatchers, used for scaling.
// Update outputs and hash mappings for some dispatchers, used for scaling and replace table.
UpdateMutation update = 5;
// Change the split of some sources.
SourceChangeSplitMutation splits = 6;
// Pause the dataflow of the whole streaming graph, only used for scaling.
PauseMutation pause = 7;
// Resume the dataflow of the whole streaming graph, only used for scaling.
ResumeMutation resume = 8;
// Throttle specific source exec or chain exec.
// Throttle specific source exec or backfill exec.
ThrottleMutation throttle = 10;
// Drop subscription on mv
DropSubscriptionsMutation drop_subscriptions = 12;
// Combined mutation.
// Currently, it can only be Add & Update, which is for sink into table.
CombinedMutation combined = 100;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ pub async fn handle_alter_source_column(
catalog.version += 1;

let catalog_writer = session.catalog_writer()?;
let replace_plan = todo!();
catalog_writer
.alter_source(catalog.to_prost(schema_id, db_id), todo!())
.alter_source(catalog.to_prost(schema_id, db_id), replace_plan)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ pub enum CreateStreamingJobType {
}

/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
/// it will [build different barriers to send](Self::to_mutation),
/// and may [do different stuffs after the barrier is collected](CommandContext::post_collect).
#[derive(Debug, Clone, strum::Display)]
pub enum Command {
/// `Plain` command generates a barrier with the mutation it carries.
Expand Down Expand Up @@ -491,6 +491,7 @@ impl Command {

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
// XXX: what if pause(r1) - pause(r2) - resume(r1) - resume(r2)??
if current_paused_reason.is_none() {
Some(Mutation::Pause(PauseMutation {}))
} else {
Expand Down Expand Up @@ -595,7 +596,6 @@ impl Command {
..
}) = job_type
{
// TODO: support in v2.
let update = Self::generate_update_mutation_for_replace_table(
old_table_fragments,
merge_updates,
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ impl DdlController {
}
}

/// `target_replace_info`: when dropping a sink into table, we need to replace the table.
pub async fn drop_object(
&self,
object_type: ObjectType,
Expand Down

0 comments on commit d43372e

Please sign in to comment.