Skip to content

Commit

Permalink
Support max batch size for logs (#2736)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregoryfranklin authored Mar 24, 2021
1 parent 9c14430 commit 49ddca9
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 1 deletion.
1 change: 0 additions & 1 deletion processor/batchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ regardless of size.
- `send_batch_max_size` (default = 0): The maximum number of items in a batch.
This property ensures that larger batches are split into smaller units.
By default (`0`), there is no upper limit of the batch size.
It is currently supported only for the trace and metric pipelines.

Examples:

Expand Down
10 changes: 10 additions & 0 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ func (bp *batchProcessor) processItem(item interface{}) {
}()
}
}
if td, ok := item.(pdata.Logs); ok {
itemCount := bp.batch.itemCount()
if itemCount+uint32(td.LogRecordCount()) > bp.sendBatchMaxSize {
tdRemainSize := splitLogs(int(bp.sendBatchSize-itemCount), td)
item = tdRemainSize
go func() {
bp.newItem <- td
}()
}
}
}

bp.batch.add(item)
Expand Down
69 changes: 69 additions & 0 deletions processor/batchprocessor/splitlogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchprocessor

import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// splitLogs removes logrecords from the input data and returns a new data of the specified size.
func splitLogs(size int, toSplit pdata.Logs) pdata.Logs {
if toSplit.LogRecordCount() <= size {
return toSplit
}
copiedLogs := 0
result := pdata.NewLogs()
rls := toSplit.ResourceLogs()
result.ResourceLogs().Resize(rls.Len())
rlsCount := 0
for i := rls.Len() - 1; i >= 0; i-- {
rlsCount++
rl := rls.At(i)
destRl := result.ResourceLogs().At(result.ResourceLogs().Len() - 1 - i)
rl.Resource().CopyTo(destRl.Resource())

for j := rl.InstrumentationLibraryLogs().Len() - 1; j >= 0; j-- {
instLogs := rl.InstrumentationLibraryLogs().At(j)
destInstLogs := pdata.NewInstrumentationLibraryLogs()
destRl.InstrumentationLibraryLogs().Append(destInstLogs)
instLogs.InstrumentationLibrary().CopyTo(destInstLogs.InstrumentationLibrary())

if size-copiedLogs >= instLogs.Logs().Len() {
destInstLogs.Logs().Resize(instLogs.Logs().Len())
} else {
destInstLogs.Logs().Resize(size - copiedLogs)
}
for k, destIdx := instLogs.Logs().Len()-1, 0; k >= 0 && copiedLogs < size; k, destIdx = k-1, destIdx+1 {
log := instLogs.Logs().At(k)
log.CopyTo(destInstLogs.Logs().At(destIdx))
copiedLogs++
// remove log
instLogs.Logs().Resize(instLogs.Logs().Len() - 1)
}
if instLogs.Logs().Len() == 0 {
rl.InstrumentationLibraryLogs().Resize(rl.InstrumentationLibraryLogs().Len() - 1)
}
if copiedLogs == size {
result.ResourceLogs().Resize(rlsCount)
return result
}
}
if rl.InstrumentationLibraryLogs().Len() == 0 {
rls.Resize(rls.Len() - 1)
}
}
result.ResourceLogs().Resize(rlsCount)
return result
}
113 changes: 113 additions & 0 deletions processor/batchprocessor/splitlogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package batchprocessor

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)

func TestSplitLogs_noop(t *testing.T) {
td := testdata.GenerateLogDataManyLogsSameResource(20)
splitSize := 40
split := splitLogs(splitSize, td)
assert.Equal(t, td, split)

td.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().Resize(5)
assert.EqualValues(t, td, split)
}

func TestSplitLogs(t *testing.T) {
td := testdata.GenerateLogDataManyLogsSameResource(20)
logs := td.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for i := 0; i < logs.Len(); i++ {
logs.At(i).SetName(getTestLogName(0, i))
}
cp := pdata.NewLogs()
cp.ResourceLogs().Resize(1)
cp.ResourceLogs().At(0).InstrumentationLibraryLogs().Resize(1)
cp.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().Resize(5)
cpLogs := cp.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
td.ResourceLogs().At(0).Resource().CopyTo(
cp.ResourceLogs().At(0).Resource())
td.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).InstrumentationLibrary().CopyTo(
cp.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).InstrumentationLibrary())
logs.At(19).CopyTo(cpLogs.At(0))
logs.At(18).CopyTo(cpLogs.At(1))
logs.At(17).CopyTo(cpLogs.At(2))
logs.At(16).CopyTo(cpLogs.At(3))
logs.At(15).CopyTo(cpLogs.At(4))

splitSize := 5
split := splitLogs(splitSize, td)
assert.Equal(t, splitSize, split.LogRecordCount())
assert.Equal(t, cp, split)
assert.Equal(t, 15, td.LogRecordCount())
assert.Equal(t, "test-log-int-0-19", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
assert.Equal(t, "test-log-int-0-15", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(4).Name())
}

func TestSplitLogsMultipleResourceLogs(t *testing.T) {
td := testdata.GenerateLogDataManyLogsSameResource(20)
logs := td.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for i := 0; i < logs.Len(); i++ {
logs.At(i).SetName(getTestLogName(0, i))
}
td.ResourceLogs().Resize(2)
// add second index to resource logs
testdata.GenerateLogDataManyLogsSameResource(20).
ResourceLogs().At(0).CopyTo(td.ResourceLogs().At(1))
logs = td.ResourceLogs().At(1).InstrumentationLibraryLogs().At(0).Logs()
for i := 0; i < logs.Len(); i++ {
logs.At(i).SetName(getTestLogName(1, i))
}

splitSize := 5
split := splitLogs(splitSize, td)
assert.Equal(t, splitSize, split.LogRecordCount())
assert.Equal(t, 35, td.LogRecordCount())
assert.Equal(t, "test-log-int-1-19", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
assert.Equal(t, "test-log-int-1-15", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(4).Name())
}

func TestSplitLogsMultipleResourceLogs_split_size_greater_than_log_size(t *testing.T) {
td := testdata.GenerateLogDataManyLogsSameResource(20)
logs := td.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for i := 0; i < logs.Len(); i++ {
logs.At(i).SetName(getTestLogName(0, i))
}
td.ResourceLogs().Resize(2)
// add second index to resource logs
testdata.GenerateLogDataManyLogsSameResource(20).
ResourceLogs().At(0).CopyTo(td.ResourceLogs().At(1))
logs = td.ResourceLogs().At(1).InstrumentationLibraryLogs().At(0).Logs()
for i := 0; i < logs.Len(); i++ {
logs.At(i).SetName(getTestLogName(1, i))
}

splitSize := 25
split := splitLogs(splitSize, td)
assert.Equal(t, splitSize, split.LogRecordCount())
assert.Equal(t, 40-splitSize, td.LogRecordCount())
assert.Equal(t, 1, td.ResourceLogs().Len())
assert.Equal(t, "test-log-int-1-19", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
assert.Equal(t, "test-log-int-1-0", split.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(19).Name())
assert.Equal(t, "test-log-int-0-19", split.ResourceLogs().At(1).InstrumentationLibraryLogs().At(0).Logs().At(0).Name())
assert.Equal(t, "test-log-int-0-15", split.ResourceLogs().At(1).InstrumentationLibraryLogs().At(0).Logs().At(4).Name())
}

0 comments on commit 49ddca9

Please sign in to comment.