Skip to content

Commit

Permalink
kvserver: add an interface for replica changes for the store rebalancer
Browse files Browse the repository at this point in the history
In order to simulate the store rebalancer, we need an interface that
can be used for lease transfers and replica moves. This PR wraps existing
code with a new interface, which will also be implemented in the asim package.

Release note: None
  • Loading branch information
lidorcarmel committed Jul 21, 2022
1 parent c73adb1 commit 9513ed0
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,8 @@ func (r *Replica) requestLeaseLocked(
// blocks until the transfer is done. If a transfer is already in progress, this
// method joins in waiting for it to complete if it's transferring to the same
// replica. Otherwise, a NotLeaseHolderError is returned.
//
// AdminTransferLease implements the ReplicaLeaseMover interface.
func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID) error {
// initTransferHelper inits a transfer if no extension is in progress.
// It returns a channel for waiting for the result of a pending
Expand Down
61 changes: 54 additions & 7 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,26 +1567,73 @@ func (rq *replicateQueue) shedLease(
if qpsMeasurementDur < replicastats.MinStatsDuration {
avgQPS = 0
}
if err := rq.transferLease(ctx, repl, target, avgQPS); err != nil {
if err := rq.TransferLease(ctx, repl, repl.store.StoreID(), target.StoreID, avgQPS); err != nil {
return allocator.TransferErr, err
}
return allocator.TransferOK, nil
}

func (rq *replicateQueue) transferLease(
ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor, rangeQPS float64,
// ReplicaLeaseMover handles lease transfers for a single range.
type ReplicaLeaseMover interface {
// AdminTransferLease moves the lease to the requested store.
AdminTransferLease(ctx context.Context, target roachpb.StoreID) error

// String returns info about the replica.
String() string
}

// RangeRebalancer handles replica moves and lease transfers.
type RangeRebalancer interface {
// TransferLease uses a LeaseMover interface to move a lease between stores.
// The QPS is used to update stats for the stores.
TransferLease(
ctx context.Context,
rlm ReplicaLeaseMover,
source, target roachpb.StoreID,
rangeQPS float64,
) error

// RelocateRange relocates replicas to the requested stores, and can transfer
// the lease for the range to the first target voter.
RelocateRange(
ctx context.Context,
key interface{},
voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
transferLeaseToFirstVoter bool,
) error
}

// TransferLease implements the RangeRebalancer interface.
func (rq *replicateQueue) TransferLease(
ctx context.Context, rlm ReplicaLeaseMover, source, target roachpb.StoreID, rangeQPS float64,
) error {
rq.metrics.TransferLeaseCount.Inc(1)
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
log.VEventf(ctx, 1, "transferring lease to s%d", target)
if err := rlm.AdminTransferLease(ctx, target); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", rlm, target)
}
rq.lastLeaseTransfer.Store(timeutil.Now())
rq.store.cfg.StorePool.UpdateLocalStoresAfterLeaseTransfer(
repl.store.StoreID(), target.StoreID, rangeQPS)
source, target, rangeQPS)
return nil
}

// RelocateRange implements the RangeRebalancer interface.
func (rq *replicateQueue) RelocateRange(
ctx context.Context,
key interface{},
voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
transferLeaseToFirstVoter bool,
) error {
return rq.store.DB().AdminRelocateRange(
ctx,
key,
voterTargets,
nonVoterTargets,
transferLeaseToFirstVoter,
)
}

func (rq *replicateQueue) changeReplicas(
ctx context.Context,
repl *Replica,
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/store_rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type StoreRebalancer struct {
metrics StoreRebalancerMetrics
st *cluster.Settings
rq *replicateQueue
rr RangeRebalancer
replRankings *replicaRankings
getRaftStatusFn func(replica *Replica) *raft.Status
}
Expand All @@ -123,6 +124,7 @@ func NewStoreRebalancer(
metrics: makeStoreRebalancerMetrics(),
st: st,
rq: rq,
rr: rq,
replRankings: replRankings,
getRaftStatusFn: func(replica *Replica) *raft.Status {
return replica.RaftStatus()
Expand Down Expand Up @@ -251,7 +253,13 @@ func (sr *StoreRebalancer) rebalanceStore(

timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error {
return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps)
return sr.rr.TransferLease(
ctx,
replWithStats.repl,
replWithStats.repl.StoreID(),
target.StoreID,
replWithStats.qps,
)
}); err != nil {
log.Errorf(ctx, "unable to transfer lease to s%d: %+v", target.StoreID, err)
continue
Expand Down Expand Up @@ -320,7 +328,7 @@ func (sr *StoreRebalancer) rebalanceStore(

timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "relocate range", timeout, func(ctx context.Context) error {
return sr.rq.store.DB().AdminRelocateRange(
return sr.rr.RelocateRange(
ctx,
descBeforeRebalance.StartKey.AsRawKey(),
voterTargets,
Expand Down

0 comments on commit 9513ed0

Please sign in to comment.