Support concurrent exports #781
Conversation
Thanks for the feedback 👍, I agree with the approach in principle. We interpreted the spec as saying the BatchSpanProcessor must finish sending one batch of spans before starting another. My concern is that, as others pointed out in the spec issue, without a bound this can cause an OOM. We could add a parameter to configure the number of concurrent sending tasks; if the parameter is set to 1, concurrent sending is disabled. Another issue is that under the current model the Jaeger exporter is a little complex. I think the &mut borrow is caused by the generated thrift code.

PS. For some reason, your link to the spec issue is not working: open-telemetry/opentelemetry-specification#2434.
Thanks for the quick reply!
Makes sense; this would be fairly straightforward to add.
My 2c: in the non-concurrent model, it meant starting a request of up to the batch size. In the concurrent model it's still possible to have items pending in the queue up to the timeout; to me it still makes sense to treat the start of an export as a flush.
That's right. The generated thrift agent has some mutable methods for working with the I/O streams and some sort of frame counter. It's by no means untenable, but certainly more work than the other exporters (hence the draft for discussion).
Fixed, thank you. Would it be premature for me to continue with the suggested fixes?
@jwilm these all sound like good improvements to me 👍
I think we can implement the proposed changes. One thing: your branch seems stale, so I'd suggest rebasing on the latest to pick up some critical changes (like the recent split).
I rebased the existing work on main, and I'm starting on the proposed changes (I'll track them in the OP). In the meantime, I would appreciate some input on the Jaeger exporter. I noticed that the stackdriver exporter spawns a separate task for the actual export work.
I believe all the requested changes have been implemented, save for the Jaeger exporter. Considering the implementation path there, I think a task-queue system could be used where the Jaeger exporter only exports a single batch at a time (regardless of BSP settings). This seems like the path of least resistance to landing this PR, but it also means that the Jaeger exporter doesn't actually gain any concurrency. Perhaps this is OK for now and Jaeger concurrency can be a future enhancement? Separately, I took a closer look at the stackdriver exporter, and I think it could easily take advantage of the BSP concurrency management instead of rolling its own. Let me know if you'd like that done. Waiting on your feedback before proceeding. Thanks!
    Ok(builder.build())
}
// pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
These build simple / sync methods are the last blocker for the jaeger implementation. In order to support non-pipelined I/O resources like the Agent uploaders, the Exporter and Uploaders were decoupled via a channel, and the uploaders now run on a dedicated task. However, this means that `Exporter::new` has to return a task to spawn as well, similar to the stackdriver exporter. That was problematic for the `build_simple`, etc. methods, since there's no runtime available on which to spawn that task.

I'll need feedback on whether we can simply thread the Runtime through here and whether this design is acceptable.
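For concreteness, here's a rough sketch of the shape this takes (`SpanBatch` and `Uploader` are illustrative stand-ins, not the actual jaeger types, and tokio's mpsc channel stands in for whichever channel the exporter ends up using):

```rust
use tokio::sync::mpsc;

// Hypothetical stand-ins for the real jaeger types.
struct SpanBatch;
struct Uploader;

impl Uploader {
    // The uploader owns a non-pipelined I/O resource, hence &mut self.
    async fn upload(&mut self, _batch: SpanBatch) { /* agent/collector I/O */ }
}

// An `Exporter::new`-style constructor: returns the sending half plus a task
// that the caller must spawn on some runtime to drive the uploader.
fn new_exporter(
    mut uploader: Uploader,
) -> (mpsc::Sender<SpanBatch>, impl std::future::Future<Output = ()>) {
    let (tx, mut rx) = mpsc::channel::<SpanBatch>(64);
    let task = async move {
        // Batches are handled one at a time, preserving upload order even
        // when the batch processor exports concurrently.
        while let Some(batch) = rx.recv().await {
            uploader.upload(batch).await;
        }
    };
    (tx, task)
}
```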
I updated the commit to thread the runtime through, but I don't have enough context in the project to know if this is the right approach.
Thanks for the hard work! Sorry, I didn't get much time yesterday to take a look; I'll try to look tomorrow or over the weekend. We do want to keep `install_simple` without a runtime parameter, just so that you can work with opentelemetry without a runtime. But I need a closer look to see whether it's possible to do that under the new exporter APIs.
Is it just that method in particular, or is the goal that the opentelemetry library can be used without a runtime? If it's just that method, we can probably figure something out using `cfg(feature)` attrs internally.
I think in general we want to allow people to use it without a runtime. As a utility library, we shouldn't assume a runtime and deny certain use cases.
Codecov Report
@@            Coverage Diff            @@
##            main    #781     +/-   ##
=========================================
- Coverage    70.2%   69.8%    -0.5%
=========================================
  Files         109     109
  Lines        8963    9045      +82
=========================================
+ Hits         6293    6314      +21
- Misses       2670    2731      +61
Continue to review full report at Codecov.
I think the easiest way is to wrap the uploader in a mutex? I think the ideal solution is to replace the
I think it makes sense. Curious what your thoughts are, @djc. Also, we can open a separate PR to address this.
Sounds good, are you actually going to be using stackdriver or are you just cleaning stuff up here? :)
I considered that, but it has some downsides including loss of ordering of uploads. The task queue seemed cleaner, but it did necessitate spawning a future somehow. I suppose another option is spawning a thread for the non-async/await uploader(s).
Would it be possible to expose two different exporters from Jaeger? This might open up the design space a bit more.
Just an offer to clean up :)
I think if we allow concurrent exporting, we won't be able to guarantee the strict ordering of spans, i.e. multiple requests in flight could finish in arbitrary order. I need to double-check how Jaeger and other backends handle this, but I don't think it is a no-go. I will report back with what I find.
Yeah, I think we can try this, although it is against best practice when working with an async runtime.
Would be great, thanks!
I tested with Jaeger and it seems to be OK with ingesting spans out of order. I am OK if we spawn a thread or use a mutex for now. As long as we don't make an API change, we can work on improving the internals in a follow-up PR.
I think it's possible, but it would probably be better to address in a different PR :) Thanks again for the wonderful work.
I think there's a path to keeping the task-based approach if the only blocker is the `install_simple` method not taking a runtime. Given that the core library requires some sort of async runtime, we could potentially detect that based on cargo features and insert the runtime internally rather than requiring it as an argument in the public API. What do you think?
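For example, something along these lines (a sketch: the `rt-tokio` / `rt-async-std` feature names mirror the crate's existing runtime features, but the function itself is illustrative):

```rust
// Pick a default runtime based on which cargo feature is enabled, so
// install_simple() need not take a runtime parameter. Illustrative only.
#[cfg(feature = "rt-tokio")]
fn default_runtime() -> opentelemetry::runtime::Tokio {
    opentelemetry::runtime::Tokio
}

#[cfg(all(feature = "rt-async-std", not(feature = "rt-tokio")))]
fn default_runtime() -> opentelemetry::runtime::AsyncStd {
    opentelemetry::runtime::AsyncStd
}
```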
opentelemetry-zipkin/Cargo.toml
@@ -38,6 +38,7 @@ http = "0.2"
reqwest = { version = "0.11", optional = true, default-features = false }
surf = { version = "2.0", optional = true, default-features = false }
thiserror = { version = "1.0" }
futures = "0.3"
Can we save on dependencies by using futures-util (or even futures-core) for most of these?
With the exception of the current Jaeger exporter implementation, we should be able to get away with using only futures-util. I'll make that change.
Ah, and maybe in some cases even futures-core (at least for Zipkin)
Done in the latest commit.
Applications generating significant span volume can end up dropping data due to the synchronous export step. According to the opentelemetry spec, "This function will never be called concurrently for the same exporter instance. It can be called again only after the current call returns." However, it does not place a restriction on concurrent I/O or anything of that nature. There is an [ongoing discussion] about tweaking the language to make this more clear.

With that in mind, this commit makes the exporters return a future that can be spawned concurrently. Unfortunately, this means that the `export()` method can no longer be async while taking &mut self. The latter is desirable to enforce the "no concurrent calls" line of the spec, so the choice is made here to return a future instead, with its lifetime decoupled from self. This resulted in a bit of additional verbosity, but for the most part the async code can still be shoved into an async fn for the ergonomics.

The main exception to this is the `jaeger` exporter, which internally requires a bunch of mutable references. I plan to discuss the overall goal of this PR with the opentelemetry team and get buy-in before making more invasive changes to support this in the jaeger exporter.

[ongoing discussion]: open-telemetry/opentelemetry-specification#2434
Prior to this, export tasks were run in "fire and forget" mode with runtime::spawn. The SpanProcessor now manages tasks directly using FuturesUnordered. This enables limiting overall concurrency (and thus memory footprint). Additionally, the flush and shutdown logic now spawns an additional task for any unexported spans and waits on _all_ outstanding tasks to complete before returning.
Users may desire to control the level of export concurrency in the batch span processor. There are two special values:

- max_concurrent_exports = 0: no bound on concurrency
- max_concurrent_exports = 1: no concurrency; makes everything synchronous on the messaging task
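A rough sketch of the pattern the two messages above describe (illustrative names only, assuming tokio and futures-util; the export body is elided):

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

async fn drive_exports(mut batches: mpsc::Receiver<Vec<u8>>, max_concurrent_exports: usize) {
    let mut in_flight = FuturesUnordered::new();
    while let Some(batch) = batches.recv().await {
        // 0 means unbounded; otherwise wait for a slot before starting another
        // export. A limit of 1 degenerates to the old synchronous behavior.
        while max_concurrent_exports > 0 && in_flight.len() >= max_concurrent_exports {
            in_flight.next().await;
        }
        in_flight.push(async move {
            // perform the actual export of `batch` here
            drop(batch);
        });
    }
    // flush/shutdown: wait for *all* outstanding exports before returning.
    while in_flight.next().await.is_some() {}
}
```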
Key points:

- decouple the exporter from the uploaders via a channel and a spawned task
- some uploaders are a shared I/O resource and cannot be multiplexed, which necessitates a task queue; e.g., HttpClient will spawn many I/O tasks internally, while AgentUploader is a single I/O resource (a different level of abstraction)
- the synchronous API is not supported without a Runtime argument; I updated the API to thread one through, but maybe this is undesirable. I'm also exploiting the fact that Actix uses Tokio under the hood to pass through the Tokio runtime token in the Actix examples
- tests pass, save for a couple of flaky environment ones, which is likely a race condition
This should be ready for another review. The Jaeger API is as it was before, and a thread is used internally to handle the task execution. As future work, I think it would go a long way to have two different exporters for Jaeger: one which does the exports directly, synchronously, and with no thread; and a more production-oriented asynchronous and concurrent exporter.
I think it looks really great and should be a great improvement 👍. We can look into Jaeger more and optimize the jaeger exporter in a follow-up PR.
@@ -2,7 +2,7 @@

set -eu

cargo test --all "$@"
cargo test --all "$@" -- --test-threads=1
Any reason why we need the test threads to be 1 here? Does running them in parallel cause some issues?
Ah yeah, I noticed there were some flaky tests due to modifying the environment and colliding with each other.
scripts/lint.sh
cargo_feature opentelemetry-jaeger "wasm_collector_client"
cargo_feature opentelemetry-jaeger "collector_client, wasm_collector_client"
cargo_feature opentelemetry-jaeger "default"
cargo_feature opentelemetry-jaeger "full"
It's nice to have the `full` feature, but we probably want to keep those separate tests to make sure each feature can also work on its own.
Done.
Nice work 👍 Just one nit; with that and the lint/test fixed, I think we are good to merge.
The minimal necessary futures library (core, util, or futures proper) is now used in all packages touched by the concurrent-exporters work.
To keep the API _actually_ simple, we now leverage a thread to run the jaeger exporter internals.
Per PR feedback, the default should match the previous behavior of 1 batch at a time.
This finishes the remaining TODOs on the concurrent-exports branch. The major change included here adds shutdown functionality to the jaeger exporter, which ensures the exporter has finished its tasks before exiting.
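A minimal sketch of that thread-plus-shutdown shape, combining this commit with the dedicated-thread change above (hypothetical names; a std mpsc channel stands in for the real queue, and the upload body is elided):

```rust
use std::sync::mpsc;
use std::thread::JoinHandle;

enum Message {
    Upload(Vec<u8>), // stand-in for a serialized span batch
    Shutdown,
}

struct UploaderHandle {
    tx: mpsc::Sender<Message>,
    join: Option<JoinHandle<()>>,
}

impl UploaderHandle {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel();
        let join = std::thread::spawn(move || {
            // The dedicated thread owns the uploader; batches are handled
            // in order until shutdown is requested.
            for msg in rx {
                match msg {
                    Message::Upload(_batch) => { /* perform the upload */ }
                    Message::Shutdown => break,
                }
            }
        });
        UploaderHandle { tx, join: Some(join) }
    }

    // Because the channel is FIFO, all work queued before the Shutdown
    // message is finished before the thread exits and join() returns.
    fn shutdown(&mut self) {
        let _ = self.tx.send(Message::Shutdown);
        if let Some(join) = self.join.take() {
            let _ = join.join();
        }
    }
}
```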
This was erroneously committed.
OTEL_BSP_MAX_CONCURRENT_EXPORTS may now be specified in the environment to configure the maximum number of concurrent exports. This configurable now has parity with the other options of the span processor.
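Resolution presumably looks roughly like this (a sketch: only the variable name comes from this commit, and the fallback of 1 mirrors the default settled on earlier in the review):

```rust
use std::env;

// Read the max-concurrent-exports setting from the environment, falling
// back to 1 (the previous one-batch-at-a-time behavior).
fn max_concurrent_exports() -> usize {
    env::var("OTEL_BSP_MAX_CONCURRENT_EXPORTS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1)
}
```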
LGTM 🎉
I don't have bandwidth to review this in detail, sorry.
Anything I can do to help land this?
🎉 thanks for the merge! Are there plans to publish a new release to crates.io anytime soon? We would love to get off our git dependencies.
I hope we can release version 0.18 sometime soon. You can track the progress in #779.
Problem
Applications generating significant span volume can end up dropping data due to the synchronous export step. According to the opentelemetry spec:

> This function will never be called concurrently for the same exporter instance. It can be called again only after the current call returns.

However, it does not place a restriction on concurrent I/O or anything of that nature. There is an ongoing discussion about tweaking the language to make this more clear, and it seems there is consensus on concurrent transmissions being OK with the spec.
Approach
With that in mind, this commit makes the exporters return a future that can be spawned concurrently. Unfortunately, this means that the `export()` method can no longer be async while taking &mut self. The latter is desirable to enforce the "no concurrent calls" line of the spec, so the choice is made here to return a future instead, with its lifetime decoupled from self. This resulted in a bit of additional verbosity, but for the most part the async code can still be shoved into an async fn for the ergonomics.

The jaeger exporter is left untouched for now, as the changes there might take a couple of different approaches (decouple the exporter via a channel, or a bigger refactor to fix the &mut borrows deep in the exporter logic).
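Roughly, the signature change looks like this (a simplified sketch with stand-in types, not the exact trait as merged):

```rust
use futures_util::future::BoxFuture;

// Stand-ins for the real opentelemetry types, to keep the sketch self-contained.
struct SpanData;
struct TraceError;
type ExportResult = Result<(), TraceError>;

trait SpanExporter {
    // Before: `async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult`
    // desugars to a future that borrows `self`, so a second export can never
    // start while one is in flight.
    //
    // After: the returned future's lifetime is decoupled from `self`, so the
    // batch processor may spawn several exports at once, while &mut self still
    // prevents concurrent *calls*, per the spec.
    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult>;
}
```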
Discussion
I'm opening this PR as a draft to start some discussion with the OpenTelemetry-Rust maintainers. To kick this off, I have results to share from a particularly busy application of ours, which has been dropping tons of spans due to reaching queue capacity:

[Graph: dropped-span count over a 2-day period, before and after this change]

This is a 2-day period. To the left, we have millions of spans dropped because the queue was not being flushed fast enough. To the right, there are nearly zero occurrences of dropped spans due to this issue.
For us, this is a massive improvement.
A couple of questions for the maintainers:
Remaining Work
shutdown()
export_concurrency
export_concurrency
export_concurrency: 1
force_flush