From bad6e4143f79c51ffd682c7deefca5b9887c2605 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 25 Feb 2021 10:36:55 -0500 Subject: [PATCH 1/4] Send metrics to Dynatrace in chunks of 1000 --- .../dynatraceexporter/metrics_exporter.go | 26 ++++++++++ .../metrics_exporter_test.go | 49 +++++++++++++++++-- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/exporter/dynatraceexporter/metrics_exporter.go b/exporter/dynatraceexporter/metrics_exporter.go index 087b287c4ee5..1f0e83cba23f 100644 --- a/exporter/dynatraceexporter/metrics_exporter.go +++ b/exporter/dynatraceexporter/metrics_exporter.go @@ -129,6 +129,32 @@ func (e *exporter) serializeMetrics(md pdata.Metrics) ([]string, int) { // Returns the number of lines rejected by Dynatrace. // An error indicates all lines were dropped regardless of the returned number. func (e *exporter) send(ctx context.Context, lines []string) (int, error) { + if len(lines) > 1000 { + e.logger.Warn("Batch too large. Sending in chunks of 1000 metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to 1000 or less.") + } + + rejected := 0 + for i := 0; i < len(lines); i += 1000 { + end := i + 1000 + + if end > len(lines) { + end = len(lines) + } + + batchRejected, err := e.sendBatch(ctx, lines[i:end]) + rejected += batchRejected + if err != nil { + return rejected, err + } + } + + return rejected, nil +} + +// send sends a serialized metric batch to Dynatrace. +// Returns the number of lines rejected by Dynatrace. +// An error indicates all lines were dropped regardless of the returned number. +func (e *exporter) sendBatch(ctx context.Context, lines []string) (int, error) { message := strings.Join(lines, "\n") e.logger.Debug("Sending lines to Dynatrace\n" + message) req, err := http.NewRequestWithContext(ctx, "POST", e.cfg.Endpoint, bytes.NewBufferString(message)) diff --git a/exporter/dynatraceexporter/metrics_exporter_test.go b/exporter/dynatraceexporter/metrics_exporter_test.go index d50669565f2b..d281752bc197 100644 --- a/exporter/dynatraceexporter/metrics_exporter_test.go +++ b/exporter/dynatraceexporter/metrics_exporter_test.go @@ -17,6 +17,7 @@ package dynatraceexporter import ( "context" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -281,7 +282,7 @@ func Test_exporter_send_BadRequest(t *testing.T) { }, client: ts.Client(), } - invalid, err := e.send(context.Background(), []string{}) + invalid, err := e.send(context.Background(), []string{""}) if invalid != 10 { t.Errorf("Expected 10 lines to be reported invalid") return @@ -310,7 +311,7 @@ func Test_exporter_send_Unauthorized(t *testing.T) { }, client: ts.Client(), } - _, err := e.send(context.Background(), []string{}) + _, err := e.send(context.Background(), []string{""}) if !consumererror.IsPermanent(err) { t.Errorf("Expected error to be permanent %v", err) return @@ -335,7 +336,7 @@ func Test_exporter_send_TooLarge(t *testing.T) { }, client: ts.Client(), } - _, err := e.send(context.Background(), []string{}) + _, err := e.send(context.Background(), []string{""}) if !consumererror.IsPermanent(err) { t.Errorf("Expected error to be permanent %v", err) return @@ -363,7 +364,7 @@ func Test_exporter_send_NotFound(t *testing.T) { }, client: ts.Client(), } - _, err := e.send(context.Background(), []string{}) + _, err := e.send(context.Background(), []string{""}) if !consumererror.IsPermanent(err) { t.Errorf("Expected error to be permanent %v", err) return @@ -374,6 +375,46 @@ func Test_exporter_send_NotFound(t *testing.T) { } } +func Test_exporter_send_chunking(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + body, _ := json.Marshal(metricsResponse{ + Ok: 0, + Invalid: 10, + }) + w.Write(body) + })) + defer ts.Close() + + e := &exporter{ + logger: zap.NewNop(), + cfg: &config.Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{Endpoint: ts.URL}, + }, + client: ts.Client(), + } + + batch := make([]string, 1001) + + for i := 0; i < 1001; i++ { + batch[i] = fmt.Sprintf("%d", i) + } + + invalid, err := e.send(context.Background(), []string{""}) + if invalid != 10 { + t.Errorf("Expected 10 lines to be reported invalid") + return + } + if consumererror.IsPermanent(err) { + t.Errorf("Expected error to not be permanent %v", err) + return + } + if e.isDisabled { + t.Error("Expected exporter to not be disabled") + return + } +} + func Test_exporter_PushMetricsData_Error(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) From 980d1893932be66f5aefcd3e9e6f115e4e63864d Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 25 Feb 2021 10:37:03 -0500 Subject: [PATCH 2/4] Send metrics to Dynatrace in chunks of 1000 --- exporter/dynatraceexporter/metrics_exporter.go | 8 ++++++-- .../dynatraceexporter/metrics_exporter_test.go | 14 ++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/exporter/dynatraceexporter/metrics_exporter.go b/exporter/dynatraceexporter/metrics_exporter.go index 1f0e83cba23f..a9bb82992f33 100644 --- a/exporter/dynatraceexporter/metrics_exporter.go +++ b/exporter/dynatraceexporter/metrics_exporter.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "net/http" "strings" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" @@ -125,12 +126,15 @@ func (e *exporter) serializeMetrics(md pdata.Metrics) ([]string, int) { return lines, dropped } +var lastLog int64 = 0 + // send sends a serialized metric batch to Dynatrace. // Returns the number of lines rejected by Dynatrace. // An error indicates all lines were dropped regardless of the returned number. func (e *exporter) send(ctx context.Context, lines []string) (int, error) { - if len(lines) > 1000 { - e.logger.Warn("Batch too large. Sending in chunks of 1000 metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to 1000 or less.") + if now := time.Now().Unix(); len(lines) > 1000 && now-lastLog > 60 { + e.logger.Warn("Batch too large. Sending in chunks of 1000 metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to 1000 or less. Suppressing this log for 60 seconds.") + lastLog = time.Now().Unix() } rejected := 0 diff --git a/exporter/dynatraceexporter/metrics_exporter_test.go b/exporter/dynatraceexporter/metrics_exporter_test.go index d281752bc197..695adcd338ae 100644 --- a/exporter/dynatraceexporter/metrics_exporter_test.go +++ b/exporter/dynatraceexporter/metrics_exporter_test.go @@ -376,13 +376,16 @@ func Test_exporter_send_NotFound(t *testing.T) { } func Test_exporter_send_chunking(t *testing.T) { + sentChunks := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) body, _ := json.Marshal(metricsResponse{ Ok: 0, - Invalid: 10, + Invalid: 1, }) w.Write(body) + sentChunks++ })) defer ts.Close() @@ -400,9 +403,12 @@ func Test_exporter_send_chunking(t *testing.T) { batch[i] = fmt.Sprintf("%d", i) } - invalid, err := e.send(context.Background(), []string{""}) - if invalid != 10 { - t.Errorf("Expected 10 lines to be reported invalid") + invalid, err := e.send(context.Background(), batch) + if sentChunks != 2 { + t.Errorf("Expected batch to be sent in 2 chunks") + } + if invalid != 2 { + t.Errorf("Expected 2 lines to be reported invalid") return } if consumererror.IsPermanent(err) { From fd2a27b7c2384b3c665dfd5463c8cb29f7492532 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 25 Feb 2021 12:31:20 -0500 Subject: [PATCH 3/4] Use a constant for max chunk size --- exporter/dynatraceexporter/metrics_exporter.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/exporter/dynatraceexporter/metrics_exporter.go b/exporter/dynatraceexporter/metrics_exporter.go index a9bb82992f33..368ea76bd989 100644 --- a/exporter/dynatraceexporter/metrics_exporter.go +++ b/exporter/dynatraceexporter/metrics_exporter.go @@ -33,6 +33,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dynatraceexporter/serialization" ) +// The maximum number of metrics that may be sent in a single request to the Dynatrace API +const maxChunkSize = 1000 + // NewExporter exports to a Dynatrace Metrics v2 API func newMetricsExporter(params component.ExporterCreateParams, cfg *config.Config) (*exporter, error) { client, err := cfg.HTTPClientSettings.ToClient() @@ -132,14 +135,14 @@ var lastLog int64 = 0 // Returns the number of lines rejected by Dynatrace. // An error indicates all lines were dropped regardless of the returned number. func (e *exporter) send(ctx context.Context, lines []string) (int, error) { - if now := time.Now().Unix(); len(lines) > 1000 && now-lastLog > 60 { - e.logger.Warn("Batch too large. Sending in chunks of 1000 metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to 1000 or less. Suppressing this log for 60 seconds.") + if now := time.Now().Unix(); len(lines) > maxChunkSize && now-lastLog > 60 { + e.logger.Sugar().Warnf("Batch too large. Sending in chunks of %[1]s metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to %[1]s or less. Suppressing this log for 60 seconds.", maxChunkSize) lastLog = time.Now().Unix() } rejected := 0 - for i := 0; i < len(lines); i += 1000 { - end := i + 1000 + for i := 0; i < len(lines); i += maxChunkSize { + end := i + maxChunkSize if end > len(lines) { end = len(lines) From d126ce095e07095fea7f27da32b7b0c85fafed44 Mon Sep 17 00:00:00 2001 From: Daniel Dyla Date: Thu, 25 Feb 2021 12:41:25 -0500 Subject: [PATCH 4/4] make logging consistent with other logging --- exporter/dynatraceexporter/metrics_exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/dynatraceexporter/metrics_exporter.go b/exporter/dynatraceexporter/metrics_exporter.go index 368ea76bd989..ce8b3d9c22a2 100644 --- a/exporter/dynatraceexporter/metrics_exporter.go +++ b/exporter/dynatraceexporter/metrics_exporter.go @@ -136,7 +136,7 @@ var lastLog int64 = 0 // An error indicates all lines were dropped regardless of the returned number. func (e *exporter) send(ctx context.Context, lines []string) (int, error) { if now := time.Now().Unix(); len(lines) > maxChunkSize && now-lastLog > 60 { - e.logger.Sugar().Warnf("Batch too large. Sending in chunks of %[1]s metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to %[1]s or less. Suppressing this log for 60 seconds.", maxChunkSize) + e.logger.Warn(fmt.Sprintf("Batch too large. Sending in chunks of %[1]d metrics. If any chunk fails, previous chunks in the batch could be retried by the batch processor. Please set send_batch_max_size to %[1]d or less. Suppressing this log for 60 seconds.", maxChunkSize)) lastLog = time.Now().Unix() }