Skip to content

Commit

Permalink
[ws-manager, supervisor, bridge] Prebuild workspaces are done when th…
Browse files Browse the repository at this point in the history
…eir container stops
  • Loading branch information
geropl committed Jul 8, 2021
1 parent 334c135 commit 8f0c24a
Show file tree
Hide file tree
Showing 57 changed files with 2,527 additions and 1,464 deletions.
46 changes: 39 additions & 7 deletions components/supervisor/pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ const (
KindGit = "git"
)

// ShutdownReason tells the supervisor's main loop why a shutdown was
// requested. Run converts the received value to an int and uses it as the
// process exit code, so the numeric values below are significant.
type ShutdownReason int16

const (
	// ShutdownReasonSuccess requests a regular shutdown (exit code 0).
	ShutdownReasonSuccess = ShutdownReason(0)
	// ShutdownReasonExecutionError requests a shutdown because the supervisor
	// hit an error condition (exit code 1).
	ShutdownReasonExecutionError = ShutdownReason(1)
)

// Run serves as main entrypoint to the supervisor
func Run(options ...RunOption) {
defer log.Info("supervisor shut down")
Expand Down Expand Up @@ -148,7 +155,7 @@ func Run(options ...RunOption) {

ctx, cancel := context.WithCancel(context.Background())
var (
shutdown = make(chan struct{})
shutdown = make(chan ShutdownReason, 1)
ideReady = &ideReadyState{cond: sync.NewCond(&sync.Mutex{})}
cstate = NewInMemoryContentState(cfg.RepoRoot)
gitpodService = createGitpodService(cfg, tokenService)
Expand Down Expand Up @@ -202,7 +209,7 @@ func Run(options ...RunOption) {
// When in terminating mode, the reaper will send SIGTERM to each child that gets reparented
// to us and is still running. We use this mechanism to send SIGTERM to a shell's child processes
// that get reparented once their parent shell terminates during shutdown.
terminatingReaper := make(chan bool)
terminatingReaper := make(chan bool, 1)
// We keep the reaper until the bitter end because:
// - it doesn't need graceful shutdown
// - we want to do as much work as possible (SIGTERM'ing reparented processes during shutdown).
Expand All @@ -220,11 +227,15 @@ func Run(options ...RunOption) {
wg.Add(1)
go startSSHServer(ctx, cfg, &wg)
wg.Add(1)
go taskManager.Run(ctx, &wg)
tasksSuccessChan := make(chan bool, 1)
go taskManager.Run(ctx, &wg, tasksSuccessChan)
wg.Add(1)
go socketActivationForDocker(ctx, &wg, termMux)

if !cfg.isHeadless() {
if cfg.isHeadless() {
wg.Add(1)
go stopWhenTasksAreDone(ctx, &wg, shutdown, tasksSuccessChan)
} else {
wg.Add(1)
go portMgmt.Run(ctx, &wg)
}
Expand All @@ -236,18 +247,20 @@ func Run(options ...RunOption) {
}

log.Error("metadata access is possible - shutting down")
close(shutdown)
shutdown <- ShutdownReasonExecutionError
}()
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
var exitCode int
select {
case <-sigChan:
case <-shutdown:
case shutdownReason := <-shutdown:
exitCode = int(shutdownReason)
}

log.Info("received SIGTERM - tearing down")
log.Info("received SIGTERM (or shutdown) - tearing down")
terminatingReaper <- true
cancel()
err = termMux.Close()
Expand All @@ -260,6 +273,9 @@ func Run(options ...RunOption) {
terminateChildProcesses()

wg.Wait()

log.WithField("exitCode", exitCode).Debug("supervisor exit")
os.Exit(exitCode)
}

func createGitpodService(cfg *Config, tknsrv api.TokenServiceServer) *gitpod.APIoverJSONRPC {
Expand Down Expand Up @@ -772,6 +788,22 @@ func tunnelOverSSH(ctx context.Context, tunneled *ports.TunneledPortsService, ne
<-ctx.Done()
}

// stopWhenTasksAreDone waits for the overall task result on successChan and
// then requests a supervisor shutdown by sending ShutdownReasonSuccess on
// shutdown.
//
// A failed result does not change the shutdown reason. It is reported by
// writing a message to the Kubernetes termination log instead; an error while
// writing that file is logged and otherwise ignored.
//
// wg.Done is called when the function returns. ctx is only consulted while
// delivering the shutdown request: once ctx is cancelled a teardown is already
// in progress, so the request is dropped rather than waited on.
func stopWhenTasksAreDone(ctx context.Context, wg *sync.WaitGroup, shutdown chan ShutdownReason, successChan <-chan bool) {
	defer wg.Done()

	success := <-successChan
	if !success {
		// we signal task failure via kubernetes termination log
		msg := []byte("headless task failed")
		err := ioutil.WriteFile("/dev/termination-log", msg, 0644)
		if err != nil {
			log.WithError(err).Error("err while writing termination log")
		}
	}

	// The shutdown channel is deliberately not closed here: it has other
	// senders, and a send on a closed channel panics. The send itself must not
	// block unconditionally either - if the buffer is already occupied and the
	// receiver has moved on, this goroutine would never reach wg.Done and the
	// caller's wg.Wait would hang.
	select {
	case shutdown <- ShutdownReasonSuccess:
	case <-ctx.Done():
	}
}

func startSSHServer(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
defer wg.Done()

Expand Down
6 changes: 4 additions & 2 deletions components/supervisor/pkg/supervisor/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (tm *tasksManager) init(ctx context.Context) {
}
}

func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup) {
func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup, successChan chan bool) {
defer wg.Done()
defer log.Debug("tasksManager shutdown")

Expand Down Expand Up @@ -275,7 +275,8 @@ func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup) {
for _, task := range tm.tasks {
select {
case <-ctx.Done():
return
success = false
break
case taskSuccess := <-task.successChan:
if !taskSuccess {
success = false
Expand All @@ -286,6 +287,7 @@ func (tm *tasksManager) Run(ctx context.Context, wg *sync.WaitGroup) {
if tm.config.isHeadless() {
tm.reporter.done(success)
}
successChan <- success
}

func (tm *tasksManager) getCommand(task *task) string {
Expand Down
3 changes: 2 additions & 1 deletion components/supervisor/pkg/supervisor/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func TestTaskManager(t *testing.T) {
contentState.MarkContentReady(test.Source)
var wg sync.WaitGroup
wg.Add(1)
go taskManager.Run(context.Background(), &wg)
tasksSuccessChan := make(chan bool, 1)
go taskManager.Run(context.Background(), &wg, tasksSuccessChan)
wg.Wait()
if diff := cmp.Diff(test.ExpectedReporter, reporter); diff != "" {
t.Errorf("unexpected output (-want +got):\n%s", diff)
Expand Down
Loading

0 comments on commit 8f0c24a

Please sign in to comment.