From 93e90dde4dd6d88082c66c4e8d2f970212765fd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 16 Dec 2024 16:30:44 +0100 Subject: [PATCH] Wrap memory metadata in a single object for easier mutex usage --- src/enrichment_tables/memory/table.rs | 34 ++++++++++++++++++--------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index 7a8a48e102667..a1c3443d061b2 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -59,15 +59,28 @@ impl MemoryEntry { } } +struct MemoryMetadata { + last_ttl_scan: Instant, + last_flush: Instant, +} + +impl Default for MemoryMetadata { + fn default() -> Self { + let now = Instant::now(); + Self { + last_ttl_scan: now, + last_flush: now, + } + } +} + /// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure. pub struct Memory { read_handle_factory: evmap::ReadHandleFactory, read_handle: ThreadLocal>, write_handle: Arc>>, config: MemoryConfig, - // TODO: ensure correct behavior when shared between multiple instances (probably Arc + Mutex) - last_ttl_scan: Instant, - last_flush: Instant, + metadata: Arc>, } impl Memory { @@ -79,8 +92,7 @@ impl Memory { read_handle_factory: read_handle.factory(), read_handle: ThreadLocal::new(), write_handle: Arc::new(Mutex::new(write_handle)), - last_ttl_scan: Instant::now(), - last_flush: Instant::now(), + metadata: Arc::new(Mutex::new(MemoryMetadata::default())), } } @@ -109,8 +121,9 @@ impl Memory { } let mut needs_flush = false; - if now.duration_since(self.last_ttl_scan).as_secs() >= self.config.scan_interval { - self.last_ttl_scan = now; + let mut metadata = self.metadata.lock().unwrap(); + if now.duration_since(metadata.last_ttl_scan).as_secs() >= self.config.scan_interval { + metadata.last_ttl_scan = now; // Since evmap holds 2 separate maps for the data, we are free to directly remove // elements via the writer, while we are iterating the reader // Refresh will happen only after we manually invoke it after iteration @@ -125,12 +138,12 @@ impl Memory { } } }; - } else if now.duration_since(self.last_flush).as_secs() >= self.config.flush_interval { + } else if now.duration_since(metadata.last_flush).as_secs() >= self.config.flush_interval { needs_flush = true; } if needs_flush { - self.last_flush = now; + metadata.last_flush = now; handle.refresh(); if let Some(reader) = self.get_read_handle().read() { let mut byte_size = 0; @@ -153,8 +166,7 @@ impl Clone for Memory { read_handle: ThreadLocal::new(), write_handle: Arc::clone(&self.write_handle), config: self.config.clone(), - last_ttl_scan: self.last_ttl_scan, - last_flush: self.last_flush, + metadata: Arc::clone(&self.metadata), } } }