-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add internal events for memory enrichment table
- Loading branch information
Showing
5 changed files
with
265 additions
and
102 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
use crate::enrichment_tables::memory::Memory; | ||
use vector_lib::configurable::configurable_component; | ||
use vector_lib::enrichment::Table; | ||
|
||
use crate::config::EnrichmentTableConfig; | ||
|
||
/// Configuration for the `memory` enrichment table. | ||
#[configurable_component(enrichment_table("memory"))] | ||
#[derive(Clone, Eq, PartialEq)] | ||
pub struct MemoryConfig { | ||
/// TTL (time-to-live), used to limit lifetime of data stored in cache. | ||
/// When TTL expires, data behind a specific key in cache is removed. | ||
/// TTL is reset when replacing the key. | ||
#[serde(default = "default_ttl")] | ||
pub ttl: u64, | ||
/// Scan interval for updating TTL of keys in seconds. This is provided | ||
/// as an optimization, to ensure that TTL is updated, but without doing | ||
/// too many cache scans. | ||
#[serde(default = "default_scan_interval")] | ||
pub scan_interval: u64, | ||
/// Interval for making writes visible in the table. | ||
/// Longer interval might get better performance, | ||
/// but data would be visible in the table after a longer delay. | ||
/// Since every TTL scan makes its changes visible, this value | ||
/// only makes sense if it is shorter than scan_interval | ||
/// | ||
/// By default, all writes are made visible immediately. | ||
#[serde(default = "default_flush_interval")] | ||
pub flush_interval: u64, | ||
} | ||
|
||
impl Default for MemoryConfig { | ||
fn default() -> Self { | ||
Self { | ||
ttl: default_ttl(), | ||
scan_interval: default_scan_interval(), | ||
flush_interval: default_flush_interval(), | ||
} | ||
} | ||
} | ||
|
||
const fn default_ttl() -> u64 { | ||
600 | ||
} | ||
|
||
const fn default_scan_interval() -> u64 { | ||
30 | ||
} | ||
|
||
const fn default_flush_interval() -> u64 { | ||
0 | ||
} | ||
|
||
impl EnrichmentTableConfig for MemoryConfig { | ||
async fn build( | ||
&self, | ||
_globals: &crate::config::GlobalOptions, | ||
) -> crate::Result<Box<dyn Table + Send + Sync>> { | ||
Ok(Box::new(Memory::new(self.clone()))) | ||
} | ||
} | ||
|
||
impl std::fmt::Debug for MemoryConfig { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("MemoryConfig") | ||
.field("ttl", &self.ttl) | ||
.field("scan_interval", &self.scan_interval) | ||
.field("flush_interval", &self.flush_interval) | ||
.finish() | ||
} | ||
} | ||
|
||
impl_generate_config_from_default!(MemoryConfig); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
use metrics::{counter, gauge}; | ||
use vector_lib::internal_event::InternalEvent; | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct MemoryEnrichmentTableRead { | ||
pub table: String, | ||
pub key: String, | ||
} | ||
|
||
impl InternalEvent for MemoryEnrichmentTableRead { | ||
fn emit(self) { | ||
counter!( | ||
"memory_enrichment_table_reads_total", | ||
"table" => self.table, | ||
"key" => self.key | ||
) | ||
.increment(1); | ||
} | ||
|
||
fn name(&self) -> Option<&'static str> { | ||
Some("MemoryEnrichmentTableRead") | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct MemoryEnrichmentTableInserted { | ||
pub table: String, | ||
pub key: String, | ||
} | ||
|
||
impl InternalEvent for MemoryEnrichmentTableInserted { | ||
fn emit(self) { | ||
counter!( | ||
"memory_enrichment_table_insertions_total", | ||
"table" => self.table.clone(), | ||
"key" => self.key | ||
) | ||
.increment(1); | ||
} | ||
|
||
fn name(&self) -> Option<&'static str> { | ||
Some("MemoryEnrichmentTableInserted") | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct MemoryEnrichmentTableFlushed { | ||
pub table: String, | ||
pub new_objects_count: usize, | ||
pub new_byte_size: usize, | ||
} | ||
|
||
impl InternalEvent for MemoryEnrichmentTableFlushed { | ||
fn emit(self) { | ||
counter!( | ||
"memory_enrichment_table_flushes_total", | ||
"table" => self.table.clone(), | ||
) | ||
.increment(1); | ||
gauge!( | ||
"memory_enrichment_table_objects_count", | ||
"table" => self.table.clone() | ||
) | ||
.set(self.new_objects_count as f64); | ||
gauge!( | ||
"memory_enrichment_table_byte_size", | ||
"table" => self.table | ||
) | ||
.set(self.new_byte_size as f64); | ||
} | ||
|
||
fn name(&self) -> Option<&'static str> { | ||
Some("MemoryEnrichmentTableFlushed") | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct MemoryEnrichmentTableTtlExpired { | ||
pub table: String, | ||
pub key: String, | ||
} | ||
|
||
impl InternalEvent for MemoryEnrichmentTableTtlExpired { | ||
fn emit(self) { | ||
counter!( | ||
"memory_enrichment_table_ttl_expirations", | ||
"table" => self.table.clone(), | ||
"key" => self.key | ||
) | ||
.increment(1); | ||
} | ||
|
||
fn name(&self) -> Option<&'static str> { | ||
Some("MemoryEnrichmentTableTtlExpired") | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
pub(crate) struct MemoryEnrichmentTableReadFailed { | ||
pub table: String, | ||
pub key: String, | ||
} | ||
|
||
impl InternalEvent for MemoryEnrichmentTableReadFailed { | ||
fn emit(self) { | ||
counter!( | ||
"memory_enrichment_table_failed_reads", | ||
"table" => self.table, | ||
"key" => self.key | ||
) | ||
.increment(1); | ||
} | ||
|
||
fn name(&self) -> Option<&'static str> { | ||
Some("MemoryEnrichmentTableReadFailed") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
//! Handles enrichment tables for `type = memory`. | ||
mod config; | ||
mod internal_events; | ||
mod table; | ||
|
||
pub use config::*; | ||
pub use table::*; |
Oops, something went wrong.