Skip to content

Commit

Permalink
Remove 'operator|' and replace it with 'operator>>' (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaNikolaenko authored Aug 2, 2022
1 parent 8ccd637 commit cba2c46
Show file tree
Hide file tree
Showing 93 changed files with 1,702 additions and 1,738 deletions.
66 changes: 34 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ For an example of how to depend on eventuals via Bazel in your own project you'l

Most of the time you'll use higher-level ***combinators*** for composing eventuals together. This guide will start with more basic ones and work our way up to creating your own eventuals.

You ***compose*** eventuals together using an overloaded `operator|()`. You'll see some examples shortly. The syntax is similar to Bash "pipelines" and we reuse the term pipeline for eventuals as well.
You ***compose*** eventuals together using an overloaded `operator>>()`. You'll see some examples shortly. The syntax is similar to Bash "pipelines" (but instead of `|` we use `>>`) and we reuse the term pipeline for eventuals as well.

Note that we use `operator>>()` instead of `operator|()` because it provides safer expression evaluation order in C++17 and on. See [this paper](https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2016/p0145r3.pdf) for more details.

Because the result type of a composed pipeline is not type-erased you'll use `auto` generously, e.g., as function return types.

You must explicitly ***start*** an eventual in order for it to run. You'll only start eventuals at the "edges" of your code, e.g., in `int main()`. Before you start an eventual you must first "terminate it" by composing with a `Terminal()`:

```cpp
auto e = AsynchronousFunction()
| Terminal()
>> Terminal()
.start([](auto&& result) {
// Eventual pipeline succeeded!
})
Expand Down Expand Up @@ -99,7 +101,7 @@ Probably the most used of all the combinators, `Then()` continues a pipeline wit
```cpp
http::Get("https://3rdparty.dev")
| Then([](http::Response&& response) {
>> Then([](http::Response&& response) {
// Return an eventual that will automatically get started.
return SomeAsynchronousFunction(response);
});
Expand All @@ -109,7 +111,7 @@ You don't have to return an eventual in the callable passed to `Then()`, you can

```cpp
http::Get("https:://3rdparty.dev")
| Then([](auto&& response) {
>> Then([](auto&& response) {
// Return a value that will automatically get propagated.
return response.code == 200;
});
Expand All @@ -121,7 +123,7 @@ When you need to _conditionally_ continue using two differently typed eventuals
```cpp
http::Get("https:://3rdparty.dev")
| Then([](auto&& response) {
>> Then([](auto&& response) {
// Try for the 'www' host if we don't get a 200.
return If(response.code != 200)
.then(http::Get("https:://www.3rdparty.dev"))
Expand All @@ -133,7 +135,7 @@ http::Get("https:://3rdparty.dev")

```cpp
http::Get("https:://3rdparty.dev")
| Then([](auto&& response) {
>> Then([](auto&& response) {
// Try for the 'www' host if we don't get a 200.
return If(response.code != 200)
.then(http::Get("https:://www.3rdparty.dev"))
Expand Down Expand Up @@ -182,21 +184,21 @@ An `Expected:Of<T>` *composes* with other eventuals exactly as though it is an e

```cpp
ReadPersonFromFile(file)
| Then([](Person&& person) {
>> Then([](Person&& person) {
return GetFullName(person);
})
| Then([](std::string&& full_name) {
>> Then([](std::string&& full_name) {
...
});
```
Or you can compose an eventual with `|` which can be useful in cases where want the error to propagate:
Or you can compose an eventual with `>>` which can be useful in cases where want the error to propagate:
```cpp
ReadPersonFromFile(file)
| Then(Let([](auto& person) {
>> Then(Let([](auto& person) {
return GetFullName(person)
| Then([&](auto&& full_name) {
>> Then([&](auto&& full_name) {
if (person.has_suffix) {
return full_name + " " + person.suffix();
} else {
Expand All @@ -213,7 +215,7 @@ Working with *asynchronous* code is a little complicated because there might be
```cpp
auto GetBody(const std::string& uri) {
return http::Get(uri)
| Then([](auto&& response) {
>> Then([](auto&& response) {
return If(response.code == 200)
.then(Just(response.body))
.otherwise(Raise("HTTP GET failed w/ code " + std::to_string(response.code)));
Expand All @@ -226,7 +228,7 @@ But as we already saw `If()` is not _only_ useful for errors; it can also be use
```cpp
auto GetOrRedirect(const std::string& uri, const std::string& redirect_uri) {
return http::Get(uri)
| Then([redirect_uri](auto&& response) {
>> Then([redirect_uri](auto&& response) {
// Redirect if 'Service Unavailable'.
return If(response.code == 503)
.then(http::get(redirect_uri))
Expand All @@ -244,11 +246,11 @@ Synchronization is just as necessary with asynchronous code as with synchronous
Lock lock;

AsynchronousFunction()
| Acquire(&lock)
| Then([](auto&& result) {
>> Acquire(&lock)
>> Then([](auto&& result) {
// Protected by 'lock' ...
})
| Release(&lock);
>> Release(&lock);
```
This is often used when capturing `this` to use as part of some asynchronous computation. To simplify this common pattern you can extend your classes with `Synchronizable` and then use `Synchronized()`:
Expand Down Expand Up @@ -282,14 +284,14 @@ class SomeAggregateSystem : public Synchronizable {
return cooling_subsystem_initialized_
&& safety_subsystem_initialized_;
})
| Then([](auto&& result) {
>> Then([](auto&& result) {
// ...
}));
}

auto InitializeCoolingSubsystem() {
return CoolingSubsystemInitialization()
| Synchronized(
>> Synchronized(
Then([this]() {
cooling_subsystem_initialized_ = true;
initialization_.Notify();
Expand Down Expand Up @@ -319,7 +321,7 @@ You can compose a `Task::Of` just like any other eventual as well:

```cpp
auto e = Task::Of<int>([]() { return Asynchronous(); })
| Then([](int i) {
>> Then([](int i) {
return stringify(i);
});
```
Expand Down Expand Up @@ -362,7 +364,7 @@ class DerivedAsynchronous : public Base {
Task::Of<std::string> Method() override {
return []() {
return AsynchronousFunction()
| Then([](bool condition) -> Expected::Of<std::string> {
>> Then([](bool condition) -> Expected::Of<std::string> {
if (condition) {
return Expected("success");
} else {
Expand Down Expand Up @@ -494,7 +496,7 @@ Stream<int>()
ended(k);
}
})
| Loop<int>()
>> Loop<int>()
.context(0)
.body([](auto& sum, auto& stream, auto&& value) {
sum += value;
Expand All @@ -519,10 +521,10 @@ Often times you'll want to perform some transformations on your stream. You can
```cpp
Iterate({1, 2, 3, 4, 5})
| Map([](int i) {
>> Map([](int i) {
return i + 1;
})
| Reduce(
>> Reduce(
/* sum = */ 0,
[](auto& sum) {
return Then([&](auto&& value) {
Expand All @@ -538,8 +540,8 @@ Sometimes you'll have an infinite stream. You can loop over it infinitely by usi

```cpp
SomeInfiniteStream()
| Map([](auto&& i) { return Foo(i); })
| Loop(); // Infinitely loop.
>> Map([](auto&& i) { return Foo(i); })
>> Loop(); // Infinitely loop.
```
### `http`
Expand All @@ -550,7 +552,7 @@ An HTTP `GET`:
```cpp
http::Get("http://example.com") // Use 'https://' for TLS/SSL.
| Then([](http::Response&& response) {
>> Then([](http::Response&& response) {
// ...
});
```
Expand All @@ -561,7 +563,7 @@ An HTTP `POST`:
http::Post(
"https://jsonplaceholder.typicode.com/posts",
{{"first", "emily"}, {"last", "schneider"}})
| Then([](auto&& response) {
>> Then([](auto&& response) {
// ...
});
```
Expand All @@ -576,7 +578,7 @@ http::Client client = http::Client::Builder()
client.Post(
"https://jsonplaceholder.typicode.com/posts",
{{"first", "emily"}, {"last", "schneider"}})
| Then([](auto&& response) {
>> Then([](auto&& response) {
// ...
});
```
Expand All @@ -591,7 +593,7 @@ client.Do(
.header("key", "value")
.header("another", "example")
.Build())
| Then([](auto&& response) {
>> Then([](auto&& response) {
// ...
});
```
Expand All @@ -605,7 +607,7 @@ client.Do(
.method(http::GET)
.verify_peer(true) // Overrides client!
.Build())
| Then([](auto&& response) {
>> Then([](auto&& response) {
// ...
});
```
Expand All @@ -629,7 +631,7 @@ http::Client client = http::Client::Builder()
.Build();

client.Get("https://3rdparty.dev")
| Then([](auto&& response) {
>> Then([](auto&& response) {
// ...
});
```
Expand All @@ -650,7 +652,7 @@ client.Do(
.method(http::GET)
.certificate(*certificate)
.Build())
| Then([](auto&& response) {
>> Then([](auto&& response) {
// ...
});
```
Expand Down
14 changes: 0 additions & 14 deletions eventuals/compose.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,6 @@ struct Composed final {

////////////////////////////////////////////////////////////////////////

template <
typename Left,
typename Right,
std::enable_if_t<
std::conjunction_v<
HasValueFrom<Left>,
HasValueFrom<Right>>,
int> = 0>
[[nodiscard]] auto operator|(Left left, Right right) {
return Composed<Left, Right>{std::move(left), std::move(right)};
}

////////////////////////////////////////////////////////////////////////

template <
typename Left,
typename Right,
Expand Down
10 changes: 5 additions & 5 deletions eventuals/concurrent-ordered.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,23 +247,23 @@ template <typename F>
return Map([i = 1](auto&& value) mutable {
return std::make_tuple(i++, std::forward<decltype(value)>(value));
})
| Concurrent([f = std::move(f)]() {
>> Concurrent([f = std::move(f)]() {
return FlatMap([&f, j = 1](auto&& tuple) mutable {
j = std::get<0>(tuple);
return Iterate({std::move(std::get<1>(tuple))})
| f()
| Map([j](auto&& value) {
>> f()
>> Map([j](auto&& value) {
return std::make_tuple(j, std::move(value));
})
// A special 'ConcurrentOrderedAdaptor()' allows us to handle
// the case when 'f()' has ended so we can propagate down to
// 'ReorderAdaptor()' that all elements for the 'i'th tranche
// of values has been emitted.
| ConcurrentOrderedAdaptor();
>> ConcurrentOrderedAdaptor();
});
})
// Handles the reordering of values by the propagated indexes.
| ReorderAdaptor();
>> ReorderAdaptor();
}

/////////////////////////////////////////////////////////////////////
Expand Down
34 changes: 17 additions & 17 deletions eventuals/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ struct _Concurrent final {
notify_done_();
}
}))
| Terminal();
>> Terminal();
}

// Returns an eventual which will wait for the upstream stream to
Expand All @@ -358,10 +358,10 @@ struct _Concurrent final {
|| !fibers_done_;
};
}))
| Then([callback = std::move(callback)]() mutable {
>> Then([callback = std::move(callback)]() mutable {
callback();
})
| Terminal();
>> Terminal();
}

// Returns an eventual which handles when "downstream" requests
Expand Down Expand Up @@ -398,7 +398,7 @@ struct _Concurrent final {
// like function that you can call _before_ calling
// 'Done()' on the 'Concurrent()'.
}))
| Terminal();
>> Terminal();
}

// Head of linked list of fibers.
Expand Down Expand Up @@ -456,14 +456,14 @@ struct _Concurrent final {
// down below even though we know we only have a
// single 'arg' to iterate from the top.
Iterate({std::move(arg)})
| f_())
| Synchronized(Map([this](auto&& value) {
>> f_())
>> Synchronized(Map([this](auto&& value) {
values_.push_back(std::forward<decltype(value)>(value));
notify_egress_();
}))
| Loop()
| FiberEpilogue(fiber)
| Terminal();
>> Loop()
>> FiberEpilogue(fiber)
>> Terminal();
}

// Returns an upcasted 'TypeErasedFiber' from our typeful 'Fiber'.
Expand Down Expand Up @@ -498,7 +498,7 @@ struct _Concurrent final {
[[nodiscard]] auto Ingress() {
return Map(Let([this](Arg_& arg) {
return CreateOrReuseFiber()
| Then([&](TypeErasedFiber* fiber) {
>> Then([&](TypeErasedFiber* fiber) {
// A nullptr indicates that we should tell
// upstream we're "done" because something
// failed or an interrupt was received.
Expand All @@ -511,10 +511,10 @@ struct _Concurrent final {
return done;
});
}))
| Until([](bool done) { return done; })
| Loop() // Eagerly try to get next value to run concurrently!
| IngressEpilogue()
| Terminal();
>> Until([](bool done) { return done; })
>> Loop() // Eagerly try to get next value to run concurrently!
>> IngressEpilogue()
>> Terminal();
}

// Returns an eventual which implements the logic for handling
Expand All @@ -534,7 +534,7 @@ struct _Concurrent final {
// Need to check for an exception _before_
// 'Until()' because we have no way of hooking
// into "ended" after 'Until()'.
| Map([this]() {
>> Map([this]() {
return Eventual<std::optional<Value_>>()
.start([this](auto& k) {
if (exception_ && upstream_done_ && fibers_done_) {
Expand All @@ -556,10 +556,10 @@ struct _Concurrent final {
}
});
}))
| Until([](std::optional<Value_>& value) {
>> Until([](std::optional<Value_>& value) {
return !value;
})
| Map([](std::optional<Value_>&& value) {
>> Map([](std::optional<Value_>&& value) {
CHECK(value);
return std::move(*value);
});
Expand Down
Loading

0 comments on commit cba2c46

Please sign in to comment.