From 28bb8037d906cd51b0618f4d6a837a016d2dffad Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Mon, 9 Sep 2019 15:38:03 -0700 Subject: [PATCH 1/6] pkg: create package traceutil for tracing. mvcc: add tracing steps: range from the in-memory index tree; range from boltdb. etcdserver: add tracing steps: agreement among raft nodes before linearized reading; authentication; filter and sort kv pairs; assemble the response. --- etcdserver/apply.go | 23 +++++++++----- etcdserver/apply_auth.go | 5 ++-- etcdserver/corrupt.go | 2 +- etcdserver/util.go | 14 +++++---- etcdserver/v3_server.go | 13 ++++++-- mvcc/kv.go | 3 +- mvcc/kv_test.go | 2 +- mvcc/kv_view.go | 6 ++-- mvcc/kvstore_test.go | 6 ++-- mvcc/kvstore_txn.go | 15 ++++++++-- pkg/traceutil/trace.go | 60 +++++++++++++++++++++++++++++++++++++ pkg/traceutil/trace_test.go | 28 +++++++++++++++++ 12 files changed, 149 insertions(+), 28 deletions(-) create mode 100644 pkg/traceutil/trace.go create mode 100644 pkg/traceutil/trace_test.go diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 1f06ad0dd67..f5b3c24b03c 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "github.com/gogo/protobuf/proto" @@ -50,7 +51,7 @@ type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) - Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) + Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) @@ -119,7 +120,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { // call into 
a.s.applyV3.F instead of a.F so upper appliers can check individual calls switch { case r.Range != nil: - ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range) + ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) case r.Put != nil: ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put) case r.DeleteRange != nil: @@ -245,12 +246,18 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ return resp, nil } -func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace, ok := ctx.Value("trace").(*traceutil.Trace) + if !ok || trace == nil { + trace = traceutil.New("Apply Range") + ctx = context.WithValue(ctx, "trace", trace) + } + resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} if txn == nil { - txn = a.s.kv.Read() + txn = a.s.kv.Read(trace) defer txn.End() } @@ -327,7 +334,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang rr.KVs = rr.KVs[:r.Limit] resp.More = true } - + trace.Step("Filter and sort the key-value pairs.") resp.Header.Revision = rr.Rev resp.Count = int64(rr.Count) resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) @@ -337,12 +344,14 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang } resp.Kvs[i] = &rr.KVs[i] } + trace.Step("Assemble the response.") return resp, nil } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { isWrite := !isTxnReadonly(rt) - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read()) + trace := traceutil.New("ReadOnlyTxn") + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) txnPath := compareToPath(txn, rt) if isWrite { @@ -516,7 +525,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat respi := tresp.Responses[i].Response switch tv := req.Request.(type) { case *pb.RequestOp_RequestRange: - resp, err := 
a.Range(txn, tv.RequestRange) + resp, err := a.Range(context.TODO(), txn, tv.RequestRange) if err != nil { if lg != nil { lg.Panic("unexpected error during txn", zap.Error(err)) diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go index 4b094ad5d8d..c31644b3d19 100644 --- a/etcdserver/apply_auth.go +++ b/etcdserver/apply_auth.go @@ -15,6 +15,7 @@ package etcdserver import ( + "context" "sync" "go.etcd.io/etcd/auth" @@ -83,11 +84,11 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon return aa.applierV3.Put(txn, r) } -func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } - return aa.applierV3.Range(txn, r) + return aa.applierV3.Range(ctx, txn, r) } func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 32678a7c512..0f9a4053f04 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -386,7 +386,7 @@ func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResp return nil, ErrCorrupt } -func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { return nil, ErrCorrupt } diff --git a/etcdserver/util.go b/etcdserver/util.go index fe5024ef00d..4db42d0651e 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" @@ -108,7 
+109,7 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin if !isNil(respMsg) { resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) } - warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err) + warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "", resp, err) } func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { @@ -126,18 +127,18 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR } resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse)) } - warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) + warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "read-only range ", resp, err) } -func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { +func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { var resp string if !isNil(rangeResponse) { resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse)) } - warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) + warnOfExpensiveGenericRequest(lg, trace, now, reqStringer, "read-only range ", resp, err) } -func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { +func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { d := time.Since(now) if d > warnApplyDuration { if lg != nil { @@ -159,6 +160,9 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fm } plog.Warningf("%srequest %q with result %q took too long (%v) to execute", 
prefix, reqStringer.String(), result, d) } + if trace != nil { + trace.Log(lg) + } slowApplies.Inc() } } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index b2084618b8a..efe3bfe3540 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/lease/leasehttp" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/raft" "github.com/gogo/protobuf/proto" @@ -85,14 +86,18 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace := traceutil.New("Range") + ctx = context.WithValue(ctx, "trace", trace) + var resp *pb.RangeResponse var err error defer func(start time.Time) { - warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) + warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), trace, start, r, resp, err) }(time.Now()) if !r.Serializable { err = s.linearizableReadNotify(ctx) + trace.Step("Agreement among raft nodes before linearized reading.") if err != nil { return nil, err } @@ -101,7 +106,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = s.applyV3Base.Range(nil, r) } + get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { err = serr return nil, err @@ -558,6 +563,10 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e if err = chk(ai); err != nil { return err } + + if trace, ok := ctx.Value("trace").(*traceutil.Trace); ok && trace != nil { + trace.Step("Authentication.") + } // fetch response for serialized request get() // check for stale token revision in case the auth store was updated while diff --git a/mvcc/kv.go b/mvcc/kv.go index 8e898a5ad3d..065b9079989 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -18,6 +18,7 @@ import ( 
"go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" ) type RangeOptions struct { @@ -102,7 +103,7 @@ type KV interface { WriteView // Read creates a read transaction. - Read() TxnRead + Read(trace *traceutil.Trace) TxnRead // Write creates a write transaction. Write() TxnWrite diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 012537a4e4c..673cfba74e6 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -47,7 +47,7 @@ var ( return kv.Range(key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - txn := kv.Read() + txn := kv.Read(nil) defer txn.End() return txn.Range(key, end, ro) } diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go index bd2e77729ff..56070d18ff0 100644 --- a/mvcc/kv_view.go +++ b/mvcc/kv_view.go @@ -19,19 +19,19 @@ import "go.etcd.io/etcd/lease" type readView struct{ kv KV } func (rv *readView) FirstRev() int64 { - tr := rv.kv.Read() + tr := rv.kv.Read(nil) defer tr.End() return tr.FirstRev() } func (rv *readView) Rev() int64 { - tr := rv.kv.Read() + tr := rv.kv.Read(nil) defer tr.End() return tr.Rev() } func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - tr := rv.kv.Read() + tr := rv.kv.Read(nil) defer tr.End() return tr.Range(key, end, ro) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index cac11e1f825..cde1954de44 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { s.Put([]byte("foo"), []byte("bar"), lease.NoLease) // readTx simulates a long read request - readTx1 := s.Read() + readTx1 := s.Read(nil) // write should not be blocked by reads done := make(chan struct{}) @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { } // readTx2 simulates a short read request - readTx2 := s.Read() + readTx2 := s.Read(nil) ro := RangeOptions{Limit: 1, Rev: 0, Count: false} ret, err := 
readTx2.Range([]byte("foo"), nil, ro) if err != nil { @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { mu.Lock() wKVs := make(kvs, len(committedKVs)) copy(wKVs, committedKVs) - tx := s.Read() + tx := s.Read(nil) mu.Unlock() // get all keys in backend store, and compare with wKVs ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 9698254644d..c5e5c973e84 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -18,6 +18,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -27,9 +28,11 @@ type storeTxnRead struct { firstRev int64 rev int64 + + trace *traceutil.Trace } -func (s *store) Read() TxnRead { +func (s *store) Read(trace *traceutil.Trace) TxnRead { s.mu.RLock() s.revMu.RLock() // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After @@ -38,7 +41,7 @@ func (s *store) Read() TxnRead { tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created. 
firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() - return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) + return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace}) } func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } @@ -66,7 +69,7 @@ func (s *store) Write() TxnWrite { tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0}, + storeTxnRead: storeTxnRead{s, tx, 0, 0, nil}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), @@ -124,6 +127,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } revpairs := tr.s.kvindex.Revisions(key, end, rev) + if tr.trace != nil { + tr.trace.Step("Range keys from in-memory index tree.") + } if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } @@ -163,6 +169,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } } } + if tr.trace != nil { + tr.trace.Step("Range keys from bolt db.") + } return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go new file mode 100644 index 00000000000..e21f6ef090a --- /dev/null +++ b/pkg/traceutil/trace.go @@ -0,0 +1,60 @@ +package traceutil + +import ( + "bytes" + "fmt" + "time" + + "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" +) + +var ( + plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace") +) + +type Trace struct { + operation string + startTime time.Time + steps []step +} + +type step struct { + time time.Time + msg string +} + +func New(op string) *Trace { + return &Trace{operation: op, startTime: time.Now()} +} + +func (t *Trace) Step(msg string) { + t.steps = append(t.steps, step{time: time.Now(), msg: msg}) +} + +// Dump all steps in the Trace +func (t *Trace) Log(lg *zap.Logger) { + + var buf bytes.Buffer + + buf.WriteString(fmt.Sprintf("The tracing of %v request:\n", t.operation)) + + 
buf.WriteString("Request started at:") + buf.WriteString(t.startTime.Format("2006-01-02 15:04:05")) + buf.WriteString(fmt.Sprintf(".%06d", t.startTime.Nanosecond()/1000)) + buf.WriteString("\n") + lastStepTime := t.startTime + for i, step := range t.steps { + buf.WriteString(fmt.Sprintf("Step %d: %v Time cost: %v\n", i, step.msg, step.time.Sub(lastStepTime))) + //fmt.Println(step.msg, " costs: ", step.time.Sub(lastStepTime)) + lastStepTime = step.time + } + buf.WriteString("Trace End\n") + + s := buf.String() + if lg != nil { + lg.Info(s) + } else { + plog.Info(s) + } +} diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go new file mode 100644 index 00000000000..2517c546392 --- /dev/null +++ b/pkg/traceutil/trace_test.go @@ -0,0 +1,28 @@ +package traceutil + +import ( + "testing" +) + +func TestTrace(t *testing.T) { + var ( + op = "Test" + steps = []string{"Step1, Step2"} + ) + + trace := New(op) + if trace.operation != op { + t.Errorf("Expected %v, got %v\n", op, trace.operation) + } + + for _, v := range steps { + trace.Step(v) + trace.Step(v) + } + + for i, v := range steps { + if v != trace.steps[i].msg { + t.Errorf("Expected %v, got %v\n.", v, trace.steps[i].msg) + } + } +} From 4f1bbff88803732997c6ba1bcf014cf04864300d Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Wed, 18 Sep 2019 12:46:34 -0700 Subject: [PATCH 2/6] pkg: add field to record additional detail of trace; add stepThreshold to reduce log volume. 
--- etcdserver/apply.go | 12 +- etcdserver/util.go | 14 +- etcdserver/v3_server.go | 19 ++- mvcc/kv_test.go | 3 +- mvcc/kv_view.go | 11 +- mvcc/kvstore_test.go | 7 +- mvcc/kvstore_txn.go | 10 +- pkg/traceutil/trace.go | 117 ++++++++++--- pkg/traceutil/trace_test.go | 319 ++++++++++++++++++++++++++++++++++-- 9 files changed, 443 insertions(+), 69 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index f5b3c24b03c..7ce4ffd830a 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -34,7 +34,8 @@ import ( ) const ( - warnApplyDuration = 100 * time.Millisecond + warnApplyDuration = 100 * time.Millisecond + rangeTraceThreshold = 100 * time.Millisecond ) type applyResult struct { @@ -247,11 +248,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ } func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { - trace, ok := ctx.Value("trace").(*traceutil.Trace) - if !ok || trace == nil { - trace = traceutil.New("Apply Range") - ctx = context.WithValue(ctx, "trace", trace) - } + trace := traceutil.Get(ctx) resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} @@ -350,8 +347,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { isWrite := !isTxnReadonly(rt) - trace := traceutil.New("ReadOnlyTxn") - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO())) txnPath := compareToPath(txn, rt) if isWrite { diff --git a/etcdserver/util.go b/etcdserver/util.go index 4db42d0651e..fe5024ef00d 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -24,7 +24,6 @@ import ( "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/rafthttp" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" - "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" @@ 
-109,7 +108,7 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin if !isNil(respMsg) { resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) } - warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "", resp, err) + warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err) } func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { @@ -127,18 +126,18 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR } resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse)) } - warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "read-only range ", resp, err) + warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) } -func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { +func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { var resp string if !isNil(rangeResponse) { resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse)) } - warnOfExpensiveGenericRequest(lg, trace, now, reqStringer, "read-only range ", resp, err) + warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) } -func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { +func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { d := time.Since(now) if d > warnApplyDuration { if lg != nil { @@ -160,9 +159,6 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now t } plog.Warningf("%srequest %q with result %q took too long (%v) to 
execute", prefix, reqStringer.String(), result, d) } - if trace != nil { - trace.Log(lg) - } slowApplies.Inc() } } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index efe3bfe3540..6f10f87b9b8 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -86,13 +86,23 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - trace := traceutil.New("Range") + trace := traceutil.New("Range", + traceutil.Field{Key: "RangeBegin", Value: string(r.Key)}, + traceutil.Field{Key: "RangeEnd", Value: string(r.RangeEnd)}, + ) ctx = context.WithValue(ctx, "trace", trace) var resp *pb.RangeResponse var err error defer func(start time.Time) { - warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), trace, start, r, resp, err) + warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) + if resp != nil { + trace.AddField( + traceutil.Field{Key: "ResponseCount", Value: len(resp.Kvs)}, + traceutil.Field{Key: "ResponseRevision", Value: resp.Header.Revision}, + ) + } + trace.LogIfLong(rangeTraceThreshold, s.getLogger()) }(time.Now()) if !r.Serializable { @@ -564,9 +574,8 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e return err } - if trace, ok := ctx.Value("trace").(*traceutil.Trace); ok && trace != nil { - trace.Step("Authentication.") - } + trace := traceutil.Get(ctx) + trace.Step("Authentication.") // fetch response for serialized request get() // check for stale token revision in case the auth store was updated while diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 673cfba74e6..6c72d0879c7 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -25,6 +25,7 @@ import ( "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/pkg/traceutil" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -47,7 +48,7 @@ var ( return kv.Range(key, end, 
ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - txn := kv.Read(nil) + txn := kv.Read(traceutil.TODO()) defer txn.End() return txn.Range(key, end, ro) } diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go index 56070d18ff0..9750fd764f5 100644 --- a/mvcc/kv_view.go +++ b/mvcc/kv_view.go @@ -14,24 +14,27 @@ package mvcc -import "go.etcd.io/etcd/lease" +import ( + "go.etcd.io/etcd/lease" + "go.etcd.io/etcd/pkg/traceutil" +) type readView struct{ kv KV } func (rv *readView) FirstRev() int64 { - tr := rv.kv.Read(nil) + tr := rv.kv.Read(traceutil.TODO()) defer tr.End() return tr.FirstRev() } func (rv *readView) Rev() int64 { - tr := rv.kv.Read(nil) + tr := rv.kv.Read(traceutil.TODO()) defer tr.End() return tr.Rev() } func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - tr := rv.kv.Read(nil) + tr := rv.kv.Read(traceutil.TODO()) defer tr.End() return tr.Range(key, end, ro) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index cde1954de44..c4180c963ad 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -34,6 +34,7 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/schedule" "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -658,7 +659,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { s.Put([]byte("foo"), []byte("bar"), lease.NoLease) // readTx simulates a long read request - readTx1 := s.Read(nil) + readTx1 := s.Read(traceutil.TODO()) // write should not be blocked by reads done := make(chan struct{}) @@ -673,7 +674,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { } // readTx2 simulates a short read request - readTx2 := s.Read(nil) + readTx2 := s.Read(traceutil.TODO()) ro := RangeOptions{Limit: 1, Rev: 0, Count: false} ret, err := readTx2.Range([]byte("foo"), nil, ro) if err != nil { @@ -756,7 +757,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { mu.Lock() wKVs := make(kvs, len(committedKVs)) 
copy(wKVs, committedKVs) - tx := s.Read(nil) + tx := s.Read(traceutil.TODO()) mu.Unlock() // get all keys in backend store, and compare with wKVs ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index c5e5c973e84..ee9651ff6e7 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -69,7 +69,7 @@ func (s *store) Write() TxnWrite { tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0, nil}, + storeTxnRead: storeTxnRead{s, tx, 0, 0, traceutil.TODO()}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), @@ -127,9 +127,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } revpairs := tr.s.kvindex.Revisions(key, end, rev) - if tr.trace != nil { - tr.trace.Step("Range keys from in-memory index tree.") - } + tr.trace.Step("Range keys from in-memory index tree.") if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } @@ -169,9 +167,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } } } - if tr.trace != nil { - tr.trace.Step("Range keys from bolt db.") - } + tr.trace.Step("Range keys from bolt db.") return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index e21f6ef090a..1d2e26c4239 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -2,7 +2,9 @@ package traceutil import ( "bytes" + "context" "fmt" + "math/rand" "time" "github.com/coreos/pkg/capnslog" @@ -13,48 +15,121 @@ var ( plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace") ) +// Field is a kv pair to record additional details of the trace. 
+type Field struct { + Key string + Value interface{} +} + +func (f *Field) format() string { + return fmt.Sprintf("%s:%v; ", f.Key, f.Value) +} + +func writeFields(fields []Field) string { + if len(fields) == 0 { + return "" + } + var buf bytes.Buffer + buf.WriteString("{") + for _, f := range fields { + buf.WriteString(f.format()) + } + buf.WriteString("}") + return buf.String() +} + type Trace struct { operation string + fields []Field startTime time.Time steps []step } type step struct { - time time.Time - msg string + time time.Time + msg string + fields []Field } -func New(op string) *Trace { - return &Trace{operation: op, startTime: time.Now()} +func New(op string, fields ...Field) *Trace { + return &Trace{operation: op, startTime: time.Now(), fields: fields} } -func (t *Trace) Step(msg string) { - t.steps = append(t.steps, step{time: time.Now(), msg: msg}) +// traceutil.TODO() returns a non-nil, empty Trace +func TODO() *Trace { + return &Trace{} } -// Dump all steps in the Trace -func (t *Trace) Log(lg *zap.Logger) { +func Get(ctx context.Context) *Trace { + if trace, ok := ctx.Value("trace").(*Trace); ok && trace != nil { + return trace + } + return TODO() +} - var buf bytes.Buffer +func GetOrCreate(ctx context.Context, op string, fields ...Field) (context.Context, *Trace) { + trace, ok := ctx.Value("trace").(*Trace) + if !ok || trace == nil { + trace = New(op) + trace.fields = fields + ctx = context.WithValue(ctx, "trace", trace) + } + return ctx, trace +} - buf.WriteString(fmt.Sprintf("The tracing of %v request:\n", t.operation)) +func (t *Trace) Step(msg string, fields ...Field) { + t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) +} - buf.WriteString("Request started at:") - buf.WriteString(t.startTime.Format("2006-01-02 15:04:05")) - buf.WriteString(fmt.Sprintf(".%06d", t.startTime.Nanosecond()/1000)) - buf.WriteString("\n") - lastStepTime := t.startTime - for i, step := range t.steps { - buf.WriteString(fmt.Sprintf("Step 
%d: %v Time cost: %v\n", i, step.msg, step.time.Sub(lastStepTime))) - //fmt.Println(step.msg, " costs: ", step.time.Sub(lastStepTime)) - lastStepTime = step.time +func (t *Trace) AddField(fields ...Field) { + for _, f := range fields { + t.fields = append(t.fields, f) + } +} + +// Log dumps all steps in the Trace +func (t *Trace) Log(lg *zap.Logger) { + t.LogWithStepThreshold(0, lg) +} + +// LogIfLong dumps logs if the duration is longer than threshold +func (t *Trace) LogIfLong(threshold time.Duration, lg *zap.Logger) { + if time.Since(t.startTime) > threshold { + stepThreshold := threshold / time.Duration(len(t.steps)+1) + t.LogWithStepThreshold(stepThreshold, lg) } - buf.WriteString("Trace End\n") +} - s := buf.String() +// LogWithStepThreshold only dumps step whose duration is longer than step threshold +func (t *Trace) LogWithStepThreshold(threshold time.Duration, lg *zap.Logger) { + s := t.format(threshold) if lg != nil { lg.Info(s) } else { plog.Info(s) } } + +func (t *Trace) format(threshold time.Duration) string { + endTime := time.Now() + totalDuration := endTime.Sub(t.startTime) + var buf bytes.Buffer + traceNum := rand.Int31() + + buf.WriteString(fmt.Sprintf("Trace[%d] \"%v\" %s (duration: %v, start: %v)\n", + traceNum, t.operation, writeFields(t.fields), totalDuration, + t.startTime.Format("2006-01-02 15:04:05.000"))) + lastStepTime := t.startTime + for _, step := range t.steps { + stepDuration := step.time.Sub(lastStepTime) + if stepDuration > threshold { + buf.WriteString(fmt.Sprintf("Trace[%d] Step \"%v\" %s (duration: %v)\n", + traceNum, step.msg, writeFields(step.fields), stepDuration)) + } + lastStepTime = step.time + } + buf.WriteString(fmt.Sprintf("Trace[%d] End %v\n", traceNum, + endTime.Format("2006-01-02 15:04:05.000"))) + + return buf.String() +} diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go index 2517c546392..3e6da09d503 100644 --- a/pkg/traceutil/trace_test.go +++ b/pkg/traceutil/trace_test.go @@ -1,28 +1,325 @@ 
package traceutil import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "strings" "testing" + "time" + + "go.uber.org/zap" ) -func TestTrace(t *testing.T) { +func TestGet(t *testing.T) { + traceForTest := &Trace{operation: "test"} + tests := []struct { + name string + inputCtx context.Context + outputTrace *Trace + }{ + { + name: "When the context does not have trace", + inputCtx: context.TODO(), + outputTrace: TODO(), + }, + { + name: "When the context has trace", + inputCtx: context.WithValue(context.Background(), "trace", traceForTest), + outputTrace: traceForTest, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + trace := Get(tt.inputCtx) + if trace == nil { + t.Errorf("Expected %v; Got nil\n", tt.outputTrace) + } + if trace.operation != tt.outputTrace.operation { + t.Errorf("Expected %v; Got %v\n", tt.outputTrace, trace) + } + }) + } +} + +func TestGetOrCreate(t *testing.T) { + tests := []struct { + name string + inputCtx context.Context + outputTraceOp string + }{ + { + name: "When the context does not have trace", + inputCtx: context.TODO(), + outputTraceOp: "test", + }, + { + name: "When the context has trace", + inputCtx: context.WithValue(context.Background(), "trace", &Trace{operation: "test"}), + outputTraceOp: "test", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, trace := GetOrCreate(tt.inputCtx, "test") + if trace == nil { + t.Errorf("Expected trace object; Got nil\n") + } else if trace.operation != tt.outputTraceOp { + t.Errorf("Expected %v; Got %v\n", tt.outputTraceOp, trace.operation) + } + if ctx.Value("trace") == nil { + t.Errorf("Expected context has attached trace; Got nil\n") + } + }) + } +} + +func TestCreate(t *testing.T) { var ( - op = "Test" - steps = []string{"Step1, Step2"} + op = "Test" + steps = []string{"Step1, Step2"} + fields = []Field{ + {"traceKey1", "traceValue1"}, + {"traceKey2", "traceValue2"}, + } + stepFields = []Field{ + 
{"stepKey1", "stepValue2"}, + {"stepKey2", "stepValue2"}, + } ) - trace := New(op) + trace := New(op, fields[0], fields[1]) if trace.operation != op { - t.Errorf("Expected %v, got %v\n", op, trace.operation) + t.Errorf("Expected %v; Got %v\n", op, trace.operation) } - - for _, v := range steps { - trace.Step(v) - trace.Step(v) + for i, f := range trace.fields { + if f.Key != fields[i].Key { + t.Errorf("Expected %v; Got %v\n", fields[i].Key, f.Key) + } + if f.Value != fields[i].Value { + t.Errorf("Expected %v; Got %v\n", fields[i].Value, f.Value) + } } for i, v := range steps { - if v != trace.steps[i].msg { - t.Errorf("Expected %v, got %v\n.", v, trace.steps[i].msg) + trace.Step(v, stepFields[i]) + } + + for i, v := range trace.steps { + if steps[i] != v.msg { + t.Errorf("Expected %v, got %v\n.", steps[i], v.msg) + } + if stepFields[i].Key != v.fields[0].Key { + t.Errorf("Expected %v; Got %v\n", stepFields[i].Key, v.fields[0].Key) + } + if stepFields[i].Value != v.fields[0].Value { + t.Errorf("Expected %v; Got %v\n", stepFields[i].Value, v.fields[0].Value) + } + } +} + +func TestLog(t *testing.T) { + test := struct { + name string + trace *Trace + expectedMsg []string + }{ + name: "When dump all logs", + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + {time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"}, + {time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"}, + }, + }, + expectedMsg: []string{ + "msg1", "msg2", + }, + } + + t.Run(test.name, func(t *testing.T) { + logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano())) + defer os.RemoveAll(logPath) + + lcfg := zap.NewProductionConfig() + lcfg.OutputPaths = []string{logPath} + lcfg.ErrorOutputPaths = []string{logPath} + lg, _ := lcfg.Build() + + test.trace.Log(lg) + data, err := ioutil.ReadFile(logPath) + if err != nil { + t.Fatal(err) + } + + for _, msg := range test.expectedMsg { + if !bytes.Contains(data, 
[]byte(msg)) { + t.Errorf("Expected to find %v in log.\n", msg) + } } + }) +} + +func TestTraceFormat(t *testing.T) { + tests := []struct { + name string + trace *Trace + fields []Field + expectedMsg []string + }{ + { + name: "When trace has fields", + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + { + time: time.Now().Add(-80 * time.Millisecond), + msg: "msg1", + fields: []Field{{"stepKey1", "stepValue1"}}, + }, + { + time: time.Now().Add(-50 * time.Millisecond), + msg: "msg2", + fields: []Field{{"stepKey2", "stepValue2"}}, + }, + }, + }, + fields: []Field{ + {"traceKey1", "traceValue1"}, + {"count", 1}, + }, + expectedMsg: []string{ + "Test", + "msg1", "msg2", + "traceKey1:traceValue1", "count:1", + "stepKey1:stepValue1", "stepKey2:stepValue2", + }, + }, + { + name: "When trace has no field", + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + {time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"}, + {time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"}, + }, + }, + fields: []Field{}, + expectedMsg: []string{ + "Test", + "msg1", "msg2", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for _, f := range tt.fields { + tt.trace.AddField(f) + } + s := tt.trace.format(0) + var buf bytes.Buffer + buf.WriteString(`Trace\[(\d*)?\](.+)\(duration(.+)start(.+)\)\n`) + for range tt.trace.steps { + buf.WriteString(`Trace\[(\d*)?\](.+)Step(.+)\(duration(.+)\)\n`) + } + buf.WriteString(`Trace\[(\d*)?\](.+)End(.+)\n`) + pattern := buf.String() + + r, _ := regexp.Compile(pattern) + if !r.MatchString(s) { + t.Errorf("Wrong log format.\n") + } + for _, msg := range tt.expectedMsg { + if !strings.Contains(s, msg) { + t.Errorf("Expected to find %v in log.\n", msg) + } + } + }) + } +} + +func TestLogIfLong(t *testing.T) { + tests := []struct { + name string + threshold time.Duration + trace *Trace + expectedMsg []string 
+ }{ + { + name: "When the duration is smaller than threshold", + threshold: time.Duration(200 * time.Millisecond), + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + {time: time.Now().Add(-50 * time.Millisecond), msg: "msg1"}, + {time: time.Now(), msg: "msg2"}, + }, + }, + expectedMsg: []string{}, + }, + { + name: "When the duration is longer than threshold", + threshold: time.Duration(50 * time.Millisecond), + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + {time: time.Now().Add(-50 * time.Millisecond), msg: "msg1"}, + {time: time.Now(), msg: "msg2"}, + }, + }, + expectedMsg: []string{ + "msg1", "msg2", + }, + }, + { + name: "When not all steps are longer than step threshold", + threshold: time.Duration(50 * time.Millisecond), + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + {time: time.Now(), msg: "msg1"}, + {time: time.Now(), msg: "msg2"}, + }, + }, + expectedMsg: []string{ + "msg1", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano())) + defer os.RemoveAll(logPath) + + lcfg := zap.NewProductionConfig() + lcfg.OutputPaths = []string{logPath} + lcfg.ErrorOutputPaths = []string{logPath} + lg, _ := lcfg.Build() + + tt.trace.LogIfLong(tt.threshold, lg) + data, err := ioutil.ReadFile(logPath) + if err != nil { + t.Fatal(err) + } + for _, msg := range tt.expectedMsg { + if !bytes.Contains(data, []byte(msg)) { + t.Errorf("Expected to find %v in log\n", msg) + } + } + }) } } From 8717327697d230cda81fa0b818482545ab04ed1d Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Tue, 24 Sep 2019 15:29:01 -0700 Subject: [PATCH 3/6] pkg: use zap logger to format the structure log output. 
--- etcdserver/apply.go | 4 +- etcdserver/v3_server.go | 22 ++--- mvcc/kvstore_txn.go | 4 +- pkg/traceutil/trace.go | 76 ++++++++-------- pkg/traceutil/trace_test.go | 175 ++++++++++++------------------------ 5 files changed, 110 insertions(+), 171 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 7ce4ffd830a..02fdea73139 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -331,7 +331,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra rr.KVs = rr.KVs[:r.Limit] resp.More = true } - trace.Step("Filter and sort the key-value pairs.") + trace.Step("filter and sort the key-value pairs") resp.Header.Revision = rr.Rev resp.Count = int64(rr.Count) resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) @@ -341,7 +341,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra } resp.Kvs[i] = &rr.KVs[i] } - trace.Step("Assemble the response.") + trace.Step("assemble the response") return resp, nil } diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 6f10f87b9b8..721800dc89b 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -86,11 +86,12 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { - trace := traceutil.New("Range", - traceutil.Field{Key: "RangeBegin", Value: string(r.Key)}, - traceutil.Field{Key: "RangeEnd", Value: string(r.RangeEnd)}, + trace := traceutil.New("range", + s.getLogger(), + traceutil.Field{Key: "range_begin", Value: string(r.Key)}, + traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)}, ) - ctx = context.WithValue(ctx, "trace", trace) + ctx = context.WithValue(ctx, traceutil.CtxKey, trace) var resp *pb.RangeResponse var err error @@ -98,16 +99,16 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) if resp != nil { trace.AddField( - 
traceutil.Field{Key: "ResponseCount", Value: len(resp.Kvs)}, - traceutil.Field{Key: "ResponseRevision", Value: resp.Header.Revision}, + traceutil.Field{Key: "response_count", Value: len(resp.Kvs)}, + traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}, ) } - trace.LogIfLong(rangeTraceThreshold, s.getLogger()) + trace.LogIfLong(rangeTraceThreshold) }(time.Now()) if !r.Serializable { err = s.linearizableReadNotify(ctx) - trace.Step("Agreement among raft nodes before linearized reading.") + trace.Step("agreement among raft nodes before linearized reading") if err != nil { return nil, err } @@ -562,6 +563,7 @@ func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { + trace := traceutil.Get(ctx) ai, err := s.AuthInfoFromCtx(ctx) if err != nil { return err @@ -573,9 +575,7 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e if err = chk(ai); err != nil { return err } - - trace := traceutil.Get(ctx) - trace.Step("Authentication.") + trace.Step("get authentication metadata") // fetch response for serialized request get() // check for stale token revision in case the auth store was updated while diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index ee9651ff6e7..27afe889ba5 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -127,7 +127,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } revpairs := tr.s.kvindex.Revisions(key, end, rev) - tr.trace.Step("Range keys from in-memory index tree.") + tr.trace.Step("range keys from in-memory index tree") if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } @@ -167,7 +167,7 @@ func (tr *storeTxnRead) rangeKeys(key, end 
[]byte, curRev int64, ro RangeOptions } } } - tr.trace.Step("Range keys from bolt db.") + tr.trace.Step("range keys from bolt db") return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index 1d2e26c4239..f0b71bb9c25 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -1,3 +1,18 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package traceutil implements tracing utilities using "context". package traceutil import ( @@ -7,13 +22,10 @@ import ( "math/rand" "time" - "github.com/coreos/pkg/capnslog" "go.uber.org/zap" ) -var ( - plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "trace") -) +const CtxKey = "trace" // Field is a kv pair to record additional details of the trace. 
type Field struct { @@ -40,6 +52,7 @@ func writeFields(fields []Field) string { type Trace struct { operation string + lg *zap.Logger fields []Field startTime time.Time steps []step @@ -51,32 +64,22 @@ type step struct { fields []Field } -func New(op string, fields ...Field) *Trace { - return &Trace{operation: op, startTime: time.Now(), fields: fields} +func New(op string, lg *zap.Logger, fields ...Field) *Trace { + return &Trace{operation: op, lg: lg, startTime: time.Now(), fields: fields} } -// traceutil.TODO() returns a non-nil, empty Trace +// TODO returns a non-nil, empty Trace func TODO() *Trace { return &Trace{} } func Get(ctx context.Context) *Trace { - if trace, ok := ctx.Value("trace").(*Trace); ok && trace != nil { + if trace, ok := ctx.Value(CtxKey).(*Trace); ok && trace != nil { return trace } return TODO() } -func GetOrCreate(ctx context.Context, op string, fields ...Field) (context.Context, *Trace) { - trace, ok := ctx.Value("trace").(*Trace) - if !ok || trace == nil { - trace = New(op) - trace.fields = fields - ctx = context.WithValue(ctx, "trace", trace) - } - return ctx, trace -} - func (t *Trace) Step(msg string, fields ...Field) { t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) } @@ -88,48 +91,47 @@ func (t *Trace) AddField(fields ...Field) { } // Log dumps all steps in the Trace -func (t *Trace) Log(lg *zap.Logger) { - t.LogWithStepThreshold(0, lg) +func (t *Trace) Log() { + t.LogWithStepThreshold(0) } // LogIfLong dumps logs if the duration is longer than threshold -func (t *Trace) LogIfLong(threshold time.Duration, lg *zap.Logger) { +func (t *Trace) LogIfLong(threshold time.Duration) { if time.Since(t.startTime) > threshold { stepThreshold := threshold / time.Duration(len(t.steps)+1) - t.LogWithStepThreshold(stepThreshold, lg) + t.LogWithStepThreshold(stepThreshold) } } // LogWithStepThreshold only dumps step whose duration is longer than step threshold -func (t *Trace) LogWithStepThreshold(threshold time.Duration, 
lg *zap.Logger) { - s := t.format(threshold) - if lg != nil { - lg.Info(s) - } else { - plog.Info(s) +func (t *Trace) LogWithStepThreshold(threshold time.Duration) { + msg, fs := t.logInfo(threshold) + if t.lg != nil { + t.lg.Info(msg, fs...) } } -func (t *Trace) format(threshold time.Duration) string { +func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) { endTime := time.Now() totalDuration := endTime.Sub(t.startTime) - var buf bytes.Buffer traceNum := rand.Int31() + msg := fmt.Sprintf("trace[%d] %s", traceNum, t.operation) - buf.WriteString(fmt.Sprintf("Trace[%d] \"%v\" %s (duration: %v, start: %v)\n", - traceNum, t.operation, writeFields(t.fields), totalDuration, - t.startTime.Format("2006-01-02 15:04:05.000"))) + var steps []string lastStepTime := t.startTime for _, step := range t.steps { stepDuration := step.time.Sub(lastStepTime) if stepDuration > threshold { - buf.WriteString(fmt.Sprintf("Trace[%d] Step \"%v\" %s (duration: %v)\n", + steps = append(steps, fmt.Sprintf("trace[%d] step '%v' %s (duration: %v)", traceNum, step.msg, writeFields(step.fields), stepDuration)) } lastStepTime = step.time } - buf.WriteString(fmt.Sprintf("Trace[%d] End %v\n", traceNum, - endTime.Format("2006-01-02 15:04:05.000"))) - return buf.String() + fs := []zap.Field{zap.String("detail", writeFields(t.fields)), + zap.Duration("duration", totalDuration), + zap.Time("start", t.startTime), + zap.Time("end", endTime), + zap.Strings("steps", steps)} + return msg, fs } diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go index 3e6da09d503..59111ec89b1 100644 --- a/pkg/traceutil/trace_test.go +++ b/pkg/traceutil/trace_test.go @@ -1,3 +1,17 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package traceutil import ( @@ -7,8 +21,6 @@ import ( "io/ioutil" "os" "path/filepath" - "regexp" - "strings" "testing" "time" @@ -29,7 +41,7 @@ func TestGet(t *testing.T) { }, { name: "When the context has trace", - inputCtx: context.WithValue(context.Background(), "trace", traceForTest), + inputCtx: context.WithValue(context.Background(), CtxKey, traceForTest), outputTrace: traceForTest, }, } @@ -38,43 +50,10 @@ func TestGet(t *testing.T) { t.Run(tt.name, func(t *testing.T) { trace := Get(tt.inputCtx) if trace == nil { - t.Errorf("Expected %v; Got nil\n", tt.outputTrace) + t.Errorf("Expected %v; Got nil", tt.outputTrace) } if trace.operation != tt.outputTrace.operation { - t.Errorf("Expected %v; Got %v\n", tt.outputTrace, trace) - } - }) - } -} - -func TestGetOrCreate(t *testing.T) { - tests := []struct { - name string - inputCtx context.Context - outputTraceOp string - }{ - { - name: "When the context does not have trace", - inputCtx: context.TODO(), - outputTraceOp: "test", - }, - { - name: "When the context has trace", - inputCtx: context.WithValue(context.Background(), "trace", &Trace{operation: "test"}), - outputTraceOp: "test", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx, trace := GetOrCreate(tt.inputCtx, "test") - if trace == nil { - t.Errorf("Expected trace object; Got nil\n") - } else if trace.operation != tt.outputTraceOp { - t.Errorf("Expected %v; Got %v\n", tt.outputTraceOp, trace.operation) - } - if ctx.Value("trace") == nil { - t.Errorf("Expected context has attached trace; Got nil\n") + 
t.Errorf("Expected %v; Got %v", tt.outputTrace, trace) } }) } @@ -94,16 +73,16 @@ func TestCreate(t *testing.T) { } ) - trace := New(op, fields[0], fields[1]) + trace := New(op, nil, fields[0], fields[1]) if trace.operation != op { - t.Errorf("Expected %v; Got %v\n", op, trace.operation) + t.Errorf("Expected %v; Got %v", op, trace.operation) } for i, f := range trace.fields { if f.Key != fields[i].Key { - t.Errorf("Expected %v; Got %v\n", fields[i].Key, f.Key) + t.Errorf("Expected %v; Got %v", fields[i].Key, f.Key) } if f.Value != fields[i].Value { - t.Errorf("Expected %v; Got %v\n", fields[i].Value, f.Value) + t.Errorf("Expected %v; Got %v", fields[i].Value, f.Value) } } @@ -113,67 +92,38 @@ func TestCreate(t *testing.T) { for i, v := range trace.steps { if steps[i] != v.msg { - t.Errorf("Expected %v, got %v\n.", steps[i], v.msg) + t.Errorf("Expected %v; Got %v", steps[i], v.msg) } if stepFields[i].Key != v.fields[0].Key { - t.Errorf("Expected %v; Got %v\n", stepFields[i].Key, v.fields[0].Key) + t.Errorf("Expected %v; Got %v", stepFields[i].Key, v.fields[0].Key) } if stepFields[i].Value != v.fields[0].Value { - t.Errorf("Expected %v; Got %v\n", stepFields[i].Value, v.fields[0].Value) + t.Errorf("Expected %v; Got %v", stepFields[i].Value, v.fields[0].Value) } } } func TestLog(t *testing.T) { - test := struct { - name string - trace *Trace - expectedMsg []string - }{ - name: "When dump all logs", - trace: &Trace{ - operation: "Test", - startTime: time.Now().Add(-100 * time.Millisecond), - steps: []step{ - {time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"}, - {time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"}, - }, - }, - expectedMsg: []string{ - "msg1", "msg2", - }, - } - - t.Run(test.name, func(t *testing.T) { - logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano())) - defer os.RemoveAll(logPath) - - lcfg := zap.NewProductionConfig() - lcfg.OutputPaths = []string{logPath} - lcfg.ErrorOutputPaths = 
[]string{logPath} - lg, _ := lcfg.Build() - - test.trace.Log(lg) - data, err := ioutil.ReadFile(logPath) - if err != nil { - t.Fatal(err) - } - - for _, msg := range test.expectedMsg { - if !bytes.Contains(data, []byte(msg)) { - t.Errorf("Expected to find %v in log.\n", msg) - } - } - }) -} - -func TestTraceFormat(t *testing.T) { tests := []struct { name string trace *Trace fields []Field expectedMsg []string }{ + { + name: "When dump all logs", + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + {time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"}, + {time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"}, + }, + }, + expectedMsg: []string{ + "msg1", "msg2", + }, + }, { name: "When trace has fields", trace: &Trace{ @@ -203,45 +153,31 @@ func TestTraceFormat(t *testing.T) { "stepKey1:stepValue1", "stepKey2:stepValue2", }, }, - { - name: "When trace has no field", - trace: &Trace{ - operation: "Test", - startTime: time.Now().Add(-100 * time.Millisecond), - steps: []step{ - {time: time.Now().Add(-80 * time.Millisecond), msg: "msg1"}, - {time: time.Now().Add(-50 * time.Millisecond), msg: "msg2"}, - }, - }, - fields: []Field{}, - expectedMsg: []string{ - "Test", - "msg1", "msg2", - }, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logPath := filepath.Join(os.TempDir(), fmt.Sprintf("test-log-%d", time.Now().UnixNano())) + defer os.RemoveAll(logPath) + + lcfg := zap.NewProductionConfig() + lcfg.OutputPaths = []string{logPath} + lcfg.ErrorOutputPaths = []string{logPath} + lg, _ := lcfg.Build() + for _, f := range tt.fields { tt.trace.AddField(f) } - s := tt.trace.format(0) - var buf bytes.Buffer - buf.WriteString(`Trace\[(\d*)?\](.+)\(duration(.+)start(.+)\)\n`) - for range tt.trace.steps { - buf.WriteString(`Trace\[(\d*)?\](.+)Step(.+)\(duration(.+)\)\n`) + tt.trace.lg = lg + tt.trace.Log() + data, err := ioutil.ReadFile(logPath) + if err != nil { + t.Fatal(err) } - 
buf.WriteString(`Trace\[(\d*)?\](.+)End(.+)\n`) - pattern := buf.String() - r, _ := regexp.Compile(pattern) - if !r.MatchString(s) { - t.Errorf("Wrong log format.\n") - } for _, msg := range tt.expectedMsg { - if !strings.Contains(s, msg) { - t.Errorf("Expected to find %v in log.\n", msg) + if !bytes.Contains(data, []byte(msg)) { + t.Errorf("Expected to find %v in log", msg) } } }) @@ -310,14 +246,15 @@ func TestLogIfLong(t *testing.T) { lcfg.ErrorOutputPaths = []string{logPath} lg, _ := lcfg.Build() - tt.trace.LogIfLong(tt.threshold, lg) + tt.trace.lg = lg + tt.trace.LogIfLong(tt.threshold) data, err := ioutil.ReadFile(logPath) if err != nil { t.Fatal(err) } for _, msg := range tt.expectedMsg { if !bytes.Contains(data, []byte(msg)) { - t.Errorf("Expected to find %v in log\n", msg) + t.Errorf("Expected to find %v in log", msg) } } }) From daa432cfa701509c82430df06eabd36fa408f050 Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Tue, 1 Oct 2019 14:08:06 -0700 Subject: [PATCH 4/6] etcdserver: add put request steps. mvcc: add put request steps; add trace to KV.Write() as input parameter. 
--- clientv3/snapshot/v3_snapshot.go | 3 ++- etcdserver/apply.go | 17 +++++++++++++---- etcdserver/server.go | 3 ++- mvcc/kv.go | 2 +- mvcc/kv_test.go | 10 +++++----- mvcc/kv_view.go | 4 ++-- mvcc/kvstore.go | 3 ++- mvcc/kvstore_bench_test.go | 5 +++-- mvcc/kvstore_test.go | 4 ++-- mvcc/kvstore_txn.go | 9 ++++++--- mvcc/watchable_store.go | 3 ++- mvcc/watchable_store_bench_test.go | 3 ++- mvcc/watchable_store_txn.go | 9 +++++++-- pkg/traceutil/trace.go | 14 +++++++++++++- 14 files changed, 62 insertions(+), 27 deletions(-) diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index 54f8c67c9ef..791035e7db8 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -39,6 +39,7 @@ import ( "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/pkg/fileutil" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -384,7 +385,7 @@ func (s *v3Manager) saveDB() error { lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) - txn := mvs.Write() + txn := mvs.Write(traceutil.TODO()) btx := be.BatchTx() del := func(k, v []byte) error { txn.DeleteRange(k, nil) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 02fdea73139..c1ea2768754 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -179,7 +179,14 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - + trace := traceutil.New("put", + a.s.getLogger(), + traceutil.Field{Key: "key", Value: string(p.Key)}, + traceutil.Field{Key: "value", Value: string(p.Value)}, + ) + defer func() { + trace.LogIfLong(warnApplyDuration) + }() val, leaseID := 
p.Value, lease.LeaseID(p.Lease) if txn == nil { if leaseID != lease.NoLease { @@ -187,16 +194,18 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu return nil, lease.ErrLeaseNotFound } } - txn = a.s.KV().Write() + txn = a.s.KV().Write(trace) defer txn.End() } var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { + trace.StepBegin() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { return nil, err } + trace.StepEnd("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { @@ -226,7 +235,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ end := mkGteRange(dr.RangeEnd) if txn == nil { - txn = a.s.kv.Write() + txn = a.s.kv.Write(traceutil.TODO()) defer txn.End() } @@ -369,7 +378,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { // be the revision of the write txn. if isWrite { txn.End() - txn = a.s.KV().Write() + txn = a.s.KV().Write(traceutil.TODO()) } a.applyTxn(txn, rt, txnPath, txnResp) rev := txn.Rev() diff --git a/etcdserver/server.go b/etcdserver/server.go index 78daa0ea97b..e2a5fa004bc 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -50,6 +50,7 @@ import ( "go.etcd.io/etcd/pkg/pbutil" "go.etcd.io/etcd/pkg/runtime" "go.etcd.io/etcd/pkg/schedule" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/pkg/wait" "go.etcd.io/etcd/raft" @@ -1178,7 +1179,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { plog.Info("recovering lessor...") } - s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() }) + s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) }) if lg != nil { lg.Info("restored lease store") diff --git a/mvcc/kv.go b/mvcc/kv.go index 065b9079989..b7d2a14729a 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -106,7 +106,7 @@ type KV interface { Read(trace *traceutil.Trace) TxnRead // 
Write creates a write transaction. - Write() TxnWrite + Write(trace *traceutil.Trace) TxnWrite // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 6c72d0879c7..38e17e1cabe 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -57,7 +57,7 @@ var ( return kv.Put(key, value, lease) } txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 { - txn := kv.Write() + txn := kv.Write(traceutil.TODO()) defer txn.End() return txn.Put(key, value, lease) } @@ -66,7 +66,7 @@ var ( return kv.DeleteRange(key, end) } txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) { - txn := kv.Write() + txn := kv.Write(traceutil.TODO()) defer txn.End() return txn.DeleteRange(key, end) } @@ -410,7 +410,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) { func() { s.DeleteRange([]byte("foo"), nil) }, } for i, tt := range tests { - txn := s.Write() + txn := s.Write(traceutil.TODO()) done := make(chan struct{}, 1) go func() { tt() @@ -439,7 +439,7 @@ func TestKVTxnNonBlockRange(t *testing.T) { s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer cleanup(s, b, tmpPath) - txn := s.Write() + txn := s.Write(traceutil.TODO()) defer txn.End() donec := make(chan struct{}) @@ -461,7 +461,7 @@ func TestKVTxnOperationInSequence(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) base := int64(i + 1) // put foo diff --git a/mvcc/kv_view.go b/mvcc/kv_view.go index 9750fd764f5..d4f0ca6880a 100644 --- a/mvcc/kv_view.go +++ b/mvcc/kv_view.go @@ -42,13 +42,13 @@ func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err type writeView struct{ kv KV } func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) { - tw := wv.kv.Write() + tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.DeleteRange(key, end) } func (wv *writeView) Put(key, value []byte, 
lease lease.LeaseID) (rev int64) { - tw := wv.kv.Write() + tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.Put(key, value, lease) } diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index c2ef4b252a5..f398dd59fb0 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/schedule" + "go.etcd.io/etcd/pkg/traceutil" "github.com/coreos/pkg/capnslog" "go.uber.org/zap" @@ -140,7 +141,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI s.ReadView = &readView{s} s.WriteView = &writeView{s} if s.le != nil { - s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } tx := s.b.BatchTx() diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index 4e7c9a49725..e6a4af84090 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -20,6 +20,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -130,7 +131,7 @@ func BenchmarkStoreTxnPut(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) txn.Put(keys[i], vals[i], lease.NoLease) txn.End() } @@ -151,7 +152,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { for i := 0; i < b.N; i++ { for j := 0; j < revsPerKey; j++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) txn.Put(keys[i], vals[i], lease.NoLease) txn.End() } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index c4180c963ad..fc2b33204fc 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -640,7 +640,7 @@ func TestTxnPut(t *testing.T) { defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) base := int64(i + 2) if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev 
!= base { t.Errorf("#%d: rev = %d, want %d", i, rev, base) @@ -731,7 +731,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { defer wg.Done() time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time - tx := s.Write() + tx := s.Write(traceutil.TODO()) numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1 var pendingKvs kvs for j := 0; j < numOfPuts; j++ { diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 27afe889ba5..716a6d82ff2 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -64,12 +64,12 @@ type storeTxnWrite struct { changes []mvccpb.KeyValue } -func (s *store) Write() TxnWrite { +func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0, traceutil.TODO()}, + storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), @@ -183,7 +183,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { c = created.main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) } - + tw.trace.Step("get key's previous created_revision and leaseID") ibytes := newRevBytes() idxRev := revision{main: rev, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) @@ -210,9 +210,11 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } } + tw.trace.Step("marshal mvccpb.KeyValue") tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv) + tw.trace.Step("store kv pair into bolt db") if oldLease != lease.NoLease { if tw.s.le == nil { @@ -239,6 +241,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { panic("unexpected error from lease Attach") } } + tw.trace.Step("attach lease to kv pair") } func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 3cf491d1fdf..a51e5aa529b 100644 
--- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -84,7 +85,7 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co s.store.WriteView = &writeView{s} if s.le != nil { // use this store as the deleter so revokes trigger watch events - s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } s.wg.Add(2) go s.syncWatchersLoop() diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 0f8fb578d2d..0f553493fa0 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -59,7 +60,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - txn := s.Write() + txn := s.Write(traceutil.TODO()) txn.Put(keys[i], vals[i], lease.NoLease) txn.End() } diff --git a/mvcc/watchable_store_txn.go b/mvcc/watchable_store_txn.go index 3bcfa4d7566..70b12983d97 100644 --- a/mvcc/watchable_store_txn.go +++ b/mvcc/watchable_store_txn.go @@ -14,7 +14,10 @@ package mvcc -import "go.etcd.io/etcd/mvcc/mvccpb" +import ( + "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" +) func (tw *watchableStoreTxnWrite) End() { changes := tw.Changes() @@ -48,4 +51,6 @@ type watchableStoreTxnWrite struct { s *watchableStore } -func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} } +func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite { + return &watchableStoreTxnWrite{s.store.Write(trace), s} +} diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index f0b71bb9c25..d056097a001 100644 
--- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -56,6 +56,7 @@ type Trace struct { fields []Field startTime time.Time steps []step + inStep bool } type step struct { @@ -81,7 +82,18 @@ func Get(ctx context.Context) *Trace { } func (t *Trace) Step(msg string, fields ...Field) { - t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) + if !t.inStep { + t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) + } +} + +func (t *Trace) StepBegin() { + t.inStep = true +} + +func (t *Trace) StepEnd(msg string, fields ...Field) { + t.inStep = false + t.Step(msg, fields...) } func (t *Trace) AddField(fields ...Field) { From 92455183630b77669367e6dac22e39e84fe7a7c3 Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Tue, 1 Oct 2019 15:38:52 -0700 Subject: [PATCH 5/6] etcdserver: trace raft requests. --- etcdserver/apply.go | 35 ++++++++++++++++------------------- etcdserver/apply_auth.go | 9 +++++---- etcdserver/corrupt.go | 5 +++-- etcdserver/v3_server.go | 8 ++++++++ pkg/traceutil/trace.go | 15 +++++++++++++++ 5 files changed, 47 insertions(+), 25 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index c1ea2768754..81b16f39aa6 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -34,8 +34,7 @@ import ( ) const ( - warnApplyDuration = 100 * time.Millisecond - rangeTraceThreshold = 100 * time.Millisecond + warnApplyDuration = 100 * time.Millisecond ) type applyResult struct { @@ -45,13 +44,14 @@ type applyResult struct { // to being logically reflected by the node. Currently only used for // Compaction requests. 
physc <-chan struct{} + trace *traceutil.Trace } // applierV3 is the interface for processing V3 raft messages type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult - Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) + Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) @@ -123,7 +123,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.Range != nil: ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) case r.Put != nil: - ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put) + ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put) case r.DeleteRange != nil: ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) case r.Txn != nil: @@ -176,22 +176,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { return ar } -func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { +func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - trace := traceutil.New("put", + trace = traceutil.New("put", a.s.getLogger(), traceutil.Field{Key: "key", Value: string(p.Key)}, traceutil.Field{Key: "value", Value: string(p.Value)}, ) - defer func() { - trace.LogIfLong(warnApplyDuration) - }() val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { if leaseID != lease.NoLease { if l := a.s.lessor.Lookup(leaseID); l == nil { - return nil, lease.ErrLeaseNotFound + return nil, nil, lease.ErrLeaseNotFound } } txn = a.s.KV().Write(trace) @@ -203,14 +200,14 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu 
trace.StepBegin() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { - return nil, err + return nil, nil, err } trace.StepEnd("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { // ignore_{lease,value} flag expects previous key-value pair - return nil, ErrKeyNotFound + return nil, nil, ErrKeyNotFound } } if p.IgnoreValue { @@ -226,7 +223,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu } resp.Header.Revision = txn.Put(p.Key, val, leaseID) - return resp, nil + return resp, trace, nil } func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { @@ -540,7 +537,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp case *pb.RequestOp_RequestPut: - resp, err := a.Put(txn, tv.RequestPut) + resp, _, err := a.Put(txn, tv.RequestPut) if err != nil { if lg != nil { lg.Panic("unexpected error during txn", zap.Error(err)) @@ -688,8 +685,8 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. 
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { - return nil, ErrNoSpace +func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { + return nil, nil, ErrNoSpace } func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) { @@ -838,13 +835,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")} } -func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { +func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { ok := a.q.Available(p) - resp, err := a.applierV3.Put(txn, p) + resp, trace, err := a.applierV3.Put(txn, p) if err == nil && !ok { err = ErrNoSpace } - return resp, err + return resp, trace, err } func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go index c31644b3d19..269af4758cd 100644 --- a/etcdserver/apply_auth.go +++ b/etcdserver/apply_auth.go @@ -22,6 +22,7 @@ import ( pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" ) type authApplierV3 struct { @@ -62,9 +63,9 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { return ret } -func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) { +func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { - return nil, err + return nil, nil, err } if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil { @@ -72,13 +73,13 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon // be 
written by this user. It means the user cannot revoke the // lease so attaching the lease to the newly written key should // be forbidden. - return nil, err + return nil, nil, err } if r.PrevKv { err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil) if err != nil { - return nil, err + return nil, nil, err } } return aa.applierV3.Put(txn, r) diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 0f9a4053f04..07f306424d6 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.uber.org/zap" @@ -382,8 +383,8 @@ type applierV3Corrupt struct { func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } -func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { - return nil, ErrCorrupt +func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { + return nil, nil, ErrCorrupt } func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 721800dc89b..a005d8e2cc0 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -39,6 +39,8 @@ const ( // However, if the committed entries are very heavy to apply, the gap might grow. // We should stop accepting new proposals if the gap growing to a certain point. 
maxGapBetweenApplyAndCommitIndex = 5000 + rangeTraceThreshold = 100 * time.Millisecond + putTraceThreshold = 100 * time.Millisecond ) type RaftKV interface { @@ -126,6 +128,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + ctx = context.WithValue(ctx, "time", time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err @@ -549,6 +552,11 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque if result.err != nil { return nil, result.err } + if startTime, ok := ctx.Value("time").(time.Time); ok && result.trace != nil { + applyStart := result.trace.ResetStartTime(startTime) + result.trace.InsertStep(0, applyStart, "process raft request") + result.trace.LogIfLong(putTraceThreshold) + } return result.resp, nil } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index d056097a001..2628db665ba 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -81,6 +81,21 @@ func Get(ctx context.Context) *Trace { return TODO() } +func (t *Trace) ResetStartTime(time time.Time) (prev time.Time) { + prev = t.startTime + t.startTime = time + return prev +} + +func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) { + newStep := step{time, msg, fields} + if at < len(t.steps) { + t.steps = append(t.steps[:at+1], t.steps[at:]...) 
+ t.steps[at] = newStep + } else { + t.steps = append(t.steps, newStep) + } +} func (t *Trace) Step(msg string, fields ...Field) { if !t.inStep { t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) From 480d5510f9323a5ea5e7c568be8ddb54fcd94325 Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Tue, 1 Oct 2019 17:18:26 -0700 Subject: [PATCH 6/6] etcdserver: trace compaction request; add return parameter 'trace' to applierV3.Compaction() mvcc: trace compaction request; add input parameter 'trace' to KV.Compact() --- etcdserver/apply.go | 25 ++++++++++++------- etcdserver/corrupt.go | 4 +-- etcdserver/v3_server.go | 32 ++++++++++++++++++------ integration/v3_alarm_test.go | 3 ++- mvcc/kv.go | 2 +- mvcc/kv_test.go | 8 +++--- mvcc/kvstore.go | 12 +++++---- mvcc/kvstore_compaction_test.go | 3 ++- mvcc/kvstore_test.go | 6 ++--- mvcc/watchable_store_test.go | 3 ++- pkg/traceutil/trace.go | 44 +++++++++++++++++++-------------- pkg/traceutil/trace_test.go | 2 +- tools/benchmark/cmd/mvcc-put.go | 3 ++- 13 files changed, 92 insertions(+), 55 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 81b16f39aa6..822b5e32204 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -55,7 +55,7 @@ type applierV3 interface { Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) - Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) + Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) @@ -129,7 +129,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.Txn != nil: ar.resp, ar.err = 
a.s.applyV3.Txn(r.Txn) case r.Compaction != nil: - ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction) + ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) case r.LeaseGrant != nil: ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) case r.LeaseRevoke != nil: @@ -182,7 +182,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu trace = traceutil.New("put", a.s.getLogger(), traceutil.Field{Key: "key", Value: string(p.Key)}, - traceutil.Field{Key: "value", Value: string(p.Value)}, + traceutil.Field{Key: "req_size", Value: proto.Size(p)}, ) val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { @@ -197,12 +197,13 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { - trace.StepBegin() + trace.DisableStep() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { return nil, nil, err } - trace.StepEnd("get previous kv pair") + trace.EnableStep() + trace.Step("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { @@ -223,6 +224,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu } resp.Header.Revision = txn.Put(p.Key, val, leaseID) + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) return resp, trace, nil } @@ -568,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat return txns } -func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { +func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { resp := &pb.CompactionResponse{} resp.Header = &pb.ResponseHeader{} - ch, err := a.s.KV().Compact(compaction.Revision) + trace := traceutil.New("compact", + a.s.getLogger(), + 
traceutil.Field{Key: "revision", Value: compaction.Revision}, + ) + + ch, err := a.s.KV().Compact(trace, compaction.Revision) if err != nil { - return nil, ch, err + return nil, ch, nil, err } // get the current revision. which key to get is not important. rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{}) resp.Header.Revision = rr.Rev - return resp, ch, err + return resp, ch, trace, err } func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 07f306424d6..2351eef4458 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -399,8 +399,8 @@ func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { return nil, ErrCorrupt } -func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { - return nil, nil, ErrCorrupt +func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { + return nil, nil, nil, ErrCorrupt } func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index a005d8e2cc0..bfe08ea35c1 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -39,8 +39,7 @@ const ( // However, if the committed entries are very heavy to apply, the gap might grow. // We should stop accepting new proposals if the gap growing to a certain point. 
maxGapBetweenApplyAndCommitIndex = 5000 - rangeTraceThreshold = 100 * time.Millisecond - putTraceThreshold = 100 * time.Millisecond + traceThreshold = 100 * time.Millisecond ) type RaftKV interface { @@ -93,7 +92,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe traceutil.Field{Key: "range_begin", Value: string(r.Key)}, traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)}, ) - ctx = context.WithValue(ctx, traceutil.CtxKey, trace) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) var resp *pb.RangeResponse var err error @@ -105,7 +104,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}, ) } - trace.LogIfLong(rangeTraceThreshold) + trace.LogIfLong(traceThreshold) }(time.Now()) if !r.Serializable { @@ -128,7 +127,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { - ctx = context.WithValue(ctx, "time", time.Now()) + ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err @@ -205,7 +204,18 @@ func isTxnReadonly(r *pb.TxnRequest) bool { } func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + startTime := time.Now() result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) + trace := traceutil.TODO() + if result != nil && result.trace != nil { + trace = result.trace + defer func() { + trace.LogIfLong(traceThreshold) + }() + applyStart := result.trace.GetStartTime() + result.trace.SetStartTime(startTime) + trace.InsertStep(0, applyStart, "process raft request") + } if r.Physical && result != nil && result.physc != nil { <-result.physc // The compaction is done deleting keys; the hash is now settled @@ 
-214,6 +224,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. // if the compaction resumes. Force the finished compaction to // commit so it won't resume following a crash. s.be.ForceCommit() + trace.Step("physically apply compaction") } if err != nil { return nil, err @@ -229,6 +240,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. resp.Header = &pb.ResponseHeader{} } resp.Header.Revision = s.kv.Rev() + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) return resp, nil } @@ -552,10 +564,14 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque if result.err != nil { return nil, result.err } - if startTime, ok := ctx.Value("time").(time.Time); ok && result.trace != nil { - applyStart := result.trace.ResetStartTime(startTime) + if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil { + applyStart := result.trace.GetStartTime() + // The trace object is created in apply. Here reset the start time to trace + // the raft request time by the difference between the request start time + // and apply start time + result.trace.SetStartTime(startTime) result.trace.InsertStep(0, applyStart, "process raft request") - result.trace.LogIfLong(putTraceThreshold) + result.trace.LogIfLong(traceThreshold) } return result.resp, nil } diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go index 443c2aae13a..0b2dd05ceec 100644 --- a/integration/v3_alarm_test.go +++ b/integration/v3_alarm_test.go @@ -27,6 +27,7 @@ import ( "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -173,7 +174,7 @@ func TestV3CorruptAlarm(t *testing.T) { // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'. 
s.Put([]byte("abc"), []byte("def"), 0) s.Put([]byte("xyz"), []byte("123"), 0) - s.Compact(5) + s.Compact(traceutil.TODO(), 5) s.Commit() s.Close() be.Close() diff --git a/mvcc/kv.go b/mvcc/kv.go index b7d2a14729a..c057f926118 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -115,7 +115,7 @@ type KV interface { HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) // Compact frees all superseded keys with revisions less than rev. - Compact(rev int64) (<-chan struct{}, error) + Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) // Commit commits outstanding txns into the underlying backend. Commit() diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 38e17e1cabe..466040790ff 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -183,7 +183,7 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) { defer cleanup(s, b, tmpPath) put3TestKVs(s) - if _, err := s.Compact(4); err != nil { + if _, err := s.Compact(traceutil.TODO(), 4); err != nil { t.Fatalf("compact error (%v)", err) } @@ -545,7 +545,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { }, } for i, tt := range tests { - _, err := s.Compact(tt.rev) + _, err := s.Compact(traceutil.TODO(), tt.rev) if err != nil { t.Errorf("#%d: unexpect compact error %v", i, err) } @@ -581,7 +581,7 @@ func TestKVCompactBad(t *testing.T) { {100, ErrFutureRev}, } for i, tt := range tests { - _, err := s.Compact(tt.rev) + _, err := s.Compact(traceutil.TODO(), tt.rev) if err != tt.werr { t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr) } @@ -627,7 +627,7 @@ func TestKVRestore(t *testing.T) { func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) kv.Put([]byte("foo"), []byte("bar1"), 2) - kv.Compact(1) + kv.Compact(traceutil.TODO(), 1) }, } for i, tt := range tests { diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index f398dd59fb0..ed05bc28825 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -270,9 +270,10 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) 
{ return nil, nil } -func (s *store) compact(rev int64) (<-chan struct{}, error) { +func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { start := time.Now() keep := s.kvindex.Compact(rev) + trace.Step("compact in-memory index tree") ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { @@ -289,6 +290,7 @@ func (s *store) compact(rev int64) (<-chan struct{}, error) { s.fifoSched.Schedule(j) indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) + trace.Step("schedule compaction") return ch, nil } @@ -298,21 +300,21 @@ func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { return ch, err } - return s.compact(rev) + return s.compact(traceutil.TODO(), rev) } -func (s *store) Compact(rev int64) (<-chan struct{}, error) { +func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() ch, err := s.updateCompactRev(rev) - + trace.Step("check and update compact revision") if err != nil { s.mu.Unlock() return ch, err } s.mu.Unlock() - return s.compact(rev) + return s.compact(trace, rev) } // DefaultIgnores is a map of keys to ignore in hash checking. 
diff --git a/mvcc/kvstore_compaction_test.go b/mvcc/kvstore_compaction_test.go index 1d5c63261bc..d1e576dcb70 100644 --- a/mvcc/kvstore_compaction_test.go +++ b/mvcc/kvstore_compaction_test.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -109,7 +110,7 @@ func TestCompactAllAndRestore(t *testing.T) { rev := s0.Rev() // compact all keys - done, err := s0.Compact(rev) + done, err := s0.Compact(traceutil.TODO(), rev) if err != nil { t.Fatal(err) } diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go index fc2b33204fc..eb9b1f130f4 100644 --- a/mvcc/kvstore_test.go +++ b/mvcc/kvstore_test.go @@ -332,7 +332,7 @@ func TestStoreCompact(t *testing.T) { key2 := newTestKeyBytes(revision{2, 0}, false) b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} - s.Compact(3) + s.Compact(traceutil.TODO(), 3) s.fifoSched.WaitFinish(1) if s.compactMainRev != 3 { @@ -583,7 +583,7 @@ func TestHashKVWhenCompacting(t *testing.T) { go func() { defer wg.Done() for i := 100; i >= 0; i-- { - _, err := s.Compact(int64(rev - 1 - i)) + _, err := s.Compact(traceutil.TODO(), int64(rev-1-i)) if err != nil { t.Error(err) } @@ -610,7 +610,7 @@ func TestHashKVZeroRevision(t *testing.T) { for i := 2; i <= rev; i++ { s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease) } - if _, err := s.Compact(int64(rev / 2)); err != nil { + if _, err := s.Compact(traceutil.TODO(), int64(rev/2)); err != nil { t.Fatal(err) } diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index fd496ad7526..e4d0cd62ec5 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -237,7 +238,7 @@ func TestWatchCompacted(t *testing.T) { for i := 0; i < maxRev; i++ { s.Put(testKey, testValue, lease.NoLease) } - _, err := 
s.Compact(compactRev) + _, err := s.Compact(traceutil.TODO(), compactRev) if err != nil { t.Fatalf("failed to compact kv (%v)", err) } diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index 2628db665ba..2d247dd9acc 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -25,7 +25,10 @@ import ( "go.uber.org/zap" ) -const CtxKey = "trace" +const ( + TraceKey = "trace" + StartTimeKey = "startTime" +) // Field is a kv pair to record additional details of the trace. type Field struct { @@ -51,12 +54,12 @@ func writeFields(fields []Field) string { } type Trace struct { - operation string - lg *zap.Logger - fields []Field - startTime time.Time - steps []step - inStep bool + operation string + lg *zap.Logger + fields []Field + startTime time.Time + steps []step + stepDisabled bool } type step struct { @@ -75,16 +78,18 @@ func TODO() *Trace { } func Get(ctx context.Context) *Trace { - if trace, ok := ctx.Value(CtxKey).(*Trace); ok && trace != nil { + if trace, ok := ctx.Value(TraceKey).(*Trace); ok && trace != nil { return trace } return TODO() } -func (t *Trace) ResetStartTime(time time.Time) (prev time.Time) { - prev = t.startTime +func (t *Trace) GetStartTime() time.Time { + return t.startTime +} + +func (t *Trace) SetStartTime(time time.Time) { t.startTime = time - return prev } func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) { @@ -96,19 +101,22 @@ func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) t.steps = append(t.steps, newStep) } } + +// Step adds step to trace func (t *Trace) Step(msg string, fields ...Field) { - if !t.inStep { + if !t.stepDisabled { t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) } } -func (t *Trace) StepBegin() { - t.inStep = true +// DisableStep sets the flag to prevent the trace from adding steps +func (t *Trace) DisableStep() { + t.stepDisabled = true } -func (t *Trace) StepEnd(msg string, fields ...Field) { - t.inStep = false - 
t.Step(msg, fields...) +} +// EnableStep re-enables the trace to add steps +func (t *Trace) EnableStep() { + t.stepDisabled = false } func (t *Trace) AddField(fields ...Field) { @@ -149,7 +157,7 @@ func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) { for _, step := range t.steps { stepDuration := step.time.Sub(lastStepTime) if stepDuration > threshold { - steps = append(steps, fmt.Sprintf("trace[%d] step '%v' %s (duration: %v)", + steps = append(steps, fmt.Sprintf("trace[%d] '%v' %s (duration: %v)", traceNum, step.msg, writeFields(step.fields), stepDuration)) } lastStepTime = step.time diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go index 59111ec89b1..9b99288764f 100644 --- a/pkg/traceutil/trace_test.go +++ b/pkg/traceutil/trace_test.go @@ -41,7 +41,7 @@ func TestGet(t *testing.T) { }, { name: "When the context has trace", - inputCtx: context.WithValue(context.Background(), CtxKey, traceForTest), + inputCtx: context.WithValue(context.Background(), TraceKey, traceForTest), outputTrace: traceForTest, }, } diff --git a/tools/benchmark/cmd/mvcc-put.go b/tools/benchmark/cmd/mvcc-put.go index 026693efe05..200db9f0292 100644 --- a/tools/benchmark/cmd/mvcc-put.go +++ b/tools/benchmark/cmd/mvcc-put.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/pkg/report" + "go.etcd.io/etcd/pkg/traceutil" "github.com/spf13/cobra" ) @@ -114,7 +115,7 @@ func mvccPutFunc(cmd *cobra.Command, args []string) { for i := 0; i < mvccTotalRequests; i++ { st := time.Now() - tw := s.Write() + tw := s.Write(traceutil.TODO()) for j := i; j < i+nrTxnOps; j++ { tw.Put(keys[j], vals[j], lease.NoLease) }