From 479c69d78683b91fbd2ecbe3afdbf84d5d4696f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 13:00:23 +0200 Subject: [PATCH 1/7] Change Processor.OnEmit to accept a record pointer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/log cpu: Intel(R) Core(TM) i9-10885H CPU @ 2.40GHz │ old.txt │ new.txt │ │ sec/op │ sec/op vs base │ Processor/Simple-16 536.0n ± 10% 751.2n ± 9% +40.14% (p=0.000 n=10) Processor/Batch-16 1.068µ ± 3% 1.296µ ± 5% +21.35% (p=0.000 n=10) Processor/ModifyTimestampSimple-16 557.8n ± 5% 827.2n ± 8% +48.31% (p=0.000 n=10) Processor/ModifyTimestampBatch-16 1.028µ ± 2% 1.318µ ± 6% +28.22% (p=0.000 n=10) Processor/ModifyAttributesSimple-16 615.2n ± 2% 887.1n ± 3% +44.19% (p=0.000 n=10) Processor/ModifyAttributesBatch-16 1.054µ ± 3% 1.406µ ± 6% +33.33% (p=0.000 n=10) geomean 772.7n 1.048µ +35.60% │ old.txt │ new.txt │ │ B/op │ B/op vs base │ Processor/Simple-16 416.0 ± 0% 834.0 ± 0% +100.48% (p=0.000 n=10) Processor/Batch-16 620.5 ± 2% 1049.5 ± 1% +69.14% (p=0.000 n=10) Processor/ModifyTimestampSimple-16 417.0 ± 0% 834.0 ± 0% +100.00% (p=0.000 n=10) Processor/ModifyTimestampBatch-16 617.5 ± 2% 1047.5 ± 1% +69.64% (p=0.000 n=10) Processor/ModifyAttributesSimple-16 465.0 ± 0% 882.0 ± 0% +89.68% (p=0.000 n=10) Processor/ModifyAttributesBatch-16 642.0 ± 2% 1085.0 ± 1% +69.00% (p=0.000 n=10) geomean 520.3 949.3 +82.44% │ old.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Processor/Simple-16 1.000 ± 0% 2.000 ± 0% +100.00% (p=0.000 n=10) Processor/Batch-16 0.000 ± 0% 1.000 ± 0% ? (p=0.000 n=10) Processor/ModifyTimestampSimple-16 1.000 ± 0% 2.000 ± 0% +100.00% (p=0.000 n=10) Processor/ModifyTimestampBatch-16 0.000 ± 0% 1.000 ± 0% ? (p=0.000 n=10) Processor/ModifyAttributesSimple-16 2.000 ± 0% 3.000 ± 0% +50.00% (p=0.000 n=10) Processor/ModifyAttributesBatch-16 1.000 ± 0% 2.000 ± 0% +100.00% (p=0.000 n=10) geomean ¹ 1.698 ? ¹ summaries must be >0 to compute geomean --- sdk/log/batch.go | 4 ++-- sdk/log/batch_test.go | 30 +++++++++++++++--------------- sdk/log/bench_test.go | 6 ++---- sdk/log/exporter.go | 3 --- sdk/log/logger.go | 2 +- sdk/log/processor.go | 10 ++++------ sdk/log/provider_test.go | 4 ++-- sdk/log/simple.go | 4 ++-- sdk/log/simple_test.go | 10 +++++----- 9 files changed, 33 insertions(+), 40 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 8e43b0e8f75..bfdd36ea26f 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -176,11 +176,11 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } // OnEmit batches provided log record. -func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { +func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } - if n := b.q.Enqueue(r); n >= b.batchSize { + if n := b.q.Enqueue(*r); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 70b12ab04fa..a6f017dd165 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -45,9 +45,9 @@ func TestEmptyBatchConfig(t *testing.T) { assert.NotPanics(t, func() { var bp BatchProcessor ctx := context.Background() - var record Record + record := new(Record) assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") - assert.False(t, bp.Enabled(ctx, record), "Enabled") + assert.False(t, bp.Enabled(ctx, *record), "Enabled") assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") assert.NoError(t, bp.Shutdown(ctx), "Shutdown") }) @@ -198,7 +198,7 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, size) { - assert.NoError(t, b.OnEmit(ctx, r)) + assert.NoError(t, b.OnEmit(ctx, &r)) } var got []Record assert.Eventually(t, func() bool { @@ -221,7 +221,7 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, 10*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + assert.NoError(t, b.OnEmit(ctx, &r)) } assert.Eventually(t, func() bool { return e.ExportN() > 1 @@ -244,7 +244,7 @@ func TestBatchProcessor(t *testing.T) { WithExportTimeout(time.Hour), ) for _, r := range make([]Record, 2*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + assert.NoError(t, b.OnEmit(ctx, &r)) } var n int @@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) { var err error require.Eventually(t, func() bool { - err = b.OnEmit(ctx, Record{}) + err = b.OnEmit(ctx, new(Record)) return true }, time.Second, time.Microsecond, "OnEmit blocked") assert.NoError(t, err) @@ -303,7 +303,7 @@ func TestBatchProcessor(t *testing.T) { assert.NoError(t, b.Shutdown(ctx)) want := e.ExportN() - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) assert.Equal(t, want, e.ExportN(), "Export called after shutdown") }) @@ -311,7 +311,7 @@ func TestBatchProcessor(t *testing.T) { e := newTestExporter(nil) b := NewBatchProcessor(e) - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) assert.NoError(t, b.Shutdown(ctx)) assert.NoError(t, b.ForceFlush(ctx)) @@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) { ) t.Cleanup(func() { _ = b.Shutdown(ctx) }) - var r Record + r := new(Record) r.SetBody(log.BoolValue(true)) require.NoError(t, b.OnEmit(ctx, r)) @@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) { if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") { got := e.Records() if assert.Len(t, got[0], 1, "records received") { - assert.Equal(t, r, got[0][0]) + assert.Equal(t, *r, got[0][0]) } } }) @@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) { // Enqueue 10 x "batch size" amount of records. for i := 0; i < 10*batch; i++ { - require.NoError(t, b.OnEmit(ctx, Record{})) + require.NoError(t, b.OnEmit(ctx, new(Record))) } assert.Eventually(t, func() bool { return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input) @@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) { b := NewBatchProcessor(e) t.Cleanup(func() { _ = b.Shutdown(ctx) }) - var r Record + r := new(Record) r.SetBody(log.BoolValue(true)) _ = b.OnEmit(ctx, r) t.Cleanup(func() { _ = b.Shutdown(ctx) }) @@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) { WithExportInterval(time.Hour), WithExportTimeout(time.Hour), ) - var r Record + r := new(Record) // First record will be blocked by testExporter.Export assert.NoError(t, b.OnEmit(ctx, r), "exported record") require.Eventually(t, func() bool { @@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) { case <-ctx.Done(): return default: - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) // Ignore partial flush errors. _ = b.ForceFlush(ctx) } @@ -642,7 +642,7 @@ func TestQueue(t *testing.T) { } func BenchmarkBatchProcessorOnEmit(b *testing.B) { - var r Record + r := new(Record) body := log.BoolValue(true) r.SetBody(body) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index ff5d6fe2bfa..75b430b6d88 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -83,8 +83,7 @@ type timestampDecorator struct { Processor } -func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (e timestampDecorator) OnEmit(ctx context.Context, r *Record) error { r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC)) return e.Processor.OnEmit(ctx, r) } @@ -93,8 +92,7 @@ type attrDecorator struct { Processor } -func (e attrDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (e attrDecorator) OnEmit(ctx context.Context, r *Record) error { r.SetAttributes(log.String("replace", "me")) return e.Processor.OnEmit(ctx, r) } diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 1cdddc03e39..a4f6e31fbfe 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -31,9 +31,6 @@ type Exporter interface { // Handler. // // Implementations must not retain the records slice. - // - // Before modifying a Record, the implementation must use Record.Clone - // to create a copy that shares no state with the original. Export(ctx context.Context, records []Record) error // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 245867f3fd6..04c44ac5bb8 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -36,7 +36,7 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger { func (l *logger) Emit(ctx context.Context, r log.Record) { newRecord := l.newRecord(ctx, r) for _, p := range l.provider.processors { - if err := p.OnEmit(ctx, newRecord); err != nil { + if err := p.OnEmit(ctx, &newRecord); err != nil { otel.Handle(err) } } diff --git a/sdk/log/processor.go b/sdk/log/processor.go index f95ea949027..e9346316171 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -26,9 +26,10 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // - // Before modifying a Record, the implementation must use Record.Clone - // to create a copy that shares no state with the original. - OnEmit(ctx context.Context, record Record) error + // The modification done to the record are visible + // to the subsequent registered processors. + // Implementations must not modify the record after OnEmit returns. + OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context // and record. // @@ -43,9 +44,6 @@ type Processor interface { // state. An implementation should default to returning true for an // indeterminate state, but may return false if valid reasons in particular // circumstances exist (e.g. performance, correctness). - // - // Before modifying a Record, the implementation must use Record.Clone - // to create a copy that shares no state with the original. Enabled(ctx context.Context, record Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index bfa8afcda1d..1ce1d832fae 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -36,12 +36,12 @@ func newProcessor(name string) *processor { return &processor{Name: name, enabled: true} } -func (p *processor) OnEmit(ctx context.Context, r Record) error { +func (p *processor) OnEmit(ctx context.Context, r *Record) error { if p.Err != nil { return p.Err } - p.records = append(p.records, r) + p.records = append(p.records, *r) return nil } diff --git a/sdk/log/simple.go b/sdk/log/simple.go index c7aa14b8706..a015cadfacf 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -31,8 +31,8 @@ func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimplePr } // OnEmit batches provided log record. -func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error { - return s.exporter.Export(ctx, []Record{r}) +func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { + return s.exporter.Export(ctx, []Record{*r}) } // Enabled returns true. diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index 805130465b0..dbc91a90156 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -42,12 +42,12 @@ func TestSimpleProcessorOnEmit(t *testing.T) { e := new(exporter) s := log.NewSimpleProcessor(e) - var r log.Record + r := new(log.Record) r.SetSeverityText("test") _ = s.OnEmit(context.Background(), r) require.True(t, e.exportCalled, "exporter Export not called") - assert.Equal(t, []log.Record{r}, e.records) + assert.Equal(t, []log.Record{*r}, e.records) } func TestSimpleProcessorEnabled(t *testing.T) { @@ -75,7 +75,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { var wg sync.WaitGroup wg.Add(goRoutineN) - var r log.Record + r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() s := log.NewSimpleProcessor(nil) @@ -84,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { defer wg.Done() _ = s.OnEmit(ctx, r) - _ = s.Enabled(ctx, r) + _ = s.Enabled(ctx, *r) _ = s.Shutdown(ctx) _ = s.ForceFlush(ctx) }() @@ -94,7 +94,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { } func BenchmarkSimpleProcessorOnEmit(b *testing.B) { - var r log.Record + r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() s := log.NewSimpleProcessor(nil) From 0dd4f812e2ff5be2e8c51849f01de367d310f8d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 14:14:26 +0200 Subject: [PATCH 2/7] Update processor.go --- sdk/log/processor.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/log/processor.go b/sdk/log/processor.go index e9346316171..3b3d08a2bec 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -26,8 +26,6 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // - // The modification done to the record are visible - // to the subsequent registered processors. // Implementations must not modify the record after OnEmit returns. OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context From b24dfcc2779434afceca30bab3496e9a8f854c37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 14:18:44 +0200 Subject: [PATCH 3/7] Update changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb1b86f31ed..b1fdedc3104 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `IsEmpty` method is added to the `Instrument` type in `go.opentelemetry.io/otel/sdk/metric`. This method is used to check if an `Instrument` instance is a zero-value. (#5431) +### Changed + +- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5469) + ### Fixed - Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376) From d4db6bc7b7403564b3759d7af6ecba4c126dbe97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 17:44:06 +0200 Subject: [PATCH 4/7] Update exporter.go --- sdk/log/exporter.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index a4f6e31fbfe..1cdddc03e39 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -31,6 +31,9 @@ type Exporter interface { // Handler. // // Implementations must not retain the records slice. + // + // Before modifying a Record, the implementation must use Record.Clone + // to create a copy that shares no state with the original. Export(ctx context.Context, records []Record) error // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. From 11e2351e00a23ffd2d0f690a2f5a3df68382decc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 17:59:17 +0200 Subject: [PATCH 5/7] Update processor.go --- sdk/log/processor.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/log/processor.go b/sdk/log/processor.go index 3b3d08a2bec..3260ad674f2 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -26,7 +26,10 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // - // Implementations must not modify the record after OnEmit returns. + // Implementations may synchronously modify the record so that the changes + // are visible in the next registered processor. + // Implementations must not modify the record asynchronously as [Record] + // is not concurrent safe. OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context // and record. @@ -42,6 +45,9 @@ type Processor interface { // state. An implementation should default to returning true for an // indeterminate state, but may return false if valid reasons in particular // circumstances exist (e.g. performance, correctness). + // + // Before modifying a Record, the implementation must use Record.Clone + // to create a copy that shares no state with the original. Enabled(ctx context.Context, record Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. From d72d990760a3ef0af0c2f8bc5df5f1d96599651d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 19:15:49 +0200 Subject: [PATCH 6/7] BatchProcessor clones the record --- sdk/log/batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index bfdd36ea26f..41b6065a09e 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -180,7 +180,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } - if n := b.q.Enqueue(*r); n >= b.batchSize { + if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: From ac568ef6d00f8f595c0b8c987abb052aa4086bfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 3 Jun 2024 19:40:39 +0200 Subject: [PATCH 7/7] Update processor.go --- sdk/log/processor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/log/processor.go b/sdk/log/processor.go index 3260ad674f2..d2c99d90cc7 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -30,6 +30,7 @@ type Processor interface { // are visible in the next registered processor. // Implementations must not modify the record asynchronously as [Record] // is not concurrent safe. + // Implementations must not retain the record. OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context // and record. @@ -48,6 +49,7 @@ type Processor interface { // // Before modifying a Record, the implementation must use Record.Clone // to create a copy that shares no state with the original. + // Implementations must not retain the record. Enabled(ctx context.Context, record Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call.