diff --git a/google/cloud/completion_queue.cc b/google/cloud/completion_queue.cc index 679a69a3969bc..dd43050a08293 100644 --- a/google/cloud/completion_queue.cc +++ b/google/cloud/completion_queue.cc @@ -38,7 +38,8 @@ namespace { class AsyncTimerFuture : public internal::AsyncGrpcOperation { public: explicit AsyncTimerFuture(std::unique_ptr alarm) - : alarm_(std::move(alarm)) {} + : promise_(/*cancellation_callback=*/[this] { Cancel(); }), + alarm_(std::move(alarm)) {} future> GetFuture() { return promise_.get_future(); diff --git a/google/cloud/completion_queue_test.cc b/google/cloud/completion_queue_test.cc index a1b78c21040de..f96245cef8ce7 100644 --- a/google/cloud/completion_queue_test.cc +++ b/google/cloud/completion_queue_test.cc @@ -330,7 +330,7 @@ TEST(CompletionQueueTest, RunAsync) { runner.join(); } -// Sets up a timer that reschedules itself and verfies we can shut down +// Sets up a timer that reschedules itself and verifies we can shut down // cleanly whether we call `CancelAll()` on the queue first or not. namespace { void RunAndReschedule(CompletionQueue& cq, bool ok) { @@ -363,6 +363,35 @@ TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) { t.join(); } +TEST(CompletionQueueTest, CancelTimerSimple) { + CompletionQueue cq; + std::thread t([&cq] { cq.Run(); }); + + using ms = std::chrono::milliseconds; + auto fut = cq.MakeRelativeTimer(ms(20000)); + fut.cancel(); + auto tp = fut.get(); + EXPECT_FALSE(tp.ok()) << ", status=" << tp.status(); + cq.Shutdown(); + t.join(); +} + +TEST(CompletionQueueTest, CancelTimerContinuation) { + CompletionQueue cq; + std::thread t([&cq] { cq.Run(); }); + + using ms = std::chrono::milliseconds; + auto fut = cq.MakeRelativeTimer(ms(20000)).then( + [](future> f) { + return f.get().status(); + }); + fut.cancel(); + auto status = fut.get(); + EXPECT_FALSE(status.ok()) << ", status=" << status; + cq.Shutdown(); + t.join(); +} + } // namespace } // namespace GOOGLE_CLOUD_CPP_NS } // namespace cloud diff --git a/google/cloud/future_generic_then_test.cc b/google/cloud/future_generic_then_test.cc index 2d21bf0a8abb2..785baf65dbf55 100644 --- a/google/cloud/future_generic_then_test.cc +++ b/google/cloud/future_generic_then_test.cc @@ -175,6 +175,18 @@ TEST(FutureTestInt, ThenByCopy) { EXPECT_FALSE(next.valid()); } +/// @test Verify the behavior around cancellation. +TEST(FutureTestInt, CancelThroughContinuation) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + auto f1 = f0.then([](future f) { return f.get() * 2; }); + EXPECT_TRUE(f1.cancel()); + EXPECT_TRUE(cancelled); + p0.set_value(42); + EXPECT_EQ(84, f1.get()); +} + // The following tests reference the technical specification: // http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/p0159r0.html // The test names match the section and paragraph from the TS. diff --git a/google/cloud/future_void_then_test.cc b/google/cloud/future_void_then_test.cc index 4c80916a88708..16ff0e8366e71 100644 --- a/google/cloud/future_void_then_test.cc +++ b/google/cloud/future_void_then_test.cc @@ -166,6 +166,18 @@ TEST(FutureTestVoid, ThenByCopy) { // http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/p0159r0.html // The test names match the section and paragraph from the TS. +/// @test Verify the behavior around cancellation. +TEST(FutureTestVoid, CancelThroughContinuation) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + auto f1 = f0.then([](future) { return 7; }); + EXPECT_TRUE(f1.cancel()); + EXPECT_TRUE(cancelled); + p0.set_value(); + EXPECT_EQ(7, f1.get()); +} + /// @test Verify conformance with section 2.3 of the Concurrency TS. TEST(FutureTestVoid, conform_2_3_2_a) { // future should have an unwrapping constructor. diff --git a/google/cloud/internal/future_impl.h b/google/cloud/internal/future_impl.h index 3b3c3ed6940fa..9c32e5442b64b 100644 --- a/google/cloud/internal/future_impl.h +++ b/google/cloud/internal/future_impl.h @@ -69,11 +69,11 @@ class continuation_base { class future_shared_state_base { public: future_shared_state_base() : future_shared_state_base([] {}) {} - future_shared_state_base(std::function cancellation_callback) + explicit future_shared_state_base(std::function cancellation_callback) : mu_(), cv_(), current_state_(state::not_ready), - cancellation_callback_(cancellation_callback) {} + cancellation_callback_(std::move(cancellation_callback)) {} /// Return true if the shared state has a value or an exception. bool is_ready() const { std::unique_lock lk(mu_); @@ -204,6 +204,10 @@ class future_shared_state_base { continuation_ = std::move(c); } + std::function extract_cancellation_callback() { + return std::move(cancellation_callback_); + } + // Try to cancel the task by invoking the cancellation_callback. bool cancel() { if (!cancellable()) { @@ -330,8 +334,8 @@ template class future_shared_state final : private future_shared_state_base { public: future_shared_state() : future_shared_state_base(), buffer_() {} - future_shared_state(std::function cancellation_callback) - : future_shared_state_base(cancellation_callback), buffer_() {} + explicit future_shared_state(std::function cancellation_callback) + : future_shared_state_base(std::move(cancellation_callback)), buffer_() {} ~future_shared_state() { if (current_state_ == state::has_value) { // Recall that state::has_value is a terminal state, once a value is @@ -345,6 +349,7 @@ class future_shared_state final : private future_shared_state_base { using future_shared_state_base::abandon; using future_shared_state_base::cancel; + using future_shared_state_base::extract_cancellation_callback; using future_shared_state_base::is_ready; using future_shared_state_base::set_continuation; using future_shared_state_base::set_exception; @@ -468,11 +473,12 @@ template <> class future_shared_state final : private future_shared_state_base { public: future_shared_state() : future_shared_state_base() {} - future_shared_state(std::function cancellation_callback) - : future_shared_state_base(cancellation_callback) {} + explicit future_shared_state(std::function cancellation_callback) + : future_shared_state_base(std::move(cancellation_callback)) {} using future_shared_state_base::abandon; using future_shared_state_base::cancel; + using future_shared_state_base::extract_cancellation_callback; using future_shared_state_base::is_ready; using future_shared_state_base::set_continuation; using future_shared_state_base::set_exception; @@ -666,7 +672,8 @@ struct continuation : public continuation_base { continuation(Functor&& f, std::shared_ptr s) : functor(std::move(f)), input(std::move(s)), - output(std::make_shared>()) {} + output(std::make_shared>( + input.lock()->extract_cancellation_callback())) {} continuation(Functor&& f, std::shared_ptr s, std::shared_ptr o) @@ -721,7 +728,8 @@ struct unwrapping_continuation : public continuation_base { : functor(std::move(f)), input(std::move(s)), intermediate(), - output(std::make_shared()) {} + output(std::make_shared( + input.lock()->extract_cancellation_callback())) {} void execute() override { auto tmp = input.lock(); diff --git a/google/cloud/internal/future_then_impl.h b/google/cloud/internal/future_then_impl.h index 0cfc81cc3354f..998d14be9df41 100644 --- a/google/cloud/internal/future_then_impl.h +++ b/google/cloud/internal/future_then_impl.h @@ -54,9 +54,9 @@ typename internal::then_helper::future_t future::then_impl( typename internal::then_helper::functor_result_t; using future_t = typename internal::then_helper::future_t; - // The `shared_state_type` (aka `future_shared_state`) is be written + // The `shared_state_type` (aka `future_shared_state`) is written // without any reference to the `future` class, otherwise there would - // be cycling dependencies between the two classes. We must adapt the + // be cyclic dependencies between the two classes. We must adapt the // provided functor, which takes a `future` parameter to take a // `shared_ptr