Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #112 from thaJeztah/18.09_backport_moby_37747
Browse files Browse the repository at this point in the history
[18.09 backport] awslogs: account for UTF-8 normalization in limits
Upstream-commit: 08a77f11a63b2d25c00ca0e35012194bdcfbe917
Component: engine
  • Loading branch information
andrewhsu authored Nov 27, 2018
2 parents bf3a100 + 6ad9059 commit 1e216e2
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 13 deletions.
64 changes: 51 additions & 13 deletions components/engine/daemon/logger/awslogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
"unicode/utf8"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -46,6 +47,10 @@ const (
maximumLogEventsPerPut = 10000

// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
// Because the events are interpreted as UTF-8 encoded Unicode, invalid UTF-8 byte sequences are replaced with the
// Unicode replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To compensate for that and to avoid
// splitting valid UTF-8 characters into invalid byte sequences, we calculate the length of each event assuming that
// this replacement happens.
maximumBytesPerEvent = 262144 - perEventBytes

resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
Expand Down Expand Up @@ -495,15 +500,16 @@ func (l *logStream) collectBatch(created chan bool) {
}
line := msg.Line
if l.multilinePattern != nil {
if l.multilinePattern.Match(line) || len(eventBuffer)+len(line) > maximumBytesPerEvent {
lineEffectiveLen := effectiveLen(string(line))
if l.multilinePattern.Match(line) || effectiveLen(string(eventBuffer))+lineEffectiveLen > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
l.processEvent(batch, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
// Append new line if event is less than max event size
if len(line) < maximumBytesPerEvent {
// Append newline if event is less than max event size
if lineEffectiveLen < maximumBytesPerEvent {
line = append(line, "\n"...)
}
eventBuffer = append(eventBuffer, line...)
Expand All @@ -524,16 +530,17 @@ func (l *logStream) collectBatch(created chan bool) {
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split- and
// batch-calculations.
func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int64) {
for len(events) > 0 {
// batch-calculations. Because the events are interpreted as UTF-8 encoded
// Unicode, invalid UTF-8 byte sequences are replaced with the Unicode
// replacement character (U+FFFD), which is a 3-byte sequence in UTF-8. To
// compensate for that and to avoid splitting valid UTF-8 characters into
// invalid byte sequences, we calculate the length of each event assuming that
// this replacement happens.
func (l *logStream) processEvent(batch *eventBatch, bytes []byte, timestamp int64) {
for len(bytes) > 0 {
// Split line length so it does not exceed the maximum
lineBytes := len(events)
if lineBytes > maximumBytesPerEvent {
lineBytes = maximumBytesPerEvent
}
line := events[:lineBytes]

splitOffset, lineBytes := findValidSplit(string(bytes), maximumBytesPerEvent)
line := bytes[:splitOffset]
event := wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Expand All @@ -544,14 +551,45 @@ func (l *logStream) processEvent(batch *eventBatch, events []byte, timestamp int

added := batch.add(event, lineBytes)
if added {
events = events[lineBytes:]
bytes = bytes[splitOffset:]
} else {
l.publishBatch(batch)
batch.reset()
}
}
}

// effectiveLen reports how many bytes line occupies once it has been
// normalized to valid UTF-8. Ranging over a Go string decodes every byte that
// is not part of a valid UTF-8 sequence as utf8.RuneError (U+FFFD), so each
// such byte is counted as the 3 bytes that codepoint takes when encoded, while
// every valid codepoint is counted at its own encoded width.
func effectiveLen(line string) int {
	total := 0
	for _, r := range line {
		total += utf8.RuneLen(r)
	}
	return total
}

// findValidSplit locates the largest prefix of line whose effective size does
// not exceed maxBytes and that ends on a codepoint boundary, so a valid UTF-8
// sequence is never cut in half.
//
// It returns the byte offset at which line (or the []byte it was converted
// from) should be split, together with the effective byte count of that
// prefix. The effective count is the size after normalization: each invalid
// byte is decoded as utf8.RuneError and counted as the 3 bytes of the Unicode
// replacement character. When the whole string fits, splitOffset is len(line).
func findValidSplit(line string, maxBytes int) (splitOffset, effectiveBytes int) {
	for i, r := range line {
		width := utf8.RuneLen(r)
		if effectiveBytes+width > maxBytes {
			// The rune starting at i would overflow the budget; split before it.
			return i, effectiveBytes
		}
		effectiveBytes += width
	}
	return len(line), effectiveBytes
}

// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
Expand Down
182 changes: 182 additions & 0 deletions components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,62 @@ func TestCollectBatchClose(t *testing.T) {
}
}

// TestEffectiveLen checks effectiveLen against plain ASCII, low control bytes,
// a 4-byte codepoint, invalid UTF-8 bytes (each expected to count as 3 bytes),
// and the empty string.
func TestEffectiveLen(t *testing.T) {
	type testCase struct {
		str            string
		effectiveBytes int
	}
	cases := []testCase{
		{str: "Hello", effectiveBytes: 5},
		{str: string([]byte{1, 2, 3, 4}), effectiveBytes: 4},
		{str: "🙃", effectiveBytes: 4},
		{str: string([]byte{0xFF, 0xFF, 0xFF, 0xFF}), effectiveBytes: 12},
		{str: "He\xff\xffo", effectiveBytes: 9},
		{str: "", effectiveBytes: 0},
	}
	for idx, c := range cases {
		input, want := c.str, c.effectiveBytes
		name := fmt.Sprintf("%d/%s", idx, input)
		t.Run(name, func(t *testing.T) {
			assert.Equal(t, want, effectiveLen(input))
		})
	}
}

// TestFindValidSplit checks both results of findValidSplit (the split offset
// in raw bytes and the effective byte count of the prefix) for inputs that
// fit entirely, inputs that must be cut, a multi-byte codepoint that must not
// be cut in half, and an invalid byte that is counted as 3 effective bytes.
func TestFindValidSplit(t *testing.T) {
	type testCase struct {
		str               string
		maxEffectiveBytes int
		splitOffset       int
		effectiveBytes    int
	}
	cases := []testCase{
		{str: "", maxEffectiveBytes: 10, splitOffset: 0, effectiveBytes: 0},
		{str: "Hello", maxEffectiveBytes: 6, splitOffset: 5, effectiveBytes: 5},
		{str: "Hello", maxEffectiveBytes: 2, splitOffset: 2, effectiveBytes: 2},
		{str: "Hello", maxEffectiveBytes: 0, splitOffset: 0, effectiveBytes: 0},
		{str: "🙃", maxEffectiveBytes: 3, splitOffset: 0, effectiveBytes: 0},
		{str: "🙃", maxEffectiveBytes: 4, splitOffset: 4, effectiveBytes: 4},
		{str: string([]byte{'a', 0xFF}), maxEffectiveBytes: 2, splitOffset: 1, effectiveBytes: 1},
		{str: string([]byte{'a', 0xFF}), maxEffectiveBytes: 4, splitOffset: 2, effectiveBytes: 4},
	}
	for idx, c := range cases {
		c := c
		t.Run(fmt.Sprintf("%d/%s", idx, c.str), func(t *testing.T) {
			gotOffset, gotBytes := findValidSplit(c.str, c.maxEffectiveBytes)
			assert.Equal(t, c.splitOffset, gotOffset, "splitOffset")
			assert.Equal(t, c.effectiveBytes, gotBytes, "effectiveBytes")
			// Log both halves of the expected split to aid debugging.
			t.Log(c.str[:c.splitOffset])
			t.Log(c.str[c.splitOffset:])
		})
	}
}

func TestProcessEventEmoji(t *testing.T) {
stream := &logStream{}
batch := &eventBatch{}
bytes := []byte(strings.Repeat("🙃", maximumBytesPerEvent/4+1))
stream.processEvent(batch, bytes, 0)
assert.Equal(t, 2, len(batch.batch), "should be two events in the batch")
assert.Equal(t, strings.Repeat("🙃", maximumBytesPerEvent/4), aws.StringValue(batch.batch[0].inputLogEvent.Message))
assert.Equal(t, "🙃", aws.StringValue(batch.batch[1].inputLogEvent.Message))
}

func TestCollectBatchLineSplit(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
Expand Down Expand Up @@ -987,6 +1043,55 @@ func TestCollectBatchLineSplit(t *testing.T) {
}
}

// TestCollectBatchLineSplitWithBinary verifies that a line made of invalid
// UTF-8 bytes is split by its effective (post-normalization) length rather
// than its raw byte length. Each 0xFF byte is counted as the 3-byte
// utf8.RuneError, so maximumBytesPerEvent/3 of them are expected to form the
// first event and the one extra trailing byte is expected in a second event.
func TestCollectBatchLineSplitWithBinary(t *testing.T) {
	mockClient := newMockClient()
	stream := &logStream{
		client:        mockClient,
		logGroupName:  groupName,
		logStreamName: streamName,
		sequenceToken: aws.String(sequenceToken),
		messages:      make(chan *logger.Message),
	}
	mockClient.putLogEventsResult <- &putLogEventsResult{
		successResult: &cloudwatchlogs.PutLogEventsOutput{
			NextSequenceToken: aws.String(nextSequenceToken),
		},
	}
	// Replace the ticker with one driven by a channel that never fires, so
	// the batch is only flushed by stream.Close().
	var ticks = make(chan time.Time)
	newTicker = func(_ time.Duration) *time.Ticker {
		return &time.Ticker{
			C: ticks,
		}
	}

	// d is passed as collectBatch's `created` channel, already closed.
	// NOTE(review): presumably this signals that the log stream exists — confirm.
	d := make(chan bool)
	close(d)
	go stream.collectBatch(d)

	longline := strings.Repeat("\xFF", maximumBytesPerEvent/3) // 0xFF is counted as the 3-byte utf8.RuneError
	stream.Log(&logger.Message{
		Line:      []byte(longline + "\xFD"),
		Timestamp: time.Time{},
	})

	// no ticks
	stream.Close()

	argument := <-mockClient.putLogEventsArgument
	if argument == nil {
		t.Fatal("Expected non-nil PutLogEventsInput")
	}
	// Fatal rather than Error: the checks below index LogEvents[0] and
	// LogEvents[1] and would panic if fewer than two events were produced.
	if len(argument.LogEvents) != 2 {
		t.Fatalf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents))
	}
	if *argument.LogEvents[0].Message != longline {
		t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message)
	}
	if *argument.LogEvents[1].Message != "\xFD" {
		t.Errorf("Expected message to be %s but was %s", "\xFD", *argument.LogEvents[1].Message)
	}
}

func TestCollectBatchMaxEvents(t *testing.T) {
mockClient := newMockClientBuffered(1)
stream := &logStream{
Expand Down Expand Up @@ -1125,6 +1230,83 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
}
}

// TestCollectBatchMaxTotalBytesWithBinary verifies that the per-put byte limit
// is enforced using effective (post-normalization) lengths. It logs a line of
// invalid UTF-8 bytes whose effective size plus per-event overhead is close to
// maximumBytesPerPut, followed by a one-byte line "B", and expects two
// PutLogEvents calls: the first within (maximumBytesPerPut -
// maximumBytesPerEvent, maximumBytesPerPut], the second ending in "B".
func TestCollectBatchMaxTotalBytesWithBinary(t *testing.T) {
	expectedPuts := 2
	mockClient := newMockClientBuffered(expectedPuts)
	stream := &logStream{
		client:        mockClient,
		logGroupName:  groupName,
		logStreamName: streamName,
		sequenceToken: aws.String(sequenceToken),
		messages:      make(chan *logger.Message),
	}
	for i := 0; i < expectedPuts; i++ {
		mockClient.putLogEventsResult <- &putLogEventsResult{
			successResult: &cloudwatchlogs.PutLogEventsOutput{
				NextSequenceToken: aws.String(nextSequenceToken),
			},
		}
	}

	// Replace the ticker with one driven by a channel that never fires, so
	// batches are only published by size or by stream.Close().
	var ticks = make(chan time.Time)
	newTicker = func(_ time.Duration) *time.Ticker {
		return &time.Ticker{
			C: ticks,
		}
	}

	d := make(chan bool)
	close(d)
	go stream.collectBatch(d)

	// maxline is the maximum line that could be submitted after
	// accounting for its overhead.
	// This will be split and batched up to the `maximumBytesPerPut'
	// (+/- `maximumBytesPerEvent'). This /should/ be aligned, but
	// should also tolerate an offset within that range.
	maxline := strings.Repeat("\xFF", (maximumBytesPerPut-perEventBytes)/3) // 0xFF is counted as the 3-byte utf8.RuneError
	stream.Log(&logger.Message{
		Line:      []byte(maxline),
		Timestamp: time.Time{},
	})
	stream.Log(&logger.Message{
		Line:      []byte("B"),
		Timestamp: time.Time{},
	})

	// no ticks, guarantee batch by size (and chan close)
	stream.Close()

	argument := <-mockClient.putLogEventsArgument
	if argument == nil {
		t.Fatal("Expected non-nil PutLogEventsInput")
	}

	// Should total to the maximum allowed bytes.
	eventBytes := 0
	for _, event := range argument.LogEvents {
		eventBytes += effectiveLen(*event.Message)
	}
	eventsOverhead := len(argument.LogEvents) * perEventBytes
	payloadTotal := eventBytes + eventsOverhead
	// lowestMaxBatch allows the payload to be offset if the messages
	// don't lend themselves to align with the maximum event size.
	lowestMaxBatch := maximumBytesPerPut - maximumBytesPerEvent

	if payloadTotal > maximumBytesPerPut {
		t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, payloadTotal)
	}
	if payloadTotal < lowestMaxBatch {
		t.Errorf("Batch to be no less than %d but was %d", lowestMaxBatch, payloadTotal)
	}

	// Guard the second put the same way as the first, so an unexpected
	// result fails the test instead of panicking on a nil dereference or an
	// out-of-range index.
	argument = <-mockClient.putLogEventsArgument
	if argument == nil {
		t.Fatal("Expected non-nil PutLogEventsInput")
	}
	if len(argument.LogEvents) == 0 {
		t.Fatal("Expected second PutLogEventsInput to contain at least one event")
	}
	message := *argument.LogEvents[len(argument.LogEvents)-1].Message
	// Compare only the final byte; an empty message is reported as "".
	tail := message
	if len(tail) > 1 {
		tail = tail[len(tail)-1:]
	}
	if tail != "B" {
		t.Errorf("Expected message to be %s but was %s", "B", tail)
	}
}

func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
Expand Down

0 comments on commit 1e216e2

Please sign in to comment.