Skip to content

Commit

Permalink
Add tests and godoc for metrics client
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Apr 24, 2023
1 parent 4d5d9f1 commit 55f63c2
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 15 deletions.
56 changes: 41 additions & 15 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
Expand All @@ -14,41 +15,62 @@ import (
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"

"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/version"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
)

const (
// HTTP Client config
defaultStreamTimeout = 15 * time.Second

// Retry config
defaultRetryWaitMin = 15 * time.Second
defaultRetryWaitMin = 1 * time.Second
defaultRetryWaitMax = 15 * time.Second
defaultRetryMax = 4
)

// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}

// hcpConfig represents HCP config for TLS abstracted in an interface for easy testing.
type hcpConfig interface {
oauth2.TokenSource
APITLSConfig() *tls.Config
}

// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing.
type cloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpConfig, error)
}

// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
// It also holds default HTTP headers to add to export requests.
type otlpClient struct {
client *retryablehttp.Client
headers map[string]string
}

// TelemetryClientCfg is used to configure the MetricsClient.
type TelemetryClientCfg struct {
cloudCfg config.CloudConfig
logger hclog.Logger
CloudCfg cloudConfig
Logger hclog.Logger
}

func NewMetricsClient(cfg TelemetryClientCfg) (MetricsClient, error) {
c, err := newHTTPClient(cfg.cloudCfg, cfg.logger)
// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
func NewMetricsClient(cfg *TelemetryClientCfg) (MetricsClient, error) {
if cfg.CloudCfg == nil || cfg.Logger == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide valid TelemetryClientCfg")
}

c, err := newHTTPClient(cfg.CloudCfg, cfg.Logger)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}

headers := map[string]string{
Expand All @@ -62,7 +84,8 @@ func NewMetricsClient(cfg TelemetryClientCfg) (MetricsClient, error) {
}, nil
}

func newHTTPClient(cloudCfg config.CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
// newHTTPClient configures the retryable HTTP client.
func newHTTPClient(cloudCfg cloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
hcpCfg, err := cloudCfg.HCPConfig()
if err != nil {
return nil, err
Expand All @@ -83,7 +106,7 @@ func newHTTPClient(cloudCfg config.CloudConfig, logger hclog.Logger) (*retryable

retryClient := &retryablehttp.Client{
HTTPClient: client,
Logger: logger,
Logger: logger.Named("hcp_telemetry_client"),
RetryWaitMin: defaultRetryWaitMin,
RetryWaitMax: defaultRetryWaitMax,
RetryMax: defaultRetryMax,
Expand All @@ -94,19 +117,22 @@ func newHTTPClient(cloudCfg config.CloudConfig, logger hclog.Logger) (*retryable
return retryClient, nil
}

// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}

body, err := proto.Marshal(pbRequest)
if err != nil {
return err
return fmt.Errorf("failed to export metrics: %v", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
if err != nil {
return err
return fmt.Errorf("failed to export metrics: %v", err)
}

for k, v := range o.headers {
Expand All @@ -115,23 +141,23 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R

resp, err := o.client.Do(req)
if err != nil {
return err
return fmt.Errorf("failed to export metrics: %v", err)
}

var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
return err
return fmt.Errorf("failed to export metrics: %v", err)
}

if respData.Len() != 0 {
var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return err
return fmt.Errorf("failed to export metrics: %v", err)
}

if respProto.PartialSuccess != nil {
msg := respProto.PartialSuccess.GetErrorMessage()
return fmt.Errorf("failed to upload metrics: partial success: %s", msg)
return fmt.Errorf("failed to export metrics: partial success: %s", msg)
}
}

Expand Down
155 changes: 155 additions & 0 deletions agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package client

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"

colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"

"github.com/hashicorp/consul/version"
"github.com/hashicorp/go-hclog"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/stretchr/testify/require"
)

type mockHCPCfg struct{}

func (m *mockHCPCfg) APITLSConfig() *tls.Config {
return nil
}

func (m *mockHCPCfg) Token() (*oauth2.Token, error) {
return &oauth2.Token{
AccessToken: "test-token",
}, nil
}

type mockCloudCfg struct{}

func (m mockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpConfig, error) {
return &mockHCPCfg{}, nil
}

type mockErrCloudCfg struct{}

func (m mockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpConfig, error) {
return nil, errors.New("test bad HCP config")
}

func TestNewMetricsClient(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
cfg *TelemetryClientCfg
}{
"success": {
cfg: &TelemetryClientCfg{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
CloudCfg: &mockCloudCfg{},
},
},
"failsWithoutCloudCfg": {
wantErr: "failed to init telemetry client",
cfg: &TelemetryClientCfg{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
CloudCfg: nil,
},
},
"failsWithoutLogger": {
wantErr: "failed to init telemetry client",
cfg: &TelemetryClientCfg{
Logger: nil,
CloudCfg: &mockErrCloudCfg{},
},
},
"failsHCPConfig": {
wantErr: "failed to init telemetry client",
cfg: &TelemetryClientCfg{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
CloudCfg: &mockErrCloudCfg{},
},
},
} {
t.Run(name, func(t *testing.T) {
client, err := NewMetricsClient(test.cfg)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}

require.Nil(t, err)
require.NotNil(t, client)
})
}
}

func TestExportMetrics(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
status int
}{
"success": {
status: http.StatusOK,
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
wantErr: "failed to export metrics",
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")

require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")
require.Equal(t, r.Header.Get("X-HCP-Source-Channel"), fmt.Sprintf("consul %s hcp-go-sdk/%s", version.GetHumanVersion(), version.Version))

body := colpb.ExportMetricsServiceResponse{}

if test.wantErr != "" {
body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{
ErrorMessage: "partial failure",
}
}
bytes, err := proto.Marshal(&body)

require.NoError(t, err)

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(test.status)
w.Write(bytes)
}))
defer srv.Close()

cfg := &TelemetryClientCfg{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
CloudCfg: mockCloudCfg{},
}

client, err := NewMetricsClient(cfg)
require.NoError(t, err)

ctx := context.Background()
metrics := &metricpb.ResourceMetrics{}
err = client.ExportMetrics(ctx, metrics, srv.URL)

if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}

require.NoError(t, err)
})
}

}

0 comments on commit 55f63c2

Please sign in to comment.