Skip to content

Commit

Permalink
Merge pull request #130711 from iskettaneh/backport23.2.12-rc-129827
Browse files Browse the repository at this point in the history
release-23.2.12-rc: kvserver: compact liveness range periodically
  • Loading branch information
iskettaneh authored Sep 17, 2024
2 parents bf1ecaa + 93a9e26 commit 42a7a91
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pkg/kv/kvserver/liveness/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,52 @@ func TestGetActiveNodes(t *testing.T) {
})
require.Equal(t, []roachpb.NodeID{1, 2, 3, 4}, getActiveNodes(nl1))
}

// TestLivenessRangeGetsPeriodicallyCompacted tests that the liveness range
// gets compacted when we set the liveness range compaction interval.
func TestLivenessRangeGetsPeriodicallyCompacted(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// Enable the liveness range compaction and set the interval to 1s to speed
// up the test.
c := tc.Server(0).SystemLayer().SQLConn(t)
_, err := c.ExecContext(ctx, "set cluster setting kv.liveness_range_compact.interval='1s'")
require.NoError(t, err)

// Get the original file number of the sstable for the liveness range. We
// expect to see this file number change as the liveness range gets compacted.
livenessFileNumberQuery := "WITH replicas(n) AS (SELECT unnest(replicas) FROM " +
"crdb_internal.ranges_no_leases WHERE range_id = 2), sstables AS (SELECT " +
"(crdb_internal.sstable_metrics(n, n, start_key, end_key)).* " +
"FROM crdb_internal.ranges_no_leases, replicas WHERE range_id = 2) " +
"SELECT file_num FROM sstables"

sqlDB := tc.ApplicationLayer(0).SQLConn(t)
var original_file_num string
testutils.SucceedsSoon(t, func() error {
rows := sqlDB.QueryRow(livenessFileNumberQuery)
if err := rows.Scan(&original_file_num); err != nil {
return err
}
return nil
})

// Expect that the liveness file number changes.
testutils.SucceedsSoon(t, func() error {
var current_file_num string
rows := sqlDB.QueryRow(livenessFileNumberQuery)
if err := rows.Scan(&current_file_num); err != nil {
return err
}
if current_file_num == original_file_num {
return errors.Errorf("Liveness compaction hasn't happened yet")
}
return nil
})
}
91 changes: 91 additions & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"net"
"sort"
"strings"
Expand Down Expand Up @@ -247,6 +248,13 @@ var (
`duration spent in processing above any available stack history is appended to its trace, if automatic trace snapshots are enabled`,
time.Second*30,
)

livenessRangeCompactInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.liveness_range_compact.interval",
`interval at which the liveness range is compacted. A value of 0 disables the periodic compaction`,
0,
)
)

type nodeMetrics struct {
Expand Down Expand Up @@ -751,6 +759,8 @@ func (n *Node) start(
log.Infof(ctx, "started with engine type %v", t)
}
log.Infof(ctx, "started with attributes %v", attrs.Attrs)

n.startPeriodicLivenessCompaction(n.stopper, livenessRangeCompactInterval)
return nil
}

Expand Down Expand Up @@ -963,6 +973,87 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
})
}

// startPeriodicLivenessCompaction starts a loop where it periodically compacts
// the liveness range.
func (n *Node) startPeriodicLivenessCompaction(
stopper *stop.Stopper, livenessRangeCompactInterval *settings.DurationSetting,
) {
ctx := n.AnnotateCtx(context.Background())

// getCompactionInterval() returns the interval at which the liveness range is
// set to be compacted. If the interval is set to 0, the period is set to the
// max possible duration because a value of 0 cause the ticker to panic.
getCompactionInterval := func() time.Duration {
interval := livenessRangeCompactInterval.Get(&n.storeCfg.Settings.SV)
if interval == 0 {
interval = math.MaxInt64
}
return interval
}

if err := stopper.RunAsyncTask(ctx, "liveness-compaction", func(ctx context.Context) {
interval := getCompactionInterval()
ticker := time.NewTicker(interval)

intervalChangeChan := make(chan time.Duration)

// Update the compaction interval when the setting changes.
livenessRangeCompactInterval.SetOnChange(&n.storeCfg.Settings.SV, func(ctx context.Context) {
// intervalChangeChan is used to signal the compaction loop that the
// interval has changed. Avoid blocking the main goroutine that is
// responsible for handling all settings updates.
select {
case intervalChangeChan <- getCompactionInterval():
default:
}
})

defer ticker.Stop()
for {
select {
case <-ticker.C:
// Find the liveness replica in order to compact it.
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
store.VisitReplicas(func(repl *kvserver.Replica) bool {
span := repl.Desc().KeySpan().AsRawSpanWithNoLocals()
if keys.NodeLivenessSpan.Overlaps(span) {

// The CompactRange() method expects the start and end keys to be
// encoded.
startEngineKey :=
storage.EngineKey{
Key: span.Key,
}.Encode()

endEngineKey :=
storage.EngineKey{
Key: span.EndKey,
}.Encode()

timeBeforeCompaction := timeutil.Now()
if err := store.StateEngine().CompactRange(startEngineKey, endEngineKey); err != nil {
log.Errorf(ctx, "failed compacting liveness replica: %+v with error: %s", repl, err)
}

log.Infof(ctx, "finished compacting liveness replica: %+v and it took: %+v",
repl, timeutil.Since(timeBeforeCompaction))
}
return true
})
return nil
})
case newInterval := <-intervalChangeChan:
ticker.Reset(newInterval)
case <-stopper.ShouldQuiesce():
return
}
}
}); err != nil {
log.Errorf(ctx, "failed to start the async liveness compaction task")
}

}

// computeMetricsPeriodically instructs each store to compute the value of
// complicated metrics.
func (n *Node) computeMetricsPeriodically(
Expand Down

0 comments on commit 42a7a91

Please sign in to comment.