From 96542d3f06aba8da98a47fd6eef431f71f35c497 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 21 Jul 2020 13:50:21 +0200 Subject: [PATCH] go/common/grpc: Client verbose logging and metrics --- .changelog/3121.feature.md | 21 +++ docs/oasis-node/metrics.md | 9 +- go/common/grpc/grpc.go | 177 ++++++++++++++++-- go/oasis-node/cmd/common/grpc/grpc.go | 1 + .../scenario/e2e/runtime/txsource.go | 2 + 5 files changed, 192 insertions(+), 18 deletions(-) create mode 100644 .changelog/3121.feature.md diff --git a/.changelog/3121.feature.md b/.changelog/3121.feature.md new file mode 100644 index 00000000000..56c1a3e6cb5 --- /dev/null +++ b/.changelog/3121.feature.md @@ -0,0 +1,21 @@ +go/common/grpc: Client verbose logging and metrics + +Adds option to enable verbose logging for gRPC client and adds basic gRPC +client instrumentation. + +Verbose gRPC client logging can be enabled with the existing `grpc.log.debug` +flag. + +Metric changes: + +Existing gRPC server metrics were renamed: + +- `oasis_grpc_calls` -> `oasis_grpc_server_calls` +- `oasis_grpc_latency` -> `oasis_grpc_server_latency` +- `oasis_grpc_stream_writes` -> `oasis_grpc_server_stream_writes` + +Added corresponding metrics: + +- `oasis_grpc_client_calls` gRPC client calls metric. +- `oasis_grpc_client_latency` gRPC client call latencies. +- `oasis_grpc_client_stream_writes` gRPC client stream writes. diff --git a/docs/oasis-node/metrics.md b/docs/oasis-node/metrics.md index e5e6278565c..7be90df26d4 100644 --- a/docs/oasis-node/metrics.md +++ b/docs/oasis-node/metrics.md @@ -44,9 +44,12 @@ oasis_codec_size | Summary | CBOR codec message size (bytes). | call, module | [ oasis_consensus_proposed_blocks | Counter | Number of blocks proposed by the node. | backend | [consensus/metrics](../../go/consensus/metrics/metrics.go) oasis_consensus_signed_blocks | Counter | Number of blocks signed by the node. 
| backend | [consensus/metrics](../../go/consensus/metrics/metrics.go) oasis_finalized_rounds | Counter | Number of finalized rounds. | | [roothash](../../go/roothash/metrics.go) -oasis_grpc_calls | Counter | Number of gRPC calls. | call | [common/grpc](../../go/common/grpc/grpc.go) -oasis_grpc_latency | Summary | gRPC call latency (seconds). | call | [common/grpc](../../go/common/grpc/grpc.go) -oasis_grpc_stream_writes | Counter | Number of gRPC stream writes. | call | [common/grpc](../../go/common/grpc/grpc.go) +oasis_grpc_client_calls | Counter | Number of gRPC calls. | call | [common/grpc](../../go/common/grpc/grpc.go) +oasis_grpc_client_latency | Summary | gRPC call latency (seconds). | call | [common/grpc](../../go/common/grpc/grpc.go) +oasis_grpc_client_stream_writes | Counter | Number of gRPC stream writes. | call | [common/grpc](../../go/common/grpc/grpc.go) +oasis_grpc_server_calls | Counter | Number of gRPC calls. | call | [common/grpc](../../go/common/grpc/grpc.go) +oasis_grpc_server_latency | Summary | gRPC call latency (seconds). | call | [common/grpc](../../go/common/grpc/grpc.go) +oasis_grpc_server_stream_writes | Counter | Number of gRPC stream writes. | call | [common/grpc](../../go/common/grpc/grpc.go) oasis_node_cpu_stime_seconds | Gauge | CPU system time spent by worker as reported by /proc/<PID>/stat (seconds). | | [oasis-node/cmd/common/metrics](../../go/oasis-node/cmd/common/metrics/cpu.go) oasis_node_cpu_utime_seconds | Gauge | CPU user time spent by worker as reported by /proc/<PID>/stat (seconds). | | [oasis-node/cmd/common/metrics](../../go/oasis-node/cmd/common/metrics/cpu.go) oasis_node_disk_read_bytes | Gauge | Read data from block storage by the worker as reported by /proc/<PID>/io (bytes). 
| | [oasis-node/cmd/common/metrics](../../go/oasis-node/cmd/common/metrics/disk.go) diff --git a/go/common/grpc/grpc.go b/go/common/grpc/grpc.go index 548d5b5b0d9..c59a911faca 100644 --- a/go/common/grpc/grpc.go +++ b/go/common/grpc/grpc.go @@ -45,32 +45,56 @@ var ( grpcMetricsOnce sync.Once grpcGlobalLoggerOnce sync.Once - grpcCalls = prometheus.NewCounterVec( + grpcServerCalls = prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "oasis_grpc_calls", + Name: "oasis_grpc_server_calls", Help: "Number of gRPC calls.", }, []string{"call"}, ) - grpcLatency = prometheus.NewSummaryVec( + grpcServerLatency = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Name: "oasis_grpc_latency", + Name: "oasis_grpc_server_latency", Help: "gRPC call latency (seconds).", }, []string{"call"}, ) - grpcStreamWrites = prometheus.NewCounterVec( + grpcServerStreamWrites = prometheus.NewCounterVec( prometheus.CounterOpts{ - Name: "oasis_grpc_stream_writes", + Name: "oasis_grpc_server_stream_writes", + Help: "Number of gRPC stream writes.", + }, + []string{"call"}, + ) + grpcClientCalls = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "oasis_grpc_client_calls", + Help: "Number of gRPC calls.", + }, + []string{"call"}, + ) + grpcClientLatency = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "oasis_grpc_client_latency", + Help: "gRPC call latency (seconds).", + }, + []string{"call"}, + ) + grpcClientStreamWrites = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "oasis_grpc_client_stream_writes", Help: "Number of gRPC stream writes.", }, []string{"call"}, ) grpcCollectors = []prometheus.Collector{ - grpcCalls, - grpcLatency, - grpcStreamWrites, + grpcClientCalls, + grpcClientLatency, + grpcClientStreamWrites, + grpcServerCalls, + grpcServerLatency, + grpcServerStreamWrites, } serverKeepAliveParams = keepalive.ServerParameters{ @@ -160,11 +184,11 @@ func (l *grpcLogAdapter) unaryLogger(ctx context.Context, req interface{}, info ) } - 
grpcCalls.With(prometheus.Labels{"call": info.FullMethod}).Inc() + grpcServerCalls.With(prometheus.Labels{"call": info.FullMethod}).Inc() start := time.Now() resp, err = handler(ctx, req) - grpcLatency.With(prometheus.Labels{"call": info.FullMethod}).Observe(time.Since(start).Seconds()) + grpcServerLatency.With(prometheus.Labels{"call": info.FullMethod}).Observe(time.Since(start).Seconds()) switch err { case nil: if l.isDebug { @@ -185,6 +209,77 @@ func (l *grpcLogAdapter) unaryLogger(ctx context.Context, req interface{}, info return } +func (l *grpcLogAdapter) unaryClientLogger(ctx context.Context, + method string, + req, rsp interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + seq := atomic.AddUint64(&l.reqSeq, 1) + if l.isDebug { + l.reqLogger.Debug("request", + "method", method, + "req_seq", seq, + "req", req, + ) + } + + grpcClientCalls.With(prometheus.Labels{"call": method}).Inc() + + start := time.Now() + err := invoker(ctx, method, req, rsp, cc, opts...) + grpcClientLatency.With(prometheus.Labels{"call": method}).Observe(time.Since(start).Seconds()) + switch err { + case nil: + if l.isDebug { + l.reqLogger.Debug("request succeeded", + "method", method, + "req_seq", seq, + ) + } + default: + l.reqLogger.Error("request failed", + "method", method, + "req_seq", seq, + "rsp", rsp, + "err", err, + ) + return err + } + + return nil +} + +func (l *grpcLogAdapter) streamClientLogger(ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, +) (grpc.ClientStream, error) { + grpcClientCalls.With(prometheus.Labels{"call": method}).Inc() + + seq := atomic.AddUint64(&l.streamSeq, 1) + cs, err := streamer(ctx, desc, cc, method, opts...) 
+ if err != nil { + l.reqLogger.Error("stream closed (failure)", + "method", method, + "stream_seq", seq, + "err", err, + ) + } + + csLog := &grpcClientStreamLogger{ + ClientStream: cs, + logAdapter: l, + method: method, + seq: seq, + } + + return csLog, err +} + func (l *grpcLogAdapter) streamLogger(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { seq := atomic.AddUint64(&l.streamSeq, 1) if l.isDebug { @@ -201,7 +296,7 @@ func (l *grpcLogAdapter) streamLogger(srv interface{}, ss grpc.ServerStream, inf seq: seq, } - grpcCalls.With(prometheus.Labels{"call": info.FullMethod}).Inc() + grpcServerCalls.With(prometheus.Labels{"call": info.FullMethod}).Inc() err := handler(srv, stream) @@ -241,6 +336,40 @@ func newGrpcLogAdapter(baseLogger *logging.Logger) *grpcLogAdapter { } } +type grpcClientStreamLogger struct { + grpc.ClientStream + + logAdapter *grpcLogAdapter + + method string + seq uint64 +} + +func (s *grpcClientStreamLogger) SendMsg(m interface{}) error { + grpcClientStreamWrites.With(prometheus.Labels{"call": s.method}).Inc() + err := s.ClientStream.SendMsg(m) + + if s.logAdapter.isDebug { + switch err { + case nil: + s.logAdapter.reqLogger.Debug("SendMsg", + "method", s.method, + "stream_seq", s.seq, + "message", m, + ) + default: + s.logAdapter.reqLogger.Debug("SendMsg failed", + "method", s.method, + "stream_seq", s.seq, + "message", m, + "err", err, + ) + } + } + + return err +} + type grpcStreamLogger struct { grpc.ServerStream @@ -251,7 +380,7 @@ type grpcStreamLogger struct { } func (s *grpcStreamLogger) SendMsg(m interface{}) error { - grpcStreamWrites.With(prometheus.Labels{"call": s.method}).Inc() + grpcServerStreamWrites.With(prometheus.Labels{"call": s.method}).Inc() err := s.ServerStream.SendMsg(m) if s.logAdapter.isDebug { @@ -517,10 +646,28 @@ func NewServer(config *ServerConfig) (*Server, error) { // Dial creates a client connection to the given target. 
func Dial(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + // If debug gRPC logs are enabled, set up the global gRPC logger. + if viper.GetBool(CfgLogDebug) { + // NOTE: the global logger is set up by any code that starts a server, + // regardless of the CfgLogDebug flag. However, in code that only uses + // gRPC clients, it is enabled only if gRPC verbose logging is enabled. + grpcGlobalLoggerOnce.Do(func() { + logger := logging.GetLogger("grpc") + logAdapter := newGrpcLogAdapter(logger) + grpclog.SetLoggerV2(logAdapter) + }) + } + + grpcMetricsOnce.Do(func() { + prometheus.MustRegister(grpcCollectors...) + }) + + logger := logging.GetLogger("grpc/client") + logAdapter := newGrpcLogAdapter(logger) dialOpts := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.ForceCodec(&CBORCodec{})), - grpc.WithChainUnaryInterceptor(clientUnaryErrorMapper), - grpc.WithChainStreamInterceptor(clientStreamErrorMapper), + grpc.WithChainUnaryInterceptor(logAdapter.unaryClientLogger, clientUnaryErrorMapper), + grpc.WithChainStreamInterceptor(logAdapter.streamClientLogger, clientStreamErrorMapper), } dialOpts = append(dialOpts, opts...) return grpc.Dial(target, dialOpts...) 
diff --git a/go/oasis-node/cmd/common/grpc/grpc.go b/go/oasis-node/cmd/common/grpc/grpc.go index f3174cac28c..2a43465030e 100644 --- a/go/oasis-node/cmd/common/grpc/grpc.go +++ b/go/oasis-node/cmd/common/grpc/grpc.go @@ -124,5 +124,6 @@ func init() { ClientFlags.StringP(CfgAddress, "a", defaultAddress, "remote gRPC address") ClientFlags.Bool(CfgWait, false, "wait for gRPC address to become available") + ClientFlags.AddFlagSet(cmnGrpc.Flags) _ = viper.BindPFlags(ClientFlags) } diff --git a/go/oasis-test-runner/scenario/e2e/runtime/txsource.go b/go/oasis-test-runner/scenario/e2e/runtime/txsource.go index d2dd7f909d0..812d0f1ce84 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/txsource.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/txsource.go @@ -14,6 +14,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/drbg" "github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand" + commonGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc" "github.com/oasisprotocol/oasis-core/go/common/logging" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common" @@ -307,6 +308,7 @@ func (sc *txSourceImpl) startWorkload(childEnv *env.Env, errCh chan error, name "--" + flags.CfgDebugTestEntity, "--log.format", logFmt.String(), "--log.level", logLevel.String(), + "--" + commonGrpc.CfgLogDebug, "--" + flags.CfgGenesisFile, sc.Net.GenesisPath(), "--" + workload.CfgRuntimeID, runtimeID.String(), "--" + txsource.CfgWorkload, name,