Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat!: change the result type for timers to StatusOr #134

Merged
merged 2 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions google/cloud/grpc_utils/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AsyncTimerFuture : public internal::AsyncGrpcOperation {
explicit AsyncTimerFuture(std::unique_ptr<grpc::Alarm> alarm)
: alarm_(std::move(alarm)) {}

future<std::chrono::system_clock::time_point> GetFuture() {
future<StatusOr<std::chrono::system_clock::time_point>> GetFuture() {
return promise_.get_future();
}

Expand All @@ -61,12 +61,16 @@ class AsyncTimerFuture : public internal::AsyncGrpcOperation {
}

private:
bool Notify(CompletionQueue&, bool) override {
promise_.set_value(deadline_);
bool Notify(CompletionQueue&, bool ok) override {
if (!ok) {
promise_.set_value(Status(StatusCode::kCancelled, "timer canceled"));
} else {
promise_.set_value(deadline_);
}
return true;
}

promise<std::chrono::system_clock::time_point> promise_;
promise<StatusOr<std::chrono::system_clock::time_point>> promise_;
std::chrono::system_clock::time_point deadline_;
/// Holds the underlying handle. It might be a nullptr in tests.
std::unique_ptr<grpc::Alarm> alarm_;
Expand All @@ -82,7 +86,7 @@ void CompletionQueue::Shutdown() { impl_->Shutdown(); }

void CompletionQueue::CancelAll() { impl_->CancelAll(); }

google::cloud::future<std::chrono::system_clock::time_point>
google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
CompletionQueue::MakeDeadlineTimer(
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<AsyncTimerFuture>(impl_->CreateAlarm());
Expand Down
13 changes: 10 additions & 3 deletions google/cloud/grpc_utils/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ class CompletionQueue {
* @param deadline when should the timer expire.
*
* @return a future that becomes satisfied after @p deadline.
* The result of the future is the time at which it expired, or an error
* Status if the timer did not run to expiration (e.g. it was cancelled).
*/
google::cloud::future<std::chrono::system_clock::time_point>
google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
MakeDeadlineTimer(std::chrono::system_clock::time_point deadline);

/**
Expand All @@ -75,9 +77,11 @@ class CompletionQueue {
* @param duration when should the timer expire relative to the current time.
*
* @return a future that becomes satisfied after @p duration time has elapsed.
* The result of the future is the time at which it expired, or an error
* Status if the timer did not run to expiration (e.g. it was cancelled).
*/
template <typename Rep, typename Period>
future<std::chrono::system_clock::time_point> MakeRelativeTimer(
future<StatusOr<std::chrono::system_clock::time_point>> MakeRelativeTimer(
std::chrono::duration<Rep, Period> duration) {
return MakeDeadlineTimer(std::chrono::system_clock::now() + duration);
}
Expand Down Expand Up @@ -175,7 +179,10 @@ class CompletionQueue {
internal::CheckRunAsyncCallback<Functor>::value, int>::type = 0>
void RunAsync(Functor&& functor) {
MakeRelativeTimer(std::chrono::seconds(0))
.then([this, functor](future<std::chrono::system_clock::time_point>) {
.then([this, functor](
future<StatusOr<std::chrono::system_clock::time_point>>) {
// We intentionally ignore the status here; the functor is always
// called, even after a call to `CancelAll`.
CompletionQueue cq(impl_);
functor(cq);
});
Expand Down
22 changes: 16 additions & 6 deletions google/cloud/grpc_utils/completion_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "google/cloud/grpc_utils/completion_queue.h"
#include "google/cloud/future.h"
#include "google/cloud/testing_util/assert_ok.h"
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
Expand All @@ -38,7 +39,8 @@ TEST(CompletionQueueTest, TimerSmokeTest) {
using ms = std::chrono::milliseconds;
promise<void> wait_for_sleep;
cq.MakeRelativeTimer(ms(2))
.then([&wait_for_sleep](future<std::chrono::system_clock::time_point>) {
.then([&wait_for_sleep](
future<StatusOr<std::chrono::system_clock::time_point>>) {
wait_for_sleep.set_value();
})
.get();
Expand All @@ -56,7 +58,8 @@ TEST(CompletionQueueTest, MockSmokeTest) {
using ms = std::chrono::milliseconds;
promise<void> wait_for_sleep;
cq.MakeRelativeTimer(ms(20000)).then(
[&wait_for_sleep](future<std::chrono::system_clock::time_point>) {
[&wait_for_sleep](
future<StatusOr<std::chrono::system_clock::time_point>>) {
wait_for_sleep.set_value();
});
mock->SimulateCompletion(cq, true);
Expand All @@ -74,7 +77,10 @@ TEST(CompletionQueueTest, ShutdownWithPending) {
CompletionQueue cq;
std::thread runner([&cq] { cq.Run(); });
timer = cq.MakeRelativeTimer(ms(20)).then(
[](future<std::chrono::system_clock::time_point>) {});
[](future<StatusOr<std::chrono::system_clock::time_point>> result) {
// Timer still runs to completion after `Shutdown`.
EXPECT_STATUS_OK(result.get().status());
});
EXPECT_EQ(std::future_status::timeout, timer.wait_for(ms(0)));
cq.Shutdown();
EXPECT_EQ(std::future_status::timeout, timer.wait_for(ms(0)));
Expand All @@ -92,9 +98,13 @@ TEST(CompletionQueueTest, CanCancelAllEvents) {
cq.Run();
done.set_value();
});
cq.MakeRelativeTimer(ms(20000));
cq.MakeRelativeTimer(ms(20000));
cq.MakeRelativeTimer(ms(20000));
for (int i = 0; i < 3; ++i) {
cq.MakeRelativeTimer(ms(20000)).then(
[](future<StatusOr<std::chrono::system_clock::time_point>> result) {
// Cancelled timers return CANCELLED status.
EXPECT_EQ(StatusCode::kCancelled, result.get().status().code());
});
}
auto f = done.get_future();
EXPECT_EQ(std::future_status::timeout, f.wait_for(ms(1)));
cq.Shutdown();
Expand Down