Skip to content

Commit

Permalink
Clean up logging (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Jul 19, 2023
1 parent e8c4a20 commit 977e4b9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 94 deletions.
3 changes: 1 addition & 2 deletions executor/flavour.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"os"

"github.com/outofforest/logger"
"github.com/outofforest/parallel"
"github.com/outofforest/run"
"github.com/pkg/errors"
Expand Down Expand Up @@ -38,6 +37,6 @@ func NewFlavour(config Config) run.FlavourFunc {
return errors.New("exactly three arguments are required")
}

return runServer(logger.WithLogger(ctx, logger.Get(ctx).Named("executor")), config, os.Args[2])
return runServer(ctx, config, os.Args[2])
}
}
93 changes: 1 addition & 92 deletions executor/handlers.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package executor

import (
"bytes"
"context"
"os/exec"
"sync"
"time"

"github.com/outofforest/libexec"
"github.com/outofforest/logger"
"github.com/pkg/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/outofforest/isolator/lib/docker"
"github.com/outofforest/isolator/lib/libhttp"
Expand Down Expand Up @@ -82,7 +78,7 @@ func NewRunEmbeddedFunctionHandler(funcs map[string]EmbeddedFunc) HandlerFunc {
log := logger.Get(ctx)
log.Info("Starting embedded function")

if err := fn(zapTransmitter(ctx, encode), m.Args); err != nil {
if err := fn(withZAPTransmitter(ctx, encode), m.Args); err != nil {
log.Error("Embedded function exited with error", zap.Error(err))
return err
}
Expand Down Expand Up @@ -117,90 +113,3 @@ func ExecuteHandler(ctx context.Context, content interface{}, encode wire.Encode
log.Info("Command exited")
return nil
}

func newLogTransmitter(encode wire.EncoderFunc) *logTransmitter {
return &logTransmitter{
encode: encode,
}
}

type logTransmitter struct {
encode wire.EncoderFunc

mu sync.Mutex
buf []byte
start int
end int
length int
}

func (lt *logTransmitter) Write(data []byte) (int, error) {
lt.mu.Lock()
defer lt.mu.Unlock()

bufLen := 1024

dataLength := len(data)
if dataLength == 0 {
return 0, nil
}

if len(lt.buf[lt.end:]) < dataLength {
if newBufLen := lt.length + dataLength; bufLen < newBufLen {
bufLen = newBufLen
}
newBuf := make([]byte, bufLen)
if lt.length > 0 {
copy(newBuf, lt.buf[lt.start:lt.end])
}
lt.buf = newBuf
lt.start = 0
lt.end = lt.length
}
copy(lt.buf[lt.end:], data)
lt.end += dataLength
lt.length += dataLength

for {
if lt.length == 0 {
break
}
pos := bytes.IndexByte(lt.buf[lt.start:lt.end], '\n')
if pos < 0 {
break
}

if pos > 0 {
err := lt.encode(wire.Log{Time: time.Now().UTC(), Content: lt.buf[lt.start : lt.start+pos]})
if err != nil {
return 0, err
}
}

lt.start += pos + 1
lt.length -= pos + 1

if lt.start == lt.end {
lt.start = 0
lt.end = 0
}
}

return dataLength, nil
}

func (lt *logTransmitter) Sync() error {
return nil
}

func zapTransmitter(ctx context.Context, encode wire.EncoderFunc) context.Context {
transmitter := newLogTransmitter(encode)
transmitCore := zapcore.NewCore(zapcore.NewJSONEncoder(logger.EncoderConfig), transmitter, zap.NewAtomicLevelAt(zap.DebugLevel))

log := logger.Get(ctx)
log = log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, transmitCore)
}))

return logger.WithLogger(ctx, log)
}
101 changes: 101 additions & 0 deletions executor/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package executor

import (
"bytes"
"context"
"sync"
"time"

"github.com/outofforest/logger"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/outofforest/isolator/wire"
)

func newLogTransmitter(encode wire.EncoderFunc) *logTransmitter {
return &logTransmitter{
encode: encode,
}
}

type logTransmitter struct {
encode wire.EncoderFunc

mu sync.Mutex
buf []byte
start int
end int
length int
}

func (lt *logTransmitter) Write(data []byte) (int, error) {
lt.mu.Lock()
defer lt.mu.Unlock()

bufLen := 1024

dataLength := len(data)
if dataLength == 0 {
return 0, nil
}

if len(lt.buf[lt.end:]) < dataLength {
if newBufLen := lt.length + dataLength; bufLen < newBufLen {
bufLen = newBufLen
}
newBuf := make([]byte, bufLen)
if lt.length > 0 {
copy(newBuf, lt.buf[lt.start:lt.end])
}
lt.buf = newBuf
lt.start = 0
lt.end = lt.length
}
copy(lt.buf[lt.end:], data)
lt.end += dataLength
lt.length += dataLength

for {
if lt.length == 0 {
break
}
pos := bytes.IndexByte(lt.buf[lt.start:lt.end], '\n')
if pos < 0 {
break
}

if pos > 0 {
err := lt.encode(wire.Log{Time: time.Now().UTC(), Content: lt.buf[lt.start : lt.start+pos]})
if err != nil {
return 0, err
}
}

lt.start += pos + 1
lt.length -= pos + 1

if lt.start == lt.end {
lt.start = 0
lt.end = 0
}
}

return dataLength, nil
}

func (lt *logTransmitter) Sync() error {
return nil
}

func withZAPTransmitter(ctx context.Context, encode wire.EncoderFunc) context.Context {
transmitter := newLogTransmitter(encode)
transmitCore := zapcore.NewCore(zapcore.NewJSONEncoder(logger.EncoderConfig), transmitter, zap.NewAtomicLevelAt(zap.DebugLevel))

log := logger.Get(ctx)
log = log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return transmitCore
}))

return logger.WithLogger(ctx, log)
}

0 comments on commit 977e4b9

Please sign in to comment.