Skip to content

Commit

Permalink
Rename hcp-metrics-collector to consul-telemetry-collector (#17327)
Browse files Browse the repository at this point in the history
* Rename hcp-metrics-collector to consul-telemetry-collector

* Fix docs

* Fix doc comment

---------

Co-authored-by: Ashvitha Sridharan <[email protected]>
  • Loading branch information
clly and Achooo authored May 16, 2023
1 parent 06481bf commit 0789661
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 265 deletions.
3 changes: 3 additions & 0 deletions .changelog/17327.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
xds: rename envoy_hcp_metrics_bind_socket_dir to envoy_telemetry_collector_bind_socket_dir to remove HCP naming references.
```
37 changes: 19 additions & 18 deletions agent/proxycfg/connect_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
return snap, err
}

if err := s.maybeInitializeHCPMetricsWatches(ctx, snap); err != nil {
return snap, fmt.Errorf("failed to initialize HCP metrics watches: %w", err)
if err := s.maybeInitializeTelemetryCollectorWatches(ctx, snap); err != nil {
return snap, fmt.Errorf("failed to initialize telemetry collector watches: %w", err)
}

if s.proxyCfg.Mode == structs.ProxyModeTransparent {
Expand Down Expand Up @@ -628,16 +628,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
return nil
}

// hcpMetricsConfig represents the basic opaque config values for pushing telemetry to HCP.
type hcpMetricsConfig struct {
// HCPMetricsBindSocketDir is a string that configures the directory for a
// telemetryCollectorConfig represents the basic opaque config values for pushing telemetry to
// a consul telemetry collector.
type telemetryCollectorConfig struct {
// TelemetryCollectorBindSocketDir is a string that configures the directory for a
// unix socket where Envoy will forward metrics. These metrics get pushed to
// the HCP Metrics collector to show service mesh metrics on HCP.
HCPMetricsBindSocketDir string `mapstructure:"envoy_hcp_metrics_bind_socket_dir"`
// the Consul Telemetry collector.
TelemetryCollectorBindSocketDir string `mapstructure:"envoy_telemetry_collector_bind_socket_dir"`
}

func parseHCPMetricsConfig(m map[string]interface{}) (hcpMetricsConfig, error) {
var cfg hcpMetricsConfig
func parseTelemetryCollectorConfig(m map[string]interface{}) (telemetryCollectorConfig, error) {
var cfg telemetryCollectorConfig
err := mapstructure.WeakDecode(m, &cfg)

if err != nil {
Expand All @@ -647,34 +648,34 @@ func parseHCPMetricsConfig(m map[string]interface{}) (hcpMetricsConfig, error) {
return cfg, nil
}

// maybeInitializeHCPMetricsWatches will initialize a synthetic upstream and discovery chain
// watch for the HCP metrics collector, if metrics collection is enabled on the proxy registration.
func (s *handlerConnectProxy) maybeInitializeHCPMetricsWatches(ctx context.Context, snap ConfigSnapshot) error {
hcpCfg, err := parseHCPMetricsConfig(s.proxyCfg.Config)
// maybeInitializeTelemetryCollectorWatches will initialize a synthetic upstream and discovery chain
// watch for the consul telemetry collector, if telemetry data collection is enabled on the proxy registration.
func (s *handlerConnectProxy) maybeInitializeTelemetryCollectorWatches(ctx context.Context, snap ConfigSnapshot) error {
cfg, err := parseTelemetryCollectorConfig(s.proxyCfg.Config)
if err != nil {
s.logger.Error("failed to parse connect.proxy.config", "error", err)
}

if hcpCfg.HCPMetricsBindSocketDir == "" {
// Metrics collection is not enabled, return early.
if cfg.TelemetryCollectorBindSocketDir == "" {
// telemetry collection is not enabled, return early.
return nil
}

// The path includes the proxy ID so that when multiple proxies are on the same host
// they each have a distinct path to send their metrics.
// they each have a distinct path to send their telemetry data.
id := s.proxyID.NamespaceOrDefault() + "_" + s.proxyID.ID

// UNIX domain sockets paths have a max length of 108, so we take a hash of the compound ID
// to limit the length of the socket path.
h := sha1.New()
h.Write([]byte(id))
hash := base64.RawURLEncoding.EncodeToString(h.Sum(nil))
path := path.Join(hcpCfg.HCPMetricsBindSocketDir, hash+".sock")
path := path.Join(cfg.TelemetryCollectorBindSocketDir, hash+".sock")

upstream := structs.Upstream{
DestinationNamespace: acl.DefaultNamespaceName,
DestinationPartition: s.proxyID.PartitionOrDefault(),
DestinationName: api.HCPMetricsCollectorName,
DestinationName: api.TelemetryCollectorName,
LocalBindSocketPath: path,
Config: map[string]interface{}{
"protocol": "grpc",
Expand Down
74 changes: 37 additions & 37 deletions agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,18 +467,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {

// Used to account for differences in OSS/ent implementations of ServiceID.String()
var (
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
hcpCollector = structs.NewServiceName(apimod.HCPMetricsCollectorName, nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
hcpCollectorUID = NewUpstreamIDFromServiceName(hcpCollector)
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
telemetryCollector = structs.NewServiceName(apimod.TelemetryCollectorName, nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
telemetryCollectorUID = NewUpstreamIDFromServiceName(telemetryCollector)
)
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
Expand Down Expand Up @@ -3638,7 +3638,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"hcp-metrics": {
"telemetry-collector": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Expand All @@ -3648,16 +3648,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Config: map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics/",
"envoy_telemetry_collector_bind_socket_dir": "/tmp/consul/telemetry-collector/",
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: hcpCollector.Name,
fmt.Sprintf("discovery-chain:%s", telemetryCollectorUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: telemetryCollector.Name,
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Expand Down Expand Up @@ -3698,9 +3698,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Result: &structs.ConfigEntryResponse{},
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s", telemetryCollectorUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil, nil),
Chain: discoverychain.TestCompileConfigEntries(t, telemetryCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil, nil),
},
Err: nil,
},
Expand All @@ -3710,19 +3710,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// An event was received with the HCP collector's discovery chain, which sets up some bookkeeping in the snapshot.
// An event was received with the telemetry collector's discovery chain, which sets up some bookkeeping in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, telemetryCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, telemetryCollectorUID)

expectUpstream := structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: apimod.HCPMetricsCollectorName,
LocalBindSocketPath: "/tmp/consul/hcp-metrics/gqmuzdHCUPAEY5mbF8vgkZCNI14.sock",
DestinationName: apimod.TelemetryCollectorName,
LocalBindSocketPath: "/tmp/consul/telemetry-collector/gqmuzdHCUPAEY5mbF8vgkZCNI14.sock",
Config: map[string]interface{}{
"protocol": "grpc",
},
Expand All @@ -3733,16 +3733,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, &expectUpstream, snap.ConnectProxy.UpstreamConfig[uid])

// No endpoints have arrived yet.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[telemetryCollectorUID], 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(): genVerifyServiceSpecificRequest(apimod.HCPMetricsCollectorName, "", "dc1", true),
fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.TelemetryCollectorName) + telemetryCollectorUID.String(): genVerifyServiceSpecificRequest(apimod.TelemetryCollectorName, "", "dc1", true),
},
events: []UpdateEvent{
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(),
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.TelemetryCollectorName) + telemetryCollectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Expand All @@ -3751,8 +3751,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
ID: apimod.TelemetryCollectorName,
Service: apimod.TelemetryCollectorName,
Port: 8080,
},
},
Expand All @@ -3766,16 +3766,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// Discovery chain for the HCP collector should still be stored in the snapshot.
// Discovery chain for the telemetry collector should still be stored in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, telemetryCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, telemetryCollectorUID)

// An endpoint arrived for the HCP collector, so it should be present in the snapshot.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
// An endpoint arrived for the telemetry collector, so it should be present in the snapshot.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[telemetryCollectorUID], 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)

nodes := structs.CheckServiceNodes{
{
Expand All @@ -3784,14 +3784,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
ID: apimod.TelemetryCollectorName,
Service: apimod.TelemetryCollectorName,
Port: 8080,
},
},
}
target := fmt.Sprintf("%s.default.default.dc1", apimod.HCPMetricsCollectorName)
require.Equal(t, nodes, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID][target])
target := fmt.Sprintf("%s.default.default.dc1", apimod.TelemetryCollectorName)
require.Equal(t, nodes, snap.ConnectProxy.WatchedUpstreamEndpoints[telemetryCollectorUID][target])
},
},
},
Expand Down
14 changes: 7 additions & 7 deletions agent/proxycfg/testing_connect_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,19 @@ func TestConfigSnapshotGRPCExposeHTTP1(t testing.T) *ConfigSnapshot {
})
}

// TestConfigSnapshotDiscoveryChain returns a fully populated snapshot using a discovery chain
func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
// TestConfigSnapshotTelemetryCollector returns a fully populated snapshot using a discovery chain
func TestConfigSnapshotTelemetryCollector(t testing.T) *ConfigSnapshot {
// DiscoveryChain without an UpstreamConfig should yield a
// filter chain when in transparent proxy mode
var (
collector = structs.NewServiceName(api.HCPMetricsCollectorName, nil)
collector = structs.NewServiceName(api.TelemetryCollectorName, nil)
collectorUID = NewUpstreamIDFromServiceName(collector)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.TelemetryCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)

return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Config = map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics",
"envoy_telemetry_collector_bind_socket_dir": "/tmp/consul/telemetry-collector",
}
}, []UpdateEvent{
{
Expand All @@ -330,7 +330,7 @@ func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
},
},
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", api.HCPMetricsCollectorName) + collectorUID.String(),
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", api.TelemetryCollectorName) + collectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: []structs.CheckServiceNode{
{
Expand All @@ -339,7 +339,7 @@ func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: api.HCPMetricsCollectorName,
Service: api.TelemetryCollectorName,
Address: "9.9.9.9",
Port: 9090,
},
Expand Down
4 changes: 2 additions & 2 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func TestAllResourcesFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotPeeringLocalMeshGateway,
},
{
name: "hcp-metrics",
create: proxycfg.TestConfigSnapshotHCPMetrics,
name: "telemetry-collector",
create: proxycfg.TestConfigSnapshotTelemetryCollector,
},
}
tests = append(tests, getConnectProxyTransparentProxyGoldenTestCases()...)
Expand Down
Loading

0 comments on commit 0789661

Please sign in to comment.