Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: support cancellation for long running operations #160

Merged
merged 3 commits into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/future_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class future final : private internal::future_base<T> {
return tmp->get();
}

using internal::future_base<T>::cancel;
using internal::future_base<T>::is_ready;
using internal::future_base<T>::valid;
using internal::future_base<T>::wait;
Expand Down Expand Up @@ -132,7 +133,8 @@ template <typename T>
class promise final : private internal::promise_base<T> {
public:
/// Creates a promise with an unsatisfied shared state.
promise() = default;
promise(std::function<void()> cancellation_callback = [] {})
: internal::promise_base<T>(cancellation_callback) {}

/// Constructs a new promise and transfer any shared state from @p rhs.
promise(promise&&) noexcept = default;
Expand Down
32 changes: 32 additions & 0 deletions google/cloud/future_generic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,38 @@ TEST(FutureTestInt, conform_30_6_6_25_3) {
// raise too. We do not need to test for that, exceptions are always propagated,
// this is just giving implementors freedom.

/// @test Verify the behavior around cancellation.
TEST(FutureTestInt, cancellation_without_satisfaction) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
ASSERT_TRUE(cancelled);
}

/// @test Verify the case for cancel then satisfy.
TEST(FutureTestInt, cancellation_and_satisfaction) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
p0.set_value(1);
ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms));
ASSERT_EQ(1, f0.get());
ASSERT_TRUE(cancelled);
}

/// @test Verify the cancellation fails on satisfied promise.
TEST(FutureTestInt, cancellation_after_satisfaction) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
p0.set_value(1);
ASSERT_FALSE(f0.cancel());
ASSERT_FALSE(cancelled);
ASSERT_EQ(1, f0.get());
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/future_void.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class future<void> final : private internal::future_base<void> {
return tmp->get();
}

using future_base::cancel;
using future_base::is_ready;
using future_base::valid;
using future_base::wait;
Expand Down Expand Up @@ -128,7 +129,8 @@ template <>
class promise<void> final : private internal::promise_base<void> {
public:
/// Creates a promise with an unsatisfied shared state.
promise() = default;
promise(std::function<void()> cancellation_callback = [] {})
: promise_base(cancellation_callback) {}

/// Constructs a new promise and transfer any shared state from @p rhs.
promise(promise&&) noexcept = default;
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/future_void_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,36 @@ TEST(FutureTestVoid, conform_30_6_6_25_3) {
// raise too. We do not need to test for that, exceptions are always propagated,
// this is just giving implementors freedom.

/// @test Verify the behavior around cancellation.
TEST(FutureTestVoid, cancellation_without_satisfaction) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
ASSERT_TRUE(cancelled);
}

/// @test Verify the case for cancel then satisfy.
TEST(FutureTestVoid, cancellation_and_satisfaction) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
ASSERT_TRUE(f0.cancel());
p0.set_value();
ASSERT_EQ(std::future_status::ready, f0.wait_for(0_ms));
ASSERT_TRUE(cancelled);
}

/// @test Verify the cancellation fails on satisfied promise.
TEST(FutureTestVoid, cancellation_after_satisfaction) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
p0.set_value();
ASSERT_FALSE(f0.cancel());
ASSERT_FALSE(cancelled);
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/internal/future_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class future_base {
return shared_state_->is_ready();
}

/**
* Cancel the future by invoking cancel() on the shared state.
*/
bool cancel() { return shared_state_->cancel(); }

protected:
/// Shorthand to refer to the shared state type.
using shared_state_type = internal::future_shared_state<T>;
Expand All @@ -144,7 +149,9 @@ class future_base {
template <typename T>
class promise_base {
public:
promise_base() : shared_state_(std::make_shared<shared_state_type>()) {}
explicit promise_base(std::function<void()> cancellation_callback)
: shared_state_(
std::make_shared<shared_state_type>(cancellation_callback)) {}
promise_base(promise_base&&) noexcept = default;

~promise_base() {
Expand Down
34 changes: 32 additions & 2 deletions google/cloud/internal/future_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ class continuation_base {
*/
class future_shared_state_base {
public:
future_shared_state_base() : mu_(), cv_(), current_state_(state::not_ready) {}

future_shared_state_base() : future_shared_state_base([] {}) {}
future_shared_state_base(std::function<void()> cancellation_callback)
: mu_(),
cv_(),
current_state_(state::not_ready),
cancellation_callback_(cancellation_callback) {}
/// Return true if the shared state has a value or an exception.
bool is_ready() const {
std::unique_lock<std::mutex> lk(mu_);
return is_ready_unlocked();
}

/// Return true if the shared state can be cancelled.
bool cancellable() const { return !is_ready() && !cancelled_; }

/// Block until is_ready() returns true ...
void wait() {
std::unique_lock<std::mutex> lk(mu_);
Expand Down Expand Up @@ -197,6 +204,19 @@ class future_shared_state_base {
continuation_ = std::move(c);
}

// Try to cancel the task by invoking the cancellation_callback.
bool cancel() {
if (!cancellable()) {
return false;
}
cancellation_callback_();
// If the callback fails with an exception we assume it had no effect.
// Incidentally this means we provide the strong exception guarantee for
// this function.
cancelled_ = true;
return true;
}

protected:
bool is_ready_unlocked() const { return current_state_ != state::not_ready; }

Expand Down Expand Up @@ -279,6 +299,10 @@ class future_shared_state_base {
* member variable and does not satisfy the shared state.
*/
std::unique_ptr<continuation_base> continuation_;

// Allow users "cancel" the future with the given callback.
std::atomic<bool> cancelled_ = ATOMIC_VAR_INIT(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need this be atomic? Or, more generally, what are the concurrency requirements of cancel()? Must callers serialize cancel() calls? If so, atomicity doesn't seem to be required. If not, atomicity doesn't seem sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@devbww Good point. Maybe I can change it to bool. It is sufficient for my use case.

std::function<void()> cancellation_callback_;
};

/**
Expand Down Expand Up @@ -306,6 +330,8 @@ template <typename T>
class future_shared_state final : private future_shared_state_base {
public:
future_shared_state() : future_shared_state_base(), buffer_() {}
future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(cancellation_callback), buffer_() {}
~future_shared_state() {
if (current_state_ == state::has_value) {
// Recall that state::has_value is a terminal state, once a value is
Expand All @@ -318,6 +344,7 @@ class future_shared_state final : private future_shared_state_base {
}

using future_shared_state_base::abandon;
using future_shared_state_base::cancel;
using future_shared_state_base::is_ready;
using future_shared_state_base::set_continuation;
using future_shared_state_base::set_exception;
Expand Down Expand Up @@ -441,8 +468,11 @@ template <>
class future_shared_state<void> final : private future_shared_state_base {
public:
future_shared_state() : future_shared_state_base() {}
future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(cancellation_callback) {}

using future_shared_state_base::abandon;
using future_shared_state_base::cancel;
using future_shared_state_base::is_ready;
using future_shared_state_base::set_continuation;
using future_shared_state_base::set_exception;
Expand Down