Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: use messages on NotLeaseholderErrors everywhere #56334

Merged
merged 1 commit into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
err.CustomMsg = "reproposal failed due to closed timestamp"
return roachpb.NewError(err)
}
// Some tests check for this log message in the trace.
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ func checkForcedErr(
// We return a NotLeaseHolderError so that the DistSender retries.
// NB: we set proposerStoreID to 0 because we don't know who proposed the
// Raft command. This is ok, as this is only used for debug information.
nlhe := newNotLeaseHolderError(replicaState.Lease, 0 /* proposerStoreID */, replicaState.Desc)
nlhe.CustomMsg = fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease)
nlhe := newNotLeaseHolderError(
replicaState.Lease, 0 /* proposerStoreID */, replicaState.Desc,
fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease))
return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe)
}

Expand Down
57 changes: 35 additions & 22 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
// back to indicate that we have no idea who the range lease holder might
// be; we've withdrawn from active duty.
llHandle.resolve(roachpb.NewError(
newNotLeaseHolderError(nil, p.repl.store.StoreID(), p.repl.mu.state.Desc)))
newNotLeaseHolderError(nil, p.repl.store.StoreID(), p.repl.mu.state.Desc,
"lease acquisition task couldn't be started; node is shutting down")))
return llHandle
}
// InitOrJoinRequest requires that repl.mu is exclusively locked. requestLeaseAsync
Expand Down Expand Up @@ -336,7 +337,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
if status.Lease.OwnedBy(nextLeaseHolder.StoreID) &&
p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
} else if status.Liveness.Epoch == status.Lease.Epoch {
// If not owner, increment epoch if necessary to invalidate lease.
Expand All @@ -346,9 +347,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
if live, liveErr := p.repl.store.cfg.NodeLiveness.IsLive(nextLeaseHolder.NodeID); !live || liveErr != nil {
err = errors.Errorf("not incrementing epoch on n%d because next leaseholder (n%d) not live (err = %v)",
status.Liveness.NodeID, nextLeaseHolder.NodeID, liveErr)
if log.V(1) {
log.Infof(ctx, "%v", err)
}
log.VEventf(ctx, 1, "%v", err)
} else if err = p.repl.store.cfg.NodeLiveness.IncrementEpoch(ctx, status.Liveness); err != nil {
// If we get ErrEpochAlreadyIncremented, someone else beat
// us to it. This proves that the target node is truly
Expand Down Expand Up @@ -380,14 +379,16 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
//
// https://github.com/cockroachdb/cockroach/issues/35986
if !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "failed to increment leaseholder's epoch: %s", err)
}
}
}
// Set error for propagation to all waiters below.
if err != nil {
// TODO(bdarnell): is status.Lease really what we want to put in the NotLeaseHolderError here?
pErr = roachpb.NewError(newNotLeaseHolderError(&status.Lease, p.repl.store.StoreID(), p.repl.Desc()))
pErr = roachpb.NewError(newNotLeaseHolderError(
&status.Lease, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err)))
}
}

Expand Down Expand Up @@ -623,7 +624,8 @@ func (r *Replica) requestLeaseLocked(
}
if transferLease, ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); ok {
return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc)))
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc,
"lease transfer in progress")))
}
// If we're draining, we'd rather not take any new leases (since we're also
// trying to move leases away elsewhere). But if we're the leader, we don't
Expand All @@ -634,7 +636,8 @@ func (r *Replica) requestLeaseLocked(
// this code can go away.
log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError(
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc)))
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc,
"refusing to take the lease; node is draining")))
}
return r.mu.pendingLeaseRequest.InitOrJoinRequest(
ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(), false /* transfer */)
Expand Down Expand Up @@ -674,7 +677,8 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
}
desc := r.mu.state.Desc
if !status.Lease.OwnedBy(r.store.StoreID()) {
return nil, nil, newNotLeaseHolderError(&status.Lease, r.store.StoreID(), desc)
return nil, nil, newNotLeaseHolderError(&status.Lease, r.store.StoreID(), desc,
"can't transfer the lease because this store doesn't own it")
}
// Verify the target is a replica of the range.
var ok bool
Expand Down Expand Up @@ -707,7 +711,8 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
}
// Another transfer is in progress, and it's not transferring to the
// same replica we'd like.
return nil, nil, newNotLeaseHolderError(&nextLease, r.store.StoreID(), desc)
return nil, nil, newNotLeaseHolderError(&nextLease, r.store.StoreID(), desc,
"another transfer to a different store is in progress")
}
// Stop using the current lease.
r.mu.minLeaseProposedTS = status.Timestamp
Expand Down Expand Up @@ -825,10 +830,11 @@ func (r *Replica) isLeaseValidRLocked(
// Note that this error can be generated on the Raft processing goroutine, so
// its output should be completely determined by its parameters.
func newNotLeaseHolderError(
l *roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor,
l *roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string,
) *roachpb.NotLeaseHolderError {
err := &roachpb.NotLeaseHolderError{
RangeID: rangeDesc.RangeID,
RangeID: rangeDesc.RangeID,
CustomMsg: msg,
}
if proposerStoreID != 0 {
err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID)
Expand Down Expand Up @@ -912,7 +918,8 @@ func (r *Replica) redirectOnOrAcquireLease(
// Lease state couldn't be determined.
log.VEventf(ctx, 2, "lease state couldn't be determined")
return nil, roachpb.NewError(
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc,
"lease state couldn't be determined"))

case kvserverpb.LeaseState_VALID, kvserverpb.LeaseState_STASIS:
if !status.Lease.OwnedBy(r.store.StoreID()) {
Expand Down Expand Up @@ -957,7 +964,8 @@ func (r *Replica) redirectOnOrAcquireLease(
// Otherwise, if the lease is currently held by another replica, redirect
// to the holder.
return nil, roachpb.NewError(
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc,
"lease held by different store"))
}
// Check that we're not in the process of transferring the lease away.
// If we are transferring the lease away, we can't serve reads or
Expand All @@ -972,7 +980,8 @@ func (r *Replica) redirectOnOrAcquireLease(
if transferLease, ok := r.mu.pendingLeaseRequest.TransferInProgress(
repDesc.ReplicaID); ok {
return nil, roachpb.NewError(
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc,
"lease transfer in progress"))
}

// If the lease is in stasis, we can't serve requests until we've
Expand Down Expand Up @@ -1015,7 +1024,7 @@ func (r *Replica) redirectOnOrAcquireLease(
}
// If lease is currently held by another, redirect to holder.
return nil, roachpb.NewError(
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc))
newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc, "lease proscribed"))
}

// Return a nil handle to signal that we have a valid lease.
Expand Down Expand Up @@ -1064,15 +1073,17 @@ func (r *Replica) redirectOnOrAcquireLease(
if _, descErr := r.GetReplicaDescriptor(); descErr != nil {
err = descErr
} else if lease, _ := r.GetLease(); !r.IsLeaseValid(ctx, lease, r.store.Clock().Now()) {
err = newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc())
err = newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc(),
"lease acquisition attempt lost to another lease, which has expired in the meantime")
} else {
err = newNotLeaseHolderError(&lease, r.store.StoreID(), r.Desc())
err = newNotLeaseHolderError(&lease, r.store.StoreID(), r.Desc(),
"lease acquisition attempt lost to another lease")
}
pErr = roachpb.NewError(err)
}
return pErr
}
log.Eventf(ctx, "lease acquisition succeeded: %+v", status.Lease)
log.VEventf(ctx, 2, "lease acquisition succeeded: %+v", status.Lease)
return nil
case <-slowTimer.C:
slowTimer.Read = true
Expand All @@ -1086,10 +1097,12 @@ func (r *Replica) redirectOnOrAcquireLease(
case <-ctx.Done():
llHandle.Cancel()
log.VErrEventf(ctx, 2, "lease acquisition failed: %s", ctx.Err())
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc()))
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc(),
"lease acquisition canceled because context canceled"))
case <-r.store.Stopper().ShouldStop():
llHandle.Cancel()
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc()))
return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc(),
"lease acquisition canceled because node is stopping"))
}
}
}()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func (r *Replica) executeWriteBatch(
// manager.
if curLease, _ := r.GetLease(); curLease.Sequence > st.Lease.Sequence {
curLeaseCpy := curLease // avoid letting curLease escape
err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc())
err := newNotLeaseHolderError(&curLeaseCpy, r.store.StoreID(), r.Desc(),
"stale lease discovered before proposing")
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
return nil, g, roachpb.NewError(err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,12 @@ func (e *NotLeaseHolderError) Type() ErrorDetailType {
}

func (e *NotLeaseHolderError) message(_ *Error) string {
const prefix = "[NotLeaseHolderError] "
var buf strings.Builder
buf.WriteString("[NotLeaseHolderError] ")
if e.CustomMsg != "" {
return prefix + e.CustomMsg
buf.WriteString(e.CustomMsg)
buf.WriteString("; ")
}
var buf strings.Builder
buf.WriteString(prefix)
fmt.Fprintf(&buf, "r%d: ", e.RangeID)
if e.Replica != (ReplicaDescriptor{}) {
fmt.Fprintf(&buf, "replica %s not lease holder; ", e.Replica)
Expand Down