diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index b2a116c9b38f..f2467c4405cd 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -312,6 +312,8 @@ go_library( "@com_github_prometheus_client_golang//api", "@com_github_prometheus_client_golang//api/prometheus/v1:prometheus", "@com_github_prometheus_client_golang//prometheus", + "@com_github_prometheus_client_model//go", + "@com_github_prometheus_common//expfmt", "@com_github_prometheus_common//model", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 6769e73191e7..0e3b1585db45 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -30,6 +30,7 @@ import ( "net/http" "net/url" "os" + "os/exec" "path/filepath" "regexp" "runtime" @@ -62,6 +63,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/debug" "github.com/cockroachdb/errors" + prompb "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "golang.org/x/oauth2/clientcredentials" ) @@ -590,6 +593,38 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob { return cj } +// verifyMetrics runs the check function on the prometheus metrics of each +// cockroach node in the cluster until it returns true, or until its retry +// period expires. +func (ct *cdcTester) verifyMetrics( + ctx context.Context, check func(metrics map[string]*prompb.MetricFamily) (ok bool), +) { + parser := expfmt.TextParser{} + + testutils.SucceedsSoon(ct.t, func() error { + uiAddrs, err := ct.cluster.ExternalAdminUIAddr(ctx, ct.logger, ct.cluster.CRDBNodes()) + if err != nil { + return err + } + for _, uiAddr := range uiAddrs { + uiAddr = fmt.Sprintf("https://%s/_status/vars", uiAddr) + out, err := exec.Command("curl", "-kf", uiAddr).Output() + if err != nil { + return err + } + res, err := parser.TextToMetricFamilies(bytes.NewReader(out)) + if err != nil { + return err + } + if check(res) { + ct.t.Status("metrics check passed") + return nil + } + } + return errors.New("metrics check failed") + }) +} + // runFeedLatencyVerifier runs a goroutine which polls the various latencies // for a changefeed job (initial scan latency, etc) and asserts that they // are below the specified targets. @@ -1614,6 +1649,7 @@ func registerCDC(r registry.Registry) { steadyLatency: 5 * time.Minute, }) ct.waitForWorkload() + ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out")) }, }) // An example usage of having multiple sink nodes. @@ -1835,6 +1871,7 @@ func registerCDC(r registry.Registry) { steadyLatency: time.Minute, }) ct.waitForWorkload() + ct.verifyMetrics(ctx, verifyMetricsNonZero("cloud_read_bytes", "cloud_write_bytes")) }, }) r.Add(registry.TestSpec{ @@ -1864,7 +1901,10 @@ func registerCDC(r registry.Registry) { initialScanLatency: 30 * time.Minute, steadyLatency: time.Minute, }) + ct.waitForWorkload() + + ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out")) }, }) @@ -1902,7 +1942,10 @@ func registerCDC(r registry.Registry) { initialScanLatency: 30 * time.Minute, steadyLatency: time.Minute, }) + ct.waitForWorkload() + + ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out")) }, }) @@ -1984,6 +2027,8 @@ func registerCDC(r registry.Registry) { }) ct.waitForWorkload() + + ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out")) }, }) r.Add(registry.TestSpec{ @@ -2200,6 +2245,7 @@ func registerCDC(r registry.Registry) { steadyLatency: 10 * time.Minute, }) ct.waitForWorkload() + ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out")) }, RequiresLicense: true, }) @@ -3654,3 +3700,33 @@ func (c *topicConsumer) close() { } _ = c.consumer.Close() } + +// verifyMetricsNonZero returns a check function for runMetricsVerifier that +// checks that the metrics matching the names input are > 0. +func verifyMetricsNonZero(names ...string) func(metrics map[string]*prompb.MetricFamily) (ok bool) { + namesMap := make(map[string]struct{}, len(names)) + for _, name := range names { + namesMap[name] = struct{}{} + } + + return func(metrics map[string]*prompb.MetricFamily) (ok bool) { + found := map[string]struct{}{} + + for name, fam := range metrics { + if _, ok := namesMap[name]; !ok { + continue + } + + for _, m := range fam.Metric { + if m.Counter.GetValue() > 0 { + found[name] = struct{}{} + } + } + + if len(found) == len(names) { + return true + } + } + return false + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 3c76ddbb5587..cf0618b74808 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -342,11 +342,10 @@ type Processor interface { // that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked gets called soon. EnqueuePiggybackedAdmittedAtLeader(roachpb.ReplicaID, kvflowcontrolpb.AdmittedState) // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked is called to process - // previously enqueued piggybacked admitted vectors. Returns true if - // HandleRaftReadyRaftMuLocked should be called. + // previously enqueued piggybacked admitted vectors. // // raftMu is held. - ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) bool + ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) // SideChannelForPriorityOverrideAtFollowerRaftMuLocked is called on a // follower to provide information about whether the leader is using the @@ -919,10 +918,10 @@ func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader( } // ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor. -func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) bool { +func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) { p.opts.Replica.RaftMuAssertHeld() if p.destroyed { - return false + return } var updates map[roachpb.ReplicaID]rac2.AdmittedVector // Swap the updates map with the empty scratch. This is an optimization to @@ -936,14 +935,13 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte } }() if len(updates) == 0 { - return false + return } for replicaID, state := range updates { p.leader.rc.AdmitRaftMuLocked(ctx, replicaID, state) } // Clear the scratch from the updates that we have just handled. clear(p.leader.scratch) - return true } // SideChannelForPriorityOverrideAtFollowerRaftMuLocked implements Processor. diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1bc033f2d533..6a9f33927923 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1430,10 +1430,10 @@ func (r *Replica) tick( return true, nil } -func (r *Replica) processRACv2PiggybackedAdmitted(ctx context.Context) bool { +func (r *Replica) processRACv2PiggybackedAdmitted(ctx context.Context) { r.raftMu.Lock() defer r.raftMu.Unlock() - return r.flowControlV2.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx) + r.flowControlV2.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx) } func (r *Replica) hasRaftReadyRLocked() bool { diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go index b1c8462f484c..373ab97c9f2b 100644 --- a/pkg/kv/kvserver/scheduler.go +++ b/pkg/kv/kvserver/scheduler.go @@ -122,9 +122,9 @@ type raftProcessor interface { // Process a raft tick for the specified range. // Return true if the range should be queued for ready processing. processTick(context.Context, roachpb.RangeID) bool - // Process a piggybacked raftpb.Message that advances admitted. Used for - // RACv2. Returns true if the range should be queued for ready processing. - processRACv2PiggybackedAdmitted(ctx context.Context, id roachpb.RangeID) bool + // Process piggybacked admitted vectors that may advance admitted state for + // the given range's peer replicas. Used for RACv2. + processRACv2PiggybackedAdmitted(ctx context.Context, id roachpb.RangeID) } type raftScheduleFlags int @@ -414,14 +414,8 @@ func (ss *raftSchedulerShard) worker( } } if state.flags&stateRACv2PiggybackedAdmitted != 0 { - // processRACv2PiggybackedAdmitted returns true if the range should - // perform ready processing. Do not reorder this below the call to - // processReady. - if processor.processRACv2PiggybackedAdmitted(ctx, id) { - state.flags |= stateRaftReady - } + processor.processRACv2PiggybackedAdmitted(ctx, id) } - if state.flags&stateRaftReady != 0 { processor.processReady(id) } diff --git a/pkg/kv/kvserver/scheduler_test.go b/pkg/kv/kvserver/scheduler_test.go index f8a21471be63..496b21271613 100644 --- a/pkg/kv/kvserver/scheduler_test.go +++ b/pkg/kv/kvserver/scheduler_test.go @@ -179,10 +179,7 @@ func (p *testProcessor) processTick(_ context.Context, rangeID roachpb.RangeID) return false } -func (p *testProcessor) processRACv2PiggybackedAdmitted( - ctx context.Context, id roachpb.RangeID, -) bool { - return false +func (p *testProcessor) processRACv2PiggybackedAdmitted(_ context.Context, _ roachpb.RangeID) { } func (p *testProcessor) readyCount(rangeID roachpb.RangeID) int { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 697c0cbdab55..b7ce368761fd 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -721,12 +721,10 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool { return exists // ready } -func (s *Store) processRACv2PiggybackedAdmitted(ctx context.Context, rangeID roachpb.RangeID) bool { - r, ok := s.mu.replicasByRangeID.Load(rangeID) - if !ok { - return false +func (s *Store) processRACv2PiggybackedAdmitted(ctx context.Context, rangeID roachpb.RangeID) { + if r, ok := s.mu.replicasByRangeID.Load(rangeID); ok { + r.processRACv2PiggybackedAdmitted(ctx) } - return r.processRACv2PiggybackedAdmitted(ctx) } // nodeIsLiveCallback is invoked when a node transitions from non-live to live.