From fbf7891d8b8b38d3116e0384ce62fa324da9f6e1 Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Thu, 19 Jul 2018 09:08:25 -0700 Subject: [PATCH] Issue #340 - add gRPC payload logging interceptor --- reposerver/server.go | 7 ++++ server/server.go | 16 ++++++--- util/grpc/logging.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 util/grpc/logging.go diff --git a/reposerver/server.go b/reposerver/server.go index 627c45a16e662..25a1460f67a0b 100644 --- a/reposerver/server.go +++ b/reposerver/server.go @@ -9,6 +9,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" log "github.com/sirupsen/logrus" + "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -34,10 +35,16 @@ func (a *ArgoCDRepoServer) CreateGRPC() *grpc.Server { server := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_logrus.StreamServerInterceptor(a.log), + grpc_util.PayloadStreamServerInterceptor(a.log, false, func(ctx context.Context, fullMethodName string, servingObject interface{}) bool { + return true + }), grpc_util.PanicLoggerStreamServerInterceptor(a.log), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_logrus.UnaryServerInterceptor(a.log), + grpc_util.PayloadUnaryServerInterceptor(a.log, false, func(ctx context.Context, fullMethodName string, servingObject interface{}) bool { + return true + }), grpc_util.PanicLoggerUnaryServerInterceptor(a.log), )), ) diff --git a/server/server.go b/server/server.go index aa7e85637258d..71887982401ab 100644 --- a/server/server.go +++ b/server/server.go @@ -13,9 +13,9 @@ import ( "github.com/gobuffalo/packr" golang_proto "github.com/golang/protobuf/proto" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" - grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/grpc-ecosystem/go-grpc-middleware/auth" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" "github.com/grpc-ecosystem/grpc-gateway/runtime" log "github.com/sirupsen/logrus" "github.com/soheilhy/cmux" @@ -28,7 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - argocd "github.com/argoproj/argo-cd" + "github.com/argoproj/argo-cd" "github.com/argoproj/argo-cd/common" "github.com/argoproj/argo-cd/errors" @@ -55,6 +55,7 @@ import ( "github.com/argoproj/argo-cd/util/swagger" tlsutil "github.com/argoproj/argo-cd/util/tls" "github.com/argoproj/argo-cd/util/webhook" + netCtx "golang.org/x/net/context" ) var ( @@ -299,11 +300,15 @@ func (a *ArgoCDServer) useTLS() bool { func (a *ArgoCDServer) newGRPCServer() *grpc.Server { var sOpts []grpc.ServerOption + loginMethodName := "/session.SessionService/Create" // NOTE: notice we do not configure the gRPC server here with TLS (e.g. grpc.Creds(creds)) // This is because TLS handshaking occurs in cmux handling sOpts = append(sOpts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_logrus.StreamServerInterceptor(a.log), grpc_auth.StreamServerInterceptor(a.authenticate), + grpc_util.PayloadStreamServerInterceptor(a.log, true, func(ctx netCtx.Context, fullMethodName string, servingObject interface{}) bool { + return fullMethodName != loginMethodName + }), grpc_util.ErrorCodeStreamServerInterceptor(), grpc_util.PanicLoggerStreamServerInterceptor(a.log), ))) @@ -311,6 +316,9 @@ func (a *ArgoCDServer) newGRPCServer() *grpc.Server { bug21955WorkaroundInterceptor, grpc_logrus.UnaryServerInterceptor(a.log), grpc_auth.UnaryServerInterceptor(a.authenticate), + grpc_util.PayloadUnaryServerInterceptor(a.log, true, func(ctx netCtx.Context, fullMethodName string, servingObject interface{}) bool { + return fullMethodName != loginMethodName + }), grpc_util.ErrorCodeUnaryServerInterceptor(), grpc_util.PanicLoggerUnaryServerInterceptor(a.log), ))) diff --git a/util/grpc/logging.go b/util/grpc/logging.go new file mode 100644 index 0000000000000..439c052fe8bf3 --- /dev/null +++ b/util/grpc/logging.go @@ -0,0 +1,82 @@ +package grpc + +import ( + "bytes" + "encoding/json" + "fmt" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/gogo/protobuf/proto" + "github.com/grpc-ecosystem/go-grpc-middleware/logging" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/grpc-ecosystem/go-grpc-middleware/tags/logrus" + "github.com/sirupsen/logrus" +) + +func logRequest(entry *logrus.Entry, info string, pbMsg interface{}, ctx context.Context, logClaims bool) { + if logClaims { + if data, err := json.Marshal(ctx.Value("claims")); err == nil { + entry = entry.WithField("grpc.request.claims", string(data)) + } + } + if p, ok := pbMsg.(proto.Message); ok { + entry = entry.WithField("grpc.request.content", &jsonpbMarshalleble{p}) + } + entry.Info(info) +} + +type jsonpbMarshalleble struct { + proto.Message +} + +func (j *jsonpbMarshalleble) MarshalJSON() ([]byte, error) { + b := &bytes.Buffer{} + if err := grpc_logrus.JsonPbMarshaller.Marshal(b, j.Message); err != nil { + return nil, fmt.Errorf("jsonpb serializer failed: %v", err) + } + return b.Bytes(), nil +} + +type loggingServerStream struct { + grpc.ServerStream + entry *logrus.Entry + logClaims bool + info string +} + +func (l *loggingServerStream) SendMsg(m interface{}) error { + return l.ServerStream.SendMsg(m) +} + +func (l *loggingServerStream) RecvMsg(m interface{}) error { + err := l.ServerStream.RecvMsg(m) + if err == nil { + logRequest(l.entry, l.info, m, l.ServerStream.Context(), l.logClaims) + } + return err +} + +func PayloadStreamServerInterceptor(entry *logrus.Entry, logClaims bool, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if !decider(stream.Context(), info.FullMethod, srv) { + return handler(srv, stream) + } + logEntry := entry.WithFields(ctx_logrus.Extract(stream.Context()).Data) + newStream := &loggingServerStream{ServerStream: stream, entry: logEntry, logClaims: logClaims, info: fmt.Sprintf("received streaming call %s", info.FullMethod)} + return handler(srv, newStream) + } +} + +func PayloadUnaryServerInterceptor(entry *logrus.Entry, logClaims bool, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if !decider(ctx, info.FullMethod, info.Server) { + return handler(ctx, req) + } + logEntry := entry.WithFields(ctx_logrus.Extract(ctx).Data) + logRequest(logEntry, fmt.Sprintf("received unary call %s", info.FullMethod), req, ctx, logClaims) + resp, err := handler(ctx, req) + return resp, err + } +}