-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix multiple leaks issues raised from CM #10567
Conversation
Backport to 6.6 |
@@ -718,6 +718,8 @@ func (p *Input) Wait() { | |||
|
|||
// Stop stops all harvesters and then stops the input | |||
func (p *Input) Stop() { | |||
defer logp.Debug("input", "Log input stopped") | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense to keep a trace of that when debugging why events are not going through..
|
||
if t, ok := http.Transport.(ci); ok { | ||
t.CloseIdleConnections() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, an interface cast with one method can also be written like this:
if t, ok := http.Transport.(interface{ CloseIdleConnections() }); ok {
t.CloseIdleConnections()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice trick I didn't know that.
} | ||
|
||
return reflect.DeepEqual(a, b) | ||
return true, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why golint
or go vet
do not complain here, but: return aHash == bHash, nil
.
Still, I'd prefer proper equality checks over a hash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer a hash in the long term, especially if we could diminish the roundtrip of call, I would be OK to implement our own hashing strategies later, I think for know hashstructure's Hash is an OK solution and would take care of a few false positive refresh.
Crap, I commit my code that raised the other issue. 🤦
…On Tue, Feb 5, 2019 at 12:04 PM Steffen Siering ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In libbeat/publisher/pipeline/output.go
<#10567 (comment)>:
> - }
-
- err := w.client.Connect()
- if err != nil {
- logp.Err("Failed to connect to %v: %v", w.client, err)
- reconnectAttempts++
+ var (
+ connected bool
+ reconnectAttempts int
+ )
+
+ for {
+ select {
+ case <-w.done:
+ logp.Info("Closed connection to %v", w.client)
+ return
This looks like a potential race. When calling select the runtime chooses
the channel order by random. That is we can have this sequence of events
happening:
1. close(w.done)
2. batch := <-w.qu
3. w.client.Close()
4. batch.Cancelled() // connection was already closed before
5. w.client.Connect() // reconnect -> fd leak
6. <- w.done
7. return
alternatively:
1. batch := <-w.qu
2. close(w.done)
3. w.client.Close()
4. w.client.Publish() -> error -> connection close
5. batch := <-w.qu
6. batch.Cancelled()
7. w.client.Connect() // fd leak
8. <-w.done
9. return
This is why we used to have the boolean, so to check if the output has
been closed or if we can reconnect. The client must have no chance to
reconnect after the output has been closed.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#10567 (review)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAACgJ7yRTSJKRcQSKU_jArUli2ZhlXZks5vKbmOgaJpZM4ajQH9>
.
--
--
ph
Software Engineer
|
} | ||
|
||
err := w.client.Publish(batch) | ||
if err != nil { | ||
logp.Err("Failed to publish events: %v", err) | ||
// on error return to connect loop | ||
break | ||
connected = false | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above does the following:
- make the close sync with the
run()
method. - Closing the client will only happen when
run()
execution is completed. - We check for
<- w.done
in two select, we do this to prioritize shutdown vs reading from queue on next execution loop, we also select on both the queue and the done channels and wait, first read will win here.
I have extracted the config equal change in #10587 |
Leaks: - Fix a goroutine leak when the output was updated, the old goroutine was blocked on waiting for events. - Fix an transport goroutine leak waiting on Reader, because of keep alive. - Fix a FD leaks from the elasticsearch output, output were replaced but the transport were not GC correctly and was keeping an ever increasing number of fd open.
closing in favor of #10599 |
Leaks:
Fix a goroutine leak when the output was updated, the old goroutine was
blocked on waiting for events.
Fix an transport goroutine leak waiting on Reader, because of keep
alive.
Fix a FD leaks from the elasticsearch output, output were replaced but
the transport were not GC correctly and was keeping an ever increasing
number of fd open.
Fixes: #10491