WIP #7907

Closed · wants to merge 1 commit into from

74 changes: 67 additions & 7 deletions lib/reversetunnel/agentpool.go
@@ -59,6 +59,20 @@ type AgentPool struct {
agents map[utils.NetAddr][]*Agent
}

// ProxyFinder is a callback that supplies the set of proxy addresses the
// agent pool should maintain reverse tunnels to.
type ProxyFinder func(context.Context) ([]utils.NetAddr, error)

// StaticAddress wraps a single, statically configured proxy address in a
// ProxyFinder. A parse failure is captured and returned on every call.
func StaticAddress(proxyAddr string) ProxyFinder {
	var addrs []utils.NetAddr
	addr, err := utils.ParseAddr(proxyAddr)
	if err == nil {
		addrs = append(addrs, *addr)
	}

	return func(context.Context) ([]utils.NetAddr, error) {
		return addrs, err
	}
}
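
FindProxyAddrs turns proxy discovery into a callback, so a pool is no longer tied to one static address. As a purely illustrative sketch (not part of this diff, assuming the same imports agentpool.go already has), a dynamic finder fed by some hypothetical resolver could be built on the same helpers StaticAddress uses:

// Illustrative only: build a ProxyFinder from an arbitrary resolver
// callback. "resolve" is a hypothetical stand-in for any dynamic source
// of proxy addresses (DNS, the auth server, a config watcher, ...).
func dynamicAddresses(resolve func(context.Context) ([]string, error)) ProxyFinder {
	return func(ctx context.Context) ([]utils.NetAddr, error) {
		hosts, err := resolve(ctx)
		if err != nil {
			return nil, trace.Wrap(err)
		}
		var addrs []utils.NetAddr
		for _, host := range hosts {
			addr, err := utils.ParseAddr(host)
			if err != nil {
				return nil, trace.Wrap(err)
			}
			addrs = append(addrs, *addr)
		}
		return addrs, nil
	}
}

Each call returns a fresh slice, so the findProxies loop further down simply re-registers whatever addresses come back on every poll.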

// AgentPoolConfig holds configuration parameters for the agent pool
type AgentPoolConfig struct {
// Client is client to the auth server this agent connects to receive
@@ -86,8 +100,9 @@ type AgentPoolConfig struct {
Component string
// ReverseTunnelServer holds all reverse tunnel connections.
ReverseTunnelServer Server
// ProxyAddr points to the address of the ssh proxy
ProxyAddr string
// // ProxyAddr points to the address of the ssh proxy
// ProxyAddr string
FindProxyAddrs ProxyFinder
// Cluster is a cluster name of the proxy.
Cluster string
}
@@ -130,10 +145,11 @@ func NewAgentPool(ctx context.Context, cfg AgentPoolConfig) (*AgentPool, error)
return nil, trace.Wrap(err)
}

proxyAddr, err := utils.ParseAddr(cfg.ProxyAddr)
if err != nil {
return nil, trace.Wrap(err)
}
// TODO(tcsc): delete
// proxyAddr, err := utils.ParseAddr(cfg.ProxyAddr)
// if err != nil {
// return nil, trace.Wrap(err)
// }

ctx, cancel := context.WithCancel(ctx)
tr, err := track.New(ctx, track.Config{ClusterName: cfg.Cluster})
@@ -142,6 +158,8 @@ func NewAgentPool(ctx context.Context, cfg AgentPoolConfig) (*AgentPool, error)
return nil, trace.Wrap(err)
}

log.Debugf("Constructing Agent pool with config: %v", cfg)

pool := &AgentPool{
agents: make(map[utils.NetAddr][]*Agent),
proxyTracker: tr,
@@ -156,13 +174,15 @@ func NewAgentPool(ctx context.Context, cfg AgentPoolConfig) (*AgentPool, error)
},
}),
}
pool.proxyTracker.Start(*proxyAddr)
// TODO(tcsc): delete
//pool.proxyTracker.Start(*proxyAddr)
return pool, nil
}

// Start starts the agent pool
func (m *AgentPool) Start() error {
m.log.Debugf("Starting agent pool %s.%s...", m.cfg.HostUUID, m.cfg.Cluster)
go m.findProxies()
go m.pollAndSyncAgents()
go m.processSeekEvents()
return nil
@@ -176,6 +196,46 @@ func (m *AgentPool) Stop() {
m.cancel()
}

// TODO(tcsc):
// Simple implementation - will not scale to 100,000s of nodes, but good
// enough for a proof of concept. Maybe we can rate-limit later?
func (m *AgentPool) findProxies() {
	m.log.Debugf("Entering proxy finder routine")
	defer m.log.Debugf("Exiting proxy finder routine")

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	pollForProxies := func() {
		m.log.Debugf("Polling for tunnel proxies...")
		proxies, err := m.cfg.FindProxyAddrs(m.ctx)
		if err != nil {
			m.log.WithError(err).Debug("Failed to find proxies")
			return
		}

		m.log.Debugf("Found %d proxies", len(proxies))

		for _, proxyAddr := range proxies {
			m.log.Debugf("Configuring proxy on %s", proxyAddr.String())
			m.proxyTracker.Start(proxyAddr)
		}
	}

	pollForProxies()

	for {
		select {
		case <-m.ctx.Done():
			m.log.Debugf("Halting proxy finding loop")
			return

		case <-ticker.C:
			pollForProxies()
		}
	}
}
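
As a rough sketch of the rate limiting the TODO above alludes to (not part of this change, standard library only, interval values are placeholders), the fixed 30-second ticker could be replaced by a jittered timer so that large fleets of agents do not poll the proxy source in lockstep:

// Sketch only: compute a jittered poll interval (roughly ±10% of base)
// so many agents do not all hit the proxy source at the same instant.
// Assumes "math/rand" and "time" are imported.
func jitteredInterval(base time.Duration) time.Duration {
	offset := time.Duration(rand.Int63n(int64(base / 5)))
	return base - base/10 + offset
}

findProxies could then re-arm a time.Timer with jitteredInterval(30 * time.Second) after each poll instead of relying on a fixed ticker.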

// Wait returns when agent pool is closed
func (m *AgentPool) Wait() {
if m == nil {
76 changes: 39 additions & 37 deletions lib/reversetunnel/doc.go
@@ -20,16 +20,16 @@ Package reversetunnel provides interfaces for accessing remote clusters

Reverse Tunnels

Proxy server Proxy agent
Reverse tunnel
+----------+ +---------+
| <----------------------+ |
| | | |
+-----+----------+ +---------+-----+
| | | |
| | | |
+----------------+ +---------------+
Proxy Cluster "A" Proxy Cluster "B"
Proxy server Proxy agent
Reverse tunnel
+----------+ +---------+
| <----------------------+ |
| | | |
+-----+----------+ +---------+-----+
| | | |
| | | |
+----------------+ +---------------+
Proxy Cluster "A" Proxy Cluster "B"


Reverse tunnel is established from a cluster "B" Proxy
@@ -44,43 +44,45 @@ proxy agents will eventually discover and establish connections to all
proxies in the cluster.

* Initially Proxy Agent connects to Proxy 1.

* Proxy 1 starts sending information about all available proxies
to the Proxy Agent. This process is called "sending discovery request".


+----------+
| <--------+
| | |
+----------+ | +-----------+ +----------+
Proxy 1 +-------------------------------+ |
| | | |
+-----------+ +----------+
Load Balancer Proxy Agent
+----------+
| |
| |
+----------+
Proxy 2
+----------+
| <--------+
| | |
+----------+ | +-----------+ +----------+
Proxy 1 +-------------------------------+ |
| | | |
+-----------+ +----------+
Load Balancer Proxy Agent
+----------+
| |
| |
+----------+
Proxy 2

* Agent will use the discovery request to establish new connections
and check if it has connected and "discovered" all the proxies specified
in the discovery request.
in the discovery request.

* Assuming that the load balancer uses a fair load balancing algorithm, the
agent will eventually discover and connect back to all the proxies.

+----------+
| <--------+
| | |
+----------+ | +-----------+ +----------+
Proxy 1 +-------------------------------+ |
| | | | |
| +-----------+ +----------+
| Load Balancer Proxy Agent
+----------+ |
| <--------+
| |
+----------+
Proxy 2
+----------+
| <--------+
| | |
+----------+ | +-----------+ +----------+
Proxy 1 +-------------------------------+ |
| | | | |
| +-----------+ +----------+
| Load Balancer Proxy Agent
+----------+ |
| <--------+
| |
+----------+
Proxy 2



4 changes: 2 additions & 2 deletions lib/reversetunnel/rc_manager.go
@@ -215,8 +215,8 @@ func (w *RemoteClusterTunnelManager) realNewAgentPool(ctx context.Context, clust
Component: teleport.ComponentProxy,

// Configs for remote cluster.
Cluster: cluster,
ProxyAddr: addr,
Cluster: cluster,
FindProxyAddrs: StaticAddress(addr),
})
if err != nil {
return nil, trace.Wrap(err, "failed creating reverse tunnel pool for remote cluster %q at address %q: %v", cluster, addr, err)
16 changes: 8 additions & 8 deletions lib/service/db.go
@@ -203,14 +203,14 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
// Create and start the agent pool.
agentPool, err := reversetunnel.NewAgentPool(process.ExitContext(),
reversetunnel.AgentPoolConfig{
Component: teleport.ComponentDatabase,
HostUUID: conn.ServerIdentity.ID.HostUUID,
ProxyAddr: tunnelAddr,
Client: conn.Client,
Server: dbService,
AccessPoint: conn.Client,
HostSigner: conn.ServerIdentity.KeySigner,
Cluster: clusterName,
Component: teleport.ComponentDatabase,
HostUUID: conn.ServerIdentity.ID.HostUUID,
FindProxyAddrs: reversetunnel.StaticAddress(tunnelAddr),
Client: conn.Client,
Server: dbService,
AccessPoint: conn.Client,
HostSigner: conn.ServerIdentity.KeySigner,
Cluster: clusterName,
})
if err != nil {
return trace.Wrap(err)
16 changes: 8 additions & 8 deletions lib/service/kubernetes.go
@@ -135,14 +135,14 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C
agentPool, err = reversetunnel.NewAgentPool(
process.ExitContext(),
reversetunnel.AgentPoolConfig{
Component: teleport.ComponentKube,
HostUUID: conn.ServerIdentity.ID.HostUUID,
ProxyAddr: conn.TunnelProxy(),
Client: conn.Client,
AccessPoint: accessPoint,
HostSigner: conn.ServerIdentity.KeySigner,
Cluster: conn.ServerIdentity.Cert.Extensions[utils.CertExtensionAuthority],
Server: shtl,
Component: teleport.ComponentKube,
HostUUID: conn.ServerIdentity.ID.HostUUID,
FindProxyAddrs: reversetunnel.StaticAddress(conn.TunnelProxy()),
Client: conn.Client,
AccessPoint: accessPoint,
HostSigner: conn.ServerIdentity.KeySigner,
Cluster: conn.ServerIdentity.Cert.Extensions[utils.CertExtensionAuthority],
Server: shtl,
})
if err != nil {
return trace.Wrap(err)