Skip to content

Commit

Permalink
ENG-3176 : Fix TestTimedoutInQueue
Browse files Browse the repository at this point in the history
Summary: Fix TestTimedoutInQueue

Test Plan: ybd release --cxx-test redisserver_redisserver-test --gtest_filter TestRedisService.TestTimedoutInQueue

Reviewers: mikhail, hector

Reviewed By: hector

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4590
  • Loading branch information
amitanandaiyer committed Apr 11, 2018
1 parent b67e03c commit 5ab7a6e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
3 changes: 3 additions & 0 deletions src/yb/rpc/inbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,15 @@ Trace* InboundCall::trace() {
void InboundCall::RecordCallReceived() {
TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);
DCHECK(!timing_.time_received.Initialized()); // Protect against multiple calls.
VLOG(4) << "Received call " << ToString();
timing_.time_received = MonoTime::Now();
}

void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time) {
DCHECK(incoming_queue_time != nullptr);
DCHECK(!timing_.time_handled.Initialized()); // Protect against multiple calls.
timing_.time_handled = MonoTime::Now();
VLOG(4) << "Handling call " << ToString();
incoming_queue_time->Increment(
timing_.time_handled.GetDeltaSince(timing_.time_received).ToMicroseconds());
}
Expand All @@ -144,6 +146,7 @@ MonoDelta InboundCall::GetTimeInQueue() const {
void InboundCall::RecordHandlingCompleted(scoped_refptr<Histogram> handler_run_time) {
DCHECK(!timing_.time_completed.Initialized()); // Protect against multiple calls.
timing_.time_completed = MonoTime::Now();
VLOG(4) << "Completed handling call " << ToString();
if (handler_run_time) {
handler_run_time->Increment((timing_.time_completed - timing_.time_handled).ToMicroseconds());
}
Expand Down
18 changes: 10 additions & 8 deletions src/yb/rpc/service_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ namespace rpc {

namespace {

static constexpr CoarseMonoClock::TimePoint kNone{CoarseMonoClock::TimePoint::min()};
static constexpr CoarseMonoClock::Duration kNone{
CoarseMonoClock::TimePoint::min().time_since_epoch()};

class InboundCallTask final {
public:
Expand Down Expand Up @@ -169,7 +170,8 @@ class ServicePoolImpl {
const auto response_status = STATUS(ServiceUnavailable, err_msg);
rpcs_queue_overflow_->Increment();
call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status);
last_backpressure_at_.store(CoarseMonoClock::Now(), std::memory_order_release);
last_backpressure_at_.store(
CoarseMonoClock::Now().time_since_epoch(), std::memory_order_release);
}

void Processed(const InboundCallPtr& call, const Status& status) {
Expand Down Expand Up @@ -197,6 +199,7 @@ class ServicePoolImpl {
? "Call waited in the queue past deadline"
: "The server is overloaded. Call waited in the queue past max_time_in_queue.");
TRACE_TO(incoming->trace(), message);
VLOG(4) << "Timing out call " << incoming->ToString() << " due to : " << message;
rpcs_timed_out_in_queue_->Increment();

// Respond as a failure, even though the client will probably ignore
Expand All @@ -217,17 +220,16 @@ class ServicePoolImpl {

// For testing purposes.
if (FLAGS_enable_backpressure_mode_for_testing) {
last_backpressure_at = CoarseMonoClock::Now();
last_backpressure_at = CoarseMonoClock::Now().time_since_epoch();
}

// Test for a sentinel value, to avoid reading the clock.
if (last_backpressure_at == kNone) {
return false;
}
auto now = CoarseMonoClock::Now();
if (ToMilliseconds(now.time_since_epoch()) <=
ToMilliseconds(last_backpressure_at.time_since_epoch()) +
FLAGS_backpressure_recovery_period_ms) {
auto now = CoarseMonoClock::Now().time_since_epoch();
if (ToMilliseconds(now) >
ToMilliseconds(last_backpressure_at) + FLAGS_backpressure_recovery_period_ms) {
last_backpressure_at_.store(kNone, std::memory_order_release);
return false;
}
Expand All @@ -240,7 +242,7 @@ class ServicePoolImpl {
scoped_refptr<Histogram> incoming_queue_time_;
scoped_refptr<Counter> rpcs_timed_out_in_queue_;
scoped_refptr<Counter> rpcs_queue_overflow_;
std::atomic<CoarseMonoClock::TimePoint> last_backpressure_at_{kNone};
std::atomic<CoarseMonoClock::Duration> last_backpressure_at_;

std::atomic<bool> closing_ = {false};
TasksPool<InboundCallTask> tasks_pool_;
Expand Down

0 comments on commit 5ab7a6e

Please sign in to comment.