Skip to content

Commit

Permalink
1. fix service topology can not work when hub agent work on cloud mode
Browse files Browse the repository at this point in the history
--> solution: service topology filter will list/watch node from kube-apsierver if working mode is cloud
fixes openyurtio#600

2. optimize shared informers registeration. extract all of informers registeration and
make a comman function named registerInformers
  • Loading branch information
rambohe-ch committed Nov 18, 2021
1 parent ccabd6f commit 4302579
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 127 deletions.
103 changes: 85 additions & 18 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtcorev1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
yurtv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions/apps/v1alpha1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
)
Expand Down Expand Up @@ -75,12 +81,11 @@ type YurtHubConfiguration struct {
SerializerManager *serializer.SerializerManager
RESTMapperManager *meta.RESTMapperManager
TLSConfig *tls.Config
MutatedMasterServiceAddr string
Filters *filter.Filters
SharedFactory informers.SharedInformerFactory
YurtSharedFactory yurtinformers.SharedInformerFactory
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterChain filter.Interface
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand All @@ -104,12 +109,11 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
proxySecureServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxySecurePort)
proxyServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxyPort)
proxySecureServerDummyAddr := net.JoinHostPort(options.HubAgentDummyIfIP, options.YurtHubProxySecurePort)
sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr), options.NodePoolName)
if err != nil {
return nil, err
}
workingMode := util.WorkingMode(options.WorkingMode)

var filterChain filter.Interface
var filters *filter.Filters
var serviceTopologyFilterEnabled bool
var mutatedMasterServiceAddr string
if options.EnableResourceFilter {
if options.WorkingMode == string(util.WorkingModeCloud) {
Expand All @@ -118,6 +122,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
filters = filter.NewFilters(options.DisabledResourceFilters)
registerAllFilters(filters)

serviceTopologyFilterEnabled = filters.Enabled(filter.ServiceTopologyFilterName)
mutatedMasterServiceAddr = us[0].Host
if options.AccessServerThroughHub {
if options.EnableDummyIf {
Expand All @@ -128,6 +133,16 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
}

sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s", proxyServerAddr))
if err != nil {
return nil, err
}
registerInformers(sharedFactory, yurtSharedFactory, workingMode, serviceTopologyFilterEnabled, options.NodePoolName, options.NodeName)
filterChain, err = createFilterChain(filters, sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, workingMode, options.NodeName, mutatedMasterServiceAddr)
if err != nil {
return nil, err
}

cfg := &YurtHubConfiguration{
LBMode: options.LBMode,
RemoteServers: us,
Expand All @@ -151,15 +166,14 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
EnableDummyIf: options.EnableDummyIf,
EnableIptables: options.EnableIptables,
HubAgentDummyIfName: options.HubAgentDummyIfName,
WorkingMode: util.WorkingMode(options.WorkingMode),
WorkingMode: workingMode,
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
MutatedMasterServiceAddr: mutatedMasterServiceAddr,
Filters: filters,
SharedFactory: sharedFactory,
YurtSharedFactory: yurtSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterChain: filterChain,
}

return cfg, nil
Expand Down Expand Up @@ -196,7 +210,7 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createSharedInformers(proxyAddr, nodePoolName string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
func createSharedInformers(proxyAddr string) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var err error
kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "")
Expand All @@ -214,20 +228,73 @@ func createSharedInformers(proxyAddr, nodePoolName string) (informers.SharedInfo
return nil, nil, err
}

if len(nodePoolName) == 0 {
return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
return informers.NewSharedInformerFactory(client, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
}

// registerInformers reconstruct node/nodePool/configmap informers
func registerInformers(informerFactory informers.SharedInformerFactory,
yurtInformerFactory yurtinformers.SharedInformerFactory,
workingMode util.WorkingMode,
serviceTopologyFilterEnabled bool,
nodePoolName, nodeName string) {
// skip construct node/nodePool informers if service topology filter disabled
if serviceTopologyFilterEnabled {
if workingMode == util.WorkingModeCloud {
newNodeInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": nodeName}.String()
}
return coreinformers.NewFilteredNodeInformer(client, resyncPeriod, nil, tweakListOptions)
}
informerFactory.InformerFor(&corev1.Node{}, newNodeInformer)
}

if len(nodePoolName) != 0 {
newNodePoolInformer := func(client yurtclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": nodePoolName}.String()
}
return yurtv1alpha1.NewFilteredNodePoolInformer(client, resyncPeriod, nil, tweakListOptions)
}

yurtInformerFactory.InformerFor(&yurtcorev1alpha1.NodePool{}, newNodePoolInformer)
}
}

if workingMode == util.WorkingModeEdge {
newConfigmapInformer := func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
}
return coreinformers.NewFilteredConfigMapInformer(client, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}
informerFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
}
yurtSharedInformerFactory := yurtinformers.NewSharedInformerFactoryWithOptions(yurtClient, 24*time.Hour,
yurtinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{"metadata.name": nodePoolName}.String()
}))
return informers.NewSharedInformerFactory(client, 24*time.Hour), yurtSharedInformerFactory, nil
}

// registerAllFilters by order, the front registered filter will be
// called before the later registered ones.
// called before the behind registered ones.
func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
}

// createFilterChain return union filters that initializations completed.
func createFilterChain(filters *filter.Filters,
sharedFactory informers.SharedInformerFactory,
yurtSharedFactory yurtinformers.SharedInformerFactory,
serializerManager *serializer.SerializerManager,
storageWrapper cachemanager.StorageWrapper,
workingMode util.WorkingMode,
nodeName, mutatedMasterServiceAddr string) (filter.Interface, error) {
if filters == nil {
return nil, nil
}

genericInitializer := initializer.New(sharedFactory, yurtSharedFactory, serializerManager, storageWrapper, nodeName, mutatedMasterServiceAddr, workingMode)
initializerChain := filter.FilterInitializers{}
initializerChain = append(initializerChain, genericInitializer)
return filters.NewFromFilters(initializerChain)
}
30 changes: 4 additions & 26 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/certificate"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/hubself"
"github.com/openyurtio/openyurt/pkg/yurthub/certificate/kubelet"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/gc"
"github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
Expand Down Expand Up @@ -158,15 +156,8 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
}
trace++

klog.Infof("%d. new filter chain for mutating response body", trace)
filterChain, err := createFilterChain(cfg)
if err != nil {
return fmt.Errorf("could not new filter chain, %v", err)
}
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, filterChain, stopCh)
yurtProxyHandler, err := proxy.NewYurtReverseProxyHandler(cfg, cacheMgr, transportManager, healthChecker, certManager, stopCh)
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %v", err)
}
Expand All @@ -183,11 +174,9 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("%d. new %s server and begin to serve, dummy proxy server: %s, secure dummy proxy server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerDummyAddr, cfg.YurtHubProxyServerSecureDummyAddr)
}

// start shared informers here
if filterChain != nil && cfg.Filters.Enabled(filter.ServiceTopologyFilterName) {
cfg.SharedFactory.Start(stopCh)
cfg.YurtSharedFactory.Start(stopCh)
}
// start shared informers before start hub server
cfg.SharedFactory.Start(stopCh)
cfg.YurtSharedFactory.Start(stopCh)

klog.Infof("%d. new %s server and begin to serve, proxy server: %s, secure proxy server: %s, hub server: %s", trace, projectinfo.GetHubName(), cfg.YurtHubProxyServerAddr, cfg.YurtHubProxyServerSecureAddr, cfg.YurtHubServerAddr)
s, err := server.NewYurtHubServer(cfg, certManager, yurtProxyHandler)
Expand All @@ -198,14 +187,3 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
klog.Infof("hub agent exited")
return nil
}

func createFilterChain(cfg *config.YurtHubConfiguration) (filter.Interface, error) {
if cfg.Filters == nil {
return nil, nil
}

genericInitializer := initializer.New(cfg.SharedFactory, cfg.YurtSharedFactory, cfg.SerializerManager, cfg.StorageWrapper, cfg.NodeName, cfg.MutatedMasterServiceAddr)
initializerChain := filter.FilterInitializers{}
initializerChain = append(initializerChain, genericInitializer)
return cfg.Filters.NewFromFilters(initializerChain)
}
92 changes: 33 additions & 59 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,24 @@ package cachemanager

import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var (
const (
sepForAgent = ","
)

func (cm *cacheManager) initCacheAgents() error {
if cm.sharedFactory == nil {
return nil
}
configmapInformer := cm.sharedFactory.InformerFor(&corev1.ConfigMap{}, newConfigmapInformer)
configmapInformer := cm.sharedFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cm.addConfigmap,
UpdateFunc: cm.updateConfigmap,
Expand All @@ -50,8 +45,37 @@ func (cm *cacheManager) initCacheAgents() error {
return nil
}

// UpdateCacheAgents update cache agents
func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.String {
func (cm *cacheManager) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.updateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
return
}

deletedAgents := cm.updateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
}

// updateCacheAgents update cache agents
func (cm *cacheManager) updateCacheAgents(cacheAgents, action string) sets.String {
newAgents := sets.NewString()
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
Expand All @@ -63,7 +87,6 @@ func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.Strin
cm.Lock()
defer cm.Unlock()
cm.cacheAgents = cm.cacheAgents.Delete(util.DefaultCacheAgents...)

if cm.cacheAgents.Equal(newAgents) {
// add default cache agents
cm.cacheAgents = cm.cacheAgents.Insert(util.DefaultCacheAgents...)
Expand All @@ -84,55 +107,6 @@ func (cm *cacheManager) UpdateCacheAgents(cacheAgents, action string) sets.Strin
return deletedAgents
}

// ListCacheAgents get all of cache agents
func (cm *cacheManager) ListCacheAgents() []string {
cm.RLock()
defer cm.RUnlock()
agents := make([]string, 0)
for k := range cm.cacheAgents {
agents = append(agents, k)
}
return agents
}

func newConfigmapInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fields.Set{"metadata.name": util.YurthubConfigMapName}.String()
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}

return coreinformers.NewFilteredConfigMapInformer(cs, util.YurtHubNamespace, resyncPeriod, nil, tweakListOptions)
}

func (cm *cacheManager) addConfigmap(obj interface{}) {
cfg, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}

deletedAgents := cm.UpdateCacheAgents(cfg.Data[util.CacheUserAgentsKey], "add")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) updateConfigmap(oldObj, newObj interface{}) {
oldCfg, ok := oldObj.(*corev1.ConfigMap)
if !ok {
return
}

newCfg, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}

if oldCfg.Data[util.CacheUserAgentsKey] == newCfg.Data[util.CacheUserAgentsKey] {
return
}

deletedAgents := cm.UpdateCacheAgents(newCfg.Data[util.CacheUserAgentsKey], "update")
cm.deleteAgentCache(deletedAgents)
}

func (cm *cacheManager) deleteAgentCache(deletedAgents sets.String) {
// delete cache data for deleted agents
if deletedAgents.Len() > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestUpdateCacheAgents(t *testing.T) {
}

// add agents
deletedAgents := m.UpdateCacheAgents(tt.cacheAgents, "")
deletedAgents := m.updateCacheAgents(tt.cacheAgents, "")

if !deletedAgents.Equal(tt.deletedAgents) {
t.Errorf("Got deleted agents: %v, expect agents: %v", deletedAgents, tt.deletedAgents)
Expand Down
Loading

0 comments on commit 4302579

Please sign in to comment.