Skip to content

Commit

Permalink
Make '_TestRange' continuation act as a stream (#505)
Browse files Browse the repository at this point in the history
Making '_TestRange' continuation be inherited from
'TypeErasedStream' helps us avoid situations which
can block us for ever. Generally speaking now
'TakeRange' decides either we should continue
pulling from the stream or it's all done and we can
just continue processing given pipeline. This PR
also includes some minor code changes like renaming
some functions.
  • Loading branch information
ArthurBandaryk authored Aug 2, 2022
1 parent 3b89193 commit 8ccd637
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 27 deletions.
77 changes: 56 additions & 21 deletions eventuals/take.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace eventuals {

////////////////////////////////////////////////////////////////////////

struct _TakeLastN final {
struct _TakeLast final {
template <typename K_, typename Arg_>
struct Continuation final : public TypeErasedStream {
// NOTE: explicit constructor because inheriting 'TypeErasedStream'.
Expand Down Expand Up @@ -133,11 +133,30 @@ struct _TakeLastN final {
////////////////////////////////////////////////////////////////////////

struct _TakeRange final {
// We've decided to make '_TakeRange' continuation act as a stream.
// Thus we can avoid situations which might block us for ever telling
// the stream that 'TakeRange' has everything it wants (in this case
// rather than 'Loop' calls back up into the stream because 'TakeRange'
// is in the place, Loop calls back up to 'TakeRange', and then
// 'TakeRange' decides to continue pulling from the stream or decides
// that it's all done and then just continue processing).
template <typename K_, typename Arg_>
struct Continuation final {
struct Continuation final : public TypeErasedStream {
// NOTE: explicit constructor because inheriting 'TypeErasedStream'.
Continuation(K_ k, size_t begin, size_t amount)
: begin_(begin),
amount_(amount),
k_(std::move(k)) {}

Continuation(Continuation&& that) noexcept = default;

~Continuation() override = default;

void Begin(TypeErasedStream& stream) {
stream_ = &stream;
k_.Begin(stream);
previous_ = Scheduler::Context::Get();

k_.Begin(*this);
}

template <typename Error>
Expand All @@ -150,15 +169,15 @@ struct _TakeRange final {
}

template <typename... Args>
// 'in_range_' needs to prevent calling Next
// when stream has already passed the set amount_ of elements.
void Body(Args&&... args) {
if (CheckRange()) {
in_range_ = true;
if (begin_ <= i_ && i_ < begin_ + amount_) {
i_++;
k_.Body(std::forward<Args>(args)...);
} else if (!in_range_) {
} else if (i_ < begin_) {
i_++;
stream_->Next();
} else {
CHECK_EQ(i_, begin_ + amount_);
stream_->Done();
}
}
Expand All @@ -167,23 +186,39 @@ struct _TakeRange final {
k_.Ended();
}

void Register(Interrupt& interrupt) {
k_.Register(interrupt);
void Next() override {
previous_->Continue([this]() {
if (i_ < begin_ + amount_) {
stream_->Next();
} else {
CHECK_EQ(i_, begin_ + amount_);
stream_->Done();
}
});
}

bool CheckRange() {
bool result = i_ >= begin_ && i_ < begin_ + amount_;
++i_;
return result;
void Done() override {
previous_->Continue([this]() {
stream_->Done();
});
}

K_ k_;
size_t begin_ = 0;
size_t amount_ = 0;
void Register(Interrupt& interrupt) {
k_.Register(interrupt);
}

size_t begin_;
size_t amount_;
size_t i_ = 0;
bool in_range_ = false;

TypeErasedStream* stream_ = nullptr;
stout::borrowed_ptr<Scheduler::Context> previous_;

// NOTE: we store 'k_' as the _last_ member so it will be
// destructed _first_ and thus we won't have any use-after-delete
// issues during destruction of 'k_' if it holds any references or
// pointers to any (or within any) of the above members.
K_ k_;
};

struct Composable final {
Expand All @@ -205,15 +240,15 @@ struct _TakeRange final {

////////////////////////////////////////////////////////////////////////

[[nodiscard]] inline auto TakeLastN(size_t N) {
return _TakeLastN::Composable{N};
[[nodiscard]] inline auto TakeLast(size_t n) {
return _TakeLast::Composable{n};
}

[[nodiscard]] inline auto TakeRange(size_t begin, size_t amount) {
return _TakeRange::Composable{begin, amount};
}

[[nodiscard]] inline auto TakeFirstN(size_t amount) {
[[nodiscard]] inline auto TakeFirst(size_t amount) {
return _TakeRange::Composable{0, amount};
}

Expand Down
27 changes: 21 additions & 6 deletions test/take.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TEST(Take, IterateTakeLastCollect) {

auto s = [&]() {
return Iterate(v)
| TakeLastN(2)
| TakeLast(2)
| Collect<std::vector>();
};

Expand All @@ -31,7 +31,7 @@ TEST(Take, IterateTakeLastAllCollect) {

auto s = [&]() {
return Iterate(v)
| TakeLastN(4)
| TakeLast(4)
| Collect<std::vector>();
};

Expand Down Expand Up @@ -68,7 +68,7 @@ TEST(Take, IterateTakeFirstCollect) {

auto s = [&]() {
return Iterate(v)
| TakeFirstN(3)
| TakeFirst(3)
| Collect<std::vector>();
};

Expand All @@ -81,7 +81,7 @@ TEST(Take, IterateTakeFirstFilterCollect) {

auto s = [&]() {
return Iterate(v)
| TakeFirstN(3)
| TakeFirst(3)
| Filter([](int x) { return x % 2 == 1; })
| Collect<std::vector>();
};
Expand All @@ -94,7 +94,7 @@ TEST(Take, TakeLastOutOfRange) {

auto s = [&]() {
return Iterate(v)
| TakeLastN(100)
| TakeLast(100)
| Collect<std::vector>();
};

Expand All @@ -106,7 +106,7 @@ TEST(Take, TakeFirstOutOfRange) {

auto s = [&]() {
return Iterate(v)
| TakeFirstN(100)
| TakeFirst(100)
| Collect<std::vector>();
};

Expand Down Expand Up @@ -159,5 +159,20 @@ TEST(Take, UniquePtr) {
EXPECT_EQ(2, *result[1]);
}

TEST(Take, TakeRangeInfiniteStream) {
auto s = []() {
return Stream<int>()
.next([i = 0](auto& k) mutable {
if (i < 2) {
k.Emit(i++);
}
})
| TakeRange(0, 2)
| Collect<std::vector>();
};

EXPECT_THAT(*s(), ElementsAre(0, 1));
}

} // namespace
} // namespace eventuals::test

0 comments on commit 8ccd637

Please sign in to comment.