Skip to content

Commit

Permalink
Reuse checkPut in Put
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 6, 2024
1 parent 7bb5596 commit c47203d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 51 deletions.
98 changes: 47 additions & 51 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,57 +43,52 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p
)
ctx = context.WithValue(ctx, traceutil.TraceKey{}, trace)
}
leaseID := lease.LeaseID(p.Lease)
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound
}
}
txnWrite := kv.Write(trace)
defer txnWrite.End()
resp, err = put(ctx, txnWrite, p)
resp, err = put(ctx, txnWrite, lessor, p)
return resp, trace, err
}

func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
func put(ctx context.Context, txnWrite mvcc.TxnWrite, lessor lease.Lessor, req *pb.PutRequest) (resp *pb.PutResponse, err error) {
trace := traceutil.Get(ctx)
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
val, leaseID := p.Value, lease.LeaseID(p.Lease)

var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepWithFunction(func() {
rr, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair")

if err != nil {
return nil, err
}
prevKV, err := prevKVIfNeeded(ctx, txnWrite, req)
if err != nil {
return nil, err
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, errors.ErrKeyNotFound
}
val, leaseID := req.Value, lease.LeaseID(req.Lease)
if req.IgnoreValue {
val = prevKV.Value
}
if p.IgnoreValue {
val = rr.KVs[0].Value
if req.IgnoreLease {
leaseID = lease.LeaseID(prevKV.Lease)
}
if p.IgnoreLease {
leaseID = lease.LeaseID(rr.KVs[0].Lease)
err = checkPut(lessor, req, prevKV)
if err != nil {
return nil, err
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
if req.PrevKv {
resp.PrevKv = prevKV
}

resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
resp.Header.Revision = txnWrite.Put(req.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, nil
}

func prevKVIfNeeded(ctx context.Context, rv mvcc.ReadView, req *pb.PutRequest) (*mvccpb.KeyValue, error) {
if req.IgnoreValue || req.IgnoreLease || req.PrevKv {
resp, err := rv.Range(ctx, req.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
if resp != nil && len(resp.KVs) != 0 {
return &resp.KVs[0], nil
}
}
return nil, nil
}

func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
trace = traceutil.Get(ctx)
// create delete tracing if the trace in context is empty
Expand Down Expand Up @@ -292,7 +287,7 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
} else {
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
}
txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
txnResp, err := txn(ctx, lg, txnWrite, lessor, rt, isWrite, txnPath)
txnWrite.End()

trace.AddField(
Expand All @@ -302,9 +297,9 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
return txnResp, trace, err
}

func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, lessor lease.Lessor, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
txnResp, _ := newTxnResp(rt, txnPath)
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
_, err := executeTxn(ctx, lg, txnWrite, lessor, rt, txnPath, txnResp)
if err != nil {
if isWrite {
// CAUTION: When a txn performing write operations starts, we always expect it to be successful.
Expand Down Expand Up @@ -356,7 +351,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn
return txnResp, txnCount
}

func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, lessor lease.Lessor, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
Expand All @@ -382,7 +377,7 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
traceutil.Field{Key: "req_type", Value: "put"},
traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
resp, err := put(ctx, txnWrite, tv.RequestPut)
resp, err := put(ctx, txnWrite, lessor, tv.RequestPut)
if err != nil {
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
}
Expand All @@ -396,7 +391,7 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns, err := executeTxn(ctx, lg, txnWrite, tv.RequestTxn, txnPath[1:], resp)
applyTxns, err := executeTxn(ctx, lg, txnWrite, lessor, tv.RequestTxn, txnPath[1:], resp)
if err != nil {
// don't wrap the error. It's a recursive call and err should be already wrapped
return 0, err
Expand All @@ -410,19 +405,16 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
return txns, nil
}

func checkPut(rv mvcc.ReadView, lessor lease.Lessor, req *pb.PutRequest) error {
if req.IgnoreValue || req.IgnoreLease {
// expects previous key-value, error if not exist
rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
if err != nil {
return err
}
if rr == nil || len(rr.KVs) == 0 {
func checkPut(lessor lease.Lessor, req *pb.PutRequest, prevKV *mvccpb.KeyValue) error {
if req.IgnoreValue || req.IgnoreLease || req.PrevKv {
if (req.IgnoreValue || req.IgnoreLease) && prevKV == nil {
// ignore_{lease,value} flag expects previous key-value pair
return errors.ErrKeyNotFound
}
}
if lease.LeaseID(req.Lease) != lease.NoLease {
if l := lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
leaseID := lease.LeaseID(req.Lease)
if !req.IgnoreLease && leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return lease.ErrLeaseNotFound
}
}
Expand Down Expand Up @@ -454,7 +446,11 @@ func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath
case *pb.RequestOp_RequestRange:
err = checkRange(rv, tv.RequestRange)
case *pb.RequestOp_RequestPut:
err = checkPut(rv, lessor, tv.RequestPut)
prevKV, err := prevKVIfNeeded(context.TODO(), rv, tv.RequestPut)
if err != nil {
return 0, err
}
err = checkPut(lessor, tv.RequestPut, prevKV)
case *pb.RequestOp_RequestDeleteRange:
case *pb.RequestOp_RequestTxn:
txns, err = checkTxn(rv, tv.RequestTxn, lessor, txnPath[1:])
Expand Down
10 changes: 10 additions & 0 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ var putTestCases = []testCase{
},
},
},
{
name: "Put withPrevKV should succeed",
op: &pb.RequestOp{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
PrevKv: true,
},
},
},
},
{
name: "Put with non-existing lease should fail",
op: &pb.RequestOp{
Expand Down

0 comments on commit c47203d

Please sign in to comment.