From 977e4b95dd8bcc63ede3a020a87f296c4debfcc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Ma=C5=82ota-W=C3=B3jcik?= <59281144+outofforest@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:50:29 +0200 Subject: [PATCH] Clean up logging (#144) --- executor/flavour.go | 3 +- executor/handlers.go | 93 +-------------------------------------- executor/logging.go | 101 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 94 deletions(-) create mode 100644 executor/logging.go diff --git a/executor/flavour.go b/executor/flavour.go index c7570fe..c80eba9 100644 --- a/executor/flavour.go +++ b/executor/flavour.go @@ -4,7 +4,6 @@ import ( "context" "os" - "github.com/outofforest/logger" "github.com/outofforest/parallel" "github.com/outofforest/run" "github.com/pkg/errors" @@ -38,6 +37,6 @@ func NewFlavour(config Config) run.FlavourFunc { return errors.New("exactly three arguments are required") } - return runServer(logger.WithLogger(ctx, logger.Get(ctx).Named("executor")), config, os.Args[2]) + return runServer(ctx, config, os.Args[2]) } } diff --git a/executor/handlers.go b/executor/handlers.go index 9836d9c..6192923 100644 --- a/executor/handlers.go +++ b/executor/handlers.go @@ -1,17 +1,13 @@ package executor import ( - "bytes" "context" "os/exec" - "sync" - "time" "github.com/outofforest/libexec" "github.com/outofforest/logger" "github.com/pkg/errors" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/outofforest/isolator/lib/docker" "github.com/outofforest/isolator/lib/libhttp" @@ -82,7 +78,7 @@ func NewRunEmbeddedFunctionHandler(funcs map[string]EmbeddedFunc) HandlerFunc { log := logger.Get(ctx) log.Info("Starting embedded function") - if err := fn(zapTransmitter(ctx, encode), m.Args); err != nil { + if err := fn(withZAPTransmitter(ctx, encode), m.Args); err != nil { log.Error("Embedded function exited with error", zap.Error(err)) return err } @@ -117,90 +113,3 @@ func ExecuteHandler(ctx context.Context, content interface{}, encode wire.Encode log.Info("Command exited") return nil } - -func newLogTransmitter(encode wire.EncoderFunc) *logTransmitter { - return &logTransmitter{ - encode: encode, - } -} - -type logTransmitter struct { - encode wire.EncoderFunc - - mu sync.Mutex - buf []byte - start int - end int - length int -} - -func (lt *logTransmitter) Write(data []byte) (int, error) { - lt.mu.Lock() - defer lt.mu.Unlock() - - bufLen := 1024 - - dataLength := len(data) - if dataLength == 0 { - return 0, nil - } - - if len(lt.buf[lt.end:]) < dataLength { - if newBufLen := lt.length + dataLength; bufLen < newBufLen { - bufLen = newBufLen - } - newBuf := make([]byte, bufLen) - if lt.length > 0 { - copy(newBuf, lt.buf[lt.start:lt.end]) - } - lt.buf = newBuf - lt.start = 0 - lt.end = lt.length - } - copy(lt.buf[lt.end:], data) - lt.end += dataLength - lt.length += dataLength - - for { - if lt.length == 0 { - break - } - pos := bytes.IndexByte(lt.buf[lt.start:lt.end], '\n') - if pos < 0 { - break - } - - if pos > 0 { - err := lt.encode(wire.Log{Time: time.Now().UTC(), Content: lt.buf[lt.start : lt.start+pos]}) - if err != nil { - return 0, err - } - } - - lt.start += pos + 1 - lt.length -= pos + 1 - - if lt.start == lt.end { - lt.start = 0 - lt.end = 0 - } - } - - return dataLength, nil -} - -func (lt *logTransmitter) Sync() error { - return nil -} - -func zapTransmitter(ctx context.Context, encode wire.EncoderFunc) context.Context { - transmitter := newLogTransmitter(encode) - transmitCore := zapcore.NewCore(zapcore.NewJSONEncoder(logger.EncoderConfig), transmitter, zap.NewAtomicLevelAt(zap.DebugLevel)) - - log := logger.Get(ctx) - log = log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewTee(core, transmitCore) - })) - - return logger.WithLogger(ctx, log) -} diff --git a/executor/logging.go b/executor/logging.go new file mode 100644 index 0000000..abb89fe --- /dev/null +++ b/executor/logging.go @@ -0,0 +1,101 @@ +package executor + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/outofforest/logger" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/outofforest/isolator/wire" +) + +func newLogTransmitter(encode wire.EncoderFunc) *logTransmitter { + return &logTransmitter{ + encode: encode, + } +} + +type logTransmitter struct { + encode wire.EncoderFunc + + mu sync.Mutex + buf []byte + start int + end int + length int +} + +func (lt *logTransmitter) Write(data []byte) (int, error) { + lt.mu.Lock() + defer lt.mu.Unlock() + + bufLen := 1024 + + dataLength := len(data) + if dataLength == 0 { + return 0, nil + } + + if len(lt.buf[lt.end:]) < dataLength { + if newBufLen := lt.length + dataLength; bufLen < newBufLen { + bufLen = newBufLen + } + newBuf := make([]byte, bufLen) + if lt.length > 0 { + copy(newBuf, lt.buf[lt.start:lt.end]) + } + lt.buf = newBuf + lt.start = 0 + lt.end = lt.length + } + copy(lt.buf[lt.end:], data) + lt.end += dataLength + lt.length += dataLength + + for { + if lt.length == 0 { + break + } + pos := bytes.IndexByte(lt.buf[lt.start:lt.end], '\n') + if pos < 0 { + break + } + + if pos > 0 { + err := lt.encode(wire.Log{Time: time.Now().UTC(), Content: lt.buf[lt.start : lt.start+pos]}) + if err != nil { + return 0, err + } + } + + lt.start += pos + 1 + lt.length -= pos + 1 + + if lt.start == lt.end { + lt.start = 0 + lt.end = 0 + } + } + + return dataLength, nil +} + +func (lt *logTransmitter) Sync() error { + return nil +} + +func withZAPTransmitter(ctx context.Context, encode wire.EncoderFunc) context.Context { + transmitter := newLogTransmitter(encode) + transmitCore := zapcore.NewCore(zapcore.NewJSONEncoder(logger.EncoderConfig), transmitter, zap.NewAtomicLevelAt(zap.DebugLevel)) + + log := logger.Get(ctx) + log = log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return transmitCore + })) + + return logger.WithLogger(ctx, log) +}