Skip to content

Commit

Permalink
Better log transmitter (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
outofforest authored Jul 14, 2023
1 parent 139e743 commit 8afb5bc
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 60 deletions.
2 changes: 1 addition & 1 deletion examples/execute/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func main() {
if err != nil {
panic(err)
}
if _, err := stream.WriteString(m.Text); err != nil {
if _, err := stream.Write(m.Content); err != nil {
panic(err)
}
// wire.Result means command finished
Expand Down
126 changes: 71 additions & 55 deletions executor/handlers.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package executor

import (
"bytes"
"context"
"os/exec"
"sync"

"github.com/outofforest/libexec"
"github.com/outofforest/logger"
Expand Down Expand Up @@ -41,19 +43,8 @@ func RunDockerContainerHandler(ctx context.Context, content interface{}, encode
return errors.Errorf("unexpected type %T", content)
}

stdOut := &logTransmitter{
Stream: wire.StreamOut,
Encode: encode,
}
stdErr := &logTransmitter{
Stream: wire.StreamErr,
Encode: encode,
}

defer func() {
_ = stdOut.Flush()
_ = stdErr.Flush()
}()
stdOut := newLogTransmitter(encode, wire.StreamOut)
stdErr := newLogTransmitter(encode, wire.StreamErr)

return docker.RunContainer(ctx, docker.RunContainerConfig{
CacheDir: m.CacheDir,
Expand Down Expand Up @@ -98,14 +89,8 @@ func ExecuteHandler(ctx context.Context, content interface{}, encode wire.Encode
return errors.Errorf("unexpected type %T", content)
}

outTransmitter := &logTransmitter{
Stream: wire.StreamOut,
Encode: encode,
}
errTransmitter := &logTransmitter{
Stream: wire.StreamErr,
Encode: encode,
}
outTransmitter := newLogTransmitter(encode, wire.StreamOut)
errTransmitter := newLogTransmitter(encode, wire.StreamErr)

cmd := exec.Command("/bin/sh", "-c", m.Command)
cmd.Stdout = outTransmitter
Expand All @@ -115,57 +100,88 @@ func ExecuteHandler(ctx context.Context, content interface{}, encode wire.Encode

err := libexec.Exec(ctx, cmd)

_ = outTransmitter.Flush()
_ = errTransmitter.Flush()

return err
}

type logTransmitter struct {
Encode wire.EncoderFunc
Stream wire.Stream
func newLogTransmitter(encode wire.EncoderFunc, stream wire.Stream) *logTransmitter {
return &logTransmitter{
encode: encode,
stream: stream,
}
}

buf []byte
type logTransmitter struct {
encode wire.EncoderFunc
stream wire.Stream

mu sync.Mutex
buf []byte
start int
end int
length int
}

func (lt *logTransmitter) Write(data []byte) (int, error) {
length := len(lt.buf) + len(data)
if length < 100 {
lt.buf = append(lt.buf, data...)
return len(data), nil
}
buf := make([]byte, length)
copy(buf, lt.buf)
copy(buf[len(lt.buf):], data)
err := lt.Encode(wire.Log{Stream: lt.Stream, Text: string(buf)})
if err != nil {
return 0, err
lt.mu.Lock()
defer lt.mu.Unlock()

bufLen := 1024

dataLength := len(data)
if dataLength == 0 {
return 0, nil
}
lt.buf = make([]byte, 0, len(lt.buf))
return len(data), nil
}

func (lt *logTransmitter) Flush() error {
if len(lt.buf) == 0 {
return nil
if len(lt.buf[lt.end:]) < dataLength {
if newBufLen := lt.length + dataLength; bufLen < newBufLen {
bufLen = newBufLen
}
newBuf := make([]byte, bufLen)
if lt.length > 0 {
copy(newBuf, lt.buf[lt.start:lt.end])
}
lt.buf = newBuf
lt.start = 0
lt.end = lt.length
}
if err := lt.Encode(wire.Log{Stream: lt.Stream, Text: string(lt.buf)}); err != nil {
return err
copy(lt.buf[lt.end:], data)
lt.end += dataLength
lt.length += dataLength

for {
if lt.length == 0 {
break
}
pos := bytes.IndexByte(lt.buf[lt.start:lt.end], '\n')
if pos < 0 {
break
}

if pos > 0 {
err := lt.encode(wire.Log{Stream: lt.stream, Content: lt.buf[lt.start : lt.start+pos]})
if err != nil {
return 0, err
}
}

lt.start += pos + 1
lt.length -= pos + 1

if lt.start == lt.end {
lt.start = 0
lt.end = 0
}
}
lt.buf = make([]byte, 0, len(lt.buf))
return nil

return dataLength, nil
}

func (lt *logTransmitter) Sync() error {
return lt.Flush()
return nil
}

func zapTransmitter(ctx context.Context, encode wire.EncoderFunc) context.Context {
transmitter := &logTransmitter{
Stream: wire.StreamErr,
Encode: encode,
}

transmitter := newLogTransmitter(encode, wire.StreamErr)
transmitCore := zapcore.NewCore(zapcore.NewJSONEncoder(logger.EncoderConfig), transmitter, zap.NewAtomicLevelAt(zap.DebugLevel))

log := logger.Get(ctx)
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion scenarios/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (c Container) run(ctx context.Context, config RunAppsConfig, appDir string,
if err != nil {
return errors.WithStack(err)
}
if _, err := stream.WriteString(m.Text); err != nil {
if _, err := stream.Write(m.Content); err != nil {
return errors.WithStack(err)
}
// wire.Result means command finished
Expand Down
2 changes: 1 addition & 1 deletion scenarios/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (e Embedded) run(ctx context.Context, appDir string, appHosts map[string]ne
if err != nil {
return errors.WithStack(err)
}
if _, err := stream.WriteString(m.Text); err != nil {
if _, err := stream.Write(m.Content); err != nil {
return errors.WithStack(err)
}
// wire.Result means command finished
Expand Down
9 changes: 7 additions & 2 deletions wire/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"os"
"reflect"
"sync"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -122,8 +123,8 @@ type Log struct {
// Stream is the type of stream where log was produced
Stream Stream

// Text is text printed by command
Text string
// Content is text printed by command
Content []byte
}

type message struct {
Expand All @@ -136,13 +137,17 @@ type EncoderFunc func(content interface{}) error

// NewEncoder creates new message encoder.
func NewEncoder(w io.Writer) EncoderFunc {
var mu sync.Mutex
encoder := json.NewEncoder(w)
return func(content interface{}) error {
contentRaw, err := json.Marshal(content)
if err != nil {
return errors.WithStack(err)
}

mu.Lock()
defer mu.Unlock()

return errors.WithStack(encoder.Encode(message{
Type: ContentToType(content),
Payload: contentRaw,
Expand Down

0 comments on commit 8afb5bc

Please sign in to comment.