Add multiple beat.Clients #37657
Conversation
This pull request does not have a backport label.
To fix this up, you need to add the backport labels for the needed branches.
❕ Build Aborted

💚 Build Succeeded
💔 Build Failed
cc @kcreddy

💚 Build Succeeded
cc @kcreddy
Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)
```go
// The input gets blocked until flush.min_events or flush.timeout is reached.
// Hence max_outstanding_message has to be atleast flush.min_events to avoid this blockage.
c.Subscription.MaxOutstandingMessages = 1600
```
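The comment above describes a defaulting rule: the subscription's outstanding-message limit must be at least the queue's `flush.min_events`, or the input stalls until `flush.timeout`. A minimal sketch of that clamp, where `defaultFlushMinEvents` and `clampOutstanding` are hypothetical names for illustration (the real default lives in the publisher's queue config and is not visible to the input):

```go
package main

import "fmt"

// defaultFlushMinEvents mirrors the queue's default flush.min_events
// (1600 in recent Beats releases). The input cannot read the real value,
// so this constant is an assumption for illustration.
const defaultFlushMinEvents = 1600

// clampOutstanding raises max_outstanding_messages to at least
// flush.min_events so the subscription can fill a whole batch and the
// input is not blocked waiting for flush.timeout.
func clampOutstanding(maxOutstanding, flushMinEvents int) int {
	if maxOutstanding < flushMinEvents {
		return flushMinEvents
	}
	return maxOutstanding
}

func main() {
	fmt.Println(clampOutstanding(1000, defaultFlushMinEvents)) // 1600: raised to the flush threshold
	fmt.Println(clampOutstanding(2000, defaultFlushMinEvents)) // 2000: already large enough
}
```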
It's unfortunate that there is not a globally visible variable/constant that could be used for this, which would ensure that this is both explained in code and robust to source mutation.

Is there a way for the input to know `flush.min_events` and adjust this to be `max(flush.min_events, max_outstanding_messages)`?
Since the queue config is defined inside publisher, I don't see any quick way to get this check without a considerable refactor of Input.
Indeed. It was a wistful desire rather than an expectation of change.
```go
// It is not increased too high to cause high CPU usage.
c.Subscription.NumGoroutines = 2
// The input gets blocked until flush.min_events or flush.timeout is reached.
// Hence max_outstanding_message has to be atleast flush.min_events to avoid this blockage.
```
```diff
- // Hence max_outstanding_message has to be atleast flush.min_events to avoid this blockage.
+ // Hence max_outstanding_message has to be at least flush.min_events to avoid this blockage.
```
updated as suggested
This pull request is now in conflicts. Could you fix it? 🙏
The code changes in this PR create N pub/sub readers based on the `subscription.num_goroutines` value. This overloads the meaning of this config option. It would mean that `subscription.num_goroutines * subscription.num_goroutines` (i.e. squared) goroutines are created.

I wonder if the same gains could be achieved by increasing the receive settings in a manner that is the same as running N readers. The pub/sub client library appears to be built to handle this parallelism for us.

For example, settings equivalent to what you were using with these code changes would be:

```go
// Multiply everything by the number of pub/sub clients you were previously creating:
sub.ReceiveSettings.NumGoroutines = in.Subscription.NumGoroutines * in.Subscription.NumGoroutines
sub.ReceiveSettings.MaxOutstandingMessages = in.Subscription.MaxOutstandingMessages * in.Subscription.NumGoroutines
```

Basically, the question is whether the same gains can be achieved just by config changes.
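To make the arithmetic concrete, here is a hedged sketch of that scaling. `ReceiveSettings` is a local stand-in for the client library's `pubsub.ReceiveSettings`, and `scaleForClients` is a hypothetical helper, not code from this PR:

```go
package main

import "fmt"

// ReceiveSettings is a stand-in for pubsub.ReceiveSettings, holding only
// the two knobs discussed in this review.
type ReceiveSettings struct {
	NumGoroutines          int
	MaxOutstandingMessages int
}

// scaleForClients folds N logical readers into a single subscription by
// multiplying the per-client settings, as suggested above.
func scaleForClients(s ReceiveSettings, clients int) ReceiveSettings {
	s.NumGoroutines *= clients
	s.MaxOutstandingMessages *= clients
	return s
}

func main() {
	base := ReceiveSettings{NumGoroutines: 2, MaxOutstandingMessages: 1600}
	fmt.Printf("%+v\n", scaleForClients(base, 2)) // {NumGoroutines:4 MaxOutstandingMessages:3200}
}
```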
`filebeat-8.13.1` is the latest beats version used as a baseline, and `filebeat-8.14.0` is the new code containing multiple beat.Clients.
IIUC this introduces more changes into the test than is necessary. For a controlled experiment I would have expected the baseline to have been performed on some commit (e.g. v8.13.1), and then the only changes introduced for the experiment are the ones under test. It sounds like the experiment was performed with all changes between v8.13.1..v8.14.0 plus what's in this PR. (for future reference)
Increasing `max_outstanding_messages` to at least `flush.min_events` is suggested to overcome this.
Very good. This is an important improvement to the defaults. I think it would be valuable to mention this in the pub/sub input docs.
Add multiple beat.Clients
I don't see the multiple `beat.Client`s in the code. It still only has one AFAICT. I compared the mutex profiles you gave me:

Before (8.13.1.g2m2000r30s.pprof.filebeat.contentions.delay.001.pb)
After (8.14.0.g2m2000r30s.pprof.filebeat.contentions.delay.001.pb)

In both before and after there appears to be some lock contention (and delay) caused by the `Publish` lock. Roughly 5s of delay in the 30s profile.
```go
}()
if err != nil {
	in.log.Errorw("failed to create pub/sub client: ", "error", err)
	cancel()
```
It looks like this should be calling `return` after `cancel()`?
```go
var workerWg sync.WaitGroup

for ctx.Err() == nil {
	workers, err := in.workerSem.AcquireContext(numGoRoutines, ctx)
```
I'm not sure we should proceed with the approach of adding multiple pub/sub readers (pending resolution to previous comments), but if we do, then:

I think this can be simplified to not use a semaphore. A plain for loop (e.g. `for i := 0; i < numberOfReaders; i++ { go runSinglePubSubClient() }`) seems to achieve the same result, given that if any one "worker" fails they all stop due to `cancel()`.
This pull request is now in conflicts. Could you fix it? 🙏
Closing the PR as per the comments: #37657 (review). This PR aims at creating multiple pubsub clients and not the beat pipeline clients. Although there are performance benefits observed, it is only because of the additional goroutines being created underneath. This is verified by increasing existing config options alone; we don't require additional pubsub clients to get this performance benefit. The input blockage issue is resolved by increasing the default value of `max_outstanding_messages`.
Proposed commit message

Improve GCP PubSub input performance by creating multiple beat.Clients in a pool for each pubsub input instance. Each client creates its own subscription to the topic. Input being blocked due to `max_outstanding_message` < `flush.min_events` is fixed by increasing the default `max_outstanding_message` to at least the default value of `flush.min_events`. Increased the default `go_routines` to `2` to improve ingestion performance, but not too high to cause high CPU usage.

Checklist

- `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.

How to test this PR locally

Related issues

Logs

Performance measurement

The tests are executed by creating a GCP PubSub subscription. The output section is configured with the `file` output. `filebeat-8.13.1` is the latest beats version used as a baseline, and `filebeat-8.14.0` is the new code containing multiple beat.Clients.

Acronyms used:
2 types of measurements are taken.
1. Based on generated output events (approximation)
This measurement comes from the number of output messages inside the files generated from the `file` output.

g1m1000r1m
g1m1000r2m
g2m1000r1m
g2m1000r2m
g1m1000r1m-qft30
g1m1600r1m
g1m2000r30s
g1m3000r30s
g2m2000r30s
g2m2000r2m
g3m1000r2m
g3m2000r30s
g4m2000r30s
g5m2000r30s
2. Based on values inside "Total Metrics" log message.
This log/metrics message appears on stopping filebeat. These are accurate compared to file output counts and they match against the GCP PubSub `Acked Messages` metric.

g1m2000r30s
g2m2000r30s
g3m2000r30s
g4m2000r30s
g5m2000r30s
Results:

- In `8.13.1`, comparing `g1m1000r1m` and `g1m1000r1m-qft30s` shows that when `max_outstanding_messages` is set to `1000`, i.e. less than the default `flush.min_events`, there is a clear indication of the input being blocked. It is similar to the AWS SQS issue. Increasing `max_outstanding_messages` to at least `flush.min_events` is suggested to overcome this. Doing so, the performance improvement is evident in `g1m1600r1m`. Subsequent increases of `max_outstanding_messages` to 2000 in `g1m2000r30s` and 3000 in `g1m3000r30s` didn't improve the ingestion performance, as `flush.min_events` is still set at `1600`.
- In `filebeat-8.13.1-g1m2000r30s` there is a huge difference between the messages in the file output (`90k`) vs the `Total Metrics` log message (`39k`). Although `Total Metrics` seems to be accurate, this huge difference is unaccounted for.
- Similar behaviour is seen in both the `8.13.1` and `8.14.0` versions, but the overall ingestion rates are much higher in `8.14.0` than its corresponding run in the `8.13.0` version.
8.13.1.g3m2000r30s.pprof.filebeat.contentions.delay.001.pb.gz
8.14.0.g3m2000r30s.pprof.filebeat.contentions.delay.001.pb.gz