From 667606ce72467cf77988d284225ec0987654e40e Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 29 Nov 2024 21:53:58 +0530 Subject: [PATCH 1/5] chore: add filelog cases for memory limiter --- testbed/datasenders/stanza.go | 2 ++ testbed/tests/log_test.go | 68 +++++++++++++++++++++++------------ testbed/tests/scenarios.go | 30 ++++++++++++++-- 3 files changed, 74 insertions(+), 26 deletions(-) diff --git a/testbed/datasenders/stanza.go b/testbed/datasenders/stanza.go index 1dcfa60fe3bc..c5d30c26a6db 100644 --- a/testbed/datasenders/stanza.go +++ b/testbed/datasenders/stanza.go @@ -110,6 +110,8 @@ func (f *FileLogWriter) GenConfigYAMLStr() string { // We are testing stanza receiver here. return fmt.Sprintf(` filelog: + retry_on_failure: + enabled: true include: [ %s ] start_at: beginning operators: diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 2756ce2ffe97..9e1b921f443a 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -370,41 +370,63 @@ func TestLargeFileOnce(t *testing.T) { } func TestMemoryLimiterHit(t *testing.T) { - otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) - otlpreceiver.WithRetry(` + tests := []struct { + name string + sender func() testbed.DataSender + receiver func() testbed.DataReceiver + }{ + { + name: "otlp", + sender: func() testbed.DataSender { + return testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + }, + }, + { + name: "filelog", + sender: func() testbed.DataSender { + return datasenders.NewFileLogWriter() + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + otlpreceiver.WithRetry(` retry_on_failure: enabled: true max_interval: 5s `) - otlpreceiver.WithQueue(` + otlpreceiver.WithQueue(` sending_queue: - enabled: true - queue_size: 100000 - num_consumers: 20 + enabled: true + queue_size: 100000 + num_consumers: 20 `) - otlpreceiver.WithTimeout(` + otlpreceiver.WithTimeout(` timeout: 0s `) - processors := []ProcessorNameAndConfigBody{ - { - Name: "memory_limiter", - Body: ` + processors := []ProcessorNameAndConfigBody{ + { + Name: "memory_limiter", + Body: ` memory_limiter: check_interval: 1s limit_mib: 300 spike_limit_mib: 150 `, - }, + }, + } + ScenarioMemoryLimiterHit( + t, + test.sender(), + otlpreceiver, + testbed.LoadOptions{ + DataItemsPerSecond: 100000, + ItemsPerBatch: 1000, + Parallel: 1, + MaxDelay: 20 * time.Second, + }, + performanceResultsSummary, 100, processors) + }) } - ScenarioMemoryLimiterHit( - t, - testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)), - otlpreceiver, - testbed.LoadOptions{ - DataItemsPerSecond: 100000, - ItemsPerBatch: 1000, - Parallel: 1, - MaxDelay: 20 * time.Second, - }, - performanceResultsSummary, 100, processors) } diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index a2e3ea868a5c..8fec991c044b 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -11,6 +11,7 @@ import ( "math/rand" "path" "path/filepath" + "regexp" "testing" "time" @@ -23,6 +24,8 @@ import ( ) var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{} +var batchRegex = regexp.MustCompile(` batch_index=(\S+) `) +var itemRegex = regexp.MustCompile(` item_index=(\S+) `) type ProcessorNameAndConfigBody struct { Name string @@ -654,9 +657,8 @@ func getLogsID(logToRetry []plog.Logs) []string { logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() for index := 0; index < logRecord.Len(); index++ { logObj := logRecord.At(index) - itemIndex, _ := logObj.Attributes().Get("item_index") - batchIndex, _ := logObj.Attributes().Get("batch_index") - result = append(result, fmt.Sprintf("%s%s", batchIndex.AsString(), itemIndex.AsString())) + itemIndex, batchIndex := extractIdFromLog(logObj) + result = append(result, fmt.Sprintf("%s%s", batchIndex, itemIndex)) } } return result @@ -680,3 +682,25 @@ func allElementsExistInSlice(slice1, slice2 []string) bool { return true } + +// in case of filelog receiver, the batch_index and item_index are a part of log body. +// we use regex to extract them +func extractIdFromLog(log plog.LogRecord) (string, string) { + var batch, item string + match := batchRegex.FindStringSubmatch(log.Body().AsString()) + if len(match) == 2 { + batch = match[0] + } + match = itemRegex.FindStringSubmatch(log.Body().AsString()) + if len(match) == 2 { + batch = match[0] + } + // in case of otlp recevier, batch_index and item_index are part of attributes. + if batchIndex, ok := log.Attributes().Get("batch_index"); ok { + batch = batchIndex.AsString() + } + if itemIndex, ok := log.Attributes().Get("item_index"); ok { + item = itemIndex.AsString() + } + return batch, item +} From f04f177eb75409d85fae2b76cfbd68793b33be4a Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Mon, 2 Dec 2024 23:12:10 +0530 Subject: [PATCH 2/5] chore: lint --- testbed/tests/scenarios.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 8fec991c044b..bb4d8f6c6fb9 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -657,7 +657,7 @@ func getLogsID(logToRetry []plog.Logs) []string { logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() for index := 0; index < logRecord.Len(); index++ { logObj := logRecord.At(index) - itemIndex, batchIndex := extractIdFromLog(logObj) + itemIndex, batchIndex := extractIDFromLog(logObj) result = append(result, fmt.Sprintf("%s%s", batchIndex, itemIndex)) } } @@ -685,7 +685,7 @@ func allElementsExistInSlice(slice1, slice2 []string) bool { // in case of filelog receiver, the batch_index and item_index are a part of log body. // we use regex to extract them -func extractIdFromLog(log plog.LogRecord) (string, string) { +func extractIDFromLog(log plog.LogRecord) (string, string) { var batch, item string match := batchRegex.FindStringSubmatch(log.Body().AsString()) if len(match) == 2 { From 2c7f7505c19df5ceeae4d17418ae38454aa5b4c6 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 3 Dec 2024 16:47:54 +0530 Subject: [PATCH 3/5] lint --- testbed/tests/scenarios.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index bb4d8f6c6fb9..c61e0c5ae051 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -23,9 +23,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) -var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{} -var batchRegex = regexp.MustCompile(` batch_index=(\S+) `) -var itemRegex = regexp.MustCompile(` item_index=(\S+) `) +var ( + batchRegex = regexp.MustCompile(` batch_index=(\S+) `) + itemRegex = regexp.MustCompile(` item_index=(\S+) `) + performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{} +) type ProcessorNameAndConfigBody struct { Name string From 13ba0c9474fc191cefe5c8d8a2aba939b0961a81 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Mon, 16 Dec 2024 19:38:49 +0530 Subject: [PATCH 4/5] chore: use options --- testbed/datasenders/stanza.go | 2 -- testbed/tests/log_test.go | 5 ++++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/testbed/datasenders/stanza.go b/testbed/datasenders/stanza.go index 6bf75fee3452..d056075e8c09 100644 --- a/testbed/datasenders/stanza.go +++ b/testbed/datasenders/stanza.go @@ -122,8 +122,6 @@ func (f *FileLogWriter) GenConfigYAMLStr() string { // We are testing stanza receiver here. return fmt.Sprintf(` filelog: - retry_on_failure: - enabled: true include: [ %s ] start_at: beginning operators: diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 843fe4580119..b98c3493b22e 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -385,7 +385,10 @@ func TestMemoryLimiterHit(t *testing.T) { { name: "filelog", sender: func() testbed.DataSender { - return datasenders.NewFileLogWriter() + return datasenders.NewFileLogWriter().WithRetry(` + retry_on_failure: + enabled: true +`) }, }, } From dd6a4d3746033437d5dd457787b0752b34a47127 Mon Sep 17 00:00:00 2001 From: Vihas Makwana <121151420+VihasMakwana@users.noreply.github.com> Date: Tue, 17 Dec 2024 21:35:10 +0530 Subject: [PATCH 5/5] Update testbed/tests/scenarios.go Co-authored-by: Christos Markou --- testbed/tests/scenarios.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index c39f5cbf8933..a5aa6fc84e96 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -701,7 +701,7 @@ func extractIDFromLog(log plog.LogRecord) (string, string) { if len(match) == 2 { batch = match[0] } - // in case of otlp recevier, batch_index and item_index are part of attributes. + // in case of otlp receiver, batch_index and item_index are part of attributes. if batchIndex, ok := log.Attributes().Get("batch_index"); ok { batch = batchIndex.AsString() }