diff --git a/lib/reversetunnel/agentpool.go b/lib/reversetunnel/agentpool.go index 12289465e45f3..e1a58520a54b7 100644 --- a/lib/reversetunnel/agentpool.go +++ b/lib/reversetunnel/agentpool.go @@ -59,6 +59,20 @@ type AgentPool struct { agents map[utils.NetAddr][]*Agent } +type ProxyFinder func(context.Context) ([]utils.NetAddr, error) + +func StaticAddress(proxyAddr string) ProxyFinder { + var addrs []utils.NetAddr + addr, err := utils.ParseAddr(proxyAddr) + if err == nil { + addrs = append(addrs, *addr) + } + + return func(context.Context) ([]utils.NetAddr, error) { + return addrs, err + } +} + // AgentPoolConfig holds configuration parameters for the agent pool type AgentPoolConfig struct { // Client is client to the auth server this agent connects to receive @@ -86,8 +100,9 @@ type AgentPoolConfig struct { Component string // ReverseTunnelServer holds all reverse tunnel connections. ReverseTunnelServer Server - // ProxyAddr points to the address of the ssh proxy - ProxyAddr string + // // ProxyAddr points to the address of the ssh proxy + // ProxyAddr string + FindProxyAddrs ProxyFinder // Cluster is a cluster name of the proxy. 
Cluster string } @@ -130,10 +145,11 @@ func NewAgentPool(ctx context.Context, cfg AgentPoolConfig) (*AgentPool, error) return nil, trace.Wrap(err) } - proxyAddr, err := utils.ParseAddr(cfg.ProxyAddr) - if err != nil { - return nil, trace.Wrap(err) - } + // TODO(tcsc): delete + // proxyAddr, err := utils.ParseAddr(cfg.ProxyAddr) + // if err != nil { + // return nil, trace.Wrap(err) + // } ctx, cancel := context.WithCancel(ctx) tr, err := track.New(ctx, track.Config{ClusterName: cfg.Cluster}) @@ -142,6 +158,8 @@ func NewAgentPool(ctx context.Context, cfg AgentPoolConfig) (*AgentPool, error) return nil, trace.Wrap(err) } + log.Debugf("Constructing Agent pool with config: %v", cfg) + pool := &AgentPool{ agents: make(map[utils.NetAddr][]*Agent), proxyTracker: tr, @@ -156,13 +174,15 @@ func NewAgentPool(ctx context.Context, cfg AgentPoolConfig) (*AgentPool, error) }, }), } - pool.proxyTracker.Start(*proxyAddr) + // TODO(tcsc): delete + //pool.proxyTracker.Start(*proxyAddr) return pool, nil } // Start starts the agent pool func (m *AgentPool) Start() error { m.log.Debugf("Starting agent pool %s.%s...", m.cfg.HostUUID, m.cfg.Cluster) + go m.findProxies() go m.pollAndSyncAgents() go m.processSeekEvents() return nil @@ -176,6 +196,46 @@ func (m *AgentPool) Stop() { m.cancel() } +// TODO(tcsc): +// simple implementation - will not scale over 100,000s of nodes but good +// enough for a Proof-of-concept. Maybe we can rate-limit later? 
+func (m *AgentPool) findProxies() { + m.log.Debugf("Entering proxy finder routine") + defer m.log.Debugf("Exiting proxy finder routine") + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + pollForProxies := func() { + m.log.Debugf("Polling for tunnel proxies...") + proxies, err := m.cfg.FindProxyAddrs(m.ctx) + if err != nil { + m.log.WithError(err).Debug("Failed to find proxies") + return + } + + m.log.Debugf("Found %d proxies", len(proxies)) + + for _, proxyAddr := range proxies { + m.log.Debugf("Configuring proxy on %s", proxyAddr.String()) + m.proxyTracker.Start(proxyAddr) + } + } + + pollForProxies() + + for { + select { + case <-m.ctx.Done(): + m.log.Debugf("Halting proxy finding loop") + return + + case <-ticker.C: + pollForProxies() + } + } +} + // Wait returns when agent pool is closed func (m *AgentPool) Wait() { if m == nil { diff --git a/lib/reversetunnel/doc.go b/lib/reversetunnel/doc.go index c2cedf13bbc70..d42d3527ec411 100644 --- a/lib/reversetunnel/doc.go +++ b/lib/reversetunnel/doc.go @@ -20,16 +20,16 @@ Package reversetunnel provides interfaces for accessing remote clusters Reverse Tunnels - Proxy server Proxy agent - Reverse tunnel - +----------+ +---------+ - | <----------------------+ | - | | | | -+-----+----------+ +---------+-----+ -| | | | -| | | | -+----------------+ +---------------+ - Proxy Cluster "A" Proxy Cluster "B" + Proxy server Proxy agent + Reverse tunnel + +----------+ +---------+ + | <----------------------+ | + | | | | + +-----+----------+ +---------+-----+ + | | | | + | | | | + +----------------+ +---------------+ + Proxy Cluster "A" Proxy Cluster "B" Reverse tunnel is established from a cluster "B" Proxy @@ -44,43 +44,45 @@ proxy agents will eventually discover and establish connections to all proxies in cluster. * Initially Proxy Agent connects to Proxy 1. + * Proxy 1 starts sending information about all available proxies to the Proxy Agent. This process is called "sending discovery request".
-+----------+ -| <--------+ -| | | -+----------+ | +-----------+ +----------+ - Proxy 1 +-------------------------------+ | - | | | | - +-----------+ +----------+ - Load Balancer Proxy Agent -+----------+ -| | -| | -+----------+ - Proxy 2 + +----------+ + | <--------+ + | | | + +----------+ | +-----------+ +----------+ + Proxy 1 +-------------------------------+ | + | | | | + +-----------+ +----------+ + Load Balancer Proxy Agent + +----------+ + | | + | | + +----------+ + Proxy 2 * Agent will use the discovery request to establish new connections and check if it has connected and "discovered" all the proxies specified - in the discovery request. +in the discovery request. + * Assuming that load balancer uses fair load balancing algorithm, agent will eventually discover and connect back to all the proxies. -+----------+ -| <--------+ -| | | -+----------+ | +-----------+ +----------+ - Proxy 1 +-------------------------------+ | - | | | | | - | +-----------+ +----------+ - | Load Balancer Proxy Agent -+----------+ | -| <--------+ -| | -+----------+ - Proxy 2 + +----------+ + | <--------+ + | | | + +----------+ | +-----------+ +----------+ + Proxy 1 +-------------------------------+ | + | | | | | + | +-----------+ +----------+ + | Load Balancer Proxy Agent + +----------+ | + | <--------+ + | | + +----------+ + Proxy 2 diff --git a/lib/reversetunnel/rc_manager.go b/lib/reversetunnel/rc_manager.go index 0a64086eaef97..67c78d6e10fe3 100644 --- a/lib/reversetunnel/rc_manager.go +++ b/lib/reversetunnel/rc_manager.go @@ -215,8 +215,8 @@ func (w *RemoteClusterTunnelManager) realNewAgentPool(ctx context.Context, clust Component: teleport.ComponentProxy, // Configs for remote cluster. 
- Cluster: cluster, - ProxyAddr: addr, + Cluster: cluster, + FindProxyAddrs: StaticAddress(addr), }) if err != nil { return nil, trace.Wrap(err, "failed creating reverse tunnel pool for remote cluster %q at address %q: %v", cluster, addr, err) diff --git a/lib/service/db.go b/lib/service/db.go index b45ab28862f5e..cfeae0905ed5a 100644 --- a/lib/service/db.go +++ b/lib/service/db.go @@ -203,14 +203,14 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) { // Create and start the agent pool. agentPool, err := reversetunnel.NewAgentPool(process.ExitContext(), reversetunnel.AgentPoolConfig{ - Component: teleport.ComponentDatabase, - HostUUID: conn.ServerIdentity.ID.HostUUID, - ProxyAddr: tunnelAddr, - Client: conn.Client, - Server: dbService, - AccessPoint: conn.Client, - HostSigner: conn.ServerIdentity.KeySigner, - Cluster: clusterName, + Component: teleport.ComponentDatabase, + HostUUID: conn.ServerIdentity.ID.HostUUID, + FindProxyAddrs: reversetunnel.StaticAddress(tunnelAddr), + Client: conn.Client, + Server: dbService, + AccessPoint: conn.Client, + HostSigner: conn.ServerIdentity.KeySigner, + Cluster: clusterName, }) if err != nil { return trace.Wrap(err) diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go index 8f417e2d8b20c..6e943968df2d0 100644 --- a/lib/service/kubernetes.go +++ b/lib/service/kubernetes.go @@ -135,14 +135,14 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C agentPool, err = reversetunnel.NewAgentPool( process.ExitContext(), reversetunnel.AgentPoolConfig{ - Component: teleport.ComponentKube, - HostUUID: conn.ServerIdentity.ID.HostUUID, - ProxyAddr: conn.TunnelProxy(), - Client: conn.Client, - AccessPoint: accessPoint, - HostSigner: conn.ServerIdentity.KeySigner, - Cluster: conn.ServerIdentity.Cert.Extensions[utils.CertExtensionAuthority], - Server: shtl, + Component: teleport.ComponentKube, + HostUUID: conn.ServerIdentity.ID.HostUUID, + FindProxyAddrs: 
reversetunnel.StaticAddress(conn.TunnelProxy()), + Client: conn.Client, + AccessPoint: accessPoint, + HostSigner: conn.ServerIdentity.KeySigner, + Cluster: conn.ServerIdentity.Cert.Extensions[utils.CertExtensionAuthority], + Server: shtl, }) if err != nil { return trace.Wrap(err) diff --git a/lib/service/service.go b/lib/service/service.go index 0a07d460397e7..b9e161492a61e 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1862,18 +1862,35 @@ func (process *TeleportProcess) initSSH() error { return trace.Wrap(err) } + findProxyAddr := func(ctx context.Context) ([]utils.NetAddr, error) { + log.Debugf("Attempting to find proxies manually") + // if isInSingleProcessMode { + // } + + addrText, err := process.findReverseTunnel(process.Config.AuthServers) + if err != nil { + return nil, trace.Wrap(err) + } + + addr, err := utils.ParseAddr(addrText) + if err != nil { + return nil, trace.Wrap(err) + } + return []utils.NetAddr{*addr}, nil + } + // Create and start an agent pool. agentPool, err = reversetunnel.NewAgentPool( process.ExitContext(), reversetunnel.AgentPoolConfig{ - Component: teleport.ComponentNode, - HostUUID: conn.ServerIdentity.ID.HostUUID, - ProxyAddr: conn.TunnelProxy(), - Client: conn.Client, - AccessPoint: conn.Client, - HostSigner: conn.ServerIdentity.KeySigner, - Cluster: conn.ServerIdentity.Cert.Extensions[utils.CertExtensionAuthority], - Server: s, + Component: teleport.ComponentNode, + HostUUID: conn.ServerIdentity.ID.HostUUID, + FindProxyAddrs: findProxyAddr, //reversetunnel.StaticAddress(conn.TunnelProxy()), + Client: conn.Client, + AccessPoint: conn.Client, + HostSigner: conn.ServerIdentity.KeySigner, + Cluster: conn.ServerIdentity.Cert.Extensions[utils.CertExtensionAuthority], + Server: s, }) if err != nil { return trace.Wrap(err) @@ -3199,19 +3216,29 @@ func (process *TeleportProcess) initApps() { // If it was not, it is running in single process mode which is used for // development and demos. 
In that case, wait until all dependencies (like // auth and reverse tunnel server) are ready before starting. - var tunnelAddr string - if conn.TunnelProxy() != "" { - tunnelAddr = conn.TunnelProxy() - } else { - if tunnelAddr, ok = process.singleProcessMode(); !ok { - return trace.BadParameter(`failed to find reverse tunnel address, if running in single process mode, make sure "auth_service", "proxy_service", and "app_service" are all enabled`) - } + // TODO(tcsc): delete + // var tunnelAddr string + _, isInSingleProcessMode := process.singleProcessMode() + if isInSingleProcessMode { // Block and wait for all dependencies to start before starting. log.Debugf("Waiting for application service dependencies to start.") process.waitForAppDepend() } + // TODO(tcsc): work out what to do with this + // if conn.TunnelProxy() != "" { + // tunnelAddr = conn.TunnelProxy() + // } else { + // if tunnelAddr, ok = process.singleProcessMode(); !ok { + // return trace.BadParameter(`failed to find reverse tunnel address, if running in single process mode, make sure "auth_service", "proxy_service", and "app_service" are all enabled`) + // } + + // // Block and wait for all dependencies to start before starting. + // log.Debugf("Waiting for application service dependencies to start.") + // process.waitForAppDepend() + // } + // Create a caching client to the Auth Server. It is to reduce load on // the Auth Server. accessPoint, err := process.newLocalCache(conn.Client, cache.ForApps, []string{component}) @@ -3339,17 +3366,34 @@ func (process *TeleportProcess) initApps() { // and (dynamic) label update. 
appServer.Start() + findProxyAddr := func(ctx context.Context) ([]utils.NetAddr, error) { + log.Debugf("Attempting to find proxies manually") + // if isInSingleProcessMode { + // } + + addrText, err := process.findReverseTunnel(process.Config.AuthServers) + if err != nil { + return nil, trace.Wrap(err) + } + + addr, err := utils.ParseAddr(addrText) + if err != nil { + return nil, trace.Wrap(err) + } + return []utils.NetAddr{*addr}, nil + } + // Create and start an agent pool. agentPool, err = reversetunnel.NewAgentPool(process.ExitContext(), reversetunnel.AgentPoolConfig{ - Component: teleport.ComponentApp, - HostUUID: conn.ServerIdentity.ID.HostUUID, - ProxyAddr: tunnelAddr, - Client: conn.Client, - Server: appServer, - AccessPoint: accessPoint, - HostSigner: conn.ServerIdentity.KeySigner, - Cluster: clusterName, + Component: teleport.ComponentApp, + HostUUID: conn.ServerIdentity.ID.HostUUID, + FindProxyAddrs: findProxyAddr, + Client: conn.Client, + Server: appServer, + AccessPoint: accessPoint, + HostSigner: conn.ServerIdentity.KeySigner, + Cluster: clusterName, }) if err != nil { return trace.Wrap(err)