diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index e64e30861d98..fa092137abd6 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::Arc; +use crate::metrics::ExecutorMetricsCollector; use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; @@ -27,7 +28,7 @@ use ballista_core::serde::protobuf::ExecutorRegistration; use datafusion::error::DataFusionError; use datafusion::execution::context::TaskContext; use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::physical_plan::display::DisplayableExecutionPlan; + use datafusion::physical_plan::udaf::AggregateUDF; use datafusion::physical_plan::udf::ScalarUDF; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; @@ -48,6 +49,9 @@ pub struct Executor { /// Runtime environment for Executor pub runtime: Arc, + + /// Collector for runtime execution metrics + pub metrics_collector: Arc, } impl Executor { @@ -56,6 +60,7 @@ impl Executor { metadata: ExecutorRegistration, work_dir: &str, runtime: Arc, + metrics_collector: Arc, ) -> Self { Self { metadata, @@ -64,6 +69,7 @@ impl Executor { scalar_functions: HashMap::new(), aggregate_functions: HashMap::new(), runtime, + metrics_collector, } } } @@ -101,13 +107,8 @@ impl Executor { let partitions = exec.execute_shuffle_write(part, task_ctx).await?; - println!( - "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n", - job_id, - stage_id, - part, - DisplayableExecutionPlan::with_metrics(&exec).indent() - ); + self.metrics_collector + .record_stage(&job_id, stage_id, part, exec); Ok(partitions) } diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs index bb6dfa69bbe2..4d145b269efd 100644 --- a/ballista/rust/executor/src/lib.rs +++ b/ballista/rust/executor/src/lib.rs @@ -22,6 +22,7 @@ pub mod execution_loop; pub mod executor; pub mod executor_server; pub mod flight_service; +pub mod metrics; mod cpu_bound_executor; mod standalone; diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 66d9bd71af1c..825ddd4d8fa5 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -42,6 +42,7 @@ use ballista_core::serde::BallistaCodec; use ballista_core::{print_version, BALLISTA_VERSION}; use ballista_executor::executor::Executor; use ballista_executor::flight_service::BallistaFlightService; +use ballista_executor::metrics::LoggingMetricsCollector; use config::prelude::*; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; @@ -118,7 +119,14 @@ async fn main() -> Result<()> { BallistaError::Internal("Failed to init Executor RuntimeEnv".to_owned()) })?); - let executor = Arc::new(Executor::new(executor_meta, &work_dir, runtime)); + let metrics_collector = Arc::new(LoggingMetricsCollector::default()); + + let executor = Arc::new(Executor::new( + executor_meta, + &work_dir, + runtime, + metrics_collector, + )); let scheduler = SchedulerGrpcClient::connect(scheduler_url) .await diff --git a/ballista/rust/executor/src/metrics/mod.rs b/ballista/rust/executor/src/metrics/mod.rs new file mode 100644 index 000000000000..2c7e1d504141 --- /dev/null +++ b/ballista/rust/executor/src/metrics/mod.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ballista_core::execution_plans::ShuffleWriterExec; +use datafusion::physical_plan::display::DisplayableExecutionPlan; + +/// `ExecutorMetricsCollector` records metrics for `ShuffleWriteExec` +/// after they are executed. +/// +/// After each stage completes, `ShuffleWriteExec::record_stage` will be +/// called. +pub trait ExecutorMetricsCollector: Send + Sync { + /// Record metrics for stage after it is executed + fn record_stage( + &self, + job_id: &str, + stage_id: usize, + partition: usize, + plan: ShuffleWriterExec, + ); +} + +/// Implementation of `ExecutorMetricsCollector` which logs the completed +/// plan to stdout. +#[derive(Default)] +pub struct LoggingMetricsCollector {} + +impl ExecutorMetricsCollector for LoggingMetricsCollector { + fn record_stage( + &self, + job_id: &str, + stage_id: usize, + partition: usize, + plan: ShuffleWriterExec, + ) { + println!( + "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n", + job_id, + stage_id, + partition, + DisplayableExecutionPlan::with_metrics(&plan).indent() + ); + } +} diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs index edbb857cca3a..d68052af8c7a 100644 --- a/ballista/rust/executor/src/standalone.rs +++ b/ballista/rust/executor/src/standalone.rs @@ -34,6 +34,7 @@ use tokio::net::TcpListener; use tonic::transport::{Channel, Server}; use uuid::Uuid; +use crate::metrics::LoggingMetricsCollector; use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService}; pub async fn new_standalone_executor< @@ -78,6 +79,7 @@ pub async fn new_standalone_executor< executor_meta, &work_dir, Arc::new(RuntimeEnv::new(config).unwrap()), + Arc::new(LoggingMetricsCollector::default()), )); let service = BallistaFlightService::new(executor.clone());