Skip to content

Commit

Permalink
*: Add progress notify request watch request
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbetz committed Jun 20, 2018
1 parent 55a05d9 commit 0f2d760
Show file tree
Hide file tree
Showing 11 changed files with 783 additions and 415 deletions.
11 changes: 10 additions & 1 deletion Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ Empty field.
| ----- | ----------- | ---- |
| cluster_id | cluster_id is the ID of the cluster which sent the response. | uint64 |
| member_id | member_id is the ID of the member which sent the response. | uint64 |
| revision | revision is the key-value store revision when the request was applied. | int64 |
| revision | revision is the key-value store revision when the request was applied. For watch progress responses, the header.revision indicates progress. All future events recieved in this stream are guaranteed to have a higher revision number than the header.revision number. | int64 |
| raft_term | raft_term is the raft term when the request was applied. | uint64 |


Expand Down Expand Up @@ -840,13 +840,22 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive



##### message `WatchProgressRequest` (etcdserver/etcdserverpb/rpc.proto)

Requests the a watch stream progress status be sent in the watch response stream as soon as possible.

Empty field.



##### message `WatchRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| request_union | request_union is a request to either create a new watcher or cancel an existing watcher. | oneof |
| create_request | | WatchCreateRequest |
| cancel_request | | WatchCancelRequest |
| progress_request | | WatchProgressRequest |



Expand Down
9 changes: 8 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2195,7 +2195,7 @@
"format": "uint64"
},
"revision": {
"description": "revision is the key-value store revision when the request was applied.",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number.",
"type": "string",
"format": "int64"
}
Expand Down Expand Up @@ -2396,6 +2396,10 @@
}
}
},
"etcdserverpbWatchProgressRequest": {
"description": "Requests the a watch stream progress status be sent in the watch response stream as soon as\npossible.",
"type": "object"
},
"etcdserverpbWatchRequest": {
"type": "object",
"properties": {
Expand All @@ -2404,6 +2408,9 @@
},
"create_request": {
"$ref": "#/definitions/etcdserverpbWatchCreateRequest"
},
"progress_request": {
"$ref": "#/definitions/etcdserverpbWatchProgressRequest"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
72 changes: 72 additions & 0 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,78 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
}
}

func TestWatchRequestProgress(t *testing.T) {
testCases := []struct {
name string
watchers []string
}{
{"0-watcher", []string{}},
{"1-watcher", []string{"/"}},
{"2-watcher", []string{"/", "/"}},
}

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
defer testutil.AfterTest(t)

watchTimeout := 3 * time.Second

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

wc := clus.RandClient()

var watchChans []clientv3.WatchChan

for _, prefix := range c.watchers {
watchChans = append(watchChans, wc.Watch(context.Background(), prefix, clientv3.WithPrefix()))
}

_, err := wc.Put(context.Background(), "/a", "1")
if err != nil {
t.Fatal(err)
}

for _, rch := range watchChans {
select {
case resp := <-rch: // wait for notification
if len(resp.Events) != 1 {
t.Fatalf("resp.Events expected 1, got %d", len(resp.Events))
}
case <-time.After(watchTimeout):
t.Fatalf("watch response expected in %v, but timed out", watchTimeout)
}
}

// put a value not being watched to increment revision
_, err = wc.Put(context.Background(), "x", "1")
if err != nil {
t.Fatal(err)
}

err = wc.RequestProgress(context.Background())
if err != nil {
t.Fatal(err)
}

// verify all watch channels receive a progress notify
for _, rch := range watchChans {
select {
case resp := <-rch:
if !resp.IsProgressNotify() {
t.Fatalf("expected resp.IsProgressNotify() == true")
}
if resp.Header.Revision != 3 {
t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
}
case <-time.After(watchTimeout):
t.Fatalf("progress response expected in %v, but timed out", watchTimeout)
}
}
})
}
}

func TestWatchEventType(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
Expand Down
123 changes: 102 additions & 21 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type Watcher interface {

// Close closes the watcher and cancels all watch requests.
Close() error

// RequestProgress requests a progress notify response be sent in all WatchChans.
RequestProgress(ctx context.Context) error
}

type WatchResponse struct {
Expand Down Expand Up @@ -150,7 +153,7 @@ type watchGrpcStream struct {
resuming []*watcherStream

// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
Expand All @@ -168,6 +171,11 @@ type watchGrpcStream struct {
closeErr error
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
ctx context.Context
Expand All @@ -192,6 +200,10 @@ type watchRequest struct {
retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
Expand Down Expand Up @@ -249,7 +261,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
Expand Down Expand Up @@ -355,6 +367,42 @@ func (w *watcher) Close() (err error) {
return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
ctxKey := streamKeyFromCtx(ctx)

w.mu.Lock()
if w.streams == nil {
return fmt.Errorf("no stream found for context")
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()

pr := &progressRequest{}

select {
case reqc <- pr:
return nil
case <-ctx.Done():
if err == nil {
return ctx.Err()
}
return err
case <-donec:
if wgs.closeErr != nil {
return wgs.closeErr
}
// retry; may have dropped stream from no ctxs
return w.RequestProgress(ctx)
}
}

func (w *watchGrpcStream) close() (err error) {
w.cancel()
<-w.donec
Expand Down Expand Up @@ -462,26 +510,31 @@ func (w *watchGrpcStream) run() {
for {
select {
// Watch() requested
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}

ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)

// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
case *progressRequest:
wc.Send(wreq.toPB())
}

// new events from the watch client
Expand Down Expand Up @@ -608,7 +661,28 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
ws, ok := w.substreams[pbresp.WatchId]

if wr.IsProgressNotify() {
return w.broadcastResponse(wr)
}
return w.unicastResponse(wr, pbresp.WatchId)

}

// broadcastResponse send a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
for _, ws := range w.substreams {
select {
case ws.recvc <- wr:
case <-ws.donec:
}
}
return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
ws, ok := w.substreams[watchId]
if !ok {
return false
}
Expand Down Expand Up @@ -882,6 +956,13 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
req := &pb.WatchProgressRequest{}
cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)
Expand Down
Loading

0 comments on commit 0f2d760

Please sign in to comment.