From 9654a6f61390cda86d90106b14ab51e812cf839c Mon Sep 17 00:00:00 2001 From: Todd Derr Date: Fri, 31 Jan 2020 14:00:14 -0500 Subject: [PATCH] feat: gracefully fail adding to the `CompletionQueue` after `Shutdown` (googleapis/google-cloud-cpp-common#138) * feat: gracefully fail adding to the `CompletionQueue` after `Shutdown` The grpc `CompletionQueue` asserts if items are added to it after `Shutdown()` is called. Make our `CompletionQueue` a little friendlier by just failing those operations immediately. Part of googleapis/google-cloud-cpp-common#129 * Add a test for making RPCs after `Shutdown()`, which uncovered a couple bugs * Add a comment about the safety of leaving reader_ null in `Start`. --- google/cloud/grpc_utils/completion_queue.cc | 4 +- google/cloud/grpc_utils/completion_queue.h | 4 +- .../cloud/grpc_utils/completion_queue_test.cc | 82 +++++++++++++++++++ .../internal/async_read_stream_impl.h | 27 ++++-- .../internal/completion_queue_impl.cc | 5 ++ .../internal/completion_queue_impl.h | 8 +- 6 files changed, 117 insertions(+), 13 deletions(-) diff --git a/google/cloud/grpc_utils/completion_queue.cc b/google/cloud/grpc_utils/completion_queue.cc index 9b0d8b6a404fc..91de4343109ba 100644 --- a/google/cloud/grpc_utils/completion_queue.cc +++ b/google/cloud/grpc_utils/completion_queue.cc @@ -91,7 +91,9 @@ CompletionQueue::MakeDeadlineTimer( std::chrono::system_clock::time_point deadline) { auto op = std::make_shared(impl_->CreateAlarm()); void* tag = impl_->RegisterOperation(op); - op->Set(impl_->cq(), deadline, tag); + if (tag != nullptr) { + op->Set(impl_->cq(), deadline, tag); + } return op->GetFuture(); } diff --git a/google/cloud/grpc_utils/completion_queue.h b/google/cloud/grpc_utils/completion_queue.h index c82d39fec641a..f56c2ec8b66b2 100644 --- a/google/cloud/grpc_utils/completion_queue.h +++ b/google/cloud/grpc_utils/completion_queue.h @@ -116,7 +116,9 @@ class CompletionQueue { auto op = std::make_shared>(); void* tag = impl_->RegisterOperation(op); - op->Start(async_call, std::move(context), request, &impl_->cq(), tag); + if (tag != nullptr) { + op->Start(async_call, std::move(context), request, &impl_->cq(), tag); + } return op->GetFuture(); } diff --git a/google/cloud/grpc_utils/completion_queue_test.cc b/google/cloud/grpc_utils/completion_queue_test.cc index 670ea9b2ce9d3..b2761ed58eb2c 100644 --- a/google/cloud/grpc_utils/completion_queue_test.cc +++ b/google/cloud/grpc_utils/completion_queue_test.cc @@ -37,6 +37,7 @@ namespace btadmin = ::google::bigtable::admin::v2; namespace btproto = ::google::bigtable::v2; using ::testing::_; using ::testing::Invoke; +using ::testing::StrictMock; class MockClient { public: @@ -267,6 +268,54 @@ TEST(CompletionQueueTest, MakeStreamingReadRpc) { runner.join(); } +TEST(CompletionQueueTest, MakeRpcsAfterShutdown) { + using ms = std::chrono::milliseconds; + + auto mock_cq = std::make_shared(); + CompletionQueue cq(mock_cq); + + // Use `StrictMock` to enforce that there are no calls made on the client. + StrictMock mock_client; + std::thread runner([&cq] { cq.Run(); }); + cq.Shutdown(); + + btadmin::GetTableRequest get_table_request; + get_table_request.set_name("test-table-name"); + future done = + cq.MakeUnaryRpc( + [&mock_client](grpc::ClientContext* context, + btadmin::GetTableRequest const& request, + grpc::CompletionQueue* cq) { + return mock_client.AsyncGetTable(context, request, cq); + }, + get_table_request, + google::cloud::internal::make_unique()) + .then([](future> f) { + EXPECT_EQ(StatusCode::kCancelled, f.get().status().code()); + }); + + btproto::ReadRowsRequest read_request; + read_request.set_table_name("test-table-name"); + (void)cq.MakeStreamingReadRpc( + [&mock_client](grpc::ClientContext* context, + btproto::ReadRowsRequest const& request, + grpc::CompletionQueue* cq) { + return mock_client.AsyncReadRows(context, request, cq); + }, + read_request, google::cloud::internal::make_unique(), + [](btproto::ReadRowsResponse const&) { + ADD_FAILURE() << "OnReadHandler unexpectedly called"; + return make_ready_future(true); + }, + [](Status const& status) { + EXPECT_EQ(StatusCode::kCancelled, status.code()); + }); + + mock_cq->SimulateCompletion(true); + EXPECT_EQ(std::future_status::ready, done.wait_for(ms(0))); + runner.join(); +} + TEST(CompletionQueueTest, RunAsync) { CompletionQueue cq; @@ -282,6 +331,39 @@ TEST(CompletionQueueTest, RunAsync) { runner.join(); } +// Sets up a timer that reschedules itself and verfies we can shut down +// cleanly whether we call `CancelAll()` on the queue first or not. +namespace { +void RunAndReschedule(CompletionQueue& cq, bool ok) { + if (ok) { + cq.MakeRelativeTimer(std::chrono::seconds(1)) + .then([&cq](future> + result) { RunAndReschedule(cq, result.get().ok()); }); + } +} +} // namespace + +TEST(CompletionQueueTest, ShutdownWithReschedulingTimer) { + CompletionQueue cq; + std::thread t([&cq] { cq.Run(); }); + + RunAndReschedule(cq, /*ok=*/true); + + cq.Shutdown(); + t.join(); +} + +TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) { + CompletionQueue cq; + std::thread t([&cq] { cq.Run(); }); + + RunAndReschedule(cq, /*ok=*/true); + + cq.CancelAll(); + cq.Shutdown(); + t.join(); +} + } // namespace } // namespace GOOGLE_CLOUD_CPP_GRPC_UTILS_NS } // namespace grpc_utils diff --git a/google/cloud/grpc_utils/internal/async_read_stream_impl.h b/google/cloud/grpc_utils/internal/async_read_stream_impl.h index 483e488485860..216e2240eaece 100644 --- a/google/cloud/grpc_utils/internal/async_read_stream_impl.h +++ b/google/cloud/grpc_utils/internal/async_read_stream_impl.h @@ -155,10 +155,16 @@ class AsyncReadStreamImpl context_ = std::move(context); cq_ = std::move(cq); - reader_ = async_call(context_.get(), request, &cq_->cq()); auto callback = std::make_shared(this->shared_from_this()); void* tag = cq_->RegisterOperation(std::move(callback)); - reader_->StartCall(tag); + // @note If `tag == nullptr` the `CompletionQueue` has been `Shutdown()`. + // We leave `reader_` null in this case; other methods must make the + // same `tag != nullptr` check prior to accessing `reader_`. This is + // safe since `Shutdown()` cannot be undone. + if (tag != nullptr) { + reader_ = async_call(context_.get(), request, &cq_->cq()); + reader_->StartCall(tag); + } } /// Cancel the current streaming read RPC. @@ -196,7 +202,9 @@ class AsyncReadStreamImpl auto callback = std::make_shared(this->shared_from_this()); auto response = &callback->response; void* tag = cq_->RegisterOperation(std::move(callback)); - reader_->Read(response, tag); + if (tag != nullptr) { + reader_->Read(response, tag); + } } /// Handle the result of a `Read()` call. @@ -244,11 +252,16 @@ class AsyncReadStreamImpl auto callback = std::make_shared(this->shared_from_this()); auto status = &callback->status; void* tag = cq_->RegisterOperation(std::move(callback)); - reader_->Finish(status, tag); + if (tag != nullptr) { + reader_->Finish(status, tag); + } } /// Handle the result of a Finish() request. - void OnFinish(bool, Status status) { on_finish_(std::move(status)); } + void OnFinish(bool ok, Status status) { + on_finish_(ok ? std::move(status) + : Status(StatusCode::kCancelled, "call cancelled")); + } /** * Discard all the messages until OnRead() receives a failure. @@ -279,7 +292,9 @@ class AsyncReadStreamImpl auto callback = std::make_shared(this->shared_from_this()); auto response = &callback->response; void* tag = cq_->RegisterOperation(std::move(callback)); - reader_->Read(response, tag); + if (tag != nullptr) { + reader_->Read(response, tag); + } } /// Handle the result of a Discard() call. diff --git a/google/cloud/grpc_utils/internal/completion_queue_impl.cc b/google/cloud/grpc_utils/internal/completion_queue_impl.cc index 03c16e92f947b..971add3e66960 100644 --- a/google/cloud/grpc_utils/internal/completion_queue_impl.cc +++ b/google/cloud/grpc_utils/internal/completion_queue_impl.cc @@ -78,6 +78,11 @@ void* CompletionQueueImpl::RegisterOperation( std::shared_ptr op) { void* tag = op.get(); std::unique_lock lk(mu_); + if (shutdown_) { + lk.unlock(); + op->Notify(/*ok=*/false); + return nullptr; + } auto ins = pending_ops_.emplace(reinterpret_cast(tag), std::move(op)); // After this point we no longer need the lock, so release it. diff --git a/google/cloud/grpc_utils/internal/completion_queue_impl.h b/google/cloud/grpc_utils/internal/completion_queue_impl.h index 660592aed408b..783d397c80460 100644 --- a/google/cloud/grpc_utils/internal/completion_queue_impl.h +++ b/google/cloud/grpc_utils/internal/completion_queue_impl.h @@ -54,8 +54,6 @@ class AsyncGrpcOperation : public AsyncOperation { * Derived classes wrap the callbacks provided by the application and invoke * the callback when this virtual member function is called. * - * @param cq the completion queue sending the notification, this is useful in - * case the callback needs to retry the operation. * @param ok opaque parameter returned by grpc::CompletionQueue. The * semantics defined by gRPC depend on the type of operation, therefore the * operation needs to interpret this parameter based on those semantics. @@ -105,10 +103,10 @@ class AsyncUnaryRpcFuture : public AsyncGrpcOperation { private: bool Notify(bool ok) override { if (!ok) { - // This would mean a bug in gRPC. The documentation states that Finish() - // always returns `true` for unary RPCs. + // `Finish()` always returns `true` for unary RPCs, so the only time we + // get `!ok` is after `Shutdown()` was called; treat that as "cancelled". promise_.set_value(::google::cloud::Status( - google::cloud::StatusCode::kUnknown, "Finish() returned false")); + google::cloud::StatusCode::kCancelled, "call cancelled")); return true; } if (!status_.ok()) {