Skip to content

Commit

Permalink
feat: Migrate LogSchema::host_key to new lookup code (vectordotdev#17972
Browse files Browse the repository at this point in the history
)

This part of vectordotdev#13033.

Summary of these changes:
* LogSchema::host_key is now an `OptionalValuePath`
* `host_key`s that appear in configs are now also `OptionalValuePath`s
* There should be no `unwrap()` calls outside of tests.

---------

Co-authored-by: Stephen Wakely <[email protected]>
  • Loading branch information
pront and StephenWakely authored Jul 18, 2023
1 parent fd10e69 commit 32950d8
Show file tree
Hide file tree
Showing 39 changed files with 306 additions and 201 deletions.
16 changes: 8 additions & 8 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct LogSchema {
/// This field will generally represent a real host, or container, that generated the message,
/// but is somewhat source-dependent.
#[serde(default = "LogSchema::default_host_key")]
host_key: String,
host_key: OptionalValuePath,

/// The name of the event field to set the source identifier in.
///
Expand Down Expand Up @@ -92,8 +92,8 @@ impl LogSchema {
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
fn default_host_key() -> OptionalValuePath {
OptionalValuePath::new("host")
}

fn default_source_type_key() -> OptionalValuePath {
Expand Down Expand Up @@ -121,8 +121,8 @@ impl LogSchema {
self.timestamp_key.path.as_ref()
}

pub fn host_key(&self) -> &str {
&self.host_key
pub fn host_key(&self) -> Option<&OwnedValuePath> {
self.host_key.path.as_ref()
}

pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
Expand All @@ -141,8 +141,8 @@ impl LogSchema {
self.timestamp_key = OptionalValuePath { path: v };
}

pub fn set_host_key(&mut self, v: String) {
self.host_key = v;
pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalValuePath { path };
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
Expand All @@ -169,7 +169,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
} else {
self.set_host_key(other.host_key().to_string());
self.set_host_key(other.host_key().cloned());
}
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
&& self.message_key() != other.message_key()
Expand Down
6 changes: 4 additions & 2 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ impl LogEvent {
pub fn host_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("host"),
LogNamespace::Legacy => Some(log_schema().host_key().to_owned()),
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
}
}

Expand Down Expand Up @@ -505,7 +505,9 @@ impl LogEvent {
pub fn get_host(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("host"),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().host_key())),
LogNamespace::Legacy => log_schema()
.host_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}

Expand Down
8 changes: 8 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,11 @@ impl From<OwnedValuePath> for OptionalValuePath {
Self { path: Some(path) }
}
}

impl From<Option<OwnedValuePath>> for OptionalValuePath {
fn from(value: Option<OwnedValuePath>) -> Self {
value.map_or(OptionalValuePath::none(), |path| {
OptionalValuePath::from(path)
})
}
}
10 changes: 8 additions & 2 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,10 @@ mod tests {
)
.unwrap();

assert_eq!("host", config.global.log_schema.host_key().to_string());
assert_eq!(
"host",
config.global.log_schema.host_key().unwrap().to_string()
);
assert_eq!(
"message",
config.global.log_schema.message_key().to_string()
Expand Down Expand Up @@ -879,7 +882,10 @@ mod tests {
)
.unwrap();

assert_eq!("this", config.global.log_schema.host_key().to_string());
assert_eq!(
"this",
config.global.log_schema.host_key().unwrap().to_string()
);
assert_eq!("that", config.global.log_schema.message_key().to_string());
assert_eq!(
"then",
Expand Down
10 changes: 8 additions & 2 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ fn sketch_to_proto_message(
let name = get_namespaced_name(metric, default_namespace);
let ts = encode_timestamp(metric.timestamp());
let mut tags = metric.tags().cloned().unwrap_or_default();
let host = tags.remove(log_schema.host_key()).unwrap_or_default();
let host = log_schema
.host_key()
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
.unwrap_or_default();
let tags = encode_tags(&tags);

let cnt = ddsketch.count() as i64;
Expand Down Expand Up @@ -497,7 +500,10 @@ fn generate_series_metrics(
let name = get_namespaced_name(metric, default_namespace);

let mut tags = metric.tags().cloned().unwrap_or_default();
let host = tags.remove(log_schema.host_key());
let host = log_schema
.host_key()
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());

let source_type_name = tags.remove("source_type_name");
let device = tags.remove("device");
let ts = encode_timestamp(metric.timestamp());
Expand Down
13 changes: 9 additions & 4 deletions src/sinks/datadog/traces/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use futures_util::{
};
use tokio::sync::oneshot::{channel, Sender};
use tower::Service;
use vrl::path::PathPrefix;

use vector_core::{
config::log_schema,
event::Event,
Expand All @@ -15,11 +17,13 @@ use vector_core::{
stream::{BatcherSettings, DriverResponse},
};

use super::service::TraceApiRequest;
use crate::{
internal_events::DatadogTracesEncodingError,
sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
};

use super::service::TraceApiRequest;

#[derive(Default)]
struct EventPartitioner;

Expand Down Expand Up @@ -51,9 +55,10 @@ impl Partitioner for EventPartitioner {
Event::Trace(t) => PartitionKey {
api_key: item.metadata().datadog_api_key(),
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
hostname: t
.get(log_schema().host_key())
.map(|s| s.to_string_lossy().into_owned()),
hostname: log_schema().host_key().and_then(|key| {
t.get((PathPrefix::Event, key))
.map(|s| s.to_string_lossy().into_owned())
}),
agent_version: t
.get("agent_version")
.map(|s| s.to_string_lossy().into_owned()),
Expand Down
23 changes: 13 additions & 10 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use lookup::lookup_v2::OptionalValuePath;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;

use super::host_key;
use super::config_host_key;
use crate::sinks::splunk_hec::common::config_timestamp_key;
use crate::{
codecs::EncodingConfig,
Expand Down Expand Up @@ -74,8 +74,8 @@ pub struct HumioLogsConfig {
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
#[serde(default = "host_key")]
pub(super) host_key: String,
#[serde(default = "config_host_key")]
pub(super) host_key: OptionalValuePath,

/// Event fields to be added to Humio’s extra fields.
///
Expand Down Expand Up @@ -154,7 +154,7 @@ impl GenerateConfig for HumioLogsConfig {
event_type: None,
indexed_fields: vec![],
index: None,
host_key: host_key(),
host_key: config_host_key(),
compression: Compression::default(),
request: TowerRequestConfig::default(),
batch: BatchConfig::default(),
Expand Down Expand Up @@ -231,6 +231,7 @@ mod integration_tests {
use serde::Deserialize;
use serde_json::{json, Value as JsonValue};
use tokio::time::Duration;
use vrl::path::PathPrefix;

use super::*;
use crate::{
Expand Down Expand Up @@ -262,14 +263,14 @@ mod integration_tests {
let message = random_string(100);
let host = "192.168.1.1".to_string();
let mut event = LogEvent::from(message.clone());
event.insert(log_schema().host_key(), host.clone());
event.insert(
(PathPrefix::Event, log_schema().host_key().unwrap()),
host.clone(),
);

let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
event.insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
ts,
);

Expand Down Expand Up @@ -387,7 +388,9 @@ mod integration_tests {
source: None,
encoding: JsonSerializerConfig::default().into(),
event_type: None,
host_key: log_schema().host_key().to_string(),
host_key: OptionalValuePath {
path: log_schema().host_key().cloned(),
},
indexed_fields: vec![],
index: None,
compression: Compression::None,
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/humio/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vector_config::configurable_component;
use vector_core::sink::StreamSink;

use super::{
host_key,
config_host_key,
logs::{HumioLogsConfig, HOST},
};
use crate::{
Expand Down Expand Up @@ -86,8 +86,8 @@ pub struct HumioMetricsConfig {
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
#[serde(default = "host_key")]
host_key: String,
#[serde(default = "config_host_key")]
host_key: OptionalValuePath,

/// Event fields to be added to Humio’s extra fields.
///
Expand Down
8 changes: 6 additions & 2 deletions src/sinks/humio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use lookup::lookup_v2::OptionalValuePath;

pub mod logs;
pub mod metrics;

fn host_key() -> String {
crate::config::log_schema().host_key().to_string()
pub fn config_host_key() -> OptionalValuePath {
OptionalValuePath {
path: crate::config::log_schema().host_key().cloned(),
}
}
24 changes: 14 additions & 10 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use bytes::{Bytes, BytesMut};
use futures::SinkExt;
use http::{Request, Uri};
use indoc::indoc;
use vrl::value::Kind;

use lookup::lookup_v2::{parse_value_path, OptionalValuePath};
use lookup::{OwnedValuePath, PathPrefix};
use vector_config::configurable_component;
use vector_core::config::log_schema;
use vector_core::schema;
use vrl::value::Kind;

use crate::{
codecs::Transformer,
Expand Down Expand Up @@ -189,10 +190,8 @@ impl SinkConfig for InfluxDbLogsConfig {
.host_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().host_key())
.expect("global log_schema.host_key to be valid path")
});
.or(log_schema().host_key().cloned())
.expect("global log_schema.host_key to be valid path");

let message_key = self
.message_key
Expand Down Expand Up @@ -409,10 +408,10 @@ mod tests {
use futures::{channel::mpsc, stream, StreamExt};
use http::{request::Parts, StatusCode};
use indoc::indoc;

use lookup::owned_value_path;
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};

use super::*;
use crate::{
sinks::{
influxdb::test_util::{assert_fields, split_line_protocol, ts},
Expand All @@ -427,6 +426,8 @@ mod tests {
},
};

use super::*;

type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;

#[test]
Expand Down Expand Up @@ -880,16 +881,17 @@ mod tests {
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
use std::sync::Arc;

use chrono::Utc;
use codecs::BytesDeserializerConfig;
use futures::stream;
use vrl::value;

use codecs::BytesDeserializerConfig;
use lookup::{owned_value_path, path};
use std::sync::Arc;
use vector_core::config::{LegacyKey, LogNamespace};
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
use vrl::value;

use super::*;
use crate::{
config::SinkContext,
sinks::influxdb::{
Expand All @@ -900,6 +902,8 @@ mod integration_tests {
test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
};

use super::*;

#[tokio::test]
async fn influxdb2_logs_put_data() {
let endpoint = address_v2();
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/splunk_hec/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ pub fn build_uri(
uri.parse::<Uri>()
}

pub fn host_key() -> String {
crate::config::log_schema().host_key().to_string()
pub fn config_host_key() -> OptionalValuePath {
OptionalValuePath {
path: crate::config::log_schema().host_key().cloned(),
}
}

pub fn config_timestamp_key() -> OptionalValuePath {
Expand Down
Loading

0 comments on commit 32950d8

Please sign in to comment.