Skip to content

Commit

Permalink
Merge pull request etcd-io#12271 from jingyih/add_watch_notify_interv…
Browse files Browse the repository at this point in the history
…al_flag_in_testing

integration: add WatchProgressNotifyInterval in integration test
  • Loading branch information
jpbetz authored Sep 9, 2020
2 parents c20cc05 + 73817b5 commit 10fa961
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 34 deletions.
24 changes: 24 additions & 0 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,30 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
}
}

func TestConfigurableWatchProgressNotifyInterval(t *testing.T) {
progressInterval := 200 * time.Millisecond
clus := integration.NewClusterV3(t,
&integration.ClusterConfig{
Size: 3,
WatchProgressNotifyInterval: progressInterval,
})
defer clus.Terminate(t)

opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
rch := clus.RandClient().Watch(context.Background(), "foo", opts...)

timeout := 1 * time.Second // we expect to receive watch progress notify in 2 * progressInterval,
// but for CPU-starved situation it may take longer. So we use 1 second here for timeout.
select {
case resp := <-rch: // waiting for a watch progress notify response
if !resp.IsProgressNotify() {
t.Fatalf("expected resp.IsProgressNotify() == true")
}
case <-time.After(timeout):
t.Fatalf("timed out waiting for watch progress notify response in %v", timeout)
}
}

func TestWatchRequestProgress(t *testing.T) {
testCases := []struct {
name string
Expand Down
74 changes: 40 additions & 34 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ type ClusterConfig struct {

EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration

WatchProgressNotifyInterval time.Duration
}

type cluster struct {
Expand Down Expand Up @@ -279,23 +281,24 @@ func (c *cluster) HTTPMembers() []client.Member {
func (c *cluster) mustNewMember(t testing.TB) *member {
m := mustNewMember(t,
memberConfig{
name: c.name(rand.Int()),
authToken: c.cfg.AuthToken,
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
snapshotCount: c.cfg.SnapshotCount,
snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
name: c.name(rand.Int()),
authToken: c.cfg.AuthToken,
peerTLS: c.cfg.PeerTLS,
clientTLS: c.cfg.ClientTLS,
quotaBackendBytes: c.cfg.QuotaBackendBytes,
maxTxnOps: c.cfg.MaxTxnOps,
maxRequestBytes: c.cfg.MaxRequestBytes,
snapshotCount: c.cfg.SnapshotCount,
snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries,
grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime,
grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout,
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
Expand Down Expand Up @@ -568,23 +571,24 @@ type member struct {
func (m *member) GRPCAddr() string { return m.grpcAddr }

type memberConfig struct {
name string
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
authToken string
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
snapshotCount uint64
snapshotCatchUpEntries uint64
grpcKeepAliveMinTime time.Duration
grpcKeepAliveInterval time.Duration
grpcKeepAliveTimeout time.Duration
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
name string
peerTLS *transport.TLSInfo
clientTLS *transport.TLSInfo
authToken string
quotaBackendBytes int64
maxTxnOps uint
maxRequestBytes uint
snapshotCount uint64
snapshotCatchUpEntries uint64
grpcKeepAliveMinTime time.Duration
grpcKeepAliveInterval time.Duration
grpcKeepAliveTimeout time.Duration
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
}

// mustNewMember return an inited member with the given name. If peerTLS is
Expand Down Expand Up @@ -678,6 +682,8 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval

m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

m.InitialCorruptCheck = true

lcfg := logutil.DefaultZapLoggerConfig
Expand Down

0 comments on commit 10fa961

Please sign in to comment.