Skip to content

Commit

Permalink
fix: fix uncommitted subscription changes and support altering subscr…
Browse files Browse the repository at this point in the history
…iption in sql backend (#15828)
  • Loading branch information
yezizp2012 authored Mar 21, 2024
1 parent 7e4cf71 commit fb7fadb
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 468 deletions.
2 changes: 1 addition & 1 deletion e2e_test/ddl/alter_rename.slt
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ ALTER SUBSCRIPTION subscription RENAME TO subscription1;
query TT
SHOW CREATE SUBSCRIPTION subscription1;
----
public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv WITH (retention = '1D')
public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv2 WITH (retention = '1D')

# alter mview rename with alias conflict, used by sink1
statement ok
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,10 @@ impl CatalogController {
.ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
.await?;

let mut obj = obj.into_active_model();
obj.schema_id = Set(Some(new_schema));
let obj = obj.update(&txn).await?;
relations.push(PbRelationInfo::Subscription(
ObjectModel(subscription, obj).into(),
));
Expand Down
19 changes: 16 additions & 3 deletions src/meta/src/controller/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction};
use risingwave_sqlparser::ast::{
Array, CreateSink, CreateSinkStatement, CreateSourceStatement, Distinct, Expr, Function,
FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, SetExpr, Statement,
TableAlias, TableFactor, TableWithJoins,
Array, CreateSink, CreateSinkStatement, CreateSourceStatement, CreateSubscriptionStatement,
Distinct, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem,
SetExpr, Statement, TableAlias, TableFactor, TableWithJoins,
};
use risingwave_sqlparser::parser::Parser;

Expand Down Expand Up @@ -49,6 +49,13 @@ pub fn alter_relation_rename(definition: &str, new_name: &str) -> String {
source_name: name, ..
},
}
| Statement::CreateSubscription {
stmt:
CreateSubscriptionStatement {
subscription_name: name,
..
},
}
| Statement::CreateSink {
stmt: CreateSinkStatement {
sink_name: name, ..
Expand Down Expand Up @@ -93,6 +100,12 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
into_table_name: None,
..
},
}| Statement::CreateSubscription {
stmt:
CreateSubscriptionStatement {
subscription_from: table_name,
..
},
} => replace_table_name(table_name, to),
Statement::CreateSink {
stmt: CreateSinkStatement {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::{Relation, RelationGroup};
pub(crate) use {commit_meta, commit_meta_with_trx};

use crate::manager::catalog::utils::{
alter_relation_rename, alter_relation_rename_refs, refcnt_dec_connection,
refcnt_inc_connection, ReplaceTableExprRewriter,
use crate::controller::rename::{
alter_relation_rename, alter_relation_rename_refs, ReplaceTableExprRewriter,
};
use crate::manager::catalog::utils::{refcnt_dec_connection, refcnt_inc_connection};
use crate::rpc::ddl_controller::DropMode;
use crate::telemetry::MetaTelemetryJobDesc;

Expand Down Expand Up @@ -1902,7 +1902,7 @@ impl CatalogManager {
if let Some(source) = &to_update_source {
sources.insert(source.id, source.clone());
}
commit_meta!(self, tables, views, sinks, sources)?;
commit_meta!(self, tables, views, sinks, sources, subscriptions)?;

// 5. notify frontend.
assert!(
Expand Down
Loading

0 comments on commit fb7fadb

Please sign in to comment.