From fb7fadb906890dd8268198a1a814797054f243de Mon Sep 17 00:00:00 2001 From: August Date: Thu, 21 Mar 2024 11:48:20 +0800 Subject: [PATCH] fix: fix uncommitted subscription changes and support altering subscription in sql backend (#15828) --- e2e_test/ddl/alter_rename.slt | 2 +- src/meta/src/controller/catalog.rs | 4 + src/meta/src/controller/rename.rs | 19 +- src/meta/src/manager/catalog/mod.rs | 8 +- src/meta/src/manager/catalog/utils.rs | 460 -------------------------- 5 files changed, 25 insertions(+), 468 deletions(-) diff --git a/e2e_test/ddl/alter_rename.slt b/e2e_test/ddl/alter_rename.slt index 03655ef99d20b..b614d03e6929d 100644 --- a/e2e_test/ddl/alter_rename.slt +++ b/e2e_test/ddl/alter_rename.slt @@ -124,7 +124,7 @@ ALTER SUBSCRIPTION subscription RENAME TO subscription1; query TT SHOW CREATE SUBSCRIPTION subscription1; ---- -public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv WITH (retention = '1D') +public.subscription1 CREATE SUBSCRIPTION subscription1 FROM mv2 WITH (retention = '1D') # alter mview rename with alias conflict, used by sink1 statement ok diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 95cf53b3e4fe1..7429b6098613f 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -1688,6 +1688,10 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?; check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn) .await?; + + let mut obj = obj.into_active_model(); + obj.schema_id = Set(Some(new_schema)); + let obj = obj.update(&txn).await?; relations.push(PbRelationInfo::Subscription( ObjectModel(subscription, obj).into(), )); diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs index 15be4d7ef83b8..da907cc4edf7d 100644 --- a/src/meta/src/controller/rename.rs +++ b/src/meta/src/controller/rename.rs @@ -17,9 +17,9 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::expr::expr_node::RexNode; use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction}; use risingwave_sqlparser::ast::{ - Array, CreateSink, CreateSinkStatement, CreateSourceStatement, Distinct, Expr, Function, - FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, SetExpr, Statement, - TableAlias, TableFactor, TableWithJoins, + Array, CreateSink, CreateSinkStatement, CreateSourceStatement, CreateSubscriptionStatement, + Distinct, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, + SetExpr, Statement, TableAlias, TableFactor, TableWithJoins, }; use risingwave_sqlparser::parser::Parser; @@ -49,6 +49,13 @@ pub fn alter_relation_rename(definition: &str, new_name: &str) -> String { source_name: name, .. }, } + | Statement::CreateSubscription { + stmt: + CreateSubscriptionStatement { + subscription_name: name, + .. + }, + } | Statement::CreateSink { stmt: CreateSinkStatement { sink_name: name, .. @@ -93,6 +100,12 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str into_table_name: None, .. }, + }| Statement::CreateSubscription { + stmt: + CreateSubscriptionStatement { + subscription_from: table_name, + .. + }, } => replace_table_name(table_name, to), Statement::CreateSink { stmt: CreateSinkStatement { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 3dd49ff07aa4e..63c959d63a3b1 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -126,10 +126,10 @@ use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::{Relation, RelationGroup}; pub(crate) use {commit_meta, commit_meta_with_trx}; -use crate::manager::catalog::utils::{ - alter_relation_rename, alter_relation_rename_refs, refcnt_dec_connection, - refcnt_inc_connection, ReplaceTableExprRewriter, +use crate::controller::rename::{ + alter_relation_rename, alter_relation_rename_refs, ReplaceTableExprRewriter, }; +use crate::manager::catalog::utils::{refcnt_dec_connection, refcnt_inc_connection}; use crate::rpc::ddl_controller::DropMode; use crate::telemetry::MetaTelemetryJobDesc; @@ -1902,7 +1902,7 @@ impl CatalogManager { if let Some(source) = &to_update_source { sources.insert(source.id, source.clone()); } - commit_meta!(self, tables, views, sinks, sources)?; + commit_meta!(self, tables, views, sinks, sources, subscriptions)?; // 5. notify frontend. assert!( diff --git a/src/meta/src/manager/catalog/utils.rs b/src/meta/src/manager/catalog/utils.rs index b4f08f46e0091..6c73e6eeebb39 100644 --- a/src/meta/src/manager/catalog/utils.rs +++ b/src/meta/src/manager/catalog/utils.rs @@ -12,17 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::util::column_index_mapping::ColIndexMapping; -use risingwave_pb::expr::expr_node::RexNode; -use risingwave_pb::expr::{ExprNode, FunctionCall, UserDefinedFunction}; -use risingwave_sqlparser::ast::{ - Array, CreateSink, CreateSinkStatement, CreateSourceStatement, CreateSubscriptionStatement, - Distinct, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Query, SelectItem, - SetExpr, Statement, TableAlias, TableFactor, TableWithJoins, -}; -use risingwave_sqlparser::parser::Parser; use crate::manager::{ConnectionId, DatabaseManager}; @@ -50,453 +40,3 @@ pub fn refcnt_dec_connection( database_mgr.decrease_ref_count(connection_id); } } - -/// `alter_relation_rename` renames a relation to a new name in its `Create` statement, and returns -/// the updated definition raw sql. Note that the `definition` must be a `Create` statement and the -/// `new_name` must be a valid identifier, it should be validated before calling this function. To -/// update all relations that depend on the renamed one, use `alter_relation_rename_refs`. -pub fn alter_relation_rename(definition: &str, new_name: &str) -> String { - // This happens when we try to rename a table that's created by `CREATE TABLE AS`. Remove it - // when we support `SHOW CREATE TABLE` for `CREATE TABLE AS`. - if definition.is_empty() { - tracing::warn!("found empty definition when renaming relation, ignored."); - return definition.into(); - } - let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); - let mut stmt = ast - .into_iter() - .exactly_one() - .expect("should contains only one statement"); - - match &mut stmt { - Statement::CreateTable { name, .. } - | Statement::CreateView { name, .. } - | Statement::CreateIndex { name, .. } - | Statement::CreateSource { - stmt: CreateSourceStatement { - source_name: name, .. - }, - } - | Statement::CreateSubscription { - stmt: - CreateSubscriptionStatement { - subscription_name: name, - .. - }, - } - | Statement::CreateSink { - stmt: CreateSinkStatement { - sink_name: name, .. - }, - } => replace_table_name(name, new_name), - _ => unreachable!(), - }; - - stmt.to_string() -} - -/// `alter_relation_rename_refs` updates all references of renamed-relation in the definition of -/// target relation's `Create` statement. -pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> String { - let ast = Parser::parse_sql(definition).expect("failed to parse relation definition"); - let mut stmt = ast - .into_iter() - .exactly_one() - .expect("should contains only one statement"); - - match &mut stmt { - Statement::CreateTable { - query: Some(query), .. - } - | Statement::CreateView { query, .. } - | Statement::Query(query) // Used by view, actually we store a query as the definition of view. - | Statement::CreateSink { - stmt: - CreateSinkStatement { - sink_from: CreateSink::AsQuery(query), - into_table_name: None, - .. - }, - } => { - QueryRewriter::rewrite_query(query, from, to); - } - Statement::CreateIndex { table_name, .. } - | Statement::CreateSink { - stmt: - CreateSinkStatement { - sink_from: CreateSink::From(table_name), - into_table_name: None, - .. - }, - }| Statement::CreateSubscription { - stmt: - CreateSubscriptionStatement { - subscription_from: table_name, - .. - }, - } => replace_table_name(table_name, to), - Statement::CreateSink { - stmt: CreateSinkStatement { - sink_from, - into_table_name: Some(table_name), - .. - } - } => { - let idx = table_name.0.len() - 1; - if table_name.0[idx].real_value() == from { - table_name.0[idx] = Ident::new_unchecked(to); - } else { - match sink_from { - CreateSink::From(table_name) => replace_table_name(table_name, to), - CreateSink::AsQuery(query) => QueryRewriter::rewrite_query(query, from, to), - } - } - } - _ => unreachable!(), - }; - stmt.to_string() -} - -/// Replace the last ident in the `table_name` with the given name, the object name is ensured to be -/// non-empty. e.g. `schema.table` or `database.schema.table`. -fn replace_table_name(table_name: &mut ObjectName, to: &str) { - let idx = table_name.0.len() - 1; - table_name.0[idx] = Ident::new_unchecked(to); -} - -/// `QueryRewriter` is a visitor that updates all references of relation named `from` to `to` in the -/// given query, which is the part of create statement of `relation`. -struct QueryRewriter<'a> { - from: &'a str, - to: &'a str, -} - -impl QueryRewriter<'_> { - fn rewrite_query(query: &mut Query, from: &str, to: &str) { - let rewriter = QueryRewriter { from, to }; - rewriter.visit_query(query) - } - - /// Visit the query and update all references of relation named `from` to `to`. - fn visit_query(&self, query: &mut Query) { - if let Some(with) = &mut query.with { - for cte_table in &mut with.cte_tables { - self.visit_query(&mut cte_table.query); - } - } - self.visit_set_expr(&mut query.body); - for expr in &mut query.order_by { - self.visit_expr(&mut expr.expr); - } - } - - /// Visit table factor and update all references of relation named `from` to `to`. - /// Rewrite idents(i.e. `schema.table`, `table`) that contains the old name in the - /// following pattern: - /// 1. `FROM a` to `FROM new_a AS a` - /// 2. `FROM a AS b` to `FROM new_a AS b` - /// - /// So that we DON'T have to: - /// 1. rewrite the select and expr part like `schema.table.column`, `table.column`, - /// `alias.column` etc. - /// 2. handle the case that the old name is used as alias. - /// 3. handle the case that the new name is used as alias. - fn visit_table_factor(&self, table_factor: &mut TableFactor) { - match table_factor { - TableFactor::Table { name, alias, .. } => { - let idx = name.0.len() - 1; - if name.0[idx].real_value() == self.from { - if alias.is_none() { - *alias = Some(TableAlias { - name: Ident::new_unchecked(self.from), - columns: vec![], - }); - } - name.0[idx] = Ident::new_unchecked(self.to); - } - } - TableFactor::Derived { subquery, .. } => self.visit_query(subquery), - TableFactor::TableFunction { args, .. } => { - for arg in args { - self.visit_function_args(arg); - } - } - TableFactor::NestedJoin(table_with_joins) => { - self.visit_table_with_joins(table_with_joins); - } - } - } - - /// Visit table with joins and update all references of relation named `from` to `to`. - fn visit_table_with_joins(&self, table_with_joins: &mut TableWithJoins) { - self.visit_table_factor(&mut table_with_joins.relation); - for join in &mut table_with_joins.joins { - self.visit_table_factor(&mut join.relation); - } - } - - /// Visit query body expression and update all references. - fn visit_set_expr(&self, set_expr: &mut SetExpr) { - match set_expr { - SetExpr::Select(select) => { - if let Distinct::DistinctOn(exprs) = &mut select.distinct { - for expr in exprs { - self.visit_expr(expr); - } - } - for select_item in &mut select.projection { - self.visit_select_item(select_item); - } - for from_item in &mut select.from { - self.visit_table_with_joins(from_item); - } - if let Some(where_clause) = &mut select.selection { - self.visit_expr(where_clause); - } - for expr in &mut select.group_by { - self.visit_expr(expr); - } - if let Some(having) = &mut select.having { - self.visit_expr(having); - } - } - SetExpr::Query(query) => self.visit_query(query), - SetExpr::SetOperation { left, right, .. } => { - self.visit_set_expr(left); - self.visit_set_expr(right); - } - SetExpr::Values(_) => {} - } - } - - /// Visit function arguments and update all references. - fn visit_function_args(&self, function_args: &mut FunctionArg) { - match function_args { - FunctionArg::Unnamed(arg) | FunctionArg::Named { arg, .. } => match arg { - FunctionArgExpr::Expr(expr) | FunctionArgExpr::ExprQualifiedWildcard(expr, _) => { - self.visit_expr(expr) - } - FunctionArgExpr::QualifiedWildcard(_, None) | FunctionArgExpr::Wildcard(None) => {} - FunctionArgExpr::QualifiedWildcard(_, Some(exprs)) - | FunctionArgExpr::Wildcard(Some(exprs)) => { - for expr in exprs { - self.visit_expr(expr); - } - } - }, - } - } - - /// Visit function and update all references. - fn visit_function(&self, function: &mut Function) { - for arg in &mut function.args { - self.visit_function_args(arg); - } - } - - /// Visit expression and update all references. - fn visit_expr(&self, expr: &mut Expr) { - match expr { - Expr::FieldIdentifier(expr, ..) - | Expr::IsNull(expr) - | Expr::IsNotNull(expr) - | Expr::IsTrue(expr) - | Expr::IsNotTrue(expr) - | Expr::IsFalse(expr) - | Expr::IsNotFalse(expr) - | Expr::IsUnknown(expr) - | Expr::IsNotUnknown(expr) - | Expr::IsJson { expr, .. } - | Expr::InList { expr, .. } - | Expr::SomeOp(expr) - | Expr::AllOp(expr) - | Expr::UnaryOp { expr, .. } - | Expr::Cast { expr, .. } - | Expr::TryCast { expr, .. } - | Expr::AtTimeZone { - timestamp: expr, .. - } - | Expr::Extract { expr, .. } - | Expr::Substring { expr, .. } - | Expr::Overlay { expr, .. } - | Expr::Trim { expr, .. } - | Expr::Nested(expr) - | Expr::ArrayIndex { obj: expr, .. } - | Expr::ArrayRangeIndex { obj: expr, .. } => self.visit_expr(expr), - - Expr::Position { substring, string } => { - self.visit_expr(substring); - self.visit_expr(string); - } - - Expr::InSubquery { expr, subquery, .. } => { - self.visit_expr(expr); - self.visit_query(subquery); - } - Expr::Between { - expr, low, high, .. - } => { - self.visit_expr(expr); - self.visit_expr(low); - self.visit_expr(high); - } - Expr::SimilarTo { - expr, - pat, - esc_text, - .. - } => { - self.visit_expr(expr); - self.visit_expr(pat); - if let Some(e) = esc_text { - self.visit_expr(e); - } - } - - Expr::IsDistinctFrom(expr1, expr2) - | Expr::IsNotDistinctFrom(expr1, expr2) - | Expr::BinaryOp { - left: expr1, - right: expr2, - .. - } => { - self.visit_expr(expr1); - self.visit_expr(expr2); - } - Expr::Function(function) => self.visit_function(function), - Expr::Exists(query) | Expr::Subquery(query) | Expr::ArraySubquery(query) => { - self.visit_query(query) - } - - Expr::GroupingSets(exprs_vec) | Expr::Cube(exprs_vec) | Expr::Rollup(exprs_vec) => { - for exprs in exprs_vec { - for expr in exprs { - self.visit_expr(expr); - } - } - } - - Expr::Row(exprs) | Expr::Array(Array { elem: exprs, .. }) => { - for expr in exprs { - self.visit_expr(expr); - } - } - - Expr::LambdaFunction { body, args: _ } => self.visit_expr(body), - - // No need to visit. - Expr::Identifier(_) - | Expr::CompoundIdentifier(_) - | Expr::Collate { .. } - | Expr::Value(_) - | Expr::Parameter { .. } - | Expr::TypedString { .. } - | Expr::Case { .. } => {} - } - } - - /// Visit select item and update all references. - fn visit_select_item(&self, select_item: &mut SelectItem) { - match select_item { - SelectItem::UnnamedExpr(expr) - | SelectItem::ExprQualifiedWildcard(expr, _) - | SelectItem::ExprWithAlias { expr, .. } => self.visit_expr(expr), - SelectItem::QualifiedWildcard(_, None) | SelectItem::Wildcard(None) => {} - SelectItem::QualifiedWildcard(_, Some(exprs)) | SelectItem::Wildcard(Some(exprs)) => { - for expr in exprs { - self.visit_expr(expr); - } - } - } - } -} - -pub struct ReplaceTableExprRewriter { - pub table_col_index_mapping: ColIndexMapping, -} - -impl ReplaceTableExprRewriter { - pub fn rewrite_expr(&self, expr: &mut ExprNode) { - let rex_node = expr.rex_node.as_mut().unwrap(); - match rex_node { - RexNode::InputRef(input_col_idx) => { - *input_col_idx = self.table_col_index_mapping.map(*input_col_idx as usize) as u32 - } - RexNode::Constant(_) => {} - RexNode::Udf(udf) => self.rewrite_udf(udf), - RexNode::FuncCall(function_call) => self.rewrite_function_call(function_call), - RexNode::Now(_) => {} - } - } - - fn rewrite_udf(&self, udf: &mut UserDefinedFunction) { - udf.children - .iter_mut() - .for_each(|expr| self.rewrite_expr(expr)); - } - - fn rewrite_function_call(&self, function_call: &mut FunctionCall) { - function_call - .children - .iter_mut() - .for_each(|expr| self.rewrite_expr(expr)); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_alter_table_rename() { - let definition = "CREATE TABLE foo (a int, b int)"; - let new_name = "bar"; - let expected = "CREATE TABLE bar (a INT, b INT)"; - let actual = alter_relation_rename(definition, new_name); - assert_eq!(expected, actual); - } - - #[test] - fn test_rename_index_refs() { - let definition = "CREATE INDEX idx1 ON foo(v1 DESC, v2)"; - let from = "foo"; - let to = "bar"; - let expected = "CREATE INDEX idx1 ON bar(v1 DESC, v2)"; - let actual = alter_relation_rename_refs(definition, from, to); - assert_eq!(expected, actual); - } - - #[test] - fn test_rename_sink_refs() { - let definition = - "CREATE SINK sink_t FROM foo WITH (connector = 'kafka', format = 'append_only')"; - let from = "foo"; - let to = "bar"; - let expected = - "CREATE SINK sink_t FROM bar WITH (connector = 'kafka', format = 'append_only')"; - let actual = alter_relation_rename_refs(definition, from, to); - assert_eq!(expected, actual); - } - - #[test] - fn test_rename_with_alias_refs() { - let definition = - "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM foo"; - let from = "foo"; - let to = "bar"; - let expected = - "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, foo.v2 AS m2v FROM bar AS foo"; - let actual = alter_relation_rename_refs(definition, from, to); - assert_eq!(expected, actual); - - let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE"; - let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT foo.v1 AS m1v, (foo.v2).v3 AS m2v FROM bar AS foo WHERE foo.v1 = 1 AND (foo.v2).v3 IS TRUE"; - let actual = alter_relation_rename_refs(definition, from, to); - assert_eq!(expected, actual); - - let definition = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM foo AS bar WHERE bar.v1 = 1"; - let expected = "CREATE MATERIALIZED VIEW mv1 AS SELECT bar.v1 AS m1v, (bar.v2).v3 AS m2v FROM bar AS bar WHERE bar.v1 = 1"; - let actual = alter_relation_rename_refs(definition, from, to); - assert_eq!(expected, actual); - } -}