From 2dbc5109ca0592dc29a4fc8abbddb0081ec86b59 Mon Sep 17 00:00:00 2001 From: Takashi Matsuo Date: Thu, 6 Feb 2020 10:33:38 -0800 Subject: [PATCH] feat: support cancellation for long running operations (googleapis/google-cloud-cpp-common#160) * feat: support cancellation for long running operations * address review comments * address code review comments --- google/cloud/future_generic.h | 4 +++- google/cloud/future_generic_test.cc | 32 +++++++++++++++++++++++++++ google/cloud/future_void.h | 4 +++- google/cloud/future_void_test.cc | 30 +++++++++++++++++++++++++ google/cloud/internal/future_base.h | 9 +++++++- google/cloud/internal/future_impl.h | 34 +++++++++++++++++++++++++++-- 6 files changed, 108 insertions(+), 5 deletions(-) diff --git a/google/cloud/future_generic.h b/google/cloud/future_generic.h index 5fe9f1101c057..ddfc7181d1c60 100644 --- a/google/cloud/future_generic.h +++ b/google/cloud/future_generic.h @@ -76,6 +76,7 @@ class future final : private internal::future_base { return tmp->get(); } + using internal::future_base::cancel; using internal::future_base::is_ready; using internal::future_base::valid; using internal::future_base::wait; @@ -132,7 +133,8 @@ template class promise final : private internal::promise_base { public: /// Creates a promise with an unsatisfied shared state. - promise() = default; + promise(std::function cancellation_callback = [] {}) + : internal::promise_base(cancellation_callback) {} /// Constructs a new promise and transfer any shared state from @p rhs. promise(promise&&) noexcept = default; diff --git a/google/cloud/future_generic_test.cc b/google/cloud/future_generic_test.cc index 20daa7f48f3af..cb2a0a90421b6 100644 --- a/google/cloud/future_generic_test.cc +++ b/google/cloud/future_generic_test.cc @@ -635,6 +635,38 @@ TEST(FutureTestInt, conform_30_6_6_25_3) { // raise too. We do not need to test for that, exceptions are always propagated, // this is just giving implementors freedom. +/// @test Verify the behavior around cancellation. +TEST(FutureTestInt, cancellation_without_satisfaction) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + ASSERT_TRUE(f0.cancel()); + ASSERT_TRUE(cancelled); +} + +/// @test Verify the case for cancel then satisfy. +TEST(FutureTestInt, cancellation_and_satisfaction) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + ASSERT_TRUE(f0.cancel()); + p0.set_value(1); + ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms)); + ASSERT_EQ(1, f0.get()); + ASSERT_TRUE(cancelled); +} + +/// @test Verify the cancellation fails on satisfied promise. +TEST(FutureTestInt, cancellation_after_satisfaction) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + p0.set_value(1); + ASSERT_FALSE(f0.cancel()); + ASSERT_FALSE(cancelled); + ASSERT_EQ(1, f0.get()); +} + } // namespace } // namespace GOOGLE_CLOUD_CPP_NS } // namespace cloud diff --git a/google/cloud/future_void.h b/google/cloud/future_void.h index f5e0ca826474b..f6d1c8b35f320 100644 --- a/google/cloud/future_void.h +++ b/google/cloud/future_void.h @@ -73,6 +73,7 @@ class future final : private internal::future_base { return tmp->get(); } + using future_base::cancel; using future_base::is_ready; using future_base::valid; using future_base::wait; @@ -128,7 +129,8 @@ template <> class promise final : private internal::promise_base { public: /// Creates a promise with an unsatisfied shared state. - promise() = default; + promise(std::function cancellation_callback = [] {}) + : promise_base(cancellation_callback) {} /// Constructs a new promise and transfer any shared state from @p rhs. promise(promise&&) noexcept = default; diff --git a/google/cloud/future_void_test.cc b/google/cloud/future_void_test.cc index d63bc71a1edeb..02478c51508a4 100644 --- a/google/cloud/future_void_test.cc +++ b/google/cloud/future_void_test.cc @@ -644,6 +644,36 @@ TEST(FutureTestVoid, conform_30_6_6_25_3) { // raise too. We do not need to test for that, exceptions are always propagated, // this is just giving implementors freedom. +/// @test Verify the behavior around cancellation. +TEST(FutureTestVoid, cancellation_without_satisfaction) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + ASSERT_TRUE(f0.cancel()); + ASSERT_TRUE(cancelled); +} + +/// @test Verify the case for cancel then satisfy. +TEST(FutureTestVoid, cancellation_and_satisfaction) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + ASSERT_TRUE(f0.cancel()); + p0.set_value(); + ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms)); + ASSERT_TRUE(cancelled); +} + +/// @test Verify the cancellation fails on satisfied promise. +TEST(FutureTestVoid, cancellation_after_satisfaction) { + bool cancelled = false; + promise p0([&cancelled] { cancelled = true; }); + auto f0 = p0.get_future(); + p0.set_value(); + ASSERT_FALSE(f0.cancel()); + ASSERT_FALSE(cancelled); +} + } // namespace } // namespace GOOGLE_CLOUD_CPP_NS } // namespace cloud diff --git a/google/cloud/internal/future_base.h b/google/cloud/internal/future_base.h index 150554b022803..24e814f1ab1d6 100644 --- a/google/cloud/internal/future_base.h +++ b/google/cloud/internal/future_base.h @@ -123,6 +123,11 @@ class future_base { return shared_state_->is_ready(); } + /** + * Cancel the future by invoking cancel() on the shared state. + */ + bool cancel() { return shared_state_->cancel(); } + protected: /// Shorthand to refer to the shared state type. using shared_state_type = internal::future_shared_state; @@ -144,7 +149,9 @@ class future_base { template class promise_base { public: - promise_base() : shared_state_(std::make_shared()) {} + explicit promise_base(std::function cancellation_callback) + : shared_state_( + std::make_shared(cancellation_callback)) {} promise_base(promise_base&&) noexcept = default; ~promise_base() { diff --git a/google/cloud/internal/future_impl.h b/google/cloud/internal/future_impl.h index babd3c7b6ecfb..3b3c3ed6940fa 100644 --- a/google/cloud/internal/future_impl.h +++ b/google/cloud/internal/future_impl.h @@ -68,14 +68,21 @@ class continuation_base { */ class future_shared_state_base { public: - future_shared_state_base() : mu_(), cv_(), current_state_(state::not_ready) {} - + future_shared_state_base() : future_shared_state_base([] {}) {} + future_shared_state_base(std::function cancellation_callback) + : mu_(), + cv_(), + current_state_(state::not_ready), + cancellation_callback_(cancellation_callback) {} /// Return true if the shared state has a value or an exception. bool is_ready() const { std::unique_lock lk(mu_); return is_ready_unlocked(); } + /// Return true if the shared state can be cancelled. + bool cancellable() const { return !is_ready() && !cancelled_; } + /// Block until is_ready() returns true ... void wait() { std::unique_lock lk(mu_); @@ -197,6 +204,19 @@ class future_shared_state_base { continuation_ = std::move(c); } + // Try to cancel the task by invoking the cancellation_callback. + bool cancel() { + if (!cancellable()) { + return false; + } + cancellation_callback_(); + // If the callback fails with an exception we assume it had no effect. + // Incidentally this means we provide the strong exception guarantee for + // this function. + cancelled_ = true; + return true; + } + protected: bool is_ready_unlocked() const { return current_state_ != state::not_ready; } @@ -279,6 +299,10 @@ class future_shared_state_base { * member variable and does not satisfy the shared state. */ std::unique_ptr continuation_; + + // Allow users "cancel" the future with the given callback. + std::atomic cancelled_ = ATOMIC_VAR_INIT(false); + std::function cancellation_callback_; }; /** @@ -306,6 +330,8 @@ template class future_shared_state final : private future_shared_state_base { public: future_shared_state() : future_shared_state_base(), buffer_() {} + future_shared_state(std::function cancellation_callback) + : future_shared_state_base(cancellation_callback), buffer_() {} ~future_shared_state() { if (current_state_ == state::has_value) { // Recall that state::has_value is a terminal state, once a value is @@ -318,6 +344,7 @@ class future_shared_state final : private future_shared_state_base { } using future_shared_state_base::abandon; + using future_shared_state_base::cancel; using future_shared_state_base::is_ready; using future_shared_state_base::set_continuation; using future_shared_state_base::set_exception; @@ -441,8 +468,11 @@ template <> class future_shared_state final : private future_shared_state_base { public: future_shared_state() : future_shared_state_base() {} + future_shared_state(std::function cancellation_callback) + : future_shared_state_base(cancellation_callback) {} using future_shared_state_base::abandon; + using future_shared_state_base::cancel; using future_shared_state_base::is_ready; using future_shared_state_base::set_continuation; using future_shared_state_base::set_exception;