Skip to content

Commit

Permalink
Fix flaky test - TestAuditOn (#12101)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joerger committed Apr 21, 2022
1 parent 4d9708f commit c961adb
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 120 deletions.
1 change: 1 addition & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
inForwardAgent: false,
auditSessionsURI: t.TempDir(),
}, {
comment: "recording proxy with upload to file server",
inRecordLocation: types.RecordAtProxy,
inForwardAgent: false,
auditSessionsURI: t.TempDir(),
Expand Down
2 changes: 1 addition & 1 deletion integration/utmp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func newSrvCtx(ctx context.Context, t *testing.T) *SrvCtx {
nodeDir,
"",
utils.NetAddr{},
nil,
s.nodeClient,
regular.SetUUID(s.nodeID),
regular.SetNamespace(apidefaults.Namespace),
regular.SetEmitter(s.nodeClient),
Expand Down
67 changes: 34 additions & 33 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type UploadCompleterConfig struct {
CheckPeriod time.Duration
// Clock is used to override clock in tests
Clock clockwork.Clock
// Unstarted does not start automatic goroutine,
// is useful when completer is embedded in another function
Unstarted bool
}

// CheckAndSetDefaults checks and sets default values
Expand All @@ -76,37 +73,41 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
return nil
}

// NewUploadCompleter returns a new instance of the upload completer
// the completer has to be closed to release resources and goroutines
func NewUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
// StartNewUploadCompleter starts an upload completer background process. It can
// be closed by closing the provided context.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := newUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.start(ctx)
return nil
}

// newUploadCompleter returns a new instance of the upload completer without
// starting it. Useful in tests.
func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
ctx, cancel := context.WithCancel(context.Background())
u := &UploadCompleter{
cfg: cfg,
log: log.WithFields(log.Fields{
trace.Component: teleport.Component(cfg.Component, "completer"),
}),
cancel: cancel,
closeCtx: ctx,
}
if !cfg.Unstarted {
go u.run()
}
return u, nil
}

// UploadCompleter periodically scans uploads that have not been completed
// and completes them
type UploadCompleter struct {
cfg UploadCompleterConfig
log *log.Entry
cancel context.CancelFunc
closeCtx context.Context
cfg UploadCompleterConfig
log *log.Entry
}

func (u *UploadCompleter) run() {
// start starts a goroutine to periodically check for and complete abandoned uploads
func (u *UploadCompleter) start(ctx context.Context) {
periodic := interval.New(interval.Config{
Duration: u.cfg.CheckPeriod,
FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
Expand All @@ -117,17 +118,27 @@ func (u *UploadCompleter) run() {
for {
select {
case <-periodic.Next():
if err := u.CheckUploads(u.closeCtx); err != nil {
if err := u.checkUploads(ctx); err != nil {
u.log.WithError(err).Warningf("Failed to check uploads.")
}
case <-u.closeCtx.Done():
case <-ctx.Done():
return
}
}
}

// CheckUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
// checkUploads fetches uploads and completes any abandoned uploads
func (u *UploadCompleter) checkUploads(ctx context.Context) error {
trackers, err := u.cfg.SessionTracker.GetActiveSessionTrackers(ctx)
if err != nil {
return trace.Wrap(err)
}

var activeSessionIDs []string
for _, st := range trackers {
activeSessionIDs = append(activeSessionIDs, st.GetSessionID())
}

uploads, err := u.cfg.Uploader.ListUploads(ctx)
if err != nil {
return trace.Wrap(err)
Expand All @@ -140,14 +151,10 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
}
}()

// Complete upload for any uploads without an active session tracker
for _, upload := range uploads {
// Check for an active session tracker for the session upload.
_, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String())
if err == nil {
// session appears to be active, don't complete the upload.
if apiutils.SliceContainsStr(activeSessionIDs, upload.SessionID.String()) {
continue
} else if !trace.IsNotFound(err) {
return trace.Wrap(err)
}

parts, err := u.cfg.Uploader.ListParts(ctx, upload)
Expand Down Expand Up @@ -207,12 +214,6 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
return nil
}

// Close closes all outstanding operations without waiting
func (u *UploadCompleter) Close() error {
u.cancel()
return nil
}

func (u *UploadCompleter) ensureSessionEndEvent(ctx context.Context, uploadData UploadMetadata) error {
// at this point, we don't know whether we'll need to emit a session.end or a
// windows.desktop.session.end, but as soon as we see the session start we'll
Expand Down
13 changes: 6 additions & 7 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,24 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
MockTrackers: []types.SessionTracker{sessionTracker},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
uc, err := newUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
}
Expand All @@ -98,8 +98,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
sessionEvents: []apievents.AuditEvent{test.startEvent},
}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Unstarted: true,
uc, err := newUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
Clock: clock,
Expand All @@ -115,7 +114,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
_, err = mu.UploadPart(context.Background(), *upload, 0, strings.NewReader("part"))
require.NoError(t, err)

err = uc.CheckUploads(context.Background())
err = uc.checkUploads(context.Background())
require.NoError(t, err)

// advance the clock to force the asynchronous session end event emission
Expand Down
42 changes: 18 additions & 24 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,10 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
if err != nil {
return nil, trace.Wrap(err)
}
// completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them
uploadCompleter, err := events.NewUploadCompleter(events.UploadCompleterConfig{
Uploader: handler,
AuditLog: cfg.AuditLog,
Unstarted: true,
SessionTracker: sessionTracker,
})
if err != nil {
return nil, trace.Wrap(err)
}

ctx, cancel := context.WithCancel(cfg.Context)
uploader := &Uploader{
uploadCompleter: uploadCompleter,
cfg: cfg,
cfg: cfg,
log: log.WithFields(log.Fields{
trace.Component: cfg.Component,
}),
Expand All @@ -127,6 +116,19 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
semaphore: make(chan struct{}, cfg.ConcurrentUploads),
eventsCh: make(chan events.UploadEvent, cfg.ConcurrentUploads),
}

// upload completer scans for uploads that have been initiated, but not completed
// by the client (aborted or crashed) and completes them. It will be closed once
// the uploader context is closed.
err = events.StartNewUploadCompleter(uploader.ctx, events.UploadCompleterConfig{
Uploader: handler,
AuditLog: cfg.AuditLog,
SessionTracker: sessionTracker,
})
if err != nil {
return nil, trace.Wrap(err)
}

return uploader, nil
}

Expand All @@ -146,9 +148,8 @@ func NewUploader(cfg UploaderConfig, sessionTracker services.SessionTrackerServi
type Uploader struct {
semaphore chan struct{}

cfg UploaderConfig
log *log.Entry
uploadCompleter *events.UploadCompleter
cfg UploaderConfig
log *log.Entry

cancel context.CancelFunc
ctx context.Context
Expand Down Expand Up @@ -222,12 +223,6 @@ func (u *Uploader) Serve() error {
// Tick at scan period but slow down (and speeds up) on errors.
case <-backoff.After():
var failed bool
if err := u.uploadCompleter.CheckUploads(u.ctx); err != nil {
if trace.Unwrap(err) != errContext {
failed = true
u.log.WithError(err).Warningf("Completer scan failed.")
}
}
if _, err := u.Scan(); err != nil {
if trace.Unwrap(err) != errContext {
failed = true
Expand Down Expand Up @@ -304,9 +299,8 @@ func (u *Uploader) sessionErrorFilePath(sid session.ID) string {
}

// Close closes all operations
func (u *Uploader) Close() error {
func (u *Uploader) Close() {
u.cancel()
return u.uploadCompleter.Close()
}

type upload struct {
Expand Down
3 changes: 0 additions & 3 deletions lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,6 @@ type uploaderPack struct {
func (u *uploaderPack) Close(t *testing.T) {
u.cancel()

err := u.uploader.Close()
require.NoError(t, err)

if u.scanDir != "" {
err := os.RemoveAll(u.scanDir)
require.NoError(t, err)
Expand Down
9 changes: 8 additions & 1 deletion lib/events/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,14 @@ type MockSessionTrackerService struct {
}

func (m *MockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
return nil, nil
var trackers []types.SessionTracker
for _, tracker := range m.MockTrackers {
// mock session tracker expiration
if tracker.Expiry().After(m.Clock.Now()) {
trackers = append(trackers, tracker)
}
}
return trackers, nil
}

func (m *MockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
Expand Down
10 changes: 3 additions & 7 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,10 +1263,9 @@ func (process *TeleportProcess) initAuthService() error {
process.setLocalAuth(authServer)

// Upload completer is responsible for checking for initiated but abandoned
// session uploads and completing them
var uploadCompleter *events.UploadCompleter
// session uploads and completing them. it will be closed once the process exits.
if uploadHandler != nil {
uploadCompleter, err = events.NewUploadCompleter(events.UploadCompleterConfig{
err = events.StartNewUploadCompleter(process.ExitContext(), events.UploadCompleterConfig{
Uploader: uploadHandler,
Component: teleport.ComponentAuth,
AuditLog: process.auditLog,
Expand Down Expand Up @@ -1488,9 +1487,6 @@ func (process *TeleportProcess) initAuthService() error {
// of the auth server basically never exits.
warnOnErr(tlsServer.Close(), log)
}
if uploadCompleter != nil {
warnOnErr(uploadCompleter.Close(), log)
}
log.Info("Exited.")
})
return nil
Expand Down Expand Up @@ -2121,7 +2117,7 @@ func (process *TeleportProcess) initUploaderService(streamer events.Streamer, au

process.OnExit("fileuploader.shutdown", func(payload interface{}) {
log.Infof("File uploader is shutting down.")
warnOnErr(fileUploader.Close(), log)
fileUploader.Close()
log.Infof("File uploader has shut down.")
})

Expand Down
5 changes: 0 additions & 5 deletions lib/services/local/sessiontracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,6 @@ func (s *sessionTracker) UpdateSessionTracker(ctx context.Context, req *proto.Up
switch update := req.Update.(type) {
case *proto.UpdateSessionTrackerRequest_UpdateState:
session.SetState(update.UpdateState.State)
if update.UpdateState.State == types.SessionState_SessionStateTerminated {
// Mark session tracker for deletion.
session.SetExpiry(s.bk.Clock().Now())
}

case *proto.UpdateSessionTrackerRequest_AddParticipant:
session.AddParticipant(*update.AddParticipant.Participant)
case *proto.UpdateSessionTrackerRequest_RemoveParticipant:
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/forward/sshserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func New(c ServerConfig) (*Server, error) {
s.kexAlgorithms = c.KEXAlgorithms
s.macAlgorithms = c.MACAlgorithms

s.sessionRegistry, err = srv.NewSessionRegistry(s, nil)
s.sessionRegistry, err = srv.NewSessionRegistry(s, s.authClient)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down
16 changes: 8 additions & 8 deletions lib/srv/regular/sshserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ func TestProxyRoundRobin(t *testing.T) {
t.TempDir(),
"",
utils.NetAddr{},
nil,
proxyClient,
SetProxyMode(reverseTunnelServer, proxyClient),
SetSessionServer(proxyClient),
SetEmitter(nodeClient),
Expand Down Expand Up @@ -1281,7 +1281,7 @@ func TestProxyDirectAccess(t *testing.T) {
t.TempDir(),
"",
utils.NetAddr{},
nil,
proxyClient,
SetProxyMode(reverseTunnelServer, proxyClient),
SetSessionServer(proxyClient),
SetEmitter(nodeClient),
Expand Down Expand Up @@ -1413,7 +1413,7 @@ func TestLimiter(t *testing.T) {
nodeStateDir,
"",
utils.NetAddr{},
nil,
nodeClient,
SetLimiter(limiter),
SetShell("/bin/sh"),
SetSessionServer(nodeClient),
Expand Down Expand Up @@ -1587,13 +1587,13 @@ func TestSessionTracker(t *testing.T) {
err = se.Close()
require.NoError(t, err)

// Advance server clock to trigger the session to close (after lingering) and
// update the session tracker to expired. We don't know when the linger sleeper
// will start waiting for clock, so we give it a grace period of 5 seconds.
time.Sleep(time.Second * 5)
f.clock.BlockUntil(3)
f.clock.Advance(defaults.SessionIdlePeriod)

// once the session is closed, the tracker should expire (not found)
// Once the session is closed, the tracker should be termianted.
// Once the last set expiration is up, the tracker should be delted.
f.clock.Advance(defaults.SessionTrackerTTL)

trackerExpired := func() bool {
_, err := f.testSrv.Auth().GetSessionTracker(ctx, tracker.GetSessionID())
return trace.IsNotFound(err)
Expand Down
Loading

0 comments on commit c961adb

Please sign in to comment.