diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 007cbc38ec6..b4086b8d10e 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -582,6 +582,30 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { } } +func TestConfigurableWatchProgressNotifyInterval(t *testing.T) { + progressInterval := 200 * time.Millisecond + clus := integration.NewClusterV3(t, + &integration.ClusterConfig{ + Size: 3, + WatchProgressNotifyInterval: progressInterval, + }) + defer clus.Terminate(t) + + opts := []clientv3.OpOption{clientv3.WithProgressNotify()} + rch := clus.RandClient().Watch(context.Background(), "foo", opts...) + + timeout := 1 * time.Second // we expect to receive watch progress notify in 2 * progressInterval, + // but for CPU-starved situation it may take longer. So we use 1 second here for timeout. + select { + case resp := <-rch: // waiting for a watch progress notify response + if !resp.IsProgressNotify() { + t.Fatalf("expected resp.IsProgressNotify() == true") + } + case <-time.After(timeout): + t.Fatalf("timed out waiting for watch progress notify response in %v", timeout) + } +} + func TestWatchRequestProgress(t *testing.T) { testCases := []struct { name string diff --git a/integration/cluster.go b/integration/cluster.go index 7c3e0701891..e685d6b59cd 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -152,6 +152,8 @@ type ClusterConfig struct { EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration + + WatchProgressNotifyInterval time.Duration } type cluster struct { @@ -279,23 +281,24 @@ func (c *cluster) HTTPMembers() []client.Member { func (c *cluster) mustNewMember(t testing.TB) *member { m := mustNewMember(t, memberConfig{ - name: c.name(rand.Int()), - authToken: c.cfg.AuthToken, - peerTLS: c.cfg.PeerTLS, - clientTLS: c.cfg.ClientTLS, - quotaBackendBytes: c.cfg.QuotaBackendBytes, - maxTxnOps: c.cfg.MaxTxnOps, - maxRequestBytes: c.cfg.MaxRequestBytes, - snapshotCount: c.cfg.SnapshotCount, - snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries, - grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, - grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, - grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, - clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, - clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, - useIP: c.cfg.UseIP, - enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, - leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, + name: c.name(rand.Int()), + authToken: c.cfg.AuthToken, + peerTLS: c.cfg.PeerTLS, + clientTLS: c.cfg.ClientTLS, + quotaBackendBytes: c.cfg.QuotaBackendBytes, + maxTxnOps: c.cfg.MaxTxnOps, + maxRequestBytes: c.cfg.MaxRequestBytes, + snapshotCount: c.cfg.SnapshotCount, + snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries, + grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, + grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, + grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, + clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, + clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, + useIP: c.cfg.UseIP, + enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, + leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, + WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, }) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { @@ -568,23 +571,24 @@ type member struct { func (m *member) GRPCAddr() string { return m.grpcAddr } type memberConfig struct { - name string - peerTLS *transport.TLSInfo - clientTLS *transport.TLSInfo - authToken string - quotaBackendBytes int64 - maxTxnOps uint - maxRequestBytes uint - snapshotCount uint64 - snapshotCatchUpEntries uint64 - grpcKeepAliveMinTime time.Duration - grpcKeepAliveInterval time.Duration - grpcKeepAliveTimeout time.Duration - clientMaxCallSendMsgSize int - clientMaxCallRecvMsgSize int - useIP bool - enableLeaseCheckpoint bool - leaseCheckpointInterval time.Duration + name string + peerTLS *transport.TLSInfo + clientTLS *transport.TLSInfo + authToken string + quotaBackendBytes int64 + maxTxnOps uint + maxRequestBytes uint + snapshotCount uint64 + snapshotCatchUpEntries uint64 + grpcKeepAliveMinTime time.Duration + grpcKeepAliveInterval time.Duration + grpcKeepAliveTimeout time.Duration + clientMaxCallSendMsgSize int + clientMaxCallRecvMsgSize int + useIP bool + enableLeaseCheckpoint bool + leaseCheckpointInterval time.Duration + WatchProgressNotifyInterval time.Duration } // mustNewMember return an inited member with the given name. If peerTLS is @@ -678,6 +682,8 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member { m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval + m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval + m.InitialCorruptCheck = true lcfg := logutil.DefaultZapLoggerConfig