diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 908f0c64a855..c97e7466215a 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -40,8 +41,15 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio if tls != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(tls))) } - opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s))) - opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s))) + opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + newLogUnaryInterceptor(s), + newUnaryInterceptor(s), + grpc_prometheus.UnaryServerInterceptor, + ))) + opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + newStreamInterceptor(s), + grpc_prometheus.StreamServerInterceptor, + ))) opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index f38dc4a99cf1..1e3a826178a6 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -25,9 +25,11 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" - prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" ) const ( @@ -40,7 +42,7 @@ type streamsMap struct { } func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if !api.IsCapabilityEnabled(api.V3rpcCapability) { return nil, rpctypes.ErrGRPCNotCapable } @@ -54,7 +56,126 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } } - return prometheus.UnaryServerInterceptor(ctx, req, info, handler) + resp, err := handler(ctx, req) + return resp, err + } +} + +func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + resp, err := handler(ctx, req) + defer logUnaryRequestStats(ctx, s.Logger(), info, startTime, req, resp) + return resp, err + } +} + +func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) { + duration := time.Since(startTime) + remote := "No remote client info." + peerInfo, ok := peer.FromContext(ctx) + if ok { + remote = peerInfo.Addr.String() + } + var responseType string = info.FullMethod + var reqCount, respCount int64 = 0, 0 + var reqSize, respSize int = 0, 0 + var reqContent string + switch _resp := resp.(type) { + case *pb.RangeResponse: + _req, ok := req.(*pb.RangeRequest) + if ok { + reqCount = 0 + reqSize = _req.Size() + reqContent = _req.String() + } + if _resp != nil { + respCount = _resp.GetCount() + respSize = _resp.Size() + } + case *pb.PutResponse: + _req, ok := req.(*pb.PutRequest) + if ok { + reqCount = 1 + reqSize = _req.Size() + reqContent = pb.NewLoggablePutRequest(_req).String() + // redact value field from request content, see PR #9821 + } + if _resp != nil { + respCount = 0 + respSize = _resp.Size() + } + case *pb.DeleteRangeResponse: + _req, ok := req.(*pb.DeleteRangeRequest) + if ok { + reqCount = 0 + reqSize = _req.Size() + reqContent = _req.String() + } + if _resp != nil { + respCount = _resp.GetDeleted() + respSize = _resp.Size() + } + case *pb.TxnResponse: + _req, ok := req.(*pb.TxnRequest) + if ok && _resp != nil { + if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure + reqCount = int64(len(_req.GetSuccess())) + reqSize = 0 + for _, r := range _req.GetSuccess() { + reqSize += r.Size() + } + } else { + reqCount = int64(len(_req.GetFailure())) + reqSize = 0 + for _, r := range _req.GetFailure() { + reqSize += r.Size() + } + } + reqContent = pb.NewLoggableTxnRequest(_req).String() + // redact value field from request content, see PR #9821 + } + if _resp != nil { + respCount = 0 + respSize = _resp.Size() + } + default: + reqCount = -1 + reqSize = -1 + respCount = -1 + respSize = -1 + } + + logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) +} + +func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string, + reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) { + if lg == nil { + plog.Debugf( + "start time = %v, " + + "time spent = %v, " + + "remote = %s, " + + "response type = %s, " + + "request count = %d, " + + "request size = %d, " + + "response count = %d, " + + "response size = %d, " + + "request content = %s", + startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent, + ) + } else { + lg.Debug("request stats", + zap.Time("start time", startTime), + zap.Duration("time spent", duration), + zap.String("remote", remote), + zap.String("response type", responseType), + zap.Int64("request count", reqCount), + zap.Int("request size", reqSize), + zap.Int64("response count", respCount), + zap.Int("response size", respSize), + zap.String("request content", reqContent), + ) } } @@ -90,7 +211,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor } } - return prometheus.StreamServerInterceptor(srv, ss, info, handler) + err := handler(srv, ss) + return err } } diff --git a/etcdserver/etcdserverpb/raft_internal_stringer.go b/etcdserver/etcdserverpb/raft_internal_stringer.go index ec6b6397b399..3d3536a326dd 100644 --- a/etcdserver/etcdserverpb/raft_internal_stringer.go +++ b/etcdserver/etcdserverpb/raft_internal_stringer.go @@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string { case as.Request.Put != nil: return fmt.Sprintf("header:<%s> put:<%s>", as.Request.Header.String(), - newLoggablePutRequest(as.Request.Put).String(), + NewLoggablePutRequest(as.Request.Put).String(), ) case as.Request.Txn != nil: return fmt.Sprintf("header:<%s> txn:<%s>", @@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer { func (as *requestOpStringer) String() string { switch op := as.Op.Request.(type) { case *RequestOp_RequestPut: - return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String()) + return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String()) case *RequestOp_RequestTxn: return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String()) default: @@ -167,7 +167,7 @@ type loggablePutRequest struct { IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"` } -func newLoggablePutRequest(request *PutRequest) *loggablePutRequest { +func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest { return &loggablePutRequest{ request.Key, len(request.Value), diff --git a/etcdserver/server.go b/etcdserver/server.go index 5f6110bbf6c8..41121f769bf5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -2407,3 +2407,7 @@ func (s *EtcdServer) goAttach(f func()) { func (s *EtcdServer) Alarms() []*pb.AlarmMember { return s.alarmStore.Get(pb.AlarmType_NONE) } + +func (s *EtcdServer) Logger() *zap.Logger { + return s.lg +}