Skip to content

Commit

Permalink
Refactor upload completer start logic; Remove expiration update on se…
Browse files Browse the repository at this point in the history
…ssion tracker to prevent race condition.
  • Loading branch information
Joerger committed Apr 20, 2022
1 parent 144dbf7 commit 61221a2
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 50 deletions.
44 changes: 22 additions & 22 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,41 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
return nil
}

// NewUploadCompleter returns a new instance of the upload completer
// the completer has to be closed to release resources and goroutines
func NewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) (*UploadCompleter, error) {
// StartNewUploadCompleter starts an upload completer background process. It can
// be closed by closing the provided context.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := newUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.start(ctx)
return nil
}

// newUploadCompleter returns a new instance of the upload completer without
// starting it. Useful in tests.
func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
ctx, cancel := context.WithCancel(ctx)
u := &UploadCompleter{
cfg: cfg,
log: log.WithFields(log.Fields{
trace.Component: teleport.Component(cfg.Component, "completer"),
}),
cancel: cancel,
closeCtx: ctx,
}
go u.run()
return u, nil
}

// UploadCompleter periodically scans uploads that have not been completed
// and completes them
type UploadCompleter struct {
cfg UploadCompleterConfig
log *log.Entry
cancel context.CancelFunc
closeCtx context.Context
cfg UploadCompleterConfig
log *log.Entry
}

func (u *UploadCompleter) run() {
// start starts a goroutine to periodically check for and complete abandoned uploads
func (u *UploadCompleter) start(ctx context.Context) {
periodic := interval.New(interval.Config{
Duration: u.cfg.CheckPeriod,
FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
Expand All @@ -112,17 +118,17 @@ func (u *UploadCompleter) run() {
for {
select {
case <-periodic.Next():
if err := u.CheckUploads(u.closeCtx); err != nil {
if err := u.checkUploads(ctx); err != nil {
u.log.WithError(err).Warningf("Failed to check uploads.")
}
case <-u.closeCtx.Done():
case <-ctx.Done():
return
}
}
}

// CheckUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
// checkUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) checkUploads(ctx context.Context) error {
trackers, err := u.cfg.SessionTracker.GetActiveSessionTrackers(ctx)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -208,12 +214,6 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return nil
}

// Close closes all outstanding operations without waiting
func (u *UploadCompleter) Close() error {
u.cancel()
return nil
}

func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData UploadMetadata) error {
// at this point, we don't know whether we'll need to emit a session.end or a
// windows.desktop.session.end, but as soon as we see the session start we'll
Expand Down
14 changes: 5 additions & 9 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock
ctx := context.Background()

log := &mockAuditLog{}

Expand All @@ -58,24 +57,23 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
MockTrackers: []types.SessionTracker{sessionTracker},
}

uc, err := NewUploadCompleter(ctx, UploadCompleterConfig{
uc, err := newUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
})
require.NoError(t, err)
defer uc.Close()

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
}
Expand All @@ -84,7 +82,6 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
// emits session.end or windows.desktop.session.end events for sessions
// that are completed.
func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
ctx := context.Background()
for _, test := range []struct {
startEvent apievents.AuditEvent
endEventType string
Expand All @@ -101,14 +98,13 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
sessionEvents: []apievents.AuditEvent{test.startEvent},
}

uc, err := NewUploadCompleter(ctx, UploadCompleterConfig{
uc, err := newUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
Clock: clock,
SessionTracker: &eventstest.MockSessionTrackerService{},
})
require.NoError(t, err)
defer uc.Close()

upload, err := mu.CreateUpload(context.Background(), session.NewID())
require.NoError(t, err)
Expand All @@ -118,7 +114,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
_, err = mu.UploadPart(context.Background(), *upload, 0, strings.NewReader("part"))
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)

// advance the clock to force the asynchronous session end event emission
Expand Down
10 changes: 5 additions & 5 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
eventsCh: make(chan events.UploadEvent, cfg.ConcurrentUploads),
}

// completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them
uploader.uploadCompleter, err = events.NewUploadCompleter(uploader.ctx, events.UploadCompleterConfig{
// upload completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them. It will be closed once
// the uploader context is closed.
err = events.StartNewUploadCompleter(uploader.ctx, events.UploadCompleterConfig{
Uploader: handler,
AuditLog: cfg.AuditLog,
SessionTracker: sessionTracker,
Expand Down Expand Up @@ -298,9 +299,8 @@ func (u *Uploader) sessionErrorFilePath(sid session.ID) string {
}

// Close closes all operations
func (u *Uploader) Close() error {
func (u *Uploader) Close() {
u.cancel()
return u.uploadCompleter.Close()
}

type upload struct {
Expand Down
3 changes: 0 additions & 3 deletions lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,6 @@ type uploaderPack struct {

func (u *uploaderPack) Close(t *testing.T) {
u.cancel()

err := u.uploader.Close()
require.NoError(t, err)
}

func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {
Expand Down
10 changes: 3 additions & 7 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,10 +1261,9 @@ func (process *TeleportProcess) initAuthService() error {
process.setLocalAuth(authServer)

// Upload completer is responsible for checking for initiated but abandoned
// session uploads and completing them
var uploadCompleter *events.UploadCompleter
// session uploads and completing them. it will be closed once the process exits.
if uploadHandler != nil {
uploadCompleter, err = events.NewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{
err = events.StartNewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{
Uploader: uploadHandler,
Component: teleport.ComponentAuth,
AuditLog: process.auditLog,
Expand Down Expand Up @@ -1486,9 +1485,6 @@ func (process *TeleportProcess) initAuthService() error {
// of the auth server basically never exits.
warnOnErr(tlsServer.Close(), log)
}
if uploadCompleter != nil {
warnOnErr(uploadCompleter.Close(), log)
}
log.Info("Exited.")
})
return nil
Expand Down Expand Up @@ -2111,7 +2107,7 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au

process.OnExit("fileuploader.shutdown", func(payload interface{}) {
log.Infof("File uploader is shutting down.")
warnOnErr(fileUploader.Close(), log)
fileUploader.Close()
log.Infof("File uploader has shut down.")
})

Expand Down
4 changes: 0 additions & 4 deletions lib/services/local/sessiontracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.Up
switch update := req.Update.(type) {
case *proto.UpdateSessionTrackerRequest_UpdateState:
session.SetState(update.UpdateState.State)
if update.UpdateState.State == types.SessionState_SessionStateTerminated {
// Mark session tracker for deletion.
session.SetExpiry(s.bk.Clock().Now())
}
case *proto.UpdateSessionTrackerRequest_AddParticipant:
session.AddParticipant(*update.AddParticipant.Participant)
case *proto.UpdateSessionTrackerRequest_RemoveParticipant:
Expand Down
4 changes: 4 additions & 0 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,10 @@ func (s *session) Close() error {
if err := s.recorder.Complete(s.serverCtx); err != nil {
s.log.WithError(err).Warn("Failed to close recorder.")
}

// Wait for the session recorder to signal completion before
// finishing cleanup. Otherwise the upload completer background
// process may enter a race to complete the upload first.
select {
case <-s.recorder.Done():
case <-time.After(time.Second * 30):
Expand Down

0 comments on commit 61221a2

Please sign in to comment.