Skip to content

Commit

Permalink
feat: cancel futures returned by .then() (googleapis/google-cloud-cpp…
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan authored Feb 7, 2020
1 parent 17b04f6 commit d5e88e7
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 12 deletions.
3 changes: 2 additions & 1 deletion google/cloud/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ namespace {
class AsyncTimerFuture : public internal::AsyncGrpcOperation {
public:
explicit AsyncTimerFuture(std::unique_ptr<grpc::Alarm> alarm)
: alarm_(std::move(alarm)) {}
: promise_(/*cancellation_callback=*/[this] { Cancel(); }),
alarm_(std::move(alarm)) {}

future<StatusOr<std::chrono::system_clock::time_point>> GetFuture() {
return promise_.get_future();
Expand Down
31 changes: 30 additions & 1 deletion google/cloud/completion_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ TEST(CompletionQueueTest, RunAsync) {
runner.join();
}

// Sets up a timer that reschedules itself and verfies we can shut down
// Sets up a timer that reschedules itself and verifies we can shut down
// cleanly whether we call `CancelAll()` on the queue first or not.
namespace {
void RunAndReschedule(CompletionQueue& cq, bool ok) {
Expand Down Expand Up @@ -363,6 +363,35 @@ TEST(CompletionQueueTest, CancelAndShutdownWithReschedulingTimer) {
t.join();
}

TEST(CompletionQueueTest, CancelTimerSimple) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

using ms = std::chrono::milliseconds;
auto fut = cq.MakeRelativeTimer(ms(20000));
fut.cancel();
auto tp = fut.get();
EXPECT_FALSE(tp.ok()) << ", status=" << tp.status();
cq.Shutdown();
t.join();
}

TEST(CompletionQueueTest, CancelTimerContinuation) {
CompletionQueue cq;
std::thread t([&cq] { cq.Run(); });

using ms = std::chrono::milliseconds;
auto fut = cq.MakeRelativeTimer(ms(20000)).then(
[](future<StatusOr<std::chrono::system_clock::time_point>> f) {
return f.get().status();
});
fut.cancel();
auto status = fut.get();
EXPECT_FALSE(status.ok()) << ", status=" << status;
cq.Shutdown();
t.join();
}

} // namespace
} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/future_generic_then_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ TEST(FutureTestInt, ThenByCopy) {
EXPECT_FALSE(next.valid());
}

/// @test Verify the behavior around cancellation.
TEST(FutureTestInt, CancelThroughContinuation) {
bool cancelled = false;
promise<int> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
auto f1 = f0.then([](future<int> f) { return f.get() * 2; });
EXPECT_TRUE(f1.cancel());
EXPECT_TRUE(cancelled);
p0.set_value(42);
EXPECT_EQ(84, f1.get());
}

// The following tests reference the technical specification:
// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/p0159r0.html
// The test names match the section and paragraph from the TS.
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/future_void_then_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ TEST(FutureTestVoid, ThenByCopy) {
// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/p0159r0.html
// The test names match the section and paragraph from the TS.

/// @test Verify the behavior around cancellation.
TEST(FutureTestVoid, CancelThroughContinuation) {
bool cancelled = false;
promise<void> p0([&cancelled] { cancelled = true; });
auto f0 = p0.get_future();
auto f1 = f0.then([](future<void>) { return 7; });
EXPECT_TRUE(f1.cancel());
EXPECT_TRUE(cancelled);
p0.set_value();
EXPECT_EQ(7, f1.get());
}

/// @test Verify conformance with section 2.3 of the Concurrency TS.
TEST(FutureTestVoid, conform_2_3_2_a) {
// future<void> should have an unwrapping constructor.
Expand Down
24 changes: 16 additions & 8 deletions google/cloud/internal/future_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ class continuation_base {
class future_shared_state_base {
public:
future_shared_state_base() : future_shared_state_base([] {}) {}
future_shared_state_base(std::function<void()> cancellation_callback)
explicit future_shared_state_base(std::function<void()> cancellation_callback)
: mu_(),
cv_(),
current_state_(state::not_ready),
cancellation_callback_(cancellation_callback) {}
cancellation_callback_(std::move(cancellation_callback)) {}
/// Return true if the shared state has a value or an exception.
bool is_ready() const {
std::unique_lock<std::mutex> lk(mu_);
Expand Down Expand Up @@ -204,6 +204,10 @@ class future_shared_state_base {
continuation_ = std::move(c);
}

std::function<void()> extract_cancellation_callback() {
return std::move(cancellation_callback_);
}

// Try to cancel the task by invoking the cancellation_callback.
bool cancel() {
if (!cancellable()) {
Expand Down Expand Up @@ -330,8 +334,8 @@ template <typename T>
class future_shared_state final : private future_shared_state_base {
public:
future_shared_state() : future_shared_state_base(), buffer_() {}
future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(cancellation_callback), buffer_() {}
explicit future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(std::move(cancellation_callback)), buffer_() {}
~future_shared_state() {
if (current_state_ == state::has_value) {
// Recall that state::has_value is a terminal state, once a value is
Expand All @@ -345,6 +349,7 @@ class future_shared_state final : private future_shared_state_base {

using future_shared_state_base::abandon;
using future_shared_state_base::cancel;
using future_shared_state_base::extract_cancellation_callback;
using future_shared_state_base::is_ready;
using future_shared_state_base::set_continuation;
using future_shared_state_base::set_exception;
Expand Down Expand Up @@ -468,11 +473,12 @@ template <>
class future_shared_state<void> final : private future_shared_state_base {
public:
future_shared_state() : future_shared_state_base() {}
future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(cancellation_callback) {}
explicit future_shared_state(std::function<void()> cancellation_callback)
: future_shared_state_base(std::move(cancellation_callback)) {}

using future_shared_state_base::abandon;
using future_shared_state_base::cancel;
using future_shared_state_base::extract_cancellation_callback;
using future_shared_state_base::is_ready;
using future_shared_state_base::set_continuation;
using future_shared_state_base::set_exception;
Expand Down Expand Up @@ -666,7 +672,8 @@ struct continuation : public continuation_base {
continuation(Functor&& f, std::shared_ptr<input_shared_state_t> s)
: functor(std::move(f)),
input(std::move(s)),
output(std::make_shared<future_shared_state<result_t>>()) {}
output(std::make_shared<future_shared_state<result_t>>(
input.lock()->extract_cancellation_callback())) {}

continuation(Functor&& f, std::shared_ptr<input_shared_state_t> s,
std::shared_ptr<output_shared_state_t> o)
Expand Down Expand Up @@ -721,7 +728,8 @@ struct unwrapping_continuation : public continuation_base {
: functor(std::move(f)),
input(std::move(s)),
intermediate(),
output(std::make_shared<output_shared_state_t>()) {}
output(std::make_shared<output_shared_state_t>(
input.lock()->extract_cancellation_callback())) {}

void execute() override {
auto tmp = input.lock();
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/internal/future_then_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ typename internal::then_helper<F, T>::future_t future<T>::then_impl(
typename internal::then_helper<F, T>::functor_result_t;
using future_t = typename internal::then_helper<F, T>::future_t;

// The `shared_state_type` (aka `future_shared_state<T>`) is be written
// The `shared_state_type` (aka `future_shared_state<T>`) is written
// without any reference to the `future<T>` class, otherwise there would
// be cycling dependencies between the two classes. We must adapt the
// be cyclic dependencies between the two classes. We must adapt the
// provided functor, which takes a `future<T>` parameter to take a
// `shared_ptr<shared_state_type` parameter so it can be consumed by the
// underlying class. Because we need to support C++11, we use a local class
Expand Down

0 comments on commit d5e88e7

Please sign in to comment.