Skip to content

Commit

Permalink
Fix progress notification for watch that doesn't get any events
Browse files Browse the repository at this point in the history
When implementing the fix for progress notifications
(#15237) we made a incorrect
assumption that that unsynched watches will always get at least one event.

Unsynched watches include not only slow watchers, but also newly created
watches that requested current or older revision. In case that non of the events
match watch filter, those newly created watches might become synched
without any event going through.

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Mar 11, 2024
1 parent 358e3ba commit c85370d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 23 deletions.
20 changes: 1 addition & 19 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ type serverWatchStream struct {
// records fragmented watch IDs
fragment map[mvcc.WatchID]bool

// indicates whether we have an outstanding global progress
// notification to send
deferredProgress bool

// closec indicates the stream is closed.
closec chan struct{}

Expand Down Expand Up @@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),

deferredProgress: false,

closec: make(chan struct{}),
}

Expand Down Expand Up @@ -375,14 +369,7 @@ func (sws *serverWatchStream) recvLoop() error {
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.mu.Lock()
// Ignore if deferred progress notification is already in progress
if !sws.deferredProgress {
// Request progress for all watchers,
// force generation of a response
if !sws.watchStream.RequestProgressAll() {
sws.deferredProgress = true
}
}
sws.watchStream.RequestProgressAll()
sws.mu.Unlock()
}
default:
Expand Down Expand Up @@ -498,11 +485,6 @@ func (sws *serverWatchStream) sendLoop() {
// elide next progress update if sent a key update
sws.progress[wresp.WatchID] = false
}
if sws.deferredProgress {
if sws.watchStream.RequestProgressAll() {
sws.deferredProgress = false
}
}
sws.mu.Unlock()

case c, ok := <-sws.ctrlStream:
Expand Down
53 changes: 49 additions & 4 deletions integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,8 +1391,8 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))

// Immediately request a progress notification. As the client
// is unsynchronised, the server will have to defer the
// notification internally.
// is unsynchronised, the server will not sent any notification,
//as client can infer progress from events.
err := client.RequestProgress(ctx)
require.NoError(t, err)

Expand All @@ -1412,8 +1412,9 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
}
event_count += len(wr.Events)
}

// ... followed by the requested progress notification
// client needs to request progress notification again
err = client.RequestProgress(ctx)
require.NoError(t, err)
wr2 := <-wch
if wr2.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
Expand All @@ -1425,3 +1426,47 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
t.Fatal("Wrong revision in progress notification!")
}
}

func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
if integration.ThroughProxy {

Check failure on line 1431 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-1-cpu)

undefined: integration

Check failure on line 1431 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-2-cpu)

undefined: integration

Check failure on line 1431 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-4-cpu)

undefined: integration

Check failure on line 1431 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-grpcproxy)

undefined: integration
t.Skip("grpc proxy currently does not support requesting progress notifications")
}
integration.BeforeTest(t)

Check failure on line 1434 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-1-cpu)

undefined: integration

Check failure on line 1434 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-2-cpu)

undefined: integration

Check failure on line 1434 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-4-cpu)

undefined: integration

Check failure on line 1434 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-grpcproxy)

undefined: integration

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})

Check failure on line 1436 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-1-cpu)

undefined: integration

Check failure on line 1436 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-2-cpu)

undefined: integration

Check failure on line 1436 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-integration-4-cpu)

undefined: integration

Check failure on line 1436 in integration/v3_watch_test.go

View workflow job for this annotation

GitHub Actions / test (linux-amd64-grpcproxy)

undefined: integration
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.Put(ctx, "bar", "1")
require.NoError(t, err)

wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision))
// Request the progress notification on newly created watch that was not yet synced.
err = client.RequestProgress(ctx)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

require.NoError(t, err)
gotProgressNotification := false
for {
select {
case <-ticker.C:
err := client.RequestProgress(ctx)
require.NoError(t, err)
case resp := <-wch:
if resp.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", resp.Err()))
}
if resp.IsProgressNotify() {
gotProgressNotification = true
}
}
if gotProgressNotification {
break
}
}
require.True(t, gotProgressNotification, "Expected to get progress notification")
}

0 comments on commit c85370d

Please sign in to comment.