From dafd93f87889e6332d580f648666465450542e7c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 22 Dec 2023 16:47:55 -0500 Subject: [PATCH 1/3] kv/tscache: improve error handling This commit improves the error handling in intervalSkl in two ways: 1. it guards `errors.Is` calls with `err != nil` checks. `errors.Is` is too expensive to call without this check, and it shows up in CPU profiles. 2. it panics on unexpected errors instead of silently dropping them. This mirrors the rest of the logic in this file. Epic: None Release note: None --- pkg/kv/kvserver/tscache/interval_skl.go | 32 +++++++++++++++++-------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go index ab0a2fe1f67c..051d9f078ce9 100644 --- a/pkg/kv/kvserver/tscache/interval_skl.go +++ b/pkg/kv/kvserver/tscache/interval_skl.go @@ -322,8 +322,12 @@ func (s *intervalSkl) addRange(from, to []byte, opt rangeOptions, val cacheValue err = fp.addNode(&it, to, val, 0, true /* mustInit */) } - if errors.Is(err, arenaskl.ErrArenaFull) { - return fp + if err != nil { + if errors.Is(err, arenaskl.ErrArenaFull) { + return fp + } else { + panic(fmt.Sprintf("unexpected error: %v", err)) + } } } @@ -340,8 +344,12 @@ func (s *intervalSkl) addRange(from, to []byte, opt rangeOptions, val cacheValue err = fp.addNode(&it, from, val, hasGap, false /* mustInit */) } - if errors.Is(err, arenaskl.ErrArenaFull) { - return fp + if err != nil { + if errors.Is(err, arenaskl.ErrArenaFull) { + return fp + } else { + panic(fmt.Sprintf("unexpected error: %v", err)) + } } // Seek to the node immediately after the "from" node. @@ -1129,12 +1137,16 @@ func (p *sklPage) scanTo( // Decode the current node's value set. keyVal, gapVal := decodeValueSet(it.Value(), it.Meta()) - if errors.Is(ratchetErr, arenaskl.ErrArenaFull) { - // If we failed to ratchet an uninitialized node above, the desired - // ratcheting won't be reflected in the decoded values. Perform the - // ratcheting manually. - keyVal, _ = ratchetValue(keyVal, prevGapVal) - gapVal, _ = ratchetValue(gapVal, prevGapVal) + if ratchetErr != nil { + if errors.Is(ratchetErr, arenaskl.ErrArenaFull) { + // If we failed to ratchet an uninitialized node above, the desired + // ratcheting won't be reflected in the decoded values. Perform the + // ratcheting manually. + keyVal, _ = ratchetValue(keyVal, prevGapVal) + gapVal, _ = ratchetValue(gapVal, prevGapVal) + } else { + panic(fmt.Sprintf("unexpected error: %v", ratchetErr)) + } } if !(first && (opt&excludeFrom) != 0) { From dba19af939455b94431b48e0ec5f9e7b57885aa0 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 19 Dec 2023 17:37:53 -0500 Subject: [PATCH 2/3] kv: don't consult ReadTimestamp in Transaction.LastActive Informs #101938. Without the synthetic timestamp bit, we don't know for sure whether the transaction's ReadTimestamp is a ClockTimestamp or not. To avoid comparing a future-time MVCC timestamp against a clock timestamp for purposes of detecting transaction liveness, we stop consulting the ReadTimestamp. This was always an unproven optimization anyway, so it's safe to remove. 
Release note: None --- .../kvclient/kvcoord/txn_interceptor_heartbeater_test.go | 8 ++++---- pkg/kv/kvserver/intentresolver/intent_resolver_test.go | 8 ++++---- pkg/roachpb/data.go | 9 +++------ 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go index 7d88ce50392d..d61400cb6a94 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -268,7 +269,7 @@ func TestTxnHeartbeaterLoopStartsBeforeExpiry(t *testing.T) { }, { // First locking request happens at expiration. Heartbeat immediately. - lockingRequestDelay: 5 * time.Second, + lockingRequestDelay: 5*time.Second + 1*time.Nanosecond, consideredExpired: true, loopStarts: StartImmediately, }, @@ -281,11 +282,10 @@ func TestTxnHeartbeaterLoopStartsBeforeExpiry(t *testing.T) { } { t.Run(fmt.Sprintf("delay=%s", test.lockingRequestDelay), func(t *testing.T) { ctx := context.Background() - txn := makeTxnProto() - manualTime := timeutil.NewManualTime(timeutil.Unix(0, 123)) clock := hlc.NewClockForTesting(manualTime) - txn.MinTimestamp, txn.WriteTimestamp = clock.Now(), clock.Now() + txn := roachpb.MakeTransaction("test", []byte("key"), isolation.Serializable, 0, clock.Now(), + 0 /* maxOffsetNs */, 0 /* coordinatorNodeID */, 0, false /* omitInRangefeeds */) // We attempt to simulate a transaction that heartbeats every 1s, however // it is important to note that a transaction is considered expired when it diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver_test.go b/pkg/kv/kvserver/intentresolver/intent_resolver_test.go index 22176978a161..f176c47b8480 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver_test.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver_test.go @@ -76,8 +76,8 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { txn0 := newTransaction("txn0", key, 1, clock) // Txn1 is in the pending state but is expired. txn1 := newTransaction("txn1", key, 1, clock) - txn1.ReadTimestamp.WallTime -= int64(100 * time.Second) - txn1.LastHeartbeat = txn1.ReadTimestamp + txn1.MinTimestamp.WallTime -= int64(100 * time.Second) + txn1.LastHeartbeat = txn1.MinTimestamp // Txn2 is in the staging state and is not old enough to have expired so the // code ought to send nothing. txn2 := newTransaction("txn2", key, 1, clock) @@ -85,8 +85,8 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { // Txn3 is in the staging state but is expired. txn3 := newTransaction("txn3", key, 1, clock) txn3.Status = roachpb.STAGING - txn3.ReadTimestamp.WallTime -= int64(100 * time.Second) - txn3.LastHeartbeat = txn3.ReadTimestamp + txn3.MinTimestamp.WallTime -= int64(100 * time.Second) + txn3.LastHeartbeat = txn3.MinTimestamp // Txn4 is in the committed state. 
 	txn4 := newTransaction("txn4", key, 1, clock)
 	txn4.Status = roachpb.COMMITTED
diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go
index a31c3d8b67ca..9a8abdf8b264 100644
--- a/pkg/roachpb/data.go
+++ b/pkg/roachpb/data.go
@@ -997,13 +997,10 @@ func MakeTransaction(
 }
 
 // LastActive returns the last timestamp at which client activity definitely
-// occurred, i.e. the maximum of ReadTimestamp and LastHeartbeat.
+// occurred, i.e. the maximum of MinTimestamp and LastHeartbeat.
 func (t Transaction) LastActive() hlc.Timestamp {
-	ts := t.LastHeartbeat
-	// TODO(nvanbenschoten): remove this when we remove synthetic timestamps.
-	if !t.ReadTimestamp.Synthetic {
-		ts.Forward(t.ReadTimestamp)
-	}
+	ts := t.MinTimestamp
+	ts.Forward(t.LastHeartbeat)
 	return ts
 }

From 82baeca716b7c3118f74799f692e61d0e2ea2b54 Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Thu, 21 Dec 2023 23:35:32 +0000
Subject: [PATCH 3/3] server: use group for download workers

Release note: none.
Epic: none.
---
 pkg/server/span_download.go | 184 +++++++++++++++++++-----------------
 1 file changed, 99 insertions(+), 85 deletions(-)

diff --git a/pkg/server/span_download.go b/pkg/server/span_download.go
index 57502b9f3c14..845cecb697a4 100644
--- a/pkg/server/span_download.go
+++ b/pkg/server/span_download.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/authserver"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/server/srverrors"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
@@ -84,99 +85,112 @@ func (s *systemStatusServer) DownloadSpan(
 func (s *systemStatusServer) localDownloadSpan(
 	ctx context.Context, req *serverpb.DownloadSpanRequest,
 ) error {
 	return s.stores.VisitStores(func(store *kvserver.Store) error {
-		spanCh := make(chan roachpb.Span)
-
-		grp := ctxgroup.WithContext(ctx)
-		grp.GoCtx(func(ctx context.Context) error {
-			defer close(spanCh)
-			ctxDone := ctx.Done()
-
-			for _, sp := range req.Spans {
-				select {
-				case spanCh <- sp:
-				case <-ctxDone:
-					return ctx.Err()
-				}
-			}
-			return nil
-		})
+		downloadSpansCh, stopTuningCh := make(chan roachpb.Span), make(chan struct{})
+		return ctxgroup.GoAndWait(ctx,
+			// Tune download concurrency until stopTuningCh closes.
+			func(ctx context.Context) error {
+				t := time.NewTicker(time.Second * 15)
+				defer t.Stop()
+				return tuneDownloadConcurrency(ctx, t.C, stopTuningCh, store.StateEngine(),
+					s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm)
+			},
+			// Download until downloadSpansCh closes, then close stopTuningCh.
+			func(ctx context.Context) error {
+				defer close(stopTuningCh)
+				return downloadSpans(ctx, store.StateEngine(), downloadSpansCh)
+			},
+			// Send spans to downloadSpansCh.
+			func(ctx context.Context) error {
+				defer close(downloadSpansCh)
+				return sendDownloadSpans(ctx, req.Spans, downloadSpansCh)
+			},
+		)
+	})
+}
 
-		const downloadWaiters = 16
-		downloadersDone := make(chan struct{}, downloadWaiters)
+// sendDownloadSpans sends each of the passed spans to the passed ch.
+func sendDownloadSpans(ctx context.Context, spans roachpb.Spans, out chan roachpb.Span) error {
+	ctxDone := ctx.Done()
+	for _, sp := range spans {
+		select {
+		case out <- sp:
+		case <-ctxDone:
+			return ctx.Err()
+		}
+	}
+	return nil
+}
 
-		downloader := func(ctx context.Context) error {
-			for sp := range spanCh {
-				if err := store.TODOEngine().Download(ctx, sp); err != nil {
-					return err
-				}
-			}
-			downloadersDone <- struct{}{}
-			return nil
-		}
+// downloadSpans instructs the passed engine, in parallel, to download spans
+// received on the passed ch until it closes.
+func downloadSpans(ctx context.Context, eng storage.Engine, spans chan roachpb.Span) error {
+	const downloadWaiters = 16
+	return ctxgroup.GroupWorkers(ctx, downloadWaiters, func(ctx context.Context, _ int) error {
+		for sp := range spans {
+			if err := eng.Download(ctx, sp); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
 
-		for i := 0; i < downloadWaiters; i++ {
-			grp.GoCtx(downloader)
-		}
+// tuneDownloadConcurrency periodically adjusts download concurrency up or down
+// on the passed engine based on the cpu load value read from the passed func
+// until the passed done ch closes or is signaled.
+func tuneDownloadConcurrency(
+	ctx context.Context,
+	tick <-chan time.Time,
+	done <-chan struct{},
+	eng storage.Engine,
+	readCPU func() float64,
+) error {
+	var added int64
+	// Remove any additional concurrency we've added when we exit.
+	//
+	// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
+	// to download compactions, so that this does not fight with manual calls
+	// to SetConcurrentCompactions.
+	defer func() {
+		if added != 0 {
+			adjusted := eng.AdjustCompactionConcurrency(-added)
+			log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
+		}
+	}()
 
-		grp.GoCtx(func(ctx context.Context) error {
-			var added int64
-			// Remove any additional concurrency we've added when we exit.
-			//
-			// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
-			// to download compactions, so that this does not fight with manual calls
-			// to SetConcurrentCompactions.
-			defer func() {
-				if added != 0 {
-					adjusted := store.TODOEngine().AdjustCompactionConcurrency(-added)
-					log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
-				}
-			}()
-
-			const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8
-
-			// Begin by bumping up the concurrency by 8, then start watching the CPU
-			// usage and adjusting up or down based on CPU until downloading finishes.
-			store.TODOEngine().AdjustCompactionConcurrency(initialIncrease)
-			added += initialIncrease
-
-			t := time.NewTicker(time.Second * 15)
-			defer t.Stop()
-			ctxDone := ctx.Done()
-
-			var waitersExited int
-			for {
-				select {
-				case <-ctxDone:
-					return ctx.Err()
-				case <-downloadersDone:
-					waitersExited++
-					// Return and stop managing added concurrency if the workers are done.
-					if waitersExited >= downloadWaiters {
-						return nil
-					}
-				case <-t.C:
-					cpu := s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm()
-					if cpu > highCPU && added > 0 {
-						// If CPU is high and we have added any additional concurrency, we
-						// should reduce our added concurrency to make sure CPU is available
-						// for the execution of foreground traffic.
-						adjusted := store.TODOEngine().AdjustCompactionConcurrency(-1)
-						added--
-						log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
-					} else if cpu < lowCPU {
-						// If CPU is low, we should use it to do additional downloading.
-						if added < maxAddedConcurrency {
-							adjusted := store.TODOEngine().AdjustCompactionConcurrency(1)
-							added++
-							log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
-						}
-					}
-				}
-			}
-		})
-
-		return grp.Wait()
-	})
+	const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8
+
+	// Begin by bumping up the concurrency by 8, then start watching the CPU
+	// usage and adjusting up or down based on CPU until downloading finishes.
+	eng.AdjustCompactionConcurrency(initialIncrease)
+	added += initialIncrease
+
+	ctxDone := ctx.Done()
+
+	for {
+		select {
+		case <-ctxDone:
+			return ctx.Err()
+		case <-done:
+			return nil
+		case <-tick:
+			cpu := readCPU()
+			if cpu > highCPU && added > 0 {
+				// If CPU is high and we have added any additional concurrency, we
+				// should reduce our added concurrency to make sure CPU is available
+				// for the execution of foreground traffic.
+				adjusted := eng.AdjustCompactionConcurrency(-1)
+				added--
+				log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due to cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
+			} else if cpu < lowCPU {
+				// If CPU is low, we should use it to do additional downloading.
+				if added < maxAddedConcurrency {
+					adjusted := eng.AdjustCompactionConcurrency(1)
+					added++
+					log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due to cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
+				}
+			}
+		}
+	}
 }
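-- 

The error-handling pattern in PATCH 1/3 is worth a standalone illustration.
Below is a minimal, self-contained sketch (not CockroachDB code) of the same
two ideas: a cheap err != nil guard keeps errors.Is, which walks the
wrapped-error chain, off the common no-error path, and any error other than
the expected sentinel is treated as a bug and panics. The sentinel
errArenaFull and the helper handle are hypothetical stand-ins for
arenaskl.ErrArenaFull and the intervalSkl logic.

package main

import (
	"errors"
	"fmt"
)

// errArenaFull stands in for arenaskl.ErrArenaFull.
var errArenaFull = errors.New("arena full")

// handle reports whether the caller should retry on a fresh page.
func handle(err error) (retry bool) {
	if err != nil {
		// errors.Is unwraps the error chain, so it only runs on the
		// rare error path, never on the hot no-error path.
		if errors.Is(err, errArenaFull) {
			return true
		}
		// Anything else is unexpected; fail loudly instead of silently
		// dropping the error.
		panic(fmt.Sprintf("unexpected error: %v", err))
	}
	return false
}

func main() {
	fmt.Println(handle(nil))                                  // false
	fmt.Println(handle(fmt.Errorf("add: %w", errArenaFull)))  // true
}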
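PATCH 3/3 restructures the download path around three cooperating goroutines:
a producer that feeds spans into a channel, a fixed-size worker pool that
drains it, and a tuner that runs until the workers finish. The sketch below
(not CockroachDB code) reproduces that coordination shape with
golang.org/x/sync/errgroup standing in for the internal ctxgroup package; the
int payloads and Println calls are placeholders for the real download and
concurrency-tuning work.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	work, stopTuning := make(chan int), make(chan struct{})
	g, ctx := errgroup.WithContext(context.Background())

	// Tuner: runs on a ticker until the workers close stopTuning.
	g.Go(func() error {
		t := time.NewTicker(50 * time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-stopTuning:
				return nil
			case <-t.C:
				fmt.Println("tuner: would sample CPU and adjust concurrency")
			}
		}
	})

	// Workers: drain the channel in parallel; once all exit, stop the tuner.
	g.Go(func() error {
		defer close(stopTuning)
		var workers errgroup.Group
		for i := 0; i < 4; i++ {
			workers.Go(func() error {
				for item := range work {
					fmt.Println("worker: downloaded item", item)
				}
				return nil
			})
		}
		return workers.Wait()
	})

	// Producer: send every item, then close the channel to release the workers.
	g.Go(func() error {
		defer close(work)
		for i := 0; i < 8; i++ {
			select {
			case work <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}

Note the shutdown ordering: only the producer closes the work channel, and
stopTuning is closed only after every worker has returned. In the real patch
that ordering is what lets tuneDownloadConcurrency's deferred cleanup reset
any added compaction concurrency exactly once downloads finish.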