Skip to content

Commit

Permalink
feat(MeshLoadBalancingStrategy): add builtin gateway support (#6800)
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont authored May 23, 2023
1 parent 2962747 commit 6904d99
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 24 deletions.
143 changes: 120 additions & 23 deletions pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package v1alpha1

import (
"context"

envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
envoy_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/pkg/errors"

Expand All @@ -14,6 +17,7 @@ import (
api "github.com/kumahq/kuma/pkg/plugins/policies/meshloadbalancingstrategy/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/policies/meshloadbalancingstrategy/plugin/xds"
policies_xds "github.com/kumahq/kuma/pkg/plugins/policies/xds"
gateway_plugin "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
"github.com/kumahq/kuma/pkg/util/pointer"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
v3 "github.com/kumahq/kuma/pkg/xds/envoy/listeners/v3"
Expand All @@ -36,15 +40,11 @@ func (p plugin) EgressMatchedPolicies(es *core_mesh.ExternalServiceResource, res
return matchers.EgressMatchedPolicies(api.MeshLoadBalancingStrategyType, es, resources)
}

func (p plugin) Apply(rs *core_xds.ResourceSet, _ xds_context.Context, proxy *core_xds.Proxy) error {
func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy *core_xds.Proxy) error {
if proxy.ZoneEgressProxy != nil {
return p.configureEgress(rs, proxy)
}

return p.configureDPP(rs, proxy)
}

func (p plugin) configureDPP(rs *core_xds.ResourceSet, proxy *core_xds.Proxy) error {
policies, ok := proxy.Policies.Dynamic[api.MeshLoadBalancingStrategyType]
if !ok {
return nil
Expand All @@ -53,26 +53,37 @@ func (p plugin) configureDPP(rs *core_xds.ResourceSet, proxy *core_xds.Proxy) er
listeners := policies_xds.GatherListeners(rs)
clusters := policies_xds.GatherClusters(rs)
endpoints := policies_xds.GatherEndpoints(rs)
routes := policies_xds.GatherRoutes(rs)

var zone string
if inbounds := proxy.Dataplane.Spec.GetNetworking().GetInbound(); len(inbounds) != 0 {
zone = inbounds[0].GetTags()[mesh_proto.ZoneTag]
if err := p.configureGateway(ctx, proxy, policies.ToRules, listeners.Gateway, clusters.Gateway, routes.Gateway, endpoints); err != nil {
return err
}

return p.configureDPP(proxy, policies.ToRules, listeners, clusters, endpoints)
}

func (p plugin) configureDPP(
proxy *core_xds.Proxy,
toRules core_xds.ToRules,
listeners policies_xds.Listeners,
clusters policies_xds.Clusters,
endpoints policies_xds.EndpointMap,
) error {
serviceConfs := map[string]api.Conf{}

for _, outbound := range proxy.Dataplane.Spec.Networking.GetOutbound() {
oface := proxy.Dataplane.Spec.Networking.ToOutboundInterface(outbound)
serviceName := outbound.GetTagsIncludingLegacy()[mesh_proto.ServiceTag]

computed := policies.ToRules.Rules.Compute(core_xds.MeshService(serviceName))
computed := toRules.Rules.Compute(core_xds.MeshService(serviceName))
if computed == nil {
continue
}

conf := computed.Conf.(api.Conf)

if listener, ok := listeners.Outbound[oface]; ok {
if err := p.generateLDS(listener, conf.LoadBalancer); err != nil {
if err := p.configureListener(listener, nil, conf.LoadBalancer); err != nil {
return err
}
}
Expand All @@ -84,22 +95,96 @@ func (p plugin) configureDPP(rs *core_xds.ResourceSet, proxy *core_xds.Proxy) er
// we configure clusters in a separate loop to avoid configuring the same cluster twice
for serviceName, conf := range serviceConfs {
if cluster, ok := clusters.Outbound[serviceName]; ok {
if err := p.generateCDS(cluster, conf.LoadBalancer); err != nil {
if err := p.configureCluster(cluster, conf.LoadBalancer); err != nil {
return err
}
}
for _, cluster := range clusters.OutboundSplit[serviceName] {
if err := p.generateCDS(cluster, conf.LoadBalancer); err != nil {
if err := p.configureCluster(cluster, conf.LoadBalancer); err != nil {
return err
}
}
if conf.LocalityAwareness == nil || !pointer.Deref(conf.LocalityAwareness.Disabled) {
for _, cla := range endpoints[serviceName] {
for _, localityLbEndpoints := range cla.Endpoints {
if localityLbEndpoints.Locality != nil && localityLbEndpoints.Locality.Zone != zone {
localityLbEndpoints.Priority = 1
}
configureEndpoints(proxy.Dataplane, endpoints, serviceName, conf)
}

return nil
}

func configureEndpoints(
dataplane *core_mesh.DataplaneResource,
endpoints policies_xds.EndpointMap,
serviceName string,
conf api.Conf,
) {
var zone string
if inbounds := dataplane.Spec.GetNetworking().GetInbound(); len(inbounds) != 0 {
zone = inbounds[0].GetTags()[mesh_proto.ZoneTag]
}
if conf.LocalityAwareness == nil || !pointer.Deref(conf.LocalityAwareness.Disabled) {
for _, cla := range endpoints[serviceName] {
for _, localityLbEndpoints := range cla.Endpoints {
if localityLbEndpoints.Locality != nil && localityLbEndpoints.Locality.Zone != zone {
localityLbEndpoints.Priority = 1
}
}
}
}
}

func (p plugin) configureGateway(
ctx xds_context.Context,
proxy *core_xds.Proxy,
rules core_xds.ToRules,
gatewayListeners map[core_xds.InboundListener]*envoy_listener.Listener,
gatewayClusters map[string]*envoy_cluster.Cluster,
gatewayRoutes map[string]*envoy_route.RouteConfiguration,
endpoints policies_xds.EndpointMap,
) error {
if !proxy.Dataplane.Spec.IsBuiltinGateway() {
return nil
}

gatewayListenerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone)
if err != nil {
return err
}

conf := core_xds.ComputeConf[api.Conf](rules.Rules, core_xds.MeshSubset())
if conf == nil {
return nil
}

for _, listenerInfo := range gatewayListenerInfos {
listener, ok := gatewayListeners[core_xds.InboundListener{
Address: proxy.Dataplane.Spec.GetNetworking().GetAddress(),
Port: listenerInfo.Listener.Port,
}]
if !ok {
continue
}

if err := p.configureListener(listener, gatewayRoutes, conf.LoadBalancer); err != nil {
return err
}

for _, hostInfo := range listenerInfo.HostInfos {
destinations := gateway_plugin.RouteDestinationsMutable(hostInfo.Entries)
for _, dest := range destinations {
clusterName, err := dest.Destination.DestinationClusterName(hostInfo.Host.Tags)
if err != nil {
continue
}
cluster, ok := gatewayClusters[clusterName]
if !ok {
continue
}

if err := p.configureCluster(cluster, conf.LoadBalancer); err != nil {
return err
}

serviceName := dest.Destination[mesh_proto.ServiceTag]
configureEndpoints(proxy.Dataplane, endpoints, serviceName, *conf)
}
}
}
Expand Down Expand Up @@ -162,7 +247,11 @@ func (p plugin) isLocalityAware(fr core_xds.FromRules) bool {
return false
}

func (p plugin) generateLDS(l *envoy_listener.Listener, lbConf *api.LoadBalancer) error {
func (p plugin) configureListener(
l *envoy_listener.Listener,
routes map[string]*envoy_route.RouteConfiguration,
lbConf *api.LoadBalancer,
) error {
if lbConf == nil {
return nil
}
Expand All @@ -189,10 +278,18 @@ func (p plugin) generateLDS(l *envoy_listener.Listener, lbConf *api.LoadBalancer
}

return v3.UpdateHTTPConnectionManager(l.FilterChains[0], func(hcm *envoy_hcm.HttpConnectionManager) error {
rc := hcm.RouteSpecifier.(*envoy_hcm.HttpConnectionManager_RouteConfig).RouteConfig
hpc := &xds.HashPolicyConfigurer{HashPolicies: *hashPolicy}
var routeConfig *envoy_route.RouteConfiguration
switch r := hcm.RouteSpecifier.(type) {
case *envoy_hcm.HttpConnectionManager_RouteConfig:
routeConfig = r.RouteConfig
case *envoy_hcm.HttpConnectionManager_Rds:
routeConfig = routes[r.Rds.RouteConfigName]
default:
return errors.Errorf("unexpected RouteSpecifer %T", r)
}

for _, vh := range rc.VirtualHosts {
hpc := &xds.HashPolicyConfigurer{HashPolicies: *hashPolicy}
for _, vh := range routeConfig.VirtualHosts {
for _, route := range vh.Routes {
if err := hpc.Configure(route); err != nil {
return err
Expand All @@ -203,7 +300,7 @@ func (p plugin) generateLDS(l *envoy_listener.Listener, lbConf *api.LoadBalancer
})
}

func (p plugin) generateCDS(c *envoy_cluster.Cluster, lbConf *api.LoadBalancer) error {
func (p plugin) configureCluster(c *envoy_cluster.Cluster, lbConf *api.LoadBalancer) error {
if lbConf == nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1_test

import (
"fmt"
"path/filepath"
"strings"

Expand All @@ -15,8 +16,11 @@ import (
core_xds "github.com/kumahq/kuma/pkg/core/xds"
"github.com/kumahq/kuma/pkg/plugins/policies/meshloadbalancingstrategy/api/v1alpha1"
plugin "github.com/kumahq/kuma/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1"
gateway_plugin "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
"github.com/kumahq/kuma/pkg/test/matchers"
"github.com/kumahq/kuma/pkg/test/resources/builders"
"github.com/kumahq/kuma/pkg/test/resources/samples"
test_xds "github.com/kumahq/kuma/pkg/test/xds"
"github.com/kumahq/kuma/pkg/util/pointer"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
Expand All @@ -42,7 +46,7 @@ var _ = Describe("MeshLoadBalancingStrategy", func() {
resources []core_xds.Resource
proxy *core_xds.Proxy
}
DescribeTable("Apply",
DescribeTable("Apply to sidecar Dataplanes",
func(given testCase) {
resources := core_xds.NewResourceSet()
for _, res := range given.resources {
Expand Down Expand Up @@ -362,4 +366,91 @@ var _ = Describe("MeshLoadBalancingStrategy", func() {
},
}),
)
type gatewayTestCase struct {
name string
toRules core_xds.ToRules
}
DescribeTable("should generate proper Envoy config for MeshGateways",
func(given gatewayTestCase) {
Expect(given.name).ToNot(BeEmpty())
resources := xds_context.NewResources()
resources.MeshLocalResources[core_mesh.MeshGatewayType] = &core_mesh.MeshGatewayResourceList{
Items: []*core_mesh.MeshGatewayResource{samples.GatewayResource()},
}
resources.MeshLocalResources[core_mesh.MeshGatewayRouteType] = &core_mesh.MeshGatewayRouteResourceList{
Items: []*core_mesh.MeshGatewayRouteResource{samples.BackendGatewayRoute()},
}

context := test_xds.CreateSampleMeshContextWith(resources)
proxy := core_xds.Proxy{
APIVersion: "v3",
Dataplane: samples.GatewayDataplane(),
Policies: core_xds.MatchedPolicies{
Dynamic: map[core_model.ResourceType]core_xds.TypedMatchingPolicies{
v1alpha1.MeshLoadBalancingStrategyType: {
Type: v1alpha1.MeshLoadBalancingStrategyType,
ToRules: given.toRules,
},
},
},
}
gatewayGenerator := gateway_plugin.NewGenerator("test-zone")
generatedResources, err := gatewayGenerator.Generate(context, &proxy)
Expect(err).NotTo(HaveOccurred())

// when
plugin := plugin.NewPlugin().(core_plugins.PolicyPlugin)
Expect(plugin.Apply(generatedResources, context, &proxy)).To(Succeed())

getResourceYaml := func(list core_xds.ResourceList) []byte {
actualResource, err := util_proto.ToYAML(list[0].Resource)
Expect(err).ToNot(HaveOccurred())
return actualResource
}

// then
Expect(getResourceYaml(generatedResources.ListOf(envoy_resource.ClusterType))).
To(matchers.MatchGoldenYAML(filepath.Join("testdata", fmt.Sprintf("%s.gateway_cluster.golden.yaml", given.name))))
Expect(getResourceYaml(generatedResources.ListOf(envoy_resource.ListenerType))).
To(matchers.MatchGoldenYAML(filepath.Join("testdata", fmt.Sprintf("%s.gateway_listener.golden.yaml", given.name))))
Expect(getResourceYaml(generatedResources.ListOf(envoy_resource.RouteType))).
To(matchers.MatchGoldenYAML(filepath.Join("testdata", fmt.Sprintf("%s.gateway_route.golden.yaml", given.name))))
},
Entry("basic outbound cluster", gatewayTestCase{
name: "basic",
toRules: core_xds.ToRules{
Rules: []*core_xds.Rule{
{
Subset: core_xds.Subset{},
Conf: v1alpha1.Conf{
LoadBalancer: &v1alpha1.LoadBalancer{
Type: v1alpha1.RingHashType,
RingHash: &v1alpha1.RingHash{
MinRingSize: pointer.To[uint32](100),
MaxRingSize: pointer.To[uint32](1000),
HashFunction: pointer.To(v1alpha1.MurmurHash2Type),
HashPolicies: &[]v1alpha1.HashPolicy{
{
Type: v1alpha1.QueryParameterType,
QueryParameter: &v1alpha1.QueryParameter{
Name: "queryparam",
},
Terminal: pointer.To(true),
},
{
Type: v1alpha1.ConnectionType,
Connection: &v1alpha1.Connection{
SourceIP: pointer.To(true),
},
Terminal: pointer.To(false),
},
},
},
},
},
},
},
},
}),
)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
connectTimeout: 10s
edsClusterConfig:
edsConfig:
ads: {}
resourceApiVersion: V3
lbPolicy: RING_HASH
name: backend-26cb64fa4e85e7b7
perConnectionBufferLimitBytes: 32768
ringHashLbConfig:
hashFunction: MURMUR_HASH_2
maximumRingSize: "1000"
minimumRingSize: "100"
type: EDS
typedExtensionProtocolOptions:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
'@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
commonHttpProtocolOptions:
idleTimeout: 0s
explicitHttpConfig:
httpProtocolOptions: {}
Loading

0 comments on commit 6904d99

Please sign in to comment.