diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 5ffba0bdd9db6..ecc285fe55bf4 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -65,8 +65,6 @@ pub struct TaskContext { window_functions: HashMap>, /// Runtime environment associated with this task context runtime: Arc, - // TODO: to remove, this is only for testing - active_partition: Mutex>, /// Registered relation handlers relation_handlers: Mutex>, } @@ -86,7 +84,6 @@ impl Default for TaskContext { window_functions: HashMap::new(), runtime: Arc::new(runtime), relation_handlers: Mutex::new(HashMap::new()), - active_partition: Mutex::new(None), } } } @@ -115,19 +112,6 @@ impl TaskContext { window_functions, runtime, relation_handlers: Mutex::new(HashMap::new()), - active_partition: Mutex::new(None), - } - } - - pub fn set_and_increment_partition(&self) -> usize { - let mut partition_guard = self.active_partition.lock(); - if let Some(ref mut partition) = &mut *partition_guard { - let prev = *partition; - *partition += 1; - prev - } else { - *partition_guard = Some(1); - 0 } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 47df5797ce84a..392fa28804b4c 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -185,12 +185,12 @@ impl ExecutionPlan for FilterExec { context: Arc, ) -> Result { trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - // let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FilterExecStream { schema: self.input.schema(), predicate: self.predicate.clone(), input: self.input.execute(partition, context)?, - // baseline_metrics, + baseline_metrics, })) } @@ -296,8 +296,8 @@ struct FilterExecStream { predicate: Arc, /// The input partition to filter. input: SendableRecordBatchStream, - // /// runtime metrics recording - // baseline_metrics: BaselineMetrics, + /// runtime metrics recording + baseline_metrics: BaselineMetrics, } pub(crate) fn batch_filter( @@ -347,8 +347,7 @@ impl Stream for FilterExecStream { } } } - // self.baseline_metrics.record_poll(poll) - poll + self.baseline_metrics.record_poll(poll) } fn size_hint(&self) -> (usize, Option) { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 7eceb17c3aeec..4fc48e971ca92 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -324,7 +324,7 @@ impl ExecutionPlan for ProjectionExec { schema: self.schema.clone(), expr: self.expr.iter().map(|x| x.0.clone()).collect(), input: self.input.execute(partition, context)?, - // baseline_metrics: BaselineMetrics::new(&self.metrics, partition), + baseline_metrics: BaselineMetrics::new(&self.metrics, partition), })) } @@ -450,7 +450,7 @@ fn stats_projection( impl ProjectionStream { fn batch_project(&self, batch: &RecordBatch) -> Result { // records time on drop - // let _timer = self.baseline_metrics.elapsed_compute().timer(); + let _timer = self.baseline_metrics.elapsed_compute().timer(); let arrays = self .expr .iter() @@ -474,7 +474,7 @@ struct ProjectionStream { schema: SchemaRef, expr: Vec>, input: SendableRecordBatchStream, - // baseline_metrics: BaselineMetrics, + baseline_metrics: BaselineMetrics, } impl Stream for ProjectionStream { @@ -489,8 +489,7 @@ impl Stream for ProjectionStream { other => other, }); - // self.baseline_metrics.record_poll(poll) - poll + self.baseline_metrics.record_poll(poll) } fn size_hint(&self) -> (usize, Option) {