diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 3753704bb3..1668f0381d 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -20,6 +20,7 @@ #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/common/exporter_utils.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" // forward declare google::protobuf::Message @@ -83,28 +84,33 @@ struct OtlpHttpClientOptions // User agent std::string user_agent; - inline OtlpHttpClientOptions(nostd::string_view input_url, - bool input_ssl_insecure_skip_verify, - nostd::string_view input_ssl_ca_cert_path, - nostd::string_view input_ssl_ca_cert_string, - nostd::string_view input_ssl_client_key_path, - nostd::string_view input_ssl_client_key_string, - nostd::string_view input_ssl_client_cert_path, - nostd::string_view input_ssl_client_cert_string, - nostd::string_view input_ssl_min_tls, - nostd::string_view input_ssl_max_tls, - nostd::string_view input_ssl_cipher, - nostd::string_view input_ssl_cipher_suite, - HttpRequestContentType input_content_type, - JsonBytesMappingKind input_json_bytes_mapping, - nostd::string_view input_compression, - bool input_use_json_name, - bool input_console_debug, - std::chrono::system_clock::duration input_timeout, - const OtlpHeaders &input_http_headers, - std::size_t input_concurrent_sessions = 64, - std::size_t input_max_requests_per_connection = 8, - nostd::string_view input_user_agent = GetOtlpDefaultUserAgent()) + std::shared_ptr thread_instrumentation = + std::shared_ptr(nullptr); + + inline OtlpHttpClientOptions( + nostd::string_view input_url, + bool input_ssl_insecure_skip_verify, + nostd::string_view input_ssl_ca_cert_path, + nostd::string_view input_ssl_ca_cert_string, + nostd::string_view input_ssl_client_key_path, + nostd::string_view input_ssl_client_key_string, + nostd::string_view input_ssl_client_cert_path, + nostd::string_view input_ssl_client_cert_string, + nostd::string_view input_ssl_min_tls, + nostd::string_view input_ssl_max_tls, + nostd::string_view input_ssl_cipher, + nostd::string_view input_ssl_cipher_suite, + HttpRequestContentType input_content_type, + JsonBytesMappingKind input_json_bytes_mapping, + nostd::string_view input_compression, + bool input_use_json_name, + bool input_console_debug, + std::chrono::system_clock::duration input_timeout, + const OtlpHeaders &input_http_headers, + const std::shared_ptr &input_thread_instrumentation, + std::size_t input_concurrent_sessions = 64, + std::size_t input_max_requests_per_connection = 8, + nostd::string_view input_user_agent = GetOtlpDefaultUserAgent()) : url(input_url), ssl_options(input_url, input_ssl_insecure_skip_verify, @@ -127,7 +133,8 @@ struct OtlpHttpClientOptions http_headers(input_http_headers), max_concurrent_requests(input_concurrent_sessions), max_requests_per_connection(input_max_requests_per_connection), - user_agent(input_user_agent) + user_agent(input_user_agent), + thread_instrumentation(input_thread_instrumentation) {} }; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter_options.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter_options.h index 7f6d5a1b35..8818ff7174 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter_options.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_exporter_options.h @@ -4,10 +4,12 @@ #pragma once #include +#include #include #include "opentelemetry/exporters/otlp/otlp_environment.h" #include "opentelemetry/exporters/otlp/otlp_http.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -101,6 +103,9 @@ struct OPENTELEMETRY_EXPORT OtlpHttpExporterOptions /** Compression type. */ std::string compression; + + std::shared_ptr thread_instrumentation = + std::shared_ptr(nullptr); }; } // namespace otlp diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter_options.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter_options.h index 60f674a3a7..4cc3d6266c 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter_options.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_log_record_exporter_options.h @@ -4,10 +4,12 @@ #pragma once #include +#include #include #include "opentelemetry/exporters/otlp/otlp_environment.h" #include "opentelemetry/exporters/otlp/otlp_http.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -101,6 +103,9 @@ struct OPENTELEMETRY_EXPORT OtlpHttpLogRecordExporterOptions /** Compression type. */ std::string compression; + + std::shared_ptr thread_instrumentation = + std::shared_ptr(nullptr); }; } // namespace otlp diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_metric_exporter_options.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_metric_exporter_options.h index 82c91aa51f..78b81838dc 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_metric_exporter_options.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_metric_exporter_options.h @@ -4,11 +4,13 @@ #pragma once #include +#include #include #include "opentelemetry/exporters/otlp/otlp_environment.h" #include "opentelemetry/exporters/otlp/otlp_http.h" #include "opentelemetry/exporters/otlp/otlp_preferred_temporality.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -104,6 +106,9 @@ struct OPENTELEMETRY_EXPORT OtlpHttpMetricExporterOptions /** Compression type. */ std::string compression; + + std::shared_ptr thread_instrumentation = + std::shared_ptr(nullptr); }; } // namespace otlp diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 506295ad7f..3b337a566b 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -665,7 +665,7 @@ void ConvertListFieldToJson(nlohmann::json &value, OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options) : is_shutdown_(false), options_(options), - http_client_(http_client::HttpClientFactory::Create()), + http_client_(http_client::HttpClientFactory::Create(options.thread_instrumentation)), start_session_counter_(0), finished_session_counter_(0) { diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index f740a0babf..2b417e8189 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -57,7 +57,8 @@ OtlpHttpExporter::OtlpHttpExporter(const OtlpHttpExporterOptions &options) options.use_json_name, options.console_debug, options.timeout, - options.http_headers + options.http_headers, + options.thread_instrumentation #ifdef ENABLE_ASYNC_EXPORT , options.max_concurrent_requests, @@ -77,6 +78,7 @@ OtlpHttpExporter::OtlpHttpExporter(std::unique_ptr http_client) options.console_debug = http_client_->GetOptions().console_debug; options.timeout = http_client_->GetOptions().timeout; options.http_headers = http_client_->GetOptions().http_headers; + options.thread_instrumentation = http_client_->GetOptions().thread_instrumentation; #ifdef ENABLE_ASYNC_EXPORT options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests; options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection; diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index bd76bd10bd..4dc71fe2e3 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -60,7 +60,8 @@ OtlpHttpLogRecordExporter::OtlpHttpLogRecordExporter( options.use_json_name, options.console_debug, options.timeout, - options.http_headers + options.http_headers, + options.thread_instrumentation #ifdef ENABLE_ASYNC_EXPORT , options.max_concurrent_requests, @@ -74,13 +75,14 @@ OtlpHttpLogRecordExporter::OtlpHttpLogRecordExporter(std::unique_ptr(options_); - options.url = http_client_->GetOptions().url; - options.content_type = http_client_->GetOptions().content_type; - options.json_bytes_mapping = http_client_->GetOptions().json_bytes_mapping; - options.use_json_name = http_client_->GetOptions().use_json_name; - options.console_debug = http_client_->GetOptions().console_debug; - options.timeout = http_client_->GetOptions().timeout; - options.http_headers = http_client_->GetOptions().http_headers; + options.url = http_client_->GetOptions().url; + options.content_type = http_client_->GetOptions().content_type; + options.json_bytes_mapping = http_client_->GetOptions().json_bytes_mapping; + options.use_json_name = http_client_->GetOptions().use_json_name; + options.console_debug = http_client_->GetOptions().console_debug; + options.timeout = http_client_->GetOptions().timeout; + options.http_headers = http_client_->GetOptions().http_headers; + options.thread_instrumentation = http_client_->GetOptions().thread_instrumentation; #ifdef ENABLE_ASYNC_EXPORT options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests; options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection; diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index 18b4ed0712..4748d22b47 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -61,7 +61,8 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptio options.use_json_name, options.console_debug, options.timeout, - options.http_headers + options.http_headers, + options.thread_instrumentation #ifdef ENABLE_ASYNC_EXPORT , options.max_concurrent_requests, @@ -84,6 +85,7 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(std::unique_ptr h options.console_debug = http_client_->GetOptions().console_debug; options.timeout = http_client_->GetOptions().timeout; options.http_headers = http_client_->GetOptions().http_headers; + options.thread_instrumentation = http_client_->GetOptions().thread_instrumentation; #ifdef ENABLE_ASYNC_EXPORT options.max_concurrent_requests = http_client_->GetOptions().max_concurrent_requests; options.max_requests_per_connection = http_client_->GetOptions().max_requests_per_connection; diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index 4c4a6dbeb2..0824a59a06 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -54,6 +54,7 @@ static nostd::span MakeSpan(T (&array)[N]) OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type, bool async_mode) { + std::shared_ptr not_instrumented; OtlpHttpExporterOptions options; options.content_type = content_type; options.console_debug = true; @@ -70,7 +71,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t "", /* ssl_cipher */ "", /* ssl_cipher_suite */ options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name, - options.console_debug, options.timeout, options.http_headers); + options.console_debug, options.timeout, options.http_headers, not_instrumented); if (!async_mode) { otlp_http_client_options.max_concurrent_requests = 0; diff --git a/exporters/otlp/test/otlp_http_log_record_exporter_test.cc b/exporters/otlp/test/otlp_http_log_record_exporter_test.cc index 35a0bb62e2..606f162024 100644 --- a/exporters/otlp/test/otlp_http_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_record_exporter_test.cc @@ -54,6 +54,7 @@ static nostd::span MakeSpan(T (&array)[N]) OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type, bool async_mode) { + std::shared_ptr not_instrumented; OtlpHttpLogRecordExporterOptions options; options.content_type = content_type; options.console_debug = true; @@ -69,7 +70,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t "", /* ssl_cipher */ "", /* ssl_cipher_suite */ options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name, - options.console_debug, options.timeout, options.http_headers); + options.console_debug, options.timeout, options.http_headers, not_instrumented); if (!async_mode) { otlp_http_client_options.max_concurrent_requests = 0; diff --git a/exporters/otlp/test/otlp_http_metric_exporter_test.cc b/exporters/otlp/test/otlp_http_metric_exporter_test.cc index 0b44d98379..3c438093db 100644 --- a/exporters/otlp/test/otlp_http_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_metric_exporter_test.cc @@ -28,6 +28,7 @@ #include "opentelemetry/ext/http/client/http_client.h" #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/sdk/common/exporter_utils.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" #include "opentelemetry/sdk/metrics/data/point_data.h" @@ -76,6 +77,7 @@ static IntegerType JsonToInteger(nlohmann::json value) OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_type, bool async_mode) { + std::shared_ptr not_instrumented; OtlpHttpMetricExporterOptions options; options.content_type = content_type; options.console_debug = true; @@ -91,7 +93,7 @@ OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType content_t "", /* ssl_cipher */ "", /* ssl_cipher_suite */ options.content_type, options.json_bytes_mapping, options.compression, options.use_json_name, - options.console_debug, options.timeout, options.http_headers); + options.console_debug, options.timeout, options.http_headers, not_instrumented); if (!async_mode) { otlp_http_client_options.max_concurrent_requests = 0; diff --git a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h index 97386890f2..db2a507305 100644 --- a/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h +++ b/ext/include/opentelemetry/ext/http/client/curl/http_client_curl.h @@ -23,6 +23,7 @@ #include "opentelemetry/nostd/function_ref.h" #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -302,6 +303,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient public: // The call (curl_global_init) is not thread safe. Ensure this is called only once. HttpClient(); + HttpClient(const std::shared_ptr &thread_instrumentation); ~HttpClient() override; std::shared_ptr CreateSession( @@ -364,6 +366,7 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient std::mutex background_thread_m_; std::unique_ptr background_thread_; + std::shared_ptr background_thread_instrumentation_; std::chrono::milliseconds scheduled_delay_milliseconds_; nostd::shared_ptr curl_global_initializer_; diff --git a/ext/include/opentelemetry/ext/http/client/http_client_factory.h b/ext/include/opentelemetry/ext/http/client/http_client_factory.h index 43e15cf255..c1a326b7e6 100644 --- a/ext/include/opentelemetry/ext/http/client/http_client_factory.h +++ b/ext/include/opentelemetry/ext/http/client/http_client_factory.h @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 #pragma once + #include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" +#include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE namespace ext @@ -17,6 +20,8 @@ class HttpClientFactory static std::shared_ptr CreateSync(); static std::shared_ptr Create(); + static std::shared_ptr Create( + const std::shared_ptr &thread_instrumentation); }; } // namespace client } // namespace http diff --git a/ext/src/http/client/curl/http_client_curl.cc b/ext/src/http/client/curl/http_client_curl.cc index 6827b9f9c7..a3445ea5bb 100644 --- a/ext/src/http/client/curl/http_client_curl.cc +++ b/ext/src/http/client/curl/http_client_curl.cc @@ -21,6 +21,7 @@ #include "opentelemetry/ext/http/common/url_parser.h" #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/string_view.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" #ifdef ENABLE_OTLP_COMPRESSION_PREVIEW @@ -188,6 +189,17 @@ HttpClient::HttpClient() : multi_handle_(curl_multi_init()), next_session_id_{0}, max_sessions_per_connection_{8}, + background_thread_instrumentation_(nullptr), + scheduled_delay_milliseconds_{std::chrono::milliseconds(256)}, + curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()) +{} + +HttpClient::HttpClient( + const std::shared_ptr &thread_instrumentation) + : multi_handle_(curl_multi_init()), + next_session_id_{0}, + max_sessions_per_connection_{8}, + background_thread_instrumentation_(thread_instrumentation), scheduled_delay_milliseconds_{std::chrono::milliseconds(256)}, curl_global_initializer_(HttpCurlGlobalInitializer::GetInstance()) {} @@ -345,6 +357,11 @@ void HttpClient::MaybeSpawnBackgroundThread() background_thread_.reset(new std::thread( [](HttpClient *self) { + if (self->background_thread_instrumentation_ != nullptr) + { + self->background_thread_instrumentation_->OnStart(); + } + int still_running = 1; while (true) { @@ -445,6 +462,11 @@ void HttpClient::MaybeSpawnBackgroundThread() { if (self->background_thread_) { + if (self->background_thread_instrumentation_ != nullptr) + { + self->background_thread_instrumentation_->OnEnd(); + self->background_thread_instrumentation_.reset(); + } self->background_thread_->detach(); self->background_thread_.reset(); } diff --git a/ext/src/http/client/curl/http_client_factory_curl.cc b/ext/src/http/client/curl/http_client_factory_curl.cc index fd0a7d6a81..b339c2d93c 100644 --- a/ext/src/http/client/curl/http_client_factory_curl.cc +++ b/ext/src/http/client/curl/http_client_factory_curl.cc @@ -6,6 +6,7 @@ #include "opentelemetry/ext/http/client/curl/http_client_curl.h" #include "opentelemetry/ext/http/client/http_client.h" #include "opentelemetry/ext/http/client/http_client_factory.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" namespace http_client = opentelemetry::ext::http::client; @@ -14,6 +15,12 @@ std::shared_ptr http_client::HttpClientFactory::Create( return std::make_shared(); } +std::shared_ptr http_client::HttpClientFactory::Create( + const std::shared_ptr &thread_instrumentation) +{ + return std::make_shared(thread_instrumentation); +} + std::shared_ptr http_client::HttpClientFactory::CreateSync() { return std::make_shared(); diff --git a/sdk/include/opentelemetry/sdk/common/thread_instrumentation.h b/sdk/include/opentelemetry/sdk/common/thread_instrumentation.h new file mode 100644 index 0000000000..e04ac60903 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/common/thread_instrumentation.h @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace common +{ + +class ThreadInstrumentation +{ +public: + ThreadInstrumentation() = default; + virtual ~ThreadInstrumentation() = default; + + virtual void OnStart() = 0; + virtual void OnEnd() = 0; + virtual void BeforeWait() = 0; + virtual void AfterWait() = 0; + virtual void BeforeLoad() = 0; + virtual void AfterLoad() = 0; +}; + +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h index b52476071a..da2bd25e9a 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h @@ -157,6 +157,7 @@ class BatchLogRecordProcessor : public LogRecordProcessor std::shared_ptr synchronization_data_; /* The background worker thread */ + std::shared_ptr worker_thread_instrumentation_; std::thread worker_thread_; }; diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor_options.h b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor_options.h index b130f947af..1f27d0f9b8 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor_options.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_record_processor_options.h @@ -6,6 +6,7 @@ #include #include +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -34,6 +35,9 @@ struct BatchLogRecordProcessorOptions * equal to max_queue_size. */ size_t max_export_batch_size = 512; + + std::shared_ptr thread_instrumentation = + std::shared_ptr(nullptr); }; } // namespace logs diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index d72ec08839..a8e51c2a25 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -11,6 +11,7 @@ #include #include +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h" #include "opentelemetry/sdk/metrics/instruments.h" #include "opentelemetry/sdk/metrics/metric_reader.h" @@ -47,15 +48,17 @@ class PeriodicExportingMetricReader : public MetricReader void DoBackgroundWork(); bool CollectAndExportOnce(); - /* The background worker thread */ - std::thread worker_thread_; - /* Synchronization primitives */ std::atomic is_force_wakeup_background_worker_{false}; std::atomic force_flush_pending_sequence_{0}; std::atomic force_flush_notified_sequence_{0}; std::condition_variable cv_, force_flush_cv_; std::mutex cv_m_, force_flush_m_; + + /* The background worker thread */ + std::shared_ptr worker_thread_instrumentation_; + std::shared_ptr collect_thread_instrumentation_; + std::thread worker_thread_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h index 80514f96d8..6c2f4b889c 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h @@ -3,6 +3,7 @@ #pragma once +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" #include @@ -28,6 +29,11 @@ struct PeriodicExportingMetricReaderOptions /* how long the export can run before it is cancelled. */ std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis); + + std::shared_ptr periodic_thread_instrumentation = + std::shared_ptr(nullptr); + std::shared_ptr collect_thread_instrumentation = + std::shared_ptr(nullptr); }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index e069a461e6..a6ba35f389 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -158,6 +158,7 @@ class BatchSpanProcessor : public SpanProcessor std::shared_ptr synchronization_data_; /* The background worker thread */ + std::shared_ptr worker_thread_instrumentation_; std::thread worker_thread_; }; diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor_options.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor_options.h index ac1052383b..eda4533f43 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor_options.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor_options.h @@ -5,6 +5,7 @@ #include +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -33,6 +34,9 @@ struct BatchSpanProcessorOptions * equal to max_queue_size. */ size_t max_export_batch_size = 512; + + std::shared_ptr thread_instrumentation = + std::shared_ptr(nullptr); }; } // namespace trace diff --git a/sdk/src/logs/batch_log_record_processor.cc b/sdk/src/logs/batch_log_record_processor.cc index f5c35ad339..8ed5c18269 100644 --- a/sdk/src/logs/batch_log_record_processor.cc +++ b/sdk/src/logs/batch_log_record_processor.cc @@ -19,6 +19,7 @@ #include "opentelemetry/sdk/common/atomic_unique_ptr.h" #include "opentelemetry/sdk/common/circular_buffer.h" #include "opentelemetry/sdk/common/circular_buffer_range.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/sdk/logs/batch_log_record_processor.h" #include "opentelemetry/sdk/logs/batch_log_record_processor_options.h" #include "opentelemetry/sdk/logs/exporter.h" @@ -44,8 +45,12 @@ BatchLogRecordProcessor::BatchLogRecordProcessor( max_export_batch_size_(max_export_batch_size), buffer_(max_queue_size_), synchronization_data_(std::make_shared()), - worker_thread_(&BatchLogRecordProcessor::DoBackgroundWork, this) -{} + worker_thread_instrumentation_(nullptr), + worker_thread_() +{ + // Make sure the constructor is complete before giving 'this' to a thread. + worker_thread_ = std::thread(&BatchLogRecordProcessor::DoBackgroundWork, this); +} BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptr &&exporter, const BatchLogRecordProcessorOptions &options) @@ -55,8 +60,12 @@ BatchLogRecordProcessor::BatchLogRecordProcessor(std::unique_ptr()), - worker_thread_(&BatchLogRecordProcessor::DoBackgroundWork, this) -{} + worker_thread_instrumentation_(options.thread_instrumentation), + worker_thread_() +{ + // Make sure the constructor is complete before giving 'this' to a thread. + worker_thread_ = std::thread(&BatchLogRecordProcessor::DoBackgroundWork, this); +} std::unique_ptr BatchLogRecordProcessor::MakeRecordable() noexcept { @@ -151,10 +160,20 @@ bool BatchLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex void BatchLogRecordProcessor::DoBackgroundWork() { + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->OnStart(); + } + auto timeout = scheduled_delay_millis_; while (true) { + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->BeforeWait(); + } + // Wait for `timeout` milliseconds std::unique_lock lk(synchronization_data_->cv_m); synchronization_data_->cv.wait_for(lk, timeout, [this] { @@ -168,6 +187,11 @@ void BatchLogRecordProcessor::DoBackgroundWork() synchronization_data_->is_force_wakeup_background_worker.store(false, std::memory_order_release); + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->AfterWait(); + } + if (synchronization_data_->is_shutdown.load() == true) { DrainQueue(); @@ -182,10 +206,20 @@ void BatchLogRecordProcessor::DoBackgroundWork() // Subtract the duration of this export call from the next `timeout`. timeout = scheduled_delay_millis_ - duration; } + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->OnEnd(); + } } void BatchLogRecordProcessor::Export() { + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->BeforeLoad(); + } + do { std::vector> records_arr; @@ -224,6 +258,11 @@ void BatchLogRecordProcessor::Export() nostd::span>(records_arr.data(), records_arr.size())); NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); } while (true); + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->AfterLoad(); + } } void BatchLogRecordProcessor::NotifyCompletion( diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index 682badf3a4..8eeb56a53e 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -14,6 +14,7 @@ #include "opentelemetry/common/timestamp.h" #include "opentelemetry/sdk/common/global_log_handler.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/sdk/metrics/export/metric_producer.h" #include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" #include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_options.h" @@ -43,7 +44,9 @@ PeriodicExportingMetricReader::PeriodicExportingMetricReader( const PeriodicExportingMetricReaderOptions &option) : exporter_{std::move(exporter)}, export_interval_millis_{option.export_interval_millis}, - export_timeout_millis_{option.export_timeout_millis} + export_timeout_millis_{option.export_timeout_millis}, + worker_thread_instrumentation_(option.periodic_thread_instrumentation), + collect_thread_instrumentation_(option.collect_thread_instrumentation) { if (export_interval_millis_ <= export_timeout_millis_) { @@ -67,18 +70,42 @@ void PeriodicExportingMetricReader::OnInitialized() noexcept void PeriodicExportingMetricReader::DoBackgroundWork() { - std::unique_lock lk(cv_m_); + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->OnStart(); + } + do { - auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::steady_clock::now(); + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->BeforeLoad(); + } + auto status = CollectAndExportOnce(); + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->AfterLoad(); + } + if (!status) { OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") } + auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->BeforeWait(); + } + + std::unique_lock lk(cv_m_); cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) { @@ -87,7 +114,18 @@ void PeriodicExportingMetricReader::DoBackgroundWork() } return IsShutdown(); }); + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->AfterWait(); + } + } while (IsShutdown() != true); + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->OnEnd(); + } } bool PeriodicExportingMetricReader::CollectAndExportOnce() @@ -106,6 +144,12 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() task_thread.reset( new std::thread([this, &cancel_export_for_timeout, sender = std::move(sender)] { + if (collect_thread_instrumentation_ != nullptr) + { + collect_thread_instrumentation_->OnStart(); + collect_thread_instrumentation_->BeforeLoad(); + } + this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) { if (cancel_export_for_timeout.load(std::memory_order_acquire)) { @@ -119,6 +163,12 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() }); const_cast &>(sender).set_value(); + + if (collect_thread_instrumentation_ != nullptr) + { + collect_thread_instrumentation_->AfterLoad(); + collect_thread_instrumentation_->OnEnd(); + } })); std::future_status status; diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index f0f55d20e8..578b0a4fa0 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -20,6 +20,7 @@ #include "opentelemetry/sdk/common/circular_buffer.h" #include "opentelemetry/sdk/common/circular_buffer_range.h" #include "opentelemetry/sdk/common/global_log_handler.h" +#include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/sdk/trace/batch_span_processor.h" #include "opentelemetry/sdk/trace/batch_span_processor_options.h" #include "opentelemetry/sdk/trace/exporter.h" @@ -46,8 +47,12 @@ BatchSpanProcessor::BatchSpanProcessor(std::unique_ptr &&exporter, max_export_batch_size_(options.max_export_batch_size), buffer_(max_queue_size_), synchronization_data_(std::make_shared()), - worker_thread_(&BatchSpanProcessor::DoBackgroundWork, this) -{} + worker_thread_instrumentation_(options.thread_instrumentation), + worker_thread_() +{ + // Make sure the constructor is complete before giving 'this' to a thread. + worker_thread_ = std::thread(&BatchSpanProcessor::DoBackgroundWork, this); +} std::unique_ptr BatchSpanProcessor::MakeRecordable() noexcept { @@ -149,10 +154,20 @@ bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept void BatchSpanProcessor::DoBackgroundWork() { + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->OnStart(); + } + auto timeout = schedule_delay_millis_; while (true) { + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->BeforeWait(); + } + // Wait for `timeout` milliseconds std::unique_lock lk(synchronization_data_->cv_m); synchronization_data_->cv.wait_for(lk, timeout, [this] { @@ -166,6 +181,11 @@ void BatchSpanProcessor::DoBackgroundWork() synchronization_data_->is_force_wakeup_background_worker.store(false, std::memory_order_release); + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->AfterWait(); + } + if (synchronization_data_->is_shutdown.load() == true) { DrainQueue(); @@ -180,10 +200,20 @@ void BatchSpanProcessor::DoBackgroundWork() // Subtract the duration of this export call from the next `timeout`. timeout = schedule_delay_millis_ - duration; } + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->OnEnd(); + } } void BatchSpanProcessor::Export() { + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->BeforeLoad(); + } + do { std::vector> spans_arr; @@ -222,6 +252,11 @@ void BatchSpanProcessor::Export() exporter_->Export(nostd::span>(spans_arr.data(), spans_arr.size())); NotifyCompletion(notify_force_flush, exporter_, synchronization_data_); } while (true); + + if (worker_thread_instrumentation_ != nullptr) + { + worker_thread_instrumentation_->AfterLoad(); + } } void BatchSpanProcessor::NotifyCompletion(