From d29424d95dbc7c9afd039890df38681ba309853f Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 13 Jul 2023 18:48:40 -0400 Subject: [PATCH] feat: Migrate `LogSchema` `source_type_key` to new lookup code (#17947) This part of https://github.com/vectordotdev/vector/issues/13033. Tried this with a config where `source_type_key` is: * `""` --> no source entry * not defined --> same as above * `"foo"` --> `"foo":"demo_logs"` * `"foo.bar"` --> `"foo":{"bar":"demo_logs"}` * `"foo.."` --> `Configuration error. error=Invalid field path "foo.."` The config: ``` data_dir = "/Users/pavlos.rontidis/my_tests/vector" [log_schema] source_type_key = "foo" [sources.source0] format = "json" type = "demo_logs" [sinks.sink0] inputs = ["source0"] target = "stdout" type = "console" [sinks.sink0.encoding] codec = "json" ``` --- lib/opentelemetry-proto/src/convert.rs | 2 +- lib/vector-core/src/config/log_schema.rs | 25 ++++++------ lib/vector-core/src/config/mod.rs | 7 ++-- lib/vector-core/src/event/log_event.rs | 10 +++-- lib/vector-core/src/schema/definition.rs | 6 +-- .../src/lookup_v2/optional_path.rs | 7 ++++ src/sinks/datadog/events/sink.rs | 4 +- src/sinks/influxdb/logs.rs | 20 +++++----- src/sources/amqp.rs | 7 +++- src/sources/aws_kinesis_firehose/handlers.rs | 2 +- src/sources/aws_s3/sqs.rs | 2 +- src/sources/datadog_agent/mod.rs | 6 ++- src/sources/datadog_agent/tests.rs | 40 +++++++++++++++---- src/sources/datadog_agent/traces.rs | 4 +- src/sources/dnstap/mod.rs | 10 ++--- src/sources/docker_logs/mod.rs | 9 ++--- src/sources/docker_logs/tests.rs | 6 +-- src/sources/exec/mod.rs | 15 +++++-- src/sources/file.rs | 14 +++++-- src/sources/fluent/mod.rs | 4 +- src/sources/heroku_logs.rs | 17 ++++++-- src/sources/http_client/client.rs | 20 ++++++---- src/sources/http_client/integration_tests.rs | 8 ++-- src/sources/http_server.rs | 22 ++++++---- src/sources/journald.rs | 4 +- src/sources/kafka.rs | 10 ++++- src/sources/kubernetes_logs/mod.rs | 2 +- src/sources/logstash.rs | 2 +- src/sources/redis/mod.rs | 4 +- src/sources/socket/mod.rs | 14 +++---- src/sources/splunk_hec/mod.rs | 31 +++++++------- src/sources/syslog.rs | 25 +++++++++--- src/sources/util/framestream.rs | 15 ++++--- src/sources/util/message_decoding.rs | 2 +- src/sources/vector/mod.rs | 4 +- 35 files changed, 240 insertions(+), 140 deletions(-) diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 4ccfda0b726ba..057561df755bb 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -208,7 +208,7 @@ impl ResourceLog { log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(SOURCE_NAME.as_bytes()), ); diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index d54575f89f252..9f9eab7cbc519 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -1,7 +1,8 @@ -use lookup::lookup_v2::{parse_target_path, OptionalValuePath}; -use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath}; +use lookup::lookup_v2::OptionalValuePath; +use lookup::{OwnedTargetPath, OwnedValuePath}; use once_cell::sync::{Lazy, OnceCell}; use vector_config::configurable_component; +use vrl::path::parse_target_path; static LOG_SCHEMA: OnceCell = OnceCell::new(); static LOG_SCHEMA_DEFAULT: Lazy = Lazy::new(LogSchema::default); @@ -60,7 +61,7 @@ pub struct LogSchema { /// /// This field will be set by the Vector source that the event was created in. #[serde(default = "LogSchema::default_source_type_key")] - source_type_key: String, + source_type_key: OptionalValuePath, /// The name of the event field to set the event metadata in. /// @@ -88,17 +89,15 @@ impl LogSchema { } fn default_timestamp_key() -> OptionalValuePath { - OptionalValuePath { - path: Some(owned_value_path!("timestamp")), - } + OptionalValuePath::new("timestamp") } fn default_host_key() -> String { String::from("host") } - fn default_source_type_key() -> String { - String::from("source_type") + fn default_source_type_key() -> OptionalValuePath { + OptionalValuePath::new("source_type") } fn default_metadata_key() -> String { @@ -126,8 +125,8 @@ impl LogSchema { &self.host_key } - pub fn source_type_key(&self) -> &str { - &self.source_type_key + pub fn source_type_key(&self) -> Option<&OwnedValuePath> { + self.source_type_key.path.as_ref() } pub fn metadata_key(&self) -> &str { @@ -146,8 +145,8 @@ impl LogSchema { self.host_key = v; } - pub fn set_source_type_key(&mut self, v: String) { - self.source_type_key = v; + pub fn set_source_type_key(&mut self, path: Option) { + self.source_type_key = OptionalValuePath { path }; } pub fn set_metadata_key(&mut self, v: String) { @@ -191,7 +190,7 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned()); } else { - self.set_source_type_key(other.source_type_key().to_string()); + self.set_source_type_key(other.source_type_key().cloned()); } if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key() && self.metadata_key() != other.metadata_key() diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 71786155d1d8f..ed3dde2bae112 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -476,7 +476,7 @@ impl LogNamespace { ) { self.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(source_name.as_bytes()), ); @@ -551,14 +551,15 @@ mod test { use chrono::Utc; use lookup::{event_path, owned_value_path, OwnedTargetPath}; use vector_common::btreemap; + use vrl::path::OwnedValuePath; use vrl::value::Kind; #[test] fn test_insert_standard_vector_source_metadata() { - let nested_path = "a.b.c.d"; + let nested_path = "a.b.c.d".to_string(); let mut schema = LogSchema::default(); - schema.set_source_type_key(nested_path.to_owned()); + schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap())); init_log_schema(schema, false); let namespace = LogNamespace::Legacy; diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index e5755f12d7e66..e11423e122d13 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -466,10 +466,10 @@ impl LogEvent { /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace). // TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the // "Global Log Schema" are updated to the new path lookup code - pub fn source_type_path(&self) -> &'static str { + pub fn source_type_path(&self) -> Option { match self.namespace() { - LogNamespace::Vector => "%vector.source_type", - LogNamespace::Legacy => log_schema().source_type_key(), + LogNamespace::Vector => Some("%vector.source_type".to_string()), + LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string), } } @@ -514,7 +514,9 @@ impl LogEvent { pub fn get_source_type(&self) -> Option<&Value> { match self.namespace() { LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")), - LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())), + LogNamespace::Legacy => log_schema() + .source_type_key() + .and_then(|key| self.get((PathPrefix::Event, key))), } } } diff --git a/lib/vector-core/src/schema/definition.rs b/lib/vector-core/src/schema/definition.rs index a3c5afc034cb4..421b0b91a043a 100644 --- a/lib/vector-core/src/schema/definition.rs +++ b/lib/vector-core/src/schema/definition.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, BTreeSet}; use crate::config::{log_schema, LegacyKey, LogNamespace}; -use lookup::lookup_v2::{parse_value_path, TargetPath}; +use lookup::lookup_v2::TargetPath; use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix}; use vrl::value::{kind::Collection, Kind}; @@ -144,9 +144,7 @@ impl Definition { #[must_use] pub fn with_standard_vector_source_metadata(self) -> Self { self.with_vector_metadata( - parse_value_path(log_schema().source_type_key()) - .ok() - .as_ref(), + log_schema().source_type_key(), &owned_value_path!("source_type"), Kind::bytes(), None, diff --git a/lib/vector-lookup/src/lookup_v2/optional_path.rs b/lib/vector-lookup/src/lookup_v2/optional_path.rs index 9328aa8a2f138..ee15ed3509cf6 100644 --- a/lib/vector-lookup/src/lookup_v2/optional_path.rs +++ b/lib/vector-lookup/src/lookup_v2/optional_path.rs @@ -1,4 +1,5 @@ use vector_config::configurable_component; +use vrl::owned_value_path; use crate::lookup_v2::PathParseError; use crate::{OwnedTargetPath, OwnedValuePath}; @@ -56,6 +57,12 @@ impl OptionalValuePath { pub fn none() -> Self { Self { path: None } } + + pub fn new(path: &str) -> Self { + Self { + path: Some(owned_value_path!(path)), + } + } } impl TryFrom for OptionalValuePath { diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index 0ee8538763268..19a7646c658b8 100644 --- a/src/sinks/datadog/events/sink.rs +++ b/src/sinks/datadog/events/sink.rs @@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option { } if !log.contains("source_type_name") { - log.rename_key(log.source_type_path(), "source_type_name") + if let Some(source_type_path) = log.source_type_path() { + log.rename_key(source_type_path.as_str(), "source_type_name") + } } Some(Event::from(log)) diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index ef901ff4e66aa..ca12946f00bad 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -207,10 +207,8 @@ impl SinkConfig for InfluxDbLogsConfig { .source_type_key .clone() .and_then(|k| k.path) - .unwrap_or_else(|| { - parse_value_path(log_schema().source_type_key()) - .expect("global log_schema.source_type_key to be valid path") - }); + .or(log_schema().source_type_key().cloned()) + .unwrap(); let sink = InfluxDbLogsSink { uri, @@ -280,11 +278,15 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { self.tags.replace(host_path.clone()); log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key)); } - self.tags.replace(log.source_type_path().to_string()); - log.rename_key( - log.source_type_path(), - (PathPrefix::Event, &self.source_type_key), - ); + + if let Some(source_type_path) = log.source_type_path() { + self.tags.replace(source_type_path.clone()); + log.rename_key( + source_type_path.as_str(), + (PathPrefix::Event, &self.source_type_key), + ); + } + self.tags.replace("metric_type".to_string()); log.insert("metric_type", "logs"); diff --git a/src/sources/amqp.rs b/src/sources/amqp.rs index f5cf22f493a61..bbd93d1c81aeb 100644 --- a/src/sources/amqp.rs +++ b/src/sources/amqp.rs @@ -278,7 +278,7 @@ fn populate_event( log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()), ); @@ -713,7 +713,10 @@ mod integration_test { trace!("{:?}", log); assert_eq!(log[log_schema().message_key()], "my message".into()); assert_eq!(log["routing"], routing_key.into()); - assert_eq!(log[log_schema().source_type_key()], "amqp".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "amqp".into() + ); let log_ts = log[log_schema().timestamp_key().unwrap().to_string()] .as_timestamp() .unwrap(); diff --git a/src/sources/aws_kinesis_firehose/handlers.rs b/src/sources/aws_kinesis_firehose/handlers.rs index 11bc526b0648c..388e6187cd20f 100644 --- a/src/sources/aws_kinesis_firehose/handlers.rs +++ b/src/sources/aws_kinesis_firehose/handlers.rs @@ -93,7 +93,7 @@ pub(super) async fn firehose( if let Event::Log(ref mut log) = event { log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()), ); diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 3074a8f383d6e..508f6abbaed68 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -691,7 +691,7 @@ fn handle_single_log( log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(AwsS3Config::NAME.as_bytes()), ); diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 2e4dbf6656597..18536acf3adac 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -284,7 +284,7 @@ pub struct ApiKeyQueryParams { pub(crate) struct DatadogAgentSource { pub(crate) api_key_extractor: ApiKeyExtractor, pub(crate) log_schema_host_key: &'static str, - pub(crate) log_schema_source_type_key: &'static str, + pub(crate) log_schema_source_type_key: String, pub(crate) log_namespace: LogNamespace, pub(crate) decoder: Decoder, protocol: &'static str, @@ -334,7 +334,9 @@ impl DatadogAgentSource { .expect("static regex always compiles"), }, log_schema_host_key: log_schema().host_key(), - log_schema_source_type_key: log_schema().source_type_key(), + log_schema_source_type_key: log_schema() + .source_type_key() + .map_or("".to_string(), |key| key.to_string()), decoder, protocol, logs_schema_definition: Arc::new(logs_schema_definition), diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 4fc3ef4321ee1..9647688279027 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -238,7 +238,10 @@ async fn full_payload_v1() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -300,7 +303,10 @@ async fn full_payload_v2() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -362,7 +368,10 @@ async fn no_api_key() { assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); assert!(event.metadata().datadog_api_key().is_none()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( event.metadata().schema_definition(), &test_logs_schema_definition() @@ -423,7 +432,10 @@ async fn api_key_in_url() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -488,7 +500,10 @@ async fn api_key_in_query_params() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -559,7 +574,10 @@ async fn api_key_in_header() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" @@ -706,7 +724,10 @@ async fn ignores_api_key() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert!(event.metadata().datadog_api_key().is_none()); assert_eq!( event.metadata().schema_definition(), @@ -1398,7 +1419,10 @@ async fn split_outputs() { assert_eq!(log["service"], "vector".into()); assert_eq!(log["ddsource"], "curl".into()); assert_eq!(log["ddtags"], "one,two,three".into()); - assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "datadog_agent".into() + ); assert_eq!( &event.metadata().datadog_api_key().as_ref().unwrap()[..], "12345678abcdefgh12345678abcdefgh" diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index 3fb55de8508e6..889da554ba9ca 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -142,7 +142,7 @@ fn handle_dd_trace_payload_v1( .set_datadog_api_key(Arc::clone(k)); } trace_event.insert( - source.log_schema_source_type_key, + source.log_schema_source_type_key.as_str(), Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v2".to_string()); @@ -255,7 +255,7 @@ fn handle_dd_trace_payload_v0( trace_event.insert("language_name", lang.clone()); } trace_event.insert( - source.log_schema_source_type_key, + source.log_schema_source_type_key.as_str(), Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v1".to_string()); diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs index 69a9d13728805..231c5fd096245 100644 --- a/src/sources/dnstap/mod.rs +++ b/src/sources/dnstap/mod.rs @@ -212,7 +212,7 @@ pub struct DnstapFrameHandler { socket_send_buffer_size: Option, host_key: Option, timestamp_key: Option, - source_type_key: String, + source_type_key: Option, bytes_received: Registered, log_namespace: LogNamespace, } @@ -242,7 +242,7 @@ impl DnstapFrameHandler { socket_send_buffer_size: config.socket_send_buffer_size, host_key, timestamp_key: timestamp_key.cloned(), - source_type_key: source_type_key.to_string(), + source_type_key: source_type_key.cloned(), bytes_received: register!(BytesReceived::from(Protocol::from("protobuf"))), log_namespace, } @@ -307,7 +307,7 @@ impl FrameHandler for DnstapFrameHandler { self.log_namespace.insert_vector_metadata( &mut log_event, - Some(self.source_type_key()), + self.source_type_key(), path!("source_type"), DnstapConfig::NAME, ); @@ -343,8 +343,8 @@ impl FrameHandler for DnstapFrameHandler { &self.host_key } - fn source_type_key(&self) -> &str { - self.source_type_key.as_str() + fn source_type_key(&self) -> Option<&OwnedValuePath> { + self.source_type_key.as_ref() } fn timestamp_key(&self) -> Option<&OwnedValuePath> { diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 9dc7ffeeedd09..a37ae3260946c 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -14,8 +14,7 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc}; use codecs::{BytesDeserializer, BytesDeserializerConfig}; use futures::{Stream, StreamExt}; use lookup::{ - lookup_v2::{parse_value_path, OptionalValuePath}, - metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix, + lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix, }; use once_cell::sync::Lazy; use serde_with::serde_as; @@ -338,9 +337,7 @@ impl SourceConfig for DockerLogsConfig { Some("timestamp"), ) .with_vector_metadata( - parse_value_path(log_schema().source_type_key()) - .ok() - .as_ref(), + log_schema().source_type_key(), &owned_value_path!("source_type"), Kind::bytes(), None, @@ -1119,7 +1116,7 @@ impl ContainerLogInfo { log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(DockerLogsConfig::NAME.as_bytes()), ); diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs index c9569209074ba..d1f55d885e996 100644 --- a/src/sources/docker_logs/tests.rs +++ b/src/sources/docker_logs/tests.rs @@ -447,7 +447,7 @@ mod integration_tests { assert!(log.get(format!("label.{}", label).as_str()).is_some()); assert_eq!(events[0].as_log()[&NAME], name.into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], DockerLogsConfig::NAME.into() ); }) @@ -654,7 +654,7 @@ mod integration_tests { assert!(log.get(format!("label.{}", label).as_str()).is_some()); assert_eq!(events[0].as_log()[&NAME], name.into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], DockerLogsConfig::NAME.into() ); }) @@ -795,7 +795,7 @@ mod integration_tests { .is_some()); assert_eq!(events[0].as_log()[&NAME], name.into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], DockerLogsConfig::NAME.into() ); }) diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 8af71919a3d76..627a4f123bbf9 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -761,7 +761,10 @@ mod tests { assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "exec".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "exec".into() + ); assert!(log .get(( lookup::PathPrefix::Event, @@ -842,7 +845,10 @@ mod tests { assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "exec".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "exec".into() + ); assert!(log .get(( lookup::PathPrefix::Event, @@ -1041,7 +1047,10 @@ mod tests { let log = event.as_log(); assert_eq!(log[COMMAND_KEY], config.command.clone().into()); assert_eq!(log[STREAM_KEY], STDOUT.into()); - assert_eq!(log[log_schema().source_type_key()], "exec".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "exec".into() + ); assert_eq!(log[log_schema().message_key()], "Hello World!".into()); assert_eq!(log[log_schema().host_key()], "Some.Machine".into()); assert!(log.get(PID_KEY).is_some()); diff --git a/src/sources/file.rs b/src/sources/file.rs index 1c48a2cefe298..230e8aac42ffb 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -751,7 +751,7 @@ fn create_event( log_namespace.insert_vector_metadata( &mut event, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(FileConfig::NAME.as_bytes()), ); @@ -1037,7 +1037,10 @@ mod tests { assert_eq!(log["host"], "Some.Machine".into()); assert_eq!(log["offset"], 0.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "file".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "file".into() + ); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -1059,7 +1062,10 @@ mod tests { assert_eq!(log["hostname"], "Some.Machine".into()); assert_eq!(log["off"], 0.into()); assert_eq!(log[log_schema().message_key()], "hello world".into()); - assert_eq!(log[log_schema().source_type_key()], "file".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "file".into() + ); assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp()); } @@ -1466,7 +1472,7 @@ mod tests { log_schema().host_key().to_string(), log_schema().message_key().to_string(), log_schema().timestamp_key().unwrap().to_string(), - log_schema().source_type_key().to_string() + log_schema().source_type_key().unwrap().to_string() ] .into_iter() .collect::>() diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index f25f413590324..e6fbb45b31d75 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -587,7 +587,7 @@ impl From> for LogEvent { log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(FluentConfig::NAME.as_bytes()), ); @@ -665,7 +665,7 @@ mod tests { Event::Log(LogEvent::from(BTreeMap::from([ (String::from("message"), Value::from(name)), ( - String::from(log_schema().source_type_key()), + log_schema().source_type_key().unwrap().to_string(), Value::from(FluentConfig::NAME), ), (String::from("tag"), Value::from("tag.name")), diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 2cdf7d7d44424..c2a6b7959ac44 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -531,7 +531,7 @@ mod tests { .into() ); assert_eq!(log[&log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); assert_eq!(log["appname"], "lumberjack-store".into()); assert_eq!(log["absent"], Value::Null); }).await; @@ -614,7 +614,10 @@ mod tests { .into() ); assert_eq!(log[log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "heroku_logs".into() + ); } #[test] @@ -634,7 +637,10 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "heroku_logs".into() + ); } #[test] @@ -653,7 +659,10 @@ mod tests { .into() ); assert_eq!(log[log_schema().host_key()], "host".into()); - assert_eq!(log[log_schema().source_type_key()], "heroku_logs".into()); + assert_eq!( + log[log_schema().source_type_key().unwrap().to_string()], + "heroku_logs".into() + ); } #[test] diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 55036ccde446b..fc2a39e0e5743 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -328,16 +328,20 @@ impl http_client::HttpClientContext for HttpClientContext { ); } Event::Metric(ref mut metric) => { - metric.replace_tag( - log_schema().source_type_key().to_string(), - HttpClientConfig::NAME.to_string(), - ); + if let Some(source_type_key) = log_schema().source_type_key() { + metric.replace_tag( + source_type_key.to_string(), + HttpClientConfig::NAME.to_string(), + ); + } } Event::Trace(ref mut trace) => { - trace.insert( - log_schema().source_type_key(), - Bytes::from(HttpClientConfig::NAME), - ); + if let Some(source_type_key) = log_schema().source_type_key() { + trace.insert( + source_type_key.to_string(), + Bytes::from(HttpClientConfig::NAME), + ); + } } } } diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs index f7b04403e45d3..4bd5552ccaf67 100644 --- a/src/sources/http_client/integration_tests.rs +++ b/src/sources/http_client/integration_tests.rs @@ -84,7 +84,7 @@ async fn collected_logs_bytes() { // panics if not log event let log = events[0].as_log(); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], HttpClientConfig::NAME.into() ); } @@ -108,7 +108,7 @@ async fn collected_logs_json() { // panics if not log event let log = events[0].as_log(); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], HttpClientConfig::NAME.into() ); } @@ -136,7 +136,7 @@ async fn collected_metrics_native_json() { metric .tags() .unwrap() - .get(log_schema().source_type_key()) + .get(log_schema().source_type_key().unwrap().to_string().as_str()) .map(AsRef::as_ref), Some(HttpClientConfig::NAME) ); @@ -161,7 +161,7 @@ async fn collected_trace_native_json() { let trace = events[0].as_trace(); assert_eq!( - trace.as_map()[log_schema().source_type_key()], + trace.as_map()[log_schema().source_type_key().unwrap().to_string().as_str()], HttpClientConfig::NAME.into() ); } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 2bd2afecee34f..cadd154fc8795 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -662,7 +662,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); assert_eq!(log["http_path"], "/".into()); @@ -958,10 +958,16 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!( - log[log_schema().source_type_key()], - SimpleHttpConfig::NAME.into() - ); + + let source_type_key_value = log + .get(( + lookup::PathPrefix::Event, + log_schema().source_type_key().unwrap(), + )) + .unwrap() + .as_str() + .unwrap(); + assert_eq!(source_type_key_value, SimpleHttpConfig::NAME); assert_eq!(log["http_path"], "/".into()); } @@ -1131,7 +1137,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); } @@ -1184,7 +1190,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); } @@ -1200,7 +1206,7 @@ mod tests { )) .is_some()); assert_eq!( - log[log_schema().source_type_key()], + log[log_schema().source_type_key().unwrap().to_string()], SimpleHttpConfig::NAME.into() ); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 3ec938bda0b47..c28faf4318e24 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -756,7 +756,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) { // Add source type. log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), JournaldConfig::NAME, ); @@ -1165,7 +1165,7 @@ mod tests { Value::Bytes("System Initialization".into()) ); assert_eq!( - received[0].as_log()[log_schema().source_type_key()], + received[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "journald".into() ); assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000)); diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 8ba6511c53557..fb64dd8a033f6 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -23,6 +23,7 @@ use rdkafka::{ use serde_with::serde_as; use snafu::{ResultExt, Snafu}; use tokio_util::codec::FramedRead; +use vrl::path::PathPrefix; use vector_common::finalizer::OrderedFinalizer; use vector_config::configurable_component; @@ -602,7 +603,12 @@ impl ReceivedMessage { ); } LogNamespace::Legacy => { - log.insert(log_schema().source_type_key(), KafkaSourceConfig::NAME); + if let Some(source_type_key) = log_schema().source_type_key() { + log.insert( + (PathPrefix::Event, source_type_key), + KafkaSourceConfig::NAME, + ); + } } } @@ -1054,7 +1060,7 @@ mod integration_test { format!("{} {}", KEY, i).into() ); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "kafka".into() ); assert_eq!( diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index bd004a778881b..5b22564625cb6 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -910,7 +910,7 @@ fn create_event( log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from(Config::NAME), ); diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 8d58f09a10c42..2cbabbfacbc4c 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -209,7 +209,7 @@ impl TcpSource for LogstashSource { self.log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from_static(LogstashConfig::NAME.as_bytes()), ); diff --git a/src/sources/redis/mod.rs b/src/sources/redis/mod.rs index 03f4da5fe5dd8..1239a04c60915 100644 --- a/src/sources/redis/mod.rs +++ b/src/sources/redis/mod.rs @@ -256,7 +256,7 @@ impl InputHandler { if let Event::Log(ref mut log) = event { self.log_namespace.insert_vector_metadata( log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), path!("source_type"), Bytes::from(RedisSourceConfig::NAME), ); @@ -482,7 +482,7 @@ mod integration_test { for event in events { assert_eq!(event.as_log()[log_schema().message_key()], text.into()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], RedisSourceConfig::NAME.into() ); } diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index d381b65cecfdd..910bf6617a697 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -497,7 +497,7 @@ mod test { let event = rx.next().await.unwrap(); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1121,7 +1121,7 @@ mod test { let events = collect_n(rx, 1).await; assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1327,7 +1327,7 @@ mod test { "test".into() ); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); assert_eq!(events[0].as_log()["host"], UNNAMED_SOCKET_HOST.into()); @@ -1419,7 +1419,7 @@ mod test { "foo\nbar".into() ); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1506,7 +1506,7 @@ mod test { "test".into() ); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) @@ -1546,12 +1546,12 @@ mod test { assert_eq!(events.len(), 2); assert_eq!(events[0].as_log()[log_schema().message_key()], "foo".into()); assert_eq!( - events[0].as_log()[log_schema().source_type_key()], + events[0].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); assert_eq!(events[1].as_log()[log_schema().message_key()], "bar".into()); assert_eq!( - events[1].as_log()[log_schema().source_type_key()], + events[1].as_log()[log_schema().source_type_key().unwrap().to_string()], "socket".into() ); }) diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index e33f3c7fea22e..fa0a24564754e 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -661,7 +661,7 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { // Add source type self.log_namespace.insert_vector_metadata( &mut log, - Some(log_schema().source_type_key()), + log_schema().source_type_key(), lookup::path!("source_type"), SplunkConfig::NAME, ); @@ -1420,7 +1420,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1447,7 +1447,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1478,7 +1478,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1506,7 +1506,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1537,7 +1537,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1567,7 +1567,10 @@ mod tests { log_schema().timestamp_key().unwrap() )) .is_some()); - assert_eq!(event[log_schema().source_type_key()], "splunk_hec".into()); + assert_eq!( + event[log_schema().source_type_key().unwrap().to_string()], + "splunk_hec".into() + ); assert!(event.metadata().splunk_hec_token().is_none()); } @@ -1630,7 +1633,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1657,7 +1660,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert!(event.metadata().splunk_hec_token().is_none()); @@ -1896,7 +1899,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); assert_eq!( @@ -1973,7 +1976,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }) @@ -2003,7 +2006,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }) @@ -2031,7 +2034,7 @@ mod tests { )) .is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }) @@ -2060,7 +2063,7 @@ mod tests { assert_eq!(event.as_log()["bool"], true.into()); assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some()); assert_eq!( - event.as_log()[log_schema().source_type_key()], + event.as_log()[log_schema().source_type_key().unwrap().to_string()], "splunk_hec".into() ); }).await; diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 728cda24a6c46..69b12a7ee3368 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -811,7 +811,10 @@ mod test { .single() .expect("invalid timestamp"), ); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); @@ -863,7 +866,10 @@ mod test { ); expected.insert(log_schema().host_key(), "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("severity", "notice"); expected.insert("facility", "user"); expected.insert("version", 1); @@ -1003,7 +1009,10 @@ mod test { expected_date, ); expected.insert(log_schema().host_key(), "74794bfb6795"); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "notice"); expected.insert("facility", "user"); @@ -1048,7 +1057,10 @@ mod test { ), expected_date, ); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "info"); @@ -1085,7 +1097,10 @@ mod test { .and_then(|t| t.with_nanosecond(605_850 * 1000)) .expect("invalid timestamp"), ); - expected.insert(log_schema().source_type_key(), "syslog"); + expected.insert( + (PathPrefix::Event, log_schema().source_type_key().unwrap()), + "syslog", + ); expected.insert("host", "74794bfb6795"); expected.insert("hostname", "74794bfb6795"); expected.insert("severity", "info"); diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 8bae468a85241..87708f3866d32 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -350,7 +350,7 @@ pub trait FrameHandler { fn socket_send_buffer_size(&self) -> Option; fn host_key(&self) -> &Option; fn timestamp_key(&self) -> Option<&OwnedValuePath>; - fn source_type_key(&self) -> &str; + fn source_type_key(&self) -> Option<&OwnedValuePath>; } /** @@ -630,7 +630,7 @@ mod test { extra_task_handling_routine: F, host_key: Option, timestamp_key: Option, - source_type_key: String, + source_type_key: Option, log_namespace: LogNamespace, } @@ -648,7 +648,7 @@ mod test { extra_task_handling_routine: extra_routine, host_key: Some(owned_value_path!("test_framestream")), timestamp_key: Some(owned_value_path!("my_timestamp")), - source_type_key: "source_type".to_string(), + source_type_key: Some(owned_value_path!("source_type")), log_namespace: LogNamespace::Legacy, } } @@ -665,7 +665,10 @@ mod test { fn handle_event(&self, received_from: Option, frame: Bytes) -> Option { let mut log_event = LogEvent::from(frame); - log_event.insert(log_schema().source_type_key(), "framestream"); + log_event.insert( + log_schema().source_type_key().unwrap().to_string().as_str(), + "framestream", + ); if let Some(host) = received_from { self.log_namespace.insert_source_metadata( "framestream", @@ -711,8 +714,8 @@ mod test { self.timestamp_key.as_ref() } - fn source_type_key(&self) -> &str { - self.source_type_key.as_str() + fn source_type_key(&self) -> Option<&OwnedValuePath> { + self.source_type_key.as_ref() } } diff --git a/src/sources/util/message_decoding.rs b/src/sources/util/message_decoding.rs index 3d9b959db678f..379f8c1b09583 100644 --- a/src/sources/util/message_decoding.rs +++ b/src/sources/util/message_decoding.rs @@ -33,7 +33,7 @@ pub fn decode_message<'a>( if let Event::Log(ref mut log) = event { log_namespace.insert_vector_metadata( log, - Some(schema.source_type_key()), + schema.source_type_key(), path!("source_type"), Bytes::from(source_type), ); diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index 44b0921d75bcd..4c4a746f5d886 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -275,6 +275,7 @@ mod test { mod tests { use vector_common::assert_event_data_eq; use vector_core::config::log_schema; + use vrl::path::PathPrefix; use super::*; use crate::{ @@ -305,10 +306,11 @@ mod tests { let (mut events, stream) = test_util::random_events_with_stream(100, 100, None); sink.run(stream).await.unwrap(); + let source_type_key = log_schema().source_type_key().unwrap(); for event in &mut events { event .as_mut_log() - .insert(log_schema().source_type_key(), "vector"); + .insert((PathPrefix::Event, source_type_key), "vector"); } let output = test_util::collect_ready(rx).await;