Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: gracefully fail adding to the CompletionQueue after Shutdown #138

Merged
merged 3 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/grpc_utils/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ CompletionQueue::MakeDeadlineTimer(
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<AsyncTimerFuture>(impl_->CreateAlarm());
void* tag = impl_->RegisterOperation(op);
op->Set(impl_->cq(), deadline, tag);
if (tag != nullptr) {
op->Set(impl_->cq(), deadline, tag);
}
return op->GetFuture();
}

Expand Down
4 changes: 3 additions & 1 deletion google/cloud/grpc_utils/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ class CompletionQueue {
auto op =
std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
void* tag = impl_->RegisterOperation(op);
op->Start(async_call, std::move(context), request, &impl_->cq(), tag);
if (tag != nullptr) {
op->Start(async_call, std::move(context), request, &impl_->cq(), tag);
}
return op->GetFuture();
}

Expand Down
82 changes: 82 additions & 0 deletions google/cloud/grpc_utils/completion_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace btadmin = ::google::bigtable::admin::v2;
namespace btproto = ::google::bigtable::v2;
using ::testing::_;
using ::testing::Invoke;
using ::testing::StrictMock;

class MockClient {
public:
Expand Down Expand Up @@ -267,6 +268,54 @@ TEST(CompletionQueueTest, MakeStreamingReadRpc) {
runner.join();
}

TEST(CompletionQueueTest, MakeRpcsAfterShutdown) {
using ms = std::chrono::milliseconds;

auto mock_cq = std::make_shared<MockCompletionQueue>();
CompletionQueue cq(mock_cq);

// Use `StrictMock` to enforce that there are no calls made on the client.
StrictMock<MockClient> mock_client;
std::thread runner([&cq] { cq.Run(); });
cq.Shutdown();

btadmin::GetTableRequest get_table_request;
get_table_request.set_name("test-table-name");
future<void> done =
cq.MakeUnaryRpc(
[&mock_client](grpc::ClientContext* context,
btadmin::GetTableRequest const& request,
grpc::CompletionQueue* cq) {
return mock_client.AsyncGetTable(context, request, cq);
},
get_table_request,
google::cloud::internal::make_unique<grpc::ClientContext>())
.then([](future<StatusOr<btadmin::Table>> f) {
EXPECT_EQ(StatusCode::kCancelled, f.get().status().code());
});

btproto::ReadRowsRequest read_request;
read_request.set_table_name("test-table-name");
(void)cq.MakeStreamingReadRpc(
[&mock_client](grpc::ClientContext* context,
btproto::ReadRowsRequest const& request,
grpc::CompletionQueue* cq) {
return mock_client.AsyncReadRows(context, request, cq);
},
read_request, google::cloud::internal::make_unique<grpc::ClientContext>(),
[](btproto::ReadRowsResponse const&) {
ADD_FAILURE() << "OnReadHandler unexpectedly called";
return make_ready_future(true);
},
[](Status const& status) {
EXPECT_EQ(StatusCode::kCancelled, status.code());
});

mock_cq->SimulateCompletion(true);
EXPECT_EQ(std::future_status::ready, done.wait_for(ms(0)));
runner.join();
}

TEST(CompletionQueueTest, RunAsync) {
using ms = std::chrono::milliseconds;
CompletionQueue cq;
Expand All @@ -284,6 +333,39 @@ TEST(CompletionQueueTest, RunAsync) {
runner.join();
}

// Sets up a timer that reschedules itself and verfies we can shut down
// cleanly whether we call `CancelAll()` on the queue first or not.
namespace {
void RunAndReschedule(CompletionQueue& cq, bool ok) {
if (ok) {
cq.MakeRelativeTimer(std::chrono::seconds(1))
.then([&cq](future<StatusOr<std::chrono::system_clock::time_point>>
result) { RunAndReschedule(cq, result.get().ok()); });
}
}
} // namespace

TEST(CompletionQueueTest, ShutdownWithReschedulingTimer) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

RunAndReschedule(cq, /*ok=*/true);

cq.Shutdown();
t.join();
}

TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

RunAndReschedule(cq, /*ok=*/true);

cq.CancelAll();
cq.Shutdown();
t.join();
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_GRPC_UTILS_NS
} // namespace grpc_utils
Expand Down
23 changes: 17 additions & 6 deletions google/cloud/grpc_utils/internal/async_read_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,12 @@ class AsyncReadStreamImpl

context_ = std::move(context);
cq_ = std::move(cq);
reader_ = async_call(context_.get(), request, &cq_->cq());
auto callback = std::make_shared<NotifyStart>(this->shared_from_this());
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->StartCall(tag);
if (tag != nullptr) {
reader_ = async_call(context_.get(), request, &cq_->cq());
reader_->StartCall(tag);
}
}

/// Cancel the current streaming read RPC.
Expand Down Expand Up @@ -196,7 +198,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyRead>(this->shared_from_this());
auto response = &callback->response;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Read(response, tag);
if (tag != nullptr) {
reader_->Read(response, tag);
}
}

/// Handle the result of a `Read()` call.
Expand Down Expand Up @@ -244,11 +248,16 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyFinish>(this->shared_from_this());
auto status = &callback->status;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Finish(status, tag);
if (tag != nullptr) {
reader_->Finish(status, tag);
}
}

/// Handle the result of a Finish() request.
void OnFinish(bool, Status status) { on_finish_(std::move(status)); }
void OnFinish(bool ok, Status status) {
on_finish_(ok ? std::move(status)
: Status(StatusCode::kCancelled, "call cancelled"));
}

/**
* Discard all the messages until OnRead() receives a failure.
Expand Down Expand Up @@ -279,7 +288,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyDiscard>(this->shared_from_this());
auto response = &callback->response;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Read(response, tag);
if (tag != nullptr) {
reader_->Read(response, tag);
}
}

/// Handle the result of a Discard() call.
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/grpc_utils/internal/completion_queue_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ void* CompletionQueueImpl::RegisterOperation(
std::shared_ptr<AsyncGrpcOperation> op) {
void* tag = op.get();
std::unique_lock<std::mutex> lk(mu_);
if (shutdown_) {
lk.unlock();
op->Notify(/*ok=*/false);
return nullptr;
}
auto ins =
pending_ops_.emplace(reinterpret_cast<std::intptr_t>(tag), std::move(op));
// After this point we no longer need the lock, so release it.
Expand Down
8 changes: 3 additions & 5 deletions google/cloud/grpc_utils/internal/completion_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class AsyncGrpcOperation : public AsyncOperation {
* Derived classes wrap the callbacks provided by the application and invoke
* the callback when this virtual member function is called.
*
* @param cq the completion queue sending the notification, this is useful in
* case the callback needs to retry the operation.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed removing this in a prior PR

* @param ok opaque parameter returned by grpc::CompletionQueue. The
* semantics defined by gRPC depend on the type of operation, therefore the
* operation needs to interpret this parameter based on those semantics.
Expand Down Expand Up @@ -105,10 +103,10 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation {
private:
bool Notify(bool ok) override {
if (!ok) {
// This would mean a bug in gRPC. The documentation states that Finish()
// always returns `true` for unary RPCs.
// `Finish()` always returns `true` for unary RPCs, so the only time we
// get `!ok` is after `Shutdown()` was called; treat that as "cancelled".
promise_.set_value(::google::cloud::Status(
google::cloud::StatusCode::kUnknown, "Finish() returned false"));
google::cloud::StatusCode::kCancelled, "call cancelled"));
return true;
}
if (!status_.ok()) {
Expand Down