Skip to content

Commit

Permalink
Add ExecutorMetricsCollector interface (#2234)
Browse files Browse the repository at this point in the history
* Add ExecutorMetricsCollector interface

* Comments and license header
  • Loading branch information
thinkharderdev authored Apr 15, 2022
1 parent 3d2e7b0 commit 9f2ed42
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 9 deletions.
17 changes: 9 additions & 8 deletions ballista/rust/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::metrics::ExecutorMetricsCollector;
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::display::DisplayableExecutionPlan;

use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
Expand All @@ -48,6 +49,9 @@ pub struct Executor {

/// Runtime environment for Executor
pub runtime: Arc<RuntimeEnv>,

/// Collector for runtime execution metrics
pub metrics_collector: Arc<dyn ExecutorMetricsCollector>,
}

impl Executor {
Expand All @@ -56,6 +60,7 @@ impl Executor {
metadata: ExecutorRegistration,
work_dir: &str,
runtime: Arc<RuntimeEnv>,
metrics_collector: Arc<dyn ExecutorMetricsCollector>,
) -> Self {
Self {
metadata,
Expand All @@ -64,6 +69,7 @@ impl Executor {
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
runtime,
metrics_collector,
}
}
}
Expand Down Expand Up @@ -101,13 +107,8 @@ impl Executor {

let partitions = exec.execute_shuffle_write(part, task_ctx).await?;

println!(
"=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
job_id,
stage_id,
part,
DisplayableExecutionPlan::with_metrics(&exec).indent()
);
self.metrics_collector
.record_stage(&job_id, stage_id, part, exec);

Ok(partitions)
}
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod execution_loop;
pub mod executor;
pub mod executor_server;
pub mod flight_service;
pub mod metrics;

mod cpu_bound_executor;
mod standalone;
Expand Down
10 changes: 9 additions & 1 deletion ballista/rust/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use ballista_core::serde::BallistaCodec;
use ballista_core::{print_version, BALLISTA_VERSION};
use ballista_executor::executor::Executor;
use ballista_executor::flight_service::BallistaFlightService;
use ballista_executor::metrics::LoggingMetricsCollector;
use config::prelude::*;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

Expand Down Expand Up @@ -118,7 +119,14 @@ async fn main() -> Result<()> {
BallistaError::Internal("Failed to init Executor RuntimeEnv".to_owned())
})?);

let executor = Arc::new(Executor::new(executor_meta, &work_dir, runtime));
let metrics_collector = Arc::new(LoggingMetricsCollector::default());

let executor = Arc::new(Executor::new(
executor_meta,
&work_dir,
runtime,
metrics_collector,
));

let scheduler = SchedulerGrpcClient::connect(scheduler_url)
.await
Expand Down
58 changes: 58 additions & 0 deletions ballista/rust/executor/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista_core::execution_plans::ShuffleWriterExec;
use datafusion::physical_plan::display::DisplayableExecutionPlan;

/// `ExecutorMetricsCollector` records metrics for `ShuffleWriteExec`
/// after they are executed.
///
/// After each stage completes, `ShuffleWriteExec::record_stage` will be
/// called.
pub trait ExecutorMetricsCollector: Send + Sync {
/// Record metrics for stage after it is executed
fn record_stage(
&self,
job_id: &str,
stage_id: usize,
partition: usize,
plan: ShuffleWriterExec,
);
}

/// Implementation of `ExecutorMetricsCollector` which logs the completed
/// plan to stdout.
#[derive(Default)]
pub struct LoggingMetricsCollector {}

impl ExecutorMetricsCollector for LoggingMetricsCollector {
fn record_stage(
&self,
job_id: &str,
stage_id: usize,
partition: usize,
plan: ShuffleWriterExec,
) {
println!(
"=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
job_id,
stage_id,
partition,
DisplayableExecutionPlan::with_metrics(&plan).indent()
);
}
}
2 changes: 2 additions & 0 deletions ballista/rust/executor/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tokio::net::TcpListener;
use tonic::transport::{Channel, Server};
use uuid::Uuid;

use crate::metrics::LoggingMetricsCollector;
use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightService};

pub async fn new_standalone_executor<
Expand Down Expand Up @@ -78,6 +79,7 @@ pub async fn new_standalone_executor<
executor_meta,
&work_dir,
Arc::new(RuntimeEnv::new(config).unwrap()),
Arc::new(LoggingMetricsCollector::default()),
));

let service = BallistaFlightService::new(executor.clone());
Expand Down

0 comments on commit 9f2ed42

Please sign in to comment.