From 2f4166ece73795dbd675f264a933ec6723a3e435 Mon Sep 17 00:00:00 2001 From: John Murret Date: Wed, 16 Aug 2023 20:29:39 -0600 Subject: [PATCH] NET-4932 - xds v2 - implement base connect proxy functionality for endpoints --- agent/xds/clusters_test.go | 3 +- agent/xds/endpoints_test.go | 127 +++- agent/xds/naming/naming.go | 2 +- agent/xds/proxystateconverter/clusters.go | 6 +- agent/xds/proxystateconverter/converter.go | 17 +- agent/xds/proxystateconverter/endpoints.go | 586 ++++++++++++++++++ .../proxystateconverter/locality_policy.go | 21 + .../locality_policy_oss.go | 15 + agent/xdsv2/endpoint_resources.go | 45 +- agent/xdsv2/resources.go | 13 +- 10 files changed, 801 insertions(+), 34 deletions(-) create mode 100644 agent/xds/proxystateconverter/locality_policy.go create mode 100644 agent/xds/proxystateconverter/locality_policy_oss.go diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go index 2651d77c2844..0c7cc323cc56 100644 --- a/agent/xds/clusters_test.go +++ b/agent/xds/clusters_test.go @@ -5,6 +5,7 @@ package xds import ( "bytes" + "github.com/hashicorp/consul/types" "path/filepath" "sort" "testing" @@ -26,7 +27,6 @@ import ( "github.com/hashicorp/consul/agent/xdsv2" "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/types" ) type mockCfgFetcher struct { @@ -1138,6 +1138,7 @@ func TestClustersFromSnapshot(t *testing.T) { require.NoError(t, err) clusters = res[xdscommon.ClusterType] + // The order of clusters returned via CDS isn't relevant, so it's safe // to sort these for the purposes of test comparisons. sort.Slice(clusters, func(i, j int) bool { diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index 3156685934d5..1f0270436f13 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -8,22 +8,20 @@ import ( "sort" "testing" - "github.com/hashicorp/go-hclog" - envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - - "github.com/hashicorp/consul/agent/xds/testcommon" - - "github.com/mitchellh/copystructure" - testinf "github.com/mitchellh/go-testing-interface" - "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/xds/proxystateconverter" "github.com/hashicorp/consul/agent/xds/response" + "github.com/hashicorp/consul/agent/xds/testcommon" + "github.com/hashicorp/consul/agent/xdsv2" "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-hclog" + "github.com/mitchellh/copystructure" + testinf "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/require" ) func Test_makeLoadAssignment(t *testing.T) { @@ -244,6 +242,7 @@ type endpointTestCase struct { name string create func(t testinf.T) *proxycfg.ConfigSnapshot overrideGoldenName string + alsoRunTestForV2 bool } func makeEndpointDiscoChainTests(enterprise bool) []endpointTestCase { @@ -253,72 +252,85 @@ func makeEndpointDiscoChainTests(enterprise bool) []endpointTestCase { create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "simple", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-chain-external-sni", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, 
"external-sni", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-chain-and-overrides", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "simple-with-overrides", enterprise, nil, nil) }, + // TODO(proxystate): requires routes work + alsoRunTestForV2: false, }, { name: "connect-proxy-with-chain-and-failover", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-failover-through-remote-gateway", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-remote-gateway", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-failover-through-remote-gateway-triggered", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-remote-gateway-triggered", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-double-failover-through-remote-gateway", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-double-remote-gateway", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-double-failover-through-remote-gateway-triggered", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-double-remote-gateway-triggered", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-failover-through-local-gateway", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-local-gateway", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-failover-through-local-gateway-triggered", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-local-gateway-triggered", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-double-failover-through-local-gateway", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-double-local-gateway", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-tcp-chain-double-failover-through-local-gateway-triggered", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "failover-through-double-local-gateway-triggered", enterprise, nil, nil) }, + alsoRunTestForV2: true, }, { name: "connect-proxy-with-default-chain-and-custom-cluster", @@ -330,12 +342,16 @@ func makeEndpointDiscoChainTests(enterprise bool) []endpointTestCase { }) }, nil) }, + // TODO(proxystate): requires custom cluster work + alsoRunTestForV2: false, }, { name: "splitter-with-resolver-redirect", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotDiscoveryChain(t, "splitter-with-resolver-redirect-multidc", enterprise, nil, nil) }, + // TODO(proxystate): requires routes work + alsoRunTestForV2: false, }, } } @@ -354,48 +370,64 @@ func TestEndpointsFromSnapshot(t *testing.T) { create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "default", 
nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-using-federation-states", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "federation-states", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-newer-information-in-federation-states", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "newer-info-in-federation-states", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-using-federation-control-plane", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "mesh-gateway-federation", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-older-information-in-federation-states", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "older-info-in-federation-states", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-no-services", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "no-services", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-service-subsets", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "service-subsets2", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "mesh-gateway-default-service-subset", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotMeshGateway(t, "default-service-subsets2", nil, nil) }, + // TODO(proxystate): mesh gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-gateway", @@ -403,12 +435,16 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-gateway-nil-config-entry", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotIngressGateway_NilConfigEntry(t) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-gateway-no-services", @@ -416,6 +452,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-chain", @@ -423,6 +461,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "simple", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-chain-external-sni", @@ -430,6 +470,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "external-sni", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: 
"ingress-with-chain-and-failover", @@ -437,6 +479,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-chain-and-failover-to-cluster-peer", @@ -444,6 +488,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-to-cluster-peer", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-failover-through-remote-gateway", @@ -451,6 +497,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-remote-gateway", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-failover-through-remote-gateway-triggered", @@ -458,6 +506,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-remote-gateway-triggered", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-double-failover-through-remote-gateway", @@ -465,6 +515,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-double-remote-gateway", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-double-failover-through-remote-gateway-triggered", @@ -472,6 +524,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-double-remote-gateway-triggered", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-failover-through-local-gateway", @@ -479,6 +533,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-local-gateway", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-failover-through-local-gateway-triggered", @@ -486,6 +542,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-local-gateway-triggered", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-double-failover-through-local-gateway", @@ -493,6 +551,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-double-local-gateway", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-with-tcp-chain-double-failover-through-local-gateway-triggered", @@ -500,6 +560,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "failover-through-double-local-gateway-triggered", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: 
"ingress-splitter-with-resolver-redirect", @@ -507,30 +569,42 @@ func TestEndpointsFromSnapshot(t *testing.T) { return proxycfg.TestConfigSnapshotIngressGateway(t, true, "http", "splitter-with-resolver-redirect-multidc", nil, nil, nil) }, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, { name: "terminating-gateway", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotTerminatingGateway(t, true, nil, nil) }, + // TODO(proxystate): terminating gateway will come at a later time + alsoRunTestForV2: false, }, { name: "terminating-gateway-no-services", create: func(t testinf.T) *proxycfg.ConfigSnapshot { return proxycfg.TestConfigSnapshotTerminatingGateway(t, false, nil, nil) }, + // TODO(proxystate): terminating gateway will come at a later time + alsoRunTestForV2: false, }, { name: "terminating-gateway-service-subsets", create: proxycfg.TestConfigSnapshotTerminatingGatewayServiceSubsets, + // TODO(proxystate): terminating gateway will come at a later time + alsoRunTestForV2: false, }, { name: "terminating-gateway-default-service-subset", create: proxycfg.TestConfigSnapshotTerminatingGatewayDefaultServiceSubset, + // TODO(proxystate): terminating gateway will come at a later time + alsoRunTestForV2: false, }, { name: "ingress-multiple-listeners-duplicate-service", create: proxycfg.TestConfigSnapshotIngress_MultipleListenersDuplicateService, + // TODO(proxystate): ingress gateway will come at a later time + alsoRunTestForV2: false, }, } @@ -564,7 +638,7 @@ func TestEndpointsFromSnapshot(t *testing.T) { r, err := response.CreateResponse(xdscommon.EndpointType, "00000001", "00000001", endpoints) require.NoError(t, err) - t.Run("current", func(t *testing.T) { + t.Run("current-xdsv1", func(t *testing.T) { gotJSON := protoToJSON(t, r) gName := tt.name @@ -574,6 +648,39 @@ func TestEndpointsFromSnapshot(t *testing.T) { require.JSONEq(t, goldenEnvoy(t, filepath.Join("endpoints", gName), envoyVersion, latestEnvoyVersion, gotJSON), gotJSON) }) + + if tt.alsoRunTestForV2 { + generator := xdsv2.NewResourceGenerator(testutil.Logger(t)) + + converter := proxystateconverter.NewConverter(testutil.Logger(t), &mockCfgFetcher{addressLan: "10.10.10.10"}) + proxyState, err := converter.ProxyStateFromSnapshot(snap) + require.NoError(t, err) + + res, err := generator.AllResourcesFromIR(proxyState) + require.NoError(t, err) + + endpoints = res[xdscommon.EndpointType] + // The order of listeners returned via LDS isn't relevant, so it's safe + // to sort these for the purposes of test comparisons. + sort.Slice(endpoints, func(i, j int) bool { + return endpoints[i].(*envoy_endpoint_v3.ClusterLoadAssignment).ClusterName < endpoints[j].(*envoy_endpoint_v3.ClusterLoadAssignment).ClusterName + }) + + r, err := response.CreateResponse(xdscommon.EndpointType, "00000001", "00000001", endpoints) + require.NoError(t, err) + + t.Run("current-xdsv2", func(t *testing.T) { + gotJSON := protoToJSON(t, r) + + gName := tt.name + if tt.overrideGoldenName != "" { + gName = tt.overrideGoldenName + } + + expectedJSON := goldenEnvoy(t, filepath.Join("endpoints", gName), envoyVersion, latestEnvoyVersion, gotJSON) + require.JSONEq(t, expectedJSON, gotJSON) + }) + } }) } }) diff --git a/agent/xds/naming/naming.go b/agent/xds/naming/naming.go index 3e19d9327003..b1de1ad12862 100644 --- a/agent/xds/naming/naming.go +++ b/agent/xds/naming/naming.go @@ -1,5 +1,5 @@ // Copyright (c) HashiCorp, Inc. 
-// SPDX-License-Identifier: BUSL-1.1 +// SPDX-License-Identifier: MPL-2.0 package naming diff --git a/agent/xds/proxystateconverter/clusters.go b/agent/xds/proxystateconverter/clusters.go index 2a43d6eb049c..b659ba622022 100644 --- a/agent/xds/proxystateconverter/clusters.go +++ b/agent/xds/proxystateconverter/clusters.go @@ -6,7 +6,7 @@ package proxystateconverter import ( "errors" "fmt" - envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "strings" @@ -455,6 +455,7 @@ func (s *Converter) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, name, pathP }, }, } + protocol := pathProtocol if protocol == "" { protocol = cfg.Protocol @@ -740,6 +741,7 @@ func (s *Converter) createOutboundMeshMTLS(cfgSnap *proxycfg.ConfigSnapshot, spi if err != nil { return nil, err } + // Create the transport socket ts := &pbproxystate.TransportSocket{} @@ -1020,7 +1022,7 @@ func configureClusterWithHostnames( dnsEndpointGroup.Config.DiscoveryType = pbproxystate.DiscoveryType_DISCOVERY_TYPE_STRICT } - endpoints := make([]*envoy_endpoint_v3.LbEndpoint, 0, 1) + endpoints := make([]*pbproxystate.Endpoint, 0, 1) uniqueHostnames := make(map[string]bool) var ( diff --git a/agent/xds/proxystateconverter/converter.go b/agent/xds/proxystateconverter/converter.go index 061a59d6e93e..532f2af826cd 100644 --- a/agent/xds/proxystateconverter/converter.go +++ b/agent/xds/proxystateconverter/converter.go @@ -57,9 +57,20 @@ func (g *Converter) resourcesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) erro if err != nil { return err } - //g.routesFromSnapshot(cfgSnap) - g.clustersFromSnapshot(cfgSnap) - //g.endpointsFromSnapshot(cfgSnap) + + err = g.endpointsFromSnapshot(cfgSnap) + if err != nil { + return err + } + err = g.clustersFromSnapshot(cfgSnap) + if err != nil { + return err + } + //err = g.routesFromSnapshot(cfgSnap) + //if err != nil { + // return err + //} + //g.secretsFromSnapshot(cfgSnap) return nil } diff --git a/agent/xds/proxystateconverter/endpoints.go b/agent/xds/proxystateconverter/endpoints.go index 5a5dc3708098..37b484ac1992 100644 --- a/agent/xds/proxystateconverter/endpoints.go +++ b/agent/xds/proxystateconverter/endpoints.go @@ -4,9 +4,17 @@ package proxystateconverter import ( + "errors" + "fmt" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/xds/response" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "github.com/hashicorp/go-bexpr" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -24,6 +32,273 @@ func makeLbEndpoint(addr string, port int, health pbproxystate.HealthStatus, wei return ep } +// endpointsFromSnapshot returns the mesh API representation of the "endpoints" in the snapshot.
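+// The converted endpoints are stored on the converter's proxy state as a map keyed by Envoy cluster name. A rough, illustrative sketch of the resulting shape (the SNI-style key is hypothetical): +// +// s.proxyState.Endpoints["db.default.dc1.internal.<trust-domain>.consul"] = +// &pbproxystate.Endpoints{Endpoints: []*pbproxystate.Endpoint{ /* one per service instance */ }}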
+func (s *Converter) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) error { + if cfgSnap == nil { + return errors.New("nil config given") + } + + switch cfgSnap.Kind { + case structs.ServiceKindConnectProxy: + return s.endpointsFromSnapshotConnectProxy(cfgSnap) + //case structs.ServiceKindTerminatingGateway: + // return s.endpointsFromSnapshotTerminatingGateway(cfgSnap) + //case structs.ServiceKindMeshGateway: + // return s.endpointsFromSnapshotMeshGateway(cfgSnap) + //case structs.ServiceKindIngressGateway: + // return s.endpointsFromSnapshotIngressGateway(cfgSnap) + //case structs.ServiceKindAPIGateway: + // return s.endpointsFromSnapshotAPIGateway(cfgSnap) + default: + return fmt.Errorf("invalid service kind: %v", cfgSnap.Kind) + } +} + +// endpointsFromSnapshotConnectProxy returns the mesh API representation of the "endpoints" +// (upstream instances) in the snapshot. +func (s *Converter) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) error { + eps := make(map[string]*pbproxystate.Endpoints) + + // NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go + // so that the sets of endpoints generated match the sets of clusters. + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta) + if skip { + // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. + continue + } + + var upstreamConfigMap map[string]interface{} + if upstream != nil { + upstreamConfigMap = upstream.Config + } + + es, err := s.endpointsFromDiscoveryChain( + uid, + chain, + cfgSnap, + cfgSnap.Locality, + upstreamConfigMap, + cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid], + cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid], + false, + ) + if err != nil { + return err + } + + for clusterName, endpoints := range es { + eps[clusterName] = &pbproxystate.Endpoints{ + Endpoints: endpoints, + } + } + } + + // NOTE: Any time we skip an upstream below we MUST also skip that same + // upstream in clusters.go so that the sets of endpoints generated match + // the sets of clusters. + for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() { + upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta) + if skip { + // Not associated with a known explicit or implicit upstream so it is skipped.
+ continue + } + + tbs, ok := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer) + if !ok { + // this should never happen since we loop through upstreams with + // set trust bundles + return fmt.Errorf("trust bundle not ready for peer %s", uid.Peer) + } + + clusterName := generatePeeredClusterName(uid, tbs) + + mgwMode := structs.MeshGatewayModeDefault + if upstream != nil { + mgwMode = upstream.MeshGateway.Mode + } + peerServiceEndpoints, err := s.makeEndpointsForPeerService(cfgSnap, uid, mgwMode) + if err != nil { + return err + } + + if peerServiceEndpoints != nil { + pbEndpoints := &pbproxystate.Endpoints{ + Endpoints: peerServiceEndpoints, + } + + eps[clusterName] = pbEndpoints + } + } + + // Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains + for _, u := range cfgSnap.Proxy.Upstreams { + if u.DestinationType != structs.UpstreamDestTypePreparedQuery { + continue + } + uid := proxycfg.NewUpstreamID(&u) + + dc := u.Datacenter + if dc == "" { + dc = cfgSnap.Datacenter + } + clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain) + + endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] + if ok { + epts := makeEndpointsForLoadAssignment( + cfgSnap, + nil, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, + }, + cfgSnap.Locality, + ) + pbEndpoints := &pbproxystate.Endpoints{ + Endpoints: epts, + } + + eps[clusterName] = pbEndpoints + } + } + + // Loop over potential destinations in the mesh, then grab the gateway nodes associated with each + cfgSnap.ConnectProxy.DestinationsUpstream.ForEachKey(func(uid proxycfg.UpstreamID) bool { + svcConfig, ok := cfgSnap.ConnectProxy.DestinationsUpstream.Get(uid) + if !ok || svcConfig.Destination == nil { + return true + } + + for _, address := range svcConfig.Destination.Addresses { + clusterName := clusterNameForDestination(cfgSnap, uid.Name, address, uid.NamespaceOrDefault(), uid.PartitionOrDefault()) + + endpoints, ok := cfgSnap.ConnectProxy.DestinationGateways.Get(uid) + if ok { + epts := makeEndpointsForLoadAssignment( + cfgSnap, + nil, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, + }, + proxycfg.GatewayKey{ /*empty so it never matches*/ }, + ) + pbEndpoints := &pbproxystate.Endpoints{ + Endpoints: epts, + } + eps[clusterName] = pbEndpoints + } + } + + return true + }) + + s.proxyState.Endpoints = eps + return nil +} + +func (s *Converter) makeEndpointsForPeerService( + cfgSnap *proxycfg.ConfigSnapshot, + uid proxycfg.UpstreamID, + upstreamGatewayMode structs.MeshGatewayMode, +) ([]*pbproxystate.Endpoint, error) { + var eps []*pbproxystate.Endpoint + + upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams() + if err != nil { + return eps, err + } + + if upstreamGatewayMode == structs.MeshGatewayModeNone { + s.Logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid)) + } + + // If an upstream is configured with local mesh gw mode, we make a load assignment + // from the gateway endpoints instead of those of the upstreams. + if upstreamGatewayMode == structs.MeshGatewayModeLocal { + localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String()) + if !ok { + // local GW is not ready; return early + return eps, nil + } + eps = makeEndpointsForLoadAssignment( + cfgSnap, + nil, + []loadAssignmentEndpointGroup{ + {Endpoints: localGw}, + }, + cfgSnap.Locality, + ) + return eps, nil + } + + // Also skip peer instances with a hostname as their address. 
EDS + // cannot resolve hostnames, so we provide them through CDS instead. + if _, ok := upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid]; ok { + return eps, nil + } + + endpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(uid) + if !ok { + return nil, nil + } + eps = makeEndpointsForLoadAssignment( + cfgSnap, + nil, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, + }, + proxycfg.GatewayKey{ /*empty so it never matches*/ }, + ) + return eps, nil +} + +func (s *Converter) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, endpoints structs.CheckServiceNodes) (structs.CheckServiceNodes, error) { + // locally execute the subsets filter + if subset.Filter != "" { + filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints) + if err != nil { + return nil, err + } + + raw, err := filter.Execute(endpoints) + if err != nil { + return nil, err + } + return raw.(structs.CheckServiceNodes), nil + } + return endpoints, nil +} + +// TODO(proxystate): Terminating Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func endpointsFromSnapshotTerminatingGateway + +// TODO(proxystate): Mesh Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func endpointsFromSnapshotMeshGateway + +// TODO(proxystate): Cluster Peering will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func makeEndpointsForOutgoingPeeredServices + +// TODO(proxystate): Mesh Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func endpointsFromServicesAndResolvers + +// TODO(proxystate): Mesh Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func makePeerServerEndpointsForMeshGateway + +// TODO(proxystate): Ingress Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func endpointsFromSnapshotIngressGateway + +// TODO(proxystate): API Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func endpointsFromSnapshotAPIGateway + // used in clusters.go func makeHostPortEndpoint(host string, port int) *pbproxystate.Endpoint { return &pbproxystate.Endpoint{ @@ -50,6 +325,317 @@ func makeUnixSocketEndpoint(path string) *pbproxystate.Endpoint { } } +func (s *Converter) makeUpstreamLoadAssignmentEndpointForPeerService( + cfgSnap *proxycfg.ConfigSnapshot, + uid proxycfg.UpstreamID, + upstreamGatewayMode structs.MeshGatewayMode, +) ([]*pbproxystate.Endpoint, error) { + var eps []*pbproxystate.Endpoint + + upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams() + if err != nil { + return eps, err + } + + if upstreamGatewayMode == structs.MeshGatewayModeNone { + s.Logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid)) + } + + // If an upstream is configured with local mesh gw mode, we make a load assignment + // from the gateway endpoints instead of those of the upstreams. + if upstreamGatewayMode == structs.MeshGatewayModeLocal { + localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String()) + if !ok { + // local GW is not ready; return early + return eps, nil + } + eps = makeEndpointsForLoadAssignment( + cfgSnap, + nil, + []loadAssignmentEndpointGroup{ + {Endpoints: localGw}, + }, + cfgSnap.Locality, + ) + return eps, nil + } + + // Also skip peer instances with a hostname as their address. EDS + // cannot resolve hostnames, so we provide them through CDS instead. 
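+ // (When an endpoint's address is a hostname, clusters.go handles it instead + // via configureClusterWithHostnames, which configures DNS-based discovery + // on the cluster itself.)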
+ if _, ok := upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid]; ok { + return eps, nil + } + + endpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(uid) + if !ok { + return nil, nil + } + eps = makeEndpointsForLoadAssignment( + cfgSnap, + nil, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, + }, + proxycfg.GatewayKey{ /*empty so it never matches*/ }, + ) + return eps, nil +} + +func (s *Converter) endpointsFromDiscoveryChain( + uid proxycfg.UpstreamID, + chain *structs.CompiledDiscoveryChain, + cfgSnap *proxycfg.ConfigSnapshot, + gatewayKey proxycfg.GatewayKey, + upstreamConfigMap map[string]interface{}, + upstreamEndpoints map[string]structs.CheckServiceNodes, + gatewayEndpoints map[string]structs.CheckServiceNodes, + forMeshGateway bool, +) (map[string][]*pbproxystate.Endpoint, error) { + if chain == nil { + if forMeshGateway { + return nil, fmt.Errorf("missing discovery chain for %s", uid) + } + return nil, nil + } + + if upstreamConfigMap == nil { + upstreamConfigMap = make(map[string]interface{}) // TODO: needed? + } + + clusterEndpoints := make(map[string][]*pbproxystate.Endpoint) + + // TODO(jm): escape hatches will be implemented in the future + //var escapeHatchCluster *pbproxystate.Cluster + //if !forMeshGateway { + //cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap) + //if err != nil { + // // Don't hard fail on a config typo, just warn. The parse func returns + // // default config if there is an error so it's safe to continue. + // s.Logger.Warn("failed to parse", "upstream", uid, + // "error", err) + //} + //if cfg.EnvoyClusterJSON != "" { + // if chain.Default { + // // If you haven't done anything to setup the discovery chain, then + // // you can use the envoy_cluster_json escape hatch. + // escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON) + // if err != nil { + // return ce, nil + // } + // } else { + // s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for", + // "discovery chain", chain.ServiceName, "upstream", uid, + // "envoy_cluster_json", chain.ServiceName) + // } + //} + //} + + mgwMode := structs.MeshGatewayModeDefault + if upstream, _ := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta); upstream != nil { + mgwMode = upstream.MeshGateway.Mode + } + + // Find all resolver nodes. + for _, node := range chain.Nodes { + switch { + case node == nil: + return nil, fmt.Errorf("impossible to process a nil node") + case node.Type != structs.DiscoveryGraphNodeTypeResolver: + continue + case node.Resolver == nil: + return nil, fmt.Errorf("impossible to process a non-resolver node") + } + rawUpstreamConfig, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap) + if err != nil { + return nil, err + } + upstreamConfig := finalizeUpstreamConfig(rawUpstreamConfig, chain, node.Resolver.ConnectTimeout) + + mappedTargets, err := s.mapDiscoChainTargets(cfgSnap, chain, node, upstreamConfig, forMeshGateway) + if err != nil { + return nil, err + } + + targetGroups, err := mappedTargets.groupedTargets() + if err != nil { + return nil, err + } + + for _, groupedTarget := range targetGroups { + clusterName := groupedTarget.ClusterName + // TODO(jm): escape hatches will be implemented in the future + //if escapeHatchCluster != nil { + // clusterName = escapeHatchCluster.Name + //} + switch len(groupedTarget.Targets) { + case 0: + continue + case 1: + // We expect one target so this passes through to continue setting the load assignment up.
+ default: + return nil, fmt.Errorf("cannot have more than one target") + } + ti := groupedTarget.Targets[0] + s.Logger.Debug("generating endpoints for", "cluster", clusterName, "targetID", ti.TargetID) + targetUID := proxycfg.NewUpstreamIDFromTargetID(ti.TargetID) + if targetUID.Peer != "" { + peerServiceEndpoints, err := s.makeEndpointsForPeerService(cfgSnap, targetUID, mgwMode) + if err != nil { + return nil, err + } + if peerServiceEndpoints != nil { + clusterEndpoints[clusterName] = peerServiceEndpoints + } + continue + } + + endpointGroup, valid := makeLoadAssignmentEndpointGroup( + chain.Targets, + upstreamEndpoints, + gatewayEndpoints, + ti.TargetID, + gatewayKey, + forMeshGateway, + ) + if !valid { + continue // skip the cluster if we're still populating the snapshot + } + + epts := makeEndpointsForLoadAssignment( + cfgSnap, + ti.PrioritizeByLocality, + []loadAssignmentEndpointGroup{endpointGroup}, + gatewayKey, + ) + clusterEndpoints[clusterName] = epts + } + } + + return clusterEndpoints, nil +} + +// TODO(proxystate): Mesh Gateway will be added in the future. +// Functions to add from agent/xds/endpoints.go: +// func makeExportedUpstreamEndpointsForMeshGateway + +type loadAssignmentEndpointGroup struct { + Endpoints structs.CheckServiceNodes + OnlyPassing bool + OverrideHealth pbproxystate.HealthStatus +} + +func makeEndpointsForLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, + policy *structs.DiscoveryPrioritizeByLocality, + endpointGroups []loadAssignmentEndpointGroup, + localKey proxycfg.GatewayKey) []*pbproxystate.Endpoint { + pbEndpoints := make([]*pbproxystate.Endpoint, 0, len(endpointGroups)) + + // TODO(jm): make this work in xdsv2 + //if len(endpointGroups) > 1 { + // cla.Policy = &envoy_endpoint_v3.ClusterLoadAssignment_Policy{ + // // We choose such a large value here that the failover math should + // // in effect not happen until zero instances are healthy. + // OverprovisioningFactor: response.MakeUint32Value(100000), + // } + //} + + var priority uint32 + + for _, endpointGroup := range endpointGroups { + endpointsByLocality, err := groupedEndpoints(cfgSnap.ServiceLocality, policy, endpointGroup.Endpoints) + + if err != nil { + continue + } + + for _, endpoints := range endpointsByLocality { + for _, ep := range endpoints { + // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? + _, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault())) + healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing) + + if endpointGroup.OverrideHealth != pbproxystate.HealthStatus_HEALTH_STATUS_UNKNOWN { + healthStatus = endpointGroup.OverrideHealth + } + + endpoint := makeHostPortEndpoint(addr, port) + endpoint.HealthStatus = healthStatus + endpoint.LoadBalancingWeight = response.MakeUint32Value(weight) + + pbEndpoints = append(pbEndpoints, endpoint) + } + + // TODO(jm): what do we do about priority downstream? 
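+ // For reference, the xds v1 code wraps each failover group's endpoints + // in a LocalityLbEndpoints with an increasing priority, so Envoy only + // fails over to a group once all higher-priority groups are unhealthy; + // the v1 shape still to be ported is: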
+ //cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{ + // Priority: priority, + // LbEndpoints: es, + //}) + + priority++ + } + } + + return pbEndpoints +} + +func makeLoadAssignmentEndpointGroup( + targets map[string]*structs.DiscoveryTarget, + targetHealth map[string]structs.CheckServiceNodes, + gatewayHealth map[string]structs.CheckServiceNodes, + targetID string, + localKey proxycfg.GatewayKey, + forMeshGateway bool, +) (loadAssignmentEndpointGroup, bool) { + realEndpoints, ok := targetHealth[targetID] + if !ok { + // skip the cluster if we're still populating the snapshot + return loadAssignmentEndpointGroup{}, false + } + target := targets[targetID] + + var gatewayKey proxycfg.GatewayKey + + switch target.MeshGateway.Mode { + case structs.MeshGatewayModeRemote: + gatewayKey.Datacenter = target.Datacenter + gatewayKey.Partition = target.Partition + case structs.MeshGatewayModeLocal: + gatewayKey = localKey + } + + if forMeshGateway || gatewayKey.IsEmpty() || localKey.Matches(target.Datacenter, target.Partition) { + // Gateways are not needed if the request isn't for a remote DC or partition. + return loadAssignmentEndpointGroup{ + Endpoints: realEndpoints, + OnlyPassing: target.Subset.OnlyPassing, + }, true + } + + // If using a mesh gateway we need to pull those endpoints instead. + gatewayEndpoints, ok := gatewayHealth[gatewayKey.String()] + if !ok { + // skip the cluster if we're still populating the snapshot + return loadAssignmentEndpointGroup{}, false + } + + // But we will use the health from the actual backend service. + overallHealth := pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY + for _, ep := range realEndpoints { + health, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing) + if health == pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY { + overallHealth = pbproxystate.HealthStatus_HEALTH_STATUS_HEALTHY + break + } + } + + return loadAssignmentEndpointGroup{ + Endpoints: gatewayEndpoints, + OverrideHealth: overallHealth, + }, true +} + func calculateEndpointHealthAndWeight( ep structs.CheckServiceNode, onlyPassing bool, diff --git a/agent/xds/proxystateconverter/locality_policy.go b/agent/xds/proxystateconverter/locality_policy.go new file mode 100644 index 000000000000..db62e63f5f95 --- /dev/null +++ b/agent/xds/proxystateconverter/locality_policy.go @@ -0,0 +1,21 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package proxystateconverter + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" +) + +func groupedEndpoints(locality *structs.Locality, policy *structs.DiscoveryPrioritizeByLocality, csns structs.CheckServiceNodes) ([]structs.CheckServiceNodes, error) { + switch { + case policy == nil || policy.Mode == "" || policy.Mode == "none": + return []structs.CheckServiceNodes{csns}, nil + case policy.Mode == "failover": + return prioritizeByLocalityFailover(locality, csns), nil + default: + return nil, fmt.Errorf("unexpected prioritize-by-locality mode %q", policy.Mode) + } +} diff --git a/agent/xds/proxystateconverter/locality_policy_oss.go b/agent/xds/proxystateconverter/locality_policy_oss.go new file mode 100644 index 000000000000..007501ffab7b --- /dev/null +++ b/agent/xds/proxystateconverter/locality_policy_oss.go @@ -0,0 +1,15 @@ +// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0 + +//go:build !consulent +// +build !consulent + +package proxystateconverter + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +func prioritizeByLocalityFailover(locality *structs.Locality, csns structs.CheckServiceNodes) []structs.CheckServiceNodes { + return nil +} diff --git a/agent/xdsv2/endpoint_resources.go b/agent/xdsv2/endpoint_resources.go index f5fd1b9c5d09..c493c48f640d 100644 --- a/agent/xdsv2/endpoint_resources.go +++ b/agent/xdsv2/endpoint_resources.go @@ -7,35 +7,56 @@ import ( envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" "github.com/hashicorp/consul/agent/xds/response" + "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "google.golang.org/protobuf/proto" ) -func makeEnvoyEndpoint(endpoint *pbproxystate.Endpoint) *envoy_endpoint_v3.LbEndpoint { +func makeEnvoyLbEndpoint(endpoint *pbproxystate.Endpoint) *envoy_endpoint_v3.LbEndpoint { + hs := int32(endpoint.GetHealthStatus().Number()) + return &envoy_endpoint_v3.LbEndpoint{ + HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ + Endpoint: makeEnvoyEndpoint(endpoint), + }, + HealthStatus: envoy_core_v3.HealthStatus(hs), + LoadBalancingWeight: endpoint.GetLoadBalancingWeight(), + } +} + +func makeEnvoyEndpoint(endpoint *pbproxystate.Endpoint) *envoy_endpoint_v3.Endpoint { var address *envoy_core_v3.Address if endpoint.GetUnixSocket() != nil { address = response.MakePipeAddress(endpoint.GetUnixSocket().GetPath(), 0) } else { address = response.MakeAddress(endpoint.GetHostPort().GetHost(), int(endpoint.GetHostPort().Port)) } - return &envoy_endpoint_v3.LbEndpoint{ - HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ - Endpoint: &envoy_endpoint_v3.Endpoint{ - Address: address, - }, - }, + + return &envoy_endpoint_v3.Endpoint{ + Address: address, } } func makeEnvoyClusterLoadAssignment(clusterName string, endpoints []*pbproxystate.Endpoint) *envoy_endpoint_v3.ClusterLoadAssignment { - localityLbEndpoints := make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpoints)) + localityLbEndpoints := &envoy_endpoint_v3.LocalityLbEndpoints{} for _, endpoint := range endpoints { - localityLbEndpoints = append(localityLbEndpoints, &envoy_endpoint_v3.LocalityLbEndpoints{ - LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{makeEnvoyEndpoint(endpoint)}, - }) + localityLbEndpoints.LbEndpoints = append(localityLbEndpoints.LbEndpoints, makeEnvoyLbEndpoint(endpoint)) } return &envoy_endpoint_v3.ClusterLoadAssignment{ ClusterName: clusterName, - Endpoints: localityLbEndpoints, + Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{localityLbEndpoints}, + } +} + +func (pr *ProxyResources) makeXDSEndpoints() ([]proto.Message, error) { + endpoints := make([]proto.Message, 0) + + for clusterName, eps := range pr.proxyState.GetEndpoints() { + // TODO(jm): this does not seem like the best way. 
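+ // The local app cluster is skipped here because its endpoint is expected + // to be supplied on the cluster itself rather than served over EDS, so a + // ClusterLoadAssignment for it would be redundant.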
+ if clusterName != xdscommon.LocalAppClusterName { + protoEndpoint := makeEnvoyClusterLoadAssignment(clusterName, eps.Endpoints) + endpoints = append(endpoints, protoEndpoint) + } } + return endpoints, nil } diff --git a/agent/xdsv2/resources.go b/agent/xdsv2/resources.go index 6082f7dd56b5..cba6e8188c80 100644 --- a/agent/xdsv2/resources.go +++ b/agent/xdsv2/resources.go @@ -5,7 +5,6 @@ package xdsv2 import ( "fmt" - pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/go-hclog" "google.golang.org/protobuf/proto" @@ -48,7 +47,6 @@ func (g *ResourceGenerator) AllResourcesFromIR(proxyState *pbmesh.ProxyState) (m func (pr *ProxyResources) generateXDSResources() error { listeners := make([]proto.Message, 0) routes := make([]proto.Message, 0) - endpoints := make([]proto.Message, 0) for _, l := range pr.proxyState.Listeners { protoListener, err := pr.makeListener(l) @@ -58,16 +56,21 @@ func (pr *ProxyResources) generateXDSResources() error { } listeners = append(listeners, protoListener) } + pr.envoyResources[xdscommon.ListenerType] = listeners clusters, err := pr.makeXDSClusters() if err != nil { return err } - - pr.envoyResources[xdscommon.ListenerType] = listeners pr.envoyResources[xdscommon.ClusterType] = clusters - pr.envoyResources[xdscommon.RouteType] = routes + + endpoints, err := pr.makeXDSEndpoints() + if err != nil { + return err + } pr.envoyResources[xdscommon.EndpointType] = endpoints + pr.envoyResources[xdscommon.RouteType] = routes + return nil }
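
A minimal end-to-end sketch of how the pieces introduced in this patch compose, mirroring the wiring in endpoints_test.go above. The configfetcher import path and the ConfigFetcher parameter type are assumptions inferred from the converter's constructor, and the helper name is hypothetical; error handling is abbreviated.

import (
	"github.com/hashicorp/go-hclog"
	"google.golang.org/protobuf/proto"

	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/xds/configfetcher" // assumed location of the ConfigFetcher interface
	"github.com/hashicorp/consul/agent/xds/proxystateconverter"
	"github.com/hashicorp/consul/agent/xdsv2"
	"github.com/hashicorp/consul/envoyextensions/xdscommon"
)

// edsResourcesFromSnapshot (hypothetical helper) runs the two stages added in
// this patch: snapshot -> proxy-state IR via the converter, then IR -> Envoy
// resources via the v2 generator, returning just the EDS payloads
// (*envoy_endpoint_v3.ClusterLoadAssignment values).
func edsResourcesFromSnapshot(logger hclog.Logger, fetcher configfetcher.ConfigFetcher, snap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
	converter := proxystateconverter.NewConverter(logger, fetcher)
	proxyState, err := converter.ProxyStateFromSnapshot(snap)
	if err != nil {
		return nil, err
	}

	generator := xdsv2.NewResourceGenerator(logger)
	res, err := generator.AllResourcesFromIR(proxyState)
	if err != nil {
		return nil, err
	}
	return res[xdscommon.EndpointType], nil
}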