117017: server: cleanup span download function r=dt a=dt

* Move the tasks into their own free functions for readability and future testability.
* Use GoAndWait to avoid mistakes around what runs in or out of the group.
* Use a nested group instead of counting channel sends for the worker termination check (see the sketch below this message).
* Add commentary throughout.

Release note: none.
Epic: none.
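
The last two bullets describe one structural move, which is easiest to see in miniature. Below is a minimal sketch of that shape, using the ctxgroup signatures that appear in the diff further down (GoAndWait and GroupWorkers); the producer loop and item type here are placeholders, not code from this PR:

package main

import (
  "context"

  "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

// process drains work with a fixed pool of workers. GroupWorkers runs the
// workers in a nested group and waits for them, so worker termination is
// observed by the group itself rather than by counting done-signals sent
// over a channel.
func process(ctx context.Context, work chan int) error {
  return ctxgroup.GroupWorkers(ctx, 4, func(ctx context.Context, _ int) error {
    for item := range work {
      _ = item // placeholder for real per-item work
    }
    return nil
  })
}

func main() {
  work := make(chan int)
  // GoAndWait starts every task in the group and waits for all of them,
  // which removes the risk of some work accidentally running outside it.
  _ = ctxgroup.GoAndWait(context.Background(),
    func(ctx context.Context) error {
      defer close(work)
      for i := 0; i < 10; i++ {
        select {
        case work <- i:
        case <-ctx.Done():
          return ctx.Err()
        }
      }
      return nil
    },
    func(ctx context.Context) error { return process(ctx, work) },
  )
}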

117036: kv/tscache: improve error handling r=nvanbenschoten a=nvanbenschoten

This commit improves the error handling in `intervalSkl` in two ways:
1. it guards `errors.Is` calls with `err != nil` checks. `errors.Is` is too expensive to call without this check, and it shows up in CPU profiles.
2. it panics on unexpected errors instead of silently dropping them. This mirrors the rest of the logic in this file.

Epic: None
Release note: None
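
A minimal sketch of the resulting pattern (the sentinel and the surrounding function are stand-ins; the real code checks arenaskl.ErrArenaFull inside intervalSkl):

package sketch

import (
  "errors"
  "fmt"
)

var errArenaFull = errors.New("arena full") // stand-in for arenaskl.ErrArenaFull

// handle reports whether the arena is full, the one error the caller is
// prepared to recover from.
func handle(err error) bool {
  // Guard first: errors.Is walks the error chain, which is too slow to do
  // unconditionally on a hot path where err is almost always nil.
  if err != nil {
    if errors.Is(err, errArenaFull) {
      return true
    }
    // Anything else is a bug; fail loudly instead of dropping it.
    panic(fmt.Sprintf("unexpected error: %v", err))
  }
  return false
}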

117119: kv: don't consult ReadTimestamp in Transaction.LastActive r=nvanbenschoten a=nvanbenschoten

Informs #101938.

Without the synthetic timestamp bit, we don't know for sure whether the transaction's ReadTimestamp is a ClockTimestamp or not. To avoid comparing a future-time MVCC timestamp against a clock timestamp for purposes of detecting transaction liveness, we stop consulting the ReadTimestamp. This was always an unproven optimization anyway, so it's safe to remove.

Release note: None
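
A worked example of the hazard, with made-up wall times; hlc.Timestamp and Forward are the real types, and the "old" line ignores the synthetic-bit guard that the removed code still had:

package main

import (
  "fmt"

  "github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
  // Made-up wall times, in nanoseconds since epoch.
  minTS := hlc.Timestamp{WallTime: 9e9}  // MinTimestamp: txn began at 9s
  hb := hlc.Timestamp{WallTime: 10e9}    // LastHeartbeat: coordinator died at 10s
  // ReadTimestamp pushed into the future, e.g. under a global table's
  // closed timestamp policy.
  readTS := hlc.Timestamp{WallTime: 150e9}

  oldTS := hb
  oldTS.Forward(readTS) // old: max(LastHeartbeat, ReadTimestamp) = 150s, looks active

  newTS := minTS
  newTS.Forward(hb) // new: max(MinTimestamp, LastHeartbeat) = 10s, clearly expired

  fmt.Println(oldTS, newTS)
}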

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
3 people committed Jan 9, 2024
4 parents: e4bd2ec + 82baeca + dafd93f + dba19af; commit 8793a26
Showing 5 changed files with 132 additions and 109 deletions.
pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go (4 additions, 4 deletions)

@@ -18,6 +18,7 @@ import (
 
   "github.com/cockroachdb/cockroach/pkg/base"
   "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+  "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -268,7 +269,7 @@ func TestTxnHeartbeaterLoopStartsBeforeExpiry(t *testing.T) {
     },
     {
       // First locking request happens at expiration. Heartbeat immediately.
-      lockingRequestDelay: 5 * time.Second,
+      lockingRequestDelay: 5*time.Second + 1*time.Nanosecond,
       consideredExpired: true,
       loopStarts: StartImmediately,
     },
@@ -281,11 +282,10 @@ func TestTxnHeartbeaterLoopStartsBeforeExpiry(t *testing.T) {
   } {
     t.Run(fmt.Sprintf("delay=%s", test.lockingRequestDelay), func(t *testing.T) {
       ctx := context.Background()
-      txn := makeTxnProto()
-
       manualTime := timeutil.NewManualTime(timeutil.Unix(0, 123))
       clock := hlc.NewClockForTesting(manualTime)
-      txn.MinTimestamp, txn.WriteTimestamp = clock.Now(), clock.Now()
+      txn := roachpb.MakeTransaction("test", []byte("key"), isolation.Serializable, 0, clock.Now(),
+        0 /* maxOffsetNs */, 0 /* coordinatorNodeID */, 0, false /* omitInRangefeeds */)
 
       // We attempt to simulate a transaction that heartbeats every 1s, however
       // it is important to note that a transaction is considered expired when it
pkg/kv/kvserver/intentresolver/intent_resolver_test.go (4 additions, 4 deletions)

@@ -76,17 +76,17 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
   txn0 := newTransaction("txn0", key, 1, clock)
   // Txn1 is in the pending state but is expired.
   txn1 := newTransaction("txn1", key, 1, clock)
-  txn1.ReadTimestamp.WallTime -= int64(100 * time.Second)
-  txn1.LastHeartbeat = txn1.ReadTimestamp
+  txn1.MinTimestamp.WallTime -= int64(100 * time.Second)
+  txn1.LastHeartbeat = txn1.MinTimestamp
   // Txn2 is in the staging state and is not old enough to have expired so the
   // code ought to send nothing.
   txn2 := newTransaction("txn2", key, 1, clock)
   txn2.Status = roachpb.STAGING
   // Txn3 is in the staging state but is expired.
   txn3 := newTransaction("txn3", key, 1, clock)
   txn3.Status = roachpb.STAGING
-  txn3.ReadTimestamp.WallTime -= int64(100 * time.Second)
-  txn3.LastHeartbeat = txn3.ReadTimestamp
+  txn3.MinTimestamp.WallTime -= int64(100 * time.Second)
+  txn3.LastHeartbeat = txn3.MinTimestamp
   // Txn4 is in the committed state.
   txn4 := newTransaction("txn4", key, 1, clock)
   txn4.Status = roachpb.COMMITTED
pkg/kv/kvserver/tscache/interval_skl.go (22 additions, 10 deletions)

@@ -325,8 +325,12 @@ func (s *intervalSkl) addRange(
       err = fp.addNode(&it, to, val, 0, true /* mustInit */)
     }
 
-    if errors.Is(err, arenaskl.ErrArenaFull) {
-      return fp
+    if err != nil {
+      if errors.Is(err, arenaskl.ErrArenaFull) {
+        return fp
+      } else {
+        panic(fmt.Sprintf("unexpected error: %v", err))
+      }
     }
   }
 
@@ -343,8 +347,12 @@ func (s *intervalSkl) addRange(
     err = fp.addNode(&it, from, val, hasGap, false /* mustInit */)
   }
 
-  if errors.Is(err, arenaskl.ErrArenaFull) {
-    return fp
+  if err != nil {
+    if errors.Is(err, arenaskl.ErrArenaFull) {
+      return fp
+    } else {
+      panic(fmt.Sprintf("unexpected error: %v", err))
+    }
   }
 
   // Seek to the node immediately after the "from" node.
@@ -1072,12 +1080,16 @@ func (p *sklPage) scanTo(
 
     // Decode the current node's value set.
     keyVal, gapVal := decodeValueSet(it.Value(), it.Meta())
-    if errors.Is(ratchetErr, arenaskl.ErrArenaFull) {
-      // If we failed to ratchet an uninitialized node above, the desired
-      // ratcheting won't be reflected in the decoded values. Perform the
-      // ratcheting manually.
-      keyVal, _ = ratchetValue(keyVal, prevGapVal)
-      gapVal, _ = ratchetValue(gapVal, prevGapVal)
+    if ratchetErr != nil {
+      if errors.Is(ratchetErr, arenaskl.ErrArenaFull) {
+        // If we failed to ratchet an uninitialized node above, the desired
+        // ratcheting won't be reflected in the decoded values. Perform the
+        // ratcheting manually.
+        keyVal, _ = ratchetValue(keyVal, prevGapVal)
+        gapVal, _ = ratchetValue(gapVal, prevGapVal)
+      } else {
+        panic(fmt.Sprintf("unexpected error: %v", ratchetErr))
+      }
     }
 
     if !(first && (opt&excludeFrom) != 0) {
pkg/roachpb/data.go (3 additions, 6 deletions)

@@ -997,13 +997,10 @@ func MakeTransaction(
 }
 
 // LastActive returns the last timestamp at which client activity definitely
-// occurred, i.e. the maximum of ReadTimestamp and LastHeartbeat.
+// occurred, i.e. the maximum of MinTimestamp and LastHeartbeat.
 func (t Transaction) LastActive() hlc.Timestamp {
-  ts := t.LastHeartbeat
-  // TODO(nvanbenschoten): remove this when we remove synthetic timestamps.
-  if !t.ReadTimestamp.Synthetic {
-    ts.Forward(t.ReadTimestamp)
-  }
+  ts := t.MinTimestamp
+  ts.Forward(t.LastHeartbeat)
   return ts
 }
 
pkg/server/span_download.go (99 additions, 85 deletions)

@@ -19,6 +19,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/server/authserver"
   "github.com/cockroachdb/cockroach/pkg/server/serverpb"
   "github.com/cockroachdb/cockroach/pkg/server/srverrors"
+  "github.com/cockroachdb/cockroach/pkg/storage"
   "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/errors"
@@ -84,99 +85,112 @@ func (s *systemStatusServer) DownloadSpan(
 func (s *systemStatusServer) localDownloadSpan(
   ctx context.Context, req *serverpb.DownloadSpanRequest,
 ) error {
-
   return s.stores.VisitStores(func(store *kvserver.Store) error {
-    spanCh := make(chan roachpb.Span)
-
-    grp := ctxgroup.WithContext(ctx)
-    grp.GoCtx(func(ctx context.Context) error {
-      defer close(spanCh)
-      ctxDone := ctx.Done()
-
-      for _, sp := range req.Spans {
-        select {
-        case spanCh <- sp:
-        case <-ctxDone:
-          return ctx.Err()
-        }
-      }
-      return nil
-    })
-
-    const downloadWaiters = 16
-    downloadersDone := make(chan struct{}, downloadWaiters)
-
-    downloader := func(ctx context.Context) error {
-      for sp := range spanCh {
-        if err := store.TODOEngine().Download(ctx, sp); err != nil {
-          return err
-        }
-      }
-      downloadersDone <- struct{}{}
-      return nil
-    }
-
-    for i := 0; i < downloadWaiters; i++ {
-      grp.GoCtx(downloader)
-    }
-
-    grp.GoCtx(func(ctx context.Context) error {
-      var added int64
-      // Remove any additional concurrency we've added when we exit.
-      //
-      // TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
-      // to download compactions, so that this does not fight with manual calls
-      // to SetConcurrentCompactions.
-      defer func() {
-        if added != 0 {
-          adjusted := store.TODOEngine().AdjustCompactionConcurrency(-added)
-          log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
-        }
-      }()
-
-      const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8
-
-      // Begin by bumping up the concurrency by 8, then start watching the CPU
-      // usage and adjusting up or down based on CPU until downloading finishes.
-      store.TODOEngine().AdjustCompactionConcurrency(initialIncrease)
-      added += initialIncrease
-
-      t := time.NewTicker(time.Second * 15)
-      defer t.Stop()
-      ctxDone := ctx.Done()
-
-      var waitersExited int
-      for {
-        select {
-        case <-ctxDone:
-          return ctx.Err()
-        case <-downloadersDone:
-          waitersExited++
-          // Return and stop managing added concurrency if the workers are done.
-          if waitersExited >= downloadWaiters {
-            return nil
-          }
-        case <-t.C:
-          cpu := s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm()
-          if cpu > highCPU && added > 0 {
-            // If CPU is high and we have added any additional concurrency, we
-            // should reduce our added concurrency to make sure CPU is available
-            // for the execution of foreground traffic.
-            adjusted := store.TODOEngine().AdjustCompactionConcurrency(-1)
-            added--
-            log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
-          } else if cpu < lowCPU {
-            // If CPU is low, we should use it to do additional downloading.
-            if added < maxAddedConcurrency {
-              adjusted := store.TODOEngine().AdjustCompactionConcurrency(1)
-              added++
-              log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
-            }
-          }
-        }
-      }
-    })
-
-    return grp.Wait()
+    downloadSpansCh, stopTuningCh := make(chan roachpb.Span), make(chan struct{})
+    return ctxgroup.GoAndWait(ctx,
+      // Tune download concurrency until stopTuningCh closes.
+      func(ctx context.Context) error {
+        t := time.NewTicker(time.Second * 15)
+        defer t.Stop()
+        return tuneDownloadCurrency(ctx, t.C, stopTuningCh, store.StateEngine(),
+          s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm)
+      },
+      // Download until downloadSpansCh closes, then close stopTuningCh.
+      func(ctx context.Context) error {
+        defer close(stopTuningCh)
+        return downloadSpans(ctx, store.StateEngine(), downloadSpansCh)
+      },
+      // Send spans to downloadSpansCh.
+      func(ctx context.Context) error {
+        defer close(downloadSpansCh)
+        return sendDownloadSpans(ctx, req.Spans, downloadSpansCh)
+      },
+    )
   })
 }
+
+// sendDownloadSpans sends spans that cover all spans in spans to the passed ch.
+func sendDownloadSpans(ctx context.Context, spans roachpb.Spans, out chan roachpb.Span) error {
+  ctxDone := ctx.Done()
+  for _, sp := range spans {
+    select {
+    case out <- sp:
+    case <-ctxDone:
+      return ctx.Err()
+    }
+  }
+  return nil
+}
+
+// downloadSpans instructs the passed engine, in parallel, to downloads spans
+// received on the passed ch until it closes.
+func downloadSpans(ctx context.Context, eng storage.Engine, spans chan roachpb.Span) error {
+  const downloadWaiters = 16
+  return ctxgroup.GroupWorkers(ctx, downloadWaiters, func(ctx context.Context, _ int) error {
+    for sp := range spans {
+      if err := eng.Download(ctx, sp); err != nil {
+        return err
+      }
+    }
+    return nil
+  })
+}
+
+// tuneDownloadCurrency periodically adjusts download concurrency up or down on
+// the passed engine based on the cpu load value read from the passed func until
+// the passed done ch closes or is signaled.
+func tuneDownloadCurrency(
+  ctx context.Context,
+  tick <-chan time.Time,
+  done <-chan struct{},
+  eng storage.Engine,
+  readCPU func() float64,
+) error {
+  var added int64
+  // Remove any additional concurrency we've added when we exit.
+  //
+  // TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
+  // to download compactions, so that this does not fight with manual calls
+  // to SetConcurrentCompactions.
+  defer func() {
+    if added != 0 {
+      adjusted := eng.AdjustCompactionConcurrency(-added)
+      log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
+    }
+  }()
+
+  const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8
+
+  // Begin by bumping up the concurrency by 8, then start watching the CPU
+  // usage and adjusting up or down based on CPU until downloading finishes.
+  eng.AdjustCompactionConcurrency(initialIncrease)
+  added += initialIncrease
+
+  ctxDone := ctx.Done()
+
+  for {
+    select {
+    case <-ctxDone:
+      return ctx.Err()
+    case <-done:
+      return nil
+    case <-tick:
+      cpu := readCPU()
+      if cpu > highCPU && added > 0 {
+        // If CPU is high and we have added any additional concurrency, we
+        // should reduce our added concurrency to make sure CPU is available
+        // for the execution of foreground traffic.
+        adjusted := eng.AdjustCompactionConcurrency(-1)
+        added--
+        log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
+      } else if cpu < lowCPU {
+        // If CPU is low, we should use it to do additional downloading.
+        if added < maxAddedConcurrency {
+          adjusted := eng.AdjustCompactionConcurrency(1)
+          added++
+          log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
+        }
+      }
+    }
+  }
+}
