Skip to content

Commit

Permalink
feat(kuma-cp) XDS extensions and new matcher strategy (#1493)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
jakubdyszkiewicz authored Feb 2, 2021
1 parent 6f91a8d commit 9781c38
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/kumahq/kuma/pkg/api-server/customization"
"github.com/kumahq/kuma/pkg/core/managers/apis/zone"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"

"github.com/pkg/errors"

Expand Down Expand Up @@ -90,6 +91,7 @@ func buildRuntime(cfg kuma_cp.Config, closeCh <-chan struct{}) (core_runtime.Run

builder.WithLookupIP(lookup.CachedLookupIP(net.LookupIP, cfg.General.DNSCacheTTL))
builder.WithAPIManager(customization.NewAPIList())
builder.WithXDSHooks(&xds_hooks.Hooks{})

if err := initializeAfterBootstrap(cfg, builder); err != nil {
return nil, err
Expand Down
50 changes: 50 additions & 0 deletions pkg/core/policy/dataplane_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,56 @@ func SelectDataplanePolicy(dataplane *mesh.DataplaneResource, policies []Datapla
return bestPolicy
}

// SelectInboundDataplanePolicies given a Dataplane definition and a list of DataplanePolicy returns the "best matching" DataplanePolicy for each inbound separately.
// A DataplanePolicy for an inbound is considered a match if the inbound matches all the tags listed in selector of the DataplanePolicy.
// Every matching DataplanePolicy gets a rank (score) defined as a maximum number of tags in a matching selector.
// DataplanePolicy with an empty list of selectors is considered a match with a rank (score) of 0.
// DataplanePolicy with an empty selector (one that has no tags) is considered a match with a rank (score) of 0.
// In case if there are multiple DataplanePolicies with the same rank (score), the policy created last is chosen.
func SelectInboundDataplanePolicies(dataplane *mesh.DataplaneResource, policies []DataplanePolicy) InboundDataplanePolicyMap {
sort.Stable(DataplanePolicyByName(policies)) // sort to avoid flakiness

match := InboundDataplanePolicyMap{}

for _, inbound := range dataplane.Spec.Networking.GetInbound() {
var bestPolicy DataplanePolicy
var bestRank mesh_proto.TagSelectorRank
sameRankCreatedLater := func(policy DataplanePolicy, rank mesh_proto.TagSelectorRank) bool {
return rank.CompareTo(bestRank) == 0 && policy.GetMeta().GetCreationTime().After(bestPolicy.GetMeta().GetCreationTime())
}
for _, policy := range policies {
if 0 == len(policy.Selectors()) { // match everything
if bestPolicy == nil || sameRankCreatedLater(policy, mesh_proto.TagSelectorRank{}) {
bestPolicy = policy
}
continue
}
for _, selector := range policy.Selectors() {
if 0 == len(selector.Match) { // match everything
if bestPolicy == nil || sameRankCreatedLater(policy, mesh_proto.TagSelectorRank{}) {
bestPolicy = policy
}
continue
}
tagSelector := mesh_proto.TagSelector(selector.Match)
if inbound.MatchTags(tagSelector) {
rank := tagSelector.Rank()
if rank.CompareTo(bestRank) > 0 || sameRankCreatedLater(policy, rank) {
bestRank = rank
bestPolicy = policy
}
}
}
}
if bestPolicy != nil {
iface := dataplane.Spec.GetNetworking().ToInboundInterface(inbound)
match[iface] = bestPolicy
}
}

return match
}

type DataplanePolicyByName []DataplanePolicy

func (a DataplanePolicyByName) Len() int { return len(a) }
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/policy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ type DataplanePolicy interface {
core_model.Resource
Selectors() []*mesh_proto.Selector
}

type InboundDataplanePolicyMap map[mesh_proto.InboundInterface]DataplanePolicy
16 changes: 15 additions & 1 deletion pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

api_server "github.com/kumahq/kuma/pkg/api-server/customization"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"

"github.com/pkg/errors"

Expand Down Expand Up @@ -42,6 +43,7 @@ type BuilderContext interface {
Metrics() metrics.Metrics
EventReaderFactory() events.ListenerFactory
APIManager() api_server.APIManager
XDSHooks() *xds_hooks.Hooks
}

var _ BuilderContext = &Builder{}
Expand All @@ -65,6 +67,7 @@ type Builder struct {
metrics metrics.Metrics
erf events.ListenerFactory
apim api_server.APIManager
xdsh *xds_hooks.Hooks
closeCh <-chan struct{}
*runtimeInfo
}
Expand Down Expand Up @@ -176,6 +179,11 @@ func (b *Builder) WithAPIManager(apim api_server.APIManager) *Builder {
return b
}

func (b *Builder) WithXDSHooks(xdsh *xds_hooks.Hooks) *Builder {
b.xdsh = xdsh
return b
}

func (b *Builder) Build() (Runtime, error) {
if b.cm == nil {
return nil, errors.Errorf("ComponentManager has not been configured")
Expand Down Expand Up @@ -213,6 +221,9 @@ func (b *Builder) Build() (Runtime, error) {
if b.apim == nil {
return nil, errors.Errorf("APIManager has not been configured")
}
if b.xdsh == nil {
return nil, errors.Errorf("XDSHooks has not been configured")
}
return &runtime{
RuntimeInfo: b.runtimeInfo,
RuntimeContext: &runtimeContext{
Expand All @@ -231,6 +242,7 @@ func (b *Builder) Build() (Runtime, error) {
metrics: b.metrics,
erf: b.erf,
apim: b.apim,
xdsh: b.xdsh,
},
Manager: b.cm,
}, nil
Expand Down Expand Up @@ -284,10 +296,12 @@ func (b *Builder) Metrics() metrics.Metrics {
func (b *Builder) EventReaderFactory() events.ListenerFactory {
return b.erf
}

func (b *Builder) APIManager() api_server.APIManager {
return b.apim
}
func (b *Builder) XDSHooks() *xds_hooks.Hooks {
return b.xdsh
}
func (b *Builder) CloseCh() <-chan struct{} {
return b.closeCh
}
8 changes: 8 additions & 0 deletions pkg/core/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

api_server "github.com/kumahq/kuma/pkg/api-server/customization"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"

"github.com/kumahq/kuma/pkg/core/datasource"
"github.com/kumahq/kuma/pkg/dns/resolver"
Expand Down Expand Up @@ -54,6 +55,7 @@ type RuntimeContext interface {
Metrics() metrics.Metrics
EventReaderFactory() events.ListenerFactory
APIInstaller() api_server.APIInstaller
XDSHooks() *xds_hooks.Hooks
}

var _ Runtime = &runtime{}
Expand Down Expand Up @@ -108,6 +110,7 @@ type runtimeContext struct {
metrics metrics.Metrics
erf events.ListenerFactory
apim api_server.APIInstaller
xdsh *xds_hooks.Hooks
}

func (rc *runtimeContext) Metrics() metrics.Metrics {
Expand Down Expand Up @@ -169,6 +172,11 @@ func (rc *runtimeContext) LeaderInfo() component.LeaderInfo {
func (rc *runtimeContext) LookupIP() lookup.LookupIPFunc {
return rc.lif
}

func (rc *runtimeContext) APIInstaller() api_server.APIInstaller {
return rc.apim
}

func (rc *runtimeContext) XDSHooks() *xds_hooks.Hooks {
return rc.xdsh
}
2 changes: 2 additions & 0 deletions pkg/test/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"

"github.com/kumahq/kuma/pkg/api-server/customization"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"

kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
config_manager "github.com/kumahq/kuma/pkg/core/config/manager"
Expand Down Expand Up @@ -71,6 +72,7 @@ func BuilderFor(cfg kuma_cp.Config) (*core_runtime.Builder, error) {
builder.WithLookupIP(net.LookupIP)
builder.WithEventReaderFactory(events.NewEventBus())
builder.WithAPIManager(customization.NewAPIList())
builder.WithXDSHooks(&xds_hooks.Hooks{})

_ = initializeConfigManager(cfg, builder)
_ = initializeDNSResolver(cfg, builder)
Expand Down
13 changes: 13 additions & 0 deletions pkg/xds/hooks/hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package hooks

type Hooks struct {
resourceSetHooks []ResourceSetHook
}

func (h *Hooks) AddResourceSetHook(hook ResourceSetHook) {
h.resourceSetHooks = append(h.resourceSetHooks, hook)
}

func (h *Hooks) ResourceSetHooks() []ResourceSetHook {
return h.resourceSetHooks
}
14 changes: 14 additions & 0 deletions pkg/xds/hooks/resource_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package hooks

import (
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
)

// ResourceSetHook is a way to modify XDS resources generated by Kuma
// Since resourceSet is an argument, you can add new, remove or modify the existing resources
// To support V2 and V3, search the resource set for typeUrls for proper version
// If you want to for example modify only inbound listeners, search for the resource origin of OriginInbound
type ResourceSetHook interface {
Modify(resourceSet *core_xds.ResourceSet, ctx xds_context.Context, proxy *core_xds.Proxy) error
}
7 changes: 4 additions & 3 deletions pkg/xds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
)

var (
meshResources = meshResourceTypes(map[core_model.ResourceType]bool{
// HashMeshExcludedResources defines Mesh-scoped resources that are not used in XDS therefore when counting hash mesh we can skip them
HashMeshExcludedResources = map[core_model.ResourceType]bool{
core_mesh.DataplaneInsightType: true,
core_mesh.DataplaneOverviewType: true,
core_mesh.ServiceInsightType: true,
core_system.ConfigType: true,
})
}
)

func meshResourceTypes(exclude map[core_model.ResourceType]bool) []core_model.ResourceType {
Expand Down Expand Up @@ -53,7 +54,7 @@ func RegisterXDS(rt core_runtime.Runtime, server *grpc.Server) error {
if err != nil {
return err
}
meshSnapshotCache, err := mesh.NewCache(rt.ReadOnlyResourceManager(), rt.Config().Store.Cache.ExpirationTime, meshResources, rt.LookupIP(), rt.Metrics())
meshSnapshotCache, err := mesh.NewCache(rt.ReadOnlyResourceManager(), rt.Config().Store.Cache.ExpirationTime, meshResourceTypes(HashMeshExcludedResources), rt.LookupIP(), rt.Metrics())
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/xds/server/v2/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func RegisterXDS(
func DefaultReconciler(rt core_runtime.Runtime, xdsContext v2.XdsContext) xds_sync.SnapshotReconciler {
return &reconciler{
&templateSnapshotGenerator{
ResourceSetHooks: rt.XDSHooks().ResourceSetHooks(),
ProxyTemplateResolver: &xds_template.SimpleProxyTemplateResolver{
ReadOnlyResourceManager: rt.ReadOnlyResourceManager(),
DefaultProxyTemplate: xds_template.DefaultProxyTemplate,
Expand All @@ -85,6 +86,7 @@ func DefaultReconciler(rt core_runtime.Runtime, xdsContext v2.XdsContext) xds_sy
func DefaultIngressReconciler(rt core_runtime.Runtime, xdsContext v2.XdsContext) xds_sync.SnapshotReconciler {
return &reconciler{
generator: &templateSnapshotGenerator{
ResourceSetHooks: rt.XDSHooks().ResourceSetHooks(),
ProxyTemplateResolver: &xds_template.StaticProxyTemplateResolver{
Template: xds_template.IngressProxyTemplate,
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/xds/server/v2/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package v2

import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/core"
model "github.com/kumahq/kuma/pkg/core/xds"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/generator"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"
xds_sync "github.com/kumahq/kuma/pkg/xds/sync"
xds_template "github.com/kumahq/kuma/pkg/xds/template"

Expand Down Expand Up @@ -93,6 +95,7 @@ type snapshotGenerator interface {

type templateSnapshotGenerator struct {
ProxyTemplateResolver xds_template.ProxyTemplateResolver
ResourceSetHooks []xds_hooks.ResourceSetHook
}

func (s *templateSnapshotGenerator) GenerateSnapshot(ctx xds_context.Context, proxy *model.Proxy) (envoy_cache.Snapshot, error) {
Expand All @@ -105,6 +108,11 @@ func (s *templateSnapshotGenerator) GenerateSnapshot(ctx xds_context.Context, pr
reconcileLog.Error(err, "failed to generate a snapshot", "proxy", proxy, "template", template)
return envoy_cache.Snapshot{}, err
}
for _, hook := range s.ResourceSetHooks {
if err := hook.Modify(rs, ctx, proxy); err != nil {
return envoy_cache.Snapshot{}, errors.Wrapf(err, "could not apply hook %T", hook)
}
}

version := "" // empty value is a sign to other components to generate the version automatically
out := envoy_cache.Snapshot{
Expand Down
2 changes: 2 additions & 0 deletions pkg/xds/server/v3/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func RegisterXDS(
func DefaultReconciler(rt core_runtime.Runtime, xdsContext v3.XdsContext) xds_sync.SnapshotReconciler {
return &reconciler{
&templateSnapshotGenerator{
ResourceSetHooks: rt.XDSHooks().ResourceSetHooks(),
ProxyTemplateResolver: &xds_template.SimpleProxyTemplateResolver{
ReadOnlyResourceManager: rt.ReadOnlyResourceManager(),
DefaultProxyTemplate: xds_template.DefaultProxyTemplate,
Expand All @@ -85,6 +86,7 @@ func DefaultReconciler(rt core_runtime.Runtime, xdsContext v3.XdsContext) xds_sy
func DefaultIngressReconciler(rt core_runtime.Runtime, xdsContext v3.XdsContext) xds_sync.SnapshotReconciler {
return &reconciler{
generator: &templateSnapshotGenerator{
ResourceSetHooks: rt.XDSHooks().ResourceSetHooks(),
ProxyTemplateResolver: &xds_template.StaticProxyTemplateResolver{
Template: xds_template.IngressProxyTemplate,
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/xds/server/v3/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package v3

import (
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/core"
model "github.com/kumahq/kuma/pkg/core/xds"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
"github.com/kumahq/kuma/pkg/xds/generator"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"
xds_sync "github.com/kumahq/kuma/pkg/xds/sync"
xds_template "github.com/kumahq/kuma/pkg/xds/template"

Expand Down Expand Up @@ -93,6 +95,7 @@ type snapshotGenerator interface {

type templateSnapshotGenerator struct {
ProxyTemplateResolver xds_template.ProxyTemplateResolver
ResourceSetHooks []xds_hooks.ResourceSetHook
}

func (s *templateSnapshotGenerator) GenerateSnapshot(ctx xds_context.Context, proxy *model.Proxy) (envoy_cache.Snapshot, error) {
Expand All @@ -105,6 +108,11 @@ func (s *templateSnapshotGenerator) GenerateSnapshot(ctx xds_context.Context, pr
reconcileLog.Error(err, "failed to generate a snapshot", "proxy", proxy, "template", template)
return envoy_cache.Snapshot{}, err
}
for _, hook := range s.ResourceSetHooks {
if err := hook.Modify(rs, ctx, proxy); err != nil {
return envoy_cache.Snapshot{}, errors.Wrapf(err, "could not apply hook %T", hook)
}
}

version := "" // empty value is a sign to other components to generate the version automatically
out := envoy_cache.Snapshot{
Expand Down

0 comments on commit 9781c38

Please sign in to comment.