Skip to content

Commit

Permalink
Wrap memory metadata in a single object for easier mutex usage
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Dec 16, 2024
1 parent 5032181 commit 93e90dd
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions src/enrichment_tables/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,28 @@ impl MemoryEntry {
}
}

struct MemoryMetadata {
last_ttl_scan: Instant,
last_flush: Instant,
}

impl Default for MemoryMetadata {
fn default() -> Self {
let now = Instant::now();
Self {
last_ttl_scan: now,
last_flush: now,
}
}
}

/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure.
pub struct Memory {
read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
write_handle: Arc<Mutex<evmap::WriteHandle<String, MemoryEntry>>>,
config: MemoryConfig,
// TODO: ensure correct behavior when shared between multiple instances (probably Arc + Mutex)
last_ttl_scan: Instant,
last_flush: Instant,
metadata: Arc<Mutex<MemoryMetadata>>,
}

impl Memory {
Expand All @@ -79,8 +92,7 @@ impl Memory {
read_handle_factory: read_handle.factory(),
read_handle: ThreadLocal::new(),
write_handle: Arc::new(Mutex::new(write_handle)),
last_ttl_scan: Instant::now(),
last_flush: Instant::now(),
metadata: Arc::new(Mutex::new(MemoryMetadata::default())),
}
}

Expand Down Expand Up @@ -109,8 +121,9 @@ impl Memory {
}

let mut needs_flush = false;
if now.duration_since(self.last_ttl_scan).as_secs() >= self.config.scan_interval {
self.last_ttl_scan = now;
let mut metadata = self.metadata.lock().unwrap();
if now.duration_since(metadata.last_ttl_scan).as_secs() >= self.config.scan_interval {
metadata.last_ttl_scan = now;
// Since evmap holds 2 separate maps for the data, we are free to directly remove
// elements via the writer, while we are iterating the reader
// Refresh will happen only after we manually invoke it after iteration
Expand All @@ -125,12 +138,12 @@ impl Memory {
}
}
};
} else if now.duration_since(self.last_flush).as_secs() >= self.config.flush_interval {
} else if now.duration_since(metadata.last_flush).as_secs() >= self.config.flush_interval {
needs_flush = true;
}

if needs_flush {
self.last_flush = now;
metadata.last_flush = now;
handle.refresh();
if let Some(reader) = self.get_read_handle().read() {
let mut byte_size = 0;
Expand All @@ -153,8 +166,7 @@ impl Clone for Memory {
read_handle: ThreadLocal::new(),
write_handle: Arc::clone(&self.write_handle),
config: self.config.clone(),
last_ttl_scan: self.last_ttl_scan,
last_flush: self.last_flush,
metadata: Arc::clone(&self.metadata),
}
}
}
Expand Down

0 comments on commit 93e90dd

Please sign in to comment.