Skip to content

Commit

Permalink
batcheval: add BarrierRequest.WithLeaseAppliedIndex
Browse files Browse the repository at this point in the history
This can be used to detect whether a replica has applied the barrier
command yet.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 20, 2024
1 parent 294b124 commit e494351
Show file tree
Hide file tree
Showing 9 changed files with 439 additions and 20 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
func (b *Batch) barrier(s, e interface{}, withLAI bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) {
Key: begin,
EndKey: end,
},
WithLeaseAppliedIndex: withLAI,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp(
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
b := &Batch{}
b.barrier(begin, end)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
b.barrier(begin, end, false /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return hlc.Timestamp{}, err
}
responses := b.response.Responses
if len(responses) == 0 {
return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
if l := len(b.response.Responses); l != 1 {
return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
}
resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse)
if !ok {
return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
responses[0].GetInner())
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.Timestamp, nil
}

// BarrierWithLAI is like Barrier, but also returns the lease applied index and
// range descriptor at which the barrier was applied. In this case, the barrier
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.
//
// NB: the protocol support for this was added in a patch release, and is not
// guaranteed to be present with nodes prior to 24.1. In this case, the request
// will return an empty result.
func (db *DB) BarrierWithLAI(
ctx context.Context, begin, end interface{},
) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) {
b := &Batch{}
b.barrier(begin, end, true /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return 0, roachpb.RangeDescriptor{}, err
}
if l := len(b.response.Responses); l != 1 {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l)
}
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.LeaseAppliedIndex, resp.RangeDesc, nil
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque
return err
}
r.Timestamp.Forward(otherR.Timestamp)
if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex")
}
if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc")
}
}
return nil
}
Expand Down Expand Up @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag {
return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
}
func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone }
func (r *BarrierRequest) flags() flag {
flags := isWrite | isRange | isAlone
if r.WithLeaseAppliedIndex {
flags |= isUnsplittable // the LAI is only valid for a single range
}
return flags
}
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
Expand Down
31 changes: 28 additions & 3 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2424,11 +2424,28 @@ message QueryResolvedTimestampResponse {
(gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"];
}

// BarrierRequest is the request for a Barrier operation. This goes through Raft
// and has the purpose of waiting until all conflicting in-flight operations on
// this range have completed, without blocking any new operations.
// BarrierRequest is the request for a Barrier operation. This guarantees that
// all past and ongoing writes to a key span have completed and applied on the
// leaseholder. It does this by waiting for all conflicting write latches and
// then submitting a noop write through Raft, waiting for it to apply. Later
// writes are not affected -- in particular, it does not actually take out a
// latch, so writers don't have to wait for it to complete and can write below
// the barrier.
message BarrierRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier
// command in the response, allowing the caller to wait for the barrier to
// apply on an arbitrary replica. It also returns the range descriptor, so the
// caller can detect any unexpected range changes.
//
// When enabled, the barrier request can no longer span multiple ranges, and
// will instead return RangeKeyMismatchError. The caller must be prepared to
// handle this.
//
// NB: This field was added in a patch release. Nodes prior to 24.1 are not
// guaranteed to support it, returning a zero LeaseAppliedIndex instead.
bool with_lease_applied_index = 2;
}

// BarrierResponse is the response for a Barrier operation.
Expand All @@ -2438,6 +2455,14 @@ message BarrierResponse {
// Timestamp at which this Barrier was evaluated. Can be used to guarantee
// future operations happen on the same or newer leaseholders.
util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];

// LeaseAppliedIndex at which this Barrier was applied. Only returned when
// requested via WithLeaseAppliedIndex.
uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"];

// RangeDesc at the time the barrier was applied. Only returned when requested
// via WithLeaseAppliedIndex.
RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
}

// A RequestUnion contains exactly one of the requests.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ go_test(
size = "medium",
srcs = [
"cmd_add_sstable_test.go",
"cmd_barrier_test.go",
"cmd_clear_range_test.go",
"cmd_delete_range_gchint_test.go",
"cmd_delete_range_test.go",
Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ func declareKeysBarrier(
return nil
}

// Barrier evaluation is a no-op, as all the latch waiting happens in
// the latch manager.
// Barrier evaluation is a no-op, but it still goes through Raft because of
// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch
// manager, and the WithLeaseAppliedIndex info is populated during application.
func Barrier(
_ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response kvpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*kvpb.BarrierRequest)
resp := response.(*kvpb.BarrierResponse)
resp.Timestamp = cArgs.EvalCtx.Clock().Now()

return result.Result{}, nil
return result.Result{
Local: result.LocalResult{
PopulateBarrierResponse: args.WithLeaseAppliedIndex,
},
}, nil
}
Loading

0 comments on commit e494351

Please sign in to comment.