From 65a05f3ab0436f98b5a05f618b7a68ee585743ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 24 Oct 2024 17:34:53 +0200 Subject: [PATCH] Add internal events for memory enrichment table --- Cargo.lock | 4 +- src/enrichment_tables/memory/config.rs | 73 ++++++++ .../memory/internal_events.rs | 117 +++++++++++++ src/enrichment_tables/memory/mod.rs | 8 + .../{memory.rs => memory/table.rs} | 165 +++++++----------- 5 files changed, 265 insertions(+), 102 deletions(-) create mode 100644 src/enrichment_tables/memory/config.rs create mode 100644 src/enrichment_tables/memory/internal_events.rs create mode 100644 src/enrichment_tables/memory/mod.rs rename src/enrichment_tables/{memory.rs => memory/table.rs} (78%) diff --git a/Cargo.lock b/Cargo.lock index ab212078389e8..b5cefeb5b4dee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3409,7 +3409,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "332b1937705b7ed2fce76837024e9ae6f41cd2ad18a32c052de081f89982561b" dependencies = [ - "proc-macro2 1.0.86", + "proc-macro2 1.0.89", "quote 1.0.37", "syn 1.0.109", ] @@ -10940,8 +10940,8 @@ dependencies = [ "vector-lookup", "vector-stream", "vector-tap", - "vrl-cache", "vrl", + "vrl-cache", ] [[package]] diff --git a/src/enrichment_tables/memory/config.rs b/src/enrichment_tables/memory/config.rs new file mode 100644 index 0000000000000..f56495e7cdcdc --- /dev/null +++ b/src/enrichment_tables/memory/config.rs @@ -0,0 +1,73 @@ +use crate::enrichment_tables::memory::Memory; +use vector_lib::configurable::configurable_component; +use vector_lib::enrichment::Table; + +use crate::config::EnrichmentTableConfig; + +/// Configuration for the `memory` enrichment table. +#[configurable_component(enrichment_table("memory"))] +#[derive(Clone, Eq, PartialEq)] +pub struct MemoryConfig { + /// TTL (time-to-live), used to limit lifetime of data stored in cache. + /// When TTL expires, data behind a specific key in cache is removed. + /// TTL is reset when replacing the key. + #[serde(default = "default_ttl")] + pub ttl: u64, + /// Scan interval for updating TTL of keys in seconds. This is provided + /// as an optimization, to ensure that TTL is updated, but without doing + /// too many cache scans. + #[serde(default = "default_scan_interval")] + pub scan_interval: u64, + /// Interval for making writes visible in the table. + /// Longer interval might get better performance, + /// but data would be visible in the table after a longer delay. + /// Since every TTL scan makes its changes visible, this value + /// only makes sense if it is shorter than scan_interval + /// + /// By default, all writes are made visible immediately. + #[serde(default = "default_flush_interval")] + pub flush_interval: u64, +} + +impl Default for MemoryConfig { + fn default() -> Self { + Self { + ttl: default_ttl(), + scan_interval: default_scan_interval(), + flush_interval: default_flush_interval(), + } + } +} + +const fn default_ttl() -> u64 { + 600 +} + +const fn default_scan_interval() -> u64 { + 30 +} + +const fn default_flush_interval() -> u64 { + 0 +} + +impl EnrichmentTableConfig for MemoryConfig { + async fn build( + &self, + _globals: &crate::config::GlobalOptions, + ) -> crate::Result> { + Ok(Box::new(Memory::new(self.clone()))) + } +} + +impl std::fmt::Debug for MemoryConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoryConfig") + .field("ttl", &self.ttl) + .field("scan_interval", &self.scan_interval) + .field("flush_interval", &self.flush_interval) + .finish() + } +} + +impl_generate_config_from_default!(MemoryConfig); diff --git a/src/enrichment_tables/memory/internal_events.rs b/src/enrichment_tables/memory/internal_events.rs new file mode 100644 index 0000000000000..37e895192c222 --- /dev/null +++ b/src/enrichment_tables/memory/internal_events.rs @@ -0,0 +1,117 @@ +use metrics::{counter, gauge}; +use vector_lib::internal_event::InternalEvent; + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableRead { + pub table: String, + pub key: String, +} + +impl InternalEvent for MemoryEnrichmentTableRead { + fn emit(self) { + counter!( + "memory_enrichment_table_reads_total", + "table" => self.table, + "key" => self.key + ) + .increment(1); + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableRead") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableInserted { + pub table: String, + pub key: String, +} + +impl InternalEvent for MemoryEnrichmentTableInserted { + fn emit(self) { + counter!( + "memory_enrichment_table_insertions_total", + "table" => self.table.clone(), + "key" => self.key + ) + .increment(1); + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableInserted") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableFlushed { + pub table: String, + pub new_objects_count: usize, + pub new_byte_size: usize, +} + +impl InternalEvent for MemoryEnrichmentTableFlushed { + fn emit(self) { + counter!( + "memory_enrichment_table_flushes_total", + "table" => self.table.clone(), + ) + .increment(1); + gauge!( + "memory_enrichment_table_objects_count", + "table" => self.table.clone() + ) + .set(self.new_objects_count as f64); + gauge!( + "memory_enrichment_table_byte_size", + "table" => self.table + ) + .set(self.new_byte_size as f64); + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableFlushed") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableTtlExpired { + pub table: String, + pub key: String, +} + +impl InternalEvent for MemoryEnrichmentTableTtlExpired { + fn emit(self) { + counter!( + "memory_enrichment_table_ttl_expirations", + "table" => self.table.clone(), + "key" => self.key + ) + .increment(1); + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableTtlExpired") + } +} + +#[derive(Debug)] +pub(crate) struct MemoryEnrichmentTableReadFailed { + pub table: String, + pub key: String, +} + +impl InternalEvent for MemoryEnrichmentTableReadFailed { + fn emit(self) { + counter!( + "memory_enrichment_table_failed_reads", + "table" => self.table, + "key" => self.key + ) + .increment(1); + } + + fn name(&self) -> Option<&'static str> { + Some("MemoryEnrichmentTableReadFailed") + } +} diff --git a/src/enrichment_tables/memory/mod.rs b/src/enrichment_tables/memory/mod.rs new file mode 100644 index 0000000000000..36a1ed878e25f --- /dev/null +++ b/src/enrichment_tables/memory/mod.rs @@ -0,0 +1,8 @@ +//! Handles enrichment tables for `type = memory`. + +mod config; +mod internal_events; +mod table; + +pub use config::*; +pub use table::*; diff --git a/src/enrichment_tables/memory.rs b/src/enrichment_tables/memory/table.rs similarity index 78% rename from src/enrichment_tables/memory.rs rename to src/enrichment_tables/memory/table.rs index 0519f016b1f87..049d034368ae2 100644 --- a/src/enrichment_tables/memory.rs +++ b/src/enrichment_tables/memory/table.rs @@ -1,4 +1,8 @@ -//! Handles enrichment tables for `type = memory`. +use crate::enrichment_tables::memory::internal_events::{ + MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead, + MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired, +}; +use crate::enrichment_tables::memory::MemoryConfig; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -6,13 +10,12 @@ use evmap::shallow_copy::CopyValue; use evmap::{self}; use evmap_derive::ShallowCopy; use thread_local::ThreadLocal; -use vector_lib::EstimatedJsonEncodedSizeOf; +use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; use tokio_stream::StreamExt; -use vector_lib::configurable::configurable_component; use vector_lib::enrichment::{Case, Condition, IndexHandle, Table}; use vector_lib::event::{Event, EventStatus, Finalizable}; use vector_lib::internal_event::{ @@ -21,76 +24,6 @@ use vector_lib::internal_event::{ use vector_lib::sink::StreamSink; use vrl::value::{KeyString, ObjectMap, Value}; -use crate::config::EnrichmentTableConfig; - -/// Configuration for the `memory` enrichment table. -#[configurable_component(enrichment_table("memory"))] -#[derive(Clone, Eq, PartialEq)] -pub struct MemoryConfig { - /// TTL (time-to-live), used to limit lifetime of data stored in cache. - /// When TTL expires, data behind a specific key in cache is removed. - /// TTL is reset when replacing the key. - #[serde(default = "default_ttl")] - ttl: u64, - /// Scan interval for updating TTL of keys in seconds. This is provided - /// as an optimization, to ensure that TTL is updated, but without doing - /// too many cache scans. - #[serde(default = "default_scan_interval")] - scan_interval: u64, - /// Interval for making writes visible in the table. - /// Longer interval might get better performance, - /// but data would be visible in the table after a longer delay. - /// Since every TTL scan makes its changes visible, this value - /// only makes sense if it is shorter than scan_interval - /// - /// By default, all writes are made visible immediately. - #[serde(default = "default_write_refresh_interval")] - write_refresh_interval: u64, -} - -impl Default for MemoryConfig { - fn default() -> Self { - Self { - ttl: default_ttl(), - scan_interval: default_scan_interval(), - write_refresh_interval: default_write_refresh_interval(), - } - } -} - -const fn default_ttl() -> u64 { - 600 -} - -const fn default_scan_interval() -> u64 { - 30 -} - -const fn default_write_refresh_interval() -> u64 { - 0 -} - -impl EnrichmentTableConfig for MemoryConfig { - async fn build( - &self, - _globals: &crate::config::GlobalOptions, - ) -> crate::Result> { - Ok(Box::new(Memory::new(self.clone()))) - } -} - -impl std::fmt::Debug for MemoryConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MemoryConfig") - .field("ttl", &self.ttl) - .field("scan_interval", &self.scan_interval) - .field("write_refresh_interval", &self.write_refresh_interval) - .finish() - } -} - -impl_generate_config_from_default!(MemoryConfig); - /// Single memory entry containing the value and TTL #[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)] pub struct MemoryEntry { @@ -99,11 +32,15 @@ pub struct MemoryEntry { update_time: CopyValue, } +impl ByteSizeOf for MemoryEntry { + fn allocated_bytes(&self) -> usize { + self.value.size_of() + } +} + impl MemoryEntry { - fn into_object_map(&self, now: Instant, total_ttl: u64) -> ObjectMap { - let ttl = total_ttl - .checked_sub(now.duration_since(*self.update_time).as_secs()) - .unwrap_or(0); + fn as_object_map(&self, now: Instant, total_ttl: u64) -> ObjectMap { + let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs()); ObjectMap::from([ ( KeyString::from("key"), @@ -118,7 +55,7 @@ impl MemoryEntry { } fn expired(&self, now: Instant, ttl: u64) -> bool { - return now.duration_since(*self.update_time).as_secs() > ttl; + now.duration_since(*self.update_time).as_secs() > ttl } } @@ -129,7 +66,7 @@ pub struct Memory { write_handle: Arc>>, config: MemoryConfig, last_ttl_scan: Instant, - last_write_refresh: Instant, + last_flush: Instant, } impl Memory { @@ -142,7 +79,7 @@ impl Memory { read_handle: ThreadLocal::new(), write_handle: Arc::new(Mutex::new(write_handle)), last_ttl_scan: Instant::now(), - last_write_refresh: Instant::now(), + last_flush: Instant::now(), } } @@ -165,9 +102,13 @@ impl Memory { update_time: now.into(), }, ); + emit!(MemoryEnrichmentTableInserted { + table: "table".to_string(), + key: k.as_str().to_string() + }); } - let mut needs_refresh = false; + let mut needs_flush = false; if now.duration_since(self.last_ttl_scan).as_secs() >= self.config.scan_interval { self.last_ttl_scan = now; // Since evmap holds 2 separate maps for the data, we are free to directly remove @@ -178,20 +119,33 @@ impl Memory { if let Some(entry) = v.get_one() { if entry.expired(now, self.config.ttl) { handle.empty(k.clone()); - needs_refresh = true; + emit!(MemoryEnrichmentTableTtlExpired { + table: "table".to_string(), + key: k.to_string() + }); + needs_flush = true; } } } }; - } else if now.duration_since(self.last_write_refresh).as_secs() - >= self.config.write_refresh_interval - { - needs_refresh = true; + } else if now.duration_since(self.last_flush).as_secs() >= self.config.flush_interval { + needs_flush = true; } - if needs_refresh { - self.last_write_refresh = now; - handle.refresh(); + if needs_flush { + self.last_flush = now; + handle.flush(); + if let Some(reader) = self.get_read_handle().read() { + let mut byte_size = 0; + for (k, v) in reader.iter() { + byte_size += k.size_of() + v.get_one().size_of(); + } + emit!(MemoryEnrichmentTableFlushed { + table: "table".to_string(), + new_objects_count: reader.len(), + new_byte_size: byte_size + }); + } } } } @@ -201,10 +155,10 @@ impl Clone for Memory { Self { read_handle_factory: self.read_handle_factory.clone(), read_handle: ThreadLocal::new(), - write_handle: self.write_handle.clone(), + write_handle: Arc::clone(&self.write_handle), config: self.config.clone(), - last_ttl_scan: self.last_ttl_scan.clone(), - last_write_refresh: self.last_write_refresh.clone(), + last_ttl_scan: self.last_ttl_scan, + last_flush: self.last_flush, } } } @@ -238,8 +192,20 @@ impl Table for Memory { Some(Condition::Equals { value, .. }) => { let key = value.to_string_lossy(); match self.get_read_handle().get_one(key.as_ref()) { - Some(row) => Ok(vec![row.into_object_map(Instant::now(), self.config.ttl)]), - None => Ok(Default::default()), + Some(row) => { + emit!(MemoryEnrichmentTableRead { + table: "table".to_string(), + key: key.to_string() + }); + Ok(vec![row.as_object_map(Instant::now(), self.config.ttl)]) + } + None => { + emit!(MemoryEnrichmentTableReadFailed { + table: "table".to_string(), + key: key.to_string() + }); + Ok(Default::default()) + } } } Some(_) => Err("Only equality condition is allowed".to_string()), @@ -285,9 +251,8 @@ impl StreamSink for Memory { // Panic: This sink only accepts Logs, so this should never panic let log = event.into_log(); - match log.value() { - Value::Object(map) => self.handle_value(map), - _ => (), + if let Value::Object(map) = log.value() { + self.handle_value(map) }; finalizers.update_status(EventStatus::Delivered); @@ -325,7 +290,7 @@ mod tests { assert_eq!( Ok(ObjectMap::from([ ("key".into(), Value::from("test_key")), - ("ttl".into(), Value::from(default_ttl())), + ("ttl".into(), Value::from(memory.config.ttl)), ("value".into(), Value::from(5)), ])), memory.find_table_row(Case::Sensitive, &[condition], None, None) @@ -415,11 +380,11 @@ mod tests { } #[test] - fn does_not_show_values_before_refresh_interval() { + fn does_not_show_values_before_flush_interval() { let ttl = 100; let mut memory = Memory::new(MemoryConfig { ttl, - write_refresh_interval: 10, + flush_interval: 10, ..Default::default() }); memory.handle_value(&ObjectMap::from([("test_key".into(), Value::from(5))]));