Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: gracefully fail adding to the CompletionQueue after Shutdown #138

Merged
merged 3 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/grpc_utils/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ CompletionQueue::MakeDeadlineTimer(
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<AsyncTimerFuture>(impl_->CreateAlarm());
void* tag = impl_->RegisterOperation(op);
op->Set(impl_->cq(), deadline, tag);
if (tag != nullptr) {
op->Set(impl_->cq(), deadline, tag);
}
return op->GetFuture();
}

Expand Down
33 changes: 33 additions & 0 deletions google/cloud/grpc_utils/completion_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,39 @@ TEST(CompletionQueueTest, RunAsync) {
runner.join();
}

// Sets up a timer that reschedules itself and verfies we can shut down
// cleanly whether we call `CancelAll()` on the queue first or not.
namespace {
void RunAndReschedule(CompletionQueue& cq, bool ok) {
if (ok) {
cq.MakeRelativeTimer(std::chrono::seconds(1))
.then([&cq](future<StatusOr<std::chrono::system_clock::time_point>>
result) { RunAndReschedule(cq, result.get().ok()); });
}
}
} // namespace

TEST(CompletionQueueTest, ShutdownWithReschedulingTimer) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

RunAndReschedule(cq, /*ok=*/true);

cq.Shutdown();
t.join();
}

TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

RunAndReschedule(cq, /*ok=*/true);

cq.CancelAll();
cq.Shutdown();
t.join();
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_GRPC_UTILS_NS
} // namespace grpc_utils
Expand Down
16 changes: 12 additions & 4 deletions google/cloud/grpc_utils/internal/async_read_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ class AsyncReadStreamImpl
reader_ = async_call(context_.get(), request, &cq_->cq());
auto callback = std::make_shared<NotifyStart>(this->shared_from_this());
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->StartCall(tag);
if (tag != nullptr) {
reader_->StartCall(tag);
}
}

/// Cancel the current streaming read RPC.
Expand Down Expand Up @@ -196,7 +198,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyRead>(this->shared_from_this());
auto response = &callback->response;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Read(response, tag);
if (tag != nullptr) {
reader_->Read(response, tag);
}
}

/// Handle the result of a `Read()` call.
Expand Down Expand Up @@ -244,7 +248,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyFinish>(this->shared_from_this());
auto status = &callback->status;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Finish(status, tag);
if (tag != nullptr) {
reader_->Finish(status, tag);
}
}

/// Handle the result of a Finish() request.
Expand Down Expand Up @@ -279,7 +285,9 @@ class AsyncReadStreamImpl
auto callback = std::make_shared<NotifyDiscard>(this->shared_from_this());
auto response = &callback->response;
void* tag = cq_->RegisterOperation(std::move(callback));
reader_->Read(response, tag);
if (tag != nullptr) {
reader_->Read(response, tag);
}
}

/// Handle the result of a Discard() call.
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/grpc_utils/internal/completion_queue_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ void* CompletionQueueImpl::RegisterOperation(
std::shared_ptr<AsyncGrpcOperation> op) {
void* tag = op.get();
std::unique_lock<std::mutex> lk(mu_);
if (shutdown_) {
lk.unlock();
op->Notify(/*ok=*/false);
return nullptr;
}
auto ins =
pending_ops_.emplace(reinterpret_cast<std::intptr_t>(tag), std::move(op));
// After this point we no longer need the lock, so release it.
Expand Down
2 changes: 0 additions & 2 deletions google/cloud/grpc_utils/internal/completion_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class AsyncGrpcOperation : public AsyncOperation {
* Derived classes wrap the callbacks provided by the application and invoke
* the callback when this virtual member function is called.
*
* @param cq the completion queue sending the notification, this is useful in
* case the callback needs to retry the operation.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed removing this in a prior PR

* @param ok opaque parameter returned by grpc::CompletionQueue. The
* semantics defined by gRPC depend on the type of operation, therefore the
* operation needs to interpret this parameter based on those semantics.
Expand Down