-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix periodic reader to trigger first exporter at the interval #1766
Fix periodic reader to trigger first exporter at the interval #1766
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #1766 +/- ##
=======================================
+ Coverage 71.0% 71.6% +0.5%
=======================================
Files 135 136 +1
Lines 20751 20846 +95
=======================================
+ Hits 14746 14935 +189
+ Misses 6005 5911 -94 ☔ View full report in Codecov by Sentry. |
If we skip one export before this change. Shouldn't we see one export vs two exports? |
Correct. We now only see one export. Without this, there would be two exports! (You can use Collector to verify, or the unit tests) |
To make it more clear: With this change: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nvm I get the change diff upside down. Good find!
If anyone is curious, this is how I have been testing: use futures_util::{
stream::{self, FusedStream},
StreamExt,
};
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::runtime::Runtime;
use std::error::Error;
use std::time::{Duration, SystemTime};
#[derive(Debug)]
enum MyMessage {
Frequent,
Rare,
}
struct Poller;
impl Poller {
async fn run(mut messages: impl Unpin + FusedStream<Item = MyMessage>) {
while let Some(message) = messages.next().await {
println!(
"Received message: {:?} time: {:?}",
message,
SystemTime::now()
);
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let rt = runtime::Tokio;
let stream1 = rt
.interval(tokio::time::Duration::from_secs(15))
// .skip(1)
.map(|_| MyMessage::Frequent);
let stream2 = rt
.interval(tokio::time::Duration::from_secs(1500))
// .skip(1)
.map(|_| MyMessage::Rare);
println!(
"Finished building stream1 and stream2 {:?}",
SystemTime::now()
);
let messages = Box::pin(stream::select(stream1, stream2));
rt.spawn(Box::pin(Poller::run(messages)));
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
}
} |
Fixes #1559
PeriodicReader
makes the first export right away, instead of waiting for the interval. This is mostly unobservable/no-impact in real systems, but this is causing instability in the tests, as tests rely on force_flush() and expect results from a single export, not the result from multiple exports. With this change, the tests, when all were run together locally, passes 100% of the time. Before this PR, almost every time, there is at least 1-3 failures. Naturally, the CI flakiness should be gone as well.Separately, we should be able to leverage
ManualReader
for the tests to get predictable behavior and avoid needingtokio::test
. Will fix it later, as that requires larger refactoring.(This issue is visible, if running the opentelemetry-otlp examples too - instead of seeing one metric, resulting from the drop triggered shutdown, one would have seen two metrics without this fix. Also added windows specific instructions to test this in OTLP+ Collector)