Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have close/flush return a Future for callers to wait on #71

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

ramosbugs
Copy link
Contributor

Currently, calling flush triggers an asynchronous flush but doesn't
provide the caller with a mechanism to wait for all queued events to
be flushed.

This PR changes the Client::flush and Client::close interfaces to be
async and fixes a few concurrency issues I ran into while making this
change:

  • Replaces the non-async crossbeam_channel with async_channel.
    The crossbeam_channel crate blocks the current thread and isn't
    safe to use within an async executor. The timeout behavior is preserved
    using future::timeout.
  • Avoids creating a new runtime within each process_work task. Instead,
    tasks are spawned within the Transmission's runtime.
  • Removes the Mutex around the runtime, which causes problems sharing
    the guards across threads and doesn't appear to be needed.
  • When processing the stop event, flushes queued events before breaking
    out of the process_work loop (Queued events are ignored when flushing client #65).

Resolves #66 and fixes #65.

@ramosbugs
Copy link
Contributor Author

Sorry, this turned into a more involved PR than I expected!

@ramosbugs ramosbugs changed the title Have stop/flush return a Future for callers to wait on Have close/flush return a Future for callers to wait on Jan 10, 2021
@nlopes
Copy link
Owner

nlopes commented Jan 12, 2021

Will take a look as soon as I can.

@nlopes
Copy link
Owner

nlopes commented Jan 25, 2021

@ramosbugs apologies for such a long delay. I promise I'll look at this as soon as I'm not swamped. I appreciate the contribution!

@ramosbugs
Copy link
Contributor Author

no worries, thanks!

@nlopes
Copy link
Owner

nlopes commented Feb 5, 2021

Although I didn't have the intention to expose async outside of the client, I kind of like the approach here.
Would you be so kind as to run cargo readme >! README.md and push it here? That would save me an update commit just to sync the README.md with what's now in lib.rs.

@nlopes
Copy link
Owner

nlopes commented Feb 5, 2021

The main thing I'd like to change before I merge (if you're up for it!) is to make async-std and tokio features that can be toggled. Your change here adds async-std on top of tokio which isn't great. See #61 for what I mean.

If you're not up for it (and I totally understand if you aren't), I'll bring it myself in another PR with this one as base. Just let me know what you'd prefer and I'm happy to oblige.

I really appreciate your contribution.

@ramosbugs
Copy link
Contributor Author

Although I didn't have the intention to expose async outside of the client, I kind of like the approach here.
Would you be so kind as to run cargo readme >! README.md and push it here? That would save me an update commit just to sync the README.md with what's now in lib.rs.

done!

The main thing I'd like to change before I merge (if you're up for it!) is to make async-std and tokio features that can be toggled. Your change here adds async-std on top of tokio which isn't great. See #61 for what I mean.

It looks like there are a few complexities around tokio vs. async_std here:

  • Using tokio::time::timeout instead of async_std::future::timeout requires a tokio runtime to avoid a there is no timer running, must be called from the context of Tokio runtime panic. By contrast, async_std::future::timeout seems to work for any runtime.
  • tokio::sync::Mutex looks like a potential alternative to async_std::sync::Mutex; I'm not sure if it also requires a tokio runtime
  • calling Transmission::stop causes the tokio runtime created by Transmission::new_runtime to be dropped. if the caller invokes this from inside another tokio runtime, tokio panics with Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.. this was my original motivation for using async_std::test in the tests and async_std::main in the example. however, it's very easy for users of this crate to accidentally call stop inside of a tokio async context, so a more robust solution is probably needed. maybe we should just pass in a spawn closure instead of creating the runtime inside this crate at all?

@ramosbugs
Copy link
Contributor Author

also, is the objection to adding async_std due to the additional dependency cost, or is it specifically about mixing the tokio and async_std runtimes? it feels like runtime-agnostic primitives like async_std::future::timeout and async_std::sync::Mutex are preferable to runtime-specific ones like tokio::time::timeout. using a spawn closure like I suggested above could potentially remove the tokio dependency altogether

@nlopes
Copy link
Owner

nlopes commented Feb 10, 2021

it feels like runtime-agnostic primitives like async_std::future::timeout and async_std::sync::Mutex are preferable

You hit exactly what my message was intended to deliver and which I failed to deliver 😆
That's basically what I'd want: we either use runtime-agnostic primitives or we put different runtimes behind features.

@ramosbugs
Copy link
Contributor Author

I think given the issues around dropping runtimes at various points in the code, the runtime-agnostic approach would be preferable. I'll try to incorporate that change :-)

@ramosbugs
Copy link
Contributor Author

looks like the async_executors crate provides some nice adapters for different runtimes to be able to leverage the futures::task::Spawn trait. the commit I just pushed takes in an Arc<dyn Spawn> and avoids creating any runtimes directly. I also updated the unit tests to be run easily on all supported runtimes (currently just Tokio).

as reflected in #61, my change isn't sufficient to support runtimes other than Tokio because of Reqwest's dependency on Tokio, but it fixes the task management piece. switching to surf (in a follow-up change) would finish adding support for other runtimes.

there's also a caveat with my latest change: it relies on async_std's unstable feature flag in order to make use of an async condition variable (Condvar). I couldn't find any other suitable async-friendly primitives that wouldn't block the executor thread. note that this has nothing to do with "unstable" Rust and doesn't depend on nightly or anything. I think it just means that Condvar may not respect SemVer, so I pinned to the latest async-std 1.9.0 version. that means clients won't be able to integrate other versions easily, but that restriction can be removed once async-rs/async-std#217 is completed.

ramosbugs added a commit to ramosbugs/opentelemetry-honeycomb-rs that referenced this pull request Feb 11, 2021
@nlopes
Copy link
Owner

nlopes commented Mar 10, 2021

This is looking great. I'm going to do a deeper review this weekend and get this out there most likely.

I can't thank you enough for the contributions and patience on my late reviews.

@@ -58,9 +58,7 @@ impl Default for Options {

/// Client represents an object that can create new builders and events and send them
/// somewhere.
#[derive(Debug, Clone)]
Copy link
Owner

@nlopes nlopes May 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've started going through a closer look and this is probably going to be the biggest change/pain point for consumers of the library.

I experimented with a library beeline-rust which uses this one, providing an actix-web middleware and it requires really drastic changes to cope with this Clone disappearing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for opentelemetry-honeycomb, I was able to solve this by wrapping the client in an Arc+RwLock: https://github.com/ramosbugs/opentelemetry-honeycomb-rs/blob/ca82ecd99bdfcf9357547d6079e31d70a5bfb9b4/src/lib.rs#L168-L176

then sending events just involves acquiring the read lock: https://github.com/ramosbugs/opentelemetry-honeycomb-rs/blob/ca82ecd99bdfcf9357547d6079e31d70a5bfb9b4/src/lib.rs#L332-L335

would that solution work for actix-web, or is there something about actix that makes it more difficult?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would. I'm going to play with it tomorrow am and come back to this. I'm still quite keen on merging this PR.

pub struct TransmissionMock {
started: usize,
stopped: usize,
events_called: usize,
events: Vec<Event>,
events: Mutex<Vec<Event>>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is one where without Clone (and with this Mutex here, we pass on all the complexity to consumers).

I'm not saying its the wrong thing to do but the expectation that we'd manage this on the whole is now void.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still re-familiarizing myself with this PR and digesting the Clone issue for Client, but isn't this particular comment just on the testing mock?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is indeed. I use this in beeline-rust (to test multi threads) https://github.com/nlopes/beeline-rust/blob/master/src/lib.rs#L211

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[feat] Interface for waiting on flush to complete Queued events are ignored when flushing client
2 participants