Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored PromRemoteWriteExporter to create client in Start to accomodate future update to ToClient() apis #3308

Merged
merged 4 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/prometheus/prompb"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
)
Expand All @@ -49,15 +50,11 @@ type PRWExporter struct {
closeChan chan struct{}
concurrency int
userAgentHeader string
clientSettings *confighttp.HTTPClientSettings
}

// NewPRWExporter initializes a new PRWExporter instance and sets fields accordingly.
func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, error) {
client, err := cfg.HTTPClientSettings.ToClient()
if err != nil {
return nil, err
}

sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg.ExternalLabels)
if err != nil {
return nil, err
Expand All @@ -74,14 +71,20 @@ func NewPRWExporter(cfg *Config, buildInfo component.BuildInfo) (*PRWExporter, e
namespace: cfg.Namespace,
externalLabels: sanitizedLabels,
endpointURL: endpointURL,
client: client,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.HTTPClientSettings,
}, nil
}

// Start creates the prometheus client
func (prwe *PRWExporter) Start(_ context.Context, _ component.Host) (err error) {
prwe.client, err = prwe.clientSettings.ToClient()
return err
}

// Shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations
// to finish before returning
func (prwe *PRWExporter) Shutdown(context.Context) error {
Expand Down
195 changes: 137 additions & 58 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/testdata"
Expand All @@ -53,59 +55,52 @@ func Test_NewPRWExporter(t *testing.T) {
}

tests := []struct {
name string
config *Config
namespace string
endpoint string
concurrency int
externalLabels map[string]string
client *http.Client
returnError bool
buildInfo component.BuildInfo
name string
config *Config
namespace string
endpoint string
concurrency int
externalLabels map[string]string
returnErrorOnCreate bool
buildInfo component.BuildInfo
}{
{
"invalid_URL",
cfg,
"test",
"invalid URL",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
true,
buildInfo,
name: "invalid_URL",
config: cfg,
namespace: "test",
endpoint: "invalid URL",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
returnErrorOnCreate: true,
buildInfo: buildInfo,
},
{
"invalid_labels_case",
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": ""},
http.DefaultClient,
true,
buildInfo,
name: "invalid_labels_case",
config: cfg,
namespace: "test",
endpoint: "http://some.url:9411/api/prom/push",
concurrency: 5,
externalLabels: map[string]string{"Key1": ""},
returnErrorOnCreate: true,
buildInfo: buildInfo,
},
{
"success_case",
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{"Key1": "Val1"},
http.DefaultClient,
false,
buildInfo,
name: "success_case",
config: cfg,
namespace: "test",
endpoint: "http://some.url:9411/api/prom/push",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
buildInfo: buildInfo,
},
{
"success_case_no_labels",
cfg,
"test",
"http://some.url:9411/api/prom/push",
5,
map[string]string{},
http.DefaultClient,
false,
buildInfo,
name: "success_case_no_labels",
config: cfg,
namespace: "test",
endpoint: "http://some.url:9411/api/prom/push",
concurrency: 5,
externalLabels: map[string]string{},
buildInfo: buildInfo,
},
}

Expand All @@ -117,18 +112,95 @@ func Test_NewPRWExporter(t *testing.T) {
cfg.RemoteWriteQueue.NumConsumers = 1
prwe, err := NewPRWExporter(cfg, tt.buildInfo)

if tt.returnError {
if tt.returnErrorOnCreate {
assert.Error(t, err)
return
}
require.NotNil(t, prwe)
assert.NotNil(t, prwe.namespace)
assert.NotNil(t, prwe.endpointURL)
assert.NotNil(t, prwe.externalLabels)
assert.NotNil(t, prwe.client)
assert.NotNil(t, prwe.closeChan)
assert.NotNil(t, prwe.wg)
assert.NotNil(t, prwe.userAgentHeader)
assert.NotNil(t, prwe.clientSettings)
})
}
}

// Test_Start checks if the client is properly created as expected.
func Test_Start(t *testing.T) {
cfg := &Config{
ExporterSettings: config.NewExporterSettings(config.NewID(typeStr)),
TimeoutSettings: exporterhelper.TimeoutSettings{},
RetrySettings: exporterhelper.RetrySettings{},
Namespace: "",
ExternalLabels: map[string]string{},
}
buildInfo := component.BuildInfo{
Description: "OpenTelemetry Collector",
Version: "1.0",
}
tests := []struct {
name string
config *Config
namespace string
concurrency int
externalLabels map[string]string
returnErrorOnStartUp bool
buildInfo component.BuildInfo
endpoint string
clientSettings confighttp.HTTPClientSettings
}{
{
name: "success_case",
config: cfg,
namespace: "test",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
buildInfo: buildInfo,
clientSettings: confighttp.HTTPClientSettings{Endpoint: "https://some.url:9411/api/prom/push"},
},
{
name: "invalid_tls",
config: cfg,
namespace: "test",
concurrency: 5,
externalLabels: map[string]string{"Key1": "Val1"},
buildInfo: buildInfo,
returnErrorOnStartUp: true,
clientSettings: confighttp.HTTPClientSettings{
Endpoint: "https://some.url:9411/api/prom/push",
TLSSetting: configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "non-existent file",
CertFile: "",
KeyFile: "",
},
Insecure: false,
ServerName: "",
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg.ExternalLabels = tt.externalLabels
cfg.Namespace = tt.namespace
cfg.RemoteWriteQueue.NumConsumers = 1
cfg.HTTPClientSettings = tt.clientSettings

prwe, err := NewPRWExporter(cfg, tt.buildInfo)
assert.NoError(t, err)
assert.NotNil(t, prwe)

err = prwe.Start(context.Background(), componenttest.NewNopHost())
if tt.returnErrorOnStartUp {
assert.Error(t, err)
return
}
assert.NotNil(t, prwe.client)
})
}
}
Expand Down Expand Up @@ -195,11 +267,11 @@ func Test_export(t *testing.T) {
// Create in test table format to check if different HTTP response codes or server errors
// are properly identified
tests := []struct {
name string
ts prompb.TimeSeries
serverUp bool
httpResponseCode int
returnError bool
name string
ts prompb.TimeSeries
serverUp bool
httpResponseCode int
returnErrorOnCreate bool
}{
{"success_case",
*ts1,
Expand Down Expand Up @@ -236,7 +308,7 @@ func Test_export(t *testing.T) {
server.Close()
}
errs := runExportPipeline(ts1, serverURL)
if tt.returnError {
if tt.returnErrorOnCreate {
assert.Error(t, errs[0])
return
}
Expand Down Expand Up @@ -266,6 +338,12 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
errs = append(errs, err)
return errs
}

if err = prwe.Start(context.Background(), componenttest.NewNopHost()); err != nil {
errs = append(errs, err)
return errs
}

errs = append(errs, prwe.export(context.Background(), testmap)...)
return errs
}
Expand Down Expand Up @@ -515,6 +593,7 @@ func Test_PushMetrics(t *testing.T) {
}
prwe, nErr := NewPRWExporter(cfg, buildInfo)
require.NoError(t, nErr)
require.NoError(t, prwe.Start(context.Background(), componenttest.NewNopHost()))
err := prwe.PushMetrics(context.Background(), *tt.md)
if tt.returnErr {
assert.Error(t, err)
Expand All @@ -527,10 +606,10 @@ func Test_PushMetrics(t *testing.T) {

func Test_validateAndSanitizeExternalLabels(t *testing.T) {
tests := []struct {
name string
inputLabels map[string]string
expectedLabels map[string]string
returnError bool
name string
inputLabels map[string]string
expectedLabels map[string]string
returnErrorOnCreate bool
}{
{"success_case_no_labels",
map[string]string{},
Expand Down Expand Up @@ -562,7 +641,7 @@ func Test_validateAndSanitizeExternalLabels(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
newLabels, err := validateAndSanitizeExternalLabels(tt.inputLabels)
if tt.returnError {
if tt.returnErrorOnCreate {
assert.Error(t, err)
return
}
Expand Down
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func createMetricsExporter(_ context.Context, params component.ExporterCreatePar
}),
exporterhelper.WithRetry(prwCfg.RetrySettings),
exporterhelper.WithResourceToTelemetryConversion(prwCfg.ResourceToTelemetrySettings),
exporterhelper.WithStart(prwe.Start),
exporterhelper.WithShutdown(prwe.Shutdown),
)
}
Expand Down
24 changes: 18 additions & 6 deletions exporter/prometheusremotewriteexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -51,37 +52,48 @@ func Test_createMetricsExporter(t *testing.T) {
ServerName: "",
}
tests := []struct {
name string
cfg config.Exporter
params component.ExporterCreateParams
returnError bool
name string
cfg config.Exporter
params component.ExporterCreateParams
returnErrorOnCreate bool
returnErrorOnStart bool
}{
{"success_case",
createDefaultConfig(),
component.ExporterCreateParams{Logger: zap.NewNop()},
false,
false,
},
{"fail_case",
nil,
component.ExporterCreateParams{Logger: zap.NewNop()},
true,
false,
},
{"invalid_config_case",
invalidConfig,
component.ExporterCreateParams{Logger: zap.NewNop()},
true,
false,
},
{"invalid_tls_config_case",
invalidTLSConfig,
component.ExporterCreateParams{Logger: zap.NewNop()},
false,
true,
},
}
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := createMetricsExporter(context.Background(), tt.params, tt.cfg)
if tt.returnError {
exp, err := createMetricsExporter(context.Background(), tt.params, tt.cfg)
if tt.returnErrorOnCreate {
assert.Error(t, err)
return
}
assert.NotNil(t, exp)
pavankrish123 marked this conversation as resolved.
Show resolved Hide resolved
err = exp.Start(context.Background(), componenttest.NewNopHost())
if tt.returnErrorOnStart {
assert.Error(t, err)
return
}
Expand Down