Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): print partial explain traces if there's error #9513

Merged
merged 8 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ alias = "dev"
[tasks.dev]
category = "RiseDev - Start"
dependencies = ["pre-start-dev"]
script = "RUST_BACKTRACE=full target/${BUILD_MODE_DIR}/risedev-dev ${@}"
script = "RUST_BACKTRACE=1 target/${BUILD_MODE_DIR}/risedev-dev ${@}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀 LGTM

description = "Start a full RisingWave dev cluster using risedev-dev"

[tasks.kafka]
Expand Down
305 changes: 179 additions & 126 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Row;
Expand All @@ -34,35 +35,23 @@ use crate::optimizer::OptimizerContext;
use crate::scheduler::BatchPlanFragmenter;
use crate::stream_fragmenter::build_graph;
use crate::utils::explain_stream_graph;
use crate::OptimizerContextRef;

pub async fn handle_explain(
handler_args: HandlerArgs,
async fn do_handle_explain(
context: OptimizerContext,
stmt: Statement,
options: ExplainOptions,
analyze: bool,
) -> Result<RwPgResponse> {
let context = OptimizerContext::new(handler_args.clone(), options.clone());

if analyze {
return Err(ErrorCode::NotImplemented("explain analyze".to_string(), 4856.into()).into());
}

let session = context.session_ctx().clone();

let mut plan_fragmenter = None;
let mut rows = {
let plan = match stmt {
Statement::CreateView {
or_replace: false,
materialized: true,
query,
name,
columns,
..
} => gen_create_mv_plan(&session, context.into(), *query, name, columns)?.0,
blocks: &mut Vec<String>,
) -> Result<()> {
// Workaround to avoid `Rc` across `await` point.
let mut batch_plan_fragmenter = None;
Comment on lines +45 to +46
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, most of the time I spent on this PR is struggling with Rc and Send. 🥵 I guess either we need a refactor to split blocking steps off the planning, or we may migrate to thread-safe structures to make developing easier.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 ❤️ 🥵 #9515


Statement::CreateSink { stmt } => gen_sink_plan(&session, context.into(), stmt)?.0,
{
let session = context.session_ctx().clone();

let (plan, context) = match stmt {
// `CREATE TABLE` takes the ownership of the `OptimizerContext` to avoid `Rc` across
// `await` point. We can only take the reference back from the `PlanRef` if it's
// successfully planned.
Statement::CreateTable {
name,
columns,
Expand All @@ -71,132 +60,196 @@ pub async fn handle_explain(
source_watermarks,
append_only,
..
} => match check_create_table_with_source(&handler_args.with_options, source_schema)? {
Some(s) => {
gen_create_table_plan_with_source(
context,
} => {
let with_options = context.with_options();
let plan = match check_create_table_with_source(with_options, source_schema)? {
Some(s) => {
gen_create_table_plan_with_source(
context,
name,
columns,
constraints,
s,
source_watermarks,
ColumnIdGenerator::new_initial(),
append_only,
)
.await?
.0
}
None => {
gen_create_table_plan(
context,
name,
columns,
constraints,
ColumnIdGenerator::new_initial(),
source_watermarks,
append_only,
)?
.0
}
};
let context = plan.ctx();

(Ok(plan), context)
}

// For other queries without `await` point, we can keep a copy of reference to the
// `OptimizerContext` even if the planning fails. This enables us to log the partial
// traces for better debugging experience.
_ => {
let context: OptimizerContextRef = context.into();
let plan = match stmt {
// -- Streaming DDLs --
Statement::CreateView {
or_replace: false,
materialized: true,
query,
name,
columns,
constraints,
s,
source_watermarks,
ColumnIdGenerator::new_initial(),
append_only,
)
.await?
.0
}
None => {
gen_create_table_plan(
context,
..
} => gen_create_mv_plan(&session, context.clone(), *query, name, columns)
.map(|x| x.0),

Statement::CreateSink { stmt } => {
gen_sink_plan(&session, context.clone(), stmt).map(|x| x.0)
}

Statement::CreateIndex {
name,
table_name,
columns,
constraints,
ColumnIdGenerator::new_initial(),
source_watermarks,
append_only,
)?
.0
}
},
include,
distributed_by,
..
} => gen_create_index_plan(
&session,
context.clone(),
name,
table_name,
columns,
include,
distributed_by,
)
.map(|x| x.0),

Statement::CreateIndex {
name,
table_name,
columns,
include,
distributed_by,
..
} => {
gen_create_index_plan(
&session,
context.into(),
name,
table_name,
columns,
include,
distributed_by,
)?
.0
}
// -- Batch Queries --
Statement::Insert { .. }
| Statement::Delete { .. }
| Statement::Update { .. }
| Statement::Query { .. } => {
gen_batch_plan_by_statement(&session, context.clone(), stmt).map(|x| x.plan)
}

stmt => gen_batch_plan_by_statement(&session, context.into(), stmt)?.plan,
};
_ => {
return Err(ErrorCode::NotImplemented(
format!("unsupported statement {:?}", stmt),
None.into(),
)
.into())
}
};

let ctx = plan.plan_base().ctx.clone();
let explain_trace = ctx.is_explain_trace();
let explain_verbose = ctx.is_explain_verbose();

let mut rows = if explain_trace {
let trace = ctx.take_trace();
trace
.iter()
.flat_map(|s| s.lines())
.map(|s| Row::new(vec![Some(s.to_string().into())]))
.collect::<Vec<_>>()
} else {
vec![]
(plan, context)
}
};

match options.explain_type {
ExplainType::DistSql => match plan.convention() {
Convention::Logical => unreachable!(),
Convention::Batch => {
plan_fragmenter = Some(BatchPlanFragmenter::new(
session.env().worker_node_manager_ref(),
session.env().catalog_reader().clone(),
session.config().get_batch_parallelism(),
plan,
)?);
}
Convention::Stream => {
let graph = build_graph(plan);
rows.extend(
explain_stream_graph(&graph, explain_verbose)
.lines()
.map(|s| Row::new(vec![Some(s.to_string().into())])),
);
let explain_trace = context.is_explain_trace();
let explain_verbose = context.is_explain_verbose();
let explain_type = context.explain_type();

if explain_trace {
let trace = context.take_trace();
blocks.extend(trace);
}

match explain_type {
ExplainType::DistSql => {
if let Ok(plan) = &plan {
match plan.convention() {
Convention::Logical => unreachable!(),
Convention::Batch => {
batch_plan_fragmenter = Some(BatchPlanFragmenter::new(
session.env().worker_node_manager_ref(),
session.env().catalog_reader().clone(),
session.config().get_batch_parallelism(),
plan.clone(),
)?);
}
Convention::Stream => {
let graph = build_graph(plan.clone());
blocks.push(explain_stream_graph(&graph, explain_verbose));
}
}
}
},
}
ExplainType::Physical => {
// if explain trace is open, the plan has been in the rows
if !explain_trace {
// if explain trace is on, the plan has been in the rows
if !explain_trace && let Ok(plan) = &plan {
let output = plan.explain_to_string()?;
rows.extend(
output
.lines()
.map(|s| Row::new(vec![Some(s.to_string().into())])),
);
blocks.push(output);
}
}
ExplainType::Logical => {
// if explain trace is open, the plan has been in the rows
// if explain trace is on, the plan has been in the rows
if !explain_trace {
let output = plan.ctx().take_logical().ok_or_else(|| {
let output = context.take_logical().ok_or_else(|| {
ErrorCode::InternalError("Logical plan not found for query".into())
})?;
rows.extend(
output
.lines()
.map(|s| Row::new(vec![Some(s.to_string().into())])),
);
blocks.push(output);
}
}
}
rows
};

if let Some(plan_fragmenter) = plan_fragmenter {
let query = plan_fragmenter.generate_complete_query().await?;
// Throw the error.
plan?;
}

if let Some(fragmenter) = batch_plan_fragmenter {
let query = fragmenter.generate_complete_query().await?;
let stage_graph_json = serde_json::to_string_pretty(&query.stage_graph).unwrap();
rows.extend(
vec![stage_graph_json]
.iter()
.flat_map(|s| s.lines())
.map(|s| Row::new(vec![Some(s.to_string().into())])),
);
blocks.push(stage_graph_json);
}

Ok(())
}

pub async fn handle_explain(
handler_args: HandlerArgs,
stmt: Statement,
options: ExplainOptions,
analyze: bool,
) -> Result<RwPgResponse> {
if analyze {
return Err(ErrorCode::NotImplemented("explain analyze".to_string(), 4856.into()).into());
}

let context = OptimizerContext::new(handler_args.clone(), options.clone());

let mut blocks = Vec::new();
let result = do_handle_explain(context, stmt, &mut blocks).await;

if let Err(e) = result {
if options.trace {
// If `trace` is on, we include the error in the output with partial traces.
blocks.push(if options.verbose {
format!("ERROR: {:?}", e)
} else {
format!("ERROR: {}", e)
});
} else {
// Else, directly return the error.
return Err(e);
}
}

let rows = blocks
.iter()
.flat_map(|b| b.lines().map(|l| l.to_owned()))
.map(|l| Row::new(vec![Some(l.into())]))
.collect_vec();

Ok(PgResponse::new_for_stream(
StatementType::EXPLAIN,
None,
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/optimizer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,12 @@ impl OptimizerContext {
self.explain_options.trace
}

/// Returns a copy of the `ExplainType` configured in this context's
/// explain options (e.g. `Logical`, `Physical`, `DistSql`).
pub fn explain_type(&self) -> ExplainType {
    let ty = &self.explain_options.explain_type;
    ty.clone()
}

/// Whether the explain type of this context is [`ExplainType::Logical`].
///
/// Delegates to [`Self::explain_type`] so there is a single source of truth
/// for reading the configured explain type. (The pre-refactor duplicate line
/// comparing `self.explain_options.explain_type` directly has been removed.)
pub fn is_explain_logical(&self) -> bool {
    self.explain_type() == ExplainType::Logical
}

pub fn trace(&self, str: impl Into<String>) {
Expand Down