Batch Processor: Log Support (#1723)
This mirrors the existing metrics batching logic to add log support to the batchprocessor.
benkeith-splunk authored Sep 2, 2020
1 parent f3b5b45 commit 889948e
Showing 8 changed files with 409 additions and 6 deletions.
10 changes: 10 additions & 0 deletions consumer/pdata/log.go
@@ -81,6 +81,16 @@ func (ld Logs) LogRecordCount() int {
return logCount
}

// SizeBytes returns the number of bytes in the internal representation of the
// logs.
func (ld Logs) SizeBytes() int {
size := 0
for i := range *ld.orig {
size += (*ld.orig)[i].Size()
}
return size
}

func (ld Logs) ResourceLogs() ResourceLogsSlice {
return ResourceLogsSlice(ld)
}
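
As a quick illustration of the new `SizeBytes` helper, here is a minimal, hypothetical snippet (not part of this commit) assuming the collector's public `consumer/pdata` package; the batch processor uses the helper to report the byte size of each batch it sends.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/consumer/pdata"
)

func main() {
	ld := pdata.NewLogs()
	// An empty Logs value has no ResourceLogs, so the summed protobuf
	// Size() of its internal representation is zero. The batch processor
	// records this value in its size-in-bytes telemetry.
	fmt.Println(ld.SizeBytes()) // 0
}
```
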
97 changes: 97 additions & 0 deletions go.sum

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions internal/data/testdata/log.go
@@ -356,3 +356,19 @@ func generateOtlpLogThree() *otlplogs.LogRecord {
Body: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "something else happened"}},
}
}

func GenerateLogDataManyLogsSameResource(count int) pdata.Logs {
ld := GenerateLogDataOneEmptyLogs()
rs0 := ld.ResourceLogs().At(0)
rs0.InstrumentationLibraryLogs().Resize(1)
rs0.InstrumentationLibraryLogs().At(0).Logs().Resize(count)
for i := 0; i < count; i++ {
l := rs0.InstrumentationLibraryLogs().At(0).Logs().At(i)
if i%2 == 0 {
fillLogOne(l)
} else {
fillLogTwo(l)
}
}
return ld
}
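
A hypothetical test sketch showing what the helper above produces — one resource whose single instrumentation library holds `count` alternating log records. The test name and the testify imports are assumptions, and the `testdata` package is internal, so this only compiles inside this repository.

```go
func TestGenerateLogDataManyLogsSameResource_Sketch(t *testing.T) {
	ld := testdata.GenerateLogDataManyLogsSameResource(4)
	// One ResourceLogs entry, four log records filled alternately by
	// fillLogOne and fillLogTwo.
	assert.Equal(t, 1, ld.ResourceLogs().Len())
	assert.Equal(t, 4, ld.LogRecordCount())
}
```
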
10 changes: 5 additions & 5 deletions processor/batchprocessor/README.md
@@ -1,11 +1,11 @@
# Batch Processor

- Supported pipeline types: metric, traces
+ Supported pipeline types: metric, traces, logs

- The batch processor accepts spans or metrics and places them into batches.
- Batching helps better compress the data and reduce the number of outgoing
- connections required to transmit the data. This processor supports both size and
- time based batching.
+ The batch processor accepts spans, metrics, or logs and places them into
+ batches. Batching helps better compress the data and reduce the number of
+ outgoing connections required to transmit the data. This processor supports
+ both size and time based batching.

It is highly recommended to configure the batch processor on every collector.
The batch processor should be defined in the pipeline after the `memory_limiter`
53 changes: 53 additions & 0 deletions processor/batchprocessor/batch_processor.go
@@ -74,6 +74,7 @@ type batch interface {

var _ consumer.TraceConsumer = (*batchProcessor)(nil)
var _ consumer.MetricsConsumer = (*batchProcessor)(nil)
var _ consumer.LogsConsumer = (*batchProcessor)(nil)

func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel telemetry.Level) *batchProcessor {
return &batchProcessor{
@@ -182,6 +183,12 @@ func (bp *batchProcessor) ConsumeMetrics(_ context.Context, md pdata.Metrics) er
return nil
}

// ConsumeLogs implements LogsProcessor
func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
bp.newItem <- ld
return nil
}
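
`ConsumeLogs` only enqueues the incoming `pdata.Logs`; the shared processing goroutine (untouched by this diff, so not shown) drains `newItem`, adds each payload to the active batch, and flushes once the configured batch size or timeout is reached. Below is a simplified, self-contained sketch of that flow — the type, field, and function names are illustrative assumptions, not the actual implementation in `batch_processor.go`.

```go
package batchsketch

import "time"

// batch mirrors the interface the processor batches against; the real
// export takes a context, omitted here for brevity.
type batch interface {
	add(item interface{})
	itemCount() uint32
	export() error
	reset()
}

// processLoop is an illustrative sketch of the shared batching goroutine:
// it drains newItem, flushes when the item count reaches sendBatchSize,
// and also flushes on a periodic timeout. The real loop additionally
// handles shutdown signalling, timer resets, and telemetry.
func processLoop(newItem <-chan interface{}, active batch, sendBatchSize uint32, timeout time.Duration) {
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		select {
		case item, ok := <-newItem:
			if !ok {
				return // channel closed on shutdown
			}
			active.add(item)
			if active.itemCount() >= sendBatchSize {
				_ = active.export()
				active.reset()
			}
		case <-ticker.C:
			if active.itemCount() > 0 {
				_ = active.export()
				active.reset()
			}
		}
	}
}
```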

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.TraceConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchTraces(trace), telemetryLevel)
@@ -192,6 +199,11 @@ func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics co
return newBatchProcessor(params, cfg, newBatchMetrics(metrics), telemetryLevel)
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.LogsConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchLogs(logs), telemetryLevel)
}

type batchTraces struct {
nextConsumer consumer.TraceConsumer
traceData pdata.Traces
@@ -274,3 +286,44 @@ func (bm *batchMetrics) add(item interface{}) {
bm.metricCount += uint32(newMetricsCount)
md.ResourceMetrics().MoveAndAppendTo(bm.metricData.ResourceMetrics())
}

type batchLogs struct {
nextConsumer consumer.LogsConsumer
logData pdata.Logs
logCount uint32
}

func newBatchLogs(nextConsumer consumer.LogsConsumer) *batchLogs {
b := &batchLogs{nextConsumer: nextConsumer}
b.reset()
return b
}

func (bm *batchLogs) export(ctx context.Context) error {
return bm.nextConsumer.ConsumeLogs(ctx, bm.logData)
}

func (bm *batchLogs) itemCount() uint32 {
return bm.logCount
}

func (bm *batchLogs) size() int {
return bm.logData.SizeBytes()
}

// reset clears the current batchLogs structure to zero/empty values.
func (bm *batchLogs) reset() {
bm.logData = pdata.NewLogs()
bm.logCount = 0
}

func (bm *batchLogs) add(item interface{}) {
ld := item.(pdata.Logs)

newLogsCount := ld.LogRecordCount()
if newLogsCount == 0 {
return
}
bm.logCount += uint32(newLogsCount)
ld.ResourceLogs().MoveAndAppendTo(bm.logData.ResourceLogs())
}
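
One detail worth noting in `add` above: `MoveAndAppendTo` transfers the resource logs into the accumulating batch and leaves the incoming `pdata.Logs` empty, which is why the test below clones each request before calling `ConsumeLogs`. A small, hypothetical sketch of that move semantics (not part of this commit):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/consumer/pdata"
)

func main() {
	// Build a source Logs value with a single (empty) ResourceLogs entry.
	src := pdata.NewLogs()
	src.ResourceLogs().Resize(1)

	dst := pdata.NewLogs()

	// MoveAndAppendTo appends the source's ResourceLogs to the destination
	// and clears the source, so ownership moves rather than being copied.
	src.ResourceLogs().MoveAndAppendTo(dst.ResourceLogs())
	fmt.Println(src.ResourceLogs().Len()) // 0
	fmt.Println(dst.ResourceLogs().Len()) // 1
}
```
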
211 changes: 211 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
@@ -513,3 +513,214 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
td.SpanCount()
}
}

func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
}

requestCount := 100
logsPerRequest := 5
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

logDataSlice := make([]pdata.Logs, 0, requestCount)

for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
logs := ld.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
logs.At(logIndex).SetName(getTestLogName(requestNum, logIndex))
}
logDataSlice = append(logDataSlice, ld.Clone())
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

// Also send empty log data to cover the case where no resources are present.
ld := testdata.GenerateLogDataEmpty()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
receivedMds := sink.AllLogs()
logsReceivedByName := logsReceivedByName(receivedMds)
for requestNum := 0; requestNum < requestCount; requestNum++ {
logs := logDataSlice[requestNum].ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
require.EqualValues(t,
logs.At(logIndex),
logsReceivedByName[getTestLogName(requestNum, logIndex)])
}
}
}

func TestBatchLogProcessor_BatchSize(t *testing.T) {
views := MetricViews(telemetry.Detailed)
view.Register(views...)
defer view.Unregister(views...)

// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 50,
}

requestCount := 100
logsPerRequest := 5
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
size := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
size += ld.SizeBytes()
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}
require.NoError(t, batcher.Shutdown(context.Background()))

elapsed := time.Since(start)
require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize)
expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
receivedMds := sink.AllLogs()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, ld := range receivedMds {
require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).InstrumentationLibraryLogs().At(0).Logs().Len())
}
}

viewData, err := view.RetrieveData(statBatchSendSize.Name())
require.NoError(t, err)
assert.Equal(t, 1, len(viewData))
distData := viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, sink.LogRecordsCount(), int(distData.Sum()))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Min))
assert.Equal(t, cfg.SendBatchSize, uint32(distData.Max))

viewData, err = view.RetrieveData(statBatchSendSizeBytes.Name())
require.NoError(t, err)
assert.Equal(t, 1, len(viewData))
distData = viewData[0].Data.(*view.DistributionData)
assert.Equal(t, int64(expectedBatchesNum), distData.Count)
assert.Equal(t, size, int(distData.Sum()))
}

func TestBatchLogsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 100,
}
requestCount := 5
logsPerRequest := 10
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

// Wait for at least one batch to be sent.
for {
if sink.LogRecordsCount() != 0 {
break
}
<-time.After(cfg.Timeout)
}

elapsed := time.Since(start)
require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds())

// This should not change the results in the sink, verified by the expectedBatchesNum
require.NoError(t, batcher.Shutdown(context.Background()))

expectedBatchesNum := 1
expectedBatchingFactor := 5

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
receivedMds := sink.AllLogs()
require.Equal(t, expectedBatchesNum, len(receivedMds))
for _, ld := range receivedMds {
require.Equal(t, expectedBatchingFactor, ld.ResourceLogs().Len())
for i := 0; i < expectedBatchingFactor; i++ {
require.Equal(t, logsPerRequest, ld.ResourceLogs().At(i).InstrumentationLibraryLogs().At(0).Logs().Len())
}
}
}

func TestBatchLogProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
}
requestCount := 5
logsPerRequest := 10
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogDataManyLogsSameResource(logsPerRequest)
assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld))
}

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*logsPerRequest, sink.LogRecordsCount())
require.Equal(t, 1, len(sink.AllLogs()))
}

func getTestLogName(requestNum, index int) string {
return fmt.Sprintf("test-log-int-%d-%d", requestNum, index)
}

func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord {
logsReceivedByName := map[string]pdata.LogRecord{}
for i := range lds {
ld := lds[i]
rms := ld.ResourceLogs()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
if rm.IsNil() {
continue
}
ilms := rm.InstrumentationLibraryLogs()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
if ilm.IsNil() {
continue
}
logs := ilm.Logs()
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)
logsReceivedByName[log.Name()] = log
}
}
}
}
return logsReceivedByName
}
14 changes: 13 additions & 1 deletion processor/batchprocessor/factory.go
@@ -39,7 +39,8 @@ func NewFactory() component.ProcessorFactory {
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
- processorhelper.WithMetrics(createMetricsProcessor))
+ processorhelper.WithMetrics(createMetricsProcessor),
+ processorhelper.WithLogs(createLogsProcessor))
}

func createDefaultConfig() configmodels.Processor {
@@ -75,3 +76,14 @@ func createMetricsProcessor(
level, _ := telemetry.GetLevel()
return newBatchMetricsProcessor(params, nextConsumer, oCfg, level), nil
}

func createLogsProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.LogsConsumer,
) (component.LogsProcessor, error) {
oCfg := cfg.(*Config)
level, _ := telemetry.GetLevel()
return newBatchLogsProcessor(params, nextConsumer, oCfg, level), nil
}
4 changes: 4 additions & 0 deletions processor/batchprocessor/factory_test.go
@@ -45,4 +45,8 @@ func TestCreateProcessor(t *testing.T) {
mp, err := factory.CreateMetricsProcessor(context.Background(), creationParams, nil, cfg)
assert.NotNil(t, mp)
assert.NoError(t, err, "cannot create metric processor")

lp, err := factory.CreateLogsProcessor(context.Background(), creationParams, cfg, nil)
assert.NotNil(t, lp)
assert.NoError(t, err, "cannot create logs processor")
}
