Skip to content

Commit

Permalink
chore: return a warning message when creating sink with order by (#10239
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lmatz authored Jun 16, 2023
1 parent 558cef5 commit 1c1354c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
19 changes: 14 additions & 5 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::rc::Rc;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
Expand Down Expand Up @@ -66,7 +68,7 @@ pub fn gen_sink_plan(
session: &SessionImpl,
context: OptimizerContextRef,
stmt: CreateSinkStatement,
) -> Result<(PlanRef, SinkCatalog)> {
) -> Result<(Box<Query>, PlanRef, SinkCatalog)> {
let db_name = session.database();
let (sink_schema_name, sink_table_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;
Expand All @@ -83,7 +85,7 @@ pub fn gen_sink_plan(

let (dependent_relations, bound) = {
let mut binder = Binder::new_for_stream(session);
let bound = binder.bind_query(*query)?;
let bound = binder.bind_query(*query.clone())?;
(binder.included_relations(), bound)
};

Expand Down Expand Up @@ -127,7 +129,7 @@ pub fn gen_sink_plan(
dependent_relations.into_iter().collect_vec(),
);

Ok((sink_plan, sink_catalog))
Ok((query, sink_plan, sink_catalog))
}

pub async fn handle_create_sink(
Expand All @@ -139,8 +141,15 @@ pub async fn handle_create_sink(
session.check_relation_name_duplicated(stmt.sink_name.clone())?;

let (sink, graph) = {
let context = OptimizerContext::from_handler_args(handle_args);
let (plan, sink) = gen_sink_plan(&session, context.into(), stmt)?;
let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
let (query, plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?;
let has_order_by = !query.order_by.is_empty();
if has_order_by {
context.warn_to_user(
r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
.to_string(),
);
}
let mut graph = build_graph(plan);
graph.parallelism = session
.config()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async fn do_handle_explain(
.map(|x| x.0),

Statement::CreateSink { stmt } => {
gen_sink_plan(&session, context.clone(), stmt).map(|x| x.0)
gen_sink_plan(&session, context.clone(), stmt).map(|x| x.1)
}

Statement::CreateIndex {
Expand Down

0 comments on commit 1c1354c

Please sign in to comment.