From 5ae23dbd6510e96b5d5842fd582b1391f71c4a33 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 30 Nov 2021 14:27:44 +0800 Subject: [PATCH] fix partition id --- .../execution/memory_management/allocation_strategist.rs | 7 +++---- datafusion/src/execution/memory_management/mod.rs | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/datafusion/src/execution/memory_management/allocation_strategist.rs b/datafusion/src/execution/memory_management/allocation_strategist.rs index 69d0ca379972..6e6e41fbe5af 100644 --- a/datafusion/src/execution/memory_management/allocation_strategist.rs +++ b/datafusion/src/execution/memory_management/allocation_strategist.rs @@ -17,7 +17,6 @@ //! Execution Memory Pool that guarantees a memory allocation strategy -use crate::execution::memory_management::MemoryConsumerId; use async_trait::async_trait; use hashbrown::HashMap; use log::{info, warn}; @@ -189,7 +188,7 @@ impl MemoryAllocationStrategist for FairStrategist { if to_grant < required && current_mem + to_grant < min_memory_per_partition { info!( "{:?} waiting for at least 1/2N of pool to be free", - consumer + partition_id ); let _ = self.notify.notified().await; } else { @@ -214,12 +213,12 @@ impl MemoryAllocationStrategist for FairStrategist { } else { let mut partition_usage = self.memory_usage.write().await; if granted_size > real_size { - *partition_usage.entry(consumer.partition_id).or_insert(0) -= + *partition_usage.entry(partition_id).or_insert(0) -= granted_size - real_size; } else { // TODO: this would have caused OOM already if size estimation ahead is much smaller than // that of actual allocation - *partition_usage.entry(consumer.partition_id).or_insert(0) += + *partition_usage.entry(partition_id).or_insert(0) += real_size - granted_size; } } diff --git a/datafusion/src/execution/memory_management/mod.rs b/datafusion/src/execution/memory_management/mod.rs index a77b14d3486e..651072bc99d4 100644 --- a/datafusion/src/execution/memory_management/mod.rs +++ b/datafusion/src/execution/memory_management/mod.rs @@ -116,7 +116,7 @@ impl MemoryManager { consumer: &MemoryConsumerId, ) { self.strategist - .update_usage(granted_size, real_size, consumer) + .update_usage(granted_size, real_size, consumer.partition_id) .await }