Skip to content

Commit

Permalink
Merge pull request #15237 from scpmw/guarantee_progress_notify_order
Browse files Browse the repository at this point in the history
etcdserver: Send requested progress notifications through watchStream (fix #15220)
  • Loading branch information
serathius authored Apr 5, 2023
2 parents b504ac1 + 74feb22 commit f6bb874
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 30 deletions.
37 changes: 29 additions & 8 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ type serverWatchStream struct {
// records fragmented watch IDs
fragment map[mvcc.WatchID]bool

// indicates whether we have an outstanding global progress
// notification to send
deferredProgress bool

// closec indicates the stream is closed.
closec chan struct{}

Expand Down Expand Up @@ -174,6 +178,8 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),

deferredProgress: false,

closec: make(chan struct{}),
}

Expand Down Expand Up @@ -360,10 +366,16 @@ func (sws *serverWatchStream) recvLoop() error {
}
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
sws.mu.Lock()
// Ignore if deferred progress notification is already in progress
if !sws.deferredProgress {
// Request progress for all watchers,
// force generation of a response
if !sws.watchStream.RequestProgressAll() {
sws.deferredProgress = true
}
}
sws.mu.Unlock()
}
default:
// we probably should not shutdown the entire stream when
Expand Down Expand Up @@ -432,11 +444,15 @@ func (sws *serverWatchStream) sendLoop() {
Canceled: canceled,
}

if _, okID := ids[wresp.WatchID]; !okID {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
// Progress notifications can have WatchID -1
// if they announce on behalf of multiple watchers
if wresp.WatchID != clientv3.InvalidWatchID {
if _, okID := ids[wresp.WatchID]; !okID {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
}
}

mvcc.ReportEventReceived(len(evs))
Expand Down Expand Up @@ -467,6 +483,11 @@ func (sws *serverWatchStream) sendLoop() {
// elide next progress update if sent a key update
sws.progress[wresp.WatchID] = false
}
if sws.deferredProgress {
if sws.watchStream.RequestProgressAll() {
sws.deferredProgress = false
}
}
sws.mu.Unlock()

case c, ok := <-sws.ctrlStream:
Expand Down
30 changes: 26 additions & 4 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
Expand All @@ -41,6 +42,7 @@ var (
type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
progressAll(watchers map[WatchID]*watcher) bool
rev() int64
}

Expand Down Expand Up @@ -475,14 +477,34 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
func (s *watchableStore) rev() int64 { return s.store.Rev() }

func (s *watchableStore) progress(w *watcher) {
s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
}

func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
return s.progressIfSync(watchers, clientv3.InvalidWatchID)
}

func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
s.mu.RLock()
defer s.mu.RUnlock()

if _, ok := s.synced.watchers[w]; ok {
w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
// If the ch is full, this watcher is receiving events.
// We do not need to send progress at all.
// Any watcher unsynced?
for _, w := range watchers {
if _, ok := s.synced.watchers[w]; !ok {
return false
}
}

// If all watchers are synchronised, send out progress
// notification on first watcher. Note that all watchers
// should have the same underlying stream, and the progress
// notification will be broadcasted client-side if required
// (see dispatchEvent in client/v3/watch.go)
for _, w := range watchers {
w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
return true
}
return true
}

type watcher struct {
Expand Down
13 changes: 13 additions & 0 deletions server/storage/mvcc/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ type WatchStream interface {
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)

// RequestProgressAll requests a progress notification for all
// watchers sharing the stream. If all watchers are synced, a
// progress notification with watch ID -1 will be sent to an
// arbitrary watcher of this stream, and the function returns
// true.
RequestProgressAll() bool

// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
Expand Down Expand Up @@ -188,3 +195,9 @@ func (ws *watchStream) RequestProgress(id WatchID) {
}
ws.watchable.progress(w)
}

func (ws *watchStream) RequestProgressAll() bool {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.progressAll(ws.watchers)
}
50 changes: 50 additions & 0 deletions server/storage/mvcc/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)
Expand Down Expand Up @@ -342,6 +343,55 @@ func TestWatcherRequestProgress(t *testing.T) {
}
}

func TestWatcherRequestProgressAll(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore automatically calls syncWatchers
// method to sync watchers in unsynced map. We want to keep watchers
// in unsynced to test if syncWatchers works as expected.
s := &watchableStore{
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}

defer cleanup(s, b)

testKey := []byte("foo")
notTestKey := []byte("bad")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)

// Create watch stream with watcher. We will not actually get
// any notifications on it specifically, but there needs to be
// at least one Watch for progress notifications to get
// generated.
w := s.NewWatchStream()
w.Watch(0, notTestKey, nil, 1)

w.RequestProgressAll()
select {
case resp := <-w.Chan():
t.Fatalf("unexpected %+v", resp)
default:
}

s.syncWatchers()

w.RequestProgressAll()
wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
select {
case resp := <-w.Chan():
if !reflect.DeepEqual(resp, wrs) {
t.Fatalf("got %+v, expect %+v", resp, wrs)
}
case <-time.After(time.Second):
t.Fatal("failed to receive progress")
}
}

func TestWatcherWatchWithFilter(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
Expand Down
68 changes: 68 additions & 0 deletions tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1397,3 +1397,71 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
t.Fatalf("expected %s watch, got %s", expected, minWatches)
}
}

// TestV3WatchProgressWaitsForSync checks that progress notifications
// don't get sent until the watcher is synchronised
func TestV3WatchProgressWaitsForSync(t *testing.T) {

// Disable for gRPC proxy, as it does not support requesting
// progress notifications
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
}

integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Write a couple values into key to make sure there's a
// non-trivial amount of history.
count := 1001
t.Logf("Writing key 'foo' %d times", count)
for i := 0; i < count; i++ {
_, err := client.Put(ctx, "foo", fmt.Sprintf("bar%d", i))
require.NoError(t, err)
}

// Create watch channel starting at revision 1 (i.e. it starts
// unsynced because of the update above)
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))

// Immediately request a progress notification. As the client
// is unsynchronised, the server will have to defer the
// notification internally.
err := client.RequestProgress(ctx)
require.NoError(t, err)

// Verify that we get the watch responses first. Note that
// events might be spread across multiple packets.
var event_count = 0
for event_count < count {
wr := <-wch
if wr.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", wr.Err()))
}
if wr.IsProgressNotify() {
t.Fatal("Progress notification from unsynced client!")
}
if wr.Header.Revision != int64(count+1) {
t.Fatal("Incomplete watch response!")
}
event_count += len(wr.Events)
}

// ... followed by the requested progress notification
wr2 := <-wch
if wr2.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
}
if !wr2.IsProgressNotify() {
t.Fatal("Did not receive progress notification!")
}
if wr2.Header.Revision != int64(count+1) {
t.Fatal("Wrong revision in progress notification!")
}
}
46 changes: 36 additions & 10 deletions tests/robustness/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ const (

var (
LowTraffic = trafficConfig{
name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
name: "LowTraffic",
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
requestProgress: false,
traffic: traffic{
keyCount: 10,
leaseTTL: DefaultLeaseTTL,
Expand All @@ -56,10 +57,11 @@ var (
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: false,
traffic: traffic{
keyCount: 10,
largePutSize: 32769,
Expand All @@ -71,6 +73,22 @@ var (
},
},
}
ReqProgTraffic = trafficConfig{
name: "RequestProgressTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
requestProgress: true,
traffic: traffic{
keyCount: 10,
largePutSize: 8196,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 95},
{operation: LargePut, chance: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
LowTraffic, HighTraffic,
Expand Down Expand Up @@ -141,6 +159,14 @@ func TestRobustness(t *testing.T) {
e2e.WithSnapshotCount(100),
),
})
scenarios = append(scenarios, scenario{
name: "Issue15220",
failpoint: RandomOneNodeClusterFailpoint,
traffic: &ReqProgTraffic,
config: *e2e.NewConfig(
e2e.WithClusterSize(1),
),
})
snapshotOptions := []e2e.EPClusterOption{
e2e.WithGoFailEnabled(true),
e2e.WithSnapshotCount(100),
Expand Down Expand Up @@ -191,7 +217,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
forcestopCluster(r.clus)

watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, r.responses, watchProgressNotifyEnabled)
validateWatchResponses(t, r.responses, traffic.requestProgress || watchProgressNotifyEnabled)

r.events = watchEvents(r.responses)
validateEventsMatch(t, r.events)
Expand All @@ -218,7 +244,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
return nil
})
g.Go(func() error {
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress)
return nil
})
g.Wait()
Expand Down
11 changes: 6 additions & 5 deletions tests/robustness/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}

type trafficConfig struct {
name string
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
name string
minimalQPS float64
maximalQPS float64
clientCount int
traffic Traffic
requestProgress bool // Request progress notifications while watching this traffic
}

type Traffic interface {
Expand Down
Loading

0 comments on commit f6bb874

Please sign in to comment.