Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: gracefully fail adding to the CompletionQueue after Shutdown #138

Merged
merged 3 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/grpc_utils/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ CompletionQueue::MakeDeadlineTimer(
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<AsyncTimerFuture>(impl_->CreateAlarm());
void* tag = impl_->RegisterOperation(op);
op->Set(impl_->cq(), deadline, tag);
if (tag != nullptr) {
op->Set(impl_->cq(), deadline, tag);
}
return op->GetFuture();
}

Expand Down
4 changes: 3 additions & 1 deletion google/cloud/grpc_utils/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ class CompletionQueue {
auto op =
std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
void* tag = impl_->RegisterOperation(op);
op->Start(async_call, std::move(context), request, &impl_->cq(), tag);
if (tag != nullptr) {
op->Start(async_call, std::move(context), request, &impl_->cq(), tag);
}
return op->GetFuture();
}

Expand Down
82 changes: 82 additions & 0 deletions google/cloud/grpc_utils/completion_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace btadmin = ::google::bigtable::admin::v2;
namespace btproto = ::google::bigtable::v2;
using ::testing::_;
using ::testing::Invoke;
using ::testing::StrictMock;

class MockClient {
public:
Expand Down Expand Up @@ -267,6 +268,54 @@ TEST(CompletionQueueTest, MakeStreamingReadRpc) {
runner.join();
}

TEST(CompletionQueueTest, MakeRpcsAfterShutdown) {
using ms = std::chrono::milliseconds;

auto mock_cq = std::make_shared<MockCompletionQueue>();
CompletionQueue cq(mock_cq);

// Use `StrictMock` to enforce that there are no calls made on the client.
StrictMock<MockClient> mock_client;
std::thread runner([&cq] { cq.Run(); });
cq.Shutdown();

btadmin::GetTableRequest get_table_request;
get_table_request.set_name("test-table-name");
future<void> done =
cq.MakeUnaryRpc(
[&mock_client](grpc::ClientContext* context,
btadmin::GetTableRequest const& request,
grpc::CompletionQueue* cq) {
return mock_client.AsyncGetTable(context, request, cq);
},
get_table_request,
google::cloud::internal::make_unique<grpc::ClientContext>())
.then([](future<StatusOr<btadmin::Table>> f) {
EXPECT_EQ(StatusCode::kCancelled, f.get().status().code());
});

btproto::ReadRowsRequest read_request;
read_request.set_table_name("test-table-name");
(void)cq.MakeStreamingReadRpc(
[&mock_client](grpc::ClientContext* context,
btproto::ReadRowsRequest const& request,
grpc::CompletionQueue* cq) {
return mock_client.AsyncReadRows(context, request, cq);
},
read_request, google::cloud::internal::make_unique<grpc::ClientContext>(),
[](btproto::ReadRowsResponse const&) {
ADD_FAILURE() << "OnReadHandler unexpectedly called";
return make_ready_future(true);
},
[](Status const& status) {
EXPECT_EQ(StatusCode::kCancelled, status.code());
});

mock_cq->SimulateCompletion(true);
EXPECT_EQ(std::future_status::ready, done.wait_for(ms(0)));
runner.join();
}

TEST(CompletionQueueTest, RunAsync) {
using ms = std::chrono::milliseconds;
CompletionQueue cq;
Expand All @@ -284,6 +333,39 @@ TEST(CompletionQueueTest, RunAsync) {
runner.join();
}

// Sets up a timer that reschedules itself and verfies we can shut down
// cleanly whether we call `CancelAll()` on the queue first or not.
namespace {
void RunAndReschedule(CompletionQueue& cq, bool ok) {
if (ok) {
cq.MakeRelativeTimer(std::chrono::seconds(1))
.then([&cq](future<StatusOr<std::chrono::system_clock::time_point>>
result) { RunAndReschedule(cq, result.get().ok()); });
}
}
} // namespace

TEST(CompletionQueueTest, ShutdownWithReschedulingTimer) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

RunAndReschedule(cq, /*ok=*/true);

cq.Shutdown();
t.join();
}

TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

RunAndReschedule(cq, /*ok=*/true);

cq.CancelAll();
cq.Shutdown();
t.join();
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_GRPC_UTILS_NS
} // namespace grpc_utils
Expand Down
27 changes: 21 additions & 6 deletions google/cloud/grpc_utils/internal/async_read_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,16 @@ class AsyncReadStreamImpl

context_ = std::move(context);
cq_ = std::move(cq);
reader_ = async_call(context_.get(), request, &cq_->cq());
auto callback = std::make_shared<NotifyStart>(this->shared_from_this());
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->StartCall(tag);
// @note If `tag == nullptr` the `CompletionQueue` has been `Shutdown()`.
// We leave `reader_` null in this case; other methods must make the
// same `tag != nullptr` check prior to accessing `reader_`. This is
// safe since `Shutdown()` cannot be undone.
if (tag != nullptr) {
reader_ = async_call(context_.get(), request, &cq_->cq());
reader_->StartCall(tag);
}
}

/// Cancel the current streaming read RPC.
Expand Down Expand Up @@ -196,7 +202,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyRead>(this->shared_from_this());
auto response = &callback->response;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Read(response, tag);
if (tag != nullptr) {
reader_->Read(response, tag);
}
}

/// Handle the result of a `Read()` call.
Expand Down Expand Up @@ -244,11 +252,16 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyFinish>(this->shared_from_this());
auto status = &callback->status;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Finish(status, tag);
if (tag != nullptr) {
reader_->Finish(status, tag);
}
}

/// Handle the result of a Finish() request.
void OnFinish(bool, Status status) { on_finish_(std::move(status)); }
void OnFinish(bool ok, Status status) {
on_finish_(ok ? std::move(status)
: Status(StatusCode::kCancelled, "call cancelled"));
}

/**
* Discard all the messages until OnRead() receives a failure.
Expand Down Expand Up @@ -279,7 +292,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyDiscard>(this->shared_from_this());
auto response = &callback->response;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Read(response, tag);
if (tag != nullptr) {
reader_->Read(response, tag);
}
}

/// Handle the result of a Discard() call.
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/grpc_utils/internal/completion_queue_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ void* CompletionQueueImpl::RegisterOperation(
std::shared_ptr<AsyncGrpcOperation> op) {
void* tag = op.get();
std::unique_lock<std::mutex> lk(mu_);
if (shutdown_) {
lk.unlock();
op->Notify(/*ok=*/false);
return nullptr;
}
auto ins =
pending_ops_.emplace(reinterpret_cast<std::intptr_t>(tag), std::move(op));
// After this point we no longer need the lock, so release it.
Expand Down
8 changes: 3 additions & 5 deletions google/cloud/grpc_utils/internal/completion_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class AsyncGrpcOperation : public AsyncOperation {
* Derived classes wrap the callbacks provided by the application and invoke
* the callback when this virtual member function is called.
*
* @param cq the completion queue sending the notification, this is useful in
* case the callback needs to retry the operation.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed removing this in a prior PR

* @param ok opaque parameter returned by grpc::CompletionQueue. The
* semantics defined by gRPC depend on the type of operation, therefore the
* operation needs to interpret this parameter based on those semantics.
Expand Down Expand Up @@ -105,10 +103,10 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation {
private:
bool Notify(bool ok) override {
if (!ok) {
// This would mean a bug in gRPC. The documentation states that Finish()
// always returns `true` for unary RPCs.
// `Finish()` always returns `true` for unary RPCs, so the only time we
// get `!ok` is after `Shutdown()` was called; treat that as "cancelled".
promise_.set_value(::google::cloud::Status(
google::cloud::StatusCode::kUnknown, "Finish() returned false"));
google::cloud::StatusCode::kCancelled, "call cancelled"));
return true;
}
if (!status_.ok()) {
Expand Down