Skip to content

Commit

Permalink
Merge pull request #6341 from xiang90/handle_overload
Browse files Browse the repository at this point in the history
grpcproxy: handle overloaded stream
  • Loading branch information
xiang90 authored Sep 7, 2016
2 parents 656167d + 56cfe40 commit 0b63502
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
5 changes: 3 additions & 2 deletions proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Lock()
wp.nextStreamID++
sid := wp.nextStreamID
wp.mu.Unlock()

sws := serverWatchStream{
Expand All @@ -64,10 +65,10 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
singles: make(map[int64]*watcherSingle),
inGroups: make(map[int64]struct{}),

id: wp.nextStreamID,
id: sid,
gRPCStream: stream,

watchCh: make(chan *pb.WatchResponse, 10),
watchCh: make(chan *pb.WatchResponse, 1024),

proxyCtx: wp.ctx,
}
Expand Down
8 changes: 6 additions & 2 deletions proxy/grpcproxy/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package grpcproxy

import (
"time"

"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
Expand Down Expand Up @@ -86,7 +88,9 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
}
select {
case w.ch <- pbwr:
default:
panic("handle this")
case <-time.After(50 * time.Millisecond):
// close the watch chan will notify the stream sender.
// the stream will gc all its watchers.
close(w.ch)
}
}

0 comments on commit 0b63502

Please sign in to comment.