Skip to content

Commit

Permalink
Use disable instead of enable
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Aug 2, 2023
1 parent e2020e6 commit bc356c3
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 96 deletions.
6 changes: 3 additions & 3 deletions agent/hcp/client/telemetry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ type RefreshConfig struct {
RefreshInterval time.Duration
}

// MetricsEnabled returns true if metrics export is enabled, i.e. a valid metrics endpoint exists.
func (t *TelemetryConfig) MetricsEnabled() bool {
return t.MetricsConfig.Endpoint != nil
// MetricsDisabled returns true if metrics export is disabled, i.e. no valid metrics endpoint exists.
func (t *TelemetryConfig) MetricsDisabled() bool {
return t.MetricsConfig.Endpoint == nil
}

// validateAgentTelemetryConfigPayload ensures the returned payload from HCP is valid.
Expand Down
8 changes: 3 additions & 5 deletions agent/hcp/client/telemetry_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr error
expectedEnabled bool
expectedDisabled bool
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -112,7 +112,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"successNoEndpoint": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -139,7 +138,7 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: false,
expectedDisabled: true,
},
"successBadFilters": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand All @@ -166,7 +165,6 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"errorsWithInvalidRefreshInterval": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Expand Down Expand Up @@ -206,7 +204,7 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
}
require.NoError(t, err)
require.Equal(t, tc.expectedTelemetryCfg, telemetryCfg)
require.Equal(t, tc.expectedEnabled, telemetryCfg.MetricsEnabled())
require.Equal(t, tc.expectedDisabled, telemetryCfg.MetricsDisabled())
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions agent/hcp/telemetry/otel_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type MetricsClient interface {
// EndpointProvider exposes the GetEndpoint() interface method to fetch the endpoint.
// This abstraction layer offers flexibility, in particular for dynamic configuration or changes to the endpoint.
type EndpointProvider interface {
Enabler
Disabled
GetEndpoint() *url.URL
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre

// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
if !e.endpointProvider.Enabled() {
if e.endpointProvider.IsDisabled() {
return nil
}

Expand Down
21 changes: 12 additions & 9 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *met

type mockEndpointProvider struct {
endpoint *url.URL
enabled bool
disabled bool
}

func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }
func (m *mockEndpointProvider) Enabled() bool { return m.enabled }
func (m *mockEndpointProvider) IsDisabled() bool { return m.disabled }

func TestTemporality(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -79,23 +79,28 @@ func TestExport(t *testing.T) {
client MetricsClient
provider EndpointProvider
}{
"earlyReturnDisabledProvider": {
client: &mockMetricsClient{},
provider: &mockEndpointProvider{
disabled: true,
},
},
"earlyReturnWithoutEndpoint": {
client: &mockMetricsClient{},
provider: &mockEndpointProvider{},
},
"earlyReturnWithoutScopeMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
provider: &mockEndpointProvider{
enabled: true},
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
provider: &mockEndpointProvider{},
},
"earlyReturnWithoutMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics([]metricdata.ScopeMetrics{
{Metrics: []metricdata.Metrics{}},
},
),
provider: &mockEndpointProvider{enabled: true},
provider: &mockEndpointProvider{},
},
"errorWithExportFailure": {
client: &mockMetricsClient{
Expand All @@ -113,7 +118,6 @@ func TestExport(t *testing.T) {
},
),
provider: &mockEndpointProvider{
enabled: true,
endpoint: &url.URL{},
},
wantErr: "failed to export metrics",
Expand Down Expand Up @@ -190,7 +194,6 @@ func TestExport_CustomMetrics(t *testing.T) {

exp := NewOTELExporter(tc.client, &mockEndpointProvider{
endpoint: u,
enabled: true,
})

ctx := context.Background()
Expand Down
16 changes: 8 additions & 8 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import (
// DefaultExportInterval is a default time interval between export of aggregated metrics.
const DefaultExportInterval = 10 * time.Second

// Enabler must be implemented as it is required to process metrics
// Returning false will mean the sink is disabled, and will not accept metrics.
type Enabler interface {
Enabled() bool
// Disabled should be implemented to turn on/off metrics processing
type Disabled interface {
// IsDisabled() can return true disallow the sink from accepting metrics.
IsDisabled() bool
}

// ConfigProvider is required to provide custom metrics processing.
type ConfigProvider interface {
Enabler
Disabled
// GetLabels should return a set of OTEL attributes added by default all metrics.
GetLabels() map[string]string
// GetFilters should return filtesr that are required to enable metric processing.
Expand Down Expand Up @@ -134,7 +134,7 @@ func (o *OTELSink) IncrCounter(key []string, val float32) {
// AddSampleWithLabels emits a Consul gauge metric that gets
// registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
if !o.cfgProvider.Enabled() {
if o.cfgProvider.IsDisabled() {
return
}

Expand Down Expand Up @@ -165,7 +165,7 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr

// AddSampleWithLabels emits a Consul sample metric that gets registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
if !o.cfgProvider.Enabled() {
if o.cfgProvider.IsDisabled() {
return
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet

// IncrCounterWithLabels emits a Consul counter metric that gets registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
if !o.cfgProvider.Enabled() {
if o.cfgProvider.IsDisabled() {
return
}

Expand Down
22 changes: 10 additions & 12 deletions agent/hcp/telemetry/otel_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
)

type mockConfigProvider struct {
filter *regexp.Regexp
labels map[string]string
enabled bool
filter *regexp.Regexp
labels map[string]string
disabled bool
}

func (m *mockConfigProvider) GetLabels() map[string]string {
Expand All @@ -31,8 +31,8 @@ func (m *mockConfigProvider) GetFilters() *regexp.Regexp {
return m.filter
}

func (m *mockConfigProvider) Enabled() bool {
return m.enabled
func (m *mockConfigProvider) IsDisabled() bool {
return m.disabled
}

var (
Expand Down Expand Up @@ -189,8 +189,7 @@ func TestOTELSink(t *testing.T) {
opts := &OTELSinkOpts{
Reader: reader,
ConfigProvider: &mockConfigProvider{
enabled: true,
filter: regexp.MustCompile("raft|autopilot"),
filter: regexp.MustCompile("raft|autopilot"),
labels: map[string]string{
"node_id": "test",
},
Expand Down Expand Up @@ -232,8 +231,8 @@ func TestOTELSinkDisabled(t *testing.T) {

sink, err := NewOTELSink(ctx, &OTELSinkOpts{
ConfigProvider: &mockConfigProvider{
filter: regexp.MustCompile("raft"),
enabled: false,
filter: regexp.MustCompile("raft"),
disabled: true,
},
Reader: reader,
})
Expand Down Expand Up @@ -380,9 +379,8 @@ func TestOTELSink_Race(t *testing.T) {
opts := &OTELSinkOpts{
Reader: reader,
ConfigProvider: &mockConfigProvider{
filter: regexp.MustCompile("test"),
labels: defaultLabels,
enabled: true,
filter: regexp.MustCompile("test"),
labels: defaultLabels,
},
}

Expand Down
66 changes: 33 additions & 33 deletions agent/hcp/telemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,45 @@ type hcpProviderImpl struct {
// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
// fields MUST be exported to compute hash for equals method.
type dynamicConfig struct {
Enabled bool
Endpoint *url.URL
Labels map[string]string
Filters *regexp.Regexp
disabled bool
endpoint *url.URL
labels map[string]string
filters *regexp.Regexp
// refreshInterval controls the interval at which configuration is fetched from HCP to refresh config.
RefreshInterval time.Duration
refreshInterval time.Duration
}

// NewHCPProvider initializes and starts a HCP Telemetry provider.
func NewHCPProvider(ctx context.Context, hcpClient client.Client) *hcpProviderImpl {
h := &hcpProviderImpl{
// Initialize with default config values.
cfg: &dynamicConfig{
Labels: map[string]string{},
Filters: client.DefaultMetricFilters,
RefreshInterval: defaultTelemetryConfigRefreshInterval,
Endpoint: nil,
Enabled: false,
labels: map[string]string{},
filters: client.DefaultMetricFilters,
refreshInterval: defaultTelemetryConfigRefreshInterval,
endpoint: nil,
disabled: true,
},
hcpClient: hcpClient,
}

// Try to initialize config once before starting periodic fetch.
h.updateConfig(ctx)

go h.run(ctx, h.cfg.RefreshInterval)
go h.run(ctx)

return h
}

// run continously checks for updates to the telemetry configuration by making a request to HCP.
func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration) {
ticker := time.NewTicker(refreshInterval)
func (h *hcpProviderImpl) run(ctx context.Context) {
// Try to initialize config once before starting periodic fetch.
h.updateConfig(ctx)

ticker := time.NewTicker(h.cfg.refreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if newCfg := h.updateConfig(ctx); newCfg != nil {
ticker.Reset(newCfg.RefreshInterval)
if newRefreshInterval := h.updateConfig(ctx); newRefreshInterval > 0 {
ticker.Reset(newRefreshInterval)
}
case <-ctx.Done():
return
Expand All @@ -91,7 +91,7 @@ func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration
}

// updateConfig makes a HTTP request to HCP to update metrics configuration held in the provider.
func (h *hcpProviderImpl) updateConfig(ctx context.Context) *dynamicConfig {
func (h *hcpProviderImpl) updateConfig(ctx context.Context) time.Duration {
logger := hclog.FromContext(ctx).Named("telemetry_config_provider")

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
Expand All @@ -101,23 +101,23 @@ func (h *hcpProviderImpl) updateConfig(ctx context.Context) *dynamicConfig {
if err != nil {
logger.Error("failed to fetch telemetry config from HCP", "error", err)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
return 0
}

// newRefreshInterval of 0 or less can cause ticker Reset() panic.
newRefreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
if newRefreshInterval <= 0 {
logger.Error("invalid refresh interval duration", "refreshInterval", newRefreshInterval)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
return 0
}

newDynamicConfig := &dynamicConfig{
Filters: telemetryCfg.MetricsConfig.Filters,
Endpoint: telemetryCfg.MetricsConfig.Endpoint,
Labels: telemetryCfg.MetricsConfig.Labels,
RefreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
Enabled: telemetryCfg.MetricsEnabled(),
filters: telemetryCfg.MetricsConfig.Filters,
endpoint: telemetryCfg.MetricsConfig.Endpoint,
labels: telemetryCfg.MetricsConfig.Labels,
refreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
disabled: telemetryCfg.MetricsDisabled(),
}

// Acquire write lock to update new configuration.
Expand All @@ -127,37 +127,37 @@ func (h *hcpProviderImpl) updateConfig(ctx context.Context) *dynamicConfig {

metrics.IncrCounter(internalMetricRefreshSuccess, 1)

return newDynamicConfig
return newDynamicConfig.refreshInterval
}

// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
func (h *hcpProviderImpl) GetEndpoint() *url.URL {
h.rw.RLock()
defer h.rw.RUnlock()

return h.cfg.Endpoint
return h.cfg.endpoint
}

// GetFilters acquires a read lock to return filters configuration for consumers.
func (h *hcpProviderImpl) GetFilters() *regexp.Regexp {
h.rw.RLock()
defer h.rw.RUnlock()

return h.cfg.Filters
return h.cfg.filters
}

// GetLabels acquires a read lock to return labels configuration for consumers.
func (h *hcpProviderImpl) GetLabels() map[string]string {
h.rw.RLock()
defer h.rw.RUnlock()

return h.cfg.Labels
return h.cfg.labels
}

// GetLabels acquires a read lock and return true if metrics are enabled.
func (h *hcpProviderImpl) Enabled() bool {
// IsDisabled acquires a read lock and return true if metrics are enabled.
func (h *hcpProviderImpl) IsDisabled() bool {
h.rw.RLock()
defer h.rw.RUnlock()

return h.cfg.Enabled
return h.cfg.disabled
}
Loading

0 comments on commit bc356c3

Please sign in to comment.