From 4128b4540215eb362a899b2a1b9abce99b379e1d Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Mon, 8 Feb 2021 13:41:52 +0000 Subject: [PATCH] [supervisor] handle rate limitting of exposed ports --- .../supervisor/pkg/ports/exposed-ports.go | 106 +++++++++++++++--- components/supervisor/pkg/ports/ports.go | 43 +++---- components/supervisor/pkg/ports/ports_test.go | 5 +- .../supervisor/pkg/supervisor/supervisor.go | 9 +- gitpod-ws.theia-workspace | 1 + 5 files changed, 123 insertions(+), 41 deletions(-) diff --git a/components/supervisor/pkg/ports/exposed-ports.go b/components/supervisor/pkg/ports/exposed-ports.go index 8aeddc5e5fa984..5c2379dc218790 100644 --- a/components/supervisor/pkg/ports/exposed-ports.go +++ b/components/supervisor/pkg/ports/exposed-ports.go @@ -6,7 +6,9 @@ package ports import ( "context" + "time" + "github.com/gitpod-io/gitpod/common-go/log" gitpod "github.com/gitpod-io/gitpod/gitpod-protocol" ) @@ -27,8 +29,11 @@ type ExposedPortsInterface interface { // error occured), the observer will close both channels. Observe(ctx context.Context) (<-chan []ExposedPort, <-chan error) + // Run starts listening to expose port requests. + Run(ctx context.Context) + // Expose exposes a port to the internet. Upon successful execution any Observer will be updated. - Expose(ctx context.Context, local, global uint32, public bool) error + Expose(ctx context.Context, local, global uint32, public bool) <-chan error } // NoopExposedPorts implements ExposedPortsInterface but does nothing @@ -39,9 +44,14 @@ func (*NoopExposedPorts) Observe(ctx context.Context) (<-chan []ExposedPort, <-c return make(<-chan []ExposedPort), make(<-chan error) } +// Run starts listening to expose port requests. +func (*NoopExposedPorts) Run(ctx context.Context) {} + // Expose exposes a port to the internet. Upon successful execution any Observer will be updated. -func (*NoopExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) error { - return nil +func (*NoopExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) <-chan error { + done := make(chan error) + close(done) + return done } // GitpodExposedPorts uses a connection to the Gitpod server to implement @@ -50,6 +60,34 @@ type GitpodExposedPorts struct { WorkspaceID string InstanceID string C gitpod.APIInterface + + minExposeDelay time.Duration + maxExposeAttempts uint32 + exposeDelayGrowFactor float64 + + requests chan *exposePortRequest +} + +type exposePortRequest struct { + port *gitpod.WorkspaceInstancePort + ctx context.Context + done chan error +} + +// NewGitpodExposedPorts creates a new instance of GitpodExposedPorts +func NewGitpodExposedPorts(workspaceID string, instanceID string, gitpodService gitpod.APIInterface) *GitpodExposedPorts { + return &GitpodExposedPorts{ + WorkspaceID: workspaceID, + InstanceID: instanceID, + C: gitpodService, + + minExposeDelay: 2 * time.Second, + maxExposeAttempts: 5, + exposeDelayGrowFactor: 1.5, + + // allow clients to submit 30 expose requests without blocking + requests: make(chan *exposePortRequest, 30), + } } // Observe starts observing the exposed ports until the context is canceled. @@ -102,22 +140,64 @@ func (g *GitpodExposedPorts) Observe(ctx context.Context) (<-chan []ExposedPort, return reschan, errchan } +// Listen starts listening to expose port requests +func (g *GitpodExposedPorts) Run(ctx context.Context) { + // process multiple parallel requests but process one by one to avoid server/ws-manager rate limitting + // if it does not help then we try to expose the same port again with the exponential backoff. + for { + select { + case <-ctx.Done(): + return + case req := <-g.requests: + g.doExpose(req) + } + } +} + +func (g *GitpodExposedPorts) doExpose(req *exposePortRequest) { + var err error + defer func() { + if err != nil { + req.done <- err + } + close(req.done) + }() + delay := g.minExposeDelay + attempt := 0 + for { + _, err = g.C.OpenPort(req.ctx, g.WorkspaceID, req.port) + if err == nil || req.ctx.Err() != nil || attempt == 5 { + return + } + log.WithError(err).WithField("port", req.port).Warnf("cannot expose port, trying again in %d seconds...", uint32(delay.Seconds())) + select { + case <-req.ctx.Done(): + err = req.ctx.Err() + return + case <-time.After(delay): + delay = time.Duration(float64(delay) * g.exposeDelayGrowFactor) + attempt++ + } + } +} + // Expose exposes a port to the internet. Upon successful execution any Observer will be updated. -func (g *GitpodExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) error { +func (g *GitpodExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) <-chan error { var v string if public { v = "public" } else { v = "private" } - _, err := g.C.OpenPort(ctx, g.WorkspaceID, &gitpod.WorkspaceInstancePort{ - Port: float64(local), - TargetPort: float64(global), - Visibility: v, - }) - if err != nil { - return err + req := &exposePortRequest{ + port: &gitpod.WorkspaceInstancePort{ + Port: float64(local), + TargetPort: float64(global), + Visibility: v, + }, + ctx: ctx, + done: make(chan error), } - - return nil + g.requests <- req + return req.done } diff --git a/components/supervisor/pkg/ports/ports.go b/components/supervisor/pkg/ports/ports.go index f4034c30c655fb..818dbbf4ea2611 100644 --- a/components/supervisor/pkg/ports/ports.go +++ b/components/supervisor/pkg/ports/ports.go @@ -49,7 +49,8 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi state: state, subscriptions: make(map[*Subscription]struct{}), - proxyStarter: startLocalhostProxy} + proxyStarter: startLocalhostProxy, + } } type localhostProxy struct { @@ -125,6 +126,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { }() defer cancel() + go pm.E.Run(ctx) exposedUpdates, exposedErrors := pm.E.Observe(ctx) servedUpdates, servedErrors := pm.S.Observe(ctx) configUpdates, configErrors := pm.C.Observe(ctx) @@ -175,7 +177,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) { // we received just an error, but no update continue } - pm.updateState(exposed, served, configured) + pm.updateState(ctx, exposed, served, configured) } } @@ -187,7 +189,7 @@ func (pm *Manager) Status() []*api.PortsStatus { return pm.getStatus() } -func (pm *Manager) updateState(exposed []ExposedPort, served []ServedPort, configured *Configs) { +func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, served []ServedPort, configured *Configs) { pm.mu.Lock() defer pm.mu.Unlock() @@ -221,7 +223,7 @@ func (pm *Manager) updateState(exposed []ExposedPort, served []ServedPort, confi pm.configs = configured } - newState := pm.nextState() + newState := pm.nextState(ctx) stateChanged := !reflect.DeepEqual(newState, pm.state) pm.state = newState @@ -241,10 +243,7 @@ func (pm *Manager) updateState(exposed []ExposedPort, served []ServedPort, confi } } -func (pm *Manager) nextState() map[uint32]*managedPort { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - +func (pm *Manager) nextState(ctx context.Context) map[uint32]*managedPort { state := make(map[uint32]*managedPort) // 1. first capture exposed since they don't depend on configured or served ports @@ -355,14 +354,21 @@ func (pm *Manager) nextState() map[uint32]*managedPort { return state } +// clients should guard a call with check whether such port is already exposed or auto exposed func (pm *Manager) autoExpose(ctx context.Context, mp *managedPort, public bool) { - err := pm.E.Expose(ctx, mp.LocalhostPort, mp.GlobalPort, public) - if err != nil { - log.WithError(err).WithField("port", *mp).Warn("cannot auto-expose port") - return - } + exposing := pm.E.Expose(ctx, mp.LocalhostPort, mp.GlobalPort, public) + go func() { + err := <-exposing + if err != nil { + if err != context.Canceled { + log.WithError(err).WithField("port", *mp).Warn("cannot auto-expose port") + } + return + } + log.WithField("port", *mp).Info("auto-exposed port") + }() pm.autoExposed[mp.LocalhostPort] = mp.GlobalPort - log.WithField("port", *mp).Info("auto-expose port") + log.WithField("port", *mp).Info("auto-exposing port") } func (pm *Manager) updateProxies() { @@ -489,19 +495,16 @@ func (pm *Manager) Expose(ctx context.Context, port uint32, targetPort uint32) e pm.mu.RUnlock() unlock = false - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() global := targetPort if global == 0 { global = port } public := exists && config.Visibility != "private" - err := pm.E.Expose(ctx, port, global, public) - if err != nil { + err := <-pm.E.Expose(ctx, port, global, public) + if err != nil && err != context.Canceled { log.WithError(err).WithField("port", port).WithField("targetPort", targetPort).Error("cannot expose port") - return err } - return nil + return err } var ( diff --git a/components/supervisor/pkg/ports/ports_test.go b/components/supervisor/pkg/ports/ports_test.go index 7961dfea1bbbe7..6748260a1a853d 100644 --- a/components/supervisor/pkg/ports/ports_test.go +++ b/components/supervisor/pkg/ports/ports_test.go @@ -562,7 +562,10 @@ func (tep *testExposedPorts) Observe(ctx context.Context) (<-chan []ExposedPort, return tep.Changes, tep.Error } -func (tep *testExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) error { +func (tep *testExposedPorts) Run(ctx context.Context) { +} + +func (tep *testExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) <-chan error { tep.mu.Lock() defer tep.mu.Unlock() diff --git a/components/supervisor/pkg/supervisor/supervisor.go b/components/supervisor/pkg/supervisor/supervisor.go index b15d482c22a427..1dbcf0c8cec18c 100644 --- a/components/supervisor/pkg/supervisor/supervisor.go +++ b/components/supervisor/pkg/supervisor/supervisor.go @@ -263,17 +263,12 @@ func createGitpodService(cfg *Config, tknsrv api.TokenServiceServer) *gitpod.API return gitpodService } -func createExposedPortsImpl(cfg *Config, gitpodService *gitpod.APIoverJSONRPC) (res ports.ExposedPortsInterface) { +func createExposedPortsImpl(cfg *Config, gitpodService *gitpod.APIoverJSONRPC) ports.ExposedPortsInterface { if gitpodService == nil { log.Error("auto-port exposure won't work") return &ports.NoopExposedPorts{} } - - return &ports.GitpodExposedPorts{ - WorkspaceID: cfg.WorkspaceID, - InstanceID: cfg.WorkspaceInstanceID, - C: gitpodService, - } + return ports.NewGitpodExposedPorts(cfg.WorkspaceID, cfg.WorkspaceInstanceID, gitpodService) } func configureGit(cfg *Config) { diff --git a/gitpod-ws.theia-workspace b/gitpod-ws.theia-workspace index b444f06927f0f6..32aaa499cad57f 100644 --- a/gitpod-ws.theia-workspace +++ b/gitpod-ws.theia-workspace @@ -1,6 +1,7 @@ { "folders": [ { "path": "." }, + { "path": "components/gitpod-protocol" }, { "path": "components/blobserve" }, { "path": "components/common-go" }, { "path": "components/content-service" },