Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
132916: kvserver: clear rac2 token metrics prior to integration testing r=sumeerbhola a=kvoli

`TestFlowControl.*V2` tests assert on exact counters. This can be problematic if benign deltas occur while setting up the test, such a send queue forming when adding a new learner, but being quickly resolved.

Clear the token metrics prior to commencing these tests, in order to prevent flakes that result from such deltas in setup.

Fixes: #132642
Release note: None

133089: roachprod: update default CockroachDB logging configuration r=dhartunian a=jbowens

Update the default logging configuration used for roachprod clusters to disable auditable logs on logs going to file sinks. Some roachtests use the buffered:true configuration to withstand disk stall events. This setting is incompatible with auditable logs on file sinks and recently introduced validation (#132742) prohibits the settings from being used together.

Release note: none
Informs #129922.
Informs #132988.
Epic: none

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
3 people committed Oct 21, 2024
3 parents ee22c97 + 6815022 + 35625de commit 47acd83
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ go_test(
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
"//pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb",
"//pkg/kv/kvserver/kvflowcontrol/node_rac2",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
Expand Down
65 changes: 65 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -2253,6 +2254,10 @@ func TestFlowControlBasicV2(t *testing.T) {
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- Flow token metrics, before issuing the 1MiB replicated write.`)
h.query(n1, v2FlowTokensQueryStr)
Expand Down Expand Up @@ -2342,6 +2347,10 @@ func TestFlowControlRangeSplitMergeV2(t *testing.T) {
require.NoError(t, err)

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)
h.log("sending put request to pre-split range")
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
h.log("sent put request to pre-split range")
Expand Down Expand Up @@ -2464,6 +2473,10 @@ func TestFlowControlBlockedAdmissionV2(t *testing.T) {
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 5 1MiB, 3x replicated write that's not admitted.)`)
h.log("sending put requests")
Expand Down Expand Up @@ -2579,6 +2592,10 @@ func TestFlowControlAdmissionPostSplitMergeV2(t *testing.T) {
require.NoError(t, err)

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.log("sending put request to pre-split range")
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
Expand Down Expand Up @@ -2722,6 +2739,10 @@ func TestFlowControlCrashedNodeV2(t *testing.T) {
require.NoError(t, err)
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 5x1MiB, 2x replicated writes that are not admitted.)`)
h.log("sending put requests")
Expand Down Expand Up @@ -2870,6 +2891,10 @@ func TestFlowControlRaftSnapshotV2(t *testing.T) {
repl := store.LookupReplica(roachpb.RKey(k))
require.NotNil(t, repl)
h.waitForConnectedStreams(ctx, repl.RangeID, 5, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

// Set up a key to replicate across the cluster. We're going to modify this
// key and truncate the raft logs from that command after killing one of the
Expand Down Expand Up @@ -3085,6 +3110,10 @@ func TestFlowControlRaftMembershipV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
Expand Down Expand Up @@ -3224,6 +3253,10 @@ func TestFlowControlRaftMembershipRemoveSelfV2(t *testing.T) {
// Make sure the lease is on n1 and that we're triply connected.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
Expand Down Expand Up @@ -3353,6 +3386,10 @@ func TestFlowControlClassPrioritizationV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
Expand Down Expand Up @@ -3469,6 +3506,10 @@ func TestFlowControlUnquiescedRangeV2(t *testing.T) {
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))

h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1x1MiB, 3x replicated elastic write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.BulkNormalPri)
Expand Down Expand Up @@ -3571,6 +3612,10 @@ func TestFlowControlTransferLeaseV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
Expand Down Expand Up @@ -3664,6 +3709,10 @@ func TestFlowControlLeaderNotLeaseholderV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1x1MiB, 3x replicated write that's not admitted.)`)
h.put(ctx, k, 1<<20 /* 1MiB */, testFlowModeToPri(mode))
Expand Down Expand Up @@ -3780,6 +3829,10 @@ func TestFlowControlGranterAdmitOneByOneV2(t *testing.T) {
desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
// Reset the token metrics, since a send queue may have instantly
// formed when adding one of the replicas, before being quickly
// drained.
h.resetV2TokenMetrics(ctx)

h.comment(`-- (Issuing 1024*1KiB, 3x replicated writes that are not admitted.)`)
h.log("sending put requests")
Expand Down Expand Up @@ -4865,6 +4918,18 @@ func (h *flowControlTestHelper) enableVerboseRaftMsgLoggingForRange(rangeID roac
}
}

func (h *flowControlTestHelper) resetV2TokenMetrics(ctx context.Context) {
for _, server := range h.tc.Servers {
require.NoError(h.t, server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
s.GetStoreConfig().KVFlowStreamTokenProvider.Metrics().(*rac2.TokenMetrics).TestingClear()
_, err := s.ComputeMetricsPeriodically(ctx, nil, 0)
require.NoError(h.t, err)
s.GetStoreConfig().KVFlowStreamTokenProvider.UpdateMetricGauges()
return nil
}))
}
}

// makeV2EnabledTestFileName is a utility function which returns an updated
// filename for the testdata file based on the v2EnabledWhenLeaderLevel.
func makeV2EnabledTestFileName(
Expand Down
28 changes: 28 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,34 @@ func NewTokenMetrics() *TokenMetrics {
return m
}

// TestingClear is used in tests to reset the metrics.
func (m *TokenMetrics) TestingClear() {
// NB: we only clear the counter metrics, as the stream metrics are gauges.
for _, typ := range []TokenType{
EvalToken,
SendToken,
} {
for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
m.CounterMetrics[typ].Deducted[wc].Clear()
m.CounterMetrics[typ].Returned[wc].Clear()
m.CounterMetrics[typ].Unaccounted[wc].Clear()
m.CounterMetrics[typ].Disconnected[wc].Clear()
if typ == SendToken {
m.CounterMetrics[typ].SendQueue[0].ForceFlushDeducted.Clear()
for _, wc := range []admissionpb.WorkClass{
admissionpb.RegularWorkClass,
admissionpb.ElasticWorkClass,
} {
m.CounterMetrics[typ].SendQueue[0].PreventionDeducted[wc].Clear()
}
}
}
}
}

type TokenCounterMetrics struct {
Deducted [admissionpb.NumWorkClasses]*metric.Counter
Returned [admissionpb.NumWorkClasses]*metric.Counter
Expand Down
6 changes: 3 additions & 3 deletions pkg/roachprod/install/files/cockroachdb-logging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ sinks:
channels: [STORAGE]
security:
channels: [PRIVILEGES, USER_ADMIN]
auditable: true
auditable: false
sql-audit:
channels: [SENSITIVE_ACCESS]
auditable: true
auditable: false
sql-auth:
channels: [SESSIONS]
auditable: true
auditable: false
sql-exec:
channels: [SQL_EXEC]
sql-slow:
Expand Down

0 comments on commit 47acd83

Please sign in to comment.