Skip to content

Commit

Permalink
feat!: change the result type for timers to StatusOr (googleapis/goog…
Browse files Browse the repository at this point in the history
…le-cloud-cpp-common#134)

The result of the `future` returned from `MakeDeadlineTimer` and`MakeRelativeTimer` is now a `StatusOr` that indicates whether the timer ran to expiration or not (e.g. it was cancelled).

Part of googleapis/google-cloud-cpp-common#129
  • Loading branch information
mr-salty authored Jan 27, 2020
1 parent a4e8890 commit 7742997
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
14 changes: 9 additions & 5 deletions google/cloud/grpc_utils/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AsyncTimerFuture : public internal::AsyncGrpcOperation {
explicit AsyncTimerFuture(std::unique_ptr<grpc::Alarm> alarm)
: alarm_(std::move(alarm)) {}

future<std::chrono::system_clock::time_point> GetFuture() {
future<StatusOr<std::chrono::system_clock::time_point>> GetFuture() {
return promise_.get_future();
}

Expand All @@ -61,12 +61,16 @@ class AsyncTimerFuture : public internal::AsyncGrpcOperation {
}

private:
bool Notify(CompletionQueue&, bool) override {
promise_.set_value(deadline_);
bool Notify(CompletionQueue&, bool ok) override {
if (!ok) {
promise_.set_value(Status(StatusCode::kCancelled, "timer canceled"));
} else {
promise_.set_value(deadline_);
}
return true;
}

promise<std::chrono::system_clock::time_point> promise_;
promise<StatusOr<std::chrono::system_clock::time_point>> promise_;
std::chrono::system_clock::time_point deadline_;
/// Holds the underlying handle. It might be a nullptr in tests.
std::unique_ptr<grpc::Alarm> alarm_;
Expand All @@ -82,7 +86,7 @@ void CompletionQueue::Shutdown() { impl_->Shutdown(); }

void CompletionQueue::CancelAll() { impl_->CancelAll(); }

google::cloud::future<std::chrono::system_clock::time_point>
google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
CompletionQueue::MakeDeadlineTimer(
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<AsyncTimerFuture>(impl_->CreateAlarm());
Expand Down
13 changes: 10 additions & 3 deletions google/cloud/grpc_utils/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ class CompletionQueue {
* @param deadline when should the timer expire.
*
* @return a future that becomes satisfied after @p deadline.
* The result of the future is the time at which it expired, or an error
* Status if the timer did not run to expiration (e.g. it was cancelled).
*/
google::cloud::future<std::chrono::system_clock::time_point>
google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
MakeDeadlineTimer(std::chrono::system_clock::time_point deadline);

/**
Expand All @@ -75,9 +77,11 @@ class CompletionQueue {
* @param duration when should the timer expire relative to the current time.
*
* @return a future that becomes satisfied after @p duration time has elapsed.
* The result of the future is the time at which it expired, or an error
* Status if the timer did not run to expiration (e.g. it was cancelled).
*/
template <typename Rep, typename Period>
future<std::chrono::system_clock::time_point> MakeRelativeTimer(
future<StatusOr<std::chrono::system_clock::time_point>> MakeRelativeTimer(
std::chrono::duration<Rep, Period> duration) {
return MakeDeadlineTimer(std::chrono::system_clock::now() + duration);
}
Expand Down Expand Up @@ -175,7 +179,10 @@ class CompletionQueue {
internal::CheckRunAsyncCallback<Functor>::value, int>::type = 0>
void RunAsync(Functor&& functor) {
MakeRelativeTimer(std::chrono::seconds(0))
.then([this, functor](future<std::chrono::system_clock::time_point>) {
.then([this, functor](
future<StatusOr<std::chrono::system_clock::time_point>>) {
// We intentionally ignore the status here; the functor is always
// called, even after a call to `CancelAll`.
CompletionQueue cq(impl_);
functor(cq);
});
Expand Down
22 changes: 16 additions & 6 deletions google/cloud/grpc_utils/completion_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "google/cloud/grpc_utils/completion_queue.h"
#include "google/cloud/future.h"
#include "google/cloud/testing_util/assert_ok.h"
#include <gtest/gtest.h>
#include <chrono>
#include <memory>
Expand All @@ -38,7 +39,8 @@ TEST(CompletionQueueTest, TimerSmokeTest) {
using ms = std::chrono::milliseconds;
promise<void> wait_for_sleep;
cq.MakeRelativeTimer(ms(2))
.then([&wait_for_sleep](future<std::chrono::system_clock::time_point>) {
.then([&wait_for_sleep](
future<StatusOr<std::chrono::system_clock::time_point>>) {
wait_for_sleep.set_value();
})
.get();
Expand All @@ -56,7 +58,8 @@ TEST(CompletionQueueTest, MockSmokeTest) {
using ms = std::chrono::milliseconds;
promise<void> wait_for_sleep;
cq.MakeRelativeTimer(ms(20000)).then(
[&wait_for_sleep](future<std::chrono::system_clock::time_point>) {
[&wait_for_sleep](
future<StatusOr<std::chrono::system_clock::time_point>>) {
wait_for_sleep.set_value();
});
mock->SimulateCompletion(cq, true);
Expand All @@ -74,7 +77,10 @@ TEST(CompletionQueueTest, ShutdownWithPending) {
CompletionQueue cq;
std::thread runner([&cq] { cq.Run(); });
timer = cq.MakeRelativeTimer(ms(20)).then(
[](future<std::chrono::system_clock::time_point>) {});
[](future<StatusOr<std::chrono::system_clock::time_point>> result) {
// Timer still runs to completion after `Shutdown`.
EXPECT_STATUS_OK(result.get().status());
});
EXPECT_EQ(std::future_status::timeout, timer.wait_for(ms(0)));
cq.Shutdown();
EXPECT_EQ(std::future_status::timeout, timer.wait_for(ms(0)));
Expand All @@ -92,9 +98,13 @@ TEST(CompletionQueueTest, CanCancelAllEvents) {
cq.Run();
done.set_value();
});
cq.MakeRelativeTimer(ms(20000));
cq.MakeRelativeTimer(ms(20000));
cq.MakeRelativeTimer(ms(20000));
for (int i = 0; i < 3; ++i) {
cq.MakeRelativeTimer(ms(20000)).then(
[](future<StatusOr<std::chrono::system_clock::time_point>> result) {
// Cancelled timers return CANCELLED status.
EXPECT_EQ(StatusCode::kCancelled, result.get().status().code());
});
}
auto f = done.get_future();
EXPECT_EQ(std::future_status::timeout, f.wait_for(ms(1)));
cq.Shutdown();
Expand Down

0 comments on commit 7742997

Please sign in to comment.