Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
84871: kvserver: add an interface for replica changes for the store rebalancer r=lidorcarmel a=lidorcarmel

In order to simulate the store rebalancer, we need an interface that
can be used for lease transfers and replica moves. This PR wraps existing
code with a new interface, which will also be implemented in the asim package.

Release note: None

84873: colexec: fix span use after finish in edge cases in parallel sync r=yuzefovich a=yuzefovich

Previously, in some cases the parallel unordered synchronizer would
finish the tracing spans prematurely which would result in "use of Span
after Finish" panics in test builds and this is now fixed.

Release note: None

84887: clusterversion,storage: add explicit version for split user keys r=jbowens a=nicktrav

Pebble ceased splitting user keys across multiple sstables in a single
level in cockroachdb/pebble@a860bbad. A Pebble format major version was
added in 22.1 to mark existing tables with split user that formed an
"atomic compaction unit" for compaction. The compaction to recombine the
split keys happens at a low priority within Pebble, so there is no
guarantee that these tables have been recombined.

Pebble range key support depends on having split user keys recombined.
As such, the Pebble migration to upgrade to a version that support range
keys must first perform a blocking migration to compact tables with
split user keys.

Currently, the Cockroach cluster version
`EnsurePebbleFormatVersionRangeKeys` ratchets the Pebble version through
both the split user keys migration, up to the version that supports
range keys. Strictly speaking, the Pebble format version migration will
take care of the sequencing.

However, to provide better visibility into the various Pebble
migrations, and to cater for the fact that the split user keys migration
may take some time to recombine the necessary tables, a dedicated
cluster version for the split keys migration is added. This dedicated
cluster version makes it unambiguous what's blocking the Cockroach
version finalization.

Release note (backward-incompatible change, ops change): A cluster
version is added to allow Pebble to recombine certain SSTables
(specifically, user keys that are split across multiple files in a level
of the LSM). Recombining the split user keys is required for supporting
the range keys feature.  The migration to recombine the SSTables is
expected to be short (split user keys are rare in practice), but will
block subsequent migrations until all tables have been recombined. The
`storage.marked-for-compaction-files` time series metric can show the
progress of the migration.

Close #84012.

Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nick Travers <[email protected]>
  • Loading branch information
4 people committed Jul 22, 2022
4 parents 4ce560f + 9513ed0 + e3a9917 + 016019b commit 02727e3
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 43 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-28 set the active cluster version in the format '<major>.<minor>'
version version 22.1-30 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-28</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-30</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
32 changes: 20 additions & 12 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ const (

// LocalTimestamps enables the use of local timestamps in MVCC values.
LocalTimestamps
// PebbleFormatSplitUserKeysMarkedCompacted updates the Pebble format
// version that recombines all user keys that may be split across multiple
// files into a single table.
PebbleFormatSplitUserKeysMarkedCompacted
// EnsurePebbleFormatVersionRangeKeys is the first step of a two-part
// migration that bumps Pebble's format major version to a version that
// supports range keys.
Expand Down Expand Up @@ -517,53 +521,57 @@ var versionsSingleton = keyedVersions{
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 4},
},
{
Key: EnsurePebbleFormatVersionRangeKeys,
Key: PebbleFormatSplitUserKeysMarkedCompacted,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 6},
},
{
Key: EnablePebbleFormatVersionRangeKeys,
Key: EnsurePebbleFormatVersionRangeKeys,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 8},
},
{
Key: TrigramInvertedIndexes,
Key: EnablePebbleFormatVersionRangeKeys,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 10},
},
{
Key: RemoveGrantPrivilege,
Key: TrigramInvertedIndexes,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 12},
},
{
Key: MVCCRangeTombstones,
Key: RemoveGrantPrivilege,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 14},
},
{
Key: UpgradeSequenceToBeReferencedByID,
Key: MVCCRangeTombstones,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 16},
},
{
Key: SampledStmtDiagReqs,
Key: UpgradeSequenceToBeReferencedByID,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 18},
},
{
Key: AddSSTableTombstones,
Key: SampledStmtDiagReqs,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 20},
},
{
Key: SystemPrivilegesTable,
Key: AddSSTableTombstones,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 22},
},
{
Key: EnablePredicateProjectionChangefeed,
Key: SystemPrivilegesTable,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 24},
},
{
Key: AlterSystemSQLInstancesAddLocality,
Key: EnablePredicateProjectionChangefeed,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 26},
},
{
Key: SystemExternalConnectionsTable,
Key: AlterSystemSQLInstancesAddLocality,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 28},
},
{
Key: SystemExternalConnectionsTable,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 30},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
29 changes: 15 additions & 14 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,8 @@ func (r *Replica) requestLeaseLocked(
// blocks until the transfer is done. If a transfer is already in progress, this
// method joins in waiting for it to complete if it's transferring to the same
// replica. Otherwise, a NotLeaseHolderError is returned.
//
// AdminTransferLease implements the ReplicaLeaseMover interface.
func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID) error {
// initTransferHelper inits a transfer if no extension is in progress.
// It returns a channel for waiting for the result of a pending
Expand Down
61 changes: 54 additions & 7 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,26 +1567,73 @@ func (rq *replicateQueue) shedLease(
if qpsMeasurementDur < replicastats.MinStatsDuration {
avgQPS = 0
}
if err := rq.transferLease(ctx, repl, target, avgQPS); err != nil {
if err := rq.TransferLease(ctx, repl, repl.store.StoreID(), target.StoreID, avgQPS); err != nil {
return allocator.TransferErr, err
}
return allocator.TransferOK, nil
}

func (rq *replicateQueue) transferLease(
ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor, rangeQPS float64,
// ReplicaLeaseMover handles lease transfers for a single range.
type ReplicaLeaseMover interface {
// AdminTransferLease moves the lease to the requested store.
AdminTransferLease(ctx context.Context, target roachpb.StoreID) error

// String returns info about the replica.
String() string
}

// RangeRebalancer handles replica moves and lease transfers.
type RangeRebalancer interface {
// TransferLease uses a LeaseMover interface to move a lease between stores.
// The QPS is used to update stats for the stores.
TransferLease(
ctx context.Context,
rlm ReplicaLeaseMover,
source, target roachpb.StoreID,
rangeQPS float64,
) error

// RelocateRange relocates replicas to the requested stores, and can transfer
// the lease for the range to the first target voter.
RelocateRange(
ctx context.Context,
key interface{},
voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
transferLeaseToFirstVoter bool,
) error
}

// TransferLease implements the RangeRebalancer interface.
func (rq *replicateQueue) TransferLease(
ctx context.Context, rlm ReplicaLeaseMover, source, target roachpb.StoreID, rangeQPS float64,
) error {
rq.metrics.TransferLeaseCount.Inc(1)
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
log.VEventf(ctx, 1, "transferring lease to s%d", target)
if err := rlm.AdminTransferLease(ctx, target); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", rlm, target)
}
rq.lastLeaseTransfer.Store(timeutil.Now())
rq.store.cfg.StorePool.UpdateLocalStoresAfterLeaseTransfer(
repl.store.StoreID(), target.StoreID, rangeQPS)
source, target, rangeQPS)
return nil
}

// RelocateRange implements the RangeRebalancer interface.
func (rq *replicateQueue) RelocateRange(
ctx context.Context,
key interface{},
voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
transferLeaseToFirstVoter bool,
) error {
return rq.store.DB().AdminRelocateRange(
ctx,
key,
voterTargets,
nonVoterTargets,
transferLeaseToFirstVoter,
)
}

func (rq *replicateQueue) changeReplicas(
ctx context.Context,
repl *Replica,
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/store_rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type StoreRebalancer struct {
metrics StoreRebalancerMetrics
st *cluster.Settings
rq *replicateQueue
rr RangeRebalancer
replRankings *replicaRankings
getRaftStatusFn func(replica *Replica) *raft.Status
}
Expand All @@ -123,6 +124,7 @@ func NewStoreRebalancer(
metrics: makeStoreRebalancerMetrics(),
st: st,
rq: rq,
rr: rq,
replRankings: replRankings,
getRaftStatusFn: func(replica *Replica) *raft.Status {
return replica.RaftStatus()
Expand Down Expand Up @@ -251,7 +253,13 @@ func (sr *StoreRebalancer) rebalanceStore(

timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error {
return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps)
return sr.rr.TransferLease(
ctx,
replWithStats.repl,
replWithStats.repl.StoreID(),
target.StoreID,
replWithStats.qps,
)
}); err != nil {
log.Errorf(ctx, "unable to transfer lease to s%d: %+v", target.StoreID, err)
continue
Expand Down Expand Up @@ -320,7 +328,7 @@ func (sr *StoreRebalancer) rebalanceStore(

timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "relocate range", timeout, func(ctx context.Context) error {
return sr.rq.store.DB().AdminRelocateRange(
return sr.rr.RelocateRange(
ctx,
descBeforeRebalance.StartKey.AsRawKey(),
voterTargets,
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,17 +486,19 @@ func (s *ParallelUnorderedSynchronizer) Close(ctx context.Context) error {
// Note that at this point we know that the input goroutines won't be
// spawned up (our consumer won't call Next/DrainMeta after calling Close),
// so it is safe to close all closers from this goroutine.
for i, span := range s.tracingSpans {
if span != nil {
span.Finish()
s.tracingSpans[i] = nil
}
}
var lastErr error
for _, input := range s.inputs {
if err := input.ToClose.Close(ctx); err != nil {
lastErr = err
}
}
// Finish the spans after closing the Closers since Close() implementations
// might log some stuff.
for i, span := range s.tracingSpans {
if span != nil {
span.Finish()
s.tracingSpans[i] = nil
}
}
return lastErr
}
4 changes: 4 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,10 @@ func (p *Pebble) SetMinVersion(version roachpb.Version) error {
if formatVers < pebble.FormatRangeKeys {
formatVers = pebble.FormatRangeKeys
}
case !version.Less(clusterversion.ByKey(clusterversion.PebbleFormatSplitUserKeysMarkedCompacted)):
if formatVers < pebble.FormatMarkedCompacted {
formatVers = pebble.FormatMarkedCompacted
}
case !version.Less(clusterversion.ByKey(clusterversion.PebbleFormatSplitUserKeysMarked)):
if formatVers < pebble.FormatSplitUserKeysMarked {
formatVers = pebble.FormatSplitUserKeysMarked
Expand Down

0 comments on commit 02727e3

Please sign in to comment.