Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
(k8s/informers): use InformerCollection for other clients (#4804)
Browse files Browse the repository at this point in the history
Use the new InformerCollection instead of fragmenting informers 
across different clients.

Signed-off-by: Keith Mattix II <[email protected]>
  • Loading branch information
keithmattix authored Jun 13, 2022
1 parent 7046cf2 commit 241e8ae
Show file tree
Hide file tree
Showing 24 changed files with 347 additions and 1,053 deletions.
13 changes: 10 additions & 3 deletions cmd/osm-bootstrap/osm-bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/openservicemesh/osm/pkg/crdconversion"
"github.com/openservicemesh/osm/pkg/httpserver"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
Expand Down Expand Up @@ -196,12 +197,18 @@ func main() {

msgBroker := messaging.NewBroker(stop)

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
informerCollection, err := informers.NewInformerCollection(meshName, stop,
informers.WithKubeClient(kubeClient),
informers.WithConfigClient(configClient),
)

if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)

certOpts, err := getCertOptions()
if err != nil {
log.Fatal().Err(err).Msg("Error getting certificate options")
Expand Down
16 changes: 4 additions & 12 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,12 @@ func main() {
}

// This component will be watching resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)

k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker)

meshSpec, err := smi.NewMeshSpecClient(kubeConfig, osmNamespace, k8sClient, stop, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating MeshSpec")
}
meshSpec := smi.NewSMIClient(informerCollection, osmNamespace, k8sClient, msgBroker)

certOpts, err := getCertOptions()
if err != nil {
log.Fatal().Err(err).Msg("Error getting certificate options")
Expand Down Expand Up @@ -249,10 +244,7 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Ingress client")
}

policyController, err := policy.NewPolicyController(k8sClient, policyClient, stop, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for policy.openservicemesh.io")
}
policyController := policy.NewPolicyController(informerCollection, k8sClient, msgBroker)

meshCatalog := catalog.NewMeshCatalog(
k8sClient,
Expand Down
5 changes: 1 addition & 4 deletions cmd/osm-injector/osm-injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ func main() {
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)

// Initialize kubernetes.Controller to watch kubernetes resources
kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces)
Expand Down
6 changes: 5 additions & 1 deletion pkg/catalog/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/client-go/kubernetes"

configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/catalog"
tresorFake "github.com/openservicemesh/osm/pkg/certificate/providers/tresor/fake"
Expand Down Expand Up @@ -46,10 +47,13 @@ func NewFakeMeshCatalog(kubeClient kubernetes.Interface, meshConfigClient config

osmNamespace := "-test-osm-namespace-"
osmMeshConfigName := "-test-osm-mesh-config-"
cfg, err := configurator.NewConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)
ic, err := informers.NewInformerCollection("osm", stop, informers.WithKubeClient(kubeClient), informers.WithConfigClient(meshConfigClient))
if err != nil {
return nil
}

cfg := configurator.NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)

certManager := tresorFake.NewFake(nil)

// #1683 tracks potential improvements to the following dynamic mocks
Expand Down
105 changes: 13 additions & 92 deletions pkg/configurator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ import (
"fmt"
"reflect"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/errcode"
Expand All @@ -21,44 +17,9 @@ import (
)

// NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces.
func NewConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace, meshConfigName string,
msgBroker *messaging.Broker) (Configurator, error) {
return newConfigurator(configClient, stop, osmNamespace, meshConfigName, msgBroker)
}

func newConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace string, meshConfigName string,
msgBroker *messaging.Broker) (*client, error) {
listOption := configInformers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, meshConfigName).String()
})

meshConfigInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
configClient,
k8s.DefaultKubeEventResyncInterval,
configInformers.WithNamespace(osmNamespace),
listOption,
)

// informerFactory without listOptions
configInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
configClient,
k8s.DefaultKubeEventResyncInterval,
configInformers.WithNamespace(osmNamespace),
)

informerCollection := informerCollection{
meshConfig: meshConfigInformerFactory.Config().V1alpha2().MeshConfigs().Informer(),
meshRootCertificate: configInformerFactory.Config().V1alpha2().MeshRootCertificates().Informer(),
}

cacheCollection := cacheCollection{
meshConfig: informerCollection.meshConfig.GetStore(),
meshRootCertificate: informerCollection.meshRootCertificate.GetStore(),
}

c := &client{
informers: &informerCollection,
caches: &cacheCollection,
func NewConfigurator(informerCollection *informers.InformerCollection, osmNamespace, meshConfigName string, msgBroker *messaging.Broker) *Client {
c := &Client{
informers: informerCollection,
osmNamespace: osmNamespace,
meshConfigName: meshConfigName,
}
Expand All @@ -69,70 +30,30 @@ func newConfigurator(configClient configClientset.Interface, stop <-chan struct{
Update: announcements.MeshConfigUpdated,
Delete: announcements.MeshConfigDeleted,
}
informerCollection.meshConfig.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
informerCollection.meshConfig.AddEventHandler(c.metricsHandler())

informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, c.metricsHandler())

meshRootCertificateEventTypes := k8s.EventTypes{
Add: announcements.MeshRootCertificateAdded,
Update: announcements.MeshRootCertificateUpdated,
Delete: announcements.MeshRootCertificateDeleted,
}
informerCollection.meshRootCertificate.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))

err := c.run(stop)
if err != nil {
return c, errors.Errorf("Could not start %s informer clients: %s", configv1alpha2.SchemeGroupVersion, err)
}

return c, nil
}

func (c *client) run(stop <-chan struct{}) error {
log.Info().Msgf("Starting informer clients for API group %s", configv1alpha2.SchemeGroupVersion)

if c.informers == nil {
return errors.New("config.openservicemesh.io informers not initialized")
}

sharedInformers := map[string]cache.SharedInformer{
"MeshConfig": c.informers.meshConfig,
"MeshRootCertificate": c.informers.meshRootCertificate,
}

var informerNames []string
var hasSynced []cache.InformerSynced
for name, informer := range sharedInformers {
if informer == nil {
log.Error().Msgf("Informer for '%s' not initialized, ignoring it", name) // TODO: log with errcode
continue
}
informerNames = append(informerNames, name)
log.Info().Msgf("Starting informer: %s", name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}

log.Info().Msgf("Waiting for informers %v caches to sync", informerNames)
if !cache.WaitForCacheSync(stop, hasSynced...) {
// TODO(#3962): metric might not be scraped before process restart resulting from this error
log.Error().Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrConfigInformerInitCache)).Msg("Failed initial cache sync for config.openservicemesh.io informers")
return errors.New("Failed initial cache sync for config.openservicemesh.io informers")
}
informerCollection.AddEventHandler(informers.InformerKeyMeshRootCertificate, k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))

log.Info().Msgf("Cache sync finished for %v informers in API group %s", informerNames, configv1alpha2.SchemeGroupVersion)
return nil
return c
}

func (c *client) getMeshConfigCacheKey() string {
func (c *Client) getMeshConfigCacheKey() string {
return fmt.Sprintf("%s/%s", c.osmNamespace, c.meshConfigName)
}

// Returns the current MeshConfig
func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
func (c *Client) getMeshConfig() configv1alpha2.MeshConfig {
var meshConfig configv1alpha2.MeshConfig

meshConfigCacheKey := c.getMeshConfigCacheKey()
item, exists, err := c.caches.meshConfig.GetByKey(meshConfigCacheKey)
item, exists, err := c.informers.GetByKey(informers.InformerKeyMeshConfig, meshConfigCacheKey)
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMeshConfigFetchFromCache)).Msgf("Error getting MeshConfig from cache with key %s", meshConfigCacheKey)
return meshConfig
Expand All @@ -147,7 +68,7 @@ func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
return meshConfig
}

func (c *client) metricsHandler() cache.ResourceEventHandlerFuncs {
func (c *Client) metricsHandler() cache.ResourceEventHandlerFuncs {
handleMetrics := func(obj interface{}) {
config := obj.(*configv1alpha2.MeshConfig)

Expand Down
21 changes: 8 additions & 13 deletions pkg/configurator/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

fakeConfig "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/metricsstore"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
Expand All @@ -23,9 +23,12 @@ func TestGetMeshConfig(t *testing.T) {

meshConfigClient := fakeConfig.NewSimpleClientset()
stop := make(chan struct{})
c, err := newConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)

ic, err := informers.NewInformerCollection("osm", stop, informers.WithConfigClient(meshConfigClient))
a.Nil(err)

c := NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)

// Returns empty MeshConfig if informer cache is empty
a.Equal(configv1alpha2.MeshConfig{}, c.getMeshConfig())

Expand All @@ -39,25 +42,17 @@ func TestGetMeshConfig(t *testing.T) {
Name: osmMeshConfigName,
},
}
err = c.caches.meshConfig.Add(newObj)
err = c.informers.Add(informers.InformerKeyMeshConfig, newObj, t)
a.Nil(err)
a.Equal(*newObj, c.getMeshConfig())
}

type store struct {
cache.Store
}

func (s *store) GetByKey(_ string) (interface{}, bool, error) {
return nil, false, nil
}

func TestMetricsHandler(t *testing.T) {
a := assert.New(t)

c := &client{
caches: &cacheCollection{meshConfig: &store{}},
c := &Client{
meshConfigName: osmMeshConfigName,
informers: &informers.InformerCollection{},
}
handlers := c.metricsHandler()
metricsstore.DefaultMetricsStore.Start(metricsstore.DefaultMetricsStore.FeatureFlagEnabled)
Expand Down
Loading

0 comments on commit 241e8ae

Please sign in to comment.