Skip to content

Commit

Permalink
Merge pull request #11223 from YoyinZyc/automated-cherry-pick-of-#111…
Browse files Browse the repository at this point in the history
…79-origin-release-3.4

Automated cherry pick of #11179
  • Loading branch information
gyuho authored Oct 9, 2019
2 parents 03b5e72 + 480d551 commit 2c36cab
Show file tree
Hide file tree
Showing 22 changed files with 620 additions and 92 deletions.
3 changes: 2 additions & 1 deletion clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/fileutil"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
Expand Down Expand Up @@ -384,7 +385,7 @@ func (s *v3Manager) saveDB() error {
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write()
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
del := func(k, v []byte) error {
txn.DeleteRange(k, nil)
Expand Down
78 changes: 48 additions & 30 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/pkg/types"

"github.com/gogo/protobuf/proto"
Expand All @@ -43,17 +44,18 @@ type applyResult struct {
// to being logically reflected by the node. Currently only used for
// Compaction requests.
physc <-chan struct{}
trace *traceutil.Trace
}

// applierV3 is the interface for processing V3 raft messages
type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult

Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
Expand Down Expand Up @@ -119,15 +121,15 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:
ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put)
case r.DeleteRange != nil:
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
case r.Compaction != nil:
ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
case r.LeaseRevoke != nil:
Expand Down Expand Up @@ -174,32 +176,39 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
return ar
}

func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}

trace = traceutil.New("put",
a.s.getLogger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
)
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
if l := a.s.lessor.Lookup(leaseID); l == nil {
return nil, lease.ErrLeaseNotFound
return nil, nil, lease.ErrLeaseNotFound
}
}
txn = a.s.KV().Write()
txn = a.s.KV().Write(trace)
defer txn.End()
}

var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.DisableStep()
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
return nil, nil, err
}
trace.EnableStep()
trace.Step("get previous kv pair")
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, ErrKeyNotFound
return nil, nil, ErrKeyNotFound
}
}
if p.IgnoreValue {
Expand All @@ -215,7 +224,8 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
}

resp.Header.Revision = txn.Put(p.Key, val, leaseID)
return resp, nil
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, trace, nil
}

func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
Expand All @@ -224,7 +234,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
end := mkGteRange(dr.RangeEnd)

if txn == nil {
txn = a.s.kv.Write()
txn = a.s.kv.Write(traceutil.TODO())
defer txn.End()
}

Expand All @@ -245,12 +255,14 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
return resp, nil
}

func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

if txn == nil {
txn = a.s.kv.Read()
txn = a.s.kv.Read(trace)
defer txn.End()
}

Expand Down Expand Up @@ -327,7 +339,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}

trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
Expand All @@ -337,12 +349,13 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO()))

txnPath := compareToPath(txn, rt)
if isWrite {
Expand All @@ -364,7 +377,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// be the revision of the write txn.
if isWrite {
txn.End()
txn = a.s.KV().Write()
txn = a.s.KV().Write(traceutil.TODO())
}
a.applyTxn(txn, rt, txnPath, txnResp)
rev := txn.Rev()
Expand Down Expand Up @@ -516,7 +529,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
resp, err := a.Range(txn, tv.RequestRange)
resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
if err != nil {
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
Expand All @@ -526,7 +539,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
case *pb.RequestOp_RequestPut:
resp, err := a.Put(txn, tv.RequestPut)
resp, _, err := a.Put(txn, tv.RequestPut)
if err != nil {
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
Expand Down Expand Up @@ -557,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
return txns
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
resp := &pb.CompactionResponse{}
resp.Header = &pb.ResponseHeader{}
ch, err := a.s.KV().Compact(compaction.Revision)
trace := traceutil.New("compact",
a.s.getLogger(),
traceutil.Field{Key: "revision", Value: compaction.Revision},
)

ch, err := a.s.KV().Compact(trace, compaction.Revision)
if err != nil {
return nil, ch, err
return nil, ch, nil, err
}
// get the current revision. which key to get is not important.
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{})
resp.Header.Revision = rr.Rev
return resp, ch, err
return resp, ch, trace, err
}

func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down Expand Up @@ -674,8 +692,8 @@ type applierV3Capped struct {
// with Puts so that the number of keys in the store is capped.
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }

func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
return nil, ErrNoSpace
func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, ErrNoSpace
}

func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
Expand Down Expand Up @@ -824,13 +842,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
}

func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
ok := a.q.Available(p)
resp, err := a.applierV3.Put(txn, p)
resp, trace, err := a.applierV3.Put(txn, p)
if err == nil && !ok {
err = ErrNoSpace
}
return resp, err
return resp, trace, err
}

func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
Expand Down
14 changes: 8 additions & 6 deletions etcdserver/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package etcdserver

import (
"context"
"sync"

"go.etcd.io/etcd/auth"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/pkg/traceutil"
)

type authApplierV3 struct {
Expand Down Expand Up @@ -61,33 +63,33 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
return ret
}

func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) {
func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
return nil, err
return nil, nil, err
}

if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil {
// The specified lease is already attached with a key that cannot
// be written by this user. It means the user cannot revoke the
// lease so attaching the lease to the newly written key should
// be forbidden.
return nil, err
return nil, nil, err
}

if r.PrevKv {
err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil)
if err != nil {
return nil, err
return nil, nil, err
}
}
return aa.applierV3.Put(txn, r)
}

func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, err
}
return aa.applierV3.Range(txn, r)
return aa.applierV3.Range(ctx, txn, r)
}

func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
Expand Down
11 changes: 6 additions & 5 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/pkg/types"

"go.uber.org/zap"
Expand Down Expand Up @@ -382,11 +383,11 @@ type applierV3Corrupt struct {

func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
return nil, ErrCorrupt
func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, ErrCorrupt
}

func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, ErrCorrupt
}

Expand All @@ -398,8 +399,8 @@ func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, ErrCorrupt
}

func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) {
return nil, nil, ErrCorrupt
func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
return nil, nil, nil, ErrCorrupt
}

func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down
3 changes: 2 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"go.etcd.io/etcd/pkg/pbutil"
"go.etcd.io/etcd/pkg/runtime"
"go.etcd.io/etcd/pkg/schedule"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/pkg/wait"
"go.etcd.io/etcd/raft"
Expand Down Expand Up @@ -1178,7 +1179,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
plog.Info("recovering lessor...")
}

s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })

if lg != nil {
lg.Info("restored lease store")
Expand Down
Loading

0 comments on commit 2c36cab

Please sign in to comment.