Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
cert: Use MRCs on startup
Browse files Browse the repository at this point in the history
By default, read MRCs from the cluster in order to build out the
certificate manager. From there, allow the certificate manager to watch
for changes to the MRCs in the cluster

Signed-off-by: Keith Mattix II <[email protected]>
  • Loading branch information
keithmattix committed Jun 14, 2022
1 parent 241e8ae commit 821f517
Show file tree
Hide file tree
Showing 15 changed files with 381 additions and 64 deletions.
8 changes: 3 additions & 5 deletions cmd/osm-bootstrap/osm-bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,8 @@ func main() {
log.Fatal().Err(err).Msg("Error initializing Kubernetes events recorder")
}

stop := signals.RegisterExitHandlers()
_, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(context.Background())
stop := signals.RegisterExitHandlers(cancel)

// Start the default metrics store
metricsstore.DefaultMetricsStore.Start(
Expand Down Expand Up @@ -214,8 +213,7 @@ func main() {
log.Fatal().Err(err).Msg("Error getting certificate options")
}

// Intitialize certificate manager/provider
certManager, err := providers.NewCertificateManager(kubeClient, kubeConfig, cfg, osmNamespace, certOpts, msgBroker)
certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace, certOpts, msgBroker, informerCollection)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
"Error initializing certificate manager of kind %s", certProviderKind)
Expand Down
7 changes: 3 additions & 4 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,8 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCLIParameters, "Error validating CLI parameters")
}

stop := signals.RegisterExitHandlers()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stop := signals.RegisterExitHandlers(cancel)

// Start the default metrics store
startMetricsStore()
Expand Down Expand Up @@ -208,8 +207,8 @@ func main() {
}

// Intitialize certificate manager/provider
certManager, err := providers.NewCertificateManager(kubeClient, kubeConfig, cfg, osmNamespace,
certOpts, msgBroker)
certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace,
certOpts, msgBroker, informerCollection)

if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
Expand Down
9 changes: 4 additions & 5 deletions cmd/osm-injector/osm-injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCLIParameters, "Error validating CLI parameters")
}

stop := signals.RegisterExitHandlers()
_, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, cancel := context.WithCancel(context.Background())
stop := signals.RegisterExitHandlers(cancel)

// Start the default metrics store
metricsstore.DefaultMetricsStore.Start(
Expand Down Expand Up @@ -203,8 +202,8 @@ func main() {
log.Fatal().Err(err).Msg("Error getting certificate options")
}
// Intitialize certificate manager/provider
certManager, err := providers.NewCertificateManager(kubeClient, kubeConfig, cfg, osmNamespace,
certOpts, msgBroker)
certManager, err := providers.NewCertificateManager(ctx, kubeClient, kubeConfig, cfg, osmNamespace,
certOpts, msgBroker, informerCollection)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
"Error initializing certificate manager of kind %s", certProviderKind)
Expand Down
12 changes: 12 additions & 0 deletions pkg/certificate/fake_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package certificate

import (
"context"
"fmt"
time "time"

Expand All @@ -24,6 +25,16 @@ func (c *fakeMRCClient) List() ([]*v1alpha2.MeshRootCertificate, error) {
return []*v1alpha2.MeshRootCertificate{{}}, nil
}

func (c *fakeMRCClient) Watch(ctx context.Context) (<-chan MRCEvent, error) {
ch := make(chan MRCEvent)
go func() {
ch <- MRCEvent{}
close(ch)
}()

return ch, nil
}

type fakeIssuer struct {
err bool
id string
Expand All @@ -47,6 +58,7 @@ func (i *fakeIssuer) IssueCertificate(cn CommonName, validityPeriod time.Duratio
// FakeCertManager is a testing helper that returns a *certificate.Manager
func FakeCertManager() (*Manager, error) {
cm, err := NewManager(
context.Background(),
&fakeMRCClient{},
validity,
nil,
Expand Down
94 changes: 81 additions & 13 deletions pkg/certificate/manager.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,111 @@
package certificate

import (
"context"
"fmt"
"time"

"github.com/rs/zerolog/log"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/messaging"
)

// NewManager creates a new CertManager with the passed CA and CA Private Key
func NewManager(mrcClient MRCClient, serviceCertValidityDuration time.Duration, msgBroker *messaging.Broker) (*Manager, error) {
// TODO(#4502): transition this call to a watch function that knows how to handle multiple MRC and can react to changes.
func NewManager(ctx context.Context, mrcClient MRCClient, serviceCertValidityDuration time.Duration, msgBroker *messaging.Broker) (*Manager, error) {
m := &Manager{
serviceCertValidityDuration: serviceCertValidityDuration,
msgBroker: msgBroker,
}
// We need an MRC in order to get the correct issuers, so handle initial MRC List now
mrcs, err := mrcClient.List()
if err != nil {
return nil, err
}

client, ca, clientID, err := mrcClient.GetCertIssuerForMRC(mrcs[0])
err = m.init(mrcs, mrcClient)
if err != nil {
return nil, err
}

c := &issuer{Issuer: client, ID: clientID, CertificateAuthority: ca}

m := &Manager{
// The signingIssuer is responsible for signing all newly issued certificates
// The validatingIssuer is the issuer that issued existing certificates.
// its underlying cert is still in the validating trust store
signingIssuer: c,
validatingIssuer: c,
serviceCertValidityDuration: serviceCertValidityDuration,
msgBroker: msgBroker,
// now that the manager is initialized, start a watch
// we wait until the manager is initialized so that there's
// no lock contention between the event handler and the initial
// state of the manager.
mrcEvents, err := mrcClient.Watch(ctx)
if err != nil {
return nil, err
}

go func() {
for {
select {
case <-ctx.Done():
return
case event, open := <-mrcEvents:
if !open {
// channel was closed; return
return
}

m.handleMRCEvent(event)
}
}
}()

return m, nil
}

// init initializes the Manager receiver based on the observed state of the cluster at this
// instant in time. This is why we pass in a slice of MRCs; we know our Watch call will make us initially
// consistent, we just need a valid state of the world at this instant to kick things off.
// This method is meant to be called at initialization time, so we don't grab the lock
func (m *Manager) init(mrcs []*v1alpha2.MeshRootCertificate, mrcClient MRCClient) error {
for _, mrc := range mrcs {
client, ca, clientID, err := mrcClient.GetCertIssuerForMRC(mrc)
if err != nil {
return err
}

c := &issuer{Issuer: client, ID: clientID, CertificateAuthority: ca}
switch {
case mrc.Status.State == "errored":
continue
case mrc.Status.State == constants.MRCStateComplete && mrc.Status.RotationStage == constants.MRCStageIssuing:
m.signingIssuer = c
m.validatingIssuer = c
case mrc.Status.RotationStage == constants.MRCStageIssuing:
m.signingIssuer = c
case mrc.Status.RotationStage == constants.MRCStageValidating:
m.validatingIssuer = c
default:
m.signingIssuer = c
m.validatingIssuer = c
}
}

if m.signingIssuer == nil || m.validatingIssuer == nil {
return fmt.Errorf("invalid mesh root certificate configuration. Signing issuer was %#v and validating issuer was %#v", m.signingIssuer, m.validatingIssuer)
}

return nil
}

func (m *Manager) handleMRCEvent(event MRCEvent) {
switch event.Type {
case MRCEventAdded:
// TODO
case MRCEventUpdated:
// TODO
case MRCEventDeleted:
// TODO
}
}

// Start takes an interval to check if the certificate
// needs to be rotated
func (m *Manager) Start(checkInterval time.Duration, stop <-chan struct{}) {
Expand Down
10 changes: 7 additions & 3 deletions pkg/certificate/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package certificate

import (
"context"
"testing"
time "time"

tassert "github.com/stretchr/testify/assert"
trequire "github.com/stretchr/testify/require"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/certificate/pem"
Expand All @@ -13,19 +15,21 @@ import (

func TestRotor(t *testing.T) {
assert := tassert.New(t)
require := trequire.New(t)

cn := CommonName("foo")
validityPeriod := -1 * time.Hour // negative time means this cert has already expired -- will be rotated asap

stop := make(chan struct{})
defer close(stop)
msgBroker := messaging.NewBroker(stop)
certManager, err := NewManager(&fakeMRCClient{}, validityPeriod, msgBroker)
certManager, err := NewManager(context.Background(), &fakeMRCClient{}, validityPeriod, msgBroker)
require.NoError(err)

certManager.Start(5*time.Second, stop)
assert.NoError(err)

certA, err := certManager.IssueCertificate(cn, validityPeriod)
assert.NoError(err)
require.NoError(err)
certRotateChan := msgBroker.GetCertPubSub().Sub(announcements.CertificateRotated.String())

// Wait for two certificate rotations to be announced and terminate
Expand Down
17 changes: 17 additions & 0 deletions pkg/certificate/providers/compat.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package providers

import (
"context"

"github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/certificate"
)

// List returns the single, pre-generated MRC. It is intended to implement the certificate.MRCClient interface.
Expand All @@ -10,3 +13,17 @@ func (c *MRCCompatClient) List() ([]*v1alpha2.MeshRootCertificate, error) {
c.mrc,
}, nil
}

// Watch is a basic Watch implementation for the MRC attached to the compat client
func (c *MRCCompatClient) Watch(ctx context.Context) (<-chan certificate.MRCEvent, error) {
ch := make(chan certificate.MRCEvent)
go func() {
ch <- certificate.MRCEvent{
Type: certificate.MRCEventAdded,
MRC: c.mrc,
}
close(ch)
}()

return ch, nil
}
Loading

0 comments on commit 821f517

Please sign in to comment.