Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Use informer implementation for MRCClientImpl
Browse files Browse the repository at this point in the history
Signed-off-by: Keith Mattix II <[email protected]>
  • Loading branch information
keithmattix committed Jun 13, 2022
1 parent 7831a73 commit cf607ad
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cmd/osm-bootstrap/osm-bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func main() {
log.Fatal().Err(err).Msg("Error getting certificate options")
}

certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace, certOpts, msgBroker)
certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace, certOpts, msgBroker, informerCollection)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
"Error initializing certificate manager of kind %s", certProviderKind)
Expand Down
2 changes: 1 addition & 1 deletion cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func main() {

// Intitialize certificate manager/provider
certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace,
certOpts, msgBroker)
certOpts, msgBroker, informerCollection)

if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
Expand Down
2 changes: 1 addition & 1 deletion cmd/osm-injector/osm-injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func main() {
}
// Intitialize certificate manager/provider
certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace,
certOpts, msgBroker)
certOpts, msgBroker, informerCollection)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
"Error initializing certificate manager of kind %s", certProviderKind)
Expand Down
3 changes: 3 additions & 0 deletions pkg/certificate/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ var ErrInvalidCertSecret = errors.New("invalid secret for certificate")

// ErrSecretNotFound should be returned if the secret isn't present in the underlying infra, on a Get
var ErrSecretNotFound = errors.New("secret not found")

// ErrNoValidMeshRootCertificates should be returned if no valid MeshRootCertificates are detected in the cluster
var ErrNoValidMeshRootCertificates = errors.New("no valid mesh root certificates could be found")
61 changes: 44 additions & 17 deletions pkg/certificate/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/rs/zerolog/log"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/messaging"
Expand All @@ -20,29 +22,22 @@ func NewManager(ctx context.Context, mrcClient MRCClient, serviceCertValidityDur
return nil, err
}

mrcs, err := mrcClient.List()
if err != nil {
return nil, err
m := &Manager{
serviceCertValidityDuration: serviceCertValidityDuration,
msgBroker: msgBroker,
}

client, ca, clientID, err := mrcClient.GetCertIssuerForMRC(mrcs[0])
// We need an MRC in order to get the correct issuers, so handle initial MRC List now
mrcs, err := mrcClient.List()
if err != nil {
return nil, err
}

c := &issuer{Issuer: client, ID: clientID, CertificateAuthority: ca}
m.init(mrcs, mrcClient)

m := &Manager{
// The signingIssuer is responsible for signing all newly issued certificates
// The validatingIssuer is the issuer that issued existing certificates.
// its underlying cert is still in the validating trust store
signingIssuer: c,
validatingIssuer: c,
serviceCertValidityDuration: serviceCertValidityDuration,
msgBroker: msgBroker,
}

// start a watch
// now that the manager is initialized, start a watch
// we wait until the manager is initialized so that there's
// no lock contention between the event handler and the initial
// state of the manager.
go func() {
for {
select {
Expand All @@ -62,6 +57,38 @@ func NewManager(ctx context.Context, mrcClient MRCClient, serviceCertValidityDur
return m, nil
}

// init initializes the Manager receiver based on the observed state of the cluster at this
// instant in time. This is why we pass in a slice of MRCs; we know our Watch call will make us initially
// consistent, we just need a valid state of the world at this instant to kick things off.
// This method is meant to be called at initialization time, so we don't grab the lock
func (m *Manager) init(mrcs []*v1alpha2.MeshRootCertificate, mrcClient MRCClient) error {
for _, mrc := range mrcs {
client, ca, clientID, err := mrcClient.GetCertIssuerForMRC(mrc)
if err != nil {
return err
}

c := &issuer{Issuer: client, ID: clientID, CertificateAuthority: ca}
switch {
case mrc.Status.State == "errored":
continue
case mrc.Status.State == constants.MRCStateComplete && mrc.Status.RotationStage == constants.MRCStageIssuing:
m.signingIssuer = c
m.validatingIssuer = c
case mrc.Status.RotationStage == constants.MRCStageIssuing:
m.signingIssuer = c
case mrc.Status.RotationStage == constants.MRCStageValidating:
m.validatingIssuer = c
}
}

if m.signingIssuer == nil || m.validatingIssuer == nil {
return ErrNoValidMeshRootCertificates
}

return nil
}

func (m *Manager) handleMRCEvent(event MRCEvent) {
switch event.Type {
case MRCEventAdded:
Expand Down
75 changes: 47 additions & 28 deletions pkg/certificate/providers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/openservicemesh/osm/pkg/certificate/providers/vault"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/messaging"
)

Expand All @@ -43,42 +44,60 @@ var getCA func(certificate.Issuer) (pem.RootCertificate, error) = func(i certifi
// NewCertificateManager returns a new certificate manager, with an MRC compat client.
// TODO(4713): Use an informer behind a feature flag.
func NewCertificateManager(ctx context.Context, kubeClient kubernetes.Interface, kubeConfig *rest.Config, cfg configurator.Configurator,
providerNamespace string, options Options, msgBroker *messaging.Broker) (*certificate.Manager, error) {
providerNamespace string, options Options, msgBroker *messaging.Broker, informers *informers.InformerCollection) (*certificate.Manager, error) {
if err := options.Validate(); err != nil {
return nil, err
}

// TODO(4713): Switch the compat client to an informer. Might need another struct to compose the informer and
// provider generator.
mrcClient := &MRCCompatClient{
MRCProviderGenerator: MRCProviderGenerator{
kubeClient: kubeClient,
kubeConfig: kubeConfig,
KeyBitSize: cfg.GetCertKeyBitSize(),
caExtractorFunc: getCA,
},
mrc: &v1alpha2.MeshRootCertificate{
ObjectMeta: metav1.ObjectMeta{
Name: "legacy-compat",
Namespace: providerNamespace,
Annotations: map[string]string{
constants.MRCVersionAnnotation: "legacy-compat",
},
var mrcClient certificate.MRCClient
// TODO(4713): Switch out with feature flag or some other mechanism
if true {
c := &MRCCompatClient{
MRCProviderGenerator: MRCProviderGenerator{
kubeClient: kubeClient,
kubeConfig: kubeConfig,
KeyBitSize: cfg.GetCertKeyBitSize(),
caExtractorFunc: getCA,
},
Spec: v1alpha2.MeshRootCertificateSpec{
Provider: options.AsProviderSpec(),
mrc: &v1alpha2.MeshRootCertificate{
ObjectMeta: metav1.ObjectMeta{
Name: "legacy-compat",
Namespace: providerNamespace,
Annotations: map[string]string{
constants.MRCVersionAnnotation: "legacy-compat",
},
},
Spec: v1alpha2.MeshRootCertificateSpec{
Provider: options.AsProviderSpec(),
},
// TODO(#4713): Detect if an actual MRC exists, and set the status accordingly.
Status: v1alpha2.MeshRootCertificateStatus{
State: constants.MRCStateComplete,
RotationStage: constants.MRCStageIssuing,
},
},
// TODO(#4713): Detect if an actual MRC exists, and set the status accordingly.
Status: v1alpha2.MeshRootCertificateStatus{
State: constants.MRCStateComplete,
RotationStage: constants.MRCStageIssuing,
}
// TODO(#4745): Remove after deprecating the osm.vault.token option.
if vaultOption, ok := options.(VaultOptions); ok {
c.MRCProviderGenerator.DefaultVaultToken = vaultOption.VaultToken
}
mrcClient = c
} else {
c := &MRCClientImpl{
MRCProviderGenerator: MRCProviderGenerator{
kubeClient: kubeClient,
kubeConfig: kubeConfig,
KeyBitSize: cfg.GetCertKeyBitSize(),
caExtractorFunc: getCA,
},
},
}
informerCollection: informers,
}

if vaultOption, ok := options.(VaultOptions); ok {
c.MRCProviderGenerator.DefaultVaultToken = vaultOption.VaultToken
}

// TODO(#4745): Remove after deprecating the osm.vault.token option.
if vaultOption, ok := options.(VaultOptions); ok {
mrcClient.MRCProviderGenerator.DefaultVaultToken = vaultOption.VaultToken
mrcClient = c
}

return certificate.NewManager(ctx, mrcClient, cfg.GetServiceCertValidityPeriod(), msgBroker)
Expand Down
2 changes: 1 addition & 1 deletion pkg/certificate/providers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestGetCertificateManager(t *testing.T) {
getCA = oldCA
}()

manager, err := NewCertificateManager(context.Background(), tc.kubeClient, tc.restConfig, tc.cfg, tc.providerNamespace, tc.options, tc.msgBroker)
manager, err := NewCertificateManager(context.Background(), tc.kubeClient, tc.restConfig, tc.cfg, tc.providerNamespace, tc.options, tc.msgBroker, nil)
if tc.expectError {
assert.Empty(manager)
assert.Error(err)
Expand Down
64 changes: 59 additions & 5 deletions pkg/certificate/providers/mrcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,73 @@ import (

"github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/pkg/errors"
"k8s.io/client-go/tools/cache"
)

var (
errNilInformerCollection = errors.New("informer collection is nil")
)

// TODO: Rename (keithmattix)
type MRCClientImpl struct {
configurator configurator.Configurator
certificate.MRCEventBroker
informerCollection *informers.InformerCollection
MRCProviderGenerator
}

func (m *MRCClientImpl) List() ([]v1alpha2.MeshRootCertificate, error) {
return m.configurator.GetMeshRootCertificates()
func (m *MRCClientImpl) List() ([]*v1alpha2.MeshRootCertificate, error) {
if m.informerCollection == nil {
return nil, errNilInformerCollection
}
// informers return slice of pointers so we'll convert them to value types before returning
mrcPtrs := m.informerCollection.List(informers.InformerKeyMeshRootCertificate)
var mrcs []*v1alpha2.MeshRootCertificate
for _, mrcPtr := range mrcPtrs {
if mrcPtr == nil {
continue
}
mrc, ok := mrcPtr.(*v1alpha2.MeshRootCertificate)
if !ok {
continue
}
mrcs = append(mrcs, mrc)
}

return mrcs, nil
}

func (m *MRCClientImpl) Watch(ctx context.Context) (<-chan certificate.MRCEvent, error) {
if m.informerCollection == nil {
return nil, errNilInformerCollection
}

eventChan := make(chan certificate.MRCEvent)
m.informerCollection.AddEventHandler(informers.InformerKeyMeshRootCertificate, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mrc := obj.(*v1alpha2.MeshRootCertificate)
eventChan <- certificate.MRCEvent{
Type: certificate.MRCEventAdded,
MRC: mrc,
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMRC := oldObj.(*v1alpha2.MeshRootCertificate)
mrc := newObj.(*v1alpha2.MeshRootCertificate)
eventChan <- certificate.MRCEvent{
Type: certificate.MRCEventUpdated,
MRC: mrc,
OldMRC: oldMRC,
}
},
DeleteFunc: func(obj interface{}) {
mrc := obj.(*v1alpha2.MeshRootCertificate)
eventChan <- certificate.MRCEvent{
Type: certificate.MRCEventUpdated,
MRC: mrc,
}
},
})

return eventChan, nil
}
6 changes: 5 additions & 1 deletion pkg/certificate/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ type MRCClient interface {
type MRCEventType string
type MRCEvent struct {
Type MRCEventType
MRC *v1alpha2.MeshRootCertificate
// The last observed version of the MRC as of the time of this event
MRC *v1alpha2.MeshRootCertificate
// The previous version of the MRC. Only populated during updates
// +optional
OldMRC *v1alpha2.MeshRootCertificate
}

var (
Expand Down
7 changes: 4 additions & 3 deletions pkg/configurator/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package configurator

import (
"errors"
"fmt"
"reflect"

Expand Down Expand Up @@ -68,10 +69,10 @@ func (c *Client) getMeshConfig() configv1alpha2.MeshConfig {
return meshConfig
}

func (c *client) getMeshRootCertificates() ([]configv1alpha2.MeshRootCertificate, error) {
func (c *Client) getMeshRootCertificates() ([]configv1alpha2.MeshRootCertificate, error) {
var mrcs []configv1alpha2.MeshRootCertificate

for _, item := range c.caches.meshRootCertificate.List() {
for _, item := range c.informers.List(informers.InformerKeyMeshRootCertificate) {
mrc, ok := item.(*configv1alpha2.MeshRootCertificate)
if !ok {
return nil, errors.New("error casting cache entry to *MeshRootCertificate")
Expand All @@ -82,7 +83,7 @@ func (c *client) getMeshRootCertificates() ([]configv1alpha2.MeshRootCertificate
return mrcs, nil
}

func (c *client) metricsHandler() cache.ResourceEventHandlerFuncs {
func (c *Client) metricsHandler() cache.ResourceEventHandlerFuncs {
handleMetrics := func(obj interface{}) {
config := obj.(*configv1alpha2.MeshConfig)

Expand Down
2 changes: 1 addition & 1 deletion pkg/configurator/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Client) GetMeshConfig() configv1alpha2.MeshConfig {
return c.getMeshConfig()
}

func (c *client) GetMeshRootCertificates() ([]configv1alpha2.MeshRootCertificate, error) {
func (c *Client) GetMeshRootCertificates() ([]configv1alpha2.MeshRootCertificate, error) {
return c.getMeshRootCertificates()
}

Expand Down

0 comments on commit cf607ad

Please sign in to comment.