Skip to content

Commit

Permalink
ENG-2993: #87 Expire based on time spent in the queue
Browse files Browse the repository at this point in the history
Summary:
We do not expect calls to spend too long waiting
in the queue. If they spend way too long, it means that
the system is already in a bad state, and we may be applying
backpressure to prevent new requests from joining the queue.

To avoid spending time on calls that are too old, we will
abort calls, if they have just been waiting in the queue for too long
and move on to process more "timely" requests.

Test Plan:
Added a test.
+ Jenkins

Reviewers: kannan, bharat, hector, mikhail

Reviewed By: bharat, hector, mikhail

Subscribers: bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4251
  • Loading branch information
amitanandaiyer committed Apr 10, 2018
1 parent 1796740 commit d537ed2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 5 deletions.
4 changes: 4 additions & 0 deletions src/yb/rpc/inbound_call.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_
timing_.time_handled.GetDeltaSince(timing_.time_received).ToMicroseconds());
}

MonoDelta InboundCall::GetTimeInQueue() const {
return timing_.time_handled.GetDeltaSince(timing_.time_received);
}

void InboundCall::RecordHandlingCompleted(scoped_refptr<Histogram> handler_run_time) {
DCHECK(!timing_.time_completed.Initialized()); // Protect against multiple calls.
timing_.time_completed = MonoTime::Now();
Expand Down
4 changes: 4 additions & 0 deletions src/yb/rpc/inbound_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class InboundCall : public RpcCall {
// If the client did not specify a deadline, returns MonoTime::Max().
virtual MonoTime GetClientDeadline() const = 0;

// Returns the time spent in the service queue -- from the time the call was received, until
// it gets handled.
MonoDelta GetTimeInQueue() const;

virtual const std::string& method_name() const = 0;
virtual const std::string& service_name() const = 0;
virtual void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, const Status& status) = 0;
Expand Down
54 changes: 49 additions & 5 deletions src/yb/rpc/service_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "yb/rpc/tasks_pool.h"

#include "yb/gutil/strings/substitute.h"
#include "yb/util/flag_tags.h"
#include "yb/util/metrics.h"
#include "yb/util/status.h"
#include "yb/util/thread.h"
Expand All @@ -55,6 +56,20 @@
using std::shared_ptr;
using strings::Substitute;

DEFINE_int64(max_time_in_queue_ms, 6000,
"Fail calls that get stuck in the queue longer than the specified amount of time "
"(in ms)");
TAG_FLAG(max_time_in_queue_ms, advanced);
TAG_FLAG(max_time_in_queue_ms, runtime);
DEFINE_int64(backpressure_recovery_period_ms, 600000,
"Once we hit a backpressure/service-overflow we will consider dropping stale requests "
"for this duration (in ms)");
TAG_FLAG(backpressure_recovery_period_ms, advanced);
TAG_FLAG(backpressure_recovery_period_ms, runtime);
DEFINE_test_flag(bool, enable_backpressure_mode_for_testing, false,
"For testing purposes. Enables the rpc's to be considered timed out in the queue even "
"when we have not had any backpressure in the recent past.");

METRIC_DEFINE_histogram(server, rpc_incoming_queue_time,
"RPC Queue Time",
yb::MetricUnit::kMicroseconds,
Expand All @@ -78,6 +93,8 @@ namespace rpc {

namespace {

static constexpr CoarseMonoClock::TimePoint kNone{CoarseMonoClock::TimePoint::min()};

class InboundCallTask final {
public:
InboundCallTask(ServicePoolImpl* pool, InboundCallPtr call)
Expand Down Expand Up @@ -152,6 +169,7 @@ class ServicePoolImpl {
const auto response_status = STATUS(ServiceUnavailable, err_msg);
rpcs_queue_overflow_->Increment();
call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status);
last_backpressure_at_.store(CoarseMonoClock::Now(), std::memory_order_release);
}

void Processed(const InboundCallPtr& call, const Status& status) {
Expand All @@ -173,15 +191,17 @@ class ServicePoolImpl {
incoming->RecordHandlingStarted(incoming_queue_time_);
ADOPT_TRACE(incoming->trace());

if (PREDICT_FALSE(incoming->ClientTimedOut())) {
TRACE_TO(incoming->trace(), "Skipping call since client already timed out");
if (PREDICT_FALSE(incoming->ClientTimedOut() || ShouldDropRequestDuringHighLoad(incoming))) {
const char* message =
(incoming->ClientTimedOut()
? "Call waited in the queue past deadline"
: "The server is overloaded. Call waited in the queue past max_time_in_queue.");
TRACE_TO(incoming->trace(), message);
rpcs_timed_out_in_queue_->Increment();

// Respond as a failure, even though the client will probably ignore
// the response anyway.
incoming->RespondFailure(
ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
STATUS(TimedOut, "Call waited in the queue past client deadline"));
incoming->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, STATUS(TimedOut, message));

return;
}
Expand All @@ -192,11 +212,35 @@ class ServicePoolImpl {
}

private:
bool ShouldDropRequestDuringHighLoad(InboundCallPtr incoming) {
auto last_backpressure_at = last_backpressure_at_.load(std::memory_order_acquire);

// For testing purposes.
if (FLAGS_enable_backpressure_mode_for_testing) {
last_backpressure_at = CoarseMonoClock::Now();
}

// Test for a sentinel value, to avoid reading the clock.
if (last_backpressure_at == kNone) {
return false;
}
auto now = CoarseMonoClock::Now();
if (ToMilliseconds(now.time_since_epoch()) <=
ToMilliseconds(last_backpressure_at.time_since_epoch()) +
FLAGS_backpressure_recovery_period_ms) {
last_backpressure_at_.store(kNone, std::memory_order_release);
return false;
}

return incoming->GetTimeInQueue().ToMilliseconds() > FLAGS_max_time_in_queue_ms;
}

ThreadPool* thread_pool_;
std::unique_ptr<ServiceIf> service_;
scoped_refptr<Histogram> incoming_queue_time_;
scoped_refptr<Counter> rpcs_timed_out_in_queue_;
scoped_refptr<Counter> rpcs_queue_overflow_;
std::atomic<CoarseMonoClock::TimePoint> last_backpressure_at_{kNone};

std::atomic<bool> closing_ = {false};
TasksPool<InboundCallTask> tasks_pool_;
Expand Down
23 changes: 23 additions & 0 deletions src/yb/yql/redis/redisserver/redisserver-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ DECLARE_uint64(redis_max_queued_bytes);
DECLARE_int64(redis_rpc_block_size);
DECLARE_bool(redis_safe_batch);
DECLARE_bool(emulate_redis_responses);
DECLARE_bool(enable_backpressure_mode_for_testing);
DECLARE_int32(redis_max_value_size);
DECLARE_int32(redis_max_command_size);
DECLARE_int32(rpc_max_message_size);
DECLARE_int32(consensus_max_batch_size_bytes);
DECLARE_int32(consensus_rpc_timeout_ms);
DECLARE_int64(max_time_in_queue_ms);

DEFINE_uint64(test_redis_max_concurrent_commands, 20,
"Value of redis_max_concurrent_commands for pipeline test");
Expand Down Expand Up @@ -715,6 +717,27 @@ TEST_F(TestRedisService, BatchedCommandsInline) {
"+OK\r\n+OK\r\n$3\r\nbar\r\n$1\r\n5\r\n");
}

TEST_F(TestRedisService, TestTimedoutInQueue) {
FLAGS_redis_max_batch = 1;
FLAGS_enable_backpressure_mode_for_testing = true;

DoRedisTestOk(__LINE__, {"SET", "foo", "value"});
DoRedisTestBulkString(__LINE__, {"GET", "foo"}, "value");
DoRedisTestOk(__LINE__, {"SET", "foo", "Test"});

// All calls past this call should fail.
DoRedisTestOk(__LINE__, {"DEBUGSLEEP", yb::ToString(FLAGS_max_time_in_queue_ms)});

const string expected_message =
"The server is overloaded. Call waited in the queue past max_time_in_queue.";
DoRedisTestExpectError(__LINE__, {"SET", "foo", "Test"}, expected_message);
DoRedisTestExpectError(__LINE__, {"GET", "foo"}, expected_message);
DoRedisTestExpectError(__LINE__, {"DEBUGSLEEP", "2000"}, expected_message);

SyncClient();
VerifyCallbacks();
}

TEST_F(TestRedisService, BatchedCommandsInlinePartial) {
for (int i = 0; i != 1000; ++i) {
ASSERT_NO_FATAL_FAILURE(
Expand Down

0 comments on commit d537ed2

Please sign in to comment.