Skip to content

Commit

Permalink
Refactored PromRemoteWriteExporter to create client in Start to accom…
Browse files Browse the repository at this point in the history
…odate future update to ToClient() apis (open-telemetry#3308)

* Refactored PromRemoteWriteExporter to create client in Start

* fixed impi errors

* Fixed per review comments
  • Loading branch information
pavankrish123 authored and dashpole committed Jun 14, 2021
1 parent a6379b3 commit 6c8186b
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 70 deletions.
15 changes: 9 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
)
Expand All @@ -49,15 +50,11 @@ type PRWExporter struct {
closeChan chan struct{}
concurrency int
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
}

// NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly.
func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, error) {
client, err := cfg.HTTPClientSettings.ToClient()
if err != nil {
return nil, err
}

sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg.ExternalLabels)
if err != nil {
return nil, err
Expand All @@ -74,14 +71,20 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
client: client,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
}, nil
}

// Start creates the prometheus client
func (prwe *PRWExporter) Start(_ context.Context, _ component.Host) (err error) {
prwe.client, err = prwe.clientSettings.ToClient()
return err
}

// Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *PRWExporter) Shutdown(context.Context) error {
Expand Down
195 changes: 137 additions & 58 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/testdata"
Expand All @@ -53,59 +55,52 @@ func Test_NewPRWExporter(t *testing.T) {
}

tests := []struct {
name string
config *Config
namespace string
endpoint string
concurrency int
externalLabels map[string]string
client *http.Client
returnError bool
buildInfo component.BuildInfo
name string
config *Config
namespace string
endpoint string
concurrency int
externalLabels map[string]string
returnErrorOnCreate bool
buildInfo component.BuildInfo
}{
{
"invalid_URL",
cfg,
"test",
"invalid URL",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
true,
buildInfo,
name: "invalid_URL",
config: cfg,
namespace: "test",
endpoint: "invalid URL",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
returnErrorOnCreate: true,
buildInfo: buildInfo,
},
{
"invalid_labels_case",
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": ""},
http.DefaultClient,
true,
buildInfo,
name: "invalid_labels_case",
config: cfg,
namespace: "test",
endpoint: "http://some.url:9411/api/prom/push",
concurrency: 5,
externalLabels: map[string]string{"Key1": ""},
returnErrorOnCreate: true,
buildInfo: buildInfo,
},
{
"success_case",
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
false,
buildInfo,
name: "success_case",
config: cfg,
namespace: "test",
endpoint: "http://some.url:9411/api/prom/push",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
buildInfo: buildInfo,
},
{
"success_case_no_labels",
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{},
http.DefaultClient,
false,
buildInfo,
name: "success_case_no_labels",
config: cfg,
namespace: "test",
endpoint: "http://some.url:9411/api/prom/push",
concurrency: 5,
externalLabels: map[string]string{},
buildInfo: buildInfo,
},
}

Expand All @@ -117,18 +112,95 @@ func Test_NewPRWExporter(t *testing.T) {
cfg.RemoteWriteQueue.NumConsumers = 1
prwe, err := NewPRWExporter(cfg, tt.buildInfo)

if tt.returnError {
if tt.returnErrorOnCreate {
assert.Error(t, err)
return
}
require.NotNil(t, prwe)
assert.NotNil(t, prwe.namespace)
assert.NotNil(t, prwe.endpointURL)
assert.NotNil(t, prwe.externalLabels)
assert.NotNil(t, prwe.client)
assert.NotNil(t, prwe.closeChan)
assert.NotNil(t, prwe.wg)
assert.NotNil(t, prwe.userAgentHeader)
assert.NotNil(t, prwe.clientSettings)
})
}
}

// Test_Start checks if the client is properly created as expected.
func Test_Start(t *testing.T) {
cfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewID(typeStr)),
TimeoutSettings: exporterhelper.TimeoutSettings{},
RetrySettings: exporterhelper.RetrySettings{},
Namespace: "",
ExternalLabels: map[string]string{},
}
buildInfo := component.BuildInfo{
Description: "OpenTelemetry Collector",
Version: "1.0",
}
tests := []struct {
name string
config *Config
namespace string
concurrency int
externalLabels map[string]string
returnErrorOnStartUp bool
buildInfo component.BuildInfo
endpoint string
clientSettings confighttp.HTTPClientSettings
}{
{
name: "success_case",
config: cfg,
namespace: "test",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
buildInfo: buildInfo,
clientSettings: confighttp.HTTPClientSettings{Endpoint: "https://some.url:9411/api/prom/push"},
},
{
name: "invalid_tls",
config: cfg,
namespace: "test",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
buildInfo: buildInfo,
returnErrorOnStartUp: true,
clientSettings: confighttp.HTTPClientSettings{
Endpoint: "https://some.url:9411/api/prom/push",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "non-existent file",
CertFile: "",
KeyFile: "",
},
Insecure: false,
ServerName: "",
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg.ExternalLabels = tt.externalLabels
cfg.Namespace = tt.namespace
cfg.RemoteWriteQueue.NumConsumers = 1
cfg.HTTPClientSettings = tt.clientSettings

prwe, err := NewPRWExporter(cfg, tt.buildInfo)
assert.NoError(t, err)
assert.NotNil(t, prwe)

err = prwe.Start(context.Background(), componenttest.NewNopHost())
if tt.returnErrorOnStartUp {
assert.Error(t, err)
return
}
assert.NotNil(t, prwe.client)
})
}
}
Expand Down Expand Up @@ -195,11 +267,11 @@ func Test_export(t *testing.T) {
// Create in test table format to check if different HTTP response codes or server errors
// are properly identified
tests := []struct {
name string
ts prompb.TimeSeries
serverUp bool
httpResponseCode int
returnError bool
name string
ts prompb.TimeSeries
serverUp bool
httpResponseCode int
returnErrorOnCreate bool
}{
{"success_case",
*ts1,
Expand Down Expand Up @@ -236,7 +308,7 @@ func Test_export(t *testing.T) {
server.Close()
}
errs := runExportPipeline(ts1, serverURL)
if tt.returnError {
if tt.returnErrorOnCreate {
assert.Error(t, errs[0])
return
}
Expand Down Expand Up @@ -266,6 +338,12 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
errs = append(errs, err)
return errs
}

if err = prwe.Start(context.Background(), componenttest.NewNopHost()); err != nil {
errs = append(errs, err)
return errs
}

errs = append(errs, prwe.export(context.Background(), testmap)...)
return errs
}
Expand Down Expand Up @@ -515,6 +593,7 @@ func Test_PushMetrics(t *testing.T) {
}
prwe, nErr := NewPRWExporter(cfg, buildInfo)
require.NoError(t, nErr)
require.NoError(t, prwe.Start(context.Background(), componenttest.NewNopHost()))
err := prwe.PushMetrics(context.Background(), *tt.md)
if tt.returnErr {
assert.Error(t, err)
Expand All @@ -527,10 +606,10 @@ func Test_PushMetrics(t *testing.T) {

func Test_validateAndSanitizeExternalLabels(t *testing.T) {
tests := []struct {
name string
inputLabels map[string]string
expectedLabels map[string]string
returnError bool
name string
inputLabels map[string]string
expectedLabels map[string]string
returnErrorOnCreate bool
}{
{"success_case_no_labels",
map[string]string{},
Expand Down Expand Up @@ -562,7 +641,7 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
newLabels, err := validateAndSanitizeExternalLabels(tt.inputLabels)
if tt.returnError {
if tt.returnErrorOnCreate {
assert.Error(t, err)
return
}
Expand Down
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar
}),
exporterhelper.WithRetry(prwCfg.RetrySettings),
exporterhelper.WithResourceToTelemetryConversion(prwCfg.ResourceToTelemetrySettings),
exporterhelper.WithStart(prwe.Start),
exporterhelper.WithShutdown(prwe.Shutdown),
)
}
Expand Down
25 changes: 19 additions & 6 deletions exporter/prometheusremotewriteexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -51,37 +52,49 @@ func Test_createMetricsExporter(t *testing.T) {
ServerName: "",
}
tests := []struct {
name string
cfg config.Exporter
params component.ExporterCreateParams
returnError bool
name string
cfg config.Exporter
params component.ExporterCreateParams
returnErrorOnCreate bool
returnErrorOnStart bool
}{
{"success_case",
createDefaultConfig(),
component.ExporterCreateParams{Logger: zap.NewNop()},
false,
false,
},
{"fail_case",
nil,
component.ExporterCreateParams{Logger: zap.NewNop()},
true,
false,
},
{"invalid_config_case",
invalidConfig,
component.ExporterCreateParams{Logger: zap.NewNop()},
true,
false,
},
{"invalid_tls_config_case",
invalidTLSConfig,
component.ExporterCreateParams{Logger: zap.NewNop()},
false,
true,
},
}
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := createMetricsExporter(context.Background(), tt.params, tt.cfg)
if tt.returnError {
exp, err := createMetricsExporter(context.Background(), tt.params, tt.cfg)
if tt.returnErrorOnCreate {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.NotNil(t, exp)
err = exp.Start(context.Background(), componenttest.NewNopHost())
if tt.returnErrorOnStart {
assert.Error(t, err)
return
}
Expand Down

0 comments on commit 6c8186b

Please sign in to comment.