Skip to content

Commit

Permalink
Rebased and cleaned up telemetry_provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Jul 31, 2023
1 parent 4a40a31 commit 9ed67e4
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 203 deletions.
14 changes: 12 additions & 2 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package client
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/hashicorp/go-cleanhttp"
Expand Down Expand Up @@ -39,6 +41,10 @@ const (
defaultErrRespBodyLength = 100
)

var (
errInvalidEndpoint = errors.New("invalid nil endpoint")
)

// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
Expand Down Expand Up @@ -122,7 +128,11 @@ func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Cl
// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint *url.URL) error {
if endpoint == nil {
return errInvalidEndpoint
}

pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}
Expand All @@ -132,7 +142,7 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R
return fmt.Errorf("failed to marshal the request: %w", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
req, err := retryablehttp.NewRequest(http.MethodPost, endpoint.String(), bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
Expand Down
5 changes: 4 additions & 1 deletion agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -123,7 +124,9 @@ func TestExportMetrics(t *testing.T) {

ctx := context.Background()
metrics := &metricpb.ResourceMetrics{}
err = client.ExportMetrics(ctx, metrics, srv.URL)
u, err := url.Parse(srv.URL)
require.NoError(t, err)
err = client.ExportMetrics(ctx, metrics, u)

if test.wantErr != "" {
require.Error(t, err)
Expand Down
6 changes: 3 additions & 3 deletions agent/hcp/client/telemetry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

var (
// defaultMetricFilters is a regex that matches all metric names.
defaultMetricFilters = regexp.MustCompile(".+")
DefaultMetricFilters = regexp.MustCompile(".+")

// Validation errors for AgentTelemetryConfigOK response.
errMissingPayload = errors.New("missing payload")
Expand Down Expand Up @@ -142,15 +142,15 @@ func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.

if len(validFilters) == 0 {
logger.Error("no valid filters", "error", mErr)
return defaultMetricFilters
return DefaultMetricFilters
}

// Combine the valid regex strings with OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
logger.Error("failed to compile regex", "error", mErr)
return defaultMetricFilters
return DefaultMetricFilters
}

return composedRegex
Expand Down
6 changes: 3 additions & 3 deletions agent/hcp/client/telemetry_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: defaultMetricFilters,
Filters: DefaultMetricFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
Expand Down Expand Up @@ -276,13 +276,13 @@ func TestConvertMetricFilters(t *testing.T) {
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"emptyRegex": {
filters: []string{},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
Expand Down
8 changes: 4 additions & 4 deletions agent/hcp/telemetry/otel_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// MetricsClient exports Consul metrics in OTLP format to the desired endpoint.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint *url.URL) error
}

// EndpointProvider provides the endpoint where metrics are exported to by the OTELExporter.
Expand Down Expand Up @@ -66,8 +66,7 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre

// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
endpoint := e.endpointProvider.GetEndpoint()
if endpoint == nil {
if !e.endpointProvider.Enabled() {
return nil
}

Expand All @@ -76,7 +75,8 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
return nil
}

err := e.client.ExportMetrics(ctx, otlpMetrics, endpoint.String())
endpoint := e.endpointProvider.GetEndpoint()
err := e.client.ExportMetrics(ctx, otlpMetrics, endpoint)
if err != nil {
goMetrics.IncrCounter(internalMetricExportFailure, 1)
return fmt.Errorf("failed to export metrics: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ type mockMetricsClient struct {
exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint *url.URL) error {
return m.exportErr
}

type mockEndpointProvider struct {
endpoint *url.URL
enabled bool
}

func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }
func (m *mockEndpointProvider) Enabled() bool { return m.enabled }

func TestTemporality(t *testing.T) {
t.Parallel()
Expand Down
32 changes: 7 additions & 25 deletions agent/hcp/telemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/hashstructure/v2"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/telemetry"
Expand All @@ -21,7 +20,6 @@ var (
// internalMetricRefreshSuccess is a metric to monitor refresh successes.
internalMetricRefreshSuccess []string = []string{"hcp", "telemetry_config_provider", "refresh", "success"}
defaultTelemetryConfigRefreshInterval = 1 * time.Minute
defaultTelemetryConfigFilters = regexp.MustCompile(".+")
)

// Ensure hcpProviderImpl implements telemetry provider interfaces.
Expand Down Expand Up @@ -53,42 +51,26 @@ type dynamicConfig struct {
RefreshInterval time.Duration
}

// equals returns true if two dynamicConfig objects are equal.
func (d *dynamicConfig) equals(newCfg *dynamicConfig) (bool, error) {
currHash, err := hashstructure.Hash(*d, hashstructure.FormatV2, nil)
if err != nil {
return false, err
}

newHash, err := hashstructure.Hash(*newCfg, hashstructure.FormatV2, nil)
if err != nil {
return false, err
}

return currHash == newHash, err
}

// NewHCPProvider initializes and starts a HCP Telemetry provider with provided params.
func NewHCPProvider(ctx context.Context, hcpClient client.Client) *hcpProviderImpl {
ticker := time.NewTicker(defaultTelemetryConfigRefreshInterval)
t := &hcpProviderImpl{
// Initialize with default config values.
cfg: &dynamicConfig{
Labels: make(map[string]string),
Filters: defaultTelemetryConfigFilters,
Labels: map[string]string{},
Filters: client.DefaultMetricFilters,
RefreshInterval: defaultTelemetryConfigRefreshInterval,
Endpoint: nil,
Enabled: false,
},
hcpClient: hcpClient,
ticker: ticker,
}

// Try to initialize the config once before running periodic fetcher.
newCfg, _ := t.checkUpdate(ctx)
if newCfg != nil {
if newCfg := t.getUpdate(ctx); newCfg != nil {
t.cfg = newCfg
ticker.Reset(newCfg.RefreshInterval)
}

go t.run(ctx, ticker.C)
go t.run(ctx, t.cfg.RefreshInterval)

return t
}
Expand Down
Loading

0 comments on commit 9ed67e4

Please sign in to comment.