Skip to content

Commit

Permalink
feat: support cancellation for long running operations (googleapis/go…
Browse files Browse the repository at this point in the history
…ogle-cloud-cpp-common#160)

* feat: support cancellation for long running operations

* address review comments

* address code review comments
  • Loading branch information
Takashi Matsuo authored Feb 6, 2020
1 parent f9d0b43 commit da48cc9
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 5 deletions.
4 changes: 3 additions & 1 deletion google/cloud/future_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class future final : private internal::future_base<T> {
return tmp->get();
}

using internal::future_base<T>::cancel;
using internal::future_base<T>::is_ready;
using internal::future_base<T>::valid;
using internal::future_base<T>::wait;
Expand Down Expand Up @@ -132,7 +133,8 @@ template <typename T>
class promise final : private internal::promise_base<T> {
public:
/// Creates a promise with an unsatisfied shared state.
promise() = default;
promise(std::function<void()> cancellation_callback = [] {})
: internal::promise_base<T>(cancellation_callback) {}

/// Constructs a new promise and transfer any shared state from @p rhs.
promise(promise&&) noexcept = default;
Expand Down
32 changes: 32 additions & 0 deletions google/cloud/future_generic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,38 @@ TEST(FutureTestInt, conform_30_6_6_25_3) {
// raise too. We do not need to test for that, exceptions are always propagated,
// this is just giving implementors freedom.

/// @test Verify the behavior around cancellation.
TEST(FutureTestInt, cancellation_without_satisfaction) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
ASSERT_TRUE(cancelled);
}

/// @test Verify the case for cancel then satisfy.
TEST(FutureTestInt, cancellation_and_satisfaction) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
p0.set_value(1);
ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms));
ASSERT_EQ(1, f0.get());
ASSERT_TRUE(cancelled);
}

/// @test Verify the cancellation fails on satisfied promise.
TEST(FutureTestInt, cancellation_after_satisfaction) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
p0.set_value(1);
ASSERT_FALSE(f0.cancel());
ASSERT_FALSE(cancelled);
ASSERT_EQ(1, f0.get());
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/future_void.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class future<void> final : private internal::future_base<void> {
return tmp->get();
}

using future_base::cancel;
using future_base::is_ready;
using future_base::valid;
using future_base::wait;
Expand Down Expand Up @@ -128,7 +129,8 @@ template <>
class promise<void> final : private internal::promise_base<void> {
public:
/// Creates a promise with an unsatisfied shared state.
promise() = default;
promise(std::function<void()> cancellation_callback = [] {})
: promise_base(cancellation_callback) {}

/// Constructs a new promise and transfer any shared state from @p rhs.
promise(promise&&) noexcept = default;
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/future_void_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,36 @@ TEST(FutureTestVoid, conform_30_6_6_25_3) {
// raise too. We do not need to test for that, exceptions are always propagated,
// this is just giving implementors freedom.

/// @test Verify the behavior around cancellation.
TEST(FutureTestVoid, cancellation_without_satisfaction) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
ASSERT_TRUE(cancelled);
}

/// @test Verify the case for cancel then satisfy.
TEST(FutureTestVoid, cancellation_and_satisfaction) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
p0.set_value();
ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms));
ASSERT_TRUE(cancelled);
}

/// @test Verify the cancellation fails on satisfied promise.
TEST(FutureTestVoid, cancellation_after_satisfaction) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
p0.set_value();
ASSERT_FALSE(f0.cancel());
ASSERT_FALSE(cancelled);
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/internal/future_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class future_base {
return shared_state_->is_ready();
}

/**
* Cancel the future by invoking cancel() on the shared state.
*/
bool cancel() { return shared_state_->cancel(); }

protected:
/// Shorthand to refer to the shared state type.
using shared_state_type = internal::future_shared_state<T>;
Expand All @@ -144,7 +149,9 @@ class future_base {
template <typename T>
class promise_base {
public:
promise_base() : shared_state_(std::make_shared<shared_state_type>()) {}
explicit promise_base(std::function<void()> cancellation_callback)
: shared_state_(
std::make_shared<shared_state_type>(cancellation_callback)) {}
promise_base(promise_base&&) noexcept = default;

~promise_base() {
Expand Down
34 changes: 32 additions & 2 deletions google/cloud/internal/future_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ class continuation_base {
*/
class future_shared_state_base {
public:
future_shared_state_base() : mu_(), cv_(), current_state_(state::not_ready) {}

future_shared_state_base() : future_shared_state_base([] {}) {}
future_shared_state_base(std::function<void()> cancellation_callback)
: mu_(),
cv_(),
current_state_(state::not_ready),
cancellation_callback_(cancellation_callback) {}
/// Return true if the shared state has a value or an exception.
bool is_ready() const {
std::unique_lock<std::mutex> lk(mu_);
return is_ready_unlocked();
}

/// Return true if the shared state can be cancelled.
bool cancellable() const { return !is_ready() && !cancelled_; }

/// Block until is_ready() returns true ...
void wait() {
std::unique_lock<std::mutex> lk(mu_);
Expand Down Expand Up @@ -197,6 +204,19 @@ class future_shared_state_base {
continuation_ = std::move(c);
}

// Try to cancel the task by invoking the cancellation_callback.
bool cancel() {
if (!cancellable()) {
return false;
}
cancellation_callback_();
// If the callback fails with an exception we assume it had no effect.
// Incidentally this means we provide the strong exception guarantee for
// this function.
cancelled_ = true;
return true;
}

protected:
bool is_ready_unlocked() const { return current_state_ != state::not_ready; }

Expand Down Expand Up @@ -279,6 +299,10 @@ class future_shared_state_base {
* member variable and does not satisfy the shared state.
*/
std::unique_ptr<continuation_base> continuation_;

// Allow users "cancel" the future with the given callback.
std::atomic<bool> cancelled_ = ATOMIC_VAR_INIT(false);
std::function<void()> cancellation_callback_;
};

/**
Expand Down Expand Up @@ -306,6 +330,8 @@ template <typename T>
class future_shared_state final : private future_shared_state_base {
public:
future_shared_state() : future_shared_state_base(), buffer_() {}
future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(cancellation_callback), buffer_() {}
~future_shared_state() {
if (current_state_ == state::has_value) {
// Recall that state::has_value is a terminal state, once a value is
Expand All @@ -318,6 +344,7 @@ class future_shared_state final : private future_shared_state_base {
}

using future_shared_state_base::abandon;
using future_shared_state_base::cancel;
using future_shared_state_base::is_ready;
using future_shared_state_base::set_continuation;
using future_shared_state_base::set_exception;
Expand Down Expand Up @@ -441,8 +468,11 @@ template <>
class future_shared_state<void> final : private future_shared_state_base {
public:
future_shared_state() : future_shared_state_base() {}
future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(cancellation_callback) {}

using future_shared_state_base::abandon;
using future_shared_state_base::cancel;
using future_shared_state_base::is_ready;
using future_shared_state_base::set_continuation;
using future_shared_state_base::set_exception;
Expand Down

0 comments on commit da48cc9

Please sign in to comment.