Skip to content

Commit

Permalink
[supervisor] handle rate limitting of exposed ports
Browse files Browse the repository at this point in the history
  • Loading branch information
akosyakov committed Feb 8, 2021
1 parent 986bd94 commit 4128b45
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 41 deletions.
106 changes: 93 additions & 13 deletions components/supervisor/pkg/ports/exposed-ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package ports

import (
"context"
"time"

"github.com/gitpod-io/gitpod/common-go/log"
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
)

Expand All @@ -27,8 +29,11 @@ type ExposedPortsInterface interface {
// error occured), the observer will close both channels.
Observe(ctx context.Context) (<-chan []ExposedPort, <-chan error)

// Run starts listening to expose port requests.
Run(ctx context.Context)

// Expose exposes a port to the internet. Upon successful execution any Observer will be updated.
Expose(ctx context.Context, local, global uint32, public bool) error
Expose(ctx context.Context, local, global uint32, public bool) <-chan error
}

// NoopExposedPorts implements ExposedPortsInterface but does nothing
Expand All @@ -39,9 +44,14 @@ func (*NoopExposedPorts) Observe(ctx context.Context) (<-chan []ExposedPort, <-c
return make(<-chan []ExposedPort), make(<-chan error)
}

// Run starts listening to expose port requests.
func (*NoopExposedPorts) Run(ctx context.Context) {}

// Expose exposes a port to the internet. Upon successful execution any Observer will be updated.
func (*NoopExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) error {
return nil
func (*NoopExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) <-chan error {
done := make(chan error)
close(done)
return done
}

// GitpodExposedPorts uses a connection to the Gitpod server to implement
Expand All @@ -50,6 +60,34 @@ type GitpodExposedPorts struct {
WorkspaceID string
InstanceID string
C gitpod.APIInterface

minExposeDelay time.Duration
maxExposeAttempts uint32
exposeDelayGrowFactor float64

requests chan *exposePortRequest
}

type exposePortRequest struct {
port *gitpod.WorkspaceInstancePort
ctx context.Context
done chan error
}

// NewGitpodExposedPorts creates a new instance of GitpodExposedPorts
func NewGitpodExposedPorts(workspaceID string, instanceID string, gitpodService gitpod.APIInterface) *GitpodExposedPorts {
return &GitpodExposedPorts{
WorkspaceID: workspaceID,
InstanceID: instanceID,
C: gitpodService,

minExposeDelay: 2 * time.Second,
maxExposeAttempts: 5,
exposeDelayGrowFactor: 1.5,

// allow clients to submit 30 expose requests without blocking
requests: make(chan *exposePortRequest, 30),
}
}

// Observe starts observing the exposed ports until the context is canceled.
Expand Down Expand Up @@ -102,22 +140,64 @@ func (g *GitpodExposedPorts) Observe(ctx context.Context) (<-chan []ExposedPort,
return reschan, errchan
}

// Listen starts listening to expose port requests
func (g *GitpodExposedPorts) Run(ctx context.Context) {
// process multiple parallel requests but process one by one to avoid server/ws-manager rate limitting
// if it does not help then we try to expose the same port again with the exponential backoff.
for {
select {
case <-ctx.Done():
return
case req := <-g.requests:
g.doExpose(req)
}
}
}

func (g *GitpodExposedPorts) doExpose(req *exposePortRequest) {
var err error
defer func() {
if err != nil {
req.done <- err
}
close(req.done)
}()
delay := g.minExposeDelay
attempt := 0
for {
_, err = g.C.OpenPort(req.ctx, g.WorkspaceID, req.port)
if err == nil || req.ctx.Err() != nil || attempt == 5 {
return
}
log.WithError(err).WithField("port", req.port).Warnf("cannot expose port, trying again in %d seconds...", uint32(delay.Seconds()))
select {
case <-req.ctx.Done():
err = req.ctx.Err()
return
case <-time.After(delay):
delay = time.Duration(float64(delay) * g.exposeDelayGrowFactor)
attempt++
}
}
}

// Expose exposes a port to the internet. Upon successful execution any Observer will be updated.
func (g *GitpodExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) error {
func (g *GitpodExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) <-chan error {
var v string
if public {
v = "public"
} else {
v = "private"
}
_, err := g.C.OpenPort(ctx, g.WorkspaceID, &gitpod.WorkspaceInstancePort{
Port: float64(local),
TargetPort: float64(global),
Visibility: v,
})
if err != nil {
return err
req := &exposePortRequest{
port: &gitpod.WorkspaceInstancePort{
Port: float64(local),
TargetPort: float64(global),
Visibility: v,
},
ctx: ctx,
done: make(chan error),
}

return nil
g.requests <- req
return req.done
}
43 changes: 23 additions & 20 deletions components/supervisor/pkg/ports/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi

state: state,
subscriptions: make(map[*Subscription]struct{}),
proxyStarter: startLocalhostProxy}
proxyStarter: startLocalhostProxy,
}
}

type localhostProxy struct {
Expand Down Expand Up @@ -125,6 +126,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
}()
defer cancel()

go pm.E.Run(ctx)
exposedUpdates, exposedErrors := pm.E.Observe(ctx)
servedUpdates, servedErrors := pm.S.Observe(ctx)
configUpdates, configErrors := pm.C.Observe(ctx)
Expand Down Expand Up @@ -175,7 +177,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
// we received just an error, but no update
continue
}
pm.updateState(exposed, served, configured)
pm.updateState(ctx, exposed, served, configured)
}
}

Expand All @@ -187,7 +189,7 @@ func (pm *Manager) Status() []*api.PortsStatus {
return pm.getStatus()
}

func (pm *Manager) updateState(exposed []ExposedPort, served []ServedPort, configured *Configs) {
func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, served []ServedPort, configured *Configs) {
pm.mu.Lock()
defer pm.mu.Unlock()

Expand Down Expand Up @@ -221,7 +223,7 @@ func (pm *Manager) updateState(exposed []ExposedPort, served []ServedPort, confi
pm.configs = configured
}

newState := pm.nextState()
newState := pm.nextState(ctx)
stateChanged := !reflect.DeepEqual(newState, pm.state)
pm.state = newState

Expand All @@ -241,10 +243,7 @@ func (pm *Manager) updateState(exposed []ExposedPort, served []ServedPort, confi
}
}

func (pm *Manager) nextState() map[uint32]*managedPort {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

func (pm *Manager) nextState(ctx context.Context) map[uint32]*managedPort {
state := make(map[uint32]*managedPort)

// 1. first capture exposed since they don't depend on configured or served ports
Expand Down Expand Up @@ -355,14 +354,21 @@ func (pm *Manager) nextState() map[uint32]*managedPort {
return state
}

// clients should guard a call with check whether such port is already exposed or auto exposed
func (pm *Manager) autoExpose(ctx context.Context, mp *managedPort, public bool) {
err := pm.E.Expose(ctx, mp.LocalhostPort, mp.GlobalPort, public)
if err != nil {
log.WithError(err).WithField("port", *mp).Warn("cannot auto-expose port")
return
}
exposing := pm.E.Expose(ctx, mp.LocalhostPort, mp.GlobalPort, public)
go func() {
err := <-exposing
if err != nil {
if err != context.Canceled {
log.WithError(err).WithField("port", *mp).Warn("cannot auto-expose port")
}
return
}
log.WithField("port", *mp).Info("auto-exposed port")
}()
pm.autoExposed[mp.LocalhostPort] = mp.GlobalPort
log.WithField("port", *mp).Info("auto-expose port")
log.WithField("port", *mp).Info("auto-exposing port")
}

func (pm *Manager) updateProxies() {
Expand Down Expand Up @@ -489,19 +495,16 @@ func (pm *Manager) Expose(ctx context.Context, port uint32, targetPort uint32) e
pm.mu.RUnlock()
unlock = false

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
global := targetPort
if global == 0 {
global = port
}
public := exists && config.Visibility != "private"
err := pm.E.Expose(ctx, port, global, public)
if err != nil {
err := <-pm.E.Expose(ctx, port, global, public)
if err != nil && err != context.Canceled {
log.WithError(err).WithField("port", port).WithField("targetPort", targetPort).Error("cannot expose port")
return err
}
return nil
return err
}

var (
Expand Down
5 changes: 4 additions & 1 deletion components/supervisor/pkg/ports/ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,10 @@ func (tep *testExposedPorts) Observe(ctx context.Context) (<-chan []ExposedPort,
return tep.Changes, tep.Error
}

func (tep *testExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) error {
func (tep *testExposedPorts) Run(ctx context.Context) {
}

func (tep *testExposedPorts) Expose(ctx context.Context, local, global uint32, public bool) <-chan error {
tep.mu.Lock()
defer tep.mu.Unlock()

Expand Down
9 changes: 2 additions & 7 deletions components/supervisor/pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,12 @@ func createGitpodService(cfg *Config, tknsrv api.TokenServiceServer) *gitpod.API
return gitpodService
}

func createExposedPortsImpl(cfg *Config, gitpodService *gitpod.APIoverJSONRPC) (res ports.ExposedPortsInterface) {
func createExposedPortsImpl(cfg *Config, gitpodService *gitpod.APIoverJSONRPC) ports.ExposedPortsInterface {
if gitpodService == nil {
log.Error("auto-port exposure won't work")
return &ports.NoopExposedPorts{}
}

return &ports.GitpodExposedPorts{
WorkspaceID: cfg.WorkspaceID,
InstanceID: cfg.WorkspaceInstanceID,
C: gitpodService,
}
return ports.NewGitpodExposedPorts(cfg.WorkspaceID, cfg.WorkspaceInstanceID, gitpodService)
}

func configureGit(cfg *Config) {
Expand Down
1 change: 1 addition & 0 deletions gitpod-ws.theia-workspace
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"folders": [
{ "path": "." },
{ "path": "components/gitpod-protocol" },
{ "path": "components/blobserve" },
{ "path": "components/common-go" },
{ "path": "components/content-service" },
Expand Down

0 comments on commit 4128b45

Please sign in to comment.