Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set partial message metadata flags in log events #24

Merged
merged 2 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions logger/buffered_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/aws/shim-loggers-for-containerd/debug"

dockerlogger "github.com/docker/docker/daemon/logger"
types "github.com/docker/docker/api/types/backend"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -180,29 +182,28 @@ func (bl *bufferedLogger) Read(
func (bl *bufferedLogger) saveSingleLogMessageToRingBuffer(
line []byte,
source string,
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
) (error, time.Time, bool, bool) {
msgTimestamp, partialTimestamp, isFirstPartial, isPartialMsg := getLogTimestamp(
isFirstPartial,
isPartialMsg,
partialTimestamp,
bl.containerID,
)
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error {
if debug.Verbose {
debug.SendEventsToLog(bl.containerID,
fmt.Sprintf("[Pipe %s] Scanned message: %s", source, string(line)),
debug.DEBUG, 0)
}

message := newMessage(line, source, msgTimestamp)
if isPartialMsg {
message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
}
err := bl.buffer.Enqueue(message)
if err != nil {
err := errors.Wrap(err, "failed to save logs to buffer")
return err, partialTimestamp, isFirstPartial, isPartialMsg
return err
}

return nil, partialTimestamp, isFirstPartial, isPartialMsg
return nil
}

// sendLogMessagesToDestination consumes logs from ring buffer and use the
Expand Down Expand Up @@ -243,7 +244,7 @@ func (bl *bufferedLogger) sendLogMessageToDestination() error {
return errors.Wrap(err, "failed to read logs from buffer")
}

err = bl.Log(msg.Line, msg.Source, msg.Timestamp)
err = bl.Log(msg)
if err != nil {
return errors.Wrap(err, "failed to send logs to destination")
}
Expand All @@ -256,7 +257,7 @@ func (bl *bufferedLogger) sendLogMessageToDestination() error {
func (bl *bufferedLogger) flushMessages() error {
messages := bl.buffer.Flush()
for _, msg := range messages {
err := bl.Log(msg.Line, msg.Source, msg.Timestamp)
err := bl.Log(msg)
if err != nil {
return errors.Wrap(err, "unable to flush the remaining messages to destination")
}
Expand All @@ -266,13 +267,13 @@ func (bl *bufferedLogger) flushMessages() error {
}

// Log lets underlying log driver send logs to destination.
func (bl *bufferedLogger) Log(line []byte, source string, logTimestamp time.Time) error {
func (bl *bufferedLogger) Log(message *dockerlogger.Message) error {
if debug.Verbose {
debug.SendEventsToLog(DaemonName,
fmt.Sprintf("[BUFFER] Sending message: %s", string(line)),
fmt.Sprintf("[BUFFER] Sending message: %s", string(message.Line)),
debug.DEBUG, 0)
}
return bl.l.Log(line, source, logTimestamp)
return bl.l.Log(message)
}

// GetPipes gets pipes of container and its name that exposed by containerd.
Expand Down
155 changes: 86 additions & 69 deletions logger/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package logger
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"math"
Expand All @@ -24,6 +26,8 @@ import (
"github.com/aws/shim-loggers-for-containerd/debug"

dockerlogger "github.com/docker/docker/daemon/logger"

types "github.com/docker/docker/api/types/backend"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -106,7 +110,7 @@ type LogDriver interface {
// GetPipes gets pipes of container that exposed by containerd.
GetPipes() (map[string]io.Reader, error)
// Log sends logs to destination.
Log([]byte, string, time.Time) error
Log(*dockerlogger.Message) error
// Read reads a single log message from container pipe and sends it to
// destination or saves it to ring buffer, depending on the mode of log
// driver.
Expand Down Expand Up @@ -224,9 +228,11 @@ func (l *Logger) sendLogs(
type sendLogToDestFunc func(
line []byte,
source string,
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
) (error, time.Time, bool, bool)
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error

// Read gets container logs, saves them to our own buffer. Then we will read logs line by line
// and send them to destination. In non-blocking mode, the destination is the ring buffer. More
Expand All @@ -239,7 +245,7 @@ func (l *Logger) Read(
sendLogMsgToDest sendLogToDestFunc,
) error {
var (
partialTimestamp time.Time
msgTimestamp time.Time
bytesInBuffer int
err error
eof bool
Expand All @@ -251,6 +257,13 @@ func (l *Logger) Read(
isFirstPartial := true
// isPartialMsg indicates if current message is a partial log message. Initialize to false.
isPartialMsg := false
// isLastPartial indicates if this message completes a partial message
isLastPartial := false
// partialID is a random ID given to each split message
partialID := ""
// partialOrdinal orders the split messages and count up from 1
partialOrdinal := 1

for {
select {
case <-ctx.Done():
Expand All @@ -274,13 +287,24 @@ func (l *Logger) Read(
// This function returns -1 if '\n' in not present in buffer.
lenOfLine := bytes.IndexByte(buf[head:bytesInBuffer], newline)
for lenOfLine >= 0 {
// If this is the end of a partial message
// use the existing timestamp, so that all
// partials split from the same message have the same timestamp
// If not, new timestamp.
if isPartialMsg {
isLastPartial = true
} else {
msgTimestamp = time.Now().UTC()
}
PettitWesley marked this conversation as resolved.
Show resolved Hide resolved
curLine := buf[head : head+lenOfLine]
err, partialTimestamp, _, _ = sendLogMsgToDest(
err = sendLogMsgToDest(
curLine,
source,
isFirstPartial,
isPartialMsg,
partialTimestamp,
isLastPartial,
partialID,
partialOrdinal,
msgTimestamp,
)
if err != nil {
return err
Expand All @@ -289,6 +313,9 @@ func (l *Logger) Read(
// Reset flags.
isFirstPartial = true
isPartialMsg = false
isLastPartial = false
partialID = ""
partialOrdinal = 1;

// Update the index of head of next line message.
head += lenOfLine + 1
Expand All @@ -303,12 +330,25 @@ func (l *Logger) Read(
// Still bytes left in the buffer after we identified all newline symbols.
if head < bytesInBuffer {
curLine := buf[head:bytesInBuffer]
err, partialTimestamp, isFirstPartial, isPartialMsg = sendLogMsgToDest(

// Record as a partial message.
isPartialMsg = true
if isFirstPartial {
msgTimestamp = time.Now().UTC()
partialID, err = generateRandomID()
}
if err != nil {
return err
}

err = sendLogMsgToDest(
curLine,
source,
isFirstPartial,
true, // Record as a partial message.
partialTimestamp,
isPartialMsg,
isLastPartial,
partialID,
partialOrdinal,
msgTimestamp,
)
if err != nil {
return err
Expand All @@ -317,6 +357,13 @@ func (l *Logger) Read(
// reset head and bytesInBuffer
head = 0
bytesInBuffer = 0
// increment partial flags
partialOrdinal += 1
if isFirstPartial {
// if this was the first partial message
// the next one is not the first if it is also partial
isFirstPartial = false
}
}
// If pipe is closed after we send all bytes left in buffer, then directly return.
if eof {
Expand All @@ -334,6 +381,19 @@ func (l *Logger) Read(
}
}

// generateRandomID is based on Docker
// GenerateRandomID: https://github.com/moby/moby/blob/bca8d9f2ce0d63e1490692917cde6273bc288bad/pkg/stringid/stringid.go#L40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can we not just use that method? Why are we replicating it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is explained in my comment below this line... I can make it longer and more detailed to make it clearer. The Docker generate function does the rand generation in a loop, because it can't allow the ID to container only 0 - 9 (digits) because then it can't be used a container ID. We don't have that concern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason to not import from docker is properly handling that error return in rand.Read

// with the simplification that we don't need to worry about guaranteeing the string isn't all 0 - 9
// Consequently ^ we have our own function instead of importing from Docker.
func generateRandomID() (string, error) {
b := make([]byte, 32)
if _, err := rand.Read(b); err != nil {
return "", err
}
id := hex.EncodeToString(b)
return id, nil
}

// readFromContainerPipe reads bytes from container pipe, upto max read size in bytes of 2048.
func readFromContainerPipe(pipe io.Reader, buf []byte, bytesInBuffer, maxReadBytes int) (bool, int, error) {
// eof indicates if we have already met EOF error.
Expand Down Expand Up @@ -367,77 +427,34 @@ func bufferIsFull(buf []byte, head, bytesInBuffer int) bool {
func (l *Logger) sendLogMsgToDest(
line []byte,
source string,
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
) (error, time.Time, bool, bool) {
msgTimestamp, partialTimestamp, isFirstPartial, isPartialMsg := getLogTimestamp(
isFirstPartial,
isPartialMsg,
partialTimestamp,
l.Info.ContainerID,
)
isPartialMsg, isLastPartial bool,
partialID string,
partialOrdinal int,
msgTimestamp time.Time,
) error {
if debug.Verbose {
debug.SendEventsToLog(l.Info.ContainerID,
fmt.Sprintf("[Pipe %s] Scanned message: %s", source, string(line)),
debug.DEBUG, 0)
}

err := l.Log(line, source, msgTimestamp)
if err != nil {
return err, partialTimestamp, isFirstPartial, isPartialMsg
}

return nil, partialTimestamp, isFirstPartial, isPartialMsg
}

// getLogTimestamp gets the timestamp of a log message. It could be current timestamp
// if it's a new line, or the recorded timestamp from the first partial if it's the other partial
// messages.
func getLogTimestamp(
isFirstPartial, isPartialMsg bool,
partialTimestamp time.Time,
containerID string,
) (time.Time, time.Time, bool, bool) {
msgTimestamp := time.Now().UTC()

// If it is not end with with newline for the first time, record it as a partial
// message and set it to be the first partial. Then record the timestamp as the
// timestamp of the whole log message.
if isFirstPartial {
partialTimestamp = msgTimestamp
if debug.Verbose {
debug.SendEventsToLog(containerID,
fmt.Sprintf("Saving first partial at time %s", partialTimestamp.String()),
debug.DEBUG, 0)
}
// Set isFirstPartial to false and set indicator of partial log message to be true.
isFirstPartial = false
isPartialMsg = true
} else if isPartialMsg {
// If there are more partial messages recorded before the current read, use the
// recorded timestamp as it of the current message as well.
msgTimestamp = partialTimestamp
if debug.Verbose {
debug.SendEventsToLog(containerID,
fmt.Sprintf("Setting partial log message to time %s", msgTimestamp.String()),
debug.DEBUG, 0)
}
message := newMessage(line, source, msgTimestamp)
if isPartialMsg {
message.PLogMetaData = &types.PartialLogMetaData{ID: partialID, Ordinal: partialOrdinal, Last: isLastPartial}
}

return msgTimestamp, partialTimestamp, isFirstPartial, isPartialMsg
}

// Log sends logs to destination.
func (l *Logger) Log(line []byte, source string, logTimestamp time.Time) error {
message := newMessage(line, source, logTimestamp)
err := l.Stream.Log(message)
err := l.Log(message)
if err != nil {
return errors.Wrapf(err, "failed to log msg for container %s", l.Info.ContainerName)
}

return nil
}

// Log sends logs to destination.
func (l *Logger) Log(message *dockerlogger.Message) error {
return l.Stream.Log(message)
}

// newMessage creates a new logger message.
func newMessage(line []byte, source string, logTimestamp time.Time) *dockerlogger.Message {
msg := dockerlogger.NewMessage()
Expand Down
Loading