diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 00b818172b7..4e50ca44eb8 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" ) @@ -49,15 +50,11 @@ type PRWExporter struct { closeChan chan struct{} concurrency int userAgentHeader string + clientSettings *confighttp.HTTPClientSettings } // NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly. func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, error) { - client, err := cfg.HTTPClientSettings.ToClient() - if err != nil { - return nil, err - } - sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg.ExternalLabels) if err != nil { return nil, err @@ -74,14 +71,20 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e namespace: cfg.Namespace, externalLabels: sanitizedLabels, endpointURL: endpointURL, - client: client, wg: new(sync.WaitGroup), closeChan: make(chan struct{}), userAgentHeader: userAgentHeader, concurrency: cfg.RemoteWriteQueue.NumConsumers, + clientSettings: &cfg.HTTPClientSettings, }, nil } +// Start creates the prometheus client +func (prwe *PRWExporter) Start(_ context.Context, _ component.Host) (err error) { + prwe.client, err = prwe.clientSettings.ToClient() + return err +} + // Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations // to finish before returning func (prwe *PRWExporter) Shutdown(context.Context) error { diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 56295b07eef..1eda3f6e9b8 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -30,8 +30,10 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/internal/testdata" @@ -53,59 +55,52 @@ func Test_NewPRWExporter(t *testing.T) { } tests := []struct { - name string - config *Config - namespace string - endpoint string - concurrency int - externalLabels map[string]string - client *http.Client - returnError bool - buildInfo component.BuildInfo + name string + config *Config + namespace string + endpoint string + concurrency int + externalLabels map[string]string + returnErrorOnCreate bool + buildInfo component.BuildInfo }{ { - "invalid_URL", - cfg, - "test", - "invalid URL", - 5, - map[string]string{"Key1": "Val1"}, - http.DefaultClient, - true, - buildInfo, + name: "invalid_URL", + config: cfg, + namespace: "test", + endpoint: "invalid URL", + concurrency: 5, + externalLabels: map[string]string{"Key1": "Val1"}, + returnErrorOnCreate: true, + buildInfo: buildInfo, }, { - "invalid_labels_case", - cfg, - "test", - "http://some.url:9411/api/prom/push", - 5, - map[string]string{"Key1": ""}, - http.DefaultClient, - true, - buildInfo, + name: "invalid_labels_case", + config: cfg, + namespace: "test", + endpoint: "http://some.url:9411/api/prom/push", + concurrency: 5, + externalLabels: map[string]string{"Key1": ""}, + returnErrorOnCreate: true, + buildInfo: buildInfo, }, { - "success_case", - cfg, - "test", - "http://some.url:9411/api/prom/push", - 5, - map[string]string{"Key1": "Val1"}, - http.DefaultClient, - false, - buildInfo, + name: "success_case", + config: cfg, + namespace: "test", + endpoint: "http://some.url:9411/api/prom/push", + concurrency: 5, + externalLabels: map[string]string{"Key1": "Val1"}, + buildInfo: buildInfo, }, { - "success_case_no_labels", - cfg, - "test", - "http://some.url:9411/api/prom/push", - 5, - map[string]string{}, - http.DefaultClient, - false, - buildInfo, + name: "success_case_no_labels", + config: cfg, + namespace: "test", + endpoint: "http://some.url:9411/api/prom/push", + concurrency: 5, + externalLabels: map[string]string{}, + buildInfo: buildInfo, }, } @@ -117,7 +112,7 @@ func Test_NewPRWExporter(t *testing.T) { cfg.RemoteWriteQueue.NumConsumers = 1 prwe, err := NewPRWExporter(cfg, tt.buildInfo) - if tt.returnError { + if tt.returnErrorOnCreate { assert.Error(t, err) return } @@ -125,10 +120,87 @@ func Test_NewPRWExporter(t *testing.T) { assert.NotNil(t, prwe.namespace) assert.NotNil(t, prwe.endpointURL) assert.NotNil(t, prwe.externalLabels) - assert.NotNil(t, prwe.client) assert.NotNil(t, prwe.closeChan) assert.NotNil(t, prwe.wg) assert.NotNil(t, prwe.userAgentHeader) + assert.NotNil(t, prwe.clientSettings) + }) + } +} + +// Test_Start checks if the client is properly created as expected. +func Test_Start(t *testing.T) { + cfg := &Config{ + ExporterSettings: config.NewExporterSettings(config.NewID(typeStr)), + TimeoutSettings: exporterhelper.TimeoutSettings{}, + RetrySettings: exporterhelper.RetrySettings{}, + Namespace: "", + ExternalLabels: map[string]string{}, + } + buildInfo := component.BuildInfo{ + Description: "OpenTelemetry Collector", + Version: "1.0", + } + tests := []struct { + name string + config *Config + namespace string + concurrency int + externalLabels map[string]string + returnErrorOnStartUp bool + buildInfo component.BuildInfo + endpoint string + clientSettings confighttp.HTTPClientSettings + }{ + { + name: "success_case", + config: cfg, + namespace: "test", + concurrency: 5, + externalLabels: map[string]string{"Key1": "Val1"}, + buildInfo: buildInfo, + clientSettings: confighttp.HTTPClientSettings{Endpoint: "https://some.url:9411/api/prom/push"}, + }, + { + name: "invalid_tls", + config: cfg, + namespace: "test", + concurrency: 5, + externalLabels: map[string]string{"Key1": "Val1"}, + buildInfo: buildInfo, + returnErrorOnStartUp: true, + clientSettings: confighttp.HTTPClientSettings{ + Endpoint: "https://some.url:9411/api/prom/push", + TLSSetting: configtls.TLSClientSetting{ + TLSSetting: configtls.TLSSetting{ + CAFile: "non-existent file", + CertFile: "", + KeyFile: "", + }, + Insecure: false, + ServerName: "", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg.ExternalLabels = tt.externalLabels + cfg.Namespace = tt.namespace + cfg.RemoteWriteQueue.NumConsumers = 1 + cfg.HTTPClientSettings = tt.clientSettings + + prwe, err := NewPRWExporter(cfg, tt.buildInfo) + assert.NoError(t, err) + assert.NotNil(t, prwe) + + err = prwe.Start(context.Background(), componenttest.NewNopHost()) + if tt.returnErrorOnStartUp { + assert.Error(t, err) + return + } + assert.NotNil(t, prwe.client) }) } } @@ -195,11 +267,11 @@ func Test_export(t *testing.T) { // Create in test table format to check if different HTTP response codes or server errors // are properly identified tests := []struct { - name string - ts prompb.TimeSeries - serverUp bool - httpResponseCode int - returnError bool + name string + ts prompb.TimeSeries + serverUp bool + httpResponseCode int + returnErrorOnCreate bool }{ {"success_case", *ts1, @@ -236,7 +308,7 @@ func Test_export(t *testing.T) { server.Close() } errs := runExportPipeline(ts1, serverURL) - if tt.returnError { + if tt.returnErrorOnCreate { assert.Error(t, errs[0]) return } @@ -266,6 +338,12 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error { errs = append(errs, err) return errs } + + if err = prwe.Start(context.Background(), componenttest.NewNopHost()); err != nil { + errs = append(errs, err) + return errs + } + errs = append(errs, prwe.export(context.Background(), testmap)...) return errs } @@ -515,6 +593,7 @@ func Test_PushMetrics(t *testing.T) { } prwe, nErr := NewPRWExporter(cfg, buildInfo) require.NoError(t, nErr) + require.NoError(t, prwe.Start(context.Background(), componenttest.NewNopHost())) err := prwe.PushMetrics(context.Background(), *tt.md) if tt.returnErr { assert.Error(t, err) @@ -527,10 +606,10 @@ func Test_PushMetrics(t *testing.T) { func Test_validateAndSanitizeExternalLabels(t *testing.T) { tests := []struct { - name string - inputLabels map[string]string - expectedLabels map[string]string - returnError bool + name string + inputLabels map[string]string + expectedLabels map[string]string + returnErrorOnCreate bool }{ {"success_case_no_labels", map[string]string{}, @@ -562,7 +641,7 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { newLabels, err := validateAndSanitizeExternalLabels(tt.inputLabels) - if tt.returnError { + if tt.returnErrorOnCreate { assert.Error(t, err) return } diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 878790992e2..6b9b2c155a1 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -68,6 +68,7 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar }), exporterhelper.WithRetry(prwCfg.RetrySettings), exporterhelper.WithResourceToTelemetryConversion(prwCfg.ResourceToTelemetrySettings), + exporterhelper.WithStart(prwe.Start), exporterhelper.WithShutdown(prwe.Shutdown), ) } diff --git a/exporter/prometheusremotewriteexporter/factory_test.go b/exporter/prometheusremotewriteexporter/factory_test.go index b2d813a1478..3a9060c5d97 100644 --- a/exporter/prometheusremotewriteexporter/factory_test.go +++ b/exporter/prometheusremotewriteexporter/factory_test.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/confighttp" @@ -51,37 +52,49 @@ func Test_createMetricsExporter(t *testing.T) { ServerName: "", } tests := []struct { - name string - cfg config.Exporter - params component.ExporterCreateParams - returnError bool + name string + cfg config.Exporter + params component.ExporterCreateParams + returnErrorOnCreate bool + returnErrorOnStart bool }{ {"success_case", createDefaultConfig(), component.ExporterCreateParams{Logger: zap.NewNop()}, false, + false, }, {"fail_case", nil, component.ExporterCreateParams{Logger: zap.NewNop()}, true, + false, }, {"invalid_config_case", invalidConfig, component.ExporterCreateParams{Logger: zap.NewNop()}, true, + false, }, {"invalid_tls_config_case", invalidTLSConfig, component.ExporterCreateParams{Logger: zap.NewNop()}, + false, true, }, } // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := createMetricsExporter(context.Background(), tt.params, tt.cfg) - if tt.returnError { + exp, err := createMetricsExporter(context.Background(), tt.params, tt.cfg) + if tt.returnErrorOnCreate { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.NotNil(t, exp) + err = exp.Start(context.Background(), componenttest.NewNopHost()) + if tt.returnErrorOnStart { assert.Error(t, err) return }