diff --git a/integration/integration_test.go b/integration/integration_test.go index 6995212069c99..6b054050f04fe 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -4033,23 +4033,28 @@ func testRotateTrustedClusters(t *testing.T, suite *integrationTestSuite) { Clock: tconf.Clock, Client: aux.GetSiteAPI(clusterAux), }, - WatchHostCA: true, + Types: []types.CertAuthType{types.HostCA}, }) if err != nil { return err } defer watcher.Close() + sub, err := watcher.Subscribe(ctx, services.CertAuthorityTarget{ClusterName: clusterMain, Type: types.HostCA}) + require.NoError(t, err) var lastPhase string for i := 0; i < 10; i++ { select { case <-ctx.Done(): return trace.CompareFailed("failed to converge to phase %q, last phase %q", phase, lastPhase) - case cas := <-watcher.CertAuthorityC: - for _, ca := range cas { - if ca.GetClusterName() == clusterMain && - ca.GetType() == types.HostCA && - ca.GetRotation().Phase == phase { + case evt := <-sub.Events(): + switch evt.Type { + case types.OpPut: + ca, ok := evt.Resource.(types.CertAuthority) + if !ok { + return trace.BadParameter("expected a ca got type %T", evt.Resource) + } + if ca.GetRotation().Phase == phase { return nil } lastPhase = ca.GetRotation().Phase diff --git a/lib/reversetunnel/remotesite.go b/lib/reversetunnel/remotesite.go index 69fcb4192bde7..e0a1baa037ba9 100644 --- a/lib/reversetunnel/remotesite.go +++ b/lib/reversetunnel/remotesite.go @@ -411,9 +411,7 @@ func (s *remoteSite) compareAndSwapCertAuthority(ca types.CertAuthority) error { return trace.CompareFailed("remote certificate authority rotation has been updated") } -func (s *remoteSite) updateCertAuthorities(retry utils.Retry) { - s.Debugf("Watching for cert authority changes.") - +func (s *remoteSite) updateCertAuthorities(retry utils.Retry, remoteWatcher *services.CertAuthorityWatcher) { for { startedWaiting := s.clock.Now() select { @@ -424,7 +422,7 @@ func (s *remoteSite) updateCertAuthorities(retry utils.Retry) { return } - err := s.watchCertAuthorities() + err := s.watchCertAuthorities(remoteWatcher) if err != nil { switch { case trace.IsNotFound(err): @@ -440,56 +438,63 @@ func (s *remoteSite) updateCertAuthorities(retry utils.Retry) { s.Warningf("Could not perform cert authorities update: %v.", trace.DebugReport(err)) } } - } } -func (s *remoteSite) watchCertAuthorities() error { - localWatcher, err := services.NewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{ - ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: teleport.ComponentProxy, - Log: s, - Clock: s.clock, - Client: s.localAccessPoint, +func (s *remoteSite) watchCertAuthorities(remoteWatcher *services.CertAuthorityWatcher) error { + localWatch, err := s.srv.CertAuthorityWatcher.Subscribe( + s.ctx, + services.CertAuthorityTarget{ + ClusterName: s.srv.ClusterName, + Type: types.HostCA, }, - WatchUserCA: true, - WatchHostCA: true, - }) + services.CertAuthorityTarget{ + ClusterName: s.srv.ClusterName, + Type: types.UserCA, + }, + ) if err != nil { return trace.Wrap(err) } - defer localWatcher.Close() + defer func() { + if err := localWatch.Close(); err != nil { + s.WithError(err).Warn("Failed to close local ca watcher subscription.") + } + }() - remoteWatcher, err := services.NewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{ - ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: teleport.ComponentProxy, - Log: s, - Clock: s.clock, - Client: s.remoteAccessPoint, + remoteWatch, err := remoteWatcher.Subscribe( + s.ctx, + services.CertAuthorityTarget{ + ClusterName: s.domainName, + Type: types.HostCA, }, - WatchHostCA: true, - }) + ) if err != nil { return trace.Wrap(err) } - defer remoteWatcher.Close() + defer func() { + if err := remoteWatch.Close(); err != nil { + s.WithError(err).Warn("Failed to close remote ca watcher subscription.") + } + }() + s.Debugf("Watching for cert authority changes.") for { select { case <-s.ctx.Done(): s.WithError(s.ctx.Err()).Debug("Context is closing.") return trace.Wrap(s.ctx.Err()) - case <-localWatcher.Done(): + case <-localWatch.Done(): s.Warn("Local CertAuthority watcher subscription has closed") return fmt.Errorf("local ca watcher for cluster %s has closed", s.srv.ClusterName) - case <-remoteWatcher.Done(): + case <-remoteWatch.Done(): s.Warn("Remote CertAuthority watcher subscription has closed") return fmt.Errorf("remote ca watcher for cluster %s has closed", s.domainName) - case cas := <-localWatcher.CertAuthorityC: - for _, localCA := range cas { - if localCA.GetClusterName() != s.srv.ClusterName || - (localCA.GetType() != types.HostCA && - localCA.GetType() != types.UserCA) { + case evt := <-localWatch.Events(): + switch evt.Type { + case types.OpPut: + localCA, ok := evt.Resource.(types.CertAuthority) + if !ok { continue } @@ -498,10 +503,11 @@ func (s *remoteSite) watchCertAuthorities() error { return trace.Wrap(err) } } - case cas := <-remoteWatcher.CertAuthorityC: - for _, remoteCA := range cas { - if remoteCA.GetType() != types.HostCA || - remoteCA.GetClusterName() != s.domainName { + case evt := <-remoteWatch.Events(): + switch evt.Type { + case types.OpPut: + remoteCA, ok := evt.Resource.(types.CertAuthority) + if !ok { continue } diff --git a/lib/reversetunnel/srv.go b/lib/reversetunnel/srv.go index 67e16a01ee959..c3c588c46dae5 100644 --- a/lib/reversetunnel/srv.go +++ b/lib/reversetunnel/srv.go @@ -202,6 +202,9 @@ type Config struct { // LockWatcher is a lock watcher. LockWatcher *services.LockWatcher + + // CertAuthorityWatcher is a cert authority watcher. + CertAuthorityWatcher *services.CertAuthorityWatcher } // CheckAndSetDefaults checks parameters and sets default values @@ -253,6 +256,9 @@ func (cfg *Config) CheckAndSetDefaults() error { if cfg.LockWatcher == nil { return trace.BadParameter("missing parameter LockWatcher") } + if cfg.CertAuthorityWatcher == nil { + return trace.BadParameter("missing parameter CertAuthorityWatcher") + } return nil } @@ -1082,7 +1088,22 @@ func newRemoteSite(srv *server, domainName string, sconn ssh.Conn) (*remoteSite, return nil, err } - go remoteSite.updateCertAuthorities(caRetry) + remoteWatcher, err := services.NewCertAuthorityWatcher(srv.ctx, services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentProxy, + Log: srv.log, + Clock: srv.Clock, + Client: remoteSite.remoteAccessPoint, + }, + Types: []types.CertAuthType{types.HostCA}, + }) + if err != nil { + return nil, trace.Wrap(err) + } + go func() { + defer remoteWatcher.Close() + remoteSite.updateCertAuthorities(caRetry, remoteWatcher) + }() lockRetry, err := utils.NewLinear(utils.LinearConfig{ First: utils.HalfJitter(srv.Config.PollingPeriod), diff --git a/lib/service/service.go b/lib/service/service.go index 0c64fb0f67ef6..f853eed9c3b11 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2846,6 +2846,19 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } + caWatcher, err := services.NewCertAuthorityWatcher(process.ExitContext(), services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentProxy, + Log: process.log.WithField(trace.Component, teleport.ComponentProxy), + Client: conn.Client, + }, + AuthorityGetter: accessPoint, + Types: []types.CertAuthType{types.HostCA, types.UserCA}, + }) + if err != nil { + return trace.Wrap(err) + } + serverTLSConfig, err := conn.ServerIdentity.TLSConfig(cfg.CipherSuites) if err != nil { return trace.Wrap(err) @@ -2875,16 +2888,17 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { Client: conn.Client, }, }, - KeyGen: cfg.Keygen, - Ciphers: cfg.Ciphers, - KEXAlgorithms: cfg.KEXAlgorithms, - MACAlgorithms: cfg.MACAlgorithms, - DataDir: process.Config.DataDir, - PollingPeriod: process.Config.PollingPeriod, - FIPS: cfg.FIPS, - Emitter: streamEmitter, - Log: process.log, - LockWatcher: lockWatcher, + KeyGen: cfg.Keygen, + Ciphers: cfg.Ciphers, + KEXAlgorithms: cfg.KEXAlgorithms, + MACAlgorithms: cfg.MACAlgorithms, + DataDir: process.Config.DataDir, + PollingPeriod: process.Config.PollingPeriod, + FIPS: cfg.FIPS, + Emitter: streamEmitter, + Log: process.log, + LockWatcher: lockWatcher, + CertAuthorityWatcher: caWatcher, }) if err != nil { return trace.Wrap(err) diff --git a/lib/services/watcher.go b/lib/services/watcher.go index c1d932323248f..2464120a458e3 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -890,12 +890,8 @@ type CertAuthorityWatcherConfig struct { ResourceWatcherConfig // AuthorityGetter is responsible for fetching cert authority resources. AuthorityGetter - // CertAuthorityC receives up-to-date list of all cert authority resources. - CertAuthorityC chan []types.CertAuthority - // WatchHostCA indicates that the watcher should monitor types.HostCA - WatchHostCA bool - // WatchUserCA indicates that the watcher should monitor types.UserCA - WatchUserCA bool + // Types restricts which cert authority types are retrieved via the AuthorityGetter. + Types []types.CertAuthType } // CheckAndSetDefaults checks parameters and sets default values. @@ -910,9 +906,6 @@ func (cfg *CertAuthorityWatcherConfig) CheckAndSetDefaults() error { } cfg.AuthorityGetter = getter } - if cfg.CertAuthorityC == nil { - cfg.CertAuthorityC = make(chan []types.CertAuthority) - } return nil } @@ -924,6 +917,12 @@ func NewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig collector := &caCollector{ CertAuthorityWatcherConfig: cfg, + fanout: NewFanout(), + cas: make(map[types.CertAuthType]map[string]types.CertAuthority, len(cfg.Types)), + } + + for _, t := range cfg.Types { + collector.cas[t] = make(map[string]types.CertAuthority) } watcher, err := newResourceWatcher(ctx, collector, cfg.ResourceWatcherConfig) @@ -931,6 +930,7 @@ func NewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig return nil, trace.Wrap(err) } + collector.fanout.SetInit() return &CertAuthorityWatcher{watcher, collector}, nil } @@ -943,9 +943,65 @@ type CertAuthorityWatcher struct { // caCollector accompanies resourceWatcher when monitoring cert authority resources. type caCollector struct { CertAuthorityWatcherConfig - host map[string]types.CertAuthority - user map[string]types.CertAuthority - lock sync.RWMutex + lock sync.RWMutex + cas map[types.CertAuthType]map[string]types.CertAuthority + fanout *Fanout +} + +// CertAuthorityTarget lists the attributes of interactions to be disabled. +type CertAuthorityTarget struct { + // ClusterName specifies the name of the cluster to watch. + ClusterName string + // Type specifies the ca types to watch for. + Type types.CertAuthType +} + +// Subscribe is used to subscribe to the lock updates. +func (c *caCollector) Subscribe(ctx context.Context, targets ...CertAuthorityTarget) (types.Watcher, error) { + watchKinds, err := caTargetToWatchKinds(targets) + if err != nil { + return nil, trace.Wrap(err) + } + sub, err := c.fanout.NewWatcher(ctx, types.Watch{Kinds: watchKinds}) + if err != nil { + return nil, trace.Wrap(err) + } + select { + case event := <-sub.Events(): + if event.Type != types.OpInit { + return nil, trace.BadParameter("expected init event, got %v instead", event.Type) + } + case <-sub.Done(): + return nil, trace.Wrap(sub.Error()) + } + return sub, nil +} + +func caTargetToWatchKinds(targets []CertAuthorityTarget) ([]types.WatchKind, error) { + watchKinds := make([]types.WatchKind, 0, len(targets)) + for _, target := range targets { + kind := types.WatchKind{ + Kind: types.KindCertAuthority, + // Note that watching SubKind doesn't work for types.WatchKind - to do so it would + // require a custom filter, which doesn't currently exist for CAs and has backwards + // compatibility issues. Leaving this here so that if it is ever added in the future + // it just works. For now, we will filter all subscriptions to only allow the types + // defined in the CertAuthorityWatcherConfig. + SubKind: string(target.Type), + } + + if target.ClusterName != "" { + kind.Name = target.ClusterName + } + + watchKinds = append(watchKinds, kind) + } + + if len(watchKinds) == 0 { + watchKinds = []types.WatchKind{{Kind: types.KindCertAuthority}} + } + + return watchKinds, nil } // resourceKind specifies the resource kind to watch. @@ -955,39 +1011,29 @@ func (c *caCollector) resourceKind() string { // getResourcesAndUpdateCurrent refreshes the list of current resources. func (c *caCollector) getResourcesAndUpdateCurrent(ctx context.Context) error { - var ( - newHost map[string]types.CertAuthority - newUser map[string]types.CertAuthority - ) + var cas []types.CertAuthority - if c.WatchHostCA { - host, err := c.AuthorityGetter.GetCertAuthorities(types.HostCA, false) + for _, t := range c.Types { + authorities, err := c.AuthorityGetter.GetCertAuthorities(t, false) if err != nil { return trace.Wrap(err) } - newHost = make(map[string]types.CertAuthority, len(host)) - for _, ca := range host { - newHost[ca.GetName()] = ca - } - } - if c.WatchUserCA { - user, err := c.AuthorityGetter.GetCertAuthorities(types.UserCA, false) - if err != nil { - return trace.Wrap(err) - } - newUser = make(map[string]types.CertAuthority, len(user)) - for _, ca := range user { - newUser[ca.GetName()] = ca - } + cas = append(cas, authorities...) } c.lock.Lock() - c.host = newHost - c.user = newUser - c.lock.Unlock() + defer c.lock.Unlock() + + for _, ca := range cas { + authority, ok := c.cas[ca.GetType()][ca.GetName()] + if ok && CertAuthoritiesEquivalent(authority, ca) { + continue + } - c.CertAuthorityC <- casToSlice(newHost, newUser) + c.cas[ca.GetType()][ca.GetName()] = ca + c.fanout.Emit(types.Event{Type: types.OpPut, Resource: ca}) + } return nil } @@ -1001,14 +1047,13 @@ func (c *caCollector) processEventAndUpdateCurrent(ctx context.Context, event ty defer c.lock.Unlock() switch event.Type { case types.OpDelete: - if c.WatchHostCA && event.Resource.GetSubKind() == string(types.HostCA) { - delete(c.host, event.Resource.GetName()) - } - if c.WatchUserCA && event.Resource.GetSubKind() == string(types.UserCA) { - delete(c.user, event.Resource.GetName()) + caType := types.CertAuthType(event.Resource.GetSubKind()) + if !c.watchingType(caType) { + return } - c.CertAuthorityC <- casToSlice(c.host, c.user) + delete(c.cas[caType], event.Resource.GetName()) + c.fanout.Emit(event) case types.OpPut: ca, ok := event.Resource.(types.CertAuthority) if !ok { @@ -1016,36 +1061,31 @@ func (c *caCollector) processEventAndUpdateCurrent(ctx context.Context, event ty return } - if c.WatchHostCA && ca.GetType() == types.HostCA { - c.host[ca.GetName()] = ca + if !c.watchingType(ca.GetType()) { + return } - if c.WatchUserCA && ca.GetType() == types.UserCA { - c.user[ca.GetName()] = ca + + authority, ok := c.cas[ca.GetType()][ca.GetName()] + if ok && CertAuthoritiesEquivalent(authority, ca) { + return } - c.CertAuthorityC <- casToSlice(c.host, c.user) + c.cas[ca.GetType()][ca.GetName()] = ca + c.fanout.Emit(event) default: c.Log.Warnf("Unsupported event type %s.", event.Type) return } } -// GetCurrent returns the currently stored authorities. -func (c *caCollector) GetCurrent() []types.CertAuthority { - c.lock.RLock() - defer c.lock.RUnlock() - return casToSlice(c.host, c.user) +func (c *caCollector) watchingType(t types.CertAuthType) bool { + for _, caType := range c.Types { + if caType == t { + return true + } + } + + return false } func (c *caCollector) notifyStale() {} - -func casToSlice(host map[string]types.CertAuthority, user map[string]types.CertAuthority) []types.CertAuthority { - slice := make([]types.CertAuthority, 0, len(host)+len(user)) - for _, ca := range host { - slice = append(slice, ca) - } - for _, ca := range user { - slice = append(slice, ca) - } - return slice -} diff --git a/lib/services/watcher_test.go b/lib/services/watcher_test.go index 4d7b3cfbcacc9..54742c0ceaca9 100644 --- a/lib/services/watcher_test.go +++ b/lib/services/watcher_test.go @@ -31,6 +31,7 @@ import ( "github.com/gravitational/teleport/lib/auth/testauthority" "github.com/gravitational/teleport/lib/backend/lite" "github.com/gravitational/teleport/lib/defaults" + "github.com/gravitational/teleport/lib/fixtures" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/services/local" "github.com/gravitational/teleport/lib/tlsca" @@ -517,9 +518,10 @@ func resourceDiff(res1, res2 types.Resource) string { func caDiff(ca1, ca2 types.CertAuthority) string { return cmp.Diff(ca1, ca2, cmpopts.IgnoreFields(types.Metadata{}, "ID"), - cmpopts.IgnoreFields(types.CertAuthoritySpecV2{}, "CheckingKeys", "TLSKeyPairs"), + cmpopts.IgnoreFields(types.CertAuthoritySpecV2{}, "CheckingKeys", "TLSKeyPairs", "JWTKeyPairs"), cmpopts.IgnoreFields(types.SSHKeyPair{}, "PrivateKey"), cmpopts.IgnoreFields(types.TLSKeyPair{}, "Key"), + cmpopts.IgnoreFields(types.JWTKeyPair{}, "PrivateKey"), cmpopts.EquateEmpty(), ) } @@ -720,10 +722,12 @@ func newApp(t *testing.T, name string) types.Application { func TestCertAuthorityWatcher(t *testing.T) { t.Parallel() ctx := context.Background() + clock := clockwork.NewFakeClock() bk, err := lite.NewWithConfig(ctx, lite.Config{ Path: t.TempDir(), PollStreamPeriod: 200 * time.Millisecond, + Clock: clock, }) require.NoError(t, err) @@ -741,86 +745,88 @@ func TestCertAuthorityWatcher(t *testing.T) { Trust: caService, Events: local.NewEventsService(bk), }, + Clock: clock, }, - CertAuthorityC: make(chan []types.CertAuthority, 10), - WatchUserCA: true, - WatchHostCA: true, + Types: []types.CertAuthType{types.HostCA, types.UserCA}, }) require.NoError(t, err) t.Cleanup(w.Close) - nothingWatcher, err := services.NewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ - ResourceWatcherConfig: services.ResourceWatcherConfig{ - Component: "test", - MaxRetryPeriod: 200 * time.Millisecond, - Client: &client{ - Trust: caService, - Events: local.NewEventsService(bk), - }, - }, - CertAuthorityC: make(chan []types.CertAuthority, 10), - }) + target := services.CertAuthorityTarget{ClusterName: "test"} + sub, err := w.Subscribe(ctx, target) require.NoError(t, err) - t.Cleanup(nothingWatcher.Close) - - require.Empty(t, w.GetCurrent()) - require.Empty(t, nothingWatcher.GetCurrent()) + t.Cleanup(func() { require.NoError(t, sub.Close()) }) - // Initially there are no cas so watcher should send an empty list. + // create a CA for the cluster and a type we are filtering for + // and ensure we receive the event + ca := newCertAuthority(t, "test", types.HostCA) + require.NoError(t, caService.UpsertCertAuthority(ca)) select { - case changeset := <-w.CertAuthorityC: - require.Len(t, changeset, 0) - require.Empty(t, nothingWatcher.GetCurrent()) - case <-w.Done(): - t.Fatal("Watcher has unexpectedly exited.") - case <-time.After(2 * time.Second): - t.Fatal("Timeout waiting for the first event.") + case event := <-sub.Events(): + caFromEvent, ok := event.Resource.(types.CertAuthority) + require.True(t, ok) + require.Empty(t, caDiff(ca, caFromEvent)) + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") } - // Add an authority. - ca1 := newCertAuthority(t, "ca1", types.HostCA) - require.NoError(t, caService.CreateCertAuthority(ca1)) - - // The first event is always the current list of apps. + // create a CA with a type we are filtering for another cluster that we are NOT filtering for + // and ensure that we DO NOT receive the event + require.NoError(t, caService.UpsertCertAuthority(newCertAuthority(t, "unknown", types.UserCA))) select { - case changeset := <-w.CertAuthorityC: - require.Len(t, changeset, 1) - require.Empty(t, caDiff(changeset[0], ca1)) - require.Empty(t, nothingWatcher.GetCurrent()) - case <-w.Done(): - t.Fatal("Watcher has unexpectedly exited.") - case <-time.After(2 * time.Second): - t.Fatal("Timeout waiting for the first event.") + case event := <-sub.Events(): + t.Fatalf("Unexpected event: %v.", event) + case <-sub.Done(): + t.Fatal("CA watcher subscription has unexpectedly exited.") + case <-time.After(time.Second): } - // Add a second ca. - ca2 := newCertAuthority(t, "ca2", types.UserCA) - require.NoError(t, caService.CreateCertAuthority(ca2)) + // create a CA for the cluster and a type we are filtering for + // and ensure we receive the event + ca2 := newCertAuthority(t, "test", types.UserCA) + require.NoError(t, caService.UpsertCertAuthority(ca2)) + select { + case event := <-sub.Events(): + caFromEvent, ok := event.Resource.(types.CertAuthority) + require.True(t, ok) + require.Empty(t, caDiff(ca2, caFromEvent)) + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") + } - // Watcher should detect the ca list change. + // delete a CA with type being watched in the cluster we are filtering for + // and ensure we receive the event + require.NoError(t, caService.DeleteCertAuthority(ca.GetID())) select { - case changeset := <-w.CertAuthorityC: - require.Len(t, changeset, 2) - require.Empty(t, nothingWatcher.GetCurrent()) - case <-w.Done(): - t.Fatal("Watcher has unexpectedly exited.") - case <-time.After(2 * time.Second): - t.Fatal("Timeout waiting for the update event.") + case event := <-sub.Events(): + require.Equal(t, types.KindCertAuthority, event.Resource.GetKind()) + require.Equal(t, string(types.HostCA), event.Resource.GetSubKind()) + require.Equal(t, "test", event.Resource.GetName()) + case <-time.After(time.Second): + t.Fatal("timed out waiting for event") } - // Delete the first ca. - require.NoError(t, caService.DeleteCertAuthority(ca1.GetID())) + // create a CA with a type we are NOT filtering for but for a cluster we are filtering for + // and ensure we DO NOT receive the event + signer := newCertAuthority(t, "test", types.JWTSigner) + require.NoError(t, caService.UpsertCertAuthority(signer)) + select { + case event := <-sub.Events(): + t.Fatalf("Unexpected event: %v.", event) + case <-sub.Done(): + t.Fatal("CA watcher subscription has unexpectedly exited.") + case <-time.After(time.Second): + } - // Watcher should detect the ca list change. + // delete a CA with a name we are filtering for but a type we are NOT filtering for + // and ensure we do NOT receive the event + require.NoError(t, caService.DeleteCertAuthority(signer.GetID())) select { - case changeset := <-w.CertAuthorityC: - require.Len(t, changeset, 1) - require.Empty(t, caDiff(changeset[0], ca2)) - require.Empty(t, nothingWatcher.GetCurrent()) - case <-w.Done(): - t.Fatal("Watcher has unexpectedly exited.") - case <-time.After(2 * time.Second): - t.Fatal("Timeout waiting for the update event.") + case event := <-sub.Events(): + t.Fatalf("Unexpected event: %v.", event) + case <-sub.Done(): + t.Fatal("CA watcher subscription has unexpectedly exited.") + case <-time.After(time.Second): } } @@ -837,15 +843,25 @@ func newCertAuthority(t *testing.T, name string, caType types.CertAuthType) type Type: caType, ClusterName: name, ActiveKeys: types.CAKeySet{ - SSH: []*types.SSHKeyPair{{ - PrivateKey: priv, - PrivateKeyType: types.PrivateKeyType_RAW, - PublicKey: pub, - }}, - TLS: []*types.TLSKeyPair{{ - Cert: cert, - Key: key, - }}, + SSH: []*types.SSHKeyPair{ + { + PrivateKey: priv, + PrivateKeyType: types.PrivateKeyType_RAW, + PublicKey: pub, + }, + }, + TLS: []*types.TLSKeyPair{ + { + Cert: cert, + Key: key, + }, + }, + JWT: []*types.JWTKeyPair{ + { + PublicKey: []byte(fixtures.JWTSignerPublicKey), + PrivateKey: []byte(fixtures.JWTSignerPrivateKey), + }, + }, }, Roles: nil, SigningAlg: types.CertAuthoritySpecV2_RSA_SHA2_256, diff --git a/lib/srv/regular/sshserver_test.go b/lib/srv/regular/sshserver_test.go index 204b8871cd566..1004ae374e2b3 100644 --- a/lib/srv/regular/sshserver_test.go +++ b/lib/srv/regular/sshserver_test.go @@ -1125,6 +1125,7 @@ func TestProxyRoundRobin(t *testing.T) { listener, reverseTunnelAddress := mustListen(t) defer listener.Close() lockWatcher := newLockWatcher(ctx, t, proxyClient) + caWatcher := newCertAuthorityWatcher(ctx, t, proxyClient) reverseTunnelServer, err := reversetunnel.NewServer(reversetunnel.Config{ ClusterName: f.testSrv.ClusterName(), @@ -1141,6 +1142,7 @@ func TestProxyRoundRobin(t *testing.T) { Emitter: proxyClient, Log: logger, LockWatcher: lockWatcher, + CertAuthorityWatcher: caWatcher, }) require.NoError(t, err) logger.WithField("tun-addr", reverseTunnelAddress.String()).Info("Created reverse tunnel server.") @@ -1248,6 +1250,7 @@ func TestProxyDirectAccess(t *testing.T) { logger := logrus.WithField("test", "TestProxyDirectAccess") proxyClient, _ := newProxyClient(t, f.testSrv) lockWatcher := newLockWatcher(ctx, t, proxyClient) + caWatcher := newCertAuthorityWatcher(ctx, t, proxyClient) reverseTunnelServer, err := reversetunnel.NewServer(reversetunnel.Config{ ClientTLS: proxyClient.TLSConfig(), @@ -1264,6 +1267,7 @@ func TestProxyDirectAccess(t *testing.T) { Emitter: proxyClient, Log: logger, LockWatcher: lockWatcher, + CertAuthorityWatcher: caWatcher, }) require.NoError(t, err) @@ -1945,6 +1949,19 @@ func newLockWatcher(ctx context.Context, t *testing.T, client types.Events) *ser return lockWatcher } +func newCertAuthorityWatcher(ctx context.Context, t *testing.T, client types.Events) *services.CertAuthorityWatcher { + caWatcher, err := services.NewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: "test", + Client: client, + }, + Types: []types.CertAuthType{types.HostCA, types.UserCA}, + }) + require.NoError(t, err) + t.Cleanup(caWatcher.Close) + return caWatcher +} + // maxPipeSize is one larger than the maximum pipe size for most operating // systems which appears to be 65536 bytes. // diff --git a/lib/web/apiserver_test.go b/lib/web/apiserver_test.go index 87c73b719fa32..82795254c821f 100644 --- a/lib/web/apiserver_test.go +++ b/lib/web/apiserver_test.go @@ -276,6 +276,16 @@ func (s *WebSuite) SetUpTest(c *C) { }) c.Assert(err, IsNil) + caWatcher, err := services.NewCertAuthorityWatcher(s.ctx, services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentProxy, + Client: s.proxyClient, + }, + Types: []types.CertAuthType{types.HostCA, types.UserCA}, + }) + c.Assert(err, IsNil) + defer caWatcher.Close() + revTunServer, err := reversetunnel.NewServer(reversetunnel.Config{ ID: node.ID(), Listener: revTunListener, @@ -289,6 +299,7 @@ func (s *WebSuite) SetUpTest(c *C) { DirectClusters: []reversetunnel.DirectCluster{{Name: s.server.ClusterName(), Client: s.proxyClient}}, DataDir: c.MkDir(), LockWatcher: proxyLockWatcher, + CertAuthorityWatcher: caWatcher, }) c.Assert(err, IsNil) s.proxyTunnel = revTunServer @@ -3477,6 +3488,16 @@ func createProxy(ctx context.Context, t *testing.T, proxyID string, node *regula require.NoError(t, err) t.Cleanup(proxyLockWatcher.Close) + proxyCAWatcher, err := services.NewCertAuthorityWatcher(ctx, services.CertAuthorityWatcherConfig{ + ResourceWatcherConfig: services.ResourceWatcherConfig{ + Component: teleport.ComponentProxy, + Client: client, + }, + Types: []types.CertAuthType{types.HostCA, types.UserCA}, + }) + require.NoError(t, err) + t.Cleanup(proxyLockWatcher.Close) + revTunServer, err := reversetunnel.NewServer(reversetunnel.Config{ ID: node.ID(), Listener: revTunListener, @@ -3490,6 +3511,7 @@ func createProxy(ctx context.Context, t *testing.T, proxyID string, node *regula DirectClusters: []reversetunnel.DirectCluster{{Name: authServer.ClusterName(), Client: client}}, DataDir: t.TempDir(), LockWatcher: proxyLockWatcher, + CertAuthorityWatcher: proxyCAWatcher, }) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, revTunServer.Close()) })