diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index c8d10b5e5ddf..efbff16ede1a 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -13,7 +13,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.77.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.77.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.77.0 - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver v0.77.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.77.0 + github.com/prometheus/common v0.42.0 github.com/prometheus/prometheus v0.43.0 github.com/shirou/gopsutil/v3 v3.23.4 github.com/stretchr/testify v1.8.2 @@ -143,7 +144,6 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.77.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.77.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.77.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.77.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.5 // indirect @@ -157,7 +157,6 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.15.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/rs/cors v1.9.0 // indirect @@ -231,8 +230,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil -replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver => ../simpleprometheusreceiver - replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => ../prometheusreceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go index af6bb89e8f7c..fb9d75f099fe 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/leaderelection.go @@ -240,7 +240,7 @@ func (le *LeaderElection) startLeaderElection(ctx context.Context, lock resource le.mu.Lock() defer le.mu.Unlock() le.leading = false - // node and pod are only used for cluster level metrics, endpoint is used for decorator too. + // node and pod are only used for cluster level metrics, Endpoint is used for decorator too. le.k8sClient.ShutdownNodeClient() le.k8sClient.ShutdownPodClient() }, diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go deleted file mode 100644 index 40b3bdfbb765..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package k8sapiserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" - -import ( - "context" - "fmt" - - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/zap" -) - -const ( - controlPlaneResourceType = "control_plane" -) - -var ( - defaultResourceToType = map[string]string{ - controlPlaneResourceType: "Cluster", - } - defaultMetricsToResource = map[string]string{ - "apiserver_storage_objects": controlPlaneResourceType, - "apiserver_request_total": controlPlaneResourceType, - "apiserver_request_duration_seconds": controlPlaneResourceType, - "apiserver_admission_controller_admission_duration_seconds": controlPlaneResourceType, - "rest_client_request_duration_seconds": controlPlaneResourceType, - "rest_client_requests_total": controlPlaneResourceType, - "etcd_request_duration_seconds": controlPlaneResourceType, - "etcd_db_total_size_in_bytes": controlPlaneResourceType, - } -) - -type prometheusConsumer struct { - nextConsumer consumer.Metrics - logger *zap.Logger - clusterName string - nodeName string - resourcesToType map[string]string - metricsToResource map[string]string -} - -func newPrometheusConsumer(logger *zap.Logger, nextConsumer consumer.Metrics, clusterName string, nodeName string) *prometheusConsumer { - return &prometheusConsumer{ - logger: logger, - nextConsumer: nextConsumer, - clusterName: clusterName, - nodeName: nodeName, - resourcesToType: defaultResourceToType, - metricsToResource: defaultMetricsToResource, - } -} -func (c prometheusConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } -} - -func (c prometheusConsumer) ConsumeMetrics(ctx context.Context, originalMetrics pmetric.Metrics) error { - - localScopeMetrics := map[string]pmetric.ScopeMetrics{} - newMetrics := pmetric.NewMetrics() - - for key, value := range c.resourcesToType { - newResourceMetrics := newMetrics.ResourceMetrics().AppendEmpty() - // common attributes - newResourceMetrics.Resource().Attributes().PutStr("ClusterName", c.clusterName) - newResourceMetrics.Resource().Attributes().PutStr("Version", "0") - newResourceMetrics.Resource().Attributes().PutStr("Sources", "[\"apiserver\"]") - newResourceMetrics.Resource().Attributes().PutStr("NodeName", c.nodeName) - - // resource-specific type metric - newResourceMetrics.Resource().Attributes().PutStr("Type", value) - - newScopeMetrics := newResourceMetrics.ScopeMetrics().AppendEmpty() - localScopeMetrics[key] = newScopeMetrics - } - - rms := originalMetrics.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - scopeMetrics := rms.At(i).ScopeMetrics() - for j := 0; j < scopeMetrics.Len(); j++ { - scopeMetric := scopeMetrics.At(j) - for k := 0; k < scopeMetric.Metrics().Len(); k++ { - metric := scopeMetric.Metrics().At(k) - // check control plane metrics - resourceName, ok := c.metricsToResource[metric.Name()] - if !ok { - continue - } - resourceSpecificScopeMetrics, ok := localScopeMetrics[resourceName] - if !ok { - continue - } - c.logger.Debug(fmt.Sprintf("Copying metric %s into resource %s", metric.Name(), resourceName)) - metric.CopyTo(resourceSpecificScopeMetrics.Metrics().AppendEmpty()) - } - } - } - - c.logger.Info("Forwarding on k8sapiserver prometheus metrics", - zap.Int("MetricCount", newMetrics.MetricCount()), - zap.Int("DataPointCount", newMetrics.DataPointCount())) - - // forward on the new metrics - return c.nextConsumer.ConsumeMetrics(ctx, newMetrics) -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go deleted file mode 100644 index b5ad24e4535e..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_consumer_test.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package k8sapiserver - -import ( - "context" - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.uber.org/zap" -) - -type mockNextConsumer struct { - throwError bool - t *testing.T -} - -func (m mockNextConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: false, - } -} - -func (m mockNextConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - if m.throwError { - return errors.New("throwing an error") - } - - // verify some of the attributes - value, found := md.ResourceMetrics().At(0).Resource().Attributes().Get("ClusterName") - assert.True(m.t, found) - assert.Equal(m.t, "test-cluster", value.Str()) - - value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("Version") - assert.Equal(m.t, "0", value.Str()) - assert.True(m.t, found) - - value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("Type") - assert.True(m.t, found) - assert.NotEmpty(m.t, value.Str()) - - value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("Timestamp") - assert.False(m.t, found) - assert.Empty(m.t, value.Str()) - - value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("Sources") - assert.True(m.t, found) - assert.Equal(m.t, "[\"apiserver\"]", value.Str()) - - value, found = md.ResourceMetrics().At(0).Resource().Attributes().Get("NodeName") - assert.True(m.t, found) - assert.Equal(m.t, "test-node", value.Str()) - - assert.Equal(m.t, len(defaultResourceToType), md.ResourceMetrics().Len()) - assert.Equal(m.t, 1, md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) - - metric1 := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) - assert.Equal(m.t, "apiserver_storage_objects", metric1.Name()) - assert.Equal(m.t, 123.4, metric1.Gauge().DataPoints().At(0).DoubleValue()) - - assert.Equal(m.t, 1, md.MetricCount()) - - return nil -} - -func TestPrometheusConsumeMetrics(t *testing.T) { - nextConsumer := mockNextConsumer{ - throwError: false, - t: t, - } - - consumer := newPrometheusConsumer(zap.NewNop(), nextConsumer, "test-cluster", "test-node") - consumer.metricsToResource["invalid-metrics-to-resource"] = "invalid" - - cap := consumer.Capabilities() - assert.True(t, cap.MutatesData) - - metrics := pmetric.NewMetrics() - metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric1 := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) - metric1.SetName("apiserver_storage_objects") - metric1.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(123.4) - - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() - metric2 := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1) - metric2.SetName("some_excluded_metric") - metric2.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(456.7) - - metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() - metric3 := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1) - metric3.SetName("invalid-metrics-to-resource") - metric3.SetEmptyGauge().DataPoints().AppendEmpty().SetDoubleValue(99.9) - - assert.Equal(t, 3, metrics.MetricCount()) - - result := consumer.ConsumeMetrics(context.TODO(), metrics) - assert.NoError(t, result) -} - -func TestPrometheusConsumeMetricsForwardsError(t *testing.T) { - nextConsumer := mockNextConsumer{ - throwError: true, - } - - consumer := newPrometheusConsumer(zap.NewNop(), nextConsumer, "test-cluster", "test-node") - - metrics := pmetric.NewMetrics() - metrics.ResourceMetrics().AppendEmpty() - - result := consumer.ConsumeMetrics(context.TODO(), metrics) - assert.Error(t, result) -} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go index 575ba99c28a9..2c2c2057d748 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper.go @@ -19,17 +19,21 @@ import ( "errors" "fmt" "os" + "strings" "time" + configutil "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/model/relabel" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" ) const ( @@ -37,56 +41,128 @@ const ( collectionInterval = 60 * time.Second ) +var ( + controlPlaneMetricAllowList = []string{ + "apiserver_storage_oapiserver_storage_objectsbjects", + "apiserver_request_total", + "apiserver_request_duration_seconds.*", + "apiserver_admission_controller_admission_duration_seconds.*", + "rest_client_request_duration_seconds.*", + "rest_client_requests_total", + "etcd_request_duration_seconds.*", + "etcd_db_total_size_in_bytes.*", + } +) + type PrometheusScraper struct { - ctx context.Context - settings component.TelemetrySettings - host component.Host - clusterNameProvider clusterNameProvider - simplePrometheusReceiver receiver.Metrics - leaderElection *LeaderElection - running bool + ctx context.Context + settings component.TelemetrySettings + host component.Host + clusterNameProvider clusterNameProvider + prometheusReceiver receiver.Metrics + leaderElection *LeaderElection + running bool +} + +type PrometheusScraperOpts struct { + Ctx context.Context + TelemetrySettings component.TelemetrySettings + Endpoint string + Consumer consumer.Metrics + Host component.Host + ClusterNameProvider clusterNameProvider + LeaderElection *LeaderElection + BearerToken string } -func NewPrometheusScraper(ctx context.Context, telemetrySettings component.TelemetrySettings, endpoint string, nextConsumer consumer.Metrics, host component.Host, clusterNameProvider clusterNameProvider, leaderElection *LeaderElection) (*PrometheusScraper, error) { - if leaderElection == nil { - return nil, errors.New("leader election cannot be null") +func NewPrometheusScraper(opts PrometheusScraperOpts) (*PrometheusScraper, error) { + if opts.Consumer == nil { + return nil, errors.New("consumer cannot be nil") + } + if opts.Host == nil { + return nil, errors.New("host cannot be nil") + } + if opts.LeaderElection == nil { + return nil, errors.New("leader election cannot be nil") + } + if opts.ClusterNameProvider == nil { + return nil, errors.New("cluster name provider cannot be nil") } - spConfig := simpleprometheusreceiver.Config{ - HTTPClientSettings: confighttp.HTTPClientSettings{ - Endpoint: endpoint, - TLSSetting: configtls.TLSClientSetting{ - TLSSetting: configtls.TLSSetting{ - CAFile: caFile, - }, - Insecure: false, + controlPlaneMetricsAllowRegex := "" + for _, item := range controlPlaneMetricAllowList { + controlPlaneMetricsAllowRegex += item + "|" + } + controlPlaneMetricsAllowRegex = strings.TrimSuffix(controlPlaneMetricsAllowRegex, "|") + + scrapeConfig := &config.ScrapeConfig{ + HTTPClientConfig: configutil.HTTPClientConfig{ + TLSConfig: configutil.TLSConfig{ + CAFile: caFile, InsecureSkipVerify: false, }, }, - MetricsPath: "/metrics", - CollectionInterval: collectionInterval, - UseServiceAccount: true, + ScrapeInterval: model.Duration(collectionInterval), + ScrapeTimeout: model.Duration(collectionInterval), + JobName: fmt.Sprintf("%s/%s", "containerInsightsKubeAPIServerScraper", opts.Endpoint), + HonorTimestamps: true, + Scheme: "https", + MetricsPath: "/metrics", + ServiceDiscoveryConfigs: discovery.Configs{ + &discovery.StaticConfig{ + { + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue(opts.Endpoint), + "ClusterName": model.LabelValue(opts.ClusterNameProvider.GetClusterName()), + "Version": model.LabelValue("0"), + "Sources": model.LabelValue("[\"apiserver\"]"), + "NodeName": model.LabelValue(os.Getenv("HOST_NAME")), + "Type": model.LabelValue("control_plane"), + }, + }, + }, + }, + }, + MetricRelabelConfigs: []*relabel.Config{ + { + // allow list filter for the control plane metrics we care about + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp(controlPlaneMetricsAllowRegex), + Action: relabel.Keep, + }, + }, + } + + if opts.BearerToken != "" { + scrapeConfig.HTTPClientConfig.BearerToken = configutil.Secret(opts.BearerToken) + } else { + opts.TelemetrySettings.Logger.Warn("bearer token is not set, control plane metrics will not be published") } - consumer := newPrometheusConsumer(telemetrySettings.Logger, nextConsumer, clusterNameProvider.GetClusterName(), os.Getenv("HOST_NAME")) + promConfig := prometheusreceiver.Config{ + PrometheusConfig: &config.Config{ + ScrapeConfigs: []*config.ScrapeConfig{scrapeConfig}, + }, + } params := receiver.CreateSettings{ - TelemetrySettings: telemetrySettings, + TelemetrySettings: opts.TelemetrySettings, } - spFactory := simpleprometheusreceiver.NewFactory() - spr, err := spFactory.CreateMetricsReceiver(ctx, params, &spConfig, consumer) + promFactory := prometheusreceiver.NewFactory() + promReceiver, err := promFactory.CreateMetricsReceiver(opts.Ctx, params, &promConfig, opts.Consumer) if err != nil { - return nil, fmt.Errorf("failed to create simple prometheus receiver: %w", err) + return nil, fmt.Errorf("failed to create prometheus receiver: %w", err) } return &PrometheusScraper{ - ctx: ctx, - settings: telemetrySettings, - host: host, - clusterNameProvider: clusterNameProvider, - simplePrometheusReceiver: spr, - leaderElection: leaderElection, + ctx: opts.Ctx, + settings: opts.TelemetrySettings, + host: opts.Host, + clusterNameProvider: opts.ClusterNameProvider, + prometheusReceiver: promReceiver, + leaderElection: opts.LeaderElection, }, nil } @@ -100,9 +176,9 @@ func (ps *PrometheusScraper) GetMetrics() []pmetric.Metrics { // if we are leading, ensure we are running if !ps.running { ps.settings.Logger.Info("The scraper is not running, starting up the scraper") - err := ps.simplePrometheusReceiver.Start(ps.ctx, ps.host) + err := ps.prometheusReceiver.Start(ps.ctx, ps.host) if err != nil { - ps.settings.Logger.Error("Unable to start SimplePrometheusReceiver", zap.Error(err)) + ps.settings.Logger.Error("Unable to start PrometheusReceiver", zap.Error(err)) } ps.running = err == nil } @@ -110,9 +186,9 @@ func (ps *PrometheusScraper) GetMetrics() []pmetric.Metrics { } func (ps *PrometheusScraper) Shutdown() { if ps.running { - err := ps.simplePrometheusReceiver.Shutdown(ps.ctx) + err := ps.prometheusReceiver.Shutdown(ps.ctx) if err != nil { - ps.settings.Logger.Error("Unable to shutdown SimplePrometheusReceiver", zap.Error(err)) + ps.settings.Logger.Error("Unable to shutdown PrometheusReceiver", zap.Error(err)) } ps.running = false } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go index 24cfb0e1d486..7d544635f3dc 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/prometheus_scraper_test.go @@ -16,20 +16,23 @@ package k8sapiserver import ( "context" + "fmt" "strings" "testing" - "time" + configutil "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" ) const renameMetric = ` @@ -50,9 +53,10 @@ rpc_duration_total{method="post",port="6381"} 120.0 ` type mockConsumer struct { - t *testing.T - up *bool - httpConnected *bool + t *testing.T + up *bool + httpConnected *bool + rpcDurationTotal *bool } func (m mockConsumer) Capabilities() consumer.Capabilities { @@ -71,6 +75,9 @@ func (m mockConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er assert.Equal(m.t, float64(15), metric.Sum().DataPoints().At(0).DoubleValue()) *m.httpConnected = true } + if metric.Name() == "rpc_duration_total" { + *m.rpcDurationTotal = true + } if metric.Name() == "up" { assert.Equal(m.t, float64(1), metric.Gauge().DataPoints().At(0).DoubleValue()) *m.up = true @@ -80,23 +87,75 @@ func (m mockConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er return nil } -func TestNewPrometheusScraperNilLeaderElection(t *testing.T) { +func TestNewPrometheusScraperBadInputs(t *testing.T) { settings := componenttest.NewNopTelemetrySettings() settings.Logger, _ = zap.NewDevelopment() - scraper, err := NewPrometheusScraper(context.TODO(), settings, "", mockConsumer{}, componenttest.NewNopHost(), mockClusterNameProvider{}, nil) - assert.Error(t, err) - assert.Nil(t, scraper) + leaderElection := LeaderElection{ + leading: true, + } + + tests := []PrometheusScraperOpts{ + { + Ctx: context.TODO(), + TelemetrySettings: settings, + Endpoint: "", + Consumer: mockConsumer{}, + Host: componenttest.NewNopHost(), + ClusterNameProvider: mockClusterNameProvider{}, + LeaderElection: nil, + BearerToken: "", + }, + { + Ctx: context.TODO(), + TelemetrySettings: settings, + Endpoint: "", + Consumer: nil, + Host: componenttest.NewNopHost(), + ClusterNameProvider: mockClusterNameProvider{}, + LeaderElection: &leaderElection, + BearerToken: "", + }, + { + Ctx: context.TODO(), + TelemetrySettings: settings, + Endpoint: "", + Consumer: mockConsumer{}, + Host: nil, + ClusterNameProvider: mockClusterNameProvider{}, + LeaderElection: &leaderElection, + BearerToken: "", + }, + { + Ctx: context.TODO(), + TelemetrySettings: settings, + Endpoint: "", + Consumer: mockConsumer{}, + Host: componenttest.NewNopHost(), + ClusterNameProvider: nil, + LeaderElection: &leaderElection, + BearerToken: "", + }, + } + + for _, tt := range tests { + scraper, err := NewPrometheusScraper(tt) + + assert.Error(t, err) + assert.Nil(t, scraper) + } } func TestNewPrometheusScraperEndToEnd(t *testing.T) { upPtr := false httpPtr := false + rpcDurationTotalPtr := false consumer := mockConsumer{ - t: t, - up: &upPtr, - httpConnected: &httpPtr, + t: t, + up: &upPtr, + httpConnected: &httpPtr, + rpcDurationTotal: &rpcDurationTotalPtr, } settings := componenttest.NewNopTelemetrySettings() @@ -105,16 +164,26 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { leaderElection := LeaderElection{ leading: true, } - scraper, err := NewPrometheusScraper(context.TODO(), settings, "", consumer, componenttest.NewNopHost(), mockClusterNameProvider{}, &leaderElection) + + scraper, err := NewPrometheusScraper(PrometheusScraperOpts{ + Ctx: context.TODO(), + TelemetrySettings: settings, + Endpoint: "", + Consumer: mockConsumer{}, + Host: componenttest.NewNopHost(), + ClusterNameProvider: mockClusterNameProvider{}, + LeaderElection: &leaderElection, + BearerToken: "", + }) assert.NoError(t, err) assert.Equal(t, mockClusterNameProvider{}, scraper.clusterNameProvider) - // build up a new SPR - spFactory := simpleprometheusreceiver.NewFactory() + // build up a new PR + promFactory := prometheusreceiver.NewFactory() targets := []*testData{ { - name: "prometheus_simple", + name: "prometheus", pages: []mockPrometheusResponse{ {code: 200, data: renameMetric}, }, @@ -125,24 +194,55 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { split := strings.Split(mp.srv.URL, "http://") - spConfig := simpleprometheusreceiver.Config{ - HTTPClientSettings: confighttp.HTTPClientSettings{ - Endpoint: split[1], - TLSSetting: configtls.TLSClientSetting{ - Insecure: true, + scrapeConfig := &config.ScrapeConfig{ + HTTPClientConfig: configutil.HTTPClientConfig{ + TLSConfig: configutil.TLSConfig{ InsecureSkipVerify: true, }, }, - MetricsPath: cfg.ScrapeConfigs[0].MetricsPath, - CollectionInterval: time.Duration(cfg.ScrapeConfigs[0].ScrapeInterval), - UseServiceAccount: false, + ScrapeInterval: cfg.ScrapeConfigs[0].ScrapeInterval, + ScrapeTimeout: cfg.ScrapeConfigs[0].ScrapeInterval, + JobName: fmt.Sprintf("%s/%s", "containerInsightsKubeAPIServerScraper", cfg.ScrapeConfigs[0].MetricsPath), + HonorTimestamps: true, + Scheme: "http", + MetricsPath: cfg.ScrapeConfigs[0].MetricsPath, + ServiceDiscoveryConfigs: discovery.Configs{ + &discovery.StaticConfig{ + { + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue(split[1]), + "ClusterName": model.LabelValue("test_cluster_name"), + "Version": model.LabelValue("0"), + "Sources": model.LabelValue("[\"apiserver\"]"), + "NodeName": model.LabelValue("test"), + "Type": model.LabelValue("control_plane"), + }, + }, + }, + }, + }, + MetricRelabelConfigs: []*relabel.Config{ + { + // allow list filter for the control plane metrics we care about + SourceLabels: model.LabelNames{"__name__"}, + Regex: relabel.MustNewRegexp("http_connected_total"), + Action: relabel.Keep, + }, + }, + } + + promConfig := prometheusreceiver.Config{ + PrometheusConfig: &config.Config{ + ScrapeConfigs: []*config.ScrapeConfig{scrapeConfig}, + }, } - // replace the SPR + // replace the prom receiver params := receiver.CreateSettings{ TelemetrySettings: scraper.settings, } - scraper.simplePrometheusReceiver, err = spFactory.CreateMetricsReceiver(scraper.ctx, params, &spConfig, consumer) + scraper.prometheusReceiver, err = promFactory.CreateMetricsReceiver(scraper.ctx, params, &promConfig, consumer) assert.NoError(t, err) assert.NotNil(t, mp) defer mp.Close() @@ -160,4 +260,5 @@ func TestNewPrometheusScraperEndToEnd(t *testing.T) { assert.True(t, *consumer.up) assert.True(t, *consumer.httpConnected) + assert.False(t, *consumer.rpcDurationTotal) // this will get filtered out by our metric relabel config } diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 857733658c26..222e962cea9b 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "k8s.io/client-go/rest" ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" @@ -156,7 +157,26 @@ func (acir *awsContainerInsightReceiver) startPrometheusScraper(ctx context.Cont acir.settings.Logger.Debug("kube apiserver endpoint found", zap.String("endpoint", endpoint)) // use the same leader - acir.prometheusScraper, err = k8sapiserver.NewPrometheusScraper(ctx, acir.settings, endpoint, acir.nextConsumer, host, hostinfo, leaderElection) + + restConfig, err := rest.InClusterConfig() + if err != nil { + return err + } + bearerToken := restConfig.BearerToken + if bearerToken == "" { + return errors.New("bearer token was empty") + } + + acir.prometheusScraper, err = k8sapiserver.NewPrometheusScraper(k8sapiserver.PrometheusScraperOpts{ + Ctx: ctx, + TelemetrySettings: acir.settings, + Endpoint: endpoint, + Consumer: acir.nextConsumer, + Host: host, + ClusterNameProvider: hostinfo, + LeaderElection: leaderElection, + BearerToken: bearerToken, + }) return err }