Skip to content

Commit

Permalink
[HCP Observability] New MetricsClient (#17100)
Browse files Browse the repository at this point in the history
* Client configured with TLS using HCP config and retry/throttle

* Add tests and godoc for metrics client

* close body after request

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* remove clone

* Extract CloudConfig and mock for future PR

* Switch to hclog.FromContext
  • Loading branch information
Achooo authored May 8, 2023
1 parent b6381b7 commit 5b82711
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 3 deletions.
152 changes: 152 additions & 0 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package client

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"

"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"
)

const (
// HTTP Client config
defaultStreamTimeout = 15 * time.Second

// Retry config
// TODO: Evenutally, we'd like to configure these values dynamically.
defaultRetryWaitMin = 1 * time.Second
defaultRetryWaitMax = 15 * time.Second
defaultRetryMax = 4
)

// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}

// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
}

// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
// It also holds default HTTP headers to add to export requests.
type otlpClient struct {
client *retryablehttp.Client
header *http.Header
}

// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) {
if cfg == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
}

if ctx == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide a valid context")
}

logger := hclog.FromContext(ctx)

c, err := newHTTPClient(cfg, logger)
if err != nil {
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}

header := make(http.Header)
header.Set("Content-Type", "application/x-protobuf")

return &otlpClient{
client: c,
header: &header,
}, nil
}

// newHTTPClient configures the retryable HTTP client.
func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
hcpCfg, err := cloudCfg.HCPConfig()
if err != nil {
return nil, err
}

tlsTransport := cleanhttp.DefaultPooledTransport()
tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig()

var transport http.RoundTripper = &oauth2.Transport{
Base: tlsTransport,
Source: hcpCfg,
}

client := &http.Client{
Transport: transport,
Timeout: defaultStreamTimeout,
}

retryClient := &retryablehttp.Client{
HTTPClient: client,
Logger: logger.Named("hcp_telemetry_client"),
RetryWaitMin: defaultRetryWaitMin,
RetryWaitMax: defaultRetryWaitMax,
RetryMax: defaultRetryMax,
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}

return retryClient, nil
}

// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}

body, err := proto.Marshal(pbRequest)
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}
req.Header = *o.header

resp, err := o.client.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}
defer resp.Body.Close()

var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

if respData.Len() != 0 {
var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

if respProto.PartialSuccess != nil {
msg := respProto.PartialSuccess.GetErrorMessage()
return fmt.Errorf("failed to export metrics: partial success: %s", msg)
}
}

return nil
}
107 changes: 107 additions & 0 deletions agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package client

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/require"
colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"
)

func TestNewMetricsClient(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
cfg CloudConfig
ctx context.Context
}{
"success": {
cfg: &MockCloudCfg{},
ctx: context.Background(),
},
"failsWithoutCloudCfg": {
wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)",
cfg: nil,
ctx: context.Background(),
},
"failsWithoutContext": {
wantErr: "failed to init telemetry client: provide a valid context",
cfg: MockCloudCfg{},
ctx: nil,
},
"failsHCPConfig": {
wantErr: "failed to init telemetry client",
cfg: MockErrCloudCfg{},
ctx: context.Background(),
},
} {
t.Run(name, func(t *testing.T) {
client, err := NewMetricsClient(test.cfg, test.ctx)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}

require.Nil(t, err)
require.NotNil(t, client)
})
}
}

func TestExportMetrics(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
status int
}{
"success": {
status: http.StatusOK,
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
wantErr: "failed to export metrics",
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")

require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

body := colpb.ExportMetricsServiceResponse{}

if test.wantErr != "" {
body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{
ErrorMessage: "partial failure",
}
}
bytes, err := proto.Marshal(&body)

require.NoError(t, err)

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(test.status)
w.Write(bytes)
}))
defer srv.Close()

client, err := NewMetricsClient(MockCloudCfg{}, context.Background())
require.NoError(t, err)

ctx := context.Background()
metrics := &metricpb.ResourceMetrics{}
err = client.ExportMetrics(ctx, metrics, srv.URL)

if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}

require.NoError(t, err)
})
}
}
40 changes: 40 additions & 0 deletions agent/hcp/client/mock_CloudConfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client

import (
"crypto/tls"
"errors"
"net/url"

hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"golang.org/x/oauth2"
)

type mockHCPCfg struct{}

func (m *mockHCPCfg) Token() (*oauth2.Token, error) {
return &oauth2.Token{
AccessToken: "test-token",
}, nil
}

func (m *mockHCPCfg) APITLSConfig() *tls.Config { return nil }

func (m *mockHCPCfg) SCADAAddress() string { return "" }

func (m *mockHCPCfg) SCADATLSConfig() *tls.Config { return &tls.Config{} }

func (m *mockHCPCfg) APIAddress() string { return "" }

func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} }

type MockCloudCfg struct{}

func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
return &mockHCPCfg{}, nil
}

type MockErrCloudCfg struct{}

func (m MockErrCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
return nil, errors.New("test bad HCP config")
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/hashicorp/go-memdb v1.3.4
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-raftchunking v0.7.0
github.com/hashicorp/go-retryablehttp v0.6.7
github.com/hashicorp/go-secure-stdlib/awsutil v0.1.6
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/go-syslog v1.0.0
Expand Down Expand Up @@ -95,6 +96,7 @@ require (
github.com/shirou/gopsutil/v3 v3.22.8
github.com/stretchr/testify v1.8.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/goleak v1.1.10
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/net v0.7.0
Expand Down Expand Up @@ -167,11 +169,11 @@ require (
github.com/googleapis/gax-go/v2 v2.1.0 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gophercloud/gophercloud v0.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect
github.com/hashicorp/go-plugin v1.4.5 // indirect
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect
Expand Down Expand Up @@ -225,7 +227,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
Expand Down
Loading

0 comments on commit 5b82711

Please sign in to comment.