Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(kuma-cp) kds extensions #1572

Merged
merged 1 commit into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package resources
package client

import (
"net/http"
Expand All @@ -16,7 +16,7 @@ const (
Timeout = 60 * time.Second
)

func apiServerClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (util_http.Client, error) {
func ApiServerClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (util_http.Client, error) {
baseURL, err := url.Parse(coordinates.Url)
if err != nil {
return nil, errors.Wrapf(err, "Failed to parse API Server URL")
Expand Down
3 changes: 2 additions & 1 deletion app/kumactl/pkg/resources/apiserver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/pkg/errors"

kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client"
"github.com/kumahq/kuma/pkg/api-server/types"
config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1"
error_types "github.com/kumahq/kuma/pkg/core/rest/errors/types"
Expand All @@ -18,7 +19,7 @@ type ApiServerClient interface {
}

func NewAPIServerClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (ApiServerClient, error) {
client, err := apiServerClient(coordinates)
client, err := kumactl_client.ApiServerClient(coordinates)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion app/kumactl/pkg/resources/dataplane_overview_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/pkg/errors"

kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client"
config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/rest/errors/types"
Expand All @@ -22,7 +23,7 @@ type DataplaneOverviewClient interface {
}

func NewDataplaneOverviewClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (DataplaneOverviewClient, error) {
client, err := apiServerClient(coordinates)
client, err := kumactl_client.ApiServerClient(coordinates)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion app/kumactl/pkg/resources/service_overview_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"

kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client"
config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/rest/errors/types"
Expand All @@ -21,7 +22,7 @@ type ServiceOverviewClient interface {
}

func NewServiceOverviewClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (ServiceOverviewClient, error) {
client, err := apiServerClient(coordinates)
client, err := kumactl_client.ApiServerClient(coordinates)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion app/kumactl/pkg/resources/store.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package resources

import (
kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client"
kuma_rest "github.com/kumahq/kuma/pkg/api-server/definitions"
config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
remote_resources "github.com/kumahq/kuma/pkg/plugins/resources/remote"
)

func NewResourceStore(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (core_store.ResourceStore, error) {
client, err := apiServerClient(coordinates)
client, err := kumactl_client.ApiServerClient(coordinates)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion app/kumactl/pkg/resources/zone_overview_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/pkg/errors"

kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client"
"github.com/kumahq/kuma/pkg/core/resources/apis/system"

config_proto "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1"
Expand All @@ -21,7 +22,7 @@ type ZoneOverviewClient interface {
}

func NewZoneOverviewClient(coordinates *config_proto.ControlPlaneCoordinates_ApiServer) (ZoneOverviewClient, error) {
client, err := apiServerClient(coordinates)
client, err := kumactl_client.ApiServerClient(coordinates)
if err != nil {
return nil, err
}
Expand Down
20 changes: 3 additions & 17 deletions app/kumactl/pkg/tokens/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,21 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"time"

"github.com/pkg/errors"

kumactl_client "github.com/kumahq/kuma/app/kumactl/pkg/client"
kumactl_config "github.com/kumahq/kuma/pkg/config/app/kumactl/v1alpha1"
error_types "github.com/kumahq/kuma/pkg/core/rest/errors/types"
"github.com/kumahq/kuma/pkg/tokens/builtin/server/types"
util_http "github.com/kumahq/kuma/pkg/util/http"
)

const (
timeout = 10 * time.Second
)

func NewDataplaneTokenClient(config *kumactl_config.ControlPlaneCoordinates_ApiServer) (DataplaneTokenClient, error) {
baseURL, err := url.Parse(config.Url)
client, err := kumactl_client.ApiServerClient(config)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse Dataplane Token Server URL")
}
httpClient := &http.Client{
Timeout: timeout,
}
if baseURL.Scheme == "https" {
if err := util_http.ConfigureMTLS(httpClient, config.CaCertFile, config.ClientCertFile, config.ClientKeyFile); err != nil {
return nil, errors.Wrap(err, "could not configure tls for dataplane token client")
}
return nil, err
}
client := util_http.ClientWithBaseURL(httpClient, baseURL)
return &httpDataplaneTokenClient{
client: client,
}, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"

"github.com/kumahq/kuma/pkg/envoy/admin"
kds_context "github.com/kumahq/kuma/pkg/kds/context"

"github.com/kumahq/kuma/pkg/api-server/customization"
"github.com/kumahq/kuma/pkg/core/managers/apis/zone"
Expand Down Expand Up @@ -97,6 +98,7 @@ func buildRuntime(cfg kuma_cp.Config, closeCh <-chan struct{}) (core_runtime.Run
builder.WithAPIManager(customization.NewAPIList())
builder.WithXDSHooks(&xds_hooks.Hooks{})
builder.WithDpServer(server.NewDpServer(*cfg.DpServer, builder.Metrics()))
builder.WithKDSContext(kds_context.DefaultContext(builder.ResourceManager(), cfg.Multizone.Remote.Zone))

if err := initializeAfterBootstrap(cfg, builder); err != nil {
return nil, err
Expand Down
15 changes: 15 additions & 0 deletions pkg/core/runtime/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/kumahq/kuma/pkg/envoy/admin"
kds_context "github.com/kumahq/kuma/pkg/kds/context"

api_server "github.com/kumahq/kuma/pkg/api-server/customization"
dp_server "github.com/kumahq/kuma/pkg/dp-server/server"
Expand Down Expand Up @@ -48,6 +49,7 @@ type BuilderContext interface {
APIManager() api_server.APIManager
XDSHooks() *xds_hooks.Hooks
DpServer() *dp_server.DpServer
KDSContext() *kds_context.Context
}

var _ BuilderContext = &Builder{}
Expand All @@ -74,6 +76,7 @@ type Builder struct {
apim api_server.APIManager
xdsh *xds_hooks.Hooks
dps *dp_server.DpServer
kdsctx *kds_context.Context
closeCh <-chan struct{}
*runtimeInfo
}
Expand Down Expand Up @@ -200,6 +203,11 @@ func (b *Builder) WithDpServer(dps *dp_server.DpServer) *Builder {
return b
}

func (b *Builder) WithKDSContext(kdsctx *kds_context.Context) *Builder {
b.kdsctx = kdsctx
return b
}

func (b *Builder) Build() (Runtime, error) {
if b.cm == nil {
return nil, errors.Errorf("ComponentManager has not been configured")
Expand Down Expand Up @@ -246,6 +254,9 @@ func (b *Builder) Build() (Runtime, error) {
if b.dps == nil {
return nil, errors.Errorf("DpServer has not been configured")
}
if b.kdsctx == nil {
return nil, errors.Errorf("KDSContext has not been configured")
}
return &runtime{
RuntimeInfo: b.runtimeInfo,
RuntimeContext: &runtimeContext{
Expand All @@ -267,6 +278,7 @@ func (b *Builder) Build() (Runtime, error) {
apim: b.apim,
xdsh: b.xdsh,
dps: b.dps,
kdsctx: b.kdsctx,
},
Manager: b.cm,
}, nil
Expand Down Expand Up @@ -329,6 +341,9 @@ func (b *Builder) XDSHooks() *xds_hooks.Hooks {
func (b *Builder) DpServer() *dp_server.DpServer {
return b.dps
}
func (b *Builder) KDSContext() *kds_context.Context {
return b.kdsctx
}
func (b *Builder) CloseCh() <-chan struct{} {
return b.closeCh
}
10 changes: 8 additions & 2 deletions pkg/core/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"sync"

"github.com/kumahq/kuma/pkg/envoy/admin"

api_server "github.com/kumahq/kuma/pkg/api-server/customization"
dp_server "github.com/kumahq/kuma/pkg/dp-server/server"
"github.com/kumahq/kuma/pkg/envoy/admin"
kds_context "github.com/kumahq/kuma/pkg/kds/context"
xds_hooks "github.com/kumahq/kuma/pkg/xds/hooks"

"github.com/kumahq/kuma/pkg/core/datasource"
Expand Down Expand Up @@ -61,6 +61,7 @@ type RuntimeContext interface {
APIInstaller() api_server.APIInstaller
XDSHooks() *xds_hooks.Hooks
DpServer() *dp_server.DpServer
KDSContext() *kds_context.Context
}

var _ Runtime = &runtime{}
Expand Down Expand Up @@ -118,6 +119,7 @@ type runtimeContext struct {
apim api_server.APIInstaller
xdsh *xds_hooks.Hooks
dps *dp_server.DpServer
kdsctx *kds_context.Context
}

func (rc *runtimeContext) Metrics() metrics.Metrics {
Expand Down Expand Up @@ -194,3 +196,7 @@ func (rc *runtimeContext) DpServer() *dp_server.DpServer {
func (rc *runtimeContext) XDSHooks() *xds_hooks.Hooks {
return rc.xdsh
}

func (rc *runtimeContext) KDSContext() *kds_context.Context {
return rc.kdsctx
}
71 changes: 71 additions & 0 deletions pkg/kds/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package context

import (
"context"

"github.com/kumahq/kuma/pkg/core"
config_manager "github.com/kumahq/kuma/pkg/core/config/manager"
"github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/apis/system"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/kds/mux"
"github.com/kumahq/kuma/pkg/kds/reconcile"
"github.com/kumahq/kuma/pkg/kds/util"
)

var log = core.Log.WithName("kds")

type Context struct {
RemoteClientCtx context.Context
GlobalServerCallbacks []mux.Callbacks
GlobalProvidedFilter reconcile.ResourceFilter
RemoteProvidedFilter reconcile.ResourceFilter
}

func DefaultContext(manager manager.ResourceManager, zone string) *Context {
return &Context{
RemoteClientCtx: context.Background(),
GlobalProvidedFilter: GlobalProvidedFilter(manager),
RemoteProvidedFilter: RemoteProvidedFilter(zone),
}
}

// GlobalProvidedFilter returns ResourceFilter which filters Resources provided by Global, specifically
// excludes Dataplanes and Ingresses from 'clusterID' cluster
func GlobalProvidedFilter(rm manager.ResourceManager) reconcile.ResourceFilter {
return func(clusterID string, r model.Resource) bool {
if r.GetType() == system.ConfigType && r.GetMeta().GetName() != config_manager.ClusterIdConfigKey {
return false
}
if r.GetType() != mesh.DataplaneType {
return true
}
if !r.(*mesh.DataplaneResource).Spec.IsIngress() {
return false
}
if clusterID == util.ZoneTag(r) {
// don't need to sync resource to the zone where resource is originated from
return false
}
zone := system.NewZoneResource()
if err := rm.Get(context.Background(), zone, store.GetByKey(util.ZoneTag(r), model.NoMesh)); err != nil {
log.Error(err, "failed to get zone", "zone", util.ZoneTag(r))
// since there is no explicit 'enabled: false' then we don't
// make any strong decisions which might affect connectivity
return true
}
return zone.Spec.IsEnabled()
}
}

// RemoteProvidedFilter filter Resources provided by Remote, specifically Ingresses that belongs to another zones
func RemoteProvidedFilter(clusterName string) reconcile.ResourceFilter {
return func(_ string, r model.Resource) bool {
if r.GetType() == mesh.DataplaneType {
return clusterName == util.ZoneTag(r)
}
return r.GetType() == mesh.DataplaneInsightType
}
}
35 changes: 3 additions & 32 deletions pkg/kds/global/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
"github.com/golang/protobuf/ptypes/wrappers"

system_proto "github.com/kumahq/kuma/api/system/v1alpha1"
config_manager "github.com/kumahq/kuma/pkg/core/config/manager"
"github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/store"
"github.com/kumahq/kuma/pkg/kds/reconcile"
resources_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s"
k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model"

Expand Down Expand Up @@ -58,7 +56,7 @@ var (
func Setup(rt runtime.Runtime) (err error) {
kdsServer, err := kds_server.New(kdsGlobalLog, rt, ProvidedTypes,
"global", rt.Config().Multizone.Global.KDS.RefreshInterval,
ProvidedFilter(rt.ResourceManager()), true)
rt.KDSContext().GlobalProvidedFilter, true)
if err != nil {
return err
}
Expand All @@ -85,7 +83,8 @@ func Setup(rt runtime.Runtime) (err error) {
}()
return nil
})
return rt.Add(mux.NewServer(onSessionStarted, *rt.Config().Multizone.Global.KDS, rt.Metrics()))
callbacks := append(rt.KDSContext().GlobalServerCallbacks, onSessionStarted)
return rt.Add(mux.NewServer(callbacks, *rt.Config().Multizone.Global.KDS, rt.Metrics()))
}

func createZoneIfAbsent(name string, resManager manager.ResourceManager) error {
Expand All @@ -106,34 +105,6 @@ func createZoneIfAbsent(name string, resManager manager.ResourceManager) error {
return nil
}

// ProvidedFilter returns ResourceFilter which filters Resources provided by Global, specifically
// excludes Dataplanes and Ingresses from 'clusterID' cluster
func ProvidedFilter(rm manager.ResourceManager) reconcile.ResourceFilter {
return func(clusterID string, r model.Resource) bool {
if r.GetType() == system.ConfigType && r.GetMeta().GetName() != config_manager.ClusterIdConfigKey {
return false
}
if r.GetType() != mesh.DataplaneType {
return true
}
if !r.(*mesh.DataplaneResource).Spec.IsIngress() {
return false
}
if clusterID == util.ZoneTag(r) {
// don't need to sync resource to the zone where resource is originated from
return false
}
zone := system.NewZoneResource()
if err := rm.Get(context.Background(), zone, store.GetByKey(util.ZoneTag(r), model.NoMesh)); err != nil {
kdsGlobalLog.Error(err, "failed to get zone", "zone", util.ZoneTag(r))
// since there is no explicit 'enabled: false' then we don't
// make any strong decisions which might affect connectivity
return true
}
return zone.Spec.IsEnabled()
}
}

func Callbacks(s sync_store.ResourceSyncer, k8sStore bool, kubeFactory resources_k8s.KubeFactory) *client.Callbacks {
return &client.Callbacks{
OnResourcesReceived: func(clusterName string, rs model.ResourceList) error {
Expand Down
Loading