Improved and batched logs translation #2694
Conversation
Force-pushed from 815cbed to 7f2e977
Force-pushed from 7f2e977 to 1397787
Overall this looks great - a few comments & questions though
@pmalek-sumo @djaglowski I haven't looked into the details of this PR so my comment may be wrong, but if I understand correctly this PR attempts to group logs after they are produced by github.com/open-telemetry/opentelemetry-log-collection. Can we instead change how they are produced? Presumably we collect a batch of log lines at once from each file that we tail. Can we send the batch as it is collected from the file, instead of sending one log line at a time and then trying to batch later? The benefits of the suggested approach would be:
@tigrannajaryan I think your suggestion may be a good idea to explore, but it would be a fairly substantial change.
This presumption is not accurate. Log "lines" are processed one at a time, the exception being multiline logs, which are treated as one log entry. When it comes down to it, most operations supported by opentelemetry-log-collection must be performed on each log entry individually, and each subsequent operation is basically just a function on a call stack, so batching doesn't provide much additional performance in many cases. You certainly have a good point regarding the value of keeping related entries together to avoid having to reaggregate them later. We can probably do this, but there are a lot of implications to think through and I don't believe it is a good next step. @pmalek-sumo's change does appear to have performance benefits in its own right, so I suggest we adopt the changes at this level and remove them later if we can make them redundant.
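To illustrate the per-entry processing point above, here is a minimal, hypothetical sketch (the type and function names are mine, not the actual opentelemetry-log-collection API): each operator is essentially a function applied to a single entry, so running a batch through the pipeline is just a loop over the same chain.

```go
package main

import (
	"fmt"
	"strings"
)

// LogEntry is a simplified stand-in for entry.Entry from opentelemetry-log-collection.
type LogEntry struct {
	Body   string
	Labels map[string]string
}

// operator mirrors the idea that each processing step acts on a single entry.
type operator func(LogEntry) LogEntry

// chain composes operators; pushing a batch through the chain is just a loop,
// so batching earlier does not by itself make the operators cheaper.
func chain(ops ...operator) operator {
	return func(e LogEntry) LogEntry {
		for _, op := range ops {
			e = op(e)
		}
		return e
	}
}

func main() {
	pipeline := chain(
		func(e LogEntry) LogEntry { e.Body = strings.TrimSpace(e.Body); return e },
		func(e LogEntry) LogEntry {
			if e.Labels == nil {
				e.Labels = map[string]string{}
			}
			e.Labels["source"] = "filelog"
			return e
		},
	)
	batch := []LogEntry{{Body: " first "}, {Body: " second "}}
	for i := range batch {
		batch[i] = pipeline(batch[i])
	}
	fmt.Printf("%+v\n", batch)
}
```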
That could be an improvement (taking into account @djaglowski's comments, though), but this PR tries to batch logs per Resource, which, as I understand the concepts used in OTC, doesn't translate 1:1 into a tailed file, correct? Hence the batching done with this approach is a bit different from what you're proposing, @tigrannajaryan. Please correct me if I'm wrong.
Got it.
Correct. I was suggesting that a batch of log lines that is read from a file at once is split, parsed, and delivered by the log-collection library as a slice of entry.LogEntry at once, instead of delivering them one by one. Such a slice of LogEntrys would still be one pdata.Logs and would be emitted by the receiver using one ConsumeLogs call. However, you are right that these LogEntrys do not necessarily have the same Resource, so that's a different way to group them. Grouping by the Resource is a different approach and it is more expensive. I would want to know what the performance impact of this approach is. The serialization of the Resource and the lookup in the map have costs; I am curious how big they are. Alternatively, we could simply group subsequent LogEntrys which have the same Resource, without reordering or full regrouping. That's less expensive to do and may yield similar final results when reading at high speed from a small set of files.
One more thing: what is the benefit of receiving the logs grouped by Resource downstream? Are there particular performance optimizations that can be, or are, done in downstream consumers in such a scenario?
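To make the cost trade-off above concrete, here is a hedged sketch of the "group by Resource" idea, assuming a simplified entry type with a flattened Resource (none of these names are the real library types): the Resource is serialized into a stable key and used for a map lookup, which is exactly the overhead being questioned.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// LogEntry is a simplified stand-in for entry.Entry; the Resource is flattened
// to string attributes here.
type LogEntry struct {
	Resource map[string]string
	Body     string
}

// resourceKey serializes the Resource attributes into a deterministic key.
// This serialization plus the map lookup in groupByResource is the extra cost
// discussed above.
func resourceKey(res map[string]string) string {
	keys := make([]string, 0, len(res))
	for k := range res {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		b.WriteString(k)
		b.WriteByte('=')
		b.WriteString(res[k])
		b.WriteByte(';')
	}
	return b.String()
}

// groupByResource buckets entries that share the same Resource so that each
// bucket can later be emitted as one ResourceLogs inside a single pdata.Logs.
func groupByResource(entries []LogEntry) map[string][]LogEntry {
	groups := make(map[string][]LogEntry, len(entries))
	for _, e := range entries {
		key := resourceKey(e.Resource)
		groups[key] = append(groups[key], e)
	}
	return groups
}

func main() {
	entries := []LogEntry{
		{Resource: map[string]string{"file.name": "a.log"}, Body: "first"},
		{Resource: map[string]string{"file.name": "a.log"}, Body: "second"},
		{Resource: map[string]string{"file.name": "b.log"}, Body: "third"},
	}
	for key, group := range groupByResource(entries) {
		fmt.Printf("%s -> %d entries\n", key, len(group))
	}
}
```

The cheaper variant mentioned above would skip the map entirely and only coalesce adjacent entries whose Resource compares equal.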
Force-pushed from 1386330 to d813232
Force-pushed from 4053455 to e59915f
@pmalek-sumo Any idea why CI is hanging on this PR? Since the main purpose of this PR is to increase performance, can you please post before/after results for the related testbed scenarios?
Not sure what's up with that. The workflow seems to be failing, not hanging: https://app.circleci.com/pipelines/github/SumoLogic/opentelemetry-collector-contrib/1582/workflows/376f9b5f-6477-4f6e-bc77-2547379d66f3 The tests that seem to cause issues are the Windows tests: https://app.circleci.com/pipelines/github/SumoLogic/opentelemetry-collector-contrib/1582/workflows/376f9b5f-6477-4f6e-bc77-2547379d66f3/jobs/13227
Any guidelines on how to run the tests? Is there a particular config that should be used?
OK, I guess I've figured it out.
Baseline: 3bdf190
This PR at e59915f, first run:
Second run:
@pmalek-sumo Thanks for posting testbed results. I've captured the flaky test issue here. I suppose we need to wait until it's resolved or reverted: #2792
Force-pushed from 497c19a to 6e806f3
@djaglowski I've rebased this PR. Please take a look.
internal/stanza/config.go
MaxFlushCount uint          `mapstructure:"max_flush_count"`
FlushInterval time.Duration `mapstructure:"flush_interval"`
Please add comments to explain these settings and/or add description in README.md.
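For illustration only, a sketch of how these settings might be documented, assuming a hypothetical ConverterConfig struct in the stanza package; the comment wording is a suggestion, not what the PR ended up with.

```go
package stanza

import "time"

// ConverterConfig is a hypothetical name for the batching settings above; the
// doc comments below are illustrative suggestions only.
type ConverterConfig struct {
	// MaxFlushCount is the maximum number of buffered log entries; reaching it
	// triggers a flush regardless of the flush interval.
	MaxFlushCount uint `mapstructure:"max_flush_count"`
	// FlushInterval is how often buffered entries are flushed downstream even
	// if MaxFlushCount has not been reached.
	FlushInterval time.Duration `mapstructure:"flush_interval"`
}
```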
c.dataMutex.RUnlock()
if count > 0 {
    pLogs := c.convertBuffer()
    go c.queueForFlush(pLogs)
It is not clear why queueForFlush needs to be in a separate goroutine.
This was my attempt not to introduce backpressure on the caller of Batch().
We could take a different approach and introduce a configurable number of workers that consume the logs queued for flushing in a round-robin fashion, but that would introduce backpressure on the caller whenever flushing stalls.
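A minimal, hypothetical sketch of that alternative (none of the names below come from the PR): a fixed pool of flush workers fed by a bounded channel. Once the channel fills up, enqueue blocks, which is the backpressure on the caller of Batch() mentioned above.

```go
package main

import (
	"fmt"
	"sync"
)

// pLogs is a stand-in for pdata.Logs.
type pLogs struct{ entries int }

// flusher runs a fixed number of workers that drain the flush queue.
type flusher struct {
	queue chan pLogs
	wg    sync.WaitGroup
}

func newFlusher(workers, queueSize int) *flusher {
	f := &flusher{queue: make(chan pLogs, queueSize)}
	for i := 0; i < workers; i++ {
		f.wg.Add(1)
		go func() {
			defer f.wg.Done()
			for logs := range f.queue {
				// Flushing downstream would happen here.
				fmt.Printf("flushed %d entries\n", logs.entries)
			}
		}()
	}
	return f
}

// enqueue blocks once the queue is full: this is where backpressure would be
// applied to the caller if flushing stalls.
func (f *flusher) enqueue(logs pLogs) { f.queue <- logs }

// stop closes the queue and waits for the workers to drain it.
func (f *flusher) stop() {
	close(f.queue)
	f.wg.Wait()
}

func main() {
	f := newFlusher(2, 4)
	for i := 1; i <= 8; i++ {
		f.enqueue(pLogs{entries: i})
	}
	f.stop()
}
```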
c.stopOnce.Do(func() {
    close(c.stopChan)
    c.wg.Wait()
    close(c.pLogsChan)
Do we need to close this channel? How are external readers expected to drain any items pending in pLogsChan? You can't drain until you call Stop because more data may be arriving and you can't drain after you call Stop since the channel is now closed. Is this intentional?
In the current implementation pLogsChan is unbuffered, so there won't be any logs left to flush from it.
My reasoning here was as follows:
- we close the stopChan, which signals the internal goroutines to return
- we wait for them to return (to prevent leaving dangling goroutines)
- we close the pLogsChan to signal the consumers that they can stop consuming (gracefully stopping downstream consumers/pipelines)
If I missed anything, or there are reasons to introduce buffering to the channel, please let me know.
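A compact, hypothetical sketch of that shutdown order (the real converter has more moving parts): signal the workers via stopChan, wait for them, then close pLogsChan so that consumers ranging over it stop gracefully.

```go
package main

import (
	"fmt"
	"sync"
)

type converter struct {
	stopOnce  sync.Once
	stopChan  chan struct{}
	pLogsChan chan int // stand-in for a channel of pdata.Logs
	wg        sync.WaitGroup
}

func newConverter() *converter {
	c := &converter{
		stopChan:  make(chan struct{}),
		pLogsChan: make(chan int),
	}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for i := 0; ; i++ {
			select {
			case <-c.stopChan:
				return // stopChan closed: the worker returns
			case c.pLogsChan <- i: // hand converted logs to the consumer
			}
		}
	}()
	return c
}

// Stop follows the order described above: signal the workers, wait for them,
// then close the output channel so consumers can stop gracefully.
func (c *converter) Stop() {
	c.stopOnce.Do(func() {
		close(c.stopChan)
		c.wg.Wait()
		close(c.pLogsChan)
	})
}

func main() {
	c := newConverter()
	go func() {
		// Consumer keeps reading until pLogsChan is closed.
		for logs := range c.pLogsChan {
			fmt.Println("consumed", logs)
		}
	}()
	c.Stop()
}
```

Because the channel is closed only after the workers have returned, there is no risk of sending on a closed channel, and consumers see a clean end-of-stream.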
Force-pushed from 6e806f3 to 45cfdf4
Thanks for your comments @tigrannajaryan! I believe I've addressed most of them. I can add more comments on the design if needed. There's one more thing I believe I'll need to address: the locking around checking whether a flush is needed because the max count was reached, versus the conversion and flush itself. With the current implementation, because of goroutine scheduling, there might be a couple more log entries in the buffer than the configured max count.
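For illustration, a hedged sketch of one way to make the size check and the buffer swap atomic, with hypothetical names: holding the same lock across both means the buffer can never exceed the configured maximum, at the cost of doing the swap under a write lock.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a hypothetical, simplified buffer; it is not the converter from the PR.
type batcher struct {
	mu            sync.Mutex
	buffer        []string
	maxFlushCount int
}

// add appends an entry and, while still holding the lock, decides whether to
// flush. Because the check and the buffer swap happen under the same lock,
// the buffer can never grow past maxFlushCount.
func (b *batcher) add(entry string) []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buffer = append(b.buffer, entry)
	if len(b.buffer) < b.maxFlushCount {
		return nil
	}
	flushed := b.buffer
	b.buffer = make([]string, 0, b.maxFlushCount)
	return flushed
}

func main() {
	b := &batcher{maxFlushCount: 2}
	for _, e := range []string{"a", "b", "c", "d", "e"} {
		if flushed := b.add(e); flushed != nil {
			fmt.Println("flush:", flushed)
		}
	}
}
```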
Force-pushed from 45cfdf4 to 9e0bb57
@pmalek-sumo at some point you may want to move this to your repo :))) Probably when all comments are closed.
I've pushed this to my personal fork and created a PR that supersedes this one: #2892. CC: @tigrannajaryan
Description: Introduce an aggregation layer to internal/stanza that translates entry.Entry into pdata.Logs, aggregating logs coming from the same Resource into one entry.
Link to tracking Issue: #2330
Testing: unit tests added