Skip to content

Commit

Permalink
Improve CertAuthorityWatcher
Browse files Browse the repository at this point in the history
Eliminate some CA spam that was previously being being emitted via
the CertAuthorityWatcher. Migrating to using a fanout and applying
watchkinds allows a consumer to only subscribe to receiving the CAs
that they care about. This also adds tracking to the CertAuthorityWatcher
so that duplicate CAs aren't emitted.
  • Loading branch information
rosstimothy committed Feb 23, 2022
1 parent 5f1eb44 commit 724f644
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 187 deletions.
17 changes: 11 additions & 6 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4033,23 +4033,28 @@ func testRotateTrustedClusters(t *testing.T, suite *integrationTestSuite) {
Clock: tconf.Clock,
Client: aux.GetSiteAPI(clusterAux),
},
WatchHostCA: true,
Types: []types.CertAuthType{types.HostCA},
})
if err != nil {
return err
}
defer watcher.Close()

sub, err := watcher.Subscribe(ctx, services.CertAuthorityTarget{ClusterName: clusterMain, Type: types.HostCA})
require.NoError(t, err)
var lastPhase string
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return trace.CompareFailed("failed to converge to phase %q, last phase %q", phase, lastPhase)
case cas := <-watcher.CertAuthorityC:
for _, ca := range cas {
if ca.GetClusterName() == clusterMain &&
ca.GetType() == types.HostCA &&
ca.GetRotation().Phase == phase {
case evt := <-sub.Events():
switch evt.Type {
case types.OpPut:
ca, ok := evt.Resource.(types.CertAuthority)
if !ok {
return trace.BadParameter("expected a ca got type %T", evt.Resource)
}
if ca.GetRotation().Phase == phase {
return nil
}
lastPhase = ca.GetRotation().Phase
Expand Down
78 changes: 42 additions & 36 deletions lib/reversetunnel/remotesite.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,7 @@ func (s *remoteSite) compareAndSwapCertAuthority(ca types.CertAuthority) error {
return trace.CompareFailed("remote certificate authority rotation has been updated")
}

func (s *remoteSite) updateCertAuthorities(retry utils.Retry) {
s.Debugf("Watching for cert authority changes.")

func (s *remoteSite) updateCertAuthorities(retry utils.Retry, remoteWatcher *services.CertAuthorityWatcher) {
for {
startedWaiting := s.clock.Now()
select {
Expand All @@ -424,7 +422,7 @@ func (s *remoteSite) updateCertAuthorities(retry utils.Retry) {
return
}

err := s.watchCertAuthorities()
err := s.watchCertAuthorities(remoteWatcher)
if err != nil {
switch {
case trace.IsNotFound(err):
Expand All @@ -440,56 +438,63 @@ func (s *remoteSite) updateCertAuthorities(retry utils.Retry) {
s.Warningf("Could not perform cert authorities update: %v.", trace.DebugReport(err))
}
}

}
}

func (s *remoteSite) watchCertAuthorities() error {
localWatcher, err := services.NewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentProxy,
Log: s,
Clock: s.clock,
Client: s.localAccessPoint,
func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityWatcher) error {
localWatch, err := s.srv.CertAuthorityWatcher.Subscribe(
s.ctx,
services.CertAuthorityTarget{
ClusterName: s.srv.ClusterName,
Type: types.HostCA,
},
WatchUserCA: true,
WatchHostCA: true,
})
services.CertAuthorityTarget{
ClusterName: s.srv.ClusterName,
Type: types.UserCA,
},
)
if err != nil {
return trace.Wrap(err)
}
defer localWatcher.Close()
defer func() {
if err := localWatch.Close(); err != nil {
s.WithError(err).Warn("Failed to close local ca watcher subscription.")
}
}()

remoteWatcher, err := services.NewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentProxy,
Log: s,
Clock: s.clock,
Client: s.remoteAccessPoint,
remoteWatch, err := remoteWatcher.Subscribe(
s.ctx,
services.CertAuthorityTarget{
ClusterName: s.domainName,
Type: types.HostCA,
},
WatchHostCA: true,
})
)
if err != nil {
return trace.Wrap(err)
}
defer remoteWatcher.Close()
defer func() {
if err := remoteWatch.Close(); err != nil {
s.WithError(err).Warn("Failed to close remote ca watcher subscription.")
}
}()

s.Debugf("Watching for cert authority changes.")
for {
select {
case <-s.ctx.Done():
s.WithError(s.ctx.Err()).Debug("Context is closing.")
return trace.Wrap(s.ctx.Err())
case <-localWatcher.Done():
case <-localWatch.Done():
s.Warn("Local CertAuthority watcher subscription has closed")
return fmt.Errorf("local ca watcher for cluster %s has closed", s.srv.ClusterName)
case <-remoteWatcher.Done():
case <-remoteWatch.Done():
s.Warn("Remote CertAuthority watcher subscription has closed")
return fmt.Errorf("remote ca watcher for cluster %s has closed", s.domainName)
case cas := <-localWatcher.CertAuthorityC:
for _, localCA := range cas {
if localCA.GetClusterName() != s.srv.ClusterName ||
(localCA.GetType() != types.HostCA &&
localCA.GetType() != types.UserCA) {
case evt := <-localWatch.Events():
switch evt.Type {
case types.OpPut:
localCA, ok := evt.Resource.(types.CertAuthority)
if !ok {
continue
}

Expand All @@ -498,10 +503,11 @@ func (s *remoteSite) watchCertAuthorities() error {
return trace.Wrap(err)
}
}
case cas := <-remoteWatcher.CertAuthorityC:
for _, remoteCA := range cas {
if remoteCA.GetType() != types.HostCA ||
remoteCA.GetClusterName() != s.domainName {
case evt := <-remoteWatch.Events():
switch evt.Type {
case types.OpPut:
remoteCA, ok := evt.Resource.(types.CertAuthority)
if !ok {
continue
}

Expand Down
23 changes: 22 additions & 1 deletion lib/reversetunnel/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ type Config struct {

// LockWatcher is a lock watcher.
LockWatcher *services.LockWatcher

// CertAuthorityWatcher is a cert authority watcher.
CertAuthorityWatcher *services.CertAuthorityWatcher
}

// CheckAndSetDefaults checks parameters and sets default values
Expand Down Expand Up @@ -253,6 +256,9 @@ func (cfg *Config) CheckAndSetDefaults() error {
if cfg.LockWatcher == nil {
return trace.BadParameter("missing parameter LockWatcher")
}
if cfg.CertAuthorityWatcher == nil {
return trace.BadParameter("missing parameter CertAuthorityWatcher")
}
return nil
}

Expand Down Expand Up @@ -1082,7 +1088,22 @@ func newRemoteSite(srv *server, domainName string, sconn ssh.Conn) (*remoteSite,
return nil, err
}

go remoteSite.updateCertAuthorities(caRetry)
remoteWatcher, err := services.NewCertAuthorityWatcher(srv.ctx, services.CertAuthorityWatcherConfig{
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentProxy,
Log: srv.log,
Clock: srv.Clock,
Client: remoteSite.remoteAccessPoint,
},
Types: []types.CertAuthType{types.HostCA},
})
if err != nil {
return nil, trace.Wrap(err)
}
go func() {
defer remoteWatcher.Close()
remoteSite.updateCertAuthorities(caRetry, remoteWatcher)
}()

lockRetry, err := utils.NewLinear(utils.LinearConfig{
First: utils.HalfJitter(srv.Config.PollingPeriod),
Expand Down
34 changes: 24 additions & 10 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2846,6 +2846,19 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
return trace.Wrap(err)
}

caWatcher, err := services.NewCertAuthorityWatcher(process.ExitContext(), services.CertAuthorityWatcherConfig{
ResourceWatcherConfig: services.ResourceWatcherConfig{
Component: teleport.ComponentProxy,
Log: process.log.WithField(trace.Component, teleport.ComponentProxy),
Client: conn.Client,
},
AuthorityGetter: accessPoint,
Types: []types.CertAuthType{types.HostCA, types.UserCA},
})
if err != nil {
return trace.Wrap(err)
}

serverTLSConfig, err := conn.ServerIdentity.TLSConfig(cfg.CipherSuites)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -2875,16 +2888,17 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error {
Client: conn.Client,
},
},
KeyGen: cfg.Keygen,
Ciphers: cfg.Ciphers,
KEXAlgorithms: cfg.KEXAlgorithms,
MACAlgorithms: cfg.MACAlgorithms,
DataDir: process.Config.DataDir,
PollingPeriod: process.Config.PollingPeriod,
FIPS: cfg.FIPS,
Emitter: streamEmitter,
Log: process.log,
LockWatcher: lockWatcher,
KeyGen: cfg.Keygen,
Ciphers: cfg.Ciphers,
KEXAlgorithms: cfg.KEXAlgorithms,
MACAlgorithms: cfg.MACAlgorithms,
DataDir: process.Config.DataDir,
PollingPeriod: process.Config.PollingPeriod,
FIPS: cfg.FIPS,
Emitter: streamEmitter,
Log: process.log,
LockWatcher: lockWatcher,
CertAuthorityWatcher: caWatcher,
})
if err != nil {
return trace.Wrap(err)
Expand Down
Loading

0 comments on commit 724f644

Please sign in to comment.