Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130522: changefeedccl: add roachtesting for network metrics r=rharding6373 a=asg0451

Add a roachtest helper that checks that the new
cdc network metrics are being emitted. For now it
simply checks that they are > 0.

Part of: #130097

Release note: None

130585: replica_rac2: rm Ready scheduling on piggybacked r=sumeerbhola a=pav-kv

Since the piggybacked admitted vectors are applied immediately from within the method that clears the queued updates, we no longer need to schedule a Ready cycle. Previously the token release would happen subsequently in Ready handler.

Related to #129508

Co-authored-by: Miles Frankel <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
3 people committed Sep 12, 2024
3 parents b7f7a6e + 33c96f7 + 915fbf9 commit a463cb3
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 28 deletions.
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ go_library(
"@com_github_prometheus_client_golang//api",
"@com_github_prometheus_client_golang//api/prometheus/v1:prometheus",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_prometheus_common//expfmt",
"@com_github_prometheus_common//model",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
76 changes: 76 additions & 0 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
Expand Down Expand Up @@ -62,6 +63,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/debug"
"github.com/cockroachdb/errors"
prompb "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/oauth2/clientcredentials"
)

Expand Down Expand Up @@ -590,6 +593,38 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
return cj
}

// verifyMetrics runs the check function on the prometheus metrics of each
// cockroach node in the cluster until it returns true, or until its retry
// period expires.
func (ct *cdcTester) verifyMetrics(
ctx context.Context, check func(metrics map[string]*prompb.MetricFamily) (ok bool),
) {
parser := expfmt.TextParser{}

testutils.SucceedsSoon(ct.t, func() error {
uiAddrs, err := ct.cluster.ExternalAdminUIAddr(ctx, ct.logger, ct.cluster.CRDBNodes())
if err != nil {
return err
}
for _, uiAddr := range uiAddrs {
uiAddr = fmt.Sprintf("https://%s/_status/vars", uiAddr)
out, err := exec.Command("curl", "-kf", uiAddr).Output()
if err != nil {
return err
}
res, err := parser.TextToMetricFamilies(bytes.NewReader(out))
if err != nil {
return err
}
if check(res) {
ct.t.Status("metrics check passed")
return nil
}
}
return errors.New("metrics check failed")
})
}

// runFeedLatencyVerifier runs a goroutine which polls the various latencies
// for a changefeed job (initial scan latency, etc) and asserts that they
// are below the specified targets.
Expand Down Expand Up @@ -1614,6 +1649,7 @@ func registerCDC(r registry.Registry) {
steadyLatency: 5 * time.Minute,
})
ct.waitForWorkload()
ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out"))
},
})
// An example usage of having multiple sink nodes.
Expand Down Expand Up @@ -1835,6 +1871,7 @@ func registerCDC(r registry.Registry) {
steadyLatency: time.Minute,
})
ct.waitForWorkload()
ct.verifyMetrics(ctx, verifyMetricsNonZero("cloud_read_bytes", "cloud_write_bytes"))
},
})
r.Add(registry.TestSpec{
Expand Down Expand Up @@ -1864,7 +1901,10 @@ func registerCDC(r registry.Registry) {
initialScanLatency: 30 * time.Minute,
steadyLatency: time.Minute,
})

ct.waitForWorkload()

ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out"))
},
})

Expand Down Expand Up @@ -1902,7 +1942,10 @@ func registerCDC(r registry.Registry) {
initialScanLatency: 30 * time.Minute,
steadyLatency: time.Minute,
})

ct.waitForWorkload()

ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out"))
},
})

Expand Down Expand Up @@ -1984,6 +2027,8 @@ func registerCDC(r registry.Registry) {
})

ct.waitForWorkload()

ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out"))
},
})
r.Add(registry.TestSpec{
Expand Down Expand Up @@ -2200,6 +2245,7 @@ func registerCDC(r registry.Registry) {
steadyLatency: 10 * time.Minute,
})
ct.waitForWorkload()
ct.verifyMetrics(ctx, verifyMetricsNonZero("changefeed_network_bytes_in", "changefeed_network_bytes_out"))
},
RequiresLicense: true,
})
Expand Down Expand Up @@ -3654,3 +3700,33 @@ func (c *topicConsumer) close() {
}
_ = c.consumer.Close()
}

// verifyMetricsNonZero returns a check function for runMetricsVerifier that
// checks that the metrics matching the names input are > 0.
func verifyMetricsNonZero(names ...string) func(metrics map[string]*prompb.MetricFamily) (ok bool) {
namesMap := make(map[string]struct{}, len(names))
for _, name := range names {
namesMap[name] = struct{}{}
}

return func(metrics map[string]*prompb.MetricFamily) (ok bool) {
found := map[string]struct{}{}

for name, fam := range metrics {
if _, ok := namesMap[name]; !ok {
continue
}

for _, m := range fam.Metric {
if m.Counter.GetValue() > 0 {
found[name] = struct{}{}
}
}

if len(found) == len(names) {
return true
}
}
return false
}
}
12 changes: 5 additions & 7 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,10 @@ type Processor interface {
// that ProcessPiggybackedAdmittedAtLeaderRaftMuLocked gets called soon.
EnqueuePiggybackedAdmittedAtLeader(roachpb.ReplicaID, kvflowcontrolpb.AdmittedState)
// ProcessPiggybackedAdmittedAtLeaderRaftMuLocked is called to process
// previously enqueued piggybacked admitted vectors. Returns true if
// HandleRaftReadyRaftMuLocked should be called.
// previously enqueued piggybacked admitted vectors.
//
// raftMu is held.
ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) bool
ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context)

// SideChannelForPriorityOverrideAtFollowerRaftMuLocked is called on a
// follower to provide information about whether the leader is using the
Expand Down Expand Up @@ -919,10 +918,10 @@ func (p *processorImpl) EnqueuePiggybackedAdmittedAtLeader(
}

// ProcessPiggybackedAdmittedAtLeaderRaftMuLocked implements Processor.
func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) bool {
func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx context.Context) {
p.opts.Replica.RaftMuAssertHeld()
if p.destroyed {
return false
return
}
var updates map[roachpb.ReplicaID]rac2.AdmittedVector
// Swap the updates map with the empty scratch. This is an optimization to
Expand All @@ -936,14 +935,13 @@ func (p *processorImpl) ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx conte
}
}()
if len(updates) == 0 {
return false
return
}
for replicaID, state := range updates {
p.leader.rc.AdmitRaftMuLocked(ctx, replicaID, state)
}
// Clear the scratch from the updates that we have just handled.
clear(p.leader.scratch)
return true
}

// SideChannelForPriorityOverrideAtFollowerRaftMuLocked implements Processor.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,10 +1430,10 @@ func (r *Replica) tick(
return true, nil
}

func (r *Replica) processRACv2PiggybackedAdmitted(ctx context.Context) bool {
func (r *Replica) processRACv2PiggybackedAdmitted(ctx context.Context) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
return r.flowControlV2.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx)
r.flowControlV2.ProcessPiggybackedAdmittedAtLeaderRaftMuLocked(ctx)
}

func (r *Replica) hasRaftReadyRLocked() bool {
Expand Down
14 changes: 4 additions & 10 deletions pkg/kv/kvserver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ type raftProcessor interface {
// Process a raft tick for the specified range.
// Return true if the range should be queued for ready processing.
processTick(context.Context, roachpb.RangeID) bool
// Process a piggybacked raftpb.Message that advances admitted. Used for
// RACv2. Returns true if the range should be queued for ready processing.
processRACv2PiggybackedAdmitted(ctx context.Context, id roachpb.RangeID) bool
// Process piggybacked admitted vectors that may advance admitted state for
// the given range's peer replicas. Used for RACv2.
processRACv2PiggybackedAdmitted(ctx context.Context, id roachpb.RangeID)
}

type raftScheduleFlags int
Expand Down Expand Up @@ -414,14 +414,8 @@ func (ss *raftSchedulerShard) worker(
}
}
if state.flags&stateRACv2PiggybackedAdmitted != 0 {
// processRACv2PiggybackedAdmitted returns true if the range should
// perform ready processing. Do not reorder this below the call to
// processReady.
if processor.processRACv2PiggybackedAdmitted(ctx, id) {
state.flags |= stateRaftReady
}
processor.processRACv2PiggybackedAdmitted(ctx, id)
}

if state.flags&stateRaftReady != 0 {
processor.processReady(id)
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ func (p *testProcessor) processTick(_ context.Context, rangeID roachpb.RangeID)
return false
}

func (p *testProcessor) processRACv2PiggybackedAdmitted(
ctx context.Context, id roachpb.RangeID,
) bool {
return false
func (p *testProcessor) processRACv2PiggybackedAdmitted(_ context.Context, _ roachpb.RangeID) {
}

func (p *testProcessor) readyCount(rangeID roachpb.RangeID) int {
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,12 +721,10 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool {
return exists // ready
}

func (s *Store) processRACv2PiggybackedAdmitted(ctx context.Context, rangeID roachpb.RangeID) bool {
r, ok := s.mu.replicasByRangeID.Load(rangeID)
if !ok {
return false
func (s *Store) processRACv2PiggybackedAdmitted(ctx context.Context, rangeID roachpb.RangeID) {
if r, ok := s.mu.replicasByRangeID.Load(rangeID); ok {
r.processRACv2PiggybackedAdmitted(ctx)
}
return r.processRACv2PiggybackedAdmitted(ctx)
}

// nodeIsLiveCallback is invoked when a node transitions from non-live to live.
Expand Down

0 comments on commit a463cb3

Please sign in to comment.