Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename hcp-metrics-collector to consul-telemetry-collector #17327

Merged
merged 4 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17327.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
xds: rename envoy_hcp_metrics_bind_socket_dir to envoy_telemetry_collector_bind_socket_dir to remove HCP naming references.
```
37 changes: 19 additions & 18 deletions agent/proxycfg/connect_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
return snap, err
}

if err := s.maybeInitializeHCPMetricsWatches(ctx, snap); err != nil {
return snap, fmt.Errorf("failed to initialize HCP metrics watches: %w", err)
if err := s.maybeInitializeTelemetryCollectorWatches(ctx, snap); err != nil {
return snap, fmt.Errorf("failed to initialize telemetry collector watches: %w", err)
}

if s.proxyCfg.Mode == structs.ProxyModeTransparent {
Expand Down Expand Up @@ -628,16 +628,17 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
return nil
}

// hcpMetricsConfig represents the basic opaque config values for pushing telemetry to HCP.
type hcpMetricsConfig struct {
// HCPMetricsBindSocketDir is a string that configures the directory for a
// telemetryCollectorConfig represents the basic opaque config values for pushing telemetry to
// a consul telemetry collector.
type telemetryCollectorConfig struct {
// TelemetryCollectorBindSocketDir is a string that configures the directory for a
// unix socket where Envoy will forward metrics. These metrics get pushed to
// the HCP Metrics collector to show service mesh metrics on HCP.
HCPMetricsBindSocketDir string `mapstructure:"envoy_hcp_metrics_bind_socket_dir"`
// the Consul Telemetry collector.
TelemetryCollectorBindSocketDir string `mapstructure:"envoy_telemetry_collector_bind_socket_dir"`
}

func parseHCPMetricsConfig(m map[string]interface{}) (hcpMetricsConfig, error) {
var cfg hcpMetricsConfig
func parseTelemetryCollectorConfig(m map[string]interface{}) (telemetryCollectorConfig, error) {
var cfg telemetryCollectorConfig
err := mapstructure.WeakDecode(m, &cfg)

if err != nil {
Expand All @@ -647,34 +648,34 @@ func parseHCPMetricsConfig(m map[string]interface{}) (hcpMetricsConfig, error) {
return cfg, nil
}

// maybeInitializeHCPMetricsWatches will initialize a synthetic upstream and discovery chain
// watch for the HCP metrics collector, if metrics collection is enabled on the proxy registration.
func (s *handlerConnectProxy) maybeInitializeHCPMetricsWatches(ctx context.Context, snap ConfigSnapshot) error {
hcpCfg, err := parseHCPMetricsConfig(s.proxyCfg.Config)
// maybeInitializeTelemetryCollectorWatches will initialize a synthetic upstream and discovery chain
// watch for the consul telemetry collector, if telemetry data collection is enabled on the proxy registration.
func (s *handlerConnectProxy) maybeInitializeTelemetryCollectorWatches(ctx context.Context, snap ConfigSnapshot) error {
cfg, err := parseTelemetryCollectorConfig(s.proxyCfg.Config)
if err != nil {
s.logger.Error("failed to parse connect.proxy.config", "error", err)
}

if hcpCfg.HCPMetricsBindSocketDir == "" {
// Metrics collection is not enabled, return early.
if cfg.TelemetryCollectorBindSocketDir == "" {
// telemetry collection is not enabled, return early.
return nil
}

// The path includes the proxy ID so that when multiple proxies are on the same host
// they each have a distinct path to send their metrics.
// they each have a distinct path to send their telemetry data.
id := s.proxyID.NamespaceOrDefault() + "_" + s.proxyID.ID

// UNIX domain sockets paths have a max length of 108, so we take a hash of the compound ID
// to limit the length of the socket path.
h := sha1.New()
h.Write([]byte(id))
hash := base64.RawURLEncoding.EncodeToString(h.Sum(nil))
path := path.Join(hcpCfg.HCPMetricsBindSocketDir, hash+".sock")
path := path.Join(cfg.TelemetryCollectorBindSocketDir, hash+".sock")

upstream := structs.Upstream{
DestinationNamespace: acl.DefaultNamespaceName,
DestinationPartition: s.proxyID.PartitionOrDefault(),
DestinationName: api.HCPMetricsCollectorName,
DestinationName: api.TelemetryCollectorName,
LocalBindSocketPath: path,
Config: map[string]interface{}{
"protocol": "grpc",
Expand Down
74 changes: 37 additions & 37 deletions agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,18 +467,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {

// Used to account for differences in OSS/ent implementations of ServiceID.String()
var (
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
hcpCollector = structs.NewServiceName(apimod.HCPMetricsCollectorName, nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
hcpCollectorUID = NewUpstreamIDFromServiceName(hcpCollector)
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiA = structs.NewServiceName("api-a", nil)
telemetryCollector = structs.NewServiceName(apimod.TelemetryCollectorName, nil)

apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
telemetryCollectorUID = NewUpstreamIDFromServiceName(telemetryCollector)
)
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
Expand Down Expand Up @@ -3638,7 +3638,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"hcp-metrics": {
"telemetry-collector": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Expand All @@ -3648,16 +3648,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Config: map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics/",
"envoy_telemetry_collector_bind_socket_dir": "/tmp/consul/telemetry-collector/",
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: hcpCollector.Name,
fmt.Sprintf("discovery-chain:%s", telemetryCollectorUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: telemetryCollector.Name,
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
EvaluateInPartition: "default",
Expand Down Expand Up @@ -3698,9 +3698,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Result: &structs.ConfigEntryResponse{},
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", hcpCollectorUID.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s", telemetryCollectorUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, hcpCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil, nil),
Chain: discoverychain.TestCompileConfigEntries(t, telemetryCollector.Name, "default", "default", "dc1", "trustdomain.consul", nil, nil),
},
Err: nil,
},
Expand All @@ -3710,19 +3710,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// An event was received with the HCP collector's discovery chain, which sets up some bookkeeping in the snapshot.
// An event was received with the telemetry collector's discovery chain, which sets up some bookkeeping in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, telemetryCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, telemetryCollectorUID)

expectUpstream := structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: apimod.HCPMetricsCollectorName,
LocalBindSocketPath: "/tmp/consul/hcp-metrics/gqmuzdHCUPAEY5mbF8vgkZCNI14.sock",
DestinationName: apimod.TelemetryCollectorName,
LocalBindSocketPath: "/tmp/consul/telemetry-collector/gqmuzdHCUPAEY5mbF8vgkZCNI14.sock",
Config: map[string]interface{}{
"protocol": "grpc",
},
Expand All @@ -3733,16 +3733,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, &expectUpstream, snap.ConnectProxy.UpstreamConfig[uid])

// No endpoints have arrived yet.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[telemetryCollectorUID], 0, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(): genVerifyServiceSpecificRequest(apimod.HCPMetricsCollectorName, "", "dc1", true),
fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.TelemetryCollectorName) + telemetryCollectorUID.String(): genVerifyServiceSpecificRequest(apimod.TelemetryCollectorName, "", "dc1", true),
},
events: []UpdateEvent{
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.HCPMetricsCollectorName) + hcpCollectorUID.String(),
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", apimod.TelemetryCollectorName) + telemetryCollectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Expand All @@ -3751,8 +3751,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
ID: apimod.TelemetryCollectorName,
Service: apimod.TelemetryCollectorName,
Port: 8080,
},
},
Expand All @@ -3766,16 +3766,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.ConnectProxy.Leaf)

// Discovery chain for the HCP collector should still be stored in the snapshot.
// Discovery chain for the telemetry collector should still be stored in the snapshot.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, telemetryCollectorUID)

require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, hcpCollectorUID)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, telemetryCollectorUID)

// An endpoint arrived for the HCP collector, so it should be present in the snapshot.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID], 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)
// An endpoint arrived for the telemetry collector, so it should be present in the snapshot.
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[telemetryCollectorUID], 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints)

nodes := structs.CheckServiceNodes{
{
Expand All @@ -3784,14 +3784,14 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Address: "10.0.0.1",
},
Service: &structs.NodeService{
ID: apimod.HCPMetricsCollectorName,
Service: apimod.HCPMetricsCollectorName,
ID: apimod.TelemetryCollectorName,
Service: apimod.TelemetryCollectorName,
Port: 8080,
},
},
}
target := fmt.Sprintf("%s.default.default.dc1", apimod.HCPMetricsCollectorName)
require.Equal(t, nodes, snap.ConnectProxy.WatchedUpstreamEndpoints[hcpCollectorUID][target])
target := fmt.Sprintf("%s.default.default.dc1", apimod.TelemetryCollectorName)
require.Equal(t, nodes, snap.ConnectProxy.WatchedUpstreamEndpoints[telemetryCollectorUID][target])
},
},
},
Expand Down
14 changes: 7 additions & 7 deletions agent/proxycfg/testing_connect_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,19 @@ func TestConfigSnapshotGRPCExposeHTTP1(t testing.T) *ConfigSnapshot {
})
}

// TestConfigSnapshotDiscoveryChain returns a fully populated snapshot using a discovery chain
func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
// TestConfigSnapshotTelemetryCollector returns a fully populated snapshot using a discovery chain
func TestConfigSnapshotTelemetryCollector(t testing.T) *ConfigSnapshot {
// DiscoveryChain without an UpstreamConfig should yield a
// filter chain when in transparent proxy mode
var (
collector = structs.NewServiceName(api.HCPMetricsCollectorName, nil)
collector = structs.NewServiceName(api.TelemetryCollectorName, nil)
collectorUID = NewUpstreamIDFromServiceName(collector)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.HCPMetricsCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
collectorChain = discoverychain.TestCompileConfigEntries(t, api.TelemetryCollectorName, "default", "default", "dc1", connect.TestClusterID+".consul", nil, nil)
)

return TestConfigSnapshot(t, func(ns *structs.NodeService) {
ns.Proxy.Config = map[string]interface{}{
"envoy_hcp_metrics_bind_socket_dir": "/tmp/consul/hcp-metrics",
"envoy_telemetry_collector_bind_socket_dir": "/tmp/consul/telemetry-collector",
}
}, []UpdateEvent{
{
Expand All @@ -330,7 +330,7 @@ func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
},
},
{
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", api.HCPMetricsCollectorName) + collectorUID.String(),
CorrelationID: fmt.Sprintf("upstream-target:%s.default.default.dc1:", api.TelemetryCollectorName) + collectorUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: []structs.CheckServiceNode{
{
Expand All @@ -339,7 +339,7 @@ func TestConfigSnapshotHCPMetrics(t testing.T) *ConfigSnapshot {
Datacenter: "dc1",
},
Service: &structs.NodeService{
Service: api.HCPMetricsCollectorName,
Service: api.TelemetryCollectorName,
Address: "9.9.9.9",
Port: 9090,
},
Expand Down
4 changes: 2 additions & 2 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func TestAllResourcesFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotPeeringLocalMeshGateway,
},
{
name: "hcp-metrics",
create: proxycfg.TestConfigSnapshotHCPMetrics,
name: "telemetry-collector",
create: proxycfg.TestConfigSnapshotTelemetryCollector,
},
}
tests = append(tests, getConnectProxyTransparentProxyGoldenTestCases()...)
Expand Down
Loading