Skip to content

Commit

Permalink
Merge pull request #6789 from heyitsanthony/grpcproxy-close-send
Browse files Browse the repository at this point in the history
grpcproxy: reliably track rid in watchergroups
  • Loading branch information
Anthony Romano authored Nov 2, 2016
2 parents fe755b6 + 8ec4215 commit 3782571
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 36 deletions.
26 changes: 13 additions & 13 deletions proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
sid := wp.nextStreamID
wp.mu.Unlock()

ctx, cancel := context.WithCancel(wp.ctx)
sws := serverWatchStream{
cw: wp.cw,
groups: &wp.wgs,
Expand All @@ -70,7 +71,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {

watchCh: make(chan *pb.WatchResponse, 1024),

proxyCtx: wp.ctx,
ctx: ctx,
cancel: cancel,
}

go sws.recvLoop()
Expand All @@ -93,11 +95,13 @@ type serverWatchStream struct {

nextWatcherID int64

proxyCtx context.Context
ctx context.Context
cancel context.CancelFunc
}

func (sws *serverWatchStream) close() {
var wg sync.WaitGroup
sws.cancel()
sws.mu.Lock()
wg.Add(len(sws.singles) + len(sws.inGroups))
for _, ws := range sws.singles {
Expand Down Expand Up @@ -145,8 +149,8 @@ func (sws *serverWatchStream) recvLoop() error {
key: string(cr.Key),
end: string(cr.RangeEnd),
},
id: sws.nextWatcherID,
ch: sws.watchCh,
id: sws.nextWatcherID,
sws: sws,

progress: cr.ProgressNotify,
filters: v3rpc.FiltersFromRequest(cr),
Expand Down Expand Up @@ -176,7 +180,7 @@ func (sws *serverWatchStream) sendLoop() {
if err := sws.gRPCStream.Send(wresp); err != nil {
return
}
case <-sws.proxyCtx.Done():
case <-sws.ctx.Done():
return
}
}
Expand All @@ -192,18 +196,15 @@ func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
}

func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
sws.mu.Lock()
defer sws.mu.Unlock()

ctx, cancel := context.WithCancel(sws.proxyCtx)

ctx, cancel := context.WithCancel(sws.ctx)
wch := sws.cw.Watch(ctx,
w.wr.key, clientv3.WithRange(w.wr.end),
clientv3.WithRev(rev),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
)

sws.mu.Lock()
defer sws.mu.Unlock()
ws := newWatcherSingle(wch, cancel, w, sws)
sws.singles[w.id] = ws
go ws.run()
Expand All @@ -213,12 +214,11 @@ func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
sws.mu.Lock()
defer sws.mu.Unlock()

rid := receiverID{streamID: sws.id, watcherID: ws.w.id}
// do not add new watchers when stream is closing
if sws.inGroups == nil {
return false
}
if sws.groups.maybeJoinWatcherSingle(rid, ws) {
if sws.groups.maybeJoinWatcherSingle(ws) {
delete(sws.singles, ws.w.id)
sws.inGroups[ws.w.id] = struct{}{}
return true
Expand Down
12 changes: 5 additions & 7 deletions proxy/grpcproxy/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ type watchRange struct {
}

type watcher struct {
id int64
wr watchRange
id int64
wr watchRange
sws *serverWatchStream

rev int64
filters []mvcc.FilterFunc
progress bool
ch chan<- *pb.WatchResponse
}

func (w *watcher) send(wr clientv3.WatchResponse) {
Expand Down Expand Up @@ -87,10 +87,8 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
Events: events,
}
select {
case w.ch <- pbwr:
case w.sws.watchCh <- pbwr:
case <-time.After(50 * time.Millisecond):
// close the watch chan will notify the stream sender.
// the stream will gc all its watchers.
close(w.ch)
w.sws.cancel()
}
}
6 changes: 4 additions & 2 deletions proxy/grpcproxy/watcher_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package grpcproxy
import (
"testing"

"golang.org/x/net/context"

"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
Expand All @@ -30,8 +32,8 @@ func TestWatchgroupBroadcast(t *testing.T) {
for i := range chs {
chs[i] = make(chan *pb.WatchResponse, 1)
w := watcher{
id: int64(i),
ch: chs[i],
id: int64(i),
sws: &serverWatchStream{watchCh: chs[i], ctx: context.TODO()},

progress: true,
}
Expand Down
28 changes: 15 additions & 13 deletions proxy/grpcproxy/watcher_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) {
WatchId: rid.watcherID,
Created: true,
}
w.ch <- resp

select {
case w.sws.watchCh <- resp:
case <-w.sws.ctx.Done():
}
return
}

Expand Down Expand Up @@ -96,24 +98,24 @@ func (wgs *watchergroups) removeWatcher(rid receiverID) (int64, bool) {
return -1, false
}

func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingle) bool {
func (wgs *watchergroups) maybeJoinWatcherSingle(ws watcherSingle) bool {
wgs.mu.Lock()
defer wgs.mu.Unlock()

rid := receiverID{streamID: ws.sws.id, watcherID: ws.w.id}
group, ok := wgs.groups[ws.w.wr]
if ok {
return group.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w) != -1
return group.add(rid, ws.w) != -1
}

if ws.canPromote() {
wg := newWatchergroup(ws.ch, ws.cancel)
wgs.groups[ws.w.wr] = wg
wg.add(receiverID{streamID: ws.sws.id, watcherID: ws.w.id}, ws.w)
go wg.run()
return true
if !ws.canPromote() {
return false
}

return false
wg := newWatchergroup(ws.ch, ws.cancel)
wgs.groups[ws.w.wr] = wg
wgs.idToGroup[rid] = wg
wg.add(rid, ws.w)
go wg.run()
return true
}

func (wgs *watchergroups) stop() {
Expand Down
1 change: 0 additions & 1 deletion proxy/grpcproxy/watcher_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (ws watcherSingle) run() {
for wr := range ws.ch {
ws.lastStoreRev = wr.Header.Revision
ws.w.send(wr)

if ws.sws.maybeCoalesceWatcher(ws) {
return
}
Expand Down

0 comments on commit 3782571

Please sign in to comment.