diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index b763d960c19e..d4bc77320d7d 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -1,15 +1,20 @@ package xds import ( + "encoding/json" "errors" + "fmt" "time" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" ) // clustersFromSnapshot returns the xDS API representation of the "clusters" @@ -21,59 +26,146 @@ func clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pro // Include the "app" cluster for the public listener clusters := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1) - clusters[0] = makeAppCluster(cfgSnap) + var err error + clusters[0], err = makeAppCluster(cfgSnap) + if err != nil { + return nil, err + } for idx, upstream := range cfgSnap.Proxy.Upstreams { - clusters[idx+1] = makeUpstreamCluster(upstream.Identifier(), cfgSnap) + clusters[idx+1], err = makeUpstreamCluster(upstream, cfgSnap) + if err != nil { + return nil, err + } } return clusters, nil } -func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) *envoy.Cluster { - addr := cfgSnap.Proxy.LocalServiceAddress - if addr == "" { - addr = "127.0.0.1" +func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) { + var c *envoy.Cluster + var err error + + // If we have overriden local cluster config try to parse it into an Envoy cluster + if clusterJSONRaw, ok := cfgSnap.Proxy.Config["envoy_local_cluster_json"]; ok { + if clusterJSON, ok := clusterJSONRaw.(string); ok { + c, err = makeClusterFromUserConfig(clusterJSON) + if err != nil { + return c, err + } + } } - return &envoy.Cluster{ - Name: LocalAppClusterName, - // TODO(banks): make this configurable from the proxy config - ConnectTimeout: 5 * time.Second, - Type: envoy.Cluster_STATIC, - // API v2 docs say hosts is deprecated and should use LoadAssignment as - // below.. but it doesn't work for tcp_proxy target for some reason. - Hosts: []*envoycore.Address{makeAddressPtr(addr, cfgSnap.Proxy.LocalServicePort)}, - // LoadAssignment: &envoy.ClusterLoadAssignment{ - // ClusterName: LocalAppClusterName, - // Endpoints: []endpoint.LocalityLbEndpoints{ - // { - // LbEndpoints: []endpoint.LbEndpoint{ - // makeEndpoint(LocalAppClusterName, - // addr, - // cfgSnap.Proxy.LocalServicePort), - // }, - // }, - // }, - // }, + + if c == nil { + addr := cfgSnap.Proxy.LocalServiceAddress + if addr == "" { + addr = "127.0.0.1" + } + c = &envoy.Cluster{ + Name: LocalAppClusterName, + ConnectTimeout: 5 * time.Second, + Type: envoy.Cluster_STATIC, + // API v2 docs say hosts is deprecated and should use LoadAssignment as + // below.. but it doesn't work for tcp_proxy target for some reason. + Hosts: []*envoycore.Address{makeAddressPtr(addr, cfgSnap.Proxy.LocalServicePort)}, + // LoadAssignment: &envoy.ClusterLoadAssignment{ + // ClusterName: LocalAppClusterName, + // Endpoints: []endpoint.LocalityLbEndpoints{ + // { + // LbEndpoints: []endpoint.LbEndpoint{ + // makeEndpoint(LocalAppClusterName, + // addr, + // cfgSnap.Proxy.LocalServicePort), + // }, + // }, + // }, + // }, + } } + + return c, err } -func makeUpstreamCluster(name string, cfgSnap *proxycfg.ConfigSnapshot) *envoy.Cluster { - return &envoy.Cluster{ - Name: name, - // TODO(banks): make this configurable from the upstream config - ConnectTimeout: 5 * time.Second, - Type: envoy.Cluster_EDS, - EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{ - EdsConfig: &envoycore.ConfigSource{ - ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{ - Ads: &envoycore.AggregatedConfigSource{}, +func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) { + var c *envoy.Cluster + var err error + + // If we have overriden cluster config attempt to parse it into an Envoy cluster + if clusterJSONRaw, ok := upstream.Config["envoy_cluster_json"]; ok { + if clusterJSON, ok := clusterJSONRaw.(string); ok { + c, err = makeClusterFromUserConfig(clusterJSON) + if err != nil { + return c, err + } + } + } + + if c == nil { + c = &envoy.Cluster{ + Name: upstream.Identifier(), + ConnectTimeout: 5 * time.Second, + Type: envoy.Cluster_EDS, + EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{ + EdsConfig: &envoycore.ConfigSource{ + ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{ + Ads: &envoycore.AggregatedConfigSource{}, + }, }, }, - }, - // Enable TLS upstream with the configured client certificate. - TlsContext: &envoyauth.UpstreamTlsContext{ - CommonTlsContext: makeCommonTLSContext(cfgSnap), - }, + } + } + + // Enable TLS upstream with the configured client certificate. + c.TlsContext = &envoyauth.UpstreamTlsContext{ + CommonTlsContext: makeCommonTLSContext(cfgSnap), } + + return c, nil +} + +// makeClusterFromUserConfig returns the listener config decoded from an +// arbitrary proto3 json format string or an error if it's invalid. +// +// For now we only support embedding in JSON strings because of the hcl parsing +// pain (see config.go comment above call to patchSliceOfMaps). Until we +// refactor config parser a _lot_ user's opaque config that contains arrays will +// be mangled. We could actually fix that up in mapstructure which knows the +// type of the target so could resolve the slices to singletons unambiguously +// and it would work for us here... but we still have the problem that the +// config would render incorrectly in general in our HTTP API responses so we +// really need to fix it "properly". +// +// When we do that we can support just nesting the config directly into the +// JSON/hcl naturally but this is a stop-gap that gets us an escape hatch +// immediately. It's also probably not a bad thing to support long-term since +// any config generated by other systems will likely be in canonical protobuf +// from rather than our slight variant in JSON/hcl. +func makeClusterFromUserConfig(configJSON string) (*envoy.Cluster, error) { + var jsonFields map[string]*json.RawMessage + if err := json.Unmarshal([]byte(configJSON), &jsonFields); err != nil { + fmt.Println("Custom error", err, configJSON) + return nil, err + } + + var c envoy.Cluster + + if _, ok := jsonFields["@type"]; ok { + // Type field is present so decode it as a types.Any + var any types.Any + err := jsonpb.UnmarshalString(configJSON, &any) + if err != nil { + return nil, err + } + // And then unmarshal the listener again... + err = proto.Unmarshal(any.Value, &c) + if err != nil { + panic(err) + //return nil, err + } + return &c, err + } + + // No @type so try decoding as a straight listener. + err := jsonpb.UnmarshalString(configJSON, &c) + return &c, err } diff --git a/agent/xds/server_test.go b/agent/xds/server_test.go index 694cbce2972b..89a1854e0e76 100644 --- a/agent/xds/server_test.go +++ b/agent/xds/server_test.go @@ -361,50 +361,9 @@ func expectListenerJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token strin expectListenerJSONResources(t, snap, token, v, n)) } -func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string { - return `{ - "versionInfo": "` + hexString(v) + `", - "resources": [ - { - "@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", - "clusterName": "service:db", - "endpoints": [ - { - "lbEndpoints": [ - { - "endpoint": { - "address": { - "socketAddress": { - "address": "10.10.1.1", - "portValue": 0 - } - } - } - }, - { - "endpoint": { - "address": { - "socketAddress": { - "address": "10.10.1.2", - "portValue": 0 - } - } - } - } - ] - } - ] - } - ], - "typeUrl": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", - "nonce": "` + hexString(n) + `" - }` -} - -func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string { - return `{ - "versionInfo": "` + hexString(v) + `", - "resources": [ +func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) map[string]string { + return map[string]string{ + "local_app": ` { "@type": "type.googleapis.com/envoy.api.v2.Cluster", "name": "local_app", @@ -417,7 +376,8 @@ func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token strin } } ] - }, + }`, + "service:db": ` { "@type": "type.googleapis.com/envoy.api.v2.Cluster", "name": "service:db", @@ -431,7 +391,8 @@ func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token strin }, "connectTimeout": "5s", "tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + ` - }, + }`, + "prepared_query:geo-cache": ` { "@type": "type.googleapis.com/envoy.api.v2.Cluster", "name": "prepared_query:geo-cache", @@ -445,12 +406,81 @@ func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token strin }, "connectTimeout": "5s", "tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + ` + }`, + } +} + +func expectClustersJSONFromResources(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64, resourcesJSON map[string]string) string { + resJSON := "" + + // Sort resources into specific order because that matters in JSONEq + // comparison later. + keyOrder := []string{"local_app"} + for _, u := range snap.Proxy.Upstreams { + keyOrder = append(keyOrder, u.Identifier()) + } + for _, k := range keyOrder { + j, ok := resourcesJSON[k] + if !ok { + continue + } + if resJSON != "" { + resJSON += ",\n" + } + resJSON += j + } + + return `{ + "versionInfo": "` + hexString(v) + `", + "resources": [` + resJSON + `], + "typeUrl": "type.googleapis.com/envoy.api.v2.Cluster", + "nonce": "` + hexString(n) + `" + }` +} + +func expectClustersJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string { + return expectClustersJSONFromResources(t, snap, token, v, n, + expectClustersJSONResources(t, snap, token, v, n)) +} + +func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token string, v, n uint64) string { + return `{ + "versionInfo": "` + hexString(v) + `", + "resources": [ + { + "@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", + "clusterName": "service:db", + "endpoints": [ + { + "lbEndpoints": [ + { + "endpoint": { + "address": { + "socketAddress": { + "address": "10.10.1.1", + "portValue": 0 + } + } + } + }, + { + "endpoint": { + "address": { + "socketAddress": { + "address": "10.10.1.2", + "portValue": 0 + } + } + } + } + ] + } + ] } ], - "typeUrl": "type.googleapis.com/envoy.api.v2.Cluster", + "typeUrl": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", "nonce": "` + hexString(n) + `" - } - ` + }` } func expectedUpstreamTLSContextJSON(t *testing.T, snap *proxycfg.ConfigSnapshot) string { @@ -973,7 +1003,7 @@ func TestServer_Check(t *testing.T) { } } -func TestServer_ConfigOverrides(t *testing.T) { +func TestServer_ConfigOverridesListeners(t *testing.T) { tests := []struct { name string @@ -1118,6 +1148,123 @@ func TestServer_ConfigOverrides(t *testing.T) { } } +func TestServer_ConfigOverridesClusters(t *testing.T) { + + tests := []struct { + name string + setup func(snap *proxycfg.ConfigSnapshot) string + }{ + { + name: "sanity check no custom", + setup: func(snap *proxycfg.ConfigSnapshot) string { + // Default snap and expectation + return expectClustersJSON(t, snap, "my-token", 1, 1) + }, + }, + { + name: "custom public with no type", + setup: func(snap *proxycfg.ConfigSnapshot) string { + snap.Proxy.Config["envoy_local_cluster_json"] = + customAppClusterJSON(t, customClusterJSONOptions{ + Name: "mylocal", + IncludeType: false, + }) + resources := expectClustersJSONResources(t, snap, "my-token", 1, 1) + + // Replace an upstream listener with the custom one WITH type since + // that's how it comes out the other end. + resources["local_app"] = + customAppClusterJSON(t, customClusterJSONOptions{ + Name: "mylocal", + IncludeType: true, + }) + return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources) + }, + }, + { + name: "custom public with type", + setup: func(snap *proxycfg.ConfigSnapshot) string { + snap.Proxy.Config["envoy_local_cluster_json"] = + customAppClusterJSON(t, customClusterJSONOptions{ + Name: "mylocal", + IncludeType: true, + }) + resources := expectClustersJSONResources(t, snap, "my-token", 1, 1) + + // Replace an upstream listener with the custom one WITH type since + // that's how it comes out the other end. + resources["local_app"] = + customAppClusterJSON(t, customClusterJSONOptions{ + Name: "mylocal", + IncludeType: true, + }) + return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources) + }, + }, + { + name: "custom upstream with no type", + setup: func(snap *proxycfg.ConfigSnapshot) string { + snap.Proxy.Upstreams[0].Config["envoy_cluster_json"] = + customEDSClusterJSON(t, customClusterJSONOptions{ + Name: "myservice", + IncludeType: false, + }) + resources := expectClustersJSONResources(t, snap, "my-token", 1, 1) + + // Replace an upstream listener with the custom one WITH type since + // that's how it comes out the other end. + resources["service:db"] = + customEDSClusterJSON(t, customClusterJSONOptions{ + Name: "myservice", + IncludeType: true, + TLSContext: expectedUpstreamTLSContextJSON(t, snap), + }) + return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources) + }, + }, + { + name: "custom upstream with type", + setup: func(snap *proxycfg.ConfigSnapshot) string { + snap.Proxy.Upstreams[0].Config["envoy_cluster_json"] = + customEDSClusterJSON(t, customClusterJSONOptions{ + Name: "myservice", + IncludeType: true, + }) + resources := expectClustersJSONResources(t, snap, "my-token", 1, 1) + + // Replace an upstream listener with the custom one WITH type since + // that's how it comes out the other end. + resources["service:db"] = + customEDSClusterJSON(t, customClusterJSONOptions{ + Name: "myservice", + IncludeType: true, + TLSContext: expectedUpstreamTLSContextJSON(t, snap), + }) + return expectClustersJSONFromResources(t, snap, "my-token", 1, 1, resources) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + // Sanity check default with no overrides first + snap := proxycfg.TestConfigSnapshot(t) + expect := tt.setup(snap) + + clusters, err := clustersFromSnapshot(snap, "my-token") + require.NoError(err) + r, err := createResponse(ClusterType, "00000001", "00000001", clusters) + require.NoError(err) + + fmt.Println(r) + + assertResponse(t, r, expect) + }) + } +} + type customListenerJSONOptions struct { Name string IncludeType bool @@ -1182,3 +1329,67 @@ func customListenerJSON(t *testing.T, opts customListenerJSONOptions) string { require.NoError(t, err) return buf.String() } + +type customClusterJSONOptions struct { + Name string + IncludeType bool + TLSContext string +} + +var customEDSClusterJSONTpl = `{ + {{ if .IncludeType -}} + "@type": "type.googleapis.com/envoy.api.v2.Cluster", + {{- end }} + {{ if .TLSContext -}} + "tlsContext": {{ .TLSContext }}, + {{- end }} + "name": "{{ .Name }}", + "type": "EDS", + "edsClusterConfig": { + "edsConfig": { + "ads": { + + } + } + }, + "connectTimeout": "5s" +}` + +var customEDSClusterJSONTemplate = template.Must(template.New("").Parse(customEDSClusterJSONTpl)) + +func customEDSClusterJSON(t *testing.T, opts customClusterJSONOptions) string { + t.Helper() + var buf bytes.Buffer + err := customEDSClusterJSONTemplate.Execute(&buf, opts) + require.NoError(t, err) + return buf.String() +} + +var customAppClusterJSONTpl = `{ + {{ if .IncludeType -}} + "@type": "type.googleapis.com/envoy.api.v2.Cluster", + {{- end }} + {{ if .TLSContext -}} + "tlsContext": {{ .TLSContext }}, + {{- end }} + "name": "{{ .Name }}", + "connectTimeout": "5s", + "hosts": [ + { + "socketAddress": { + "address": "127.0.0.1", + "portValue": 8080 + } + } + ] +}` + +var customAppClusterJSONTemplate = template.Must(template.New("").Parse(customAppClusterJSONTpl)) + +func customAppClusterJSON(t *testing.T, opts customClusterJSONOptions) string { + t.Helper() + var buf bytes.Buffer + err := customAppClusterJSONTemplate.Execute(&buf, opts) + require.NoError(t, err) + return buf.String() +}