From 0f14c0d02d5f9bc4ed68236d07d74a70eab13c64 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 21 Jul 2023 14:14:59 -0400 Subject: [PATCH] feat: Migrate LogSchema::message_key to new lookup code (#18024) ## Motivation This part of https://github.com/vectordotdev/vector/issues/13033. ## Summary * `LogSchema::message_key` is now an `OptionalValuePath`. * To avoid hacky `String` to `&'static str` conversions, I changed the `Requirement::meaning` key type to `String`. --- benches/codecs/character_delimited_bytes.rs | 2 +- benches/codecs/newline_bytes.rs | 2 +- lib/codecs/src/decoding/format/bytes.rs | 35 ++--- lib/codecs/src/decoding/format/gelf.rs | 4 +- lib/codecs/src/decoding/format/syslog.rs | 14 +- lib/codecs/src/encoding/format/common.rs | 12 ++ lib/codecs/src/encoding/format/gelf.rs | 17 ++- lib/codecs/src/encoding/format/mod.rs | 1 + lib/codecs/src/encoding/format/raw_message.rs | 20 +-- lib/codecs/src/encoding/format/text.rs | 18 +-- lib/opentelemetry-proto/src/convert.rs | 3 +- lib/vector-core/src/config/log_schema.rs | 33 +++-- lib/vector-core/src/event/log_event.rs | 24 +++- lib/vector-core/src/event/vrl_target.rs | 64 ++++----- lib/vector-core/src/schema/requirement.rs | 49 ++++--- src/codecs/decoding/decoder.rs | 2 +- src/config/mod.rs | 7 +- src/sinks/file/mod.rs | 17 +-- src/sinks/influxdb/logs.rs | 6 +- src/sinks/loki/integration_tests.rs | 9 +- src/sinks/redis.rs | 2 +- src/sinks/splunk_hec/logs/tests.rs | 4 +- src/sinks/websocket/sink.rs | 7 +- src/sources/amqp.rs | 5 +- src/sources/aws_sqs/integration_tests.rs | 3 +- src/sources/aws_sqs/source.rs | 3 +- src/sources/datadog_agent/tests.rs | 2 +- src/sources/demo_logs.rs | 6 +- src/sources/dnstap/mod.rs | 15 +- src/sources/docker_logs/mod.rs | 33 ++++- src/sources/docker_logs/tests.rs | 56 ++++---- src/sources/exec/mod.rs | 29 +++- src/sources/file.rs | 38 +++-- .../file_descriptors/file_descriptor.rs | 9 +- src/sources/file_descriptors/stdin.rs | 16 ++- src/sources/heroku_logs.rs | 14 +- src/sources/http_server.rs | 30 +++- src/sources/journald.rs | 4 +- src/sources/kafka.rs | 2 +- src/sources/kubernetes_logs/mod.rs | 2 +- src/sources/kubernetes_logs/parser/cri.rs | 14 +- src/sources/kubernetes_logs/parser/docker.rs | 23 ++- src/sources/kubernetes_logs/parser/mod.rs | 14 +- .../kubernetes_logs/partial_events_merger.rs | 7 +- .../kubernetes_logs/transform_utils/mod.rs | 13 ++ src/sources/nats.rs | 5 +- .../opentelemetry/integration_tests.rs | 4 +- src/sources/redis/mod.rs | 35 ++++- src/sources/socket/mod.rs | 73 ++++++---- src/sources/splunk_hec/mod.rs | 132 +++++++++++++----- src/sources/util/framestream.rs | 17 +-- src/test_util/mock/transforms/basic.rs | 30 ++-- src/topology/test/mod.rs | 9 +- src/transforms/dedupe.rs | 5 +- src/transforms/sample.rs | 49 ++++--- 55 files changed, 639 insertions(+), 410 deletions(-) create mode 100644 lib/codecs/src/encoding/format/common.rs diff --git a/benches/codecs/character_delimited_bytes.rs b/benches/codecs/character_delimited_bytes.rs index 9e8774edacf3e..90e6f9d996f3c 100644 --- a/benches/codecs/character_delimited_bytes.rs +++ b/benches/codecs/character_delimited_bytes.rs @@ -55,7 +55,7 @@ fn decoding(c: &mut Criterion) { .map(|ml| CharacterDelimitedDecoder::new_with_max_length(b'a', ml)) .unwrap_or(CharacterDelimitedDecoder::new(b'a')), ); - let deserializer = Deserializer::Bytes(BytesDeserializer::new()); + let deserializer = Deserializer::Bytes(BytesDeserializer); let decoder = vector::codecs::Decoder::new(framer, deserializer); (Box::new(decoder), param.input.clone()) diff --git a/benches/codecs/newline_bytes.rs b/benches/codecs/newline_bytes.rs index e7b4bce8a0392..a3fc1751a6df7 100644 --- a/benches/codecs/newline_bytes.rs +++ b/benches/codecs/newline_bytes.rs @@ -53,7 +53,7 @@ fn decoding(c: &mut Criterion) { .map(|ml| NewlineDelimitedDecoder::new_with_max_length(ml)) .unwrap_or(NewlineDelimitedDecoder::new()), ); - let deserializer = Deserializer::Bytes(BytesDeserializer::new()); + let deserializer = Deserializer::Bytes(BytesDeserializer); let decoder = vector::codecs::Decoder::new(framer, deserializer); (Box::new(decoder), param.input.clone()) diff --git a/lib/codecs/src/decoding/format/bytes.rs b/lib/codecs/src/decoding/format/bytes.rs index e27df861cf1e0..cb3d19f1f81d2 100644 --- a/lib/codecs/src/decoding/format/bytes.rs +++ b/lib/codecs/src/decoding/format/bytes.rs @@ -1,5 +1,4 @@ use bytes::Bytes; -use lookup::lookup_v2::parse_value_path; use lookup::OwnedTargetPath; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; @@ -9,6 +8,7 @@ use vector_core::{ event::{Event, LogEvent}, schema, }; +use vrl::path::PathPrefix; use vrl::value::Kind; use super::Deserializer; @@ -25,7 +25,7 @@ impl BytesDeserializerConfig { /// Build the `BytesDeserializer` from this configuration. pub fn build(&self) -> BytesDeserializer { - BytesDeserializer::new() + BytesDeserializer } /// Return the type of event build by this deserializer. @@ -37,7 +37,7 @@ impl BytesDeserializerConfig { pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition { match log_namespace { LogNamespace::Legacy => schema::Definition::empty_legacy_namespace().with_event_field( - &parse_value_path(log_schema().message_key()).expect("valid message key"), + log_schema().message_key().expect("valid message key"), Kind::bytes(), Some("message"), ), @@ -54,32 +54,16 @@ impl BytesDeserializerConfig { /// This deserializer can be considered as the no-op action for input where no /// further decoding has been specified. #[derive(Debug, Clone)] -pub struct BytesDeserializer { - // Only used with the "Legacy" namespace. The "Vector" namespace decodes the data at the root of the event. - log_schema_message_key: &'static str, -} - -impl Default for BytesDeserializer { - fn default() -> Self { - Self::new() - } -} +pub struct BytesDeserializer; impl BytesDeserializer { - /// Creates a new `BytesDeserializer`. - pub fn new() -> Self { - Self { - log_schema_message_key: log_schema().message_key(), - } - } - /// Deserializes the given bytes, which will always produce a single `LogEvent`. pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent { match log_namespace { LogNamespace::Vector => log_namespace.new_log_from_data(bytes), LogNamespace::Legacy => { let mut log = LogEvent::default(); - log.insert(self.log_schema_message_key, bytes); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), bytes); log } } @@ -107,7 +91,7 @@ mod tests { #[test] fn deserialize_bytes_legacy_namespace() { let input = Bytes::from("foo"); - let deserializer = BytesDeserializer::new(); + let deserializer = BytesDeserializer; let events = deserializer.parse(input, LogNamespace::Legacy).unwrap(); let mut events = events.into_iter(); @@ -115,7 +99,10 @@ mod tests { { let event = events.next().unwrap(); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "foo".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "foo".into() + ); } assert_eq!(events.next(), None); @@ -124,7 +111,7 @@ mod tests { #[test] fn deserialize_bytes_vector_namespace() { let input = Bytes::from("foo"); - let deserializer = BytesDeserializer::new(); + let deserializer = BytesDeserializer; let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); assert_eq!(events.len(), 1); diff --git a/lib/codecs/src/decoding/format/gelf.rs b/lib/codecs/src/decoding/format/gelf.rs index e5b7dbe96c315..f762eb0e151a5 100644 --- a/lib/codecs/src/decoding/format/gelf.rs +++ b/lib/codecs/src/decoding/format/gelf.rs @@ -293,7 +293,7 @@ mod tests { Some(&Value::Bytes(Bytes::from_static(b"example.org"))) ); assert_eq!( - log.get(log_schema().message_key()), + log.get((PathPrefix::Event, log_schema().message_key().unwrap())), Some(&Value::Bytes(Bytes::from_static( b"A short message that helps you identify what is going on" ))) @@ -348,7 +348,7 @@ mod tests { let events = deserialize_gelf_input(&input).unwrap(); assert_eq!(events.len(), 1); let log = events[0].as_log(); - assert!(log.contains(log_schema().message_key())); + assert!(log.contains((PathPrefix::Event, log_schema().message_key().unwrap()))); } // filter out id diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index 336d7c1aa232f..23e412ad75ea2 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -1,7 +1,6 @@ use bytes::Bytes; use chrono::{DateTime, Datelike, Utc}; use derivative::Derivative; -use lookup::lookup_v2::parse_value_path; use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix}; use smallvec::{smallvec, SmallVec}; use std::borrow::Cow; @@ -71,7 +70,7 @@ impl SyslogDeserializerConfig { // The `message` field is always defined. If parsing fails, the entire body becomes the // message. .with_event_field( - &parse_value_path(log_schema().message_key()).expect("valid message key"), + log_schema().message_key().expect("valid message key"), Kind::bytes(), Some("message"), ); @@ -429,7 +428,7 @@ fn insert_fields_from_syslog( ) { match log_namespace { LogNamespace::Legacy => { - log.insert(event_path!(log_schema().message_key()), parsed.msg); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), parsed.msg); } LogNamespace::Vector => { log.insert(event_path!("message"), parsed.msg); @@ -500,7 +499,10 @@ mod tests { let events = deserializer.parse(input, LogNamespace::Legacy).unwrap(); assert_eq!(events.len(), 1); - assert_eq!(events[0].as_log()[log_schema().message_key()], "MSG".into()); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "MSG".into() + ); assert!( events[0].as_log()[log_schema().timestamp_key().unwrap().to_string()].is_timestamp() ); @@ -522,8 +524,8 @@ mod tests { fn init() { let mut schema = LogSchema::default(); - schema.set_message_key("legacy_message".to_string()); - schema.set_message_key("legacy_timestamp".to_string()); + schema.set_message_key(Some(owned_value_path!("legacy_message"))); + schema.set_message_key(Some(owned_value_path!("legacy_timestamp"))); init_log_schema(schema, false); } } diff --git a/lib/codecs/src/encoding/format/common.rs b/lib/codecs/src/encoding/format/common.rs new file mode 100644 index 0000000000000..44d7b3e44c88b --- /dev/null +++ b/lib/codecs/src/encoding/format/common.rs @@ -0,0 +1,12 @@ +use vector_core::config::log_schema; +use vector_core::schema; +use vrl::value::Kind; + +/// Inspect the global log schema and create a schema requirement. +pub fn get_serializer_schema_requirement() -> schema::Requirement { + if let Some(message_key) = log_schema().message_key() { + schema::Requirement::empty().required_meaning(message_key.to_string(), Kind::any()) + } else { + schema::Requirement::empty() + } +} diff --git a/lib/codecs/src/encoding/format/gelf.rs b/lib/codecs/src/encoding/format/gelf.rs index 3a0c6d0461991..7cce8cafc0d47 100644 --- a/lib/codecs/src/encoding/format/gelf.rs +++ b/lib/codecs/src/encoding/format/gelf.rs @@ -12,6 +12,7 @@ use vector_core::{ event::Value, schema, }; +use vrl::path::PathPrefix; /// On GELF encoding behavior: /// Graylog has a relaxed parsing. They are much more lenient than the spec would @@ -138,13 +139,15 @@ fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result err_missing_field(HOST)?; } - let message_key = log_schema().message_key(); if !log.contains(SHORT_MESSAGE) { - // rename the log_schema().message_key() to SHORT_MESSAGE - if log.contains(message_key) { - log.rename_key(message_key, SHORT_MESSAGE); - } else { - err_missing_field(SHORT_MESSAGE)?; + if let Some(message_key) = log_schema().message_key() { + // rename the log_schema().message_key() to SHORT_MESSAGE + let target_path = (PathPrefix::Event, message_key); + if log.contains(target_path) { + log.rename_key(target_path, SHORT_MESSAGE); + } else { + err_missing_field(SHORT_MESSAGE)?; + } } } Ok(log) @@ -329,7 +332,7 @@ mod tests { let event_fields = btreemap! { VERSION => "1.1", HOST => "example.org", - log_schema().message_key() => "Some message", + log_schema().message_key().unwrap().to_string() => "Some message", }; let jsn = do_serialize(true, event_fields).unwrap(); diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index 1d4e008380516..efff723f65b46 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -4,6 +4,7 @@ #![deny(missing_docs)] mod avro; +mod common; mod csv; mod gelf; mod json; diff --git a/lib/codecs/src/encoding/format/raw_message.rs b/lib/codecs/src/encoding/format/raw_message.rs index b955ae8355dc6..1f341295c12ea 100644 --- a/lib/codecs/src/encoding/format/raw_message.rs +++ b/lib/codecs/src/encoding/format/raw_message.rs @@ -1,12 +1,8 @@ +use crate::encoding::format::common::get_serializer_schema_requirement; use bytes::{BufMut, BytesMut}; use serde::{Deserialize, Serialize}; use tokio_util::codec::Encoder; -use vector_core::{ - config::{log_schema, DataType}, - event::Event, - schema, -}; -use vrl::value::Kind; +use vector_core::{config::DataType, event::Event, schema}; /// Config used to build a `RawMessageSerializer`. #[derive(Debug, Clone, Default, Deserialize, Serialize)] @@ -30,7 +26,7 @@ impl RawMessageSerializerConfig { /// The schema required by the serializer. pub fn schema_requirement(&self) -> schema::Requirement { - schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any()) + get_serializer_schema_requirement() } } @@ -49,18 +45,10 @@ impl Encoder for RawMessageSerializer { type Error = vector_common::Error; fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { - let message_key = log_schema().message_key(); - let log = event.as_log(); - - if let Some(bytes) = log - .get_by_meaning(message_key) - .or_else(|| log.get(message_key)) - .map(|value| value.coerce_to_bytes()) - { + if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) { buffer.put(bytes); } - Ok(()) } } diff --git a/lib/codecs/src/encoding/format/text.rs b/lib/codecs/src/encoding/format/text.rs index 0b9e49a44b976..3820c606da2b7 100644 --- a/lib/codecs/src/encoding/format/text.rs +++ b/lib/codecs/src/encoding/format/text.rs @@ -1,11 +1,7 @@ +use crate::encoding::format::common::get_serializer_schema_requirement; use bytes::{BufMut, BytesMut}; use tokio_util::codec::Encoder; -use vector_core::{ - config::{log_schema, DataType}, - event::Event, - schema, -}; -use vrl::value::Kind; +use vector_core::{config::DataType, event::Event, schema}; use crate::MetricTagValues; @@ -42,7 +38,7 @@ impl TextSerializerConfig { /// The schema required by the serializer. pub fn schema_requirement(&self) -> schema::Requirement { - schema::Requirement::empty().required_meaning(log_schema().message_key(), Kind::any()) + get_serializer_schema_requirement() } } @@ -67,15 +63,9 @@ impl Encoder for TextSerializer { type Error = vector_common::Error; fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { - let message_key = log_schema().message_key(); - match event { Event::Log(log) => { - if let Some(bytes) = log - .get_by_meaning(message_key) - .or_else(|| log.get(message_key)) - .map(|value| value.coerce_to_bytes()) - { + if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) { buffer.put(bytes); } } diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 057561df755bb..4b9f4804b5dcd 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -7,6 +7,7 @@ use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, event::{Event, LogEvent}, }; +use vrl::path::PathPrefix; use vrl::value::Value; use super::proto::{ @@ -94,7 +95,7 @@ impl ResourceLog { LogNamespace::Legacy => { let mut log = LogEvent::default(); if let Some(v) = self.log_record.body.and_then(|av| av.value) { - log.insert(log_schema().message_key(), v); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), v); } log } diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index 8b23b1be7b1c6..88bec5a328674 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -2,11 +2,16 @@ use lookup::lookup_v2::OptionalValuePath; use lookup::{OwnedTargetPath, OwnedValuePath}; use once_cell::sync::{Lazy, OnceCell}; use vector_config::configurable_component; -use vrl::path::parse_target_path; static LOG_SCHEMA: OnceCell = OnceCell::new(); static LOG_SCHEMA_DEFAULT: Lazy = Lazy::new(LogSchema::default); +const MESSAGE: &str = "message"; +const TIMESTAMP: &str = "timestamp"; +const HOST: &str = "host"; +const SOURCE_TYPE: &str = "source_type"; +const METADATA: &str = "metadata"; + /// Loads Log Schema from configurations and sets global schema. Once this is /// done, configurations can be correctly loaded using configured log schema /// defaults. @@ -44,7 +49,7 @@ pub struct LogSchema { /// /// This would be the field that holds the raw message, such as a raw log line. #[serde(default = "LogSchema::default_message_key")] - message_key: String, + message_key: OptionalValuePath, /// The name of the event field to treat as the event timestamp. #[serde(default = "LogSchema::default_timestamp_key")] @@ -84,28 +89,28 @@ impl Default for LogSchema { } impl LogSchema { - fn default_message_key() -> String { - String::from("message") + fn default_message_key() -> OptionalValuePath { + OptionalValuePath::new(MESSAGE) } fn default_timestamp_key() -> OptionalValuePath { - OptionalValuePath::new("timestamp") + OptionalValuePath::new(TIMESTAMP) } fn default_host_key() -> OptionalValuePath { - OptionalValuePath::new("host") + OptionalValuePath::new(HOST) } fn default_source_type_key() -> OptionalValuePath { - OptionalValuePath::new("source_type") + OptionalValuePath::new(SOURCE_TYPE) } fn default_metadata_key() -> String { - String::from("metadata") + String::from(METADATA) } - pub fn message_key(&self) -> &str { - &self.message_key + pub fn message_key(&self) -> Option<&OwnedValuePath> { + self.message_key.path.as_ref() } /// Returns an `OwnedTargetPath` of the message key. @@ -114,7 +119,7 @@ impl LogSchema { /// This should only be used where the result will either be cached, /// or performance isn't critical, since this requires parsing / memory allocation. pub fn owned_message_path(&self) -> OwnedTargetPath { - parse_target_path(self.message_key()).expect("valid message key") + OwnedTargetPath::event(self.message_key.clone().path.expect("valid message key")) } pub fn timestamp_key(&self) -> Option<&OwnedValuePath> { @@ -133,8 +138,8 @@ impl LogSchema { &self.metadata_key } - pub fn set_message_key(&mut self, v: String) { - self.message_key = v; + pub fn set_message_key(&mut self, path: Option) { + self.message_key = OptionalValuePath { path }; } pub fn set_timestamp_key(&mut self, v: Option) { @@ -176,7 +181,7 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.message_key' found".to_owned()); } else { - self.set_message_key(other.message_key().to_string()); + self.set_message_key(other.message_key().cloned()); } if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key() && self.timestamp_key() != other.timestamp_key() diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index d615a5ba340c9..703546404b985 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -20,6 +20,7 @@ use vector_common::{ request_metadata::GetEventCountTags, EventDataEq, }; +use vrl::path::OwnedValuePath; use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, @@ -150,7 +151,8 @@ impl LogEvent { /// valid for `LogNamespace::Legacy` pub fn from_str_legacy(msg: impl Into) -> Self { let mut log = LogEvent::default(); - log.insert(log_schema().message_key(), msg.into()); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), msg.into()); + if let Some(timestamp_key) = log_schema().timestamp_key() { log.insert((PathPrefix::Event, timestamp_key), Utc::now()); } @@ -346,6 +348,17 @@ impl LogEvent { } } + pub fn maybe_insert( + &mut self, + prefix: PathPrefix, + path: Option<&OwnedValuePath>, + value: impl Into, + ) { + if let Some(path) = path { + self.insert((prefix, path), value); + } + } + // deprecated - using this means the schema is unknown pub fn try_insert<'a>(&mut self, path: impl TargetPath<'a>, value: impl Into) { if !self.contains(path.clone()) { @@ -444,7 +457,7 @@ impl LogEvent { pub fn message_path(&self) -> Option { match self.namespace() { LogNamespace::Vector => self.find_key_by_meaning("message"), - LogNamespace::Legacy => Some(log_schema().message_key().to_owned()), + LogNamespace::Legacy => log_schema().message_key().map(ToString::to_string), } } @@ -486,7 +499,9 @@ impl LogEvent { pub fn get_message(&self) -> Option<&Value> { match self.namespace() { LogNamespace::Vector => self.get_by_meaning("message"), - LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().message_key())), + LogNamespace::Legacy => log_schema() + .message_key() + .and_then(|key| self.get((PathPrefix::Event, key))), } } @@ -556,8 +571,7 @@ mod test_utils { impl From for LogEvent { fn from(message: Bytes) -> Self { let mut log = LogEvent::default(); - - log.insert(log_schema().message_key(), message); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message); if let Some(timestamp_key) = log_schema().timestamp_key() { log.insert((PathPrefix::Event, timestamp_key), Utc::now()); } diff --git a/lib/vector-core/src/event/vrl_target.rs b/lib/vector-core/src/event/vrl_target.rs index 3ca714be778f0..bbd2f6d1c9754 100644 --- a/lib/vector-core/src/event/vrl_target.rs +++ b/lib/vector-core/src/event/vrl_target.rs @@ -52,6 +52,12 @@ pub struct TargetIter { _marker: PhantomData, } +fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent { + let mut log = LogEvent::new_with_metadata(metadata); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), value); + log +} + impl Iterator for TargetIter { type Item = Event; @@ -59,11 +65,7 @@ impl Iterator for TargetIter { self.iter.next().map(|v| { match v { value @ Value::Object(_) => LogEvent::from_parts(value, self.metadata.clone()), - value => { - let mut log = LogEvent::new_with_metadata(self.metadata.clone()); - log.insert(log_schema().message_key(), value); - log - } + value => create_log_event(value, self.metadata.clone()), } .into() }) @@ -79,11 +81,7 @@ impl Iterator for TargetIter { value @ Value::Object(_) => { TraceEvent::from(LogEvent::from_parts(value, self.metadata.clone())) } - value => { - let mut log = LogEvent::new_with_metadata(self.metadata.clone()); - log.insert(log_schema().message_key(), value); - TraceEvent::from(log) - } + value => TraceEvent::from(create_log_event(value, self.metadata.clone())), } .into() }) @@ -150,11 +148,7 @@ impl VrlTarget { LogNamespace::Vector => { TargetEvents::One(LogEvent::from_parts(v, metadata).into()) } - LogNamespace::Legacy => { - let mut log = LogEvent::new_with_metadata(metadata); - log.insert(log_schema().message_key(), v); - TargetEvents::One(log.into()) - } + LogNamespace::Legacy => TargetEvents::One(create_log_event(v, metadata).into()), }, }, VrlTarget::Trace(value, metadata) => match value { @@ -169,11 +163,7 @@ impl VrlTarget { _marker: PhantomData, }), - v => { - let mut log = LogEvent::new_with_metadata(metadata); - log.insert(log_schema().message_key(), v); - TargetEvents::One(log.into()) - } + v => TargetEvents::One(create_log_event(v, metadata).into()), }, VrlTarget::Metric { metric, .. } => TargetEvents::One(Event::Metric(metric)), } @@ -202,22 +192,24 @@ fn move_field_definitions_into_message(mut definition: Definition) -> Definition message.remove_array(); if !message.is_never() { - // We need to add the given message type to a field called `message` - // in the event. - let message = Kind::object(Collection::from(BTreeMap::from([( - log_schema().message_key().into(), - message, - )]))); - - definition.event_kind_mut().remove_bytes(); - definition.event_kind_mut().remove_integer(); - definition.event_kind_mut().remove_float(); - definition.event_kind_mut().remove_boolean(); - definition.event_kind_mut().remove_timestamp(); - definition.event_kind_mut().remove_regex(); - definition.event_kind_mut().remove_null(); - - *definition.event_kind_mut() = definition.event_kind().union(message); + if let Some(message_key) = log_schema().message_key() { + // We need to add the given message type to a field called `message` + // in the event. + let message = Kind::object(Collection::from(BTreeMap::from([( + message_key.to_string().into(), + message, + )]))); + + definition.event_kind_mut().remove_bytes(); + definition.event_kind_mut().remove_integer(); + definition.event_kind_mut().remove_float(); + definition.event_kind_mut().remove_boolean(); + definition.event_kind_mut().remove_timestamp(); + definition.event_kind_mut().remove_regex(); + definition.event_kind_mut().remove_null(); + + *definition.event_kind_mut() = definition.event_kind().union(message); + } } definition diff --git a/lib/vector-core/src/schema/requirement.rs b/lib/vector-core/src/schema/requirement.rs index 6b3721d606b21..b42e97009ab8d 100644 --- a/lib/vector-core/src/schema/requirement.rs +++ b/lib/vector-core/src/schema/requirement.rs @@ -14,7 +14,7 @@ use super::Definition; #[derive(Debug, Clone, PartialEq)] pub struct Requirement { /// Semantic meanings configured for this requirement. - meaning: BTreeMap<&'static str, SemanticMeaning>, + meaning: BTreeMap, } /// The semantic meaning of an event. @@ -52,7 +52,7 @@ impl Requirement { /// Add a restriction to the schema. #[must_use] - pub fn required_meaning(mut self, meaning: &'static str, kind: Kind) -> Self { + pub fn required_meaning(mut self, meaning: impl Into, kind: Kind) -> Self { self.insert_meaning(meaning, kind, false); self } @@ -63,14 +63,14 @@ impl Requirement { /// specified meaning defined, but invalid for that meaning to be defined, but its [`Kind`] not /// matching the configured expectation. #[must_use] - pub fn optional_meaning(mut self, meaning: &'static str, kind: Kind) -> Self { + pub fn optional_meaning(mut self, meaning: impl Into, kind: Kind) -> Self { self.insert_meaning(meaning, kind, true); self } - fn insert_meaning(&mut self, identifier: &'static str, kind: Kind, optional: bool) { + fn insert_meaning(&mut self, identifier: impl Into, kind: Kind, optional: bool) { let meaning = SemanticMeaning { kind, optional }; - self.meaning.insert(identifier, meaning); + self.meaning.insert(identifier.into(), meaning); } /// Validate the provided [`Definition`] against the current requirement. @@ -97,7 +97,10 @@ impl Requirement { // Check if we're dealing with an invalid meaning, meaning the definition has a single // meaning identifier pointing to multiple paths. if let Some(paths) = definition.invalid_meaning(identifier).cloned() { - errors.push(ValidationError::MeaningDuplicate { identifier, paths }); + errors.push(ValidationError::MeaningDuplicate { + identifier: identifier.clone(), + paths, + }); continue; } @@ -118,14 +121,16 @@ impl Requirement { // The semantic meaning kind does not match the expected // kind, so we can't use it in the sink. errors.push(ValidationError::MeaningKind { - identifier, + identifier: identifier.clone(), want: req_meaning.kind.clone(), got: definition_kind, }); } } None if !req_meaning.optional => { - errors.push(ValidationError::MeaningMissing { identifier }); + errors.push(ValidationError::MeaningMissing { + identifier: identifier.clone(), + }); } _ => {} } @@ -176,18 +181,18 @@ impl std::fmt::Display for ValidationErrors { #[allow(clippy::enum_variant_names)] pub enum ValidationError { /// A required semantic meaning is missing. - MeaningMissing { identifier: &'static str }, + MeaningMissing { identifier: String }, /// A semantic meaning has an invalid `[Kind]`. MeaningKind { - identifier: &'static str, + identifier: String, want: Kind, got: Kind, }, /// A semantic meaning is pointing to multiple paths. MeaningDuplicate { - identifier: &'static str, + identifier: String, paths: BTreeSet, }, } @@ -301,7 +306,9 @@ mod tests { TestCase { requirement: Requirement::empty().required_meaning("foo", Kind::any()), definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()), - errors: vec![ValidationError::MeaningMissing { identifier: "foo" }], + errors: vec![ValidationError::MeaningMissing { + identifier: "foo".into(), + }], }, ), ( @@ -312,8 +319,12 @@ mod tests { .required_meaning("bar", Kind::any()), definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()), errors: vec![ - ValidationError::MeaningMissing { identifier: "bar" }, - ValidationError::MeaningMissing { identifier: "foo" }, + ValidationError::MeaningMissing { + identifier: "bar".into(), + }, + ValidationError::MeaningMissing { + identifier: "foo".into(), + }, ], }, ), @@ -332,7 +343,9 @@ mod tests { .optional_meaning("foo", Kind::any()) .required_meaning("bar", Kind::any()), definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()), - errors: vec![ValidationError::MeaningMissing { identifier: "bar" }], + errors: vec![ValidationError::MeaningMissing { + identifier: "bar".into(), + }], }, ), ( @@ -342,7 +355,7 @@ mod tests { definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()) .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo")), errors: vec![ValidationError::MeaningKind { - identifier: "foo", + identifier: "foo".into(), want: Kind::boolean(), got: Kind::integer(), }], @@ -355,7 +368,7 @@ mod tests { definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()) .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo")), errors: vec![ValidationError::MeaningKind { - identifier: "foo", + identifier: "foo".into(), want: Kind::boolean(), got: Kind::integer(), }], @@ -376,7 +389,7 @@ mod tests { ), ), errors: vec![ValidationError::MeaningDuplicate { - identifier: "foo", + identifier: "foo".into(), paths: BTreeSet::from([ parse_target_path("foo").unwrap(), parse_target_path("bar").unwrap(), diff --git a/src/codecs/decoding/decoder.rs b/src/codecs/decoding/decoder.rs index ecf19c17d715e..cf4749ca64d58 100644 --- a/src/codecs/decoding/decoder.rs +++ b/src/codecs/decoding/decoder.rs @@ -27,7 +27,7 @@ impl Default for Decoder { fn default() -> Self { Self { framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()), - deserializer: Deserializer::Bytes(BytesDeserializer::new()), + deserializer: Deserializer::Bytes(BytesDeserializer), log_namespace: LogNamespace::Legacy, } } diff --git a/src/config/mod.rs b/src/config/mod.rs index 799e8f6a3c7c3..86b7f7dde7238 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -849,7 +849,7 @@ mod tests { ); assert_eq!( "message", - config.global.log_schema.message_key().to_string() + config.global.log_schema.message_key().unwrap().to_string() ); assert_eq!( "timestamp", @@ -886,7 +886,10 @@ mod tests { "this", config.global.log_schema.host_key().unwrap().to_string() ); - assert_eq!("that", config.global.log_schema.message_key().to_string()); + assert_eq!( + "that", + config.global.log_schema.message_key().unwrap().to_string() + ); assert_eq!( "then", config diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 04f166aa986fb..3a2261e517b3e 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -556,36 +556,37 @@ mod tests { lines_from_file(directory.join("errors-2019-29-07.log")), ]; + let message_key = log_schema().message_key().unwrap().to_string(); assert_eq!( - input[0].as_log()[log_schema().message_key()], + input[0].as_log()[&message_key], From::<&str>::from(&output[0][0]) ); assert_eq!( - input[1].as_log()[log_schema().message_key()], + input[1].as_log()[&message_key], From::<&str>::from(&output[1][0]) ); assert_eq!( - input[2].as_log()[log_schema().message_key()], + input[2].as_log()[&message_key], From::<&str>::from(&output[0][1]) ); assert_eq!( - input[3].as_log()[log_schema().message_key()], + input[3].as_log()[&message_key], From::<&str>::from(&output[3][0]) ); assert_eq!( - input[4].as_log()[log_schema().message_key()], + input[4].as_log()[&message_key], From::<&str>::from(&output[2][0]) ); assert_eq!( - input[5].as_log()[log_schema().message_key()], + input[5].as_log()[&message_key], From::<&str>::from(&output[2][1]) ); assert_eq!( - input[6].as_log()[log_schema().message_key()], + input[6].as_log()[&message_key], From::<&str>::from(&output[4][0]) ); assert_eq!( - input[7].as_log()[log_schema().message_key()], + input[7].as_log()[message_key], From::<&str>::from(&output[5][0]) ); } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index 78ed0155367ba..cea355dabb487 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -6,7 +6,7 @@ use http::{Request, Uri}; use indoc::indoc; use vrl::value::Kind; -use lookup::lookup_v2::{parse_value_path, OptionalValuePath}; +use lookup::lookup_v2::OptionalValuePath; use lookup::{OwnedValuePath, PathPrefix}; use vector_config::configurable_component; use vector_core::config::log_schema; @@ -198,7 +198,9 @@ impl SinkConfig for InfluxDbLogsConfig { .clone() .and_then(|k| k.path) .unwrap_or_else(|| { - parse_value_path(log_schema().message_key()) + log_schema() + .message_key() + .cloned() .expect("global log_schema.message_key to be valid path") }); diff --git a/src/sinks/loki/integration_tests.rs b/src/sinks/loki/integration_tests.rs index 245c8ebcb2c7b..8f4232de6fdcf 100644 --- a/src/sinks/loki/integration_tests.rs +++ b/src/sinks/loki/integration_tests.rs @@ -10,6 +10,7 @@ use vector_core::{ config::LogNamespace, event::{BatchNotifier, BatchStatus, Event, LogEvent}, }; +use vrl::path::PathPrefix; use vrl::value::{kind::Collection, Kind}; use super::config::{LokiConfig, OutOfOrderAction}; @@ -327,7 +328,7 @@ async fn many_streams() { let index = (i % 5) * 2; let message = lines[index] .as_log() - .get(log_schema().message_key()) + .get((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy(); assert_eq!(output, &message); @@ -337,7 +338,7 @@ async fn many_streams() { let index = ((i % 5) * 2) + 1; let message = lines[index] .as_log() - .get(log_schema().message_key()) + .get((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy(); assert_eq!(output, &message); @@ -384,7 +385,7 @@ async fn interpolate_stream_key() { for (i, output) in outputs.iter().enumerate() { let message = lines[i] .as_log() - .get(log_schema().message_key()) + .get((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy(); assert_eq!(output, &message); @@ -637,7 +638,7 @@ async fn test_out_of_order_events( assert_eq!( &expected[i] .as_log() - .get(log_schema().message_key()) + .get((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy(), output, diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs index 402ad28aee906..8960edd8171fb 100644 --- a/src/sinks/redis.rs +++ b/src/sinks/redis.rs @@ -429,7 +429,7 @@ mod tests { .item .value; let map: HashMap = serde_json::from_slice(&result[..]).unwrap(); - assert_eq!(msg, map[&log_schema().message_key().to_string()]); + assert_eq!(msg, map[&log_schema().message_key().unwrap().to_string()]); } #[test] diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 898f39af5a63c..308a54fabcc6c 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -152,7 +152,9 @@ fn splunk_encode_log_event_json() { assert_eq!(event.get("key").unwrap(), &serde_json::Value::from("value")); assert_eq!(event.get("int_val").unwrap(), &serde_json::Value::from(123)); assert_eq!( - event.get(&log_schema().message_key().to_string()).unwrap(), + event + .get(&log_schema().message_key().unwrap().to_string()) + .unwrap(), &serde_json::Value::from("hello world") ); assert!(event diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 6acbf99967b43..db5dd364e6b7e 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -521,10 +521,13 @@ mod tests { let output = receiver.await; assert_eq!(lines.len(), output.len()); - let message_key = crate::config::log_schema().message_key(); + let message_key = crate::config::log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path") + .to_string(); for (source, received) in lines.iter().zip(output) { let json = serde_json::from_str::(&received).expect("Invalid JSON"); - let received = json.get(message_key).unwrap().as_str().unwrap(); + let received = json.get(message_key.as_str()).unwrap().as_str().unwrap(); assert_eq!(source, received); } } diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index bbd93d1c81aeb..99346ead6e790 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -711,7 +711,10 @@ mod integration_test { let log = events[0].as_log(); trace!("{:?}", log); - assert_eq!(log[log_schema().message_key()], "my message".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "my message".into() + ); assert_eq!(log["routing"], routing_key.into()); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], diff --git a/src/sources/aws_sqs/integration_tests.rs b/src/sources/aws_sqs/integration_tests.rs index 62ba72cbe63b0..be2cc6d67e938 100644 --- a/src/sources/aws_sqs/integration_tests.rs +++ b/src/sources/aws_sqs/integration_tests.rs @@ -7,6 +7,7 @@ use aws_sdk_sqs::output::CreateQueueOutput; use aws_types::region::Region; use futures::StreamExt; use tokio::time::timeout; +use vrl::path::PathPrefix; use crate::{ aws::{auth::AwsAuthentication, region::RegionOrEndpoint}, @@ -109,7 +110,7 @@ pub(crate) async fn test() { for event in events { let message = event .as_log() - .get(log_schema().message_key()) + .get((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy(); if !expected_messages.remove(message.as_ref()) { diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index f05784ab4e5cb..2f8f9634317b1 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -222,6 +222,7 @@ mod tests { use crate::codecs::DecodingConfig; use chrono::SecondsFormat; use lookup::path; + use vrl::path::PathPrefix; use super::*; use crate::config::{log_schema, SourceConfig}; @@ -312,7 +313,7 @@ mod tests { events[0] .clone() .as_log() - .get(log_schema().message_key()) + .get((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy(), message diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 9647688279027..992a35a1d4078 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -85,7 +85,7 @@ fn test_decode_log_body() { let api_key = None; let decoder = crate::codecs::Decoder::new( Framer::Bytes(BytesDecoder::new()), - Deserializer::Bytes(BytesDeserializer::new()), + Deserializer::Bytes(BytesDeserializer), ); let source = DatadogAgentSource::new( diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 9d0454ed4f933..35044dd80a0bd 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -399,7 +399,7 @@ mod tests { #[tokio::test] async fn shuffle_demo_logs_copies_lines() { - let message_key = log_schema().message_key(); + let message_key = log_schema().message_key().unwrap().to_string(); let mut rx = runit( r#"format = "shuffle" lines = ["one", "two", "three", "four"] @@ -439,7 +439,7 @@ mod tests { #[tokio::test] async fn shuffle_demo_logs_adds_sequence() { - let message_key = log_schema().message_key(); + let message_key = log_schema().message_key().unwrap().to_string(); let mut rx = runit( r#"format = "shuffle" lines = ["one", "two"] @@ -539,7 +539,7 @@ mod tests { #[tokio::test] async fn json_format_generates_output() { - let message_key = log_schema().message_key(); + let message_key = log_schema().message_key().unwrap().to_string(); let mut rx = runit( r#"format = "json" count = 5"#, diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs index 0bd44b4b5a2ea..529abc5c7134d 100644 --- a/src/sources/dnstap/mod.rs +++ b/src/sources/dnstap/mod.rs @@ -127,14 +127,15 @@ impl DnstapConfig { let schema = vector_core::schema::Definition::empty_legacy_namespace(); if self.raw_data_only.unwrap_or(false) { - schema.with_event_field( - &owned_value_path!(log_schema().message_key()), - Kind::bytes(), - Some("message"), - ) - } else { - event_schema.schema_definition(schema) + if let Some(message_key) = log_schema().message_key() { + return schema.with_event_field( + message_key, + Kind::bytes(), + Some("message"), + ); + } } + event_schema.schema_definition(schema) } LogNamespace::Vector => { let schema = vector_core::schema::Definition::new_with_default_metadata( diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 03afcc4229fd8..bd32dfac4a431 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -1059,7 +1059,7 @@ impl ContainerLogInfo { }; // Build the log. - let deserializer = BytesDeserializer::new(); + let deserializer = BytesDeserializer; let mut log = deserializer.parse_single(bytes_message, log_namespace); // Container ID @@ -1166,7 +1166,10 @@ impl ContainerLogInfo { LogNamespace::Legacy => { partial_event_merge_state.merge_in_next_event( log, - &[log_schema().message_key().to_string()], + &[log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path") + .to_string()], ); } } @@ -1187,8 +1190,13 @@ impl ContainerLogInfo { LogNamespace::Vector => { partial_event_merge_state.merge_in_final_event(log, &["."]) } - LogNamespace::Legacy => partial_event_merge_state - .merge_in_final_event(log, &[log_schema().message_key().to_string()]), + LogNamespace::Legacy => partial_event_merge_state.merge_in_final_event( + log, + &[log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path") + .to_string()], + ), }, None => log, } @@ -1264,7 +1272,12 @@ fn line_agg_adapter( let message_value = match log_namespace { LogNamespace::Vector => log.remove(".").expect("`.` must exist in the event"), LogNamespace::Legacy => log - .remove(log_schema().message_key()) + .remove(( + PathPrefix::Event, + log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path"), + )) .expect("`message` must exist in the event"), }; let stream_value = match log_namespace { @@ -1282,7 +1295,15 @@ fn line_agg_adapter( line_agg_out.map(move |(_, message, mut log)| { match log_namespace { LogNamespace::Vector => log.insert(".", message), - LogNamespace::Legacy => log.insert(log_schema().message_key(), message), + LogNamespace::Legacy => log.insert( + ( + PathPrefix::Event, + log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path"), + ), + message, + ), }; log }) diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs index d1f55d885e996..11738dca029e9 100644 --- a/src/sources/docker_logs/tests.rs +++ b/src/sources/docker_logs/tests.rs @@ -279,7 +279,7 @@ mod integration_tests { // Wait for before message let events = collect_n(out, 1).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "before".into() ); @@ -346,7 +346,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); }) @@ -440,7 +440,10 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!(log[log_schema().message_key()], message.into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!(log[CONTAINER], id.into()); assert!(log.get(CREATED_AT).is_some()); assert_eq!(log[IMAGE], "busybox".into()); @@ -479,15 +482,10 @@ mod integration_tests { let definition = schema_definitions.unwrap(); definition.assert_valid_for_event(&events[0]); - assert_eq!( - events[0].as_log()[log_schema().message_key()], - message.into() - ); + let message_key = log_schema().message_key().unwrap().to_string(); + assert_eq!(events[0].as_log()[&message_key], message.into()); definition.assert_valid_for_event(&events[1]); - assert_eq!( - events[1].as_log()[log_schema().message_key()], - message.into() - ); + assert_eq!(events[1].as_log()[message_key], message.into()); }) .await; } @@ -521,7 +519,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); }) @@ -567,18 +565,13 @@ mod integration_tests { assert_eq!(events.len(), 2); let definition = schema_definitions.unwrap(); - definition.assert_valid_for_event(&events[0]); - assert_eq!( - events[0].as_log()[log_schema().message_key()], - will_be_read.into() - ); + + let message_key = log_schema().message_key().unwrap().to_string(); + assert_eq!(events[0].as_log()[&message_key], will_be_read.into()); definition.assert_valid_for_event(&events[1]); - assert_eq!( - events[1].as_log()[log_schema().message_key()], - will_be_read.into() - ); + assert_eq!(events[1].as_log()[message_key], will_be_read.into()); }) .await; } @@ -613,7 +606,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); }) @@ -647,7 +640,10 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!(log[log_schema().message_key()], message.into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!(log[CONTAINER], id.into()); assert!(log.get(CREATED_AT).is_some()); assert_eq!(log[IMAGE], "busybox".into()); @@ -692,7 +688,7 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], message.into() ); }) @@ -782,7 +778,10 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!(log[log_schema().message_key()], message.into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!(log[CONTAINER], id.into()); assert!(log.get(CREATED_AT).is_some()); assert_eq!(log[IMAGE], "busybox".into()); @@ -831,7 +830,10 @@ mod integration_tests { .unwrap() .assert_valid_for_event(&events[0]); let log = events[0].as_log(); - assert_eq!(log[log_schema().message_key()], message.into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + message.into() + ); }) .await; } @@ -968,7 +970,7 @@ mod integration_tests { event .into_log() - .remove(crate::config::log_schema().message_key()) + .remove((PathPrefix::Event, log_schema().message_key().unwrap())) .unwrap() .to_string_lossy() .into_owned() diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 74720f9eeb22f..dbb64b53f90fa 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -766,7 +766,10 @@ mod tests { assert_eq!(log[STREAM_KEY], STDOUT.into()); assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); - assert_eq!(log[log_schema().message_key()], "hello world".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "hello world".into() + ); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], "exec".into() @@ -853,7 +856,10 @@ mod tests { assert_eq!(log[STREAM_KEY], STDOUT.into()); assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); - assert_eq!(log[log_schema().message_key()], "hello world".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "hello world".into() + ); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], "exec".into() @@ -964,7 +970,7 @@ mod tests { assert_eq!(events.len(), 1); let log = events[0].as_log(); assert_eq!( - log[log_schema().message_key()], + log[log_schema().message_key().unwrap().to_string()], Bytes::from("hello world").into() ); assert_eq!(origin, STDOUT); @@ -976,7 +982,7 @@ mod tests { assert_eq!(events.len(), 1); let log = events[0].as_log(); assert_eq!( - log[log_schema().message_key()], + log[log_schema().message_key().unwrap().to_string()], Bytes::from("hello rocket 🚀").into() ); assert_eq!(origin, STDOUT); @@ -1060,7 +1066,10 @@ mod tests { log[log_schema().source_type_key().unwrap().to_string()], "exec".into() ); - assert_eq!(log[log_schema().message_key()], "Hello World!".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "Hello World!".into() + ); assert_eq!( log[log_schema().host_key().unwrap().to_string().as_str()], "Some.Machine".into() @@ -1122,14 +1131,20 @@ mod tests { if let Poll::Ready(Some(event)) = futures::poll!(rx.next()) { let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "signal received".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "signal received".into() + ); } else { panic!("Expected to receive event"); } if let Poll::Ready(Some(event)) = futures::poll!(rx.next()) { let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "slept".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "slept".into() + ); } else { panic!("Expected to receive event"); } diff --git a/src/sources/file.rs b/src/sources/file.rs index 2f1b3419e2c5d..ae4a2a79a6b01 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -746,7 +746,7 @@ fn create_event( meta: &EventMetadata, log_namespace: LogNamespace, ) -> LogEvent { - let deserializer = BytesDeserializer::new(); + let deserializer = BytesDeserializer; let mut event = deserializer.parse_single(line, log_namespace); log_namespace.insert_vector_metadata( @@ -1036,7 +1036,10 @@ mod tests { assert_eq!(log["file"], "some_file.rs".into()); assert_eq!(log["host"], "Some.Machine".into()); assert_eq!(log["offset"], 0.into()); - assert_eq!(log[log_schema().message_key()], "hello world".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "hello world".into() + ); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], "file".into() @@ -1061,7 +1064,10 @@ mod tests { assert_eq!(log["file_path"], "some_file.rs".into()); assert_eq!(log["hostname"], "Some.Machine".into()); assert_eq!(log["off"], 0.into()); - assert_eq!(log[log_schema().message_key()], "hello world".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "hello world".into() + ); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], "file".into() @@ -1154,7 +1160,8 @@ mod tests { let mut goodbye_i = 0; for event in received { - let line = event.as_log()[log_schema().message_key()].to_string_lossy(); + let line = + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy(); if line.starts_with("hello") { assert_eq!(line, format!("hello {}", hello_i)); assert_eq!( @@ -1248,7 +1255,8 @@ mod tests { path.to_str().unwrap() ); - let line = event.as_log()[log_schema().message_key()].to_string_lossy(); + let line = + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy(); if pre_trunc { assert_eq!(line, format!("pretrunc {}", i)); @@ -1309,7 +1317,8 @@ mod tests { path.to_str().unwrap() ); - let line = event.as_log()[log_schema().message_key()].to_string_lossy(); + let line = + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy(); if pre_rot { assert_eq!(line, format!("prerot {}", i)); @@ -1362,7 +1371,8 @@ mod tests { let mut is = [0; 3]; for event in received { - let line = event.as_log()[log_schema().message_key()].to_string_lossy(); + let line = + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy(); let mut split = line.split(' '); let file = split.next().unwrap().parse::().unwrap(); assert_ne!(file, 4); @@ -1470,7 +1480,7 @@ mod tests { .expect("file key to exist") .to_string(), log_schema().host_key().unwrap().to_string(), - log_schema().message_key().to_string(), + log_schema().message_key().unwrap().to_string(), log_schema().timestamp_key().unwrap().to_string(), log_schema().source_type_key().unwrap().to_string() ] @@ -1752,12 +1762,16 @@ mod tests { let before_lines = received .iter() .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before")) - .map(|event| event.as_log()[log_schema().message_key()].to_string_lossy()) + .map(|event| { + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy() + }) .collect::>(); let after_lines = received .iter() .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after")) - .map(|event| event.as_log()[log_schema().message_key()].to_string_lossy()) + .map(|event| { + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy() + }) .collect::>(); assert_eq!(before_lines, vec!["second line"]); assert_eq!(after_lines, vec!["_first line", "_second line"]); @@ -2323,7 +2337,7 @@ mod tests { .into_iter() .map(Event::into_log) .map(|log| { - log[log_schema().message_key()] + log[log_schema().message_key().unwrap().to_string()] .to_string_lossy() .into_owned() }) @@ -2334,7 +2348,7 @@ mod tests { received .into_iter() .map(Event::into_log) - .map(|log| log[log_schema().message_key()].clone()) + .map(|log| log[log_schema().message_key().unwrap().to_string()].clone()) .collect() } } diff --git a/src/sources/file_descriptors/file_descriptor.rs b/src/sources/file_descriptors/file_descriptor.rs index 4770811ad401b..908ea1e02a5f5 100644 --- a/src/sources/file_descriptors/file_descriptor.rs +++ b/src/sources/file_descriptors/file_descriptor.rs @@ -143,19 +143,16 @@ mod tests { config.build(context).await.unwrap().await.unwrap(); let event = stream.next().await; + let message_key = log_schema().message_key().unwrap().to_string(); assert_eq!( Some("hello world".into()), - event.map(|event| event.as_log()[log_schema().message_key()] - .to_string_lossy() - .into_owned()) + event.map(|event| event.as_log()[&message_key].to_string_lossy().into_owned()) ); let event = stream.next().await; assert_eq!( Some("hello world again".into()), - event.map(|event| event.as_log()[log_schema().message_key()] - .to_string_lossy() - .into_owned()) + event.map(|event| event.as_log()[message_key].to_string_lossy().into_owned()) ); let event = stream.next().await; diff --git a/src/sources/file_descriptors/stdin.rs b/src/sources/file_descriptors/stdin.rs index 0f21db79ac4a3..c598758670afc 100644 --- a/src/sources/file_descriptors/stdin.rs +++ b/src/sources/file_descriptors/stdin.rs @@ -142,17 +142,21 @@ mod tests { let event = stream.next().await; assert_eq!( Some("hello world".into()), - event.map(|event| event.as_log()[log_schema().message_key()] - .to_string_lossy() - .into_owned()) + event.map( + |event| event.as_log()[log_schema().message_key().unwrap().to_string()] + .to_string_lossy() + .into_owned() + ) ); let event = stream.next().await; assert_eq!( Some("hello world again".into()), - event.map(|event| event.as_log()[log_schema().message_key()] - .to_string_lossy() - .into_owned()) + event.map( + |event| event.as_log()[log_schema().message_key().unwrap().to_string()] + .to_string_lossy() + .into_owned() + ) ); let event = stream.next().await; diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index b08a311a5364a..cbfa33ce7c6e3 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -521,7 +521,7 @@ mod tests { let log = event.as_log(); assert_eq!( - log[log_schema().message_key()], + log[log_schema().message_key().unwrap().to_string()], r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into() ); assert_eq!( @@ -606,7 +606,10 @@ mod tests { let events = super::line_to_events(Default::default(), log_namespace, body.into()); let log = events[0].as_log(); - assert_eq!(log[log_schema().message_key()], "foo bar baz".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "foo bar baz".into() + ); assert_eq!( log[log_schema().timestamp_key().unwrap().to_string()], "2020-01-08T22:33:57.353034+00:00" @@ -632,7 +635,7 @@ mod tests { let log = events[0].as_log(); assert_eq!( - log[log_schema().message_key()], + log[log_schema().message_key().unwrap().to_string()], "what am i doing here".into() ); assert!(log @@ -654,7 +657,10 @@ mod tests { let events = super::line_to_events(Default::default(), log_namespace, body.into()); let log = events[0].as_log(); - assert_eq!(log[log_schema().message_key()], "i'm not that long".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "i'm not that long".into() + ); assert_eq!( log[log_schema().timestamp_key().unwrap().to_string()], "2020-01-08T22:33:57.353034+00:00" diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index cadd154fc8795..354e1c4bb4525 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -654,7 +654,10 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "test body".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "test body".into() + ); assert!(log .get(( lookup::PathPrefix::Event, @@ -671,7 +674,10 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "test body 2".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "test body 2".into() + ); assert_event_metadata(log).await; } } @@ -703,13 +709,19 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "test body".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "test body".into() + ); assert_event_metadata(log).await; } { let event = events.remove(0); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "test body 2".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "test body 2".into() + ); assert_event_metadata(log).await; } } @@ -742,7 +754,10 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "foo\nbar".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "foo\nbar".into() + ); assert_event_metadata(log).await; } } @@ -1094,7 +1109,10 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - assert_eq!(log[log_schema().message_key()], "test body".into()); + assert_eq!( + log[log_schema().message_key().unwrap().to_string()], + "test body".into() + ); assert_event_metadata(log).await; } } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 20c55b32d86e5..0b876da9b7a28 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -784,7 +784,7 @@ fn create_log_event_from_record( let mut log = LogEvent::from_iter(record).with_batch_notifier_option(batch); if let Some(message) = log.remove(MESSAGE) { - log.insert(log_schema().message_key(), message); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message); } log @@ -1487,7 +1487,7 @@ mod tests { } fn message(event: &Event) -> Value { - event.as_log()[log_schema().message_key()].clone() + event.as_log()[log_schema().message_key().unwrap().to_string()].clone() } fn timestamp(event: &Event) -> Value { diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index fb64dd8a033f6..302060debefaf 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1052,7 +1052,7 @@ mod integration_test { for (i, event) in events.into_iter().enumerate() { if let LogNamespace::Legacy = log_namespace { assert_eq!( - event.as_log()[log_schema().message_key()], + event.as_log()[log_schema().message_key().unwrap().to_string()], format!("{} {:03}", TEXT, i).into() ); assert_eq!( diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 5b22564625cb6..2965c4fad4171 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -897,7 +897,7 @@ fn create_event( ingestion_timestamp_field: Option<&OwnedTargetPath>, log_namespace: LogNamespace, ) -> Event { - let deserializer = BytesDeserializer::new(); + let deserializer = BytesDeserializer; let mut log = deserializer.parse_single(line, log_namespace); log_namespace.insert_source_metadata( diff --git a/src/sources/kubernetes_logs/parser/cri.rs b/src/sources/kubernetes_logs/parser/cri.rs index ce8f6a9035c56..00e5b9c80b987 100644 --- a/src/sources/kubernetes_logs/parser/cri.rs +++ b/src/sources/kubernetes_logs/parser/cri.rs @@ -3,7 +3,9 @@ use derivative::Derivative; use lookup::path; use vector_common::conversion; use vector_core::config::{log_schema, LegacyKey, LogNamespace}; +use vrl::path::PathPrefix; +use crate::sources::kubernetes_logs::transform_utils::get_message_field; use crate::{ event::{self, Event, Value}, internal_events::{ @@ -40,20 +42,18 @@ impl Cri { impl FunctionTransform for Cri { fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) { - let message_field = match self.log_namespace { - LogNamespace::Vector => ".", - LogNamespace::Legacy => log_schema().message_key(), - }; + let message_field = get_message_field(self.log_namespace); + let target_path = (PathPrefix::Event, message_field.as_str()); // Get the log field with the message, if it exists, and coerce it to bytes. let log = event.as_mut_log(); - let value = log.remove(message_field).map(|s| s.coerce_to_bytes()); + let value = log.remove(target_path).map(|s| s.coerce_to_bytes()); match value { None => { // The message field was missing, inexplicably. If we can't find the message field, there's nothing for // us to actually decode, so there's no event we could emit, and so we just emit the error and return. emit!(ParserMissingFieldError:: { - field: message_field + field: &message_field.to_string() }); return; } @@ -70,7 +70,7 @@ impl FunctionTransform for Cri { // MESSAGE // Insert either directly into `.` or `log_schema().message_key()`, // overwriting the original "full" CRI log that included additional fields. - drop(log.insert(message_field, Value::Bytes(s.slice_ref(parsed_log.message)))); + drop(log.insert(target_path, Value::Bytes(s.slice_ref(parsed_log.message)))); // MULTILINE_TAG // If the MULTILINE_TAG is 'P' (partial), insert our generic `_partial` key. diff --git a/src/sources/kubernetes_logs/parser/docker.rs b/src/sources/kubernetes_logs/parser/docker.rs index 5b3c71201779c..a94e6aa0cd23f 100644 --- a/src/sources/kubernetes_logs/parser/docker.rs +++ b/src/sources/kubernetes_logs/parser/docker.rs @@ -4,7 +4,9 @@ use lookup::{path, OwnedTargetPath}; use serde_json::Value as JsonValue; use snafu::{OptionExt, ResultExt, Snafu}; use vector_core::config::{LegacyKey, LogNamespace}; +use vrl::path::PathPrefix; +use crate::sources::kubernetes_logs::transform_utils::get_message_field; use crate::{ config::log_schema, event::{self, Event, LogEvent, Value}, @@ -51,13 +53,11 @@ impl FunctionTransform for Docker { /// Parses `message` as json object and removes it. fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), ParsingError> { - let message_field = match log_namespace { - LogNamespace::Vector => ".", - LogNamespace::Legacy => log_schema().message_key(), - }; + let message_field = get_message_field(log_namespace); + let target_path = (PathPrefix::Event, message_field.as_str()); let value = log - .remove(message_field) + .remove(target_path) .ok_or(ParsingError::NoMessageField)?; let bytes = match value { @@ -69,7 +69,7 @@ fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), Par Ok(JsonValue::Object(object)) => { for (key, value) in object { match key.as_str() { - MESSAGE_KEY => drop(log.insert(message_field, value)), + MESSAGE_KEY => drop(log.insert(target_path, value)), STREAM_KEY => log_namespace.insert_source_metadata( Config::NAME, log, @@ -133,12 +133,9 @@ fn normalize_event( } // Parse message, remove trailing newline and detect if it's partial. - let message_key = match log_namespace { - LogNamespace::Vector => ".", - LogNamespace::Legacy => log_schema().message_key(), - }; - let message = log.remove(message_key).context(LogFieldMissingSnafu)?; - + let message_field = get_message_field(log_namespace); + let target_path = (PathPrefix::Event, message_field.as_str()); + let message = log.remove(target_path).context(LogFieldMissingSnafu)?; let mut message = match message { Value::Bytes(val) => val, _ => return Err(NormalizationError::LogValueUnexpectedType), @@ -159,7 +156,7 @@ fn normalize_event( message.truncate(message.len() - 1); is_partial = false; }; - log.insert(message_key, message); + log.insert(target_path, message); // For partial messages add a partial event indicator. if is_partial { diff --git a/src/sources/kubernetes_logs/parser/mod.rs b/src/sources/kubernetes_logs/parser/mod.rs index fe6f765477bd3..5b53c4689e5e1 100644 --- a/src/sources/kubernetes_logs/parser/mod.rs +++ b/src/sources/kubernetes_logs/parser/mod.rs @@ -3,9 +3,10 @@ mod docker; mod test_util; use vector_core::config::LogNamespace; +use vrl::path::PathPrefix; +use crate::sources::kubernetes_logs::transform_utils::get_message_field; use crate::{ - config::log_schema, event::{Event, Value}, internal_events::KubernetesLogsFormatPickerEdgeCase, transforms::{FunctionTransform, OutputBuffer}, @@ -42,12 +43,11 @@ impl FunctionTransform for Parser { fn transform(&mut self, output: &mut OutputBuffer, event: Event) { match &mut self.state { ParserState::Uninitialized => { - let message_field = match self.log_namespace { - LogNamespace::Vector => ".", - LogNamespace::Legacy => log_schema().message_key(), - }; - - let message = match event.as_log().get(message_field) { + let message_field = get_message_field(self.log_namespace); + let message = match event + .as_log() + .get((PathPrefix::Event, message_field.as_str())) + { Some(message) => message, None => { emit!(KubernetesLogsFormatPickerEdgeCase { diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index d3648dcd893ec..e4e1cf28c9bc4 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -5,9 +5,9 @@ use indexmap::IndexMap; use vector_core::config::LogNamespace; use super::{transform_utils::optional::Optional, FILE_KEY}; +use crate::sources::kubernetes_logs::transform_utils::get_message_field; use crate::{ conditions::AnyCondition, - config::log_schema, event, transforms::reduce::{MergeStrategy, Reduce, ReduceConfig}, }; @@ -17,10 +17,7 @@ pub type PartialEventsMerger = Optional; pub fn build(enabled: bool, log_namespace: LogNamespace) -> PartialEventsMerger { let reducer = if enabled { - let key = match log_namespace { - LogNamespace::Vector => ".".to_string(), - LogNamespace::Legacy => log_schema().message_key().to_string(), - }; + let key = get_message_field(log_namespace); // Merge the message field of each event by concatenating it, with a space delimiter. let mut merge_strategies = IndexMap::new(); diff --git a/src/sources/kubernetes_logs/transform_utils/mod.rs b/src/sources/kubernetes_logs/transform_utils/mod.rs index 4b24267e84d53..c0dc4a07c3e1f 100644 --- a/src/sources/kubernetes_logs/transform_utils/mod.rs +++ b/src/sources/kubernetes_logs/transform_utils/mod.rs @@ -1 +1,14 @@ +use vector_core::config::{log_schema, LogNamespace}; + pub mod optional; + +pub(crate) fn get_message_field(log_namespace: LogNamespace) -> String { + match log_namespace { + LogNamespace::Vector => ".".to_string(), + LogNamespace::Legacy => log_schema() + .message_key() + .expect("global log_schema.message_key to be valid path") + .clone() + .to_string(), + } +} diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 6600bffc1a256..507e95cbd822d 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -390,7 +390,10 @@ mod integration_tests { .await; println!("Received event {:?}", events[0].as_log()); - assert_eq!(events[0].as_log()[log_schema().message_key()], msg.into()); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + msg.into() + ); Ok(()) } diff --git a/src/sources/opentelemetry/integration_tests.rs b/src/sources/opentelemetry/integration_tests.rs index 65204ba0035e6..5f5ad56d0e015 100644 --- a/src/sources/opentelemetry/integration_tests.rs +++ b/src/sources/opentelemetry/integration_tests.rs @@ -89,8 +89,8 @@ async fn receive_logs_legacy_namespace() { let events = collect_n(logs_output, 2).await; assert_eq!(events.len(), 2); assert_eq!( - events[0].as_log()[log_schema().message_key()], - events[1].as_log()[log_schema().message_key()] + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()] ); }) .await; diff --git a/src/sources/redis/mod.rs b/src/sources/redis/mod.rs index 1239a04c60915..f35b311f9e074 100644 --- a/src/sources/redis/mod.rs +++ b/src/sources/redis/mod.rs @@ -354,9 +354,18 @@ mod integration_test { let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await; - assert_eq!(events[0].as_log()[log_schema().message_key()], "3".into()); - assert_eq!(events[1].as_log()[log_schema().message_key()], "2".into()); - assert_eq!(events[2].as_log()[log_schema().message_key()], "1".into()); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "3".into() + ); + assert_eq!( + events[1].as_log()[log_schema().message_key().unwrap().to_string()], + "2".into() + ); + assert_eq!( + events[2].as_log()[log_schema().message_key().unwrap().to_string()], + "1".into() + ); } #[tokio::test] @@ -427,9 +436,18 @@ mod integration_test { let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await; - assert_eq!(events[0].as_log()[log_schema().message_key()], "1".into()); - assert_eq!(events[1].as_log()[log_schema().message_key()], "2".into()); - assert_eq!(events[2].as_log()[log_schema().message_key()], "3".into()); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "1".into() + ); + assert_eq!( + events[1].as_log()[log_schema().message_key().unwrap().to_string()], + "2".into() + ); + assert_eq!( + events[2].as_log()[log_schema().message_key().unwrap().to_string()], + "3".into() + ); } #[tokio::test] @@ -480,7 +498,10 @@ mod integration_test { assert_eq!(events.len(), 10000); for event in events { - assert_eq!(event.as_log()[log_schema().message_key()], text.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + text.into() + ); assert_eq!( event.as_log()[log_schema().source_type_key().unwrap().to_string()], RedisSourceConfig::NAME.into() diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 4af7b22becd4a..408e500331c7f 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -472,8 +472,14 @@ mod test { let events = collect_n(rx, 2).await; assert_eq!(events.len(), 2); - assert_eq!(events[0].as_log()[log_schema().message_key()], "foo".into()); - assert_eq!(events[1].as_log()[log_schema().message_key()], "bar".into()); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "foo".into() + ); + assert_eq!( + events[1].as_log()[log_schema().message_key().unwrap().to_string()], + "bar".into() + ); }) .await; } @@ -531,11 +537,14 @@ mod test { send_lines(addr, lines.into_iter()).await.unwrap(); let event = rx.next().await.unwrap(); - assert_eq!(event.as_log()[log_schema().message_key()], "short".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "short".into() + ); let event = rx.next().await.unwrap(); assert_eq!( - event.as_log()[log_schema().message_key()], + event.as_log()[log_schema().message_key().unwrap().to_string()], "more short".into() ); }) @@ -585,7 +594,7 @@ mod test { let event = rx.next().await.unwrap(); assert_eq!( - event.as_log()[log_schema().message_key()], + event.as_log()[log_schema().message_key().unwrap().to_string()], "one line".into() ); @@ -597,7 +606,7 @@ mod test { let event = rx.next().await.unwrap(); assert_eq!( - event.as_log()[log_schema().message_key()], + event.as_log()[log_schema().message_key().unwrap().to_string()], "another line".into() ); @@ -703,7 +712,10 @@ mod test { .unwrap(); let event = rx.next().await.unwrap(); - assert_eq!(event.as_log()[log_schema().message_key()], "test".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "test".into() + ); // Now signal to the Source to shut down. let deadline = Instant::now() + Duration::from_secs(10); @@ -781,10 +793,10 @@ mod test { .collect::>(); assert_eq!(100, events.len()); - let message_key = log_schema().message_key(); + let message_key = log_schema().message_key().unwrap().to_string(); let expected_message = message.clone().into(); for event in events.into_iter().flat_map(EventContainer::into_events) { - assert_eq!(event.as_log()[message_key], expected_message); + assert_eq!(event.as_log()[message_key.as_str()], expected_message); } // Now trigger shutdown on the source and ensure that it shuts down before or at the @@ -956,7 +968,7 @@ mod test { let events = collect_n(rx, 1).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test".into() ); }) @@ -973,7 +985,7 @@ mod test { let events = collect_n(rx, 1).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "foo\nbar".into() ); }) @@ -990,11 +1002,11 @@ mod test { let events = collect_n(rx, 2).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test".into() ); assert_eq!( - events[1].as_log()[log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "test2".into() ); }) @@ -1021,11 +1033,11 @@ mod test { let events = collect_n(rx, 2).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "short line".into() ); assert_eq!( - events[1].as_log()[log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "a short un".into() ); }) @@ -1057,11 +1069,11 @@ mod test { let events = collect_n(rx, 2).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test with".into() ); assert_eq!( - events[1].as_log()[log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "short one".into() ); }) @@ -1142,7 +1154,7 @@ mod test { let events = collect_n(rx, 1).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test".into() ); @@ -1183,7 +1195,10 @@ mod test { let events = collect_n(rx, 100).await; assert_eq!(100, events.len()); for event in events { - assert_eq!(event.as_log()[log_schema().message_key()], "test".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "test".into() + ); } let deadline = Instant::now() + Duration::from_secs(10); @@ -1268,11 +1283,11 @@ mod test { assert_eq!(2, events.len()); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test".into() ); assert_eq!( - events[1].as_log()[log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "test2".into() ); } @@ -1323,7 +1338,7 @@ mod test { assert_eq!(events.len(), 1); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test".into() ); assert_eq!( @@ -1415,7 +1430,7 @@ mod test { assert_eq!(events.len(), 1); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "foo\nbar".into() ); assert_eq!( @@ -1502,7 +1517,7 @@ mod test { assert_eq!(1, events.len()); assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "test".into() ); assert_eq!( @@ -1544,12 +1559,18 @@ mod test { let events = collect_n(rx, 2).await; assert_eq!(events.len(), 2); - assert_eq!(events[0].as_log()[log_schema().message_key()], "foo".into()); + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "foo".into() + ); assert_eq!( events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); - assert_eq!(events[1].as_log()[log_schema().message_key()], "bar".into()); + assert_eq!( + events[1].as_log()[log_schema().message_key().unwrap().to_string()], + "bar".into() + ); assert_eq!( events[1].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 868508b2d0f78..3c957be4084a7 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -25,6 +25,7 @@ use vector_core::{ schema::meaning, EstimatedJsonEncodedSizeOf, }; +use vrl::path::PathPrefix; use vrl::value::{kind::Collection, Kind}; use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply}; @@ -181,19 +182,26 @@ impl SourceConfig for SplunkConfig { let log_namespace = global_log_namespace.merge(self.log_namespace); let schema_definition = match log_namespace { - LogNamespace::Legacy => vector_core::schema::Definition::empty_legacy_namespace() - .with_event_field( - &owned_value_path!(log_schema().message_key()), - Kind::bytes().or_undefined(), - Some(meaning::MESSAGE), - ) - .with_event_field( - &owned_value_path!("line"), - Kind::object(Collection::empty()) - .or_array(Collection::empty()) - .or_undefined(), - None, - ), + LogNamespace::Legacy => { + let definition = vector_core::schema::Definition::empty_legacy_namespace() + .with_event_field( + &owned_value_path!("line"), + Kind::object(Collection::empty()) + .or_array(Collection::empty()) + .or_undefined(), + None, + ); + + if let Some(message_key) = log_schema().message_key() { + definition.with_event_field( + message_key, + Kind::bytes().or_undefined(), + Some(meaning::MESSAGE), + ) + } else { + definition + } + } LogNamespace::Vector => vector_core::schema::Definition::new_with_default_metadata( Kind::bytes().or_object(Collection::empty()), [log_namespace], @@ -804,7 +812,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { if string.is_empty() { return Err(ApiError::EmptyEventField { event: self.events }.into()); } - log.insert(log_schema().message_key(), string); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), string); } JsonValue::Object(mut object) => { if object.is_empty() { @@ -819,7 +827,11 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { log.insert("line", line); } _ => { - log.insert(log_schema().message_key(), line); + log.maybe_insert( + PathPrefix::Event, + log_schema().message_key(), + line, + ); } } } @@ -993,9 +1005,7 @@ fn raw_event( LogNamespace::Vector => LogEvent::from(message), LogNamespace::Legacy => { let mut log = LogEvent::default(); - - // Add message - log.insert(log_schema().message_key(), message); + log.maybe_insert(PathPrefix::Event, log_schema().message_key(), message); log } }; @@ -1425,7 +1435,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert!(event .as_log() .get(( @@ -1452,7 +1465,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert!(event .as_log() .get(( @@ -1483,7 +1499,10 @@ mod tests { let events = channel_n(messages.clone(), sink, source).await; for (msg, event) in messages.into_iter().zip(events.into_iter()) { - assert_eq!(event.as_log()[log_schema().message_key()], msg.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + msg.into() + ); assert!(event .as_log() .get(( @@ -1511,7 +1530,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert!(event .as_log() .get(( @@ -1542,7 +1564,10 @@ mod tests { let events = channel_n(messages.clone(), sink, source).await; for (msg, event) in messages.into_iter().zip(events.into_iter()) { - assert_eq!(event.as_log()[log_schema().message_key()], msg.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + msg.into() + ); assert!(event .as_log() .get(( @@ -1624,7 +1649,10 @@ mod tests { sink.run_events(vec![event.into()]).await.unwrap(); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], "hello".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "hello".into() + ); assert!(event.metadata().splunk_hec_token().is_none()); } @@ -1637,7 +1665,10 @@ mod tests { assert_eq!(200, post(address, "services/collector/raw", message).await); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!(event.as_log()[&super::CHANNEL], "channel".into()); assert!(event .as_log() @@ -1664,7 +1695,10 @@ mod tests { assert_eq!(200, post(address, "services/collector", message).await); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], "root".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "root".into() + ); assert_eq!(event.as_log()[&super::CHANNEL], "channel".into()); assert!(event .as_log() @@ -1894,7 +1928,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!( &event.metadata().splunk_hec_token().as_ref().unwrap()[..], TOKEN @@ -1912,7 +1949,10 @@ mod tests { assert_eq!(200, post(address, "services/collector/raw", message).await); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!(event.as_log()[&super::CHANNEL], "channel".into()); assert!(event .as_log() @@ -1948,7 +1988,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert!(event.metadata().splunk_hec_token().is_none()); }) .await; @@ -1969,7 +2012,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert_eq!( &event.metadata().splunk_hec_token().as_ref().unwrap()[..], TOKEN @@ -1990,7 +2036,10 @@ mod tests { ); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], "first".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "first".into() + ); assert!(event .as_log() .get(( @@ -2020,7 +2069,10 @@ mod tests { ); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], "first".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "first".into() + ); assert!(event .as_log() .get(( @@ -2048,7 +2100,10 @@ mod tests { ); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], "first".into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "first".into() + ); assert!(event .as_log() .get(( @@ -2106,19 +2161,19 @@ mod tests { let events = collect_n(source, 3).await; assert_eq!( - events[0].as_log()[log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "first".into() ); assert_eq!(events[0].as_log()[&super::SOURCE], "main".into()); assert_eq!( - events[1].as_log()[log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "second".into() ); assert_eq!(events[1].as_log()[&super::SOURCE], "main".into()); assert_eq!( - events[2].as_log()[log_schema().message_key()], + events[2].as_log()[log_schema().message_key().unwrap().to_string()], "third".into() ); assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into()); @@ -2178,7 +2233,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); - assert_eq!(event.as_log()[log_schema().message_key()], message.into()); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + message.into() + ); assert!(event .as_log() .get((PathPrefix::Event, log_schema().host_key().unwrap())) diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 87708f3866d32..6208dc09425b5 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -854,11 +854,11 @@ mod test { send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await; assert_eq!( - events[0].as_log()[&log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "hello".into(), ); assert_eq!( - events[1].as_log()[&log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "world".into(), ); @@ -903,12 +903,13 @@ mod test { //5 - send STOP frame send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await; + let message_key = log_schema().message_key().unwrap().to_string(); assert!(events .iter() - .any(|e| e.as_log()[&log_schema().message_key()] == "hello".into())); + .any(|e| e.as_log()[&message_key] == "hello".into())); assert!(events .iter() - .any(|e| e.as_log()[&log_schema().message_key()] == "world".into())); + .any(|e| e.as_log()[&message_key] == "world".into())); drop(sock_stream); //explicitly drop the stream so we don't get warnings about not using it @@ -1025,11 +1026,11 @@ mod test { send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await; assert_eq!( - events[0].as_log()[&log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "hello".into(), ); assert_eq!( - events[1].as_log()[&log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "world".into(), ); @@ -1065,11 +1066,11 @@ mod test { send_control_frame(&mut sock_sink, create_control_frame(ControlHeader::Stop)).await; assert_eq!( - events[0].as_log()[&log_schema().message_key()], + events[0].as_log()[log_schema().message_key().unwrap().to_string()], "hello".into(), ); assert_eq!( - events[1].as_log()[&log_schema().message_key()], + events[1].as_log()[log_schema().message_key().unwrap().to_string()], "world".into(), ); diff --git a/src/test_util/mock/transforms/basic.rs b/src/test_util/mock/transforms/basic.rs index 94f1860df83fe..c7f877eabb244 100644 --- a/src/test_util/mock/transforms/basic.rs +++ b/src/test_util/mock/transforms/basic.rs @@ -12,6 +12,7 @@ use vector_core::{ schema, transform::{FunctionTransform, OutputBuffer, Transform}, }; +use vrl::path::PathPrefix; use vrl::value::Value; use crate::config::{OutputId, TransformConfig, TransformContext}; @@ -75,13 +76,12 @@ impl FunctionTransform for BasicTransform { fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) { match &mut event { Event::Log(log) => { - let mut v = log - .get(crate::config::log_schema().message_key()) - .unwrap() - .to_string_lossy() - .into_owned(); - v.push_str(&self.suffix); - log.insert(crate::config::log_schema().message_key(), Value::from(v)); + if let Some(message_key) = crate::config::log_schema().message_key() { + let target_path = (PathPrefix::Event, message_key); + let mut v = log.get(target_path).unwrap().to_string_lossy().into_owned(); + v.push_str(&self.suffix); + log.insert(target_path, Value::from(v)); + } } Event::Metric(metric) => { let increment = match metric.value() { @@ -118,13 +118,15 @@ impl FunctionTransform for BasicTransform { } } Event::Trace(trace) => { - let mut v = trace - .get(crate::config::log_schema().message_key()) - .unwrap() - .to_string_lossy() - .into_owned(); - v.push_str(&self.suffix); - trace.insert(crate::config::log_schema().message_key(), Value::from(v)); + if let Some(message_key) = crate::config::log_schema().message_key() { + let mut v = trace + .get((PathPrefix::Event, message_key)) + .unwrap() + .to_string_lossy() + .into_owned(); + v.push_str(&self.suffix); + trace.insert(message_key.to_string().as_str(), Value::from(v)); + } } }; output.push(event); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index 2f6e795320e85..d772a7162bf66 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -29,6 +29,7 @@ use tokio::{ use vector_buffers::{BufferConfig, BufferType, WhenFull}; use vector_common::config::ComponentKey; use vector_core::config::OutputId; +use vrl::path::PathPrefix; mod backpressure; mod compliance; @@ -67,9 +68,10 @@ fn basic_config_with_sink_failing_healthcheck() -> Config { } fn into_message(event: Event) -> String { + let message_key = crate::config::log_schema().message_key().unwrap(); event .as_log() - .get(crate::config::log_schema().message_key()) + .get((PathPrefix::Event, message_key)) .unwrap() .to_string_lossy() .into_owned() @@ -120,7 +122,10 @@ async fn topology_shutdown_while_active() { .flat_map(EventArray::into_events) { assert_eq!( - event.as_log()[&crate::config::log_schema().message_key()], + event.as_log()[&crate::config::log_schema() + .message_key() + .unwrap() + .to_string()], "test transformed".to_owned().into() ); } diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index ae7f94604ea6b..0e88626cb2b25 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -103,7 +103,10 @@ fn default_cache_config() -> CacheConfig { // structure can vary significantly. This should probably either become a required field // in the future, or maybe the "semantic meaning" can be utilized here. fn default_match_fields() -> Vec { - let mut fields = vec![log_schema().message_key().into()]; + let mut fields = Vec::new(); + if let Some(message_key) = log_schema().message_key() { + fields.push(message_key.to_string()); + } if let Some(host_key) = log_schema().host_key() { fields.push(host_key.to_string()); } diff --git a/src/transforms/sample.rs b/src/transforms/sample.rs index c703e56ead9e0..c919e9e9bad11 100644 --- a/src/transforms/sample.rs +++ b/src/transforms/sample.rs @@ -194,8 +194,11 @@ mod tests { let events = random_events(num_events); let mut sampler = Sample::new( 2, - Some(log_schema().message_key().into()), - Some(condition_contains(log_schema().message_key(), "na")), + log_schema().message_key().map(ToString::to_string), + Some(condition_contains( + log_schema().message_key().unwrap().to_string().as_str(), + "na", + )), ); let total_passed = events .into_iter() @@ -212,8 +215,11 @@ mod tests { let events = random_events(num_events); let mut sampler = Sample::new( 25, - Some(log_schema().message_key().into()), - Some(condition_contains(log_schema().message_key(), "na")), + log_schema().message_key().map(ToString::to_string), + Some(condition_contains( + log_schema().message_key().unwrap().to_string().as_str(), + "na", + )), ); let total_passed = events .into_iter() @@ -233,8 +239,11 @@ mod tests { let events = random_events(1000); let mut sampler = Sample::new( 2, - Some(log_schema().message_key().into()), - Some(condition_contains(log_schema().message_key(), "na")), + log_schema().message_key().map(ToString::to_string), + Some(condition_contains( + log_schema().message_key().unwrap().to_string().as_str(), + "na", + )), ); let first_run = events @@ -260,12 +269,15 @@ mod tests { #[test] fn always_passes_events_matching_pass_list() { - for key_field in &[None, Some(log_schema().message_key().into())] { + for key_field in &[None, log_schema().message_key().map(ToString::to_string)] { let event = Event::Log(LogEvent::from("i am important")); let mut sampler = Sample::new( 0, key_field.clone(), - Some(condition_contains(log_schema().message_key(), "important")), + Some(condition_contains( + log_schema().message_key().unwrap().to_string().as_str(), + "important", + )), ); let iterations = 0..1000; let total_passed = iterations @@ -302,20 +314,17 @@ mod tests { #[test] fn sampler_adds_sampling_rate_to_event() { - for key_field in &[None, Some(log_schema().message_key().into())] { + for key_field in &[None, log_schema().message_key().map(ToString::to_string)] { let events = random_events(10000); + let message_key = log_schema().message_key().unwrap().to_string(); let mut sampler = Sample::new( 10, key_field.clone(), - Some(condition_contains(log_schema().message_key(), "na")), + Some(condition_contains(&message_key, "na")), ); let passing = events .into_iter() - .filter(|s| { - !s.as_log()[log_schema().message_key()] - .to_string_lossy() - .contains("na") - }) + .filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na")) .find_map(|event| transform_one(&mut sampler, event)) .unwrap(); assert_eq!(passing.as_log()["sample_rate"], "10".into()); @@ -324,15 +333,11 @@ mod tests { let mut sampler = Sample::new( 25, key_field.clone(), - Some(condition_contains(log_schema().message_key(), "na")), + Some(condition_contains(&message_key, "na")), ); let passing = events .into_iter() - .filter(|s| { - !s.as_log()[log_schema().message_key()] - .to_string_lossy() - .contains("na") - }) + .filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na")) .find_map(|event| transform_one(&mut sampler, event)) .unwrap(); assert_eq!(passing.as_log()["sample_rate"], "25".into()); @@ -341,7 +346,7 @@ mod tests { let mut sampler = Sample::new( 25, key_field.clone(), - Some(condition_contains(log_schema().message_key(), "na")), + Some(condition_contains(&message_key, "na")), ); let event = Event::Log(LogEvent::from("nananana")); let passing = transform_one(&mut sampler, event).unwrap();