-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[processor/deltatocumulative]: expire stale series #31337
[processor/deltatocumulative]: expire stale series #31337
Conversation
uses the staleness tracking map. for now does not actually expire entries, until lifecycle management is in place
adds mutex-sync to the staleness struct. this is especially important for item expiration, which modifies the pq and map serially. intermittent changes by other routines would lead to data inconsistencies here.
@RichieSams please take a look at 7745b7e, had to add mutex-sync to your staleness code. |
converts from Start() methods to context based cancellation
IMO the mutex should be outside the structure. Taking a lock for each and every operation will get expensive fast, for the amount of data entrypoints that the processors will handle. IE, the user should take a lock once, do all the operations, and then release the lock. |
I do like your decision to pull the |
@RichieSams I agree. The reason for the extensive locking here is to synchronize with the out-of-band "pruning" routine, which needs to have exclusive ownership while it runs. I do have a new idea however how to separate those, will try it out 👍 |
I need to update my PR with the new Stateness structs. But in my initial implementation, I just had a mutex in the Processor. ConsumeMetrics() takes it at the start. https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/30827/files#diff-677a13ed5d6a215261ff4e6a8a715738563e28501eb80ac0691352e1f8f316ebR31 You could either call |
@RichieSams separated things here. some notes on my design:
In regard to locking, my current architecture takes global locks anyways. My approach is to first come up with a safe, correct but possibly slow implementation and optimize later, if required. |
@RichieSams @djaglowski @jpkrohling please take a look :) |
processor/deltatocumulativeprocessor/internal/streams/expiry.go
Outdated
Show resolved
Hide resolved
Moves all locking to the upper level, resulting a much clearer overall picture. Refactors delta.Accumulator to the streams.Map interface, allowing higher composability
@RichieSams that was really good feedback! I moved locking centrally into the highest level processor. Stale expiry is also called from there, leading to a very clear locking flow without hidden complexity. Also refactored the |
Tests expiry functionality under real conditions, using a faked clock
@RichieSams @djaglowski I think this is ready for a final review. Added docs and a thorough test-case. |
Code LGTM |
I guess the new release caused conflicts with this one. Once the conflicts are resolved, I'll merge this. |
for { | ||
if s.Len() == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch
case n == 0: | ||
<-e.sig | ||
case n > 0: | ||
expires := e.Staleness.Next().Add(e.Max) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has the potential of not being protected by the lock. Since you release the lock on return of this function, but this is a go-routine.
IMO, this whole struct could go away, and the Processor could keep Staleness
structs at the top level instead of maps. While "elegant", I'm not sure the elegance of sleeping "exactly" to the next expiry outweighs the potential bugs like this race, and the extra logic you had to add to Store() to make sure you can preemptively wake things up.
The handling of n
and calculation of expires
needs to happen outside the goroutine, so it's protected by the lock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My vote would be to replace this logic with a simple Ticker() every say 5 minutes. (We could make that a config value if want)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a really good catch and it says quite something about complexity that no one noticed it before.
The issue I see with a timer is that streams possibly live up to 2*max-1
seconds if the timer hits in a bad moment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it would be max+tickerInterval
seconds. IMO in most real-world scenarios max
will be >> the ticker value. IE, max
is 1 hour, and ticker is 5 minutes. So a series lasting 1h 4.9999min isn't the end of the world.
There is precedent for this kind of expiry schema in other metrics tools, like mtail
We could allow the tickerInterval to be configurable. And add config validation that max
> tickerInterval
. With a big doc / warning that says that tickerInterval should be much smaller than max to ensure a bad lineup of the tick doesn't allow series to last too long
|
||
// "try-send" to notify possibly sleeping expiry routine | ||
select { | ||
case e.sig <- struct{}{}: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means that for every store, we're potentially starting a new "sleep" go-routine for expiry. Lots of stores quickly in a row could build up many go-routines that are all sleeping. Since each loop of creates a new one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure?
afaict there is only one routine (started in Processor.Start). the linked code here is a try-send operation that either sends an empty struct into the channel (if the other routine happens to listen) or just does nothing at all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In processor.Start() we launch a goroutine that does this:
for {
p.mtx.Lock()
next := p.exp.ExpireOldEntries()
p.mtx.Unlock()
select {
case <-next:
case <-p.ctx.Done():
return
}
}
ExpireOldEntries()
is overridden above. It starts a goroutine to do the sleep. That's the goroutine I'm talking about.
By sending a struct to the channel, you cause next
to return early. IE, it doesn't wait for the sleep to finish / the channel to be closed. Which is the behaviour you want, but it also abandons that goroutine. The code will immediately loop and call ExpireOldEntries()
again, which spawns a new goroutine. Etc.
Sorry for the delayed final review; we got a puppy this weekend that's been eating all my time 😁 I added some small comments about the expiry and a race condition. Otherwise, LGTM |
) **Description:** Removes stale series from tracking (and thus frees their memory) using staleness logic from open-telemetry#31089 **Link to tracking Issue:** open-telemetry#30705, open-telemetry#31016 **Testing:** `TestExpiry` **Documentation:** README updated
) **Description:** Removes stale series from tracking (and thus frees their memory) using staleness logic from open-telemetry#31089 **Link to tracking Issue:** open-telemetry#30705, open-telemetry#31016 **Testing:** `TestExpiry` **Documentation:** README updated
Description: Removes stale series from tracking (and thus frees their memory) using staleness logic from #31089
Link to tracking Issue: #30705, #31016
Testing:
TestExpiry
Documentation: README updated