Skip to content

Commit

Permalink
Merge branch 'main' into cc-4960/move-first-fetch-into-provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Aug 29, 2023
2 parents 9c8711c + 48c8a83 commit 150fb64
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 24 deletions.
3 changes: 3 additions & 0 deletions .changelog/18584.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
Reduce the frequency of metric exports from Consul to HCP from every 10s to every 1m
```
2 changes: 1 addition & 1 deletion .github/workflows/test-integrations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ jobs:
contents: read
strategy:
matrix:
vault-version: ["1.13.1", "1.12.5", "1.11.9", "1.10.11"]
vault-version: ["1.14.1", "1.13.5", "1.12.9", "1.11.12"]
env:
VAULT_BINARY_VERSION: ${{ matrix.vault-version }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func sink(

cfgProvider := NewHCPProvider(ctx, hcpClient)

reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval)
reader := telemetry.NewOTELReader(metricsClient, cfgProvider)
sinkOpts := &telemetry.OTELSinkOpts{
Reader: reader,
ConfigProvider: cfgProvider,
Expand Down
20 changes: 10 additions & 10 deletions agent/hcp/telemetry/otel_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,32 @@ type EndpointProvider interface {
GetEndpoint() *url.URL
}

// OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter.
// otelExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter.
// The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - HCP authenticated MetricsClient.
type OTELExporter struct {
type otelExporter struct {
client MetricsClient
endpointProvider EndpointProvider
}

// NewOTELExporter returns a configured OTELExporter.
func NewOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *OTELExporter {
return &OTELExporter{
// newOTELExporter returns a configured OTELExporter.
func newOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *otelExporter {
return &otelExporter{
client: client,
endpointProvider: endpointProvider,
}
}

// Temporality returns the Cumulative temporality for metrics aggregation.
// Telemetry Gateway stores metrics in Prometheus format, so use Cummulative aggregation as default.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
func (e *otelExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}

// Aggregation returns the Aggregation to use for an instrument kind.
// The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics.
// This custom version replicates that logic, but removes the panic.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
func (e *otelExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
switch kind {
case metric.InstrumentKindObservableGauge:
return aggregation.LastValue{}
Expand All @@ -69,7 +69,7 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre
}

// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
func (e *otelExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
if e.endpointProvider.IsDisabled() {
return nil
}
Expand All @@ -95,13 +95,13 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
}

// ForceFlush is a no-op, as the MetricsClient client holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
func (e *otelExporter) ForceFlush(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterForceFlush, 1)
return ctx.Err()
}

// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
func (e *otelExporter) Shutdown(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterShutdown, 1)
return ctx.Err()
}
12 changes: 6 additions & 6 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (m *mockEndpointProvider) IsDisabled() bool { return m.disabled }

func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter))
}

Expand All @@ -68,7 +68,7 @@ func TestAggregation(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
require.Equal(t, test.expAgg, exp.Aggregation(test.kind))
})
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestExport(t *testing.T) {
}
}

exp := NewOTELExporter(test.client, provider)
exp := newOTELExporter(test.client, provider)

err := exp.Export(context.Background(), test.metrics)
if test.wantErr != "" {
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestExport_CustomMetrics(t *testing.T) {
u, err := url.Parse(testExportEndpoint)
require.NoError(t, err)

exp := NewOTELExporter(tc.client, &mockEndpointProvider{
exp := newOTELExporter(tc.client, &mockEndpointProvider{
endpoint: u,
})

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestExport_CustomMetrics(t *testing.T) {

func TestForceFlush(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()

Expand All @@ -235,7 +235,7 @@ func TestForceFlush(t *testing.T) {

func TestShutdown(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()

Expand Down
29 changes: 23 additions & 6 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,28 @@ import (
"time"

gometrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
otelsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"

"github.com/hashicorp/go-hclog"
)

// DefaultExportInterval is a default time interval between export of aggregated metrics.
const DefaultExportInterval = 10 * time.Second
const (
// defaultExportInterval is a default time interval between export of aggregated metrics.
// At the time of writing this is the same as the otelsdk.Reader's default export interval.
defaultExportInterval = 60 * time.Second

// defaultExportTimeout is the time the otelsdk.Reader waits on an export before cancelling it.
// At the time of writing this is the same as the otelsdk.Reader's default export timeout default.
//
// note: in practice we are more likely to hit the http.Client Timeout in telemetry.MetricsClient.
// That http.Client Timeout is 15 seconds (at the time of writing). The otelsdk.Reader will use
// defaultExportTimeout for the entire Export call, but since the http.Client's Timeout is 15s,
// we should hit that first before reaching the 30 second timeout set here.
defaultExportTimeout = 30 * time.Second
)

// Disabled should be implemented to turn on/off metrics processing
type Disabled interface {
Expand All @@ -34,6 +47,7 @@ type ConfigProvider interface {
Disabled
// GetLabels should return a set of OTEL attributes added by default all metrics.
GetLabels() map[string]string

// GetFilters should return filtesr that are required to enable metric processing.
// Filters act as an allowlist to collect only the required metrics.
GetFilters() *regexp.Regexp
Expand Down Expand Up @@ -82,9 +96,12 @@ type OTELSink struct {
// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds.
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export
// metrics in OTLP format to an external url.
func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, endpointProvider)
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval))
func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider) otelsdk.Reader {
return otelsdk.NewPeriodicReader(
newOTELExporter(client, endpointProvider),
otelsdk.WithInterval(defaultExportInterval),
otelsdk.WithTimeout(defaultExportTimeout),
)
}

// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.
Expand Down

0 comments on commit 150fb64

Please sign in to comment.