Skip to content

Commit

Permalink
kvserver: use messages on NotLeaseholderErrors everywhere
Browse files Browse the repository at this point in the history
NLHE permits custom messages in it, but the field was rarely used. This
patch makes every instance where we instantiate the error provide a
message, since this error comes from a wide variety of conditions.

Release note: None
  • Loading branch information
andreimatei committed Nov 5, 2020
1 parent f9779fa commit af6a81f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"request's timestamp below closed timestamp",
)
err.CustomMsg = "reproposal failed due to closed timestamp"
return roachpb.NewError(err)
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ func checkForcedErr(
// We return a NotLeaseHolderError so that the DistSender retries.
// NB: we set proposerStoreID to 0 because we don't know who proposed the
// Raft command. This is ok, as this is only used for debug information.
nlhe := newNotLeaseHolderError(replicaState.Lease, 0 /* proposerStoreID */, replicaState.Desc)
nlhe.CustomMsg = fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease)
nlhe := newNotLeaseHolderError(
replicaState.Lease, 0 /* proposerStoreID */, replicaState.Desc,
fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease))
return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe)
}

Expand Down
57 changes: 35 additions & 22 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
// back to indicate that we have no idea who the range lease holder might
// be; we've withdrawn from active duty.
llHandle.resolve(roachpb.NewError(
newNotLeaseHolderError(nil, p.repl.store.StoreID(), p.repl.mu.state.Desc)))
newNotLeaseHolderError(nil, p.repl.store.StoreID(), p.repl.mu.state.Desc,
"lease acquisition task couldn't be started; node is shutting down")))
return llHandle
}
// InitOrJoinRequest requires that repl.mu is exclusively locked. requestLeaseAsync
Expand Down Expand Up @@ -336,7 +337,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
if status.Lease.OwnedBy(nextLeaseHolder.StoreID) &&
p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
} else if status.Liveness.Epoch == status.Lease.Epoch {
// If not owner, increment epoch if necessary to invalidate lease.
Expand All @@ -346,9 +347,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil {
err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = %v)",
status.Liveness.NodeID, nextLeaseHolder.NodeID, liveErr)
if log.V(1) {
log.Infof(ctx, "%v", err)
}
log.VEventf(ctx, 1, "%v", err)
} else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil {
// If we get ErrEpochAlreadyIncremented, someone else beat
// us to it. This proves that the target node is truly
Expand Down Expand Up @@ -380,14 +379,16 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
//
// https://github.com/cockroachdb/cockroach/issues/35986
if !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "failed to increment leaseholder's epoch: %s", err)
}
}
}
// Set error for propagation to all waiters below.
if err != nil {
// TODO(bdarnell): is status.Lease really what we want to put in the NotLeaseHolderError here?
pErr = roachpb.NewError(newNotLeaseHolderError(&status.Lease, p.repl.store.StoreID(), p.repl.Desc()))
pErr = roachpb.NewError(newNotLeaseHolderError(
&status.Lease, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err)))
}
}

Expand Down Expand Up @@ -623,7 +624,8 @@ func (r *Replica) requestLeaseLocked(
}
if transferLease, ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); ok {
return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc)))
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc,
"lease transfer in progress")))
}
// If we're draining, we'd rather not take any new leases (since we're also
// trying to move leases away elsewhere). But if we're the leader, we don't
Expand All @@ -634,7 +636,8 @@ func (r *Replica) requestLeaseLocked(
// this code can go away.
log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc)))
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc,
"refusing to take the lease; node is draining")))
}
return r.mu.pendingLeaseRequest.InitOrJoinRequest(
ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(), false /* transfer */)
Expand Down Expand Up @@ -674,7 +677,8 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
}
desc := r.mu.state.Desc
if !status.Lease.OwnedBy(r.store.StoreID()) {
return nil, nil, newNotLeaseHolderError(&status.Lease, r.store.StoreID(), desc)
return nil, nil, newNotLeaseHolderError(&status.Lease, r.store.StoreID(), desc,
"can't transfer the lease because this store doesn't own it")
}
// Verify the target is a replica of the range.
var ok bool
Expand Down Expand Up @@ -707,7 +711,8 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
}
// Another transfer is in progress, and it's not transferring to the
// same replica we'd like.
return nil, nil, newNotLeaseHolderError(&nextLease, r.store.StoreID(), desc)
return nil, nil, newNotLeaseHolderError(&nextLease, r.store.StoreID(), desc,
"another transfer to a different store is in progress")
}
// Stop using the current lease.
r.mu.minLeaseProposedTS = status.Timestamp
Expand Down Expand Up @@ -825,10 +830,11 @@ func (r *Replica) isLeaseValidRLocked(
// Note that this error can be generated on the Raft processing goroutine, so
// its output should be completely determined by its parameters.
func newNotLeaseHolderError(
l *roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor,
l *roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string,
) *roachpb.NotLeaseHolderError {
err := &roachpb.NotLeaseHolderError{
RangeID: rangeDesc.RangeID,
RangeID: rangeDesc.RangeID,
CustomMsg: msg,
}
if proposerStoreID != 0 {
err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID)
Expand Down Expand Up @@ -912,7 +918,8 @@ func (r *Replica) redirectOnOrAcquireLease(
// Lease state couldn't be determined.
log.VEventf(ctx, 2, "lease state couldn't be determined")
return nil, roachpb.NewError(
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc,
"lease state couldn't be determined"))

case kvserverpb.LeaseState_VALID, kvserverpb.LeaseState_STASIS:
if !status.Lease.OwnedBy(r.store.StoreID()) {
Expand Down Expand Up @@ -957,7 +964,8 @@ func (r *Replica) redirectOnOrAcquireLease(
// Otherwise, if the lease is currently held by another replica, redirect
// to the holder.
return nil, roachpb.NewError(
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc,
"lease held by different store"))
}
// Check that we're not in the process of transferring the lease away.
// If we are transferring the lease away, we can't serve reads or
Expand All @@ -972,7 +980,8 @@ func (r *Replica) redirectOnOrAcquireLease(
if transferLease, ok := r.mu.pendingLeaseRequest.TransferInProgress(
repDesc.ReplicaID); ok {
return nil, roachpb.NewError(
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc,
"lease transfer in progress"))
}

// If the lease is in stasis, we can't serve requests until we've
Expand Down Expand Up @@ -1015,7 +1024,7 @@ func (r *Replica) redirectOnOrAcquireLease(
}
// If lease is currently held by another, redirect to holder.
return nil, roachpb.NewError(
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc, "lease proscribed"))
}

// Return a nil handle to signal that we have a valid lease.
Expand Down Expand Up @@ -1064,15 +1073,17 @@ func (r *Replica) redirectOnOrAcquireLease(
if _, descErr := r.GetReplicaDescriptor(); descErr != nil {
err = descErr
} else if lease, _ := r.GetLease(); !r.IsLeaseValid(ctx, lease, r.store.Clock().Now()) {
err = newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc())
err = newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc(),
"lease acquisition attempt lost to another lease, which has expired in the meantime")
} else {
err = newNotLeaseHolderError(&lease, r.store.StoreID(), r.Desc())
err = newNotLeaseHolderError(&lease, r.store.StoreID(), r.Desc(),
"lease acquisition attempt lost to another lease")
}
pErr = roachpb.NewError(err)
}
return pErr
}
log.Eventf(ctx, "lease acquisition succeeded: %+v", status.Lease)
log.VEventf(ctx, 2, "lease acquisition succeeded: %+v", status.Lease)
return nil
case <-slowTimer.C:
slowTimer.Read = true
Expand All @@ -1086,10 +1097,12 @@ func (r *Replica) redirectOnOrAcquireLease(
case <-ctx.Done():
llHandle.Cancel()
log.VErrEventf(ctx, 2, "lease acquisition failed: %s", ctx.Err())
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc()))
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc(),
"lease acquisition canceled"))
case <-r.store.Stopper().ShouldStop():
llHandle.Cancel()
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc()))
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc(),
"lease acquisition cancled because node is stopping"))
}
}
}()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func (r *Replica) executeWriteBatch(
// manager.
if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence {
curLeaseCpy := curLease // avoid letting curLease escape
err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc())
err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc(),
"stale lease discovered before proposing")
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
return nil, g, roachpb.NewError(err)
}
Expand Down

0 comments on commit af6a81f

Please sign in to comment.