From 47a897d197cbc4acc5362d07c254ef6744ed8241 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Fri, 14 Oct 2022 23:15:27 -0700 Subject: [PATCH] Add timeout support to MeterContext::ForceFlush (#1673) --- sdk/src/logs/multi_log_processor.cc | 2 +- sdk/src/metrics/meter_context.cc | 42 ++++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/sdk/src/logs/multi_log_processor.cc b/sdk/src/logs/multi_log_processor.cc index a6b2eef1f34..f8de227806e 100644 --- a/sdk/src/logs/multi_log_processor.cc +++ b/sdk/src/logs/multi_log_processor.cc @@ -68,7 +68,7 @@ void MultiLogProcessor::OnEmit(std::unique_ptr &&record) noexcept bool MultiLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { - // Converto nanos to prevent overflow + // Convert to nanos to prevent overflow std::chrono::nanoseconds timeout_ns = std::chrono::nanoseconds::max(); if (std::chrono::duration_cast(timeout_ns) > timeout) { diff --git a/sdk/src/metrics/meter_context.cc b/sdk/src/metrics/meter_context.cc index 8c7abd97e95..37d0910c9d2 100644 --- a/sdk/src/metrics/meter_context.cc +++ b/sdk/src/metrics/meter_context.cc @@ -85,15 +85,51 @@ bool MeterContext::Shutdown() noexcept bool MeterContext::ForceFlush(std::chrono::microseconds timeout) noexcept { - // TODO - Implement timeout logic. bool result = true; if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) { + // Convert to nanos to prevent overflow + auto timeout_ns = std::chrono::nanoseconds::max(); + if (std::chrono::duration_cast(timeout_ns) > timeout) + { + timeout_ns = std::chrono::duration_cast(timeout); + } + + auto current_time = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point expire_time; + auto overflow_checker = std::chrono::system_clock::time_point::max(); + + // check if the expected expire time doesn't overflow. + if (overflow_checker - current_time > timeout_ns) + { + expire_time = current_time + + std::chrono::duration_cast(timeout_ns); + } + else + { + // overflow happens, reset expire time to max. + expire_time = overflow_checker; + } for (auto &collector : collectors_) { - bool status = std::static_pointer_cast(collector)->ForceFlush(timeout); - result = result && status; + if (!std::static_pointer_cast(collector)->ForceFlush( + std::chrono::duration_cast(timeout_ns))) + { + result = false; + } + + current_time = std::chrono::system_clock::now(); + + if (expire_time >= current_time) + { + timeout_ns = + std::chrono::duration_cast(expire_time - current_time); + } + else + { + timeout_ns = std::chrono::nanoseconds::zero(); + } } if (!result) {