Skip to content

Commit

Permalink
enhancement(datadog service): Add datadog global config options (#18929)
Browse files Browse the repository at this point in the history
* Add datadog section

Signed-off-by: Stephen Wakely <[email protected]>

* Send global datadog options to sinks

Signed-off-by: Stephen Wakely <[email protected]>

* Fix tests

Signed-off-by: Stephen Wakely <[email protected]>

* Added a test

Signed-off-by: Stephen Wakely <[email protected]>

* Update docs

Signed-off-by: Stephen Wakely <[email protected]>

* Remove source feature

Signed-off-by: Stephen Wakely <[email protected]>

* Test local setting can override the global one

Signed-off-by: Stephen Wakely <[email protected]>

* Spelling

Signed-off-by: Stephen Wakely <[email protected]>

* Update docs

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Doug

Signed-off-by: Stephen Wakely <[email protected]>

* Allow datadog options to be specified by passing in the RootOpts

Signed-off-by: Stephen Wakely <[email protected]>

* Pass datadog options through a new extra_context parameter

Signed-off-by: Stephen Wakely <[email protected]>

* Fix extra context

Signed-off-by: Stephen Wakely <[email protected]>

* Tidy up a little

Signed-off-by: Stephen Wakely <[email protected]>

* Make Datadog options pub

Signed-off-by: Stephen Wakely <[email protected]>

* Updated licenses

Signed-off-by: Stephen Wakely <[email protected]>

* Add anymap to spellings

Signed-off-by: Stephen Wakely <[email protected]>

* Add to upgrade guide

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Doug

Signed-off-by: Stephen Wakely <[email protected]>

* Spelling

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Bryce

Signed-off-by: Stephen Wakely <[email protected]>

* Component docs

Signed-off-by: Stephen Wakely <[email protected]>

* Feedback from Bruce

Signed-off-by: Stephen Wakely <[email protected]>

* Use single_value in tests

Signed-off-by: Stephen Wakely <[email protected]>

* Rename dd_common to local_dd_common

Signed-off-by: Stephen Wakely <[email protected]>

* Pass default for extra context when running as a Windows service

Signed-off-by: Stephen Wakely <[email protected]>

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Dec 4, 2023
1 parent 5b89574 commit 9b814f1
Show file tree
Hide file tree
Showing 43 changed files with 920 additions and 243 deletions.
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ andy
ansicpg
anumber
anycondition
anymap
anypb
apievent
apipodspec
Expand Down
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ sha2 = { version = "0.10.8", default-features = false, optional = true }
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true }

# External libs
anymap = { version = "0.12", default-features = false }
arc-swap = { version = "1.6", default-features = false, optional = true }
async-compression = { version = "0.4.5", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
apache-avro = { version = "0.16.0", default-features = false, optional = true }
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anstyle-query,https://github.com/rust-cli/anstyle,MIT OR Apache-2.0,The anstyle-
anstyle-wincon,https://github.com/rust-cli/anstyle,MIT OR Apache-2.0,The anstyle-wincon Authors
anyhow,https://github.com/dtolnay/anyhow,MIT OR Apache-2.0,David Tolnay <[email protected]>
anymap,https://github.com/chris-morgan/anymap,BlueOak-1.0.0 OR MIT OR Apache-2.0,Chris Morgan <[email protected]>
anymap,https://github.com/chris-morgan/anymap,MIT OR Apache-2.0,Chris Morgan <[email protected]>
apache-avro,https://github.com/apache/avro,Apache-2.0,Apache Avro team <[email protected]>
arbitrary,https://github.com/rust-fuzz/arbitrary,MIT OR Apache-2.0,"The Rust-Fuzz Project Developers, Nick Fitzgerald <[email protected]>, Manish Goregaokar <[email protected]>, Simonas Kazlauskas <[email protected]>, Brian L. Troutwine <[email protected]>, Corey Farwell <[email protected]>"
arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Vaner <[email protected]>
Expand Down
46 changes: 33 additions & 13 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::config::enterprise::{
attach_enterprise_components, report_configuration, EnterpriseError, EnterpriseMetadata,
EnterpriseReporter,
};
use crate::extra_context::ExtraContext;
#[cfg(not(feature = "enterprise-tests"))]
use crate::metrics;
#[cfg(feature = "api")]
Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct ApplicationConfig {
pub api: config::api::Options,
#[cfg(feature = "enterprise")]
pub enterprise: Option<EnterpriseReporter<BoxFuture<'static, ()>>>,
pub extra_context: ExtraContext,
}

pub struct Application {
Expand All @@ -61,6 +63,7 @@ impl ApplicationConfig {
pub async fn from_opts(
opts: &RootOpts,
signal_handler: &mut SignalHandler,
extra_context: ExtraContext,
) -> Result<Self, ExitCode> {
let config_paths = opts.config_paths_with_formats();

Expand All @@ -77,12 +80,13 @@ impl ApplicationConfig {
)
.await?;

Self::from_config(config_paths, config).await
Self::from_config(config_paths, config, extra_context).await
}

pub async fn from_config(
config_paths: Vec<ConfigPath>,
config: Config,
extra_context: ExtraContext,
) -> Result<Self, ExitCode> {
// This is ugly, but needed to allow `config` to be mutable for building the enterprise
// features, but also avoid a "does not need to be mutable" warning when the enterprise
Expand All @@ -95,9 +99,10 @@ impl ApplicationConfig {
#[cfg(feature = "api")]
let api = config.api;

let (topology, graceful_crash_receiver) = RunningTopology::start_init_validated(config)
.await
.ok_or(exitcode::CONFIG)?;
let (topology, graceful_crash_receiver) =
RunningTopology::start_init_validated(config, extra_context.clone())
.await
.ok_or(exitcode::CONFIG)?;

Ok(Self {
config_paths,
Expand All @@ -108,11 +113,18 @@ impl ApplicationConfig {
api,
#[cfg(feature = "enterprise")]
enterprise,
extra_context,
})
}

pub async fn add_internal_config(&mut self, config: Config) -> Result<(), ExitCode> {
let Some((topology, _)) = RunningTopology::start_init_validated(config).await else {
pub async fn add_internal_config(
&mut self,
config: Config,
extra_context: ExtraContext,
) -> Result<(), ExitCode> {
let Some((topology, _)) =
RunningTopology::start_init_validated(config, extra_context).await
else {
return Err(exitcode::CONFIG);
};
self.internal_topologies.push(topology);
Expand Down Expand Up @@ -155,28 +167,34 @@ impl ApplicationConfig {
}

impl Application {
pub fn run() -> ExitStatus {
let (runtime, app) = Self::prepare_start().unwrap_or_else(|code| std::process::exit(code));
pub fn run(extra_context: ExtraContext) -> ExitStatus {
let (runtime, app) =
Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));

runtime.block_on(app.run())
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare()
pub fn prepare_start(
extra_context: ExtraContext,
) -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare(extra_context)
.and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
}

pub fn prepare() -> Result<(Runtime, Self), ExitCode> {
pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
let opts = Opts::get_matches().map_err(|error| {
// Printing to stdout/err can itself fail; ignore it.
_ = error.print();
exitcode::USAGE
})?;

Self::prepare_from_opts(opts)
Self::prepare_from_opts(opts, extra_context)
}

pub fn prepare_from_opts(opts: Opts) -> Result<(Runtime, Self), ExitCode> {
pub fn prepare_from_opts(
opts: Opts,
extra_context: ExtraContext,
) -> Result<(Runtime, Self), ExitCode> {
init_global(!opts.root.openssl_no_probe);

let color = opts.root.color.use_color();
Expand Down Expand Up @@ -205,6 +223,7 @@ impl Application {
let config = runtime.block_on(ApplicationConfig::from_opts(
&opts.root,
&mut signals.handler,
extra_context.clone(),
))?;

Ok((
Expand Down Expand Up @@ -239,6 +258,7 @@ impl Application {
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
extra_context: config.extra_context,
});

Ok(StartedApplication {
Expand Down
32 changes: 29 additions & 3 deletions src/common/datadog.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Functionality shared between Datadog sources and sinks.
// Allow unused imports here, since use of these functions will differ depending on the
// Datadog component type, whether it's used in integration tests, etc.
#![allow(dead_code)]
#![allow(unreachable_pub)]
use serde::{Deserialize, Serialize};
use vector_lib::event::DatadogMetricOriginMetadata;
use vector_lib::{event::DatadogMetricOriginMetadata, sensitive_string::SensitiveString};

pub const DD_US_SITE: &str = "datadoghq.com";
pub const DD_EU_SITE: &str = "datadoghq.eu";
pub(crate) const DD_US_SITE: &str = "datadoghq.com";
pub(crate) const DD_EU_SITE: &str = "datadoghq.eu";

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub(crate) struct DatadogSeriesMetric {
Expand Down Expand Up @@ -50,3 +51,28 @@ pub(crate) fn get_api_base_endpoint(endpoint: Option<&String>, site: &str) -> St
.cloned()
.unwrap_or_else(|| format!("https://api.{}", site))
}

/// Default settings to use for Datadog components.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Options {
/// Default Datadog API key to use for Datadog components.
///
/// This can also be specified with the `DD_API_KEY` environment variable.
#[derivative(Default(value = "default_api_key()"))]
pub api_key: Option<SensitiveString>,

/// Default site to use for Datadog components.
///
/// This can also be specified with the `DD_SITE` environment variable.
#[derivative(Default(value = "default_site()"))]
pub site: String,
}

fn default_api_key() -> Option<SensitiveString> {
std::env::var("DD_API_KEY").ok().map(Into::into)
}

pub(crate) fn default_site() -> String {
std::env::var("DD_SITE").unwrap_or(DD_US_SITE.to_string())
}
3 changes: 2 additions & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Modules that are common between sources and sinks.
#[cfg(any(
feature = "sources-datadog_agent",
feature = "sinks-datadog_events",
Expand All @@ -6,7 +7,7 @@
feature = "sinks-datadog_traces",
feature = "enterprise"
))]
pub(crate) mod datadog;
pub mod datadog;

#[cfg(any(
feature = "sources-aws_sqs",
Expand Down
15 changes: 13 additions & 2 deletions src/components/validation/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
codecs::Encoder,
components::validation::{RunnerMetrics, TestCase},
config::ConfigBuilder,
extra_context::ExtraContext,
topology::RunningTopology,
};

Expand Down Expand Up @@ -154,17 +155,20 @@ pub struct Runner {
configuration: ValidationConfiguration,
test_case_data_path: PathBuf,
validators: HashMap<String, Box<dyn Validator>>,
extra_context: ExtraContext,
}

impl Runner {
pub fn from_configuration(
configuration: ValidationConfiguration,
test_case_data_path: PathBuf,
extra_context: ExtraContext,
) -> Self {
Self {
configuration,
test_case_data_path,
validators: HashMap::new(),
extra_context,
}
}

Expand Down Expand Up @@ -271,7 +275,11 @@ impl Runner {
// At this point, we need to actually spawn the configured component topology so that it
// runs, and make sure we have a way to tell it when to shutdown so that we can properly
// sequence the test in terms of sending inputs, waiting for outputs, etc.
spawn_component_topology(config_builder, &topology_task_coordinator);
spawn_component_topology(
config_builder,
&topology_task_coordinator,
self.extra_context.clone(),
);
let topology_task_coordinator = topology_task_coordinator.started().await;

// Now we'll spawn two tasks: one for sending inputs, and one for collecting outputs.
Expand Down Expand Up @@ -462,6 +470,7 @@ fn build_external_resource(
fn spawn_component_topology(
config_builder: ConfigBuilder,
topology_task_coordinator: &TaskCoordinator<Configuring>,
extra_context: ExtraContext,
) {
let topology_started = topology_task_coordinator.track_started();
let topology_completed = topology_task_coordinator.track_completed();
Expand All @@ -482,7 +491,9 @@ fn spawn_component_topology(
debug!("Building component topology...");

let (topology, mut crash_rx) =
RunningTopology::start_init_validated(config).await.unwrap();
RunningTopology::start_init_validated(config, extra_context)
.await
.unwrap();

debug!("Component topology built and spawned.");
topology_started.mark_as_done();
Expand Down
27 changes: 12 additions & 15 deletions src/config/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ use super::{
SourceOuter, TransformOuter,
};
use crate::{
common::datadog::get_api_base_endpoint,
common::datadog::{default_site, get_api_base_endpoint},
conditions::AnyCondition,
http::{HttpClient, HttpError},
sinks::{
datadog::{
default_site, logs::DatadogLogsConfig, metrics::DatadogMetricsConfig,
DatadogCommonConfig,
logs::DatadogLogsConfig, metrics::DatadogMetricsConfig, LocalDatadogCommonConfig,
},
util::{http::RequestConfig, retries::ExponentialBackoff},
},
Expand Down Expand Up @@ -461,12 +460,11 @@ fn setup_logs_reporting(

// Create a Datadog logs sink to consume and emit internal logs.
let datadog_logs = DatadogLogsConfig {
dd_common: DatadogCommonConfig {
endpoint: datadog.endpoint.clone(),
site: datadog.site.clone(),
default_api_key: api_key.into(),
..Default::default()
},
local_dd_common: LocalDatadogCommonConfig::new(
datadog.endpoint.clone(),
Some(datadog.site.clone()),
Some(api_key.into()),
),
request: RequestConfig {
headers: IndexMap::from([(
"DD-EVP-ORIGIN".to_string(),
Expand Down Expand Up @@ -568,12 +566,11 @@ fn setup_metrics_reporting(

// Create a Datadog metrics sink to consume and emit internal + host metrics.
let datadog_metrics = DatadogMetricsConfig {
dd_common: DatadogCommonConfig {
endpoint: datadog.endpoint.clone(),
site: datadog.site.clone(),
default_api_key: api_key.into(),
..Default::default()
},
local_dd_common: LocalDatadogCommonConfig::new(
datadog.endpoint.clone(),
Some(datadog.site.clone()),
Some(api_key.into()),
),
..Default::default()
};

Expand Down
3 changes: 2 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ mod tests {
let c2 = config::load_from_str(config, format).unwrap();
match (
config::warnings(&c2),
topology::TopologyPieces::build(&c, &diff, HashMap::new()).await,
topology::TopologyPieces::build(&c, &diff, HashMap::new(), Default::default())
.await,
) {
(warnings, Ok(_pieces)) => Ok(warnings),
(_, Err(errors)) => Err(errors),
Expand Down
Loading

0 comments on commit 9b814f1

Please sign in to comment.