Skip to content

Commit

Permalink
Merge pull request #24 from PettitWesley/partial-message-flags
Browse files Browse the repository at this point in the history
fix: set partial message metadata flags in log events
  • Loading branch information
JoseVillalta authored Apr 21, 2022
2 parents 53d3f60 + d39b51c commit ce3bac8
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 104 deletions.
33 changes: 17 additions & 16 deletions logger/buffered_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/aws/shim-loggers-for-containerd/debug"

dockerlogger "github.com/docker/docker/daemon/logger"
types "github.com/docker/docker/api/types/backend"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -180,29 +182,28 @@ func (bl *bufferedLogger) Read(
func (bl *bufferedLogger) saveSingleLogMessageToRingBuffer(
line []byte,
source string,
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
) (error, time.Time, bool, bool) {
msgTimestamp, partialTimestamp, isFirstPartial, isPartialMsg := getLogTimestamp(
isFirstPartial,
isPartialMsg,
partialTimestamp,
bl.containerID,
)
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error {
if debug.Verbose {
debug.SendEventsToLog(bl.containerID,
fmt.Sprintf("[Pipe %s] Scanned message: %s", source, string(line)),
debug.DEBUG, 0)
}

message := newMessage(line, source, msgTimestamp)
if isPartialMsg {
message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
}
err := bl.buffer.Enqueue(message)
if err != nil {
err := errors.Wrap(err, "failed to save logs to buffer")
return err, partialTimestamp, isFirstPartial, isPartialMsg
return err
}

return nil, partialTimestamp, isFirstPartial, isPartialMsg
return nil
}

// sendLogMessagesToDestination consumes logs from ring buffer and use the
Expand Down Expand Up @@ -243,7 +244,7 @@ func (bl *bufferedLogger) sendLogMessageToDestination() error {
return errors.Wrap(err, "failed to read logs from buffer")
}

err = bl.Log(msg.Line, msg.Source, msg.Timestamp)
err = bl.Log(msg)
if err != nil {
return errors.Wrap(err, "failed to send logs to destination")
}
Expand All @@ -256,7 +257,7 @@ func (bl *bufferedLogger) sendLogMessageToDestination() error {
func (bl *bufferedLogger) flushMessages() error {
messages := bl.buffer.Flush()
for _, msg := range messages {
err := bl.Log(msg.Line, msg.Source, msg.Timestamp)
err := bl.Log(msg)
if err != nil {
return errors.Wrap(err, "unable to flush the remaining messages to destination")
}
Expand All @@ -266,13 +267,13 @@ func (bl *bufferedLogger) flushMessages() error {
}

// Log lets underlying log driver send logs to destination.
func (bl *bufferedLogger) Log(line []byte, source string, logTimestamp time.Time) error {
func (bl *bufferedLogger) Log(message *dockerlogger.Message) error {
if debug.Verbose {
debug.SendEventsToLog(DaemonName,
fmt.Sprintf("[BUFFER] Sending message: %s", string(line)),
fmt.Sprintf("[BUFFER] Sending message: %s", string(message.Line)),
debug.DEBUG, 0)
}
return bl.l.Log(line, source, logTimestamp)
return bl.l.Log(message)
}

// GetPipes gets pipes of container and its name that exposed by containerd.
Expand Down
155 changes: 86 additions & 69 deletions logger/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package logger
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"math"
Expand All @@ -24,6 +26,8 @@ import (
"github.com/aws/shim-loggers-for-containerd/debug"

dockerlogger "github.com/docker/docker/daemon/logger"

types "github.com/docker/docker/api/types/backend"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -106,7 +110,7 @@ type LogDriver interface {
// GetPipes gets pipes of container that exposed by containerd.
GetPipes() (map[string]io.Reader, error)
// Log sends logs to destination.
Log([]byte, string, time.Time) error
Log(*dockerlogger.Message) error
// Read reads a single log message from container pipe and sends it to
// destination or saves it to ring buffer, depending on the mode of log
// driver.
Expand Down Expand Up @@ -224,9 +228,11 @@ func (l *Logger) sendLogs(
type sendLogToDestFunc func(
line []byte,
source string,
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
) (error, time.Time, bool, bool)
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error

// Read gets container logs, saves them to our own buffer. Then we will read logs line by line
// and send them to destination. In non-blocking mode, the destination is the ring buffer. More
Expand All @@ -239,7 +245,7 @@ func (l *Logger) Read(
sendLogMsgToDest sendLogToDestFunc,
) error {
var (
partialTimestamp time.Time
msgTimestamp time.Time
bytesInBuffer int
err error
eof bool
Expand All @@ -251,6 +257,13 @@ func (l *Logger) Read(
isFirstPartial := true
// isPartialMsg indicates if current message is a partial log message. Initialize to false.
isPartialMsg := false
// isLastPartial indicates if this message completes a partial message
isLastPartial := false
// partialID is a random ID given to each split message
partialID := ""
// partialOrdinal orders the split messages and count up from 1
partialOrdinal := 1

for {
select {
case <-ctx.Done():
Expand All @@ -274,13 +287,24 @@ func (l *Logger) Read(
// This function returns -1 if '\n' in not present in buffer.
lenOfLine := bytes.IndexByte(buf[head:bytesInBuffer], newline)
for lenOfLine >= 0 {
// If this is the end of a partial message
// use the existing timestamp, so that all
// partials split from the same message have the same timestamp
// If not, new timestamp.
if isPartialMsg {
isLastPartial = true
} else {
msgTimestamp = time.Now().UTC()
}
curLine := buf[head : head+lenOfLine]
err, partialTimestamp, _, _ = sendLogMsgToDest(
err = sendLogMsgToDest(
curLine,
source,
isFirstPartial,
isPartialMsg,
partialTimestamp,
isLastPartial,
partialID,
partialOrdinal,
msgTimestamp,
)
if err != nil {
return err
Expand All @@ -289,6 +313,9 @@ func (l *Logger) Read(
// Reset flags.
isFirstPartial = true
isPartialMsg = false
isLastPartial = false
partialID = ""
partialOrdinal = 1;

// Update the index of head of next line message.
head += lenOfLine + 1
Expand All @@ -303,12 +330,25 @@ func (l *Logger) Read(
// Still bytes left in the buffer after we identified all newline symbols.
if head < bytesInBuffer {
curLine := buf[head:bytesInBuffer]
err, partialTimestamp, isFirstPartial, isPartialMsg = sendLogMsgToDest(

// Record as a partial message.
isPartialMsg = true
if isFirstPartial {
msgTimestamp = time.Now().UTC()
partialID, err = generateRandomID()
}
if err != nil {
return err
}

err = sendLogMsgToDest(
curLine,
source,
isFirstPartial,
true, // Record as a partial message.
partialTimestamp,
isPartialMsg,
isLastPartial,
partialID,
partialOrdinal,
msgTimestamp,
)
if err != nil {
return err
Expand All @@ -317,6 +357,13 @@ func (l *Logger) Read(
// reset head and bytesInBuffer
head = 0
bytesInBuffer = 0
// increment partial flags
partialOrdinal += 1
if isFirstPartial {
// if this was the first partial message
// the next one is not the first if it is also partial
isFirstPartial = false
}
}
// If pipe is closed after we send all bytes left in buffer, then directly return.
if eof {
Expand All @@ -334,6 +381,19 @@ func (l *Logger) Read(
}
}

// generateRandomID is based on Docker
// GenerateRandomID: https://github.com/moby/moby/blob/bca8d9f2ce0d63e1490692917cde6273bc288bad/pkg/stringid/stringid.go#L40
// with the simplification that we don't need to worry about guaranteeing the string isn't all 0 - 9
// Consequently ^ we have our own function instead of importing from Docker.
func generateRandomID() (string, error) {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", err
}
id := hex.EncodeToString(b)
return id, nil
}

// readFromContainerPipe reads bytes from container pipe, upto max read size in bytes of 2048.
func readFromContainerPipe(pipe io.Reader, buf []byte, bytesInBuffer, maxReadBytes int) (bool, int, error) {
// eof indicates if we have already met EOF error.
Expand Down Expand Up @@ -367,77 +427,34 @@ func bufferIsFull(buf []byte, head, bytesInBuffer int) bool {
func (l *Logger) sendLogMsgToDest(
line []byte,
source string,
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
) (error, time.Time, bool, bool) {
msgTimestamp, partialTimestamp, isFirstPartial, isPartialMsg := getLogTimestamp(
isFirstPartial,
isPartialMsg,
partialTimestamp,
l.Info.ContainerID,
)
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error {
if debug.Verbose {
debug.SendEventsToLog(l.Info.ContainerID,
fmt.Sprintf("[Pipe %s] Scanned message: %s", source, string(line)),
debug.DEBUG, 0)
}

err := l.Log(line, source, msgTimestamp)
if err != nil {
return err, partialTimestamp, isFirstPartial, isPartialMsg
}

return nil, partialTimestamp, isFirstPartial, isPartialMsg
}

// getLogTimestamp gets the timestamp of a log message. It could be current timestamp
// if it's a new line, or the recorded timestamp from the first partial if it's the other partial
// messages.
func getLogTimestamp(
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
containerID string,
) (time.Time, time.Time, bool, bool) {
msgTimestamp := time.Now().UTC()

// If it is not end with with newline for the first time, record it as a partial
// message and set it to be the first partial. Then record the timestamp as the
// timestamp of the whole log message.
if isFirstPartial {
partialTimestamp = msgTimestamp
if debug.Verbose {
debug.SendEventsToLog(containerID,
fmt.Sprintf("Saving first partial at time %s", partialTimestamp.String()),
debug.DEBUG, 0)
}
// Set isFirstPartial to false and set indicator of partial log message to be true.
isFirstPartial = false
isPartialMsg = true
} else if isPartialMsg {
// If there are more partial messages recorded before the current read, use the
// recorded timestamp as it of the current message as well.
msgTimestamp = partialTimestamp
if debug.Verbose {
debug.SendEventsToLog(containerID,
fmt.Sprintf("Setting partial log message to time %s", msgTimestamp.String()),
debug.DEBUG, 0)
}
message := newMessage(line, source, msgTimestamp)
if isPartialMsg {
message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
}

return msgTimestamp, partialTimestamp, isFirstPartial, isPartialMsg
}

// Log sends logs to destination.
func (l *Logger) Log(line []byte, source string, logTimestamp time.Time) error {
message := newMessage(line, source, logTimestamp)
err := l.Stream.Log(message)
err := l.Log(message)
if err != nil {
return errors.Wrapf(err, "failed to log msg for container %s", l.Info.ContainerName)
}

return nil
}

// Log sends logs to destination.
func (l *Logger) Log(message *dockerlogger.Message) error {
return l.Stream.Log(message)
}

// newMessage creates a new logger message.
func newMessage(line []byte, source string, logTimestamp time.Time) *dockerlogger.Message {
msg := dockerlogger.NewMessage()
Expand Down
Loading

0 comments on commit ce3bac8

Please sign in to comment.