Skip to content

Commit

Permalink
refactor(xds): reuse MakeSplit function (#6943)
Browse files Browse the repository at this point in the history
* refactor(xds): move out MakeSplit function
* refactor(xds): use splits for TCP proxy
* refactor(xds): rename TcpProxy
* Revert "chore(kuma-cp): add weights to xds.Cluster interface (#6844)"

This reverts commit 52045f6.

Signed-off-by: Bart Smykla <[email protected]>
Signed-off-by: Mike Beaumont <[email protected]>
Co-authored-by: Bart Smykla <[email protected]>
  • Loading branch information
michaelbeaumont and bartsmykla authored Jun 6, 2023
1 parent 44e42a5 commit 7c9dd6e
Show file tree
Hide file tree
Showing 35 changed files with 237 additions and 213 deletions.
2 changes: 1 addition & 1 deletion pkg/plugins/bootstrap/k8s/xds/hooks/api_server_bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (h ApiServerBypass) Modify(resources *core_xds.ResourceSet, ctx xds_context
listener, err := envoy_listeners.NewListenerBuilder(proxy.APIVersion).
Configure(envoy_listeners.OutboundListener(apiServerBypassHookResourcesName, h.Address, h.Port, core_xds.SocketAddressProtocolTCP)).
Configure(envoy_listeners.FilterChain(envoy_listeners.NewFilterChainBuilder(proxy.APIVersion).
Configure(envoy_listeners.TcpProxy(apiServerBypassHookResourcesName, envoy_common.NewCluster(envoy_common.WithService(apiServerBypassHookResourcesName)))))).
Configure(envoy_listeners.TcpProxyDeprecated(apiServerBypassHookResourcesName, envoy_common.NewCluster(envoy_common.WithService(apiServerBypassHookResourcesName)))))).
Configure(envoy_listeners.NoBindToPort()).
Configure(envoy_listeners.OriginalDstForwarder()).
Build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down Expand Up @@ -255,7 +255,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down Expand Up @@ -314,7 +314,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down Expand Up @@ -376,7 +376,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down Expand Up @@ -433,7 +433,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("other-service"),
Expand All @@ -447,7 +447,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27778", "127.0.0.1", 27778, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27778",
envoy_common.NewCluster(
envoy_common.WithService("foo-service"),
Expand All @@ -461,7 +461,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27779", "127.0.0.1", 27779, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27779",
envoy_common.NewCluster(
envoy_common.WithService("bar-service"),
Expand Down Expand Up @@ -650,7 +650,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down Expand Up @@ -710,7 +710,7 @@ var _ = Describe("MeshAccessLog", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:27777", "127.0.0.1", 27777, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:27777",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var _ = Describe("MeshFaultInjection", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(InboundListener("inbound:127.0.0.1:17778", "127.0.0.1", 17778, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
Configure(TcpProxyDeprecated("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
)).MustBuild(),
},
},
Expand Down
72 changes: 2 additions & 70 deletions pkg/plugins/policies/meshhttproute/plugin/v1alpha1/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func generateListeners(
protocol := plugins_xds.InferProtocol(proxy.Routing, serviceName)
var routes []xds.OutboundRoute
for _, route := range prepareRoutes(rules, serviceName, protocol) {
split := makeHTTPSplit(proxy, clusterCache, splitCounter, servicesAcc, route.BackendRefs)
split := meshroute_xds.MakeHTTPSplit(proxy, clusterCache, splitCounter, servicesAcc, route.BackendRefs)
if split == nil {
continue
}
for _, filter := range route.Filters {
if filter.Type == api.RequestMirrorType {
// we need to create a split for the mirror backend
_ = makeHTTPSplit(proxy, clusterCache, splitCounter, servicesAcc,
_ = meshroute_xds.MakeHTTPSplit(proxy, clusterCache, splitCounter, servicesAcc,
[]common_api.BackendRef{{
TargetRef: filter.RequestMirror.BackendRef,
Weight: pointer.To[uint](1), // any non-zero value
Expand Down Expand Up @@ -168,71 +168,3 @@ func prepareRoutes(

return routes
}

func makeHTTPSplit(
proxy *core_xds.Proxy,
clusterCache map[string]string,
sc *meshroute_xds.SplitCounter,
servicesAcc envoy_common.ServicesAccumulator,
refs []common_api.BackendRef,
) []*plugins_xds.Split {
var split []*plugins_xds.Split

for _, ref := range refs {
switch ref.Kind {
case common_api.MeshService, common_api.MeshServiceSubset:
default:
continue
}

service := ref.Name
if pointer.DerefOr(ref.Weight, 1) == 0 {
continue
}

switch plugins_xds.InferProtocol(proxy.Routing, service) {
case core_mesh.ProtocolHTTP, core_mesh.ProtocolHTTP2:
default:
// We don't support splitting if at least one of the backendRefs is not HTTP
return nil
}

clusterName := meshroute_xds.GetClusterName(ref.Name, ref.Tags, sc)
isExternalService := plugins_xds.HasExternalService(proxy.Routing, service)
refHash := ref.TargetRef.Hash()

if existingClusterName, ok := clusterCache[refHash]; ok {
// cluster already exists, so adding only split
split = append(split, plugins_xds.NewSplitBuilder().
WithClusterName(existingClusterName).
WithWeight(uint32(pointer.DerefOr(ref.Weight, 1))).
WithExternalService(isExternalService).
Build())
continue
}

clusterCache[refHash] = clusterName

split = append(split, plugins_xds.NewSplitBuilder().
WithClusterName(clusterName).
WithWeight(uint32(pointer.DerefOr(ref.Weight, 1))).
WithExternalService(isExternalService).
Build())

clusterBuilder := plugins_xds.NewClusterBuilder().
WithService(service).
WithName(clusterName).
WithTags(envoy_tags.Tags(ref.Tags).
WithTags(mesh_proto.ServiceTag, ref.Name).
WithoutTags(mesh_proto.MeshTag)).
WithExternalService(isExternalService)

if mesh, ok := ref.Tags[mesh_proto.MeshTag]; ok {
clusterBuilder.WithMesh(mesh)
}

servicesAcc.Add(clusterBuilder.Build())
}

return split
}
3 changes: 1 addition & 2 deletions pkg/plugins/policies/meshhttproute/xds/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
plugins_xds "github.com/kumahq/kuma/pkg/plugins/policies/xds"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
envoy_listeners_v3 "github.com/kumahq/kuma/pkg/xds/envoy/listeners/v3"
envoy_names "github.com/kumahq/kuma/pkg/xds/envoy/names"
Expand All @@ -15,7 +14,7 @@ import (
type OutboundRoute struct {
Matches []api.Match
Filters []api.Filter
Split []*plugins_xds.Split
Split []envoy_common.Split
BackendRefToClusterName map[string]string
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/plugins/policies/meshhttproute/xds/configurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (

api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
"github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/xds/filters"
plugins_xds "github.com/kumahq/kuma/pkg/plugins/policies/xds"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/route"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
envoy_common "github.com/kumahq/kuma/pkg/xds/envoy"
)

type RoutesConfigurer struct {
Matches []api.Match
Filters []api.Filter
Split []*plugins_xds.Split
Split []envoy_common.Split
BackendRefToClusterName map[string]string
}

Expand Down Expand Up @@ -200,7 +200,7 @@ func routeQueryParamsMatch(envoyMatch *envoy_route.RouteMatch, matches []api.Que
}
}

func (c RoutesConfigurer) hasExternal(split []*plugins_xds.Split) bool {
func (c RoutesConfigurer) hasExternal(split []envoy_common.Split) bool {
for _, s := range split {
if s.HasExternalService() {
return true
Expand All @@ -209,7 +209,7 @@ func (c RoutesConfigurer) hasExternal(split []*plugins_xds.Split) bool {
return false
}

func (c RoutesConfigurer) routeAction(split []*plugins_xds.Split) *envoy_route.RouteAction {
func (c RoutesConfigurer) routeAction(split []envoy_common.Split) *envoy_route.RouteAction {
routeAction := &envoy_route.RouteAction{
// this timeout should be updated by the MeshTimeout plugin
Timeout: util_proto.Duration(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ var _ = Describe("MeshRateLimit", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(InboundListener("inbound:127.0.0.1:17778", "127.0.0.1", 17778, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
Configure(TcpProxyDeprecated("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
)).MustBuild(),
},
},
Expand Down Expand Up @@ -285,7 +285,7 @@ var _ = Describe("MeshRateLimit", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(InboundListener("inbound:127.0.0.1:17778", "127.0.0.1", 17778, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
Configure(TcpProxyDeprecated("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
)).MustBuild(),
}},
fromRules: core_xds.FromRules{
Expand Down Expand Up @@ -354,7 +354,7 @@ var _ = Describe("MeshRateLimit", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(InboundListener("inbound:127.0.0.1:17778", "127.0.0.1", 17778, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
Configure(TcpProxyDeprecated("127.0.0.1:17778", envoy_common.NewCluster(envoy_common.WithName("frontend")))),
)).MustBuild(),
}},
fromRules: core_xds.FromRules{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func tcpListener(port uint32) envoy_common.NamedResource {
return NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener(fmt.Sprintf("outbound:127.0.0.1:%d", port), "127.0.0.1", port, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
fmt.Sprintf("outbound:127.0.0.1:%d", port),
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func generateListeners(
// Cluster cache protects us from creating excessive amount of clusters.
// For one outbound we pick one traffic route, so LB and Timeout are
// the same.
clusterCache := map[string]struct{}{}
clusterCache := map[string]string{}
sc := &meshroute_xds.SplitCounter{}
networking := proxy.Dataplane.Spec.GetNetworking()
routing := proxy.Routing
Expand All @@ -42,9 +42,8 @@ func generateListeners(
continue
}

clusters := getClusters(routing, clusterCache, sc, servicesAccumulator,
backendRefs)
filterChain := buildFilterChain(proxy, serviceName, clusters)
splits := meshroute_xds.MakeTCPSplit(proxy, clusterCache, sc, servicesAccumulator, backendRefs)
filterChain := buildFilterChain(proxy, serviceName, splits)

listener, err := buildOutboundListener(proxy, outbound, filterChain)
if err != nil {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
func buildFilterChain(
proxy *core_xds.Proxy,
serviceName string,
clusters []envoy_common.Cluster,
splits []envoy_common.Split,
) envoy_listeners.ListenerBuilderOpt {
tcpProxy := envoy_listeners.TcpProxy(serviceName, clusters...)
tcpProxy := envoy_listeners.TCPProxy(serviceName, splits...)
builder := envoy_listeners.NewFilterChainBuilder(proxy.APIVersion).
Configure(tcpProxy)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ var _ = Describe("MeshTimeout", func() {
Resource: NewListenerBuilder(envoy_common.APIV3).
Configure(OutboundListener("outbound:127.0.0.1:10002", "127.0.0.1", 10002, core_xds.SocketAddressProtocolTCP)).
Configure(FilterChain(NewFilterChainBuilder(envoy_common.APIV3).
Configure(TcpProxy(
Configure(TcpProxyDeprecated(
"127.0.0.1:10002",
envoy_common.NewCluster(
envoy_common.WithService("backend"),
Expand Down
Loading

0 comments on commit 7c9dd6e

Please sign in to comment.