release-23.2.12-rc: kvserver: compact liveness range periodically #130711

Merged
49 changes: 49 additions & 0 deletions pkg/kv/kvserver/liveness/client_test.go
@@ -370,3 +370,52 @@ func TestGetActiveNodes(t *testing.T) {
})
require.Equal(t, []roachpb.NodeID{1, 2, 3, 4}, getActiveNodes(nl1))
}

// TestLivenessRangeGetsPeriodicallyCompacted verifies that the liveness range
// gets compacted once the liveness range compaction interval setting is set.
func TestLivenessRangeGetsPeriodicallyCompacted(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// Enable liveness range compaction and set the interval to 1s to speed up
// the test.
c := tc.Server(0).SystemLayer().SQLConn(t)
_, err := c.ExecContext(ctx, "set cluster setting kv.liveness_range_compact.interval='1s'")
require.NoError(t, err)

// Get the original file number of the sstable for the liveness range. We
// expect to see this file number change as the liveness range gets compacted.
livenessFileNumberQuery := "WITH replicas(n) AS (SELECT unnest(replicas) FROM " +
"crdb_internal.ranges_no_leases WHERE range_id = 2), sstables AS (SELECT " +
"(crdb_internal.sstable_metrics(n, n, start_key, end_key)).* " +
"FROM crdb_internal.ranges_no_leases, replicas WHERE range_id = 2) " +
"SELECT file_num FROM sstables"

sqlDB := tc.ApplicationLayer(0).SQLConn(t)
var originalFileNum string
testutils.SucceedsSoon(t, func() error {
row := sqlDB.QueryRow(livenessFileNumberQuery)
if err := row.Scan(&originalFileNum); err != nil {
return err
}
return nil
})

// Expect that the liveness file number changes.
testutils.SucceedsSoon(t, func() error {
var currentFileNum string
row := sqlDB.QueryRow(livenessFileNumberQuery)
if err := row.Scan(&currentFileNum); err != nil {
return err
}
if currentFileNum == originalFileNum {
return errors.Errorf("liveness compaction hasn't happened yet")
}
return nil
})
}
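
For readability, the sstable file-number query that the test assembles as a Go string is shown below as standalone SQL. It is the same statement, only reformatted: range_id = 2 is the liveness range on this single-node test cluster, and the reported file number changes whenever the range's sstable is rewritten by a compaction.

WITH replicas(n) AS (
    SELECT unnest(replicas)
    FROM crdb_internal.ranges_no_leases
    WHERE range_id = 2
), sstables AS (
    SELECT (crdb_internal.sstable_metrics(n, n, start_key, end_key)).*
    FROM crdb_internal.ranges_no_leases, replicas
    WHERE range_id = 2
)
SELECT file_num FROM sstables;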
91 changes: 91 additions & 0 deletions pkg/server/node.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"net"
"sort"
"strings"
@@ -247,6 +248,13 @@ var (
`duration spent in processing above any available stack history is appended to its trace, if automatic trace snapshots are enabled`,
time.Second*30,
)

livenessRangeCompactInterval = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.liveness_range_compact.interval",
`interval at which the liveness range is compacted. A value of 0 disables the periodic compaction`,
0,
)
)

type nodeMetrics struct {
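
As a usage sketch (an assumption about operator workflow; only the test above actually exercises this, with '1s'), the setting registered above is driven through SQL. The '10m' interval here is an arbitrary illustrative value:

-- Enable periodic compaction of the liveness range every 10 minutes.
-- The setting defaults to 0, which leaves periodic compaction disabled.
SET CLUSTER SETTING kv.liveness_range_compact.interval = '10m';

-- Reverting to 0 disables it again (internally mapped to an effectively
-- infinite ticker interval; see getCompactionInterval below).
SET CLUSTER SETTING kv.liveness_range_compact.interval = '0s';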
@@ -751,6 +759,8 @@ func (n *Node) start(
log.Infof(ctx, "started with engine type %v", t)
}
log.Infof(ctx, "started with attributes %v", attrs.Attrs)

n.startPeriodicLivenessCompaction(n.stopper, livenessRangeCompactInterval)
return nil
}

@@ -963,6 +973,87 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
})
}

// startPeriodicLivenessCompaction starts a background task that periodically
// compacts the liveness range.
func (n *Node) startPeriodicLivenessCompaction(
stopper *stop.Stopper, livenessRangeCompactInterval *settings.DurationSetting,
) {
ctx := n.AnnotateCtx(context.Background())

// getCompactionInterval returns the interval at which the liveness range is
// to be compacted. If the setting is 0 (disabled), the maximum possible
// duration is returned instead, since a ticker interval of 0 causes
// time.NewTicker to panic.
getCompactionInterval := func() time.Duration {
interval := livenessRangeCompactInterval.Get(&n.storeCfg.Settings.SV)
if interval == 0 {
interval = math.MaxInt64
}
return interval
}

if err := stopper.RunAsyncTask(ctx, "liveness-compaction", func(ctx context.Context) {
interval := getCompactionInterval()
ticker := time.NewTicker(interval)

intervalChangeChan := make(chan time.Duration)

// Update the compaction interval when the setting changes.
livenessRangeCompactInterval.SetOnChange(&n.storeCfg.Settings.SV, func(ctx context.Context) {
// intervalChangeChan is used to signal the compaction loop that the
// interval has changed. Avoid blocking the main goroutine that is
// responsible for handling all settings updates.
select {
case intervalChangeChan <- getCompactionInterval():
default:
}
})

defer ticker.Stop()
for {
select {
case <-ticker.C:
// Find the liveness replica in order to compact it.
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
store.VisitReplicas(func(repl *kvserver.Replica) bool {
span := repl.Desc().KeySpan().AsRawSpanWithNoLocals()
if keys.NodeLivenessSpan.Overlaps(span) {

// The CompactRange() method expects the start and end keys to be
// encoded.
startEngineKey := storage.EngineKey{Key: span.Key}.Encode()
endEngineKey := storage.EngineKey{Key: span.EndKey}.Encode()

timeBeforeCompaction := timeutil.Now()
if err := store.StateEngine().CompactRange(startEngineKey, endEngineKey); err != nil {
log.Errorf(ctx, "failed compacting liveness replica: %+v with error: %s", repl, err)
} else {
log.Infof(ctx, "finished compacting liveness replica %+v in %s",
repl, timeutil.Since(timeBeforeCompaction))
}
}
return true
})
return nil
})
case newInterval := <-intervalChangeChan:
ticker.Reset(newInterval)
case <-stopper.ShouldQuiesce():
return
}
}
}); err != nil {
log.Errorf(ctx, "failed to start the async liveness compaction task: %v", err)
}
}

// computeMetricsPeriodically instructs each store to compute the value of
// complicated metrics.
func (n *Node) computeMetricsPeriodically(