From b56ee178d5f3bcca4acab68d296e4fa07d9197db Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 3 Sep 2016 07:48:59 -0700 Subject: [PATCH 1/2] grpcproxy: handle overloaded stream --- proxy/grpcproxy/watch.go | 2 +- proxy/grpcproxy/watcher.go | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 980edea166c..30e65b5a052 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -67,7 +67,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { id: wp.nextStreamID, gRPCStream: stream, - watchCh: make(chan *pb.WatchResponse, 10), + watchCh: make(chan *pb.WatchResponse, 1024), proxyCtx: wp.ctx, } diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 9e332725882..88c6303af9e 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -15,6 +15,8 @@ package grpcproxy import ( + "time" + "github.com/coreos/etcd/clientv3" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc" @@ -86,7 +88,9 @@ func (w *watcher) send(wr clientv3.WatchResponse) { } select { case w.ch <- pbwr: - default: - panic("handle this") + case <-time.After(50 * time.Millisecond): + // close the watch chan will notify the stream sender. + // the stream will gc all its watchers. + close(w.ch) } } From 56cfe40184f89443bf17f79ed66046eb294b123c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 3 Sep 2016 07:53:18 -0700 Subject: [PATCH 2/2] grpcproxy: fix a data race --- proxy/grpcproxy/watch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 30e65b5a052..9cfe6c72470 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -56,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer { func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { wp.mu.Lock() wp.nextStreamID++ + sid := wp.nextStreamID wp.mu.Unlock() sws := serverWatchStream{ @@ -64,7 +65,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { singles: make(map[int64]*watcherSingle), inGroups: make(map[int64]struct{}), - id: wp.nextStreamID, + id: sid, gRPCStream: stream, watchCh: make(chan *pb.WatchResponse, 1024),