Skip to content

Commit

Permalink
clientv3: support "watch fragment"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Feb 7, 2018
1 parent b581eda commit ec9ff02
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 4 deletions.
80 changes: 80 additions & 0 deletions clientv3/integration/watch_fragment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

func TestWatchFragmentEnable(t *testing.T) { testWatchFragment(t, true) }
func TestWatchFragmentDisable(t *testing.T) { testWatchFragment(t, false) }

func testWatchFragment(t *testing.T, fragment bool) {
// when two events are combined, response exceeds limit
clus := integration.NewClusterV3(t,
&integration.ClusterConfig{
Size: 1,
MaxRequestBytes: 1.5 * 1024 * 1024,
},
)
defer clus.Terminate(t)

cli := clus.Client(0)
errc := make(chan error)
for i := 0; i < 100; i++ {
go func(i int) {
_, err := cli.Put(context.TODO(),
fmt.Sprint("foo", i),
strings.Repeat("a", 1024*1024),
)
errc <- err
}(i)
}
for i := 0; i < 100; i++ {
if err := <-errc; err != nil {
t.Errorf("failed to put: %v", err)
}
}

opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithRev(1),
}
if !fragment {
opts = append(opts, clientv3.WithNoFragment())
}
wch := cli.Watch(context.TODO(), "foo", opts...)

// expect 100 MiB watch response
select {
case ws := <-wch:
if len(ws.Events) != 100 {
t.Errorf("expected 100 events with watch fragmentation, got %d", len(ws.Events))
}
if ws.Err() != nil {
t.Errorf("unexpected error %v", ws.Err())
}
case <-time.After(testutil.RequestTimeout):
t.Fatalf("took too long to receive events")
}
}
9 changes: 9 additions & 0 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Op struct {
// for watch, put, delete
prevKV bool

// for watch
noFragment bool

// for put
ignoreValue bool
ignoreLease bool
Expand Down Expand Up @@ -466,6 +469,12 @@ func WithPrevKV() OpOption {
}
}

// WithNoFragment to receive raw watch response
// without fragmentation.
func WithNoFragment() OpOption {
return func(op *Op) { op.noFragment = true }
}

// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
Expand Down
32 changes: 28 additions & 4 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ type watchRequest struct {
createdNotify bool
// progressNotify is for progress updates
progressNotify bool
// noFragment to receive raw watch response without fragment
// watch fragment "true" by default in case watch response exceeds
// server request limit, which is default 1.5 MiB
noFragment bool
// filters is the list of events to filter out
filters []pb.WatchCreateRequest_FilterType
// get the previous key-value pair before the event happens
Expand Down Expand Up @@ -232,7 +236,7 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
func (vc *valCtx) Err() error { return nil }

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) newWatcherGrpcStream(inctx context.Context, op Op) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
Expand Down Expand Up @@ -274,6 +278,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
noFragment: ow.noFragment,
retc: make(chan chan WatchResponse, 1),
}

Expand All @@ -291,7 +296,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
wgs = w.newWatcherGrpcStream(ctx, ow)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
Expand Down Expand Up @@ -450,7 +455,7 @@ func (w *watchGrpcStream) run() {
}

cancelSet := make(map[int64]struct{})

var cur *pb.WatchResponse
for {
select {
// Watch() requested
Expand All @@ -477,8 +482,18 @@ func (w *watchGrpcStream) run() {
}
// New events from the watch client
case pbresp := <-w.respc:
if cur == nil || pbresp.Created || pbresp.Canceled {
cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId {
// combine with previous events
cur.Events = append(cur.Events, pbresp.Events...)
cur.Fragment = pbresp.Fragment
}

switch {
case pbresp.Created:
// reset
cur = nil
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
Expand All @@ -489,15 +504,22 @@ func (w *watchGrpcStream) run() {
wc.Send(ws.initReq.toPB())
}
case pbresp.Canceled && pbresp.CompactRevision == 0:
// reset
cur = nil
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
closing[ws] = struct{}{}
}
case cur.Fragment:
// continue until all fragmented responses arrive
continue
default:
// dispatch to appropriate watch stream
if ok := w.dispatchEvent(pbresp); ok {
ok := w.dispatchEvent(cur)
cur = nil
if ok {
break
}
// watch response on unexpected watch id; cancel id
Expand Down Expand Up @@ -820,7 +842,9 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
ProgressNotify: wr.progressNotify,
Filters: wr.filters,
PrevKv: wr.prevKV,
Fragment: !wr.noFragment,
}

cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}
Expand Down

0 comments on commit ec9ff02

Please sign in to comment.