From 5ab7a6e7846424886cb825eda613d421ecdadc76 Mon Sep 17 00:00:00 2001 From: Amitanand Aiyer Date: Tue, 10 Apr 2018 15:44:09 -0700 Subject: [PATCH] ENG-3176 : Fix TestTimedoutInQueue Summary: Fix TestTimedoutInQueue Test Plan: ybd release --cxx-test redisserver_redisserver-test --gtest_filter TestRedisService.TestTimedoutInQueue Reviewers: mikhail, hector Reviewed By: hector Subscribers: ybase Differential Revision: https://phabricator.dev.yugabyte.com/D4590 --- src/yb/rpc/inbound_call.cc | 3 +++ src/yb/rpc/service_pool.cc | 18 ++++++++++-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/yb/rpc/inbound_call.cc b/src/yb/rpc/inbound_call.cc index cfa783b18430..de3762dc3d94 100644 --- a/src/yb/rpc/inbound_call.cc +++ b/src/yb/rpc/inbound_call.cc @@ -126,6 +126,7 @@ Trace* InboundCall::trace() { void InboundCall::RecordCallReceived() { TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this); DCHECK(!timing_.time_received.Initialized()); // Protect against multiple calls. + VLOG(4) << "Received call " << ToString(); timing_.time_received = MonoTime::Now(); } @@ -133,6 +134,7 @@ void InboundCall::RecordHandlingStarted(scoped_refptr incoming_queue_ DCHECK(incoming_queue_time != nullptr); DCHECK(!timing_.time_handled.Initialized()); // Protect against multiple calls. timing_.time_handled = MonoTime::Now(); + VLOG(4) << "Handling call " << ToString(); incoming_queue_time->Increment( timing_.time_handled.GetDeltaSince(timing_.time_received).ToMicroseconds()); } @@ -144,6 +146,7 @@ MonoDelta InboundCall::GetTimeInQueue() const { void InboundCall::RecordHandlingCompleted(scoped_refptr handler_run_time) { DCHECK(!timing_.time_completed.Initialized()); // Protect against multiple calls. timing_.time_completed = MonoTime::Now(); + VLOG(4) << "Completed handling call " << ToString(); if (handler_run_time) { handler_run_time->Increment((timing_.time_completed - timing_.time_handled).ToMicroseconds()); } diff --git a/src/yb/rpc/service_pool.cc b/src/yb/rpc/service_pool.cc index eda4e78ae2e4..810a3c5b1dbe 100644 --- a/src/yb/rpc/service_pool.cc +++ b/src/yb/rpc/service_pool.cc @@ -93,7 +93,8 @@ namespace rpc { namespace { -static constexpr CoarseMonoClock::TimePoint kNone{CoarseMonoClock::TimePoint::min()}; +static constexpr CoarseMonoClock::Duration kNone{ + CoarseMonoClock::TimePoint::min().time_since_epoch()}; class InboundCallTask final { public: @@ -169,7 +170,8 @@ class ServicePoolImpl { const auto response_status = STATUS(ServiceUnavailable, err_msg); rpcs_queue_overflow_->Increment(); call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status); - last_backpressure_at_.store(CoarseMonoClock::Now(), std::memory_order_release); + last_backpressure_at_.store( + CoarseMonoClock::Now().time_since_epoch(), std::memory_order_release); } void Processed(const InboundCallPtr& call, const Status& status) { @@ -197,6 +199,7 @@ class ServicePoolImpl { ? "Call waited in the queue past deadline" : "The server is overloaded. Call waited in the queue past max_time_in_queue."); TRACE_TO(incoming->trace(), message); + VLOG(4) << "Timing out call " << incoming->ToString() << " due to : " << message; rpcs_timed_out_in_queue_->Increment(); // Respond as a failure, even though the client will probably ignore @@ -217,17 +220,16 @@ class ServicePoolImpl { // For testing purposes. if (FLAGS_enable_backpressure_mode_for_testing) { - last_backpressure_at = CoarseMonoClock::Now(); + last_backpressure_at = CoarseMonoClock::Now().time_since_epoch(); } // Test for a sentinel value, to avoid reading the clock. if (last_backpressure_at == kNone) { return false; } - auto now = CoarseMonoClock::Now(); - if (ToMilliseconds(now.time_since_epoch()) <= - ToMilliseconds(last_backpressure_at.time_since_epoch()) + - FLAGS_backpressure_recovery_period_ms) { + auto now = CoarseMonoClock::Now().time_since_epoch(); + if (ToMilliseconds(now) > + ToMilliseconds(last_backpressure_at) + FLAGS_backpressure_recovery_period_ms) { last_backpressure_at_.store(kNone, std::memory_order_release); return false; } @@ -240,7 +242,7 @@ class ServicePoolImpl { scoped_refptr incoming_queue_time_; scoped_refptr rpcs_timed_out_in_queue_; scoped_refptr rpcs_queue_overflow_; - std::atomic last_backpressure_at_{kNone}; + std::atomic last_backpressure_at_; std::atomic closing_ = {false}; TasksPool tasks_pool_;