clientv3: Allow watcher to close channels while waiting for new client #6651
Conversation
Force-pushed from 8f27c23 to 70bcd8b
Can we add a test to illustrate the problem?
Force-pushed from 70bcd8b to 1e575fb
Added a test to reproduce the problem. With the current code, the test times out because the …
Force-pushed from c7d9ecb to eb63d56
Finally fixed all issues...!
@yudai Can we change `func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error)` to `func (w *watchGrpcStream) openWatchClient(ctx context.Context) (ws pb.Watch_WatchClient, err error)` and make openWatchClient respect the passed-in context?
@xiang90 Got it. Is it ok to pass …
Force-pushed from eb63d56 to 9a9894d
After reading the code again, I think there is a more serious issue with how the stream handles context in general. First, each stream can have multiple substreams, created for each individual watcher. So cancelling one watcher should not cancel the entire stream, or it will affect other watchers on the same stream. We probably got this wrong even without your code change. Your change makes it worse by cancelling the ctx on the main stream when a watcher cancellation happens.
@yudai Ignore my previous reply. I was wrong. We actually mask the ctx cancellation with https://github.com/coreos/etcd/blob/master/clientv3/watch.go#L193-L202. So yeah, you cannot use the ctx on w; it won't work. We need to respect the ctx on each wr:
Let me double check. Each … I think, when we want to cancel …

I'm afraid that respecting …
Force-pushed from 9a9894d to c2cad27
We do not want to cancel that. For each stream, we can have multiple watchers, each created by a separate w.Watch() call. Cancelling that would affect all watchers on this stream. We only want to cancel the substream we created for the watcher with the ctx.
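To make the stream/substream distinction concrete, here is a minimal sketch of the user-facing situation (the endpoint and keys are placeholders, not from this PR): two Watch() calls on one client share a single gRPC watch stream, and cancelling one context should only close that watcher's channel.

```go
package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Two watchers created by separate Watch() calls; both substreams are
	// multiplexed onto the same underlying gRPC watch stream.
	ctxA, cancelA := context.WithCancel(context.Background())
	wchA := cli.Watch(ctxA, "foo")

	ctxB, cancelB := context.WithCancel(context.Background())
	defer cancelB()
	wchB := cli.Watch(ctxB, "bar")

	// Cancelling watcher A should close only wchA; watcher B must keep
	// working on the shared stream.
	cancelA()
	for range wchA {
		// drain until the cancellation closes the channel
	}

	// Watcher B still receives events after A is gone.
	if _, err := cli.Put(context.Background(), "bar", "v"); err != nil {
		panic(err)
	}
	if resp, ok := <-wchB; ok {
		fmt.Println("watcher B received", len(resp.Events), "event(s)")
	}
}
```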
The main stream routine catches all substream closings here: https://github.com/coreos/etcd/blob/master/clientv3/watch.go#L496-L498. During its initialization, it won't catch any closing event, so the cancellation won't work. I think this is the root problem that we need to solve. /cc @heyitsanthony
```go
	for {
		select {
		case opc := <-clientOpenC:
```
@yudai OK. Now I understand this better. We move part of the logic into serveWatchClient to solve race conditions?
```go
			wc = opc.wc
			closeErr = opc.err
		case ws := <-w.closingc:
			w.closeSubstream(ws)
```
Can we make this a func? This is the same as lines 529-534.
It could make the code a bit complicated, because `closing` is a local variable tied to this loop, and in the `if` block there is a `return` to break the loop. If moving the `closing` variable to the struct makes sense, I'll do that.
```diff
@@ -671,7 +694,7 @@ func (w *watchGrpcStream) joinSubstreams() {
 }
 
 // openWatchClient retries opening a watchclient until retryConnection fails
-func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+func (w *watchGrpcStream) openWatchClient(ctx context.Context) (ws pb.Watch_WatchClient, err error) {
```
I was wrong. This change is actually not necessary.
```diff
@@ -543,21 +575,41 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
 
 // serveWatchClient forwards messages from the grpc stream to run()
 func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
```
Do not modify serveWatchClient? Add a new func for lines 578-594?
Also, in the initialization case, we do not even need lines 578-594.
This patch is really complicated for what it does.
```go
	time.Sleep(time.Second * 3)
	clus.Members[0].Terminate(t)
	time.Sleep(time.Second * 3)
	donec := make(chan struct{})
```
The `donec` is unneeded synchronization:

```go
go cancel()
select {
...
}
if err := cli.Close(); err != nil {
...
```
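Spelled out as a helper, the suggested test shape could look roughly like this (the helper name and the 10-second timeout are assumptions; `t`, `cancel`, `wch`, and `cli` come from the existing test, and the usual `testing`, `context`, `time`, and `clientv3` imports are assumed):

```go
// assertCancelClosesWatch is a hypothetical helper capturing the suggested
// shape: fire the cancel asynchronously, then require the watch channel to
// be closed soon after, with no extra donec bookkeeping.
func assertCancelClosesWatch(t *testing.T, cancel context.CancelFunc, wch clientv3.WatchChan, cli *clientv3.Client) {
	go cancel()
	select {
	case _, ok := <-wch:
		if ok {
			t.Fatalf("expected watch channel to be closed")
		}
	case <-time.After(10 * time.Second):
		t.Fatalf("took too long to close watch channel")
	}
	if err := cli.Close(); err != nil {
		t.Fatal(err)
	}
}
```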
The way to go is probably to rearrange the newWatchClient code to accept cancels while the watch goroutines are down:

```go
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	if len(w.substreams)+len(w.resuming) > 0 {
		close(w.resumec)
		w.resumec = make(chan struct{})
		w.joinSubstreams()
		for _, ws := range w.substreams {
			ws.id = -1
			w.resuming = append(w.resuming, ws)
		}
	}
	w.substreams = make(map[int64]*watcherStream)
	// accept cancels while reconnecting
	donec := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	for i := range w.resuming {
		ws := w.resuming[i]
		go func() {
			defer wg.Done()
			select {
			case <-ws.initReq.ctx.Done():
				ws.closing = true
				close(ws.outc)
			case <-donec:
			}
		}()
	}
	// connect to grpc stream
	wc, err := w.openWatchClient()
	// clean up cancel waiters
	close(donec)
	wg.Wait()
	for _, ws := range w.resuming {
		if ws == nil || ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		go w.serveSubstream(ws, w.resumec)
	}
	if err != nil {
		return nil, v3rpc.Error(err)
	}
	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}
```

(It still fails the test, but with a goroutine leak because of an unrelated issue.)
@heyitsanthony I tested your code and found a goroutine leak (probably the same one you saw); it happens at

```go
select {
case <-ws.initReq.ctx.Done():
	ws.closing = true
	close(ws.outc)
case <-donec:
}
```

I'm guessing this part needs … I came up with an idea which replaces …
Force-pushed from c2cad27 to 00db101
@yudai The …
Force-pushed from 00db101 to 0261d3c
I updated the code to restore …
@heyitsanthony Currently I'm getting …
@heyitsanthony Thanks. I'll wait for the PR to be merged.
@xiang90 Thanks. I rebased the branch onto master and am getting a hang. It seems that when you cancel a substream while the watcher has no connection, …
Force-pushed from 0261d3c to 9e1f951
@yudai Any progress on this? Or anything we can help with to move this forward?
Force-pushed from 9e1f951 to 0e61165
@xiang90 I spent some time trying to fix the issues I found in the branch: … However, I still have not found a good way to fix them while keeping the function clean. I personally think it would be better to manage events/concurrency in a single place in …
@yudai Can you please update this PR with the …
Force-pushed from 1435d36 to 338405d
@heyitsanthony I updated the PR 👍 I added a bit of additional code to break the loop from … The patch still lacks some care to pass …
Force-pushed from 338405d to 1adb7e1
```go
	if err != nil {
		return nil, v3rpc.Error(err)
	}

func (w *watchGrpcStream) newWatchClient() (ws pb.Watch_WatchClient, err error) {
```
@heyitsanthony This function is pretty fat right now. I feel we should rename it to newOrResumeWatchClient, or we can separate the new and resume cases.
The new/resume cases should be identical; I think the way to trim it is to have a new function like `func (wgs *watchGrpcStream) waitOnResumeCancels() (chan struct{}, *sync.WaitGroup)` that returns `donec` and the waitgroup for all the resume-on-cancellation goroutine stuff.
@heyitsanthony Does the new case need to go through the resume thing? It can know that there is nothing pending (pending + resuming == 0) and can skip all resume logic. I think the reasoning behind it is that we just do not have to care, since new and resume can be identical anyway.

My worry is that this func becomes fat and affects readability. And yes, I hope we can group the resuming logic in the func in a better way, probably by moving it to a new function.
I think a new stream should go through the same path as a resume; there's no significant difference between creating a new stream and resuming a stream (aside from the resume having its initial revision updated). We used to have that distinction but it made reasoning about the resume path complicated and error-prone.
The function can be trimmed down by splitting out the goroutine phase in newWatchClient that waits for the cancels into a separate function (e.g., waitOnResumeCancels) and having newWatchClient call that.
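For reference, a sketch of what that helper could look like, lifted from the newWatchClient code quoted earlier in this thread (this is the proposed shape under discussion, not the merged implementation):

```go
// waitOnResumeCancels spawns one waiter per resuming substream; each waiter
// marks its substream as closing and closes its output channel if the
// watcher's context is canceled while the client is still reconnecting. The
// returned donec/WaitGroup let newWatchClient tear the waiters down once the
// gRPC stream is reopened.
func (w *watchGrpcStream) waitOnResumeCancels() (chan struct{}, *sync.WaitGroup) {
	donec := make(chan struct{})
	wg := &sync.WaitGroup{}
	wg.Add(len(w.resuming))
	for i := range w.resuming {
		ws := w.resuming[i]
		go func() {
			defer wg.Done()
			select {
			case <-ws.initReq.ctx.Done():
				ws.closing = true
				close(ws.outc)
			case <-donec:
			}
		}()
	}
	return donec, wg
}
```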
OK. fine with me. /cc @yudai
@heyitsanthony @yudai Can we move this forward?
Sorry for the late reply. I was out on a trip. I'm totally fine with leaving this issue to @heyitsanthony. The ideal fix would require some larger changes, so I think it would be great if a maintainer could handle this issue 👍
Fixed by #6816
@heyitsanthony @xiang90 Thank you very much for the fix!
Once watchGrpcStream.run() gets a connection error, it does not do anything until it gets a new watch client, because newWatchClient(), called in the main loop, blocks.

In this state, watch response channels returned by the Watcher won't be closed even if the cancel function of the context given to the Watcher has been called. The cancel request will only be executed after a connection to the server has been re-established.

This commit allows the watcher to run cancel tasks while waiting for a new client, so that users can cancel watch tasks at any time.
Hi,

I think the current behavior makes sense when you can assume that the connection to the etcd cluster is stable enough and recovers quickly after trouble. However, in situations where a connection outage lasts longer, the current behavior can lead to resource leaks or unexpectedly blocked goroutines.

Here's some code to reproduce the situation:
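(What follows is a minimal sketch of such a reproduction, not the exact original snippet; the endpoint, key, and timeouts are placeholders, and the etcd server is assumed to be stopped out-of-band after the watch is established.)

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())
	wch := cli.Watch(ctx, "foo")

	// Stop the etcd server now (e.g. kill the process) so the client loses
	// its connection, then cancel the watcher.
	time.Sleep(10 * time.Second)
	cancel()

	// With the current behavior, wch is not closed until the connection to
	// the server is re-established, so this receive stays blocked for as
	// long as the server is down; the goroutine and channel effectively leak.
	select {
	case _, ok := <-wch:
		fmt.Println("watch channel closed:", !ok)
	case <-time.After(10 * time.Second):
		fmt.Println("watch channel still open 10s after cancel")
	}
}
```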