From 5f4496b8996347b78f336c0a156fca83c33c85b5 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 13:50:10 +0200 Subject: [PATCH 01/15] testutils: move ListenerRegistry to leaf package We'll want to access it from `base`, which can't import `testutils` due to dependency cycles. --- pkg/BUILD.bazel | 1 + pkg/cli/BUILD.bazel | 1 + pkg/cli/debug_recover_loss_of_quorum_test.go | 3 ++- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/client_store_test.go | 3 ++- pkg/kv/kvserver/loqrecovery/BUILD.bazel | 1 + .../kvserver/loqrecovery/server_integration_test.go | 5 +++-- pkg/testutils/BUILD.bazel | 2 -- pkg/testutils/listenerutil/BUILD.bazel | 13 +++++++++++++ pkg/testutils/{ => listenerutil}/listener.go | 2 +- pkg/util/startup/BUILD.bazel | 2 +- pkg/util/startup/startup_test.go | 4 ++-- 12 files changed, 28 insertions(+), 10 deletions(-) create mode 100644 pkg/testutils/listenerutil/BUILD.bazel rename pkg/testutils/{ => listenerutil}/listener.go (99%) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index c0f660df23f7..e1276d1943b4 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -2149,6 +2149,7 @@ GO_TARGETS = [ "//pkg/testutils/lint/passes/unconvert:unconvert_test", "//pkg/testutils/lint:lint", "//pkg/testutils/lint:lint_test", + "//pkg/testutils/listenerutil:listenerutil", "//pkg/testutils/localtestcluster:localtestcluster", "//pkg/testutils/metrictestutils:metrictestutils", "//pkg/testutils/pgtest:pgtest", diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 691137e7a480..7e14780c180f 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -394,6 +394,7 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/datapathutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 812c8954467d..788d24bd248c 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -437,7 +438,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { }) defer c.Cleanup() - listenerReg := testutils.NewListenerRegistry() + listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() storeReg := server.NewStickyInMemEnginesRegistry() diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 44066d570f75..b2a86281bf7f 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -457,6 +457,7 @@ go_test( "//pkg/testutils/echotest", "//pkg/testutils/gossiputil", "//pkg/testutils/kvclientutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index 738d59ab71ef..33b749eb2413 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -120,7 +121,7 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { testutils.RunTrueAndFalse(t, "kv.expiration_leases_only.enabled", func(t *testing.T, expOnly bool) { storeReg := server.NewStickyInMemEnginesRegistry() defer storeReg.CloseAllStickyInMemEngines() - listenerReg := testutils.NewListenerRegistry() + listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() ctx := context.Background() diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index b38a9ef4c9eb..451816b59b20 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -96,6 +96,7 @@ go_test( "//pkg/storage/enginepb", "//pkg/testutils", "//pkg/testutils/datapathutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/testcluster", diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index c050a5e0453f..1cf507be92b0 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -704,13 +705,13 @@ func prepTestCluster( *testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore, - testutils.ListenerRegistry, + listenerutil.ListenerRegistry, ) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") reg := server.NewStickyInMemEnginesRegistry() - lReg := testutils.NewListenerRegistry() + lReg := listenerutil.NewListenerRegistry() args := base.TestClusterArgs{ ServerArgsPerNode: make(map[int]base.TestServerArgs), diff --git a/pkg/testutils/BUILD.bazel b/pkg/testutils/BUILD.bazel index 0d561e957c78..9e3caaff30af 100644 --- a/pkg/testutils/BUILD.bazel +++ b/pkg/testutils/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "files.go", "hook.go", "keys.go", - "listener.go", "net.go", "pprof.go", "soon.go", @@ -34,7 +33,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", - "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/testutils/listenerutil/BUILD.bazel b/pkg/testutils/listenerutil/BUILD.bazel new file mode 100644 index 000000000000..6931a4ee9757 --- /dev/null +++ b/pkg/testutils/listenerutil/BUILD.bazel @@ -0,0 +1,13 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "listenerutil", + srcs = ["listener.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/testutils/listener.go b/pkg/testutils/listenerutil/listener.go similarity index 99% rename from pkg/testutils/listener.go rename to pkg/testutils/listenerutil/listener.go index 65631427954b..d88fe92357b5 100644 --- a/pkg/testutils/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package testutils +package listenerutil import ( "net" diff --git a/pkg/util/startup/BUILD.bazel b/pkg/util/startup/BUILD.bazel index a95d3018645f..58ad9a5e7407 100644 --- a/pkg/util/startup/BUILD.bazel +++ b/pkg/util/startup/BUILD.bazel @@ -37,7 +37,7 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/spanconfig", - "//pkg/testutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/testcluster", diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index 9e4ea02c5980..6a9ce100d340 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -29,7 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" - "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -137,7 +137,7 @@ func runCircuitBreakerTestForKey( ) ctx := context.Background() - lReg := testutils.NewListenerRegistry() + lReg := listenerutil.NewListenerRegistry() defer lReg.Close() reg := server.NewStickyInMemEnginesRegistry() defer reg.CloseAllStickyInMemEngines() From 70dce126e915a4653a13bf928743df2c82ed61b6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 5 Jul 2023 15:03:44 +0200 Subject: [PATCH 02/15] listenerutil: export ReusableListener We'll type assert on it in future commits. --- pkg/testutils/listenerutil/listener.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index d88fe92357b5..4ebecd10a016 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -32,7 +32,7 @@ import ( // actual network sockets when closed, but will pause accepting connections. // Test could then specifically resume listeners prior to restarting servers. type ListenerRegistry struct { - listeners map[int]*reusableListener + listeners map[int]*ReusableListener } // NewListenerRegistry creates a registry of reusable listeners to be used with @@ -40,7 +40,7 @@ type ListenerRegistry struct { // listeners and inject them into test cluster using Listener field of // base.TestServerArgs. func NewListenerRegistry() ListenerRegistry { - return ListenerRegistry{listeners: make(map[int]*reusableListener)} + return ListenerRegistry{listeners: make(map[int]*ReusableListener)} } // GetOrFail returns an existing reusable socket listener or creates a new one @@ -52,7 +52,7 @@ func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) net.Listener { } nl, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to create network listener") - l := &reusableListener{ + l := &ReusableListener{ id: idx, wrapped: nl, acceptC: make(chan acceptResult), @@ -87,7 +87,11 @@ type acceptResult struct { err error } -type reusableListener struct { +// A ReusableListener wraps a net.Listener and gives it the ability to be closed +// and reopened, which is useful for tests that want to restart servers under +// the same address without worrying about losing a race with another process' +// port acquisition. +type ReusableListener struct { id int wrapped net.Listener acceptC chan acceptResult @@ -98,7 +102,7 @@ type reusableListener struct { stopC chan interface{} } -func (l *reusableListener) run() { +func (l *ReusableListener) run() { defer func() { close(l.acceptC) }() @@ -121,20 +125,20 @@ func (l *reusableListener) run() { } } -func (l *reusableListener) pauseC() <-chan interface{} { +func (l *ReusableListener) pauseC() <-chan interface{} { l.pauseMu.RLock() defer l.pauseMu.RUnlock() return l.pauseMu.pauseC } -func (l *reusableListener) resume() { +func (l *ReusableListener) resume() { l.pauseMu.Lock() defer l.pauseMu.Unlock() l.pauseMu.pauseC = make(chan interface{}) } // Accept implements net.Listener interface. -func (l *reusableListener) Accept() (net.Conn, error) { +func (l *ReusableListener) Accept() (net.Conn, error) { select { case c, ok := <-l.acceptC: if !ok { @@ -150,7 +154,7 @@ func (l *reusableListener) Accept() (net.Conn, error) { // doesn't close underlying listener and it is the responsibility of // ListenerRegistry that provided it to close wrapped listener when registry // is closed. -func (l *reusableListener) Close() error { +func (l *ReusableListener) Close() error { l.pauseMu.Lock() defer l.pauseMu.Unlock() select { @@ -163,6 +167,6 @@ func (l *reusableListener) Close() error { } // Addr implements net.Listener interface. -func (l *reusableListener) Addr() net.Addr { +func (l *ReusableListener) Addr() net.Addr { return l.wrapped.Addr() } From d1c7fcd3460ca5f4a866014ce90dd397c9cd9686 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 5 Jul 2023 15:04:45 +0200 Subject: [PATCH 03/15] listenerutil: code movement Future commits will change the receiver to ReusableListener. --- pkg/testutils/listenerutil/listener.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index 4ebecd10a016..cabc8b571423 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -64,15 +64,6 @@ func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) net.Listener { return l } -// ReopenOrFail will allow accepting more connections on existing shared -// listener if it was previously closed. If it was not closed, nothing happens. -// If listener wasn't created previously, test failure is raised. -func (r *ListenerRegistry) ReopenOrFail(t *testing.T, idx int) { - l, ok := r.listeners[idx] - require.Truef(t, ok, "socket for id %d is not open", idx) - l.resume() -} - // Close closes and deletes all previously created shared listeners. func (r *ListenerRegistry) Close() { for k, v := range r.listeners { @@ -131,6 +122,15 @@ func (l *ReusableListener) pauseC() <-chan interface{} { return l.pauseMu.pauseC } +// ReopenOrFail will allow accepting more connections on existing shared +// listener if it was previously closed. If it was not closed, nothing happens. +// If listener wasn't created previously, test failure is raised. +func (r *ListenerRegistry) ReopenOrFail(t *testing.T, idx int) { + l, ok := r.listeners[idx] + require.Truef(t, ok, "socket for id %d is not open", idx) + l.resume() +} + func (l *ReusableListener) resume() { l.pauseMu.Lock() defer l.pauseMu.Unlock() From 788decfeffddd6fd480067debddc1eeb4dc5bf36 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 5 Jul 2023 15:12:47 +0200 Subject: [PATCH 04/15] listenerutil: return ReusableListener from GetOrFail --- pkg/testutils/listenerutil/listener.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index cabc8b571423..073e81915a05 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -45,7 +45,7 @@ func NewListenerRegistry() ListenerRegistry { // GetOrFail returns an existing reusable socket listener or creates a new one // on a random local port. -func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) net.Listener { +func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) *ReusableListener { t.Helper() if l, ok := r.listeners[idx]; ok { return l From 98744d37810229007c90a4f3e9adb5b5988de591 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 5 Jul 2023 15:13:20 +0200 Subject: [PATCH 05/15] listenerutil: put registry into ReusableListener --- pkg/testutils/listenerutil/listener.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index 073e81915a05..fdfc3c980748 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -53,6 +53,7 @@ func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) *ReusableListener { nl, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to create network listener") l := &ReusableListener{ + reg: r, id: idx, wrapped: nl, acceptC: make(chan acceptResult), @@ -83,7 +84,8 @@ type acceptResult struct { // the same address without worrying about losing a race with another process' // port acquisition. type ReusableListener struct { - id int + reg *ListenerRegistry + id int // idx into reg.listeners wrapped net.Listener acceptC chan acceptResult pauseMu struct { From d21226daafd9fc0ec7eb33e03b13b93229650a48 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 5 Jul 2023 15:14:04 +0200 Subject: [PATCH 06/15] listenerutil: add ListenerRegistry.MustGet We'll clean up the possible confusion with `GetOrFail` (which will create the listener if not present) later. --- pkg/testutils/listenerutil/listener.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index fdfc3c980748..46385322aefb 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -65,6 +65,15 @@ func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) *ReusableListener { return l } +func (r *ListenerRegistry) MustGet(t require.TestingT, idx int) *ReusableListener { + if l, ok := r.listeners[idx]; ok { + return l + } + t.Errorf("listener %d not found", idx) + t.FailNow() + return nil // not reached +} + // Close closes and deletes all previously created shared listeners. func (r *ListenerRegistry) Close() { for k, v := range r.listeners { From 8a54f4d3c5a04e9f014953e5336521aadb8290a6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 13:53:05 +0200 Subject: [PATCH 07/15] listenerutil: rename GetOrFail to MustGetOrCreate --- pkg/cli/debug_recover_loss_of_quorum_test.go | 2 +- pkg/kv/kvserver/client_store_test.go | 2 +- pkg/kv/kvserver/loqrecovery/server_integration_test.go | 2 +- pkg/testutils/listenerutil/listener.go | 6 +++--- pkg/util/startup/startup_test.go | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 788d24bd248c..b504c0818100 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -462,7 +462,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { StickyEngineRegistry: storeReg, }, }, - Listener: listenerReg.GetOrFail(t, i), + Listener: listenerReg.MustGetOrCreate(t, i), StoreSpecs: []base.StoreSpec{ { InMemory: true, diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index 33b749eb2413..293dcfe67c8d 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -133,7 +133,7 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { ReusableListeners: true, ServerArgs: base.TestServerArgs{ Settings: st, - Listener: listenerReg.GetOrFail(t, 0), + Listener: listenerReg.MustGetOrCreate(t, 0), RaftConfig: base.RaftConfig{ RaftTickInterval: 100 * time.Millisecond, }, diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index 1cf507be92b0..1aa4329c89c3 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -733,7 +733,7 @@ func prepTestCluster( StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), }, }, - Listener: lReg.GetOrFail(t, i), + Listener: lReg.MustGetOrCreate(t, i), } } tc := testcluster.NewTestCluster(t, nodes, args) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index 46385322aefb..8d53415ae791 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -36,16 +36,16 @@ type ListenerRegistry struct { } // NewListenerRegistry creates a registry of reusable listeners to be used with -// test cluster. Once created use ListenerRegistry.GetOrFail to create new +// test cluster. Once created use ListenerRegistry.MustGetOrCreate to create new // listeners and inject them into test cluster using Listener field of // base.TestServerArgs. func NewListenerRegistry() ListenerRegistry { return ListenerRegistry{listeners: make(map[int]*ReusableListener)} } -// GetOrFail returns an existing reusable socket listener or creates a new one +// MustGetOrCreate returns an existing reusable socket listener or creates a new one // on a random local port. -func (r *ListenerRegistry) GetOrFail(t *testing.T, idx int) *ReusableListener { +func (r *ListenerRegistry) MustGetOrCreate(t *testing.T, idx int) *ReusableListener { t.Helper() if l, ok := r.listeners[idx]; ok { return l diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index 6a9ce100d340..618abe4faaed 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -170,7 +170,7 @@ func runCircuitBreakerTestForKey( StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), }, }, - Listener: lReg.GetOrFail(t, i), + Listener: lReg.MustGetOrCreate(t, i), } args.ServerArgsPerNode[i] = a } From 9332af7752e63a07d70376351497b7834f12a2fb Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 5 Jul 2023 15:33:58 +0200 Subject: [PATCH 08/15] listenerutil: move Reopen to ReusableListener --- pkg/cli/debug_recover_loss_of_quorum_test.go | 2 +- .../loqrecovery/server_integration_test.go | 4 ++-- pkg/testutils/listenerutil/listener.go | 15 +++++++++------ pkg/util/startup/startup_test.go | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index b504c0818100..8abaa7fbceac 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -555,7 +555,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { // NB: If recovery is not performed, server will just hang on startup. // This is caused by liveness range becoming unavailable and preventing any // progress. So it is likely that test will timeout if basic workflow fails. - listenerReg.ReopenOrFail(t, 0) + require.NoError(t, listenerReg.MustGet(t, 0).Reopen()) require.NoError(t, tc.RestartServer(0), "restart failed") s = sqlutils.MakeSQLRunner(tc.Conns[0]) diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index 1aa4329c89c3..db5095576338 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -630,7 +630,7 @@ func TestRetrieveApplyStatus(t *testing.T) { for _, id := range planDetails.UpdatedNodes { tc.StopServer(int(id.NodeID - 1)) - lReg.ReopenOrFail(t, int(id.NodeID-1)) + require.NoError(t, lReg.MustGet(t, int(id.NodeID-1)).Reopen()) require.NoError(t, tc.RestartServer(int(id.NodeID-1)), "failed to restart node") } @@ -682,7 +682,7 @@ func TestRejectBadVersionApplication(t *testing.T) { tc.StopServer(1) require.NoError(t, pss[1].SavePlan(plan), "failed to inject plan into storage") - lReg.ReopenOrFail(t, 1) + require.NoError(t, lReg.MustGet(t, 1).Reopen()) require.NoError(t, tc.RestartServer(1), "failed to restart server") r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index 8d53415ae791..41aad6f45736 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -133,13 +133,16 @@ func (l *ReusableListener) pauseC() <-chan interface{} { return l.pauseMu.pauseC } -// ReopenOrFail will allow accepting more connections on existing shared -// listener if it was previously closed. If it was not closed, nothing happens. -// If listener wasn't created previously, test failure is raised. -func (r *ListenerRegistry) ReopenOrFail(t *testing.T, idx int) { - l, ok := r.listeners[idx] - require.Truef(t, ok, "socket for id %d is not open", idx) +// Reopen will allow accepting more connections on existing shared listener if +// it was previously closed. If it was not closed, nothing happens. If listener +// wasn't created previously, an error is returned. +func (r *ReusableListener) Reopen() error { + l, ok := r.reg.listeners[r.id] + if !ok { + return errors.Errorf("socket for id %d is not open", r.id) + } l.resume() + return nil } func (l *ReusableListener) resume() { diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index 618abe4faaed..d9fa192c383c 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -278,7 +278,7 @@ func runCircuitBreakerTestForKey( // Restart node and check that it succeeds in reestablishing range quorum // necessary for startup actions. - lReg.ReopenOrFail(t, 5) + require.NoError(t, lReg.MustGet(t, 5).Reopen()) err = tc.RestartServer(5) require.NoError(t, err, "restarting server with range(s) %s tripping circuit breaker", rangesList) From c2e4b695b4a68bb56d26b2af2db7cd083599d054 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 14:05:04 +0200 Subject: [PATCH 09/15] listenerutil: use require.TestingT instead of *testing.T --- pkg/testutils/listenerutil/listener.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index 41aad6f45736..2a3c1f33a549 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -12,7 +12,6 @@ package listenerutil import ( "net" - "testing" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -45,8 +44,10 @@ func NewListenerRegistry() ListenerRegistry { // MustGetOrCreate returns an existing reusable socket listener or creates a new one // on a random local port. -func (r *ListenerRegistry) MustGetOrCreate(t *testing.T, idx int) *ReusableListener { - t.Helper() +func (r *ListenerRegistry) MustGetOrCreate(t require.TestingT, idx int) *ReusableListener { + if h, ok := t.(interface{ Helper() }); ok { + h.Helper() + } if l, ok := r.listeners[idx]; ok { return l } From 759e606b8a8df79935c3ff7350d0ccf6940d8309 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 14:23:01 +0200 Subject: [PATCH 10/15] listenerutil: return pointer from NewListenerRegistry --- pkg/kv/kvserver/loqrecovery/server_integration_test.go | 2 +- pkg/testutils/listenerutil/listener.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index db5095576338..81ab2c76637e 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -705,7 +705,7 @@ func prepTestCluster( *testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore, - listenerutil.ListenerRegistry, + *listenerutil.ListenerRegistry, ) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") diff --git a/pkg/testutils/listenerutil/listener.go b/pkg/testutils/listenerutil/listener.go index 2a3c1f33a549..f058a6f68a8c 100644 --- a/pkg/testutils/listenerutil/listener.go +++ b/pkg/testutils/listenerutil/listener.go @@ -38,8 +38,8 @@ type ListenerRegistry struct { // test cluster. Once created use ListenerRegistry.MustGetOrCreate to create new // listeners and inject them into test cluster using Listener field of // base.TestServerArgs. -func NewListenerRegistry() ListenerRegistry { - return ListenerRegistry{listeners: make(map[int]*ReusableListener)} +func NewListenerRegistry() *ListenerRegistry { + return &ListenerRegistry{listeners: make(map[int]*ReusableListener)} } // MustGetOrCreate returns an existing reusable socket listener or creates a new one From 5320237ce7d24622775f727333e7eaed7f2487f2 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 14:21:12 +0200 Subject: [PATCH 11/15] listenerutil: add a unit test --- pkg/BUILD.bazel | 2 + pkg/testutils/listenerutil/BUILD.bazel | 16 +++- pkg/testutils/listenerutil/listener_test.go | 94 +++++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 pkg/testutils/listenerutil/listener_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index e1276d1943b4..da3a542ff37a 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -594,6 +594,7 @@ ALL_TESTS = [ "//pkg/testutils/lint/passes/timer:timer_test", "//pkg/testutils/lint/passes/unconvert:unconvert_test", "//pkg/testutils/lint:lint_test", + "//pkg/testutils/listenerutil:listenerutil_test", "//pkg/testutils/release:release_test", "//pkg/testutils/sqlutils:sqlutils_test", "//pkg/testutils/testcluster:testcluster_test", @@ -2150,6 +2151,7 @@ GO_TARGETS = [ "//pkg/testutils/lint:lint", "//pkg/testutils/lint:lint_test", "//pkg/testutils/listenerutil:listenerutil", + "//pkg/testutils/listenerutil:listenerutil_test", "//pkg/testutils/localtestcluster:localtestcluster", "//pkg/testutils/metrictestutils:metrictestutils", "//pkg/testutils/pgtest:pgtest", diff --git a/pkg/testutils/listenerutil/BUILD.bazel b/pkg/testutils/listenerutil/BUILD.bazel index 6931a4ee9757..0632b7aed487 100644 --- a/pkg/testutils/listenerutil/BUILD.bazel +++ b/pkg/testutils/listenerutil/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "listenerutil", @@ -11,3 +11,17 @@ go_library( "@com_github_stretchr_testify//require", ], ) + +go_test( + name = "listenerutil_test", + srcs = ["listener_test.go"], + args = ["-test.timeout=295s"], + embed = [":listenerutil"], + deps = [ + "//pkg/util/ctxgroup", + "//pkg/util/leaktest", + "//pkg/util/stop", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/testutils/listenerutil/listener_test.go b/pkg/testutils/listenerutil/listener_test.go new file mode 100644 index 000000000000..356ff1291f15 --- /dev/null +++ b/pkg/testutils/listenerutil/listener_test.go @@ -0,0 +1,94 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package listenerutil + +import ( + "context" + "net" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestReusableListener(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + lr := NewListenerRegistry() + defer lr.Close() + ln := lr.MustGetOrCreate(t, 0) + + // connect accepts from the socket and also connects to it to write and then + // read a single byte. The returned error is that of the connecting goroutine; + // an error in Accept() is ignored (but others are returned). + connect := func() error { + return ctxgroup.GoAndWait(ctx, + func(ctx context.Context) (rerr error) { + defer func() { + rerr = errors.Wrap(rerr, "connecter") + }() + c, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + return err + } + if _, err := c.Write([]byte{'x'}); err != nil { + return errors.Wrap(err, "write") + } + sl := make([]byte, 1) + if _, err := c.Read(sl); err != nil { + return errors.Wrap(err, "read") + } + if sl[0] != 'x' { + return errors.Errorf("reply-read: didn't expect %v", sl) + } + _ = c.Close() + return nil + }, + func(ctx context.Context) (rerr error) { + defer func() { + rerr = errors.Wrap(rerr, "accepter") + }() + c, err := ln.Accept() + if err != nil { + t.Logf("ignoring error from Accept: %s", err) + return nil + } + sl := make([]byte, 1) + if _, err := c.Read(sl); err != nil { + return errors.Wrap(err, "read") + } + if sl[0] != 'x' { + return errors.Errorf("read: didn't expect %v", sl) + } + if _, err := c.Write([]byte{'x'}); err != nil { + return errors.Wrap(err, "reply-write") + } + _ = c.Close() + return nil + }, + ) + } + + require.NoError(t, connect()) + require.NoError(t, ln.Close()) + err := connect() + require.Error(t, err) + t.Logf("the expected error is: %v", err) + require.NoError(t, lr.MustGet(t, 0).Reopen()) + require.NoError(t, connect()) +} From 36d18daf899ac21227c4eb26ed5395baaf60f2db Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 14:25:21 +0200 Subject: [PATCH 12/15] testcluster: introduce ReusableListenerReg This will replace the ReusableListeners flag and simplify all tests using it. It's not used as of this commit, and all the tests using the soon-to-be-legacy approach still pass. --- pkg/base/BUILD.bazel | 1 + pkg/base/test_server_args.go | 12 +++++++++ pkg/testutils/testcluster/BUILD.bazel | 1 + pkg/testutils/testcluster/testcluster.go | 31 +++++++++++++++++++++++- 4 files changed, 44 insertions(+), 1 deletion(-) diff --git a/pkg/base/BUILD.bazel b/pkg/base/BUILD.bazel index 61501d598f01..3d31cd9a92e1 100644 --- a/pkg/base/BUILD.bazel +++ b/pkg/base/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/security/username", "//pkg/server/autoconfig/acprovider", "//pkg/settings/cluster", + "//pkg/testutils/listenerutil", "//pkg/util", "//pkg/util/envutil", "//pkg/util/humanizeutil", diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index 08c874a8292d..d50f270557cb 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/autoconfig/acprovider" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -209,7 +210,18 @@ type TestClusterArgs struct { // proxy listeners to TestServerArgs.Listener that would survive // net.Listener.Close() and then allow restarted server to use them again. // See testutils.ListenerRegistry. + // + // TODO(during PR): phase this out. ReusableListeners bool + + // If set, listeners will be created from the below registry and they will be + // retained across restarts (i.e. servers are kept on the same ports, but + // avoiding races where another process grabs the port while the server is + // down). It's also possible not to set this field but set a *ReusableListener + // directly in TestServerArgs.Listener. If a non-reusable listener is set in + // that field, RestartServer will return an error to guide the developer + // towards a non-flaky pattern. + ReusableListenerReg *listenerutil.ListenerRegistry } // DefaultTestTenantOptions specifies the conditions under which the default diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index 2c0fdc71e7d4..f57938b32a2f 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/sql/randgen", "//pkg/storage", "//pkg/testutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/util/allstacks", "//pkg/util/hlc", diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index dac74a94dee1..8e6d888a1467 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/allstacks" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -61,6 +62,8 @@ import ( type TestCluster struct { Servers []*server.TestServer Conns []*gosql.DB + // ReusableListeners is populated if (and only if) TestClusterArgs.ReusableListeners is set. + ReusableListeners map[int] /* idx */ *listenerutil.ReusableListener // Connection to the storage cluster. Typically, the first connection in // Conns, but could be different if we're transparently running in a test @@ -281,6 +284,18 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) * serverArgs = tc.clusterArgs.ServerArgs } + // If a reusable listener registry is provided, create reusable listeners + // for every server that doesn't have a custom listener provided. (Only + // servers with a reusable listener can be restarted). + if reg := clusterArgs.ReusableListenerReg; reg != nil && serverArgs.Listener == nil { + ln := reg.MustGetOrCreate(t, i) + serverArgs.Listener = ln + if tc.ReusableListeners == nil { + tc.ReusableListeners = map[int]*listenerutil.ReusableListener{} + } + tc.ReusableListeners[i] = ln + } + if len(serverArgs.StoreSpecs) == 0 { serverArgs.StoreSpecs = []base.StoreSpec{base.DefaultTestStoreSpec} } @@ -1644,7 +1659,11 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } serverArgs := tc.serverArgs[idx] - if !tc.clusterArgs.ReusableListeners { + if ln := tc.ReusableListeners[idx]; ln != nil { + serverArgs.Listener = ln + } + + if serverArgs.Listener == nil { if idx == 0 { // If it's the first server, then we need to restart the RPC listener by hand. // Look at NewTestCluster for more details. @@ -1663,6 +1682,16 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } } } + } else if ln, ok := serverArgs.Listener.(*listenerutil.ReusableListener); !ok { + // Restarting a server without a reusable listener can cause flakes since the + // port may be occupied by a different process now. Use a reusable listener + // to avoid that problem. + return errors.Errorf( + "ReusableListeners must be set on ClusterArgs or compatible Listener "+ + "needs to be set in serverArgs to restart server %d", idx, + ) + } else if err := ln.Reopen(); err != nil { + return err } for i, specs := range serverArgs.StoreSpecs { From a732de028831cb28a75bd5b656de8c5b342fbe3a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 19:04:06 +0100 Subject: [PATCH 13/15] *: adopt reusable listeners in all tests that restart They didn't do this before, and as a result were susceptible to flakes. They were discovered through the checks introduced in follow-up commits. It will be impossible to hold it wrong. --- pkg/kv/kvserver/client_raft_test.go | 46 ++++++++++++++----- pkg/kv/kvserver/client_replica_test.go | 6 ++- pkg/kv/kvserver/node_liveness_test.go | 20 ++++++-- pkg/kv/kvserver/replicate_queue_test.go | 6 ++- pkg/server/BUILD.bazel | 1 + pkg/server/server_startup_test.go | 4 ++ pkg/testutils/testcluster/BUILD.bazel | 1 + pkg/testutils/testcluster/testcluster_test.go | 8 +++- 8 files changed, 71 insertions(+), 21 deletions(-) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 23b6b49d4b84..7078c8b74217 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -47,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" @@ -92,11 +93,14 @@ func TestStoreRecoverFromEngine(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { @@ -198,6 +202,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() numIncrements := 0 keyA := roachpb.Key("a") @@ -205,7 +211,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ @@ -341,6 +348,8 @@ func TestRestoreReplicas(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 2 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -363,8 +372,9 @@ func TestRestoreReplicas(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) store := tc.GetFirstStoreFromServer(t, 0) @@ -661,6 +671,8 @@ func TestSnapshotAfterTruncation(t *testing.T) { t.Run(name, func(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -683,8 +695,9 @@ func TestSnapshotAfterTruncation(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) store := tc.GetFirstStoreFromServer(t, 0) @@ -4192,6 +4205,8 @@ func TestInitRaftGroupOnRequest(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 2 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -4214,8 +4229,9 @@ func TestInitRaftGroupOnRequest(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) @@ -4703,6 +4719,8 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() stopper := stop.NewStopper() ctx := context.Background() @@ -4764,8 +4782,9 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) // Make a key that's in the user data space. @@ -5097,6 +5116,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { stickyEngineRegistry server.StickyInMemEnginesRegistry, ) { stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + lisReg := listenerutil.NewListenerRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -5128,10 +5148,12 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { tc = testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) + tc.Stopper().AddCloser(stop.CloserFn(lisReg.Close)) db = tc.GetFirstStoreFromServer(t, 1).DB() // Split off a non-system range so we don't have to account for node liveness diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 4737cd86676f..d6ba3ba968c1 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -51,6 +51,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -2158,13 +2159,16 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() var leaseAcquisitionTrap atomic.Value ctx := context.Background() manual := hlc.NewHybridManualClock() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index fadcebdf1151..66a6ef5c31c2 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -152,11 +153,14 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { @@ -869,6 +873,8 @@ func TestNodeLivenessSetDraining(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -891,8 +897,9 @@ func TestNodeLivenessSetDraining(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) @@ -1200,6 +1207,8 @@ func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, node func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1222,8 +1231,9 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { ctx := context.Background() tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 99c26cec4aac..9f1a245d7ea9 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -2091,6 +2092,8 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() zcfg := zonepb.DefaultZoneConfig() zcfg.NumReplicas = proto.Int32(1) @@ -2098,7 +2101,8 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { base.TestClusterArgs{ // Disable the replication queue initially, to assert on the lease // statuses pre and post enabling the replicate queue. - ReplicationMode: base.ReplicationManual, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ Settings: st, DefaultTestTenant: base.TestTenantDisabled, diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index ac9537cba1a9..05d14026ba1c 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -524,6 +524,7 @@ go_test( "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/diagutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/server/server_startup_test.go b/pkg/server/server_startup_test.go index 8f719c1c6bac..363d21c016b6 100644 --- a/pkg/server/server_startup_test.go +++ b/pkg/server/server_startup_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -41,9 +42,12 @@ func TestStartupInjectedFailureSingleNode(t *testing.T) { t.Log("TestStartupInjectedFailure random seed", seed) reg := server.NewStickyInMemEnginesRegistry() defer reg.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() var enableFaults atomic.Bool args := base.TestClusterArgs{ + ReusableListenerReg: lisReg, ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index f57938b32a2f..b3dc3a2caad4 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -64,6 +64,7 @@ go_test( "//pkg/server/serverpb", "//pkg/sql/catalog/desctestutils", "//pkg/testutils", + "//pkg/testutils/listenerutil", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/util/httputil", diff --git a/pkg/testutils/testcluster/testcluster_test.go b/pkg/testutils/testcluster/testcluster_test.go index 44af28ef8634..6c01b37db2c3 100644 --- a/pkg/testutils/testcluster/testcluster_test.go +++ b/pkg/testutils/testcluster/testcluster_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/listenerutil" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -275,6 +276,8 @@ func TestRestart(t *testing.T) { stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() defer stickyEngineRegistry.CloseAllStickyInMemEngines() + lisReg := listenerutil.NewListenerRegistry() + defer lisReg.Close() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -297,8 +300,9 @@ func TestRestart(t *testing.T) { ctx := context.Background() tc := StartTestCluster(t, numServers, base.TestClusterArgs{ - ReplicationMode: base.ReplicationAuto, - ServerArgsPerNode: stickyServerArgs, + ReplicationMode: base.ReplicationAuto, + ReusableListenerReg: lisReg, + ServerArgsPerNode: stickyServerArgs, }) defer tc.Stopper().Stop(ctx) require.NoError(t, tc.WaitForFullReplication()) From 27af3713afe1e3610772211564ae3b0f32adeaed Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 14:43:40 +0200 Subject: [PATCH 14/15] *: phase out ReusableListener bool --- pkg/base/test_server_args.go | 9 --------- pkg/cli/debug_recover_loss_of_quorum_test.go | 6 ++---- pkg/kv/kvserver/client_store_test.go | 5 ++--- pkg/kv/kvserver/loqrecovery/server_integration_test.go | 9 ++++----- pkg/util/startup/startup_test.go | 4 ++-- 5 files changed, 10 insertions(+), 23 deletions(-) diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index d50f270557cb..5e45fddda746 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -205,15 +205,6 @@ type TestClusterArgs struct { // and potentially adjusted according to ReplicationMode. ServerArgsPerNode map[int]TestServerArgs - // If reusable listeners is true, then restart should keep listeners untouched - // so that servers are kept on the same ports. It is up to the test to set - // proxy listeners to TestServerArgs.Listener that would survive - // net.Listener.Close() and then allow restarted server to use them again. - // See testutils.ListenerRegistry. - // - // TODO(during PR): phase this out. - ReusableListeners bool - // If set, listeners will be created from the below registry and they will be // retained across restarts (i.e. servers are kept on the same ports, but // avoiding races where another process grabs the port while the server is diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 8abaa7fbceac..3c2ea66aa50d 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -462,7 +462,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { StickyEngineRegistry: storeReg, }, }, - Listener: listenerReg.MustGetOrCreate(t, i), StoreSpecs: []base.StoreSpec{ { InMemory: true, @@ -471,8 +470,8 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { } } tc := testcluster.NewTestCluster(t, 3, base.TestClusterArgs{ - ReusableListeners: true, - ServerArgsPerNode: sa, + ReusableListenerReg: listenerReg, + ServerArgsPerNode: sa, }) tc.Start(t) s := sqlutils.MakeSQLRunner(tc.Conns[0]) @@ -555,7 +554,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { // NB: If recovery is not performed, server will just hang on startup. // This is caused by liveness range becoming unavailable and preventing any // progress. So it is likely that test will timeout if basic workflow fails. - require.NoError(t, listenerReg.MustGet(t, 0).Reopen()) require.NoError(t, tc.RestartServer(0), "restart failed") s = sqlutils.MakeSQLRunner(tc.Conns[0]) diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index 293dcfe67c8d..20b959d1f135 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -129,11 +129,10 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, expOnly) tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ReusableListeners: true, + ReplicationMode: base.ReplicationManual, + ReusableListenerReg: listenerReg, ServerArgs: base.TestServerArgs{ Settings: st, - Listener: listenerReg.MustGetOrCreate(t, 0), RaftConfig: base.RaftConfig{ RaftTickInterval: 100 * time.Millisecond, }, diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index 81ab2c76637e..d5cb4f0ad072 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -630,7 +630,6 @@ func TestRetrieveApplyStatus(t *testing.T) { for _, id := range planDetails.UpdatedNodes { tc.StopServer(int(id.NodeID - 1)) - require.NoError(t, lReg.MustGet(t, int(id.NodeID-1)).Reopen()) require.NoError(t, tc.RestartServer(int(id.NodeID-1)), "failed to restart node") } @@ -682,7 +681,6 @@ func TestRejectBadVersionApplication(t *testing.T) { tc.StopServer(1) require.NoError(t, pss[1].SavePlan(plan), "failed to inject plan into storage") - require.NoError(t, lReg.MustGet(t, 1).Reopen()) require.NoError(t, tc.RestartServer(1), "failed to restart server") r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) @@ -705,6 +703,8 @@ func prepTestCluster( *testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore, + // TODO(during PR): no caller uses this now except to close it, so close it + // via stopper and don't return it. *listenerutil.ListenerRegistry, ) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") @@ -714,8 +714,8 @@ func prepTestCluster( lReg := listenerutil.NewListenerRegistry() args := base.TestClusterArgs{ - ServerArgsPerNode: make(map[int]base.TestServerArgs), - ReusableListeners: true, + ServerArgsPerNode: make(map[int]base.TestServerArgs), + ReusableListenerReg: lReg, } for i := 0; i < nodes; i++ { args.ServerArgsPerNode[i] = base.TestServerArgs{ @@ -733,7 +733,6 @@ func prepTestCluster( StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), }, }, - Listener: lReg.MustGetOrCreate(t, i), } } tc := testcluster.NewTestCluster(t, nodes, args) diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index d9fa192c383c..9a4512665570 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -149,8 +149,8 @@ func runCircuitBreakerTestForKey( kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) args := base.TestClusterArgs{ - ServerArgsPerNode: make(map[int]base.TestServerArgs), - ReusableListeners: true, + ServerArgsPerNode: make(map[int]base.TestServerArgs), + ReusableListenerReg: lReg, } var enableFaults atomic.Bool for i := 0; i < nodes; i++ { From 4e8a998bc19642e46b3d26f0ba934db466aef4f6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 11 Jul 2023 14:47:35 +0200 Subject: [PATCH 15/15] loqrecovery: address a TODO --- pkg/kv/kvserver/loqrecovery/BUILD.bazel | 1 + .../loqrecovery/server_integration_test.go | 46 ++++++------------- 2 files changed, 16 insertions(+), 31 deletions(-) diff --git a/pkg/kv/kvserver/loqrecovery/BUILD.bazel b/pkg/kv/kvserver/loqrecovery/BUILD.bazel index 451816b59b20..7f30694650fc 100644 --- a/pkg/kv/kvserver/loqrecovery/BUILD.bazel +++ b/pkg/kv/kvserver/loqrecovery/BUILD.bazel @@ -106,6 +106,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", + "//pkg/util/stop", "//pkg/util/strutil", "//pkg/util/timeutil", "//pkg/util/uuid", diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index d5cb4f0ad072..3326d6f1a500 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -197,8 +198,7 @@ func TestGetPlanStagingState(t *testing.T) { ctx := context.Background() - tc, reg, planStores, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, planStores := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -260,8 +260,7 @@ func TestStageRecoveryPlans(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -302,8 +301,7 @@ func TestStageBadVersions(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 1) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 1) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -332,8 +330,7 @@ func TestStageConflictingPlans(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -373,8 +370,7 @@ func TestForcePlanUpdate(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -416,8 +412,7 @@ func TestNodeDecommissioned(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -450,8 +445,7 @@ func TestRejectDecommissionReachableNode(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -472,8 +466,7 @@ func TestStageRecoveryPlansToWrongCluster(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -505,8 +498,7 @@ func TestRetrieveRangeStatus(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 5) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 5) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -562,8 +554,7 @@ func TestRetrieveApplyStatus(t *testing.T) { ctx := context.Background() - tc, reg, _, lReg := prepTestCluster(t, 5) - defer lReg.Close() + tc, reg, _ := prepTestCluster(t, 5) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -660,8 +651,7 @@ func TestRejectBadVersionApplication(t *testing.T) { ctx := context.Background() - tc, reg, pss, lReg := prepTestCluster(t, 3) - defer lReg.Close() + tc, reg, pss := prepTestCluster(t, 3) defer reg.CloseAllStickyInMemEngines() defer tc.Stopper().Stop(ctx) @@ -699,14 +689,7 @@ func TestRejectBadVersionApplication(t *testing.T) { func prepTestCluster( t *testing.T, nodes int, -) ( - *testcluster.TestCluster, - server.StickyInMemEnginesRegistry, - map[int]loqrecovery.PlanStore, - // TODO(during PR): no caller uses this now except to close it, so close it - // via stopper and don't return it. - *listenerutil.ListenerRegistry, -) { +) (*testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") reg := server.NewStickyInMemEnginesRegistry() @@ -737,7 +720,8 @@ func prepTestCluster( } tc := testcluster.NewTestCluster(t, nodes, args) tc.Start(t) - return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode), lReg + tc.Stopper().AddCloser(stop.CloserFn(lReg.Close)) + return tc, reg, prepInMemPlanStores(t, args.ServerArgsPerNode) } func prepInMemPlanStores(