diff --git a/.github/repository-settings.md b/.github/repository-settings.md new file mode 100644 index 0000000000..5fc7b8cdcf --- /dev/null +++ b/.github/repository-settings.md @@ -0,0 +1,11 @@ +# Log of local changes + +Maintainers are expected to maintain this log. This is required as per +[OpenTelemetry Community +guidelines](https://github.com/open-telemetry/community/blob/main/docs/how-to-configure-new-repository.md#collaborators-and-teams). + +## April 30th 2024 + +Modified branch protection for main branch to require the following CI checks: +docs +test (stable) diff --git a/README.md b/README.md index 43627be24a..53af613a34 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,18 @@ observability tools. ## Project Status -| Signal | Status | -| ------- | ---------- | -| Logs | Alpha* | -| Metrics | Alpha | -| Traces | Beta | +| Signal/Component | Overall Status | +| -------------------- | ------------------ | +| Logs-API | Alpha* | +| Logs-SDK | Alpha | +| Logs-OTLP Exporter | Alpha | +| Logs-Appender-Tracing | Alpha | +| Metrics-API | Alpha | +| Metrics-SDK | Alpha | +| Metrics-OTLP Exporter | Alpha | +| Traces-API | Beta | +| Traces-SDK | Beta | +| Traces-OTLP Exporter | Beta | *OpenTelemetry Rust is not introducing a new end user callable Logging API. Instead, it provides [Logs Bridge diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 7446b6e9f4..b37f3de024 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -62,7 +62,7 @@ impl LogProcessor for NoopProcessor { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index dfbd03926c..347a759004 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -17,6 +17,11 @@ [#1568]: https://github.com/open-telemetry/opentelemetry-rust/pull/1568 +### Changed + - **Breaking** Remove global provider for Logs [#1691](https://github.com/open-telemetry/opentelemetry-rust/pull/1691/) + - The method OtlpLogPipeline::install_simple() and OtlpLogPipeline::install_batch() now return `LoggerProvider` instead of + `Logger`. Refer to the [basic-otlp](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/examples/basic-otlp/src/main.rs) and [basic-otlp-http](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs) examples for how to initialize OTLP Log Exporter to use with OpenTelemetryLogBridge and OpenTelemetryTracingBridge respectively. + ## v0.15.0 ### Added diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index f06ba1d9de..91af4715fa 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -15,7 +15,7 @@ use std::error::Error; use tracing::info; use tracing_subscriber::prelude::*; -fn init_logs() -> Result { +fn init_logs() -> Result { let service_name = env!("CARGO_BIN_NAME"); opentelemetry_otlp::new_pipeline() .logging() @@ -76,13 +76,12 @@ static COMMON_ATTRIBUTES: Lazy<[KeyValue; 4]> = Lazy::new(|| { async fn main() -> Result<(), Box> { let _ = init_tracer()?; let _ = init_metrics()?; - let _ = init_logs(); + // Opentelemetry will not provide a global API to manage the logger provider. Application users must manage the lifecycle of the logger provider on their own. Dropping logger providers will disable log emitting. + let logger_provider = init_logs().unwrap(); let tracer = global::tracer("ex.com/basic"); let meter = global::meter("ex.com/basic"); - // configure the global logger to use our opentelemetry logger - let logger_provider = opentelemetry::global::logger_provider(); let layer = OpenTelemetryTracingBridge::new(&logger_provider); tracing_subscriber::registry().with(layer).init(); @@ -108,7 +107,7 @@ async fn main() -> Result<(), Box> { histogram.record(5.5, COMMON_ATTRIBUTES.as_ref()); global::shutdown_tracer_provider(); - global::shutdown_logger_provider(); + logger_provider.shutdown(); global::shutdown_meter_provider(); Ok(()) diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index 4f4320cab9..02e1cb3449 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -54,7 +54,7 @@ fn init_metrics() -> Result<(), MetricsError> { } } -fn init_logs() -> Result { +fn init_logs() -> Result { let service_name = env!("CARGO_BIN_NAME"); opentelemetry_otlp::new_pipeline() .logging() @@ -93,10 +93,7 @@ async fn main() -> Result<(), Box> { let _ = init_metrics()?; // Initialize logs, which sets the global loggerprovider. - let _ = init_logs(); - - // Retrieve the global LoggerProvider. - let logger_provider = global::logger_provider(); + let logger_provider = init_logs().unwrap(); // Create a new OpenTelemetryLogBridge using the above LoggerProvider. let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider); @@ -141,7 +138,7 @@ async fn main() -> Result<(), Box> { info!(target: "my-target", "hello from {}. My price is {}", "apple", 1.99); global::shutdown_tracer_provider(); - global::shutdown_logger_provider(); + logger_provider.shutdown(); global::shutdown_meter_provider(); Ok(()) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 1d8e177693..4bad1cc39e 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -19,7 +19,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = self.build_logs_export_body(batch)?; + let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) @@ -50,4 +50,8 @@ impl LogExporter for OtlpHttpClient { fn shutdown(&mut self) { let _ = self.client.lock().map(|mut c| c.take()); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 884971e028..fc46953ed1 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -5,6 +5,8 @@ use crate::{ }; use http::{HeaderName, HeaderValue, Uri}; use opentelemetry_http::HttpClient; +use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; + #[cfg(feature = "logs")] use opentelemetry_sdk::export::logs::LogData; #[cfg(feature = "trace")] @@ -274,6 +276,9 @@ struct OtlpHttpClient { headers: HashMap, protocol: Protocol, _timeout: Duration, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics and traces. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } impl OtlpHttpClient { @@ -291,6 +296,7 @@ impl OtlpHttpClient { headers, protocol, _timeout: timeout, + resource: ResourceAttributesWithSchema::default(), } } @@ -318,12 +324,15 @@ impl OtlpHttpClient { fn build_logs_export_body( &self, logs: Vec, + resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; + let resource_logs = logs + .into_iter() + .map(|log_event| (log_event, resource).into()) + .collect::>(); + let req = ExportLogsServiceRequest { resource_logs }; - let req = ExportLogsServiceRequest { - resource_logs: logs.into_iter().map(Into::into).collect(), - }; match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 4b5a5787e3..65759f7a4b 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,6 +1,5 @@ -use core::fmt; - use async_trait::async_trait; +use core::fmt; use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, @@ -12,6 +11,9 @@ use super::BoxInterceptor; pub(crate) struct TonicLogsClient { inner: Option, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics and traces. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -43,6 +45,7 @@ impl TonicLogsClient { client, interceptor, }), + resource: Default::default(), } } } @@ -62,13 +65,19 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; + let resource_logs = { + batch + .into_iter() + .map(|log_data| (log_data, &self.resource)) + .map(Into::into) + .collect() + }; + client .export(Request::from_parts( metadata, extensions, - ExportLogsServiceRequest { - resource_logs: batch.into_iter().map(Into::into).collect(), - }, + ExportLogsServiceRequest { resource_logs }, )) .await .map_err(crate::Error::from)?; @@ -79,4 +88,8 @@ impl LogExporter for TonicLogsClient { fn shutdown(&mut self) { let _ = self.inner.take(); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index d6e346bf61..714d4508a3 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -10,12 +10,10 @@ use crate::exporter::http::HttpExporterBuilder; use crate::{NoExporterConfig, OtlpPipeline}; use async_trait::async_trait; -use std::{borrow::Cow, fmt::Debug}; +use std::fmt::Debug; + +use opentelemetry::logs::LogError; -use opentelemetry::{ - global, - logs::{LogError, LoggerProvider}, -}; use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel}; /// Compression algorithm to use, defaults to none. @@ -103,6 +101,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.client.set_resource(resource); + } } /// Recommended configuration for an OTLP exporter pipeline. @@ -147,7 +149,7 @@ impl OtlpLogPipeline { /// Returns a [`Logger`] with the name `opentelemetry-otlp` and the current crate version. /// /// [`Logger`]: opentelemetry_sdk::logs::Logger - pub fn install_simple(self) -> Result { + pub fn install_simple(self) -> Result { Ok(build_simple_with_exporter( self.exporter_builder.build_log_exporter()?, self.log_config, @@ -163,7 +165,7 @@ impl OtlpLogPipeline { pub fn install_batch( self, runtime: R, - ) -> Result { + ) -> Result { Ok(build_batch_with_exporter( self.exporter_builder.build_log_exporter()?, self.log_config, @@ -176,19 +178,14 @@ impl OtlpLogPipeline { fn build_simple_with_exporter( exporter: LogExporter, log_config: Option, -) -> opentelemetry_sdk::logs::Logger { +) -> opentelemetry_sdk::logs::LoggerProvider { let mut provider_builder = opentelemetry_sdk::logs::LoggerProvider::builder().with_simple_exporter(exporter); if let Some(config) = log_config { provider_builder = provider_builder.with_config(config); } - let provider = provider_builder.build(); - let logger = provider - .logger_builder(Cow::Borrowed("opentelemetry-otlp")) - .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION"))) - .build(); - let _ = global::set_logger_provider(provider); - logger + // logger would be created in the tracing appender + provider_builder.build() } fn build_batch_with_exporter( @@ -196,7 +193,7 @@ fn build_batch_with_exporter( log_config: Option, runtime: R, batch_config: Option, -) -> opentelemetry_sdk::logs::Logger { +) -> opentelemetry_sdk::logs::LoggerProvider { let mut provider_builder = opentelemetry_sdk::logs::LoggerProvider::builder(); let batch_processor = opentelemetry_sdk::logs::BatchLogProcessor::builder(exporter, runtime) .with_batch_config(batch_config.unwrap_or_default()) @@ -206,11 +203,6 @@ fn build_batch_with_exporter( if let Some(config) = log_config { provider_builder = provider_builder.with_config(config); } - let provider = provider_builder.build(); - let logger = provider - .logger_builder(Cow::Borrowed("opentelemetry-otlp")) - .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION"))) - .build(); - let _ = global::set_logger_provider(provider); - logger + // logger would be created in the tracing appender + provider_builder.build() } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index 9aa29e4b22..8197d90574 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -22,6 +22,23 @@ pub mod tonic { use opentelemetry::{Array, Value}; use std::borrow::Cow; + #[cfg(any(feature = "trace", feature = "logs"))] + #[derive(Debug, Default)] + pub struct ResourceAttributesWithSchema { + pub attributes: Attributes, + pub schema_url: Option, + } + + #[cfg(any(feature = "trace", feature = "logs"))] + impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema { + fn from(resource: &opentelemetry_sdk::Resource) -> Self { + ResourceAttributesWithSchema { + attributes: resource_attributes(resource), + schema_url: resource.schema_url().map(ToString::to_string), + } + } + } + #[cfg(any(feature = "trace", feature = "logs"))] use opentelemetry_sdk::Resource; @@ -52,7 +69,7 @@ pub mod tonic { } /// Wrapper type for Vec<`KeyValue`> - #[derive(Default)] + #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); impl From> for Attributes { diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index bc5c2697c0..335526685b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -7,7 +7,7 @@ pub mod tonic { resource::v1::Resource, Attributes, }, - transform::common::{to_nanos, tonic::resource_attributes}, + transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; @@ -110,18 +110,26 @@ pub mod tonic { } } - impl From for ResourceLogs { - fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self { + impl + From<( + opentelemetry_sdk::export::logs::LogData, + &ResourceAttributesWithSchema, + )> for ResourceLogs + { + fn from( + data: ( + opentelemetry_sdk::export::logs::LogData, + &ResourceAttributesWithSchema, + ), + ) -> Self { + let (log_data, resource) = data; + ResourceLogs { resource: Some(Resource { - attributes: resource_attributes(&log_data.resource).0, + attributes: resource.attributes.0.clone(), dropped_attributes_count: 0, }), - schema_url: log_data - .resource - .schema_url() - .map(Into::into) - .unwrap_or_default(), + schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { schema_url: log_data .instrumentation diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index f72ea62851..19dac050a4 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -12,7 +12,20 @@ - **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and `ProcessResourceDetector` resource detectors, use the [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. +- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Improves performance by sending + Resource information to processors (and exporters) once, instead of sending with every log. If you are an author + of Processor, Exporter, the following are *BREAKING* changes. + - Implement `set_resource` method in your custom LogProcessor, which invokes exporter's `set_resource`. + - Implement `set_resource` method in your custom LogExporter. This method should save the resource object + in original or serialized format, to be merged with every log event during export. + - `LogData` doesn't have the resource attributes. The `LogExporter::export()` method needs to merge it + with the earlier preserved resource before export. - Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) +- Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643). + - `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed. + - `shutdown` methods in `LoggerProvider` and `LogProcessor` now takes a immutable reference + - After `shutdown`, `LoggerProvider` will return noop `Logger` + - After `shutdown`, `LogProcessor` will not process any new logs ## v0.22.1 diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index cabcc84476..cf811acc01 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -29,6 +29,7 @@ url = { workspace = true, optional = true } tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } +lazy_static = "1.4.0" [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 39db15900f..8676db9d16 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -7,7 +7,7 @@ use opentelemetry::{ logs::{LogError, LogRecord, LogResult}, InstrumentationLibrary, }; -use std::{borrow::Cow, fmt::Debug}; +use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] @@ -21,17 +21,16 @@ pub trait LogExporter: Send + Sync + Debug { fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true } + /// Set the resource for the exporter. + fn set_resource(&mut self, _resource: &Resource) {} } -/// `LogData` associates a [`LogRecord`] with a [`Resource`] and -/// [`InstrumentationLibrary`]. +/// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] pub struct LogData { /// Log record pub record: LogRecord, - /// Resource for the emitter who produced this `LogData`. - pub resource: Cow<'static, Resource>, - /// Instrumentation details for the emitter who produced this `LogData`. + /// Instrumentation details for the emitter who produced this `LogEvent`. pub instrumentation: InstrumentationLibrary, } diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 10e9e444fd..e87214228b 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -13,12 +13,25 @@ use opentelemetry::{ #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; +use std::sync::atomic::AtomicBool; use std::{borrow::Cow, sync::Arc}; +use once_cell::sync::Lazy; + +// a no nop logger provider used as placeholder when the provider is shutdown +static NOOP_LOGGER_PROVIDER: Lazy = Lazy::new(|| LoggerProvider { + inner: Arc::new(LoggerProviderInner { + processors: Vec::new(), + config: Config::default(), + }), + is_shutdown: Arc::new(AtomicBool::new(true)), +}); + #[derive(Debug, Clone)] /// Creator for `Logger` instances. pub struct LoggerProvider { inner: Arc, + is_shutdown: Arc, } /// Default logger name if empty string is provided. @@ -59,6 +72,10 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { } fn library_logger(&self, library: Arc) -> Self::Logger { + // If the provider is shutdown, new logger will refer a no-op logger provider. + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return Logger::new(library, NOOP_LOGGER_PROVIDER.clone()); + } Logger::new(library, self.clone()) } } @@ -87,22 +104,18 @@ impl LoggerProvider { .collect() } - /// Shuts down this `LoggerProvider`, panicking on failure. - pub fn shutdown(&mut self) -> Vec> { - self.try_shutdown() - .expect("cannot shutdown LoggerProvider when child Loggers are still active") - } - - /// Attempts to shutdown this `LoggerProvider`, succeeding only when - /// all cloned `LoggerProvider` values have been dropped. - pub fn try_shutdown(&mut self) -> Option>> { - Arc::get_mut(&mut self.inner).map(|inner| { - inner - .processors - .iter_mut() - .map(|processor| processor.shutdown()) - .collect() - }) + /// Shuts down this `LoggerProvider` + pub fn shutdown(&self) -> Vec> { + // mark itself as already shutdown + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); + // propagate the shutdown signal to processors + // it's up to the processor to properly block new logs after shutdown + self.inner + .processors + .iter() + .map(|processor| processor.shutdown()) + .collect() } } @@ -163,11 +176,16 @@ impl Builder { /// Create a new provider from this configuration. pub fn build(self) -> LoggerProvider { + // invoke set_resource on all the processors + for processor in &self.processors { + processor.set_resource(&self.config.resource); + } LoggerProvider { inner: Arc::new(LoggerProviderInner { processors: self.processors, config: self.config, }), + is_shutdown: Arc::new(AtomicBool::new(false)), } } } @@ -207,20 +225,19 @@ impl opentelemetry::logs::Logger for Logger { /// Emit a `LogRecord`. fn emit(&self, record: LogRecord) { let provider = self.provider(); - let config = provider.config(); let processors = provider.log_processors(); let trace_context = Context::map_current(|cx| { cx.has_active_span() .then(|| TraceContext::from(cx.span().span_context())) }); + for p in processors { - let mut record = record.clone(); + let mut cloned_record = record.clone(); if let Some(ref trace_context) = trace_context { - record.trace_context = Some(trace_context.clone()) + cloned_record.trace_context = Some(trace_context.clone()); } let data = LogData { - record, - resource: config.resource.clone(), + record: cloned_record, instrumentation: self.instrumentation_library().clone(), }; p.emit(data); @@ -252,12 +269,64 @@ mod tests { use crate::Resource; use super::*; - use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; use opentelemetry::logs::Logger; + use opentelemetry::logs::LoggerProvider as _; use opentelemetry::{Key, KeyValue, Value}; - use std::sync::Mutex; + use std::fmt::{Debug, Formatter}; + use std::sync::atomic::AtomicU64; + use std::sync::{Arc, Mutex}; use std::thread; + struct ShutdownTestLogProcessor { + is_shutdown: Arc>, + counter: Arc, + } + + impl Debug for ShutdownTestLogProcessor { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } + } + + impl ShutdownTestLogProcessor { + pub(crate) fn new(counter: Arc) -> Self { + ShutdownTestLogProcessor { + is_shutdown: Arc::new(Mutex::new(false)), + counter, + } + } + } + + impl LogProcessor for ShutdownTestLogProcessor { + fn emit(&self, _data: LogData) { + self.is_shutdown + .lock() + .map(|is_shutdown| { + if !*is_shutdown { + self.counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + }) + .expect("lock poisoned"); + } + + fn force_flush(&self) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&self) -> LogResult<()> { + self.is_shutdown + .lock() + .map(|mut is_shutdown| *is_shutdown = true) + .expect("lock poisoned"); + Ok(()) + } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + } #[test] fn test_logger_provider_default_resource() { let assert_resource = |provider: &super::LoggerProvider, @@ -386,79 +455,56 @@ mod tests { #[test] fn shutdown_test() { + let counter = Arc::new(AtomicU64::new(0)); + let logger_provider = LoggerProvider::builder() + .with_log_processor(ShutdownTestLogProcessor::new(counter.clone())) + .build(); + + let logger1 = logger_provider.logger("test-logger1"); + let logger2 = logger_provider.logger("test-logger2"); + logger1.emit(LogRecord::default()); + logger2.emit(LogRecord::default()); + + let logger3 = logger_provider.logger("test-logger3"); + let handle = thread::spawn(move || { + logger3.emit(LogRecord::default()); + }); + handle.join().expect("thread panicked"); + + let _ = logger_provider.shutdown(); + logger1.emit(LogRecord::default()); + + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3); + } + + #[test] + fn global_shutdown_test() { // cargo test shutdown_test --features=logs // Arrange let shutdown_called = Arc::new(Mutex::new(false)); let flush_called = Arc::new(Mutex::new(false)); - let signal_to_end = Arc::new(Mutex::new(false)); - let signal_to_thread_started = Arc::new(Mutex::new(false)); let logger_provider = LoggerProvider::builder() .with_log_processor(LazyLogProcessor::new( shutdown_called.clone(), flush_called.clone(), )) .build(); - set_logger_provider(logger_provider); + //set_logger_provider(logger_provider); + let logger1 = logger_provider.logger("test-logger1"); + let logger2 = logger_provider.logger("test-logger2"); - // Act - let logger1 = logger("test-logger1"); - let logger2 = logger("test-logger2"); + // Acts logger1.emit(LogRecord::default()); logger2.emit(LogRecord::default()); - let signal_to_end_clone = signal_to_end.clone(); - let signal_to_thread_started_clone = signal_to_thread_started.clone(); - - let handle = thread::spawn(move || { - let logger3 = logger("test-logger3"); - loop { - // signal the main thread that this thread has started. - *signal_to_thread_started_clone.lock().unwrap() = true; - logger3.emit(LogRecord::default()); - if *signal_to_end_clone.lock().unwrap() { - break; - } - } - }); - - // wait for the spawned thread to start before calling shutdown This is - // very important - if shutdown is called before the spawned thread - // obtains its logger, then the logger will be no-op one, and the test - // will pass, but it will not be testing the intended scenario. - while !*signal_to_thread_started.lock().unwrap() { - thread::sleep(std::time::Duration::from_millis(10)); - } - - // Intentionally *not* calling shutdown/flush on the provider, but - // instead relying on shutdown_logger_provider which causes the global - // provider to be dropped, leading to the sdk logger provider's drop to - // be called, which is expected to call shutdown on processors. - shutdown_logger_provider(); + // explicitly calling shutdown on logger_provider. This will + // indeed do the shutdown, even if there are loggers still alive. + logger_provider.shutdown(); // Assert - // shutdown_logger_provider is necessary but not sufficient, as loggers - // hold on to the the provider (via inner provider clones). - assert!(!*shutdown_called.lock().unwrap()); - - // flush is never called by the sdk. - assert!(!*flush_called.lock().unwrap()); - - // Drop one of the logger. Not enough! - drop(logger1); - assert!(!*shutdown_called.lock().unwrap()); - - // drop logger2, which is the only remaining logger in this thread. - // Still not enough! - drop(logger2); - assert!(!*shutdown_called.lock().unwrap()); - - // now signal the spawned thread to end, which causes it to drop its - // logger. Since that is the last logger, the provider (inner provider) - // is finally dropped, triggering shutdown - *signal_to_end.lock().unwrap() = true; - handle.join().unwrap(); + // shutdown is called. assert!(*shutdown_called.lock().unwrap()); // flush is never called by the sdk. @@ -493,7 +539,7 @@ mod tests { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { *self.shutdown_called.lock().unwrap() = true; Ok(()) } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 57ace8dad6..d3d69c87b8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,6 +1,7 @@ use crate::{ export::logs::{ExportResult, LogData, LogExporter}, runtime::{RuntimeChannel, TrySend}, + Resource, }; use futures_channel::oneshot; use futures_util::{ @@ -13,10 +14,12 @@ use opentelemetry::{ global, logs::{LogError, LogResult}, }; +use std::sync::atomic::AtomicBool; use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, str::FromStr, + sync::Arc, time::Duration, }; @@ -46,10 +49,15 @@ pub trait LogProcessor: Send + Sync + Debug { /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. - fn shutdown(&mut self) -> LogResult<()>; + /// After shutdown returns the log processor should stop processing any logs. + /// It's up to the implementation on when to drop the LogProcessor. + fn shutdown(&self) -> LogResult<()>; #[cfg(feature = "logs_level_enabled")] /// Check if logging is enabled fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; + + /// Set the resource for the log processor. + fn set_resource(&self, _resource: &Resource) {} } /// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon @@ -59,18 +67,25 @@ pub trait LogProcessor: Send + Sync + Debug { #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex>, + is_shutdown: AtomicBool, } impl SimpleLogProcessor { pub(crate) fn new(exporter: Box) -> Self { SimpleLogProcessor { exporter: Mutex::new(exporter), + is_shutdown: AtomicBool::new(false), } } } impl LogProcessor for SimpleLogProcessor { fn emit(&self, data: LogData) { + // noop after shutdown + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + return; + } + let result = self .exporter .lock() @@ -85,7 +100,9 @@ impl LogProcessor for SimpleLogProcessor { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); Ok(()) @@ -96,6 +113,12 @@ impl LogProcessor for SimpleLogProcessor { } } + fn set_resource(&self, resource: &Resource) { + if let Ok(mut exporter) = self.exporter.lock() { + exporter.set_resource(resource); + } + } + #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true @@ -141,7 +164,7 @@ impl LogProcessor for BatchLogProcessor { .and_then(std::convert::identity) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&self) -> LogResult<()> { let (res_sender, res_receiver) = oneshot::channel(); self.message_sender .try_send(BatchMessage::Shutdown(res_sender)) @@ -151,6 +174,13 @@ impl LogProcessor for BatchLogProcessor { .map_err(|err| LogError::Other(err.into())) .and_then(std::convert::identity) } + + fn set_resource(&self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } } impl BatchLogProcessor { @@ -229,6 +259,11 @@ impl BatchLogProcessor { break; } + + // propagate the resource + BatchMessage::SetResource(resource) => { + exporter.set_resource(&resource); + } } } })); @@ -450,6 +485,8 @@ enum BatchMessage { Flush(Option>), /// Shut down the worker thread, push all logs in buffer to the backend. Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), } #[cfg(all(test, feature = "testing", feature = "logs"))] @@ -458,19 +495,52 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; + use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ + export::logs::{LogData, LogExporter}, logs::{ log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, }, - BatchConfig, BatchConfigBuilder, + BatchConfig, BatchConfigBuilder, Config, LogProcessor, LoggerProvider, + SimpleLogProcessor, }, runtime, testing::logs::InMemoryLogsExporter, + Resource, }; + use async_trait::async_trait; + use opentelemetry::{logs::LogResult, KeyValue}; + use std::sync::Arc; use std::time::Duration; + #[derive(Debug, Clone)] + struct MockLogExporter { + resource: Arc>, + } + + #[async_trait] + impl LogExporter for MockLogExporter { + async fn export(&mut self, _batch: Vec) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&mut self) {} + + fn set_resource(&mut self, resource: &Resource) { + let res = Arc::make_mut(&mut self.resource); + *res = Some(resource.clone()); + } + } + + // Implementation specific to the MockLogExporter, not part of the LogExporter trait + impl MockLogExporter { + fn get_resource(&self) -> Option { + (*self.resource).clone() + } + } + #[test] fn test_default_const_values() { assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY"); @@ -620,4 +690,98 @@ mod tests { assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); assert_eq!(actual.max_queue_size, 4); } + + #[test] + fn test_set_resource_simple_processor() { + let exporter = MockLogExporter { + resource: Arc::new(Some(Resource::default())), + }; + let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let _ = LoggerProvider::builder() + .with_log_processor(processor) + .with_config(Config::default().with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + ]))) + .build(); + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_set_resource_batch_processor() { + let exporter = MockLogExporter { + resource: Arc::new(Some(Resource::default())), + }; + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + let provider = LoggerProvider::builder() + .with_log_processor(processor) + .with_config(Config::default().with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + ]))) + .build(); + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); + provider.shutdown(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_shutdown() { + // assert we will receive an error + // setup + let exporter = InMemoryLogsExporterBuilder::default() + .keep_records_on_shutdown() + .build(); + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + processor.emit(LogData { + record: Default::default(), + instrumentation: Default::default(), + }); + processor.force_flush().unwrap(); + processor.shutdown().unwrap(); + // todo: expect to see errors here. How should we assert this? + processor.emit(LogData { + record: Default::default(), + instrumentation: Default::default(), + }); + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) + } + + #[test] + fn test_simple_shutdown() { + let exporter = InMemoryLogsExporterBuilder::default() + .keep_records_on_shutdown() + .build(); + let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + + processor.emit(LogData { + record: Default::default(), + instrumentation: Default::default(), + }); + + processor.shutdown().unwrap(); + + let is_shutdown = processor + .is_shutdown + .load(std::sync::atomic::Ordering::Relaxed); + assert!(is_shutdown); + + processor.emit(LogData { + record: Default::default(), + instrumentation: Default::default(), + }); + + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) + } } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index aca67dc20b..63143f0b12 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,6 +1,9 @@ use crate::export::logs::{LogData, LogExporter}; +use crate::Resource; use async_trait::async_trait; -use opentelemetry::logs::{LogError, LogResult}; +use opentelemetry::logs::{LogError, LogRecord, LogResult}; +use opentelemetry::InstrumentationLibrary; +use std::borrow::Cow; use std::sync::{Arc, Mutex}; /// An in-memory logs exporter that stores logs data in memory.. @@ -36,6 +39,8 @@ use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { logs: Arc>>, + resource: Arc>, + should_reset_on_shutdown: bool, } impl Default for InMemoryLogsExporter { @@ -44,6 +49,18 @@ impl Default for InMemoryLogsExporter { } } +/// `LogDataWithResource` associates a [`LogRecord`] with a [`Resource`] and +/// [`InstrumentationLibrary`]. +#[derive(Clone, Debug)] +pub struct LogDataWithResource { + /// Log record + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogData`. + pub instrumentation: InstrumentationLibrary, + /// Resource for the emitter who produced this `LogData`. + pub resource: Cow<'static, Resource>, +} + ///Builder for ['InMemoryLogsExporter']. /// # Example /// @@ -71,7 +88,9 @@ impl Default for InMemoryLogsExporter { /// ``` /// #[derive(Debug, Clone)] -pub struct InMemoryLogsExporterBuilder {} +pub struct InMemoryLogsExporterBuilder { + reset_on_shutdown: bool, +} impl Default for InMemoryLogsExporterBuilder { fn default() -> Self { @@ -83,7 +102,9 @@ impl InMemoryLogsExporterBuilder { /// Creates a new instance of `InMemoryLogsExporter`. /// pub fn new() -> Self { - Self {} + Self { + reset_on_shutdown: true, + } } /// Creates a new instance of `InMemoryLogsExporter`. @@ -91,6 +112,16 @@ impl InMemoryLogsExporterBuilder { pub fn build(&self) -> InMemoryLogsExporter { InMemoryLogsExporter { logs: Arc::new(Mutex::new(Vec::new())), + resource: Arc::new(Mutex::new(Resource::default())), + should_reset_on_shutdown: self.reset_on_shutdown, + } + } + + /// If set, the records will not be [`InMemoryLogsExporter::reset`] on shutdown. + #[cfg(test)] + pub(crate) fn keep_records_on_shutdown(self) -> Self { + Self { + reset_on_shutdown: false, } } } @@ -107,13 +138,20 @@ impl InMemoryLogsExporter { /// let emitted_logs = exporter.get_emitted_logs().unwrap(); /// ``` /// - pub fn get_emitted_logs(&self) -> LogResult> { - self.logs - .lock() - .map(|logs_guard| logs_guard.iter().cloned().collect()) - .map_err(LogError::from) - } + pub fn get_emitted_logs(&self) -> LogResult> { + let logs_guard = self.logs.lock().map_err(LogError::from)?; + let resource_guard = self.resource.lock().map_err(LogError::from)?; + let logs: Vec = logs_guard + .iter() + .map(|log_data| LogDataWithResource { + record: log_data.record.clone(), + resource: Cow::Owned(resource_guard.clone()), + instrumentation: log_data.instrumentation.clone(), + }) + .collect(); + Ok(logs) + } /// Clears the internal (in-memory) storage of logs. /// /// # Example @@ -143,6 +181,13 @@ impl LogExporter for InMemoryLogsExporter { .map_err(LogError::from) } fn shutdown(&mut self) { - self.reset(); + if self.should_reset_on_shutdown { + self.reset(); + } + } + + fn set_resource(&mut self, resource: &Resource) { + let mut res_guard = self.resource.lock().expect("Resource lock poisoned"); + *res_guard = resource.clone(); } } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index cece3b07e8..fcea153133 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -5,6 +5,7 @@ use opentelemetry::{ ExportError, }; use opentelemetry_sdk::export::logs::{ExportResult, LogData}; +use opentelemetry_sdk::Resource; use std::io::{stdout, Write}; type Encoder = @@ -18,6 +19,7 @@ type Encoder = pub struct LogExporter { writer: Option>, encoder: Encoder, + resource: Resource, } impl LogExporter { @@ -44,7 +46,8 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout async fn export(&mut self, batch: Vec) -> ExportResult { if let Some(writer) = &mut self.writer { - let result = (self.encoder)(writer, crate::logs::LogData::from(batch)) as LogResult<()>; + let log_data = crate::logs::transform::LogData::from((batch, &self.resource)); + let result = (self.encoder)(writer, log_data) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) } else { Err("exporter is shut down".into()) @@ -54,6 +57,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { fn shutdown(&mut self) { self.writer.take(); } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } } /// Stdout exporter's error @@ -127,6 +134,7 @@ impl LogExporterBuilder { pub fn build(self) -> LogExporter { LogExporter { writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), + resource: Resource::default(), encoder: self.encoder.unwrap_or_else(|| { Box::new(|writer, logs| { serde_json::to_writer(writer, &logs) diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 9612cf1ff6..10332953d2 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -15,18 +15,28 @@ pub struct LogData { resource_logs: Vec, } -impl From> for LogData { - fn from(sdk_logs: Vec) -> LogData { +impl + From<( + Vec, + &opentelemetry_sdk::Resource, + )> for LogData +{ + fn from( + (sdk_logs, sdk_resource): ( + Vec, + &opentelemetry_sdk::Resource, + ), + ) -> Self { let mut resource_logs = HashMap::::new(); for sdk_log in sdk_logs { - let resource_schema_url = sdk_log.resource.schema_url().map(|s| s.to_string().into()); + let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_log.instrumentation.schema_url.clone(); let scope: Scope = sdk_log.instrumentation.clone().into(); - let resource: Resource = sdk_log.resource.as_ref().into(); + let resource: Resource = sdk_resource.into(); let rl = resource_logs - .entry(sdk_log.resource.as_ref().into()) + .entry(sdk_resource.into()) .or_insert_with(move || ResourceLogs { resource, scope_logs: Vec::with_capacity(1), diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index e593fcbfa8..380daa3671 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -10,6 +10,16 @@ ### Removed - Remove `urlencoding` crate dependency. [#1613](https://github.com/open-telemetry/opentelemetry-rust/pull/1613) +- Remove global providers for Logs [$1691](https://github.com/open-telemetry/opentelemetry-rust/pull/1691) + LoggerProviders are not meant for end users to get loggers from. It is only required for the log bridges. + Below global constructs for the logs are removed from API: + - opentelemetry::global::logger + - opentelemetry::global::set_logger_provider + - opentelemetry::global::shutdown_logger_provider + - opentelemetry::global::logger_provider + - opentelemetry::global::GlobalLoggerProvider + - opentelemetry::global::ObjectSafeLoggerProvider + For creating appenders using Logging bridge API, refer to the opentelemetry-tracing-appender [example](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-appender-tracing/examples/basic.rs) ### Changed diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 7c5de3b6bc..f531dbb81e 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -40,7 +40,7 @@ logs_level_enabled = ["logs"] otel_unstable = [] [dev-dependencies] -opentelemetry_sdk = { path = "../opentelemetry-sdk" } # for documentation tests +opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["logs_level_enabled"]} # for documentation tests criterion = { version = "0.3" } [[bench]] diff --git a/opentelemetry/src/global/logs.rs b/opentelemetry/src/global/logs.rs deleted file mode 100644 index e2b53f43ee..0000000000 --- a/opentelemetry/src/global/logs.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::{ - borrow::Cow, - fmt, mem, - sync::{Arc, RwLock}, -}; - -use once_cell::sync::Lazy; - -use crate::{ - logs::{Logger, LoggerProvider, NoopLoggerProvider}, - InstrumentationLibrary, -}; - -/// Allows a specific [`LoggerProvider`] to be used generically, by mirroring -/// the interface, and boxing the returned types. -/// -/// [`LoggerProvider`]: crate::logs::LoggerProvider. -pub trait ObjectSafeLoggerProvider { - /// Creates a versioned named [`Logger`] instance that is a trait object - /// through the underlying [`LoggerProvider`]. - /// - /// [`Logger`]: crate::logs::Logger - /// [`LoggerProvider`]: crate::logs::LoggerProvider - fn boxed_logger( - &self, - library: Arc, - ) -> Box; -} - -impl ObjectSafeLoggerProvider for P -where - L: Logger + Send + Sync + 'static, - P: LoggerProvider, -{ - fn boxed_logger( - &self, - library: Arc, - ) -> Box { - Box::new(self.library_logger(library)) - } -} - -pub struct BoxedLogger(Box); - -impl fmt::Debug for BoxedLogger { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("BoxedLogger") - } -} - -impl Logger for BoxedLogger { - fn emit(&self, record: crate::logs::LogRecord) { - self.0.emit(record) - } - - #[cfg(feature = "logs_level_enabled")] - fn event_enabled(&self, level: crate::logs::Severity, target: &str) -> bool { - self.0.event_enabled(level, target) - } -} - -#[derive(Clone)] -/// Represents the globally configured [`LoggerProvider`] instance. -pub struct GlobalLoggerProvider { - provider: Arc, -} - -impl fmt::Debug for GlobalLoggerProvider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("GlobalLoggerProvider") - } -} - -impl GlobalLoggerProvider { - fn new< - L: Logger + Send + Sync + 'static, - P: LoggerProvider + Send + Sync + 'static, - >( - provider: P, - ) -> Self { - GlobalLoggerProvider { - provider: Arc::new(provider), - } - } -} - -impl LoggerProvider for GlobalLoggerProvider { - type Logger = BoxedLogger; - - fn library_logger(&self, library: Arc) -> Self::Logger { - BoxedLogger(self.provider.boxed_logger(library)) - } -} - -static GLOBAL_LOGGER_PROVIDER: Lazy> = - Lazy::new(|| RwLock::new(GlobalLoggerProvider::new(NoopLoggerProvider::new()))); - -/// Returns an instance of the currently configured global [`LoggerProvider`] -/// through [`GlobalLoggerProvider`]. -/// -/// [`LoggerProvider`]: crate::logs::LoggerProvider -pub fn logger_provider() -> GlobalLoggerProvider { - GLOBAL_LOGGER_PROVIDER - .read() - .expect("GLOBAL_LOGGER_PROVIDER RwLock poisoned") - .clone() -} - -/// Creates a named instance of [`Logger`] via the configured -/// [`GlobalLoggerProvider`]. -/// -/// If `name` is an empty string, the provider will use a default name. -/// -/// [`Logger`]: crate::logs::Logger -pub fn logger(name: impl Into>) -> BoxedLogger { - logger_provider().logger(name) -} - -/// Sets the given [`LoggerProvider`] instance as the current global provider, -/// returning the [`LoggerProvider`] instance that was previously set as global -/// provider. -pub fn set_logger_provider(new_provider: P) -> GlobalLoggerProvider -where - L: Logger + Send + Sync + 'static, - P: LoggerProvider + Send + Sync + 'static, -{ - let mut provider = GLOBAL_LOGGER_PROVIDER - .write() - .expect("GLOBAL_LOGGER_PROVIDER RwLock poisoned"); - mem::replace(&mut *provider, GlobalLoggerProvider::new(new_provider)) -} - -/// Shut down the current global [`LoggerProvider`]. -pub fn shutdown_logger_provider() { - let _ = set_logger_provider(NoopLoggerProvider::new()); -} diff --git a/opentelemetry/src/global/metrics.rs b/opentelemetry/src/global/metrics.rs index 24d293f154..11c21805f3 100644 --- a/opentelemetry/src/global/metrics.rs +++ b/opentelemetry/src/global/metrics.rs @@ -7,7 +7,7 @@ use std::{ sync::{Arc, RwLock}, }; -/// The global `Meter` provider singleton. +/// The global `MeterProvider` singleton. static GLOBAL_METER_PROVIDER: Lazy> = Lazy::new(|| { RwLock::new(GlobalMeterProvider::new( metrics::noop::NoopMeterProvider::new(), @@ -111,7 +111,7 @@ pub fn meter_provider() -> GlobalMeterProvider { /// /// If the name is an empty string, the provider will use a default name. /// -/// This is a more convenient way of expressing `global::meter_provider().versioned_meter(name, None, None, None)`. +/// This is a more convenient way of expressing `global::meter_provider().meter(name)`. pub fn meter(name: impl Into>) -> Meter { meter_provider().meter(name.into()) } diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index a19da68e2f..aef0190072 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -83,8 +83,8 @@ //! //! ### Usage in Applications //! -//! Applications configure their meter either by installing a metrics pipeline, -//! or calling [`set_meter_provider`]. +//! Applications configure their meter by configuring a meter provider, +//! and calling [`set_meter_provider`] to set it as global meter provider. //! //! ``` //! # #[cfg(feature="metrics")] @@ -93,6 +93,8 @@ //! use opentelemetry::{global, KeyValue}; //! //! fn init_meter() { +//! // Swap this no-op provider with an actual meter provider, +//! // exporting to stdout, otlp, prometheus, etc. //! let provider = NoopMeterProvider::new(); //! //! // Configure the global `MeterProvider` singleton when your app starts @@ -101,17 +103,22 @@ //! } //! //! fn do_something_instrumented() { -//! // Then you can get a named tracer instance anywhere in your codebase. +//! // You can get a named meter instance anywhere in your codebase. //! let meter = global::meter("my-component"); +//! // It is recommended to reuse the same counter instance for the +//! // lifetime of the application //! let counter = meter.u64_counter("my_counter").init(); //! -//! // record metrics +//! // record measurements //! counter.add(1, &[KeyValue::new("mykey", "myvalue")]); //! } //! //! // in main or other app start //! init_meter(); //! do_something_instrumented(); +//! // Shutdown ensures any metrics still in memory are given to exporters +//! // before the program exits. +//! global::shutdown_meter_provider(); //! # } //! ``` //! @@ -122,13 +129,15 @@ //! # { //! use opentelemetry::{global, KeyValue}; //! -//! pub fn my_traced_library_function() { +//! pub fn my_instrumented_library_function() { //! // End users of your library will configure their global meter provider //! // so you can use the global meter without any setup //! let meter = global::meter("my-library-name"); +//! // It is recommended to reuse the same counter instance for the +//! // lifetime of the application //! let counter = meter.u64_counter("my_counter").init(); //! -//! // record metrics +//! // record measurements //! counter.add(1, &[KeyValue::new("mykey", "myvalue")]); //! } //! # } @@ -138,8 +147,6 @@ //! [`set_meter_provider`]: crate::global::set_meter_provider mod error_handler; -#[cfg(feature = "logs")] -mod logs; #[cfg(feature = "metrics")] mod metrics; #[cfg(feature = "trace")] @@ -148,12 +155,6 @@ mod propagation; mod trace; pub use error_handler::{handle_error, set_error_handler, Error}; -#[cfg(feature = "logs")] -#[cfg_attr(docsrs, doc(cfg(feature = "logs")))] -pub use logs::{ - logger, logger_provider, set_logger_provider, shutdown_logger_provider, GlobalLoggerProvider, - ObjectSafeLoggerProvider, -}; #[cfg(feature = "metrics")] #[cfg_attr(docsrs, doc(cfg(feature = "metrics")))] pub use metrics::*; diff --git a/opentelemetry/src/logs/logger.rs b/opentelemetry/src/logs/logger.rs index 6d4d48f973..11b7fb9d60 100644 --- a/opentelemetry/src/logs/logger.rs +++ b/opentelemetry/src/logs/logger.rs @@ -63,9 +63,11 @@ pub trait LoggerProvider { /// # Examples /// /// ``` - /// use opentelemetry::{global, logs::LoggerProvider}; + /// use opentelemetry::InstrumentationLibrary; + /// use crate::opentelemetry::logs::LoggerProvider; + /// use opentelemetry_sdk::logs::LoggerProvider as SdkLoggerProvider; /// - /// let provider = global::logger_provider(); + /// let provider = SdkLoggerProvider::builder().build(); /// /// // logger used in applications/binaries /// let logger = provider.logger_builder("my_app").build(); @@ -88,9 +90,11 @@ pub trait LoggerProvider { /// # Examples /// /// ``` - /// use opentelemetry::{global, InstrumentationLibrary, logs::LoggerProvider}; + /// use opentelemetry::InstrumentationLibrary; + /// use crate::opentelemetry::logs::LoggerProvider; + /// use opentelemetry_sdk::logs::LoggerProvider as SdkLoggerProvider; /// - /// let provider = global::logger_provider(); + /// let provider = SdkLoggerProvider::builder().build(); /// /// // logger used in applications/binaries /// let logger = provider.logger("my_app"); diff --git a/opentelemetry/src/logs/record.rs b/opentelemetry/src/logs/record.rs index ca705bac58..81fb27754f 100644 --- a/opentelemetry/src/logs/record.rs +++ b/opentelemetry/src/logs/record.rs @@ -373,10 +373,13 @@ impl LogRecordBuilder { } /// Sets the `event_name` of a record. - pub fn with_name(self, name: Cow<'static, str>) -> Self { + pub fn with_name(self, name: T) -> Self + where + T: Into>, + { Self { record: LogRecord { - event_name: Some(name), + event_name: Some(name.into()), ..self.record }, } diff --git a/stress/src/logs.rs b/stress/src/logs.rs index aaa3a1d7ae..fdc08e9dae 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -15,7 +15,7 @@ impl LogProcessor for NoOpLogProcessor { Ok(()) } - fn shutdown(&mut self) -> opentelemetry::logs::LogResult<()> { + fn shutdown(&self) -> opentelemetry::logs::LogResult<()> { Ok(()) }