Skip to content

Commit

Permalink
serve: add logging of grpc requests (#1373)
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford authored Jul 10, 2024
1 parent cd680c4 commit 2356c9c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 12 deletions.
70 changes: 61 additions & 9 deletions cmd/opm/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"runtime/pprof"
"sync"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"

"github.com/operator-framework/operator-registry/pkg/api"
Expand Down Expand Up @@ -89,7 +91,8 @@ will not be reflected in the served content.
}

func (s *serve) run(ctx context.Context) error {
p := newProfilerInterface(s.pprofAddr, s.logger)
mainLogger := s.logger.Dup()
p := newProfilerInterface(s.pprofAddr, mainLogger)
if err := p.startEndpoint(); err != nil {
return fmt.Errorf("could not start pprof endpoint: %v", err)
}
Expand All @@ -102,12 +105,12 @@ func (s *serve) run(ctx context.Context) error {
// Immediately set up termination log
err := log.AddDefaultWriterHooks(s.terminationLog)
if err != nil {
s.logger.WithError(err).Warn("unable to set termination log path")
mainLogger.WithError(err).Warn("unable to set termination log path")
}

// Ensure there is a default nsswitch config
if err := dns.EnsureNsswitch(); err != nil {
s.logger.WithError(err).Warn("unable to write default nsswitch config")
mainLogger.WithError(err).Warn("unable to write default nsswitch config")
}

if s.cacheDir == "" && s.cacheEnforceIntegrity {
Expand All @@ -121,12 +124,12 @@ func (s *serve) run(ctx context.Context) error {
}
defer os.RemoveAll(s.cacheDir)
}
s.logger = s.logger.WithFields(logrus.Fields{
mainLogger = mainLogger.WithFields(logrus.Fields{
"configs": s.configDir,
"cache": s.cacheDir,
})

store, err := cache.New(s.cacheDir, cache.WithLog(s.logger))
store, err := cache.New(s.cacheDir, cache.WithLog(mainLogger))
if err != nil {
return err
}
Expand All @@ -148,26 +151,30 @@ func (s *serve) run(ctx context.Context) error {
return nil
}

s.logger = s.logger.WithFields(logrus.Fields{"port": s.port})
mainLogger = mainLogger.WithFields(logrus.Fields{"port": s.port})

lis, err := net.Listen("tcp", ":"+s.port)
if err != nil {
return fmt.Errorf("failed to listen: %s", err)
}

grpcServer := grpc.NewServer()
streamLogger, unaryLogger := loggingInterceptors(s.logger.Dup())
grpcServer := grpc.NewServer(
grpc.ChainStreamInterceptor(streamLogger),
grpc.ChainUnaryInterceptor(unaryLogger),
)
api.RegisterRegistryServer(grpcServer, server.NewRegistryServer(store))
health.RegisterHealthServer(grpcServer, server.NewHealthServer())
reflection.Register(grpcServer)
s.logger.Info("serving registry")
mainLogger.Info("serving registry")
p.stopCpuProfileCache()

return graceful.Shutdown(s.logger, func() error {
return grpcServer.Serve(lis)
}, func() {
grpcServer.GracefulStop()
if err := p.stopEndpoint(ctx); err != nil {
s.logger.Warnf("error shutting down pprof server: %v", err)
mainLogger.Warnf("error shutting down pprof server: %v", err)
}
})

Expand Down Expand Up @@ -293,3 +300,48 @@ func (p *profilerInterface) setCacheReady() {
p.cacheReady = true
p.cacheLock.Unlock()
}

func loggingInterceptors(logger *logrus.Entry) (grpc.StreamServerInterceptor, grpc.UnaryServerInterceptor) {
requestLogger := logger.Dup()
requestLoggerOpts := []logging.Option{
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
logging.WithFieldsFromContext(func(ctx context.Context) logging.Fields {
fields := logging.ExtractFields(ctx)
metadataFields := logging.Fields{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
for k, v := range md {
metadataFields = append(metadataFields, k, v)
}
fields = fields.AppendUnique(metadataFields)
}
return fields
}),
}
return logging.StreamServerInterceptor(interceptorLogger(requestLogger), requestLoggerOpts...),
logging.UnaryServerInterceptor(interceptorLogger(requestLogger), requestLoggerOpts...)
}

func interceptorLogger(l *logrus.Entry) logging.Logger {
return logging.LoggerFunc(func(_ context.Context, lvl logging.Level, msg string, fields ...any) {
f := make(map[string]any, len(fields)/2)
i := logging.Fields(fields).Iterator()
for i.Next() {
k, v := i.At()
f[k] = v
}
l := l.WithFields(f)

switch lvl {
case logging.LevelDebug:
l.Debug(msg)
case logging.LevelInfo:
l.Info(msg)
case logging.LevelWarn:
l.Warn(msg)
case logging.LevelError:
l.Error(msg)
default:
panic(fmt.Sprintf("unknown level %v", lvl))
}
})
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/grpc-ecosystem/grpc-health-probe v0.4.29
github.com/h2non/filetype v1.1.3
github.com/h2non/go-is-svg v0.0.0-20160927212452-35e8c4b0612c
Expand Down Expand Up @@ -64,7 +65,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Microsoft/hcsshim v0.12.3 // indirect
github.com/alessio/shellescape v1.4.1 // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bshuster-repo/logrus-logstash-hook v1.0.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0=
github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 h1:goHVqTbFX3AIo0tzGr14pgfAW2ZfPChKO21Z9MGf/gk=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
Expand Down Expand Up @@ -202,6 +202,8 @@ github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWS
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
Expand Down

0 comments on commit 2356c9c

Please sign in to comment.