From 169a4aedee275ee0b9918a2484489a4ef90fddbc Mon Sep 17 00:00:00 2001 From: "Chris S. Kim" Date: Thu, 23 Feb 2023 12:57:10 -0500 Subject: [PATCH 1/3] Fix various flaky tests --- .../services/peerstream/stream_test.go | 4 +-- .../services/peerstream/testing.go | 10 ++++++ agent/xds/delta_test.go | 36 +++++++++++-------- command/debug/debug.go | 3 +- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 904c2c28b15e..055d2ff97cd8 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1252,8 +1252,8 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { }) testutil.RunStep(t, "stream is disconnected due to heartbeat timeout", func(t *testing.T) { - disconnectTime := ptr(it.FutureNow(1)) retry.Run(t, func(r *retry.R) { + disconnectTime := ptr(it.StaticNow()) status, ok := srv.StreamStatus(testPeerID) require.True(r, ok) require.False(r, status.Connected) @@ -1423,7 +1423,7 @@ func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient { }, })) - // Receive a services and roots subscription request pair from server + // Receive ExportedService, ExportedServiceList, and PeeringTrustBundle subscription requests from server receivedSub1, err := client.Recv() require.NoError(t, err) receivedSub2, err := client.Recv() diff --git a/agent/grpc-external/services/peerstream/testing.go b/agent/grpc-external/services/peerstream/testing.go index 676aba46f230..2e6e614e9a01 100644 --- a/agent/grpc-external/services/peerstream/testing.go +++ b/agent/grpc-external/services/peerstream/testing.go @@ -150,6 +150,16 @@ func (t *incrementalTime) Now() time.Time { return t.base.Add(dur) } +// StaticNow returns the current internal clock without advancing it. +func (t *incrementalTime) StaticNow() time.Time { + t.mu.Lock() + defer t.mu.Unlock() + + dur := time.Duration(t.next) * time.Second + + return t.base.Add(dur) +} + // FutureNow will return a given future value of the Now() function. // The numerical argument indicates which future Now value you wanted. The // value must be > 0. diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 23d60198bbf8..de2b594150ed 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -10,7 +10,7 @@ import ( "github.com/armon/go-metrics" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/hashicorp/consul/api" + "github.com/stretchr/testify/require" rpcstatus "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -21,8 +21,10 @@ import ( "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/version" ) @@ -1095,13 +1097,15 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { // If there is no token, check that we increment the gauge if tt.token == "" { - data := scenario.sink.Data() - require.Len(t, data, 1) - - item := data[0] - val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"] - require.True(t, ok) - require.Equal(t, float32(1), val.Value) + retry.Run(t, func(r *retry.R) { + data := scenario.sink.Data() + require.Len(r, data, 1) + + item := data[0] + val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"] + require.True(r, ok) + require.Equal(r, float32(1), val.Value) + }) } if !tt.wantDenied { @@ -1138,13 +1142,15 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { // If there is no token, check that we decrement the gauge if tt.token == "" { - data := scenario.sink.Data() - require.Len(t, data, 1) - - item := data[0] - val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"] - require.True(t, ok) - require.Equal(t, float32(0), val.Value) + retry.Run(t, func(r *retry.R) { + data := scenario.sink.Data() + require.Len(r, data, 1) + + item := data[0] + val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"] + require.True(r, ok) + require.Equal(r, float32(0), val.Value) + }) } }) } diff --git a/command/debug/debug.go b/command/debug/debug.go index 017f42b77a2c..dd03286d68fc 100644 --- a/command/debug/debug.go +++ b/command/debug/debug.go @@ -270,7 +270,8 @@ func (c *cmd) prepare() (version string, err error) { // If none are specified we will collect information from // all by default if len(c.capture) == 0 { - c.capture = defaultTargets + c.capture = make([]string, len(defaultTargets)) + copy(c.capture, defaultTargets) } // If EnableDebug is not true, skip collecting pprof From d7429dda9b85e33ff2b29d4d55708416c2ff6220 Mon Sep 17 00:00:00 2001 From: "Chris S. Kim" Date: Thu, 23 Feb 2023 14:07:58 -0500 Subject: [PATCH 2/3] Capture a variable in for-loop --- agent/xds/delta_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index de2b594150ed..a61050f8457d 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -10,7 +10,6 @@ import ( "github.com/armon/go-metrics" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/stretchr/testify/require" rpcstatus "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" @@ -1059,19 +1058,23 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + // aclResolve may be called in a goroutine even after a + // testcase tt returns. Capture the variable as tc so the + // values don't swap in the next iteration. + tc := tt aclResolve := func(id string) (acl.Authorizer, error) { - if !tt.defaultDeny { + if !tc.defaultDeny { // Allow all return acl.RootAuthorizer("allow"), nil } - if tt.acl == "" { + if tc.acl == "" { // No token and defaultDeny is denied return acl.RootAuthorizer("deny"), nil } // Ensure the correct token was passed - require.Equal(t, tt.token, id) + require.Equal(t, tc.token, id) // Parse the ACL and enforce it - policy, err := acl.NewPolicyFromSource(tt.acl, nil, nil) + policy, err := acl.NewPolicyFromSource(tc.acl, nil, nil) require.NoError(t, err) return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) } From 7ee6f72dced7322f823f852e6742313188e69a3e Mon Sep 17 00:00:00 2001 From: "Chris S. Kim" Date: Thu, 23 Feb 2023 14:18:35 -0500 Subject: [PATCH 3/3] Clean up resources in correct order --- agent/xds/xds_protocol_helpers_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 8c4481515c8b..2edd05b9fb20 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -166,9 +166,6 @@ func newTestServerDeltaScenario( ) *testServerScenario { mgr := newTestManager(t) envoy := NewTestEnvoy(t, proxyID, token) - t.Cleanup(func() { - envoy.Close() - }) sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) cfg := metrics.DefaultConfig("consul.xds.test") @@ -177,6 +174,7 @@ func newTestServerDeltaScenario( metrics.NewGlobal(cfg, sink) t.Cleanup(func() { + envoy.Close() sink := &metrics.BlackholeSink{} metrics.NewGlobal(cfg, sink) })