Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
99304: backupccl: add test with randomized completed spans to TestRestoreEntryCover r=rhu713 a=rhu713

Add some testing with randomized completed spans to TestRestoreEntryCover. This testing should demonstrate the correctness of generateAndSendImportSpans in the presence of completed spans.

Informs: #98779

Release note: None

100287: kvserver: add `leases.requests.latency` metric r=erikgrinaker a=erikgrinaker

This patch adds a histogram of lease request latencies. It covers all request types (acquisitions, transfers, and extensions) and all outcomes (successes and errors), but it measures only the coalesced lease request itself, regardless of how many waiters there are or how long they have been waiting.

Epic: none
Release note (ops change): Added a metric `leases.requests.latency` recording a histogram of lease request latencies.

100450: roachtest: unskip `acceptance/gossip/peerings` r=erikgrinaker a=erikgrinaker

Addressed by bfed880.

Resolves #96091.
Touches #100213.

Epic: none
Release note: None

Co-authored-by: Rui Hu <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
3 people committed Apr 3, 2023
4 parents 66dfb23 + f752fb3 + 0c41a7e + 1b6a425 commit 86ecf5f
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 6 deletions.
103 changes: 100 additions & 3 deletions pkg/ccl/backupccl/restore_span_covering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,13 @@ func makeImportSpans(
spans []roachpb.Span,
backups []backuppb.BackupManifest,
layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
highWaterMark []byte,
targetSize int64,
introducedSpanFrontier *spanUtils.Frontier,
completedSpans []jobspb.RestoreProgress_FrontierEntry,
useSimpleImportSpans bool,
) ([]execinfrapb.RestoreSpanEntry, error) {
var cover []execinfrapb.RestoreSpanEntry
cover := make([]execinfrapb.RestoreSpanEntry, 0)
spanCh := make(chan execinfrapb.RestoreSpanEntry)
g := ctxgroup.WithContext(context.Background())
g.Go(func() error {
Expand All @@ -270,10 +271,10 @@ func makeImportSpans(

filter, err := makeSpanCoveringFilter(
checkpointFrontier,
nil,
highWaterMark,
introducedSpanFrontier,
targetSize,
true)
highWaterMark == nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,6 +399,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -417,6 +419,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -437,6 +440,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
2<<20,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -456,6 +460,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
2<<20,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -477,6 +482,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
introducedSpanFrontier,
emptyCompletedSpans,
Expand All @@ -495,6 +501,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
introducedSpanFrontier,
emptyCompletedSpans,
Expand Down Expand Up @@ -522,6 +529,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
persistFrontier(frontier, 0),
Expand All @@ -539,6 +547,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
persistFrontier(frontier, 0),
Expand Down Expand Up @@ -926,6 +935,7 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) {
restoreSpans,
backups,
layerToIterFactory,
nil,
0,
introducedSpanFrontier,
[]jobspb.RestoreProgress_FrontierEntry{},
Expand Down Expand Up @@ -986,6 +996,29 @@ func TestRestoreEntryCover(t *testing.T) {
defer cleanupFn()
execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)

// getRandomCompletedSpans makes maxNumSpans attempts at picking a random
// contiguous run of RestoreSpanEntry values from cover. Each non-empty run
// becomes one completed span that starts at the first entry's start key and
// ends at the last entry's end key, so a completed span can cover 1 or more
// entries. Attempts that produce an empty run are dropped, and the picked
// spans are handed to roachpb.MergeSpans before being returned, so at most
// maxNumSpans spans come back.
//
// NOTE(review): this draws from the package-level rand, while the high-water
// branch of this test uses r.Intn — confirm whether both should share the
// same source so that failures are reproducible.
getRandomCompletedSpans := func(cover []execinfrapb.RestoreSpanEntry, maxNumSpans int) []roachpb.Span {
	var picked []roachpb.Span
	numEntries := len(cover)
	for attempt := 0; attempt < maxNumSpans; attempt++ {
		// first may equal numEntries; the only possible run length is then
		// zero and the attempt is dropped.
		first := rand.Intn(numEntries + 1)
		runLen := rand.Intn(numEntries + 1 - first)
		if runLen > 0 {
			last := first + runLen - 1
			picked = append(picked, roachpb.Span{
				Key:    cover[first].Span.Key,
				EndKey: cover[last].Span.EndKey,
			})
		}
	}

	// Only the merged spans are needed; the second return value is ignored.
	merged, _ := roachpb.MergeSpans(&picked)
	return merged
}

for _, numBackups := range []int{1, 2, 3, 5, 9, 10, 11, 12} {
for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} {
for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} {
Expand All @@ -1009,13 +1042,77 @@ func TestRestoreEntryCover(t *testing.T) {
backups[numBackups-1].Spans,
backups,
layerToIterFactory,
nil,
target<<20,
introducedSpanFrontier,
[]jobspb.RestoreProgress_FrontierEntry{},
simpleImportSpans)
require.NoError(t, err)
require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans,
cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage))

// Check that the correct import spans are created if the job is
// resumed after the completion of some random entries in the cover.
if len(cover) > 0 {
for n := 1; n <= 5; n++ {
var completedSpans []roachpb.Span
var highWater []byte
var frontierEntries []jobspb.RestoreProgress_FrontierEntry

// Randomly choose to use frontier checkpointing instead of
// explicitly testing both forms to avoid creating an exponential
// number of tests.
useFrontierCheckpointing := rand.Intn(2) == 0
if useFrontierCheckpointing {
completedSpans = getRandomCompletedSpans(cover, n)
for _, sp := range completedSpans {
frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{
Span: sp,
Timestamp: completedSpanTime,
})
}
} else {
idx := r.Intn(len(cover))
highWater = cover[idx].Span.EndKey
}

resumeCover, err := makeImportSpans(
ctx,
backups[numBackups-1].Spans,
backups,
layerToIterFactory,
highWater,
target<<20,
introducedSpanFrontier,
frontierEntries,
simpleImportSpans)
require.NoError(t, err)

// Compute the spans that are required on resume by subtracting
// completed spans from the original required spans.
var resumedRequiredSpans roachpb.Spans
for _, origReq := range backups[numBackups-1].Spans {
var resumeReq []roachpb.Span
if useFrontierCheckpointing {
resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans)
} else {
resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, []roachpb.Span{{Key: cover[0].Span.Key, EndKey: highWater}})
}
resumedRequiredSpans = append(resumedRequiredSpans, resumeReq...)
}

var errorMsg string
if useFrontierCheckpointing {
errorMsg = fmt.Sprintf("completed spans in frontier: %v", completedSpans)
} else {
errorMsg = fmt.Sprintf("highwater: %v", highWater)
}

require.NoError(t, checkRestoreCovering(ctx, backups, resumedRequiredSpans,
resumeCover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage),
errorMsg)
}
}
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/acceptance.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func registerAcceptance(r registry.Registry) {
registry.OwnerKV: {
{name: "decommission-self", fn: runDecommissionSelf},
{name: "event-log", fn: runEventLog},
{name: "gossip/peerings", fn: runGossipPeerings, skip: "flaky test. tracked in #96091"},
{name: "gossip/peerings", fn: runGossipPeerings},
{name: "gossip/restart", fn: runGossipRestart},
{
name: "gossip/restart-node-one",
Expand Down
17 changes: 15 additions & 2 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ var (
Measurement: "Lease Requests",
Unit: metric.Unit_COUNT,
}
metaLeaseRequestLatency = metric.Metadata{
Name: "leases.requests.latency",
Help: "Lease request latency (all types and outcomes, coalesced)",
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaLeaseTransferSuccessCount = metric.Metadata{
Name: "leases.transfers.success",
Help: "Number of successful lease transfers",
Expand Down Expand Up @@ -1851,6 +1857,7 @@ type StoreMetrics struct {
// lease).
LeaseRequestSuccessCount *metric.Counter
LeaseRequestErrorCount *metric.Counter
LeaseRequestLatency metric.IHistogram
LeaseTransferSuccessCount *metric.Counter
LeaseTransferErrorCount *metric.Counter
LeaseExpirationCount *metric.Gauge
Expand Down Expand Up @@ -2424,8 +2431,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
OverReplicatedRangeCount: metric.NewGauge(metaOverReplicatedRangeCount),

// Lease request metrics.
LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
LeaseRequestErrorCount: metric.NewCounter(metaLeaseRequestErrorCount),
LeaseRequestSuccessCount: metric.NewCounter(metaLeaseRequestSuccessCount),
LeaseRequestErrorCount: metric.NewCounter(metaLeaseRequestErrorCount),
LeaseRequestLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaLeaseRequestLatency,
Duration: histogramWindow,
Buckets: metric.NetworkLatencyBuckets,
}),
LeaseTransferSuccessCount: metric.NewCounter(metaLeaseTransferSuccessCount),
LeaseTransferErrorCount: metric.NewCounter(metaLeaseTransferErrorCount),
LeaseExpirationCount: metric.NewGauge(metaLeaseExpirationCount),
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,11 @@ func (p *pendingLeaseRequest) requestLease(
status kvserverpb.LeaseStatus,
leaseReq kvpb.Request,
) error {
started := timeutil.Now()
defer func() {
p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
}()

// If requesting an epoch-based lease & current state is expired,
// potentially heartbeat our own liveness or increment epoch of
// prior owner. Note we only do this if the previous lease was
Expand Down
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,10 @@ var charts = []sectionDescription{
Title: "Stuck Acquisition Count",
Metrics: []string{"requests.slow.lease"},
},
{
Title: "Lease Request Latency",
Metrics: []string{"leases.requests.latency"},
},
{
Title: "Succcess Rate",
Metrics: []string{
Expand Down
1 change: 1 addition & 0 deletions pkg/ts/catalog/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var histogramMetricsNames = map[string]struct{}{
"replication.flush_hist_nanos": {},
"kv.replica_read_batch_evaluate.latency": {},
"kv.replica_write_batch_evaluate.latency": {},
"leases.requests.latency": {},
}

func allInternalTSMetricsNames() []string {
Expand Down

0 comments on commit 86ecf5f

Please sign in to comment.