From 4686d99995b9d836a17f40014cecebecbad67fb5 Mon Sep 17 00:00:00 2001 From: Brian Joerger Date: Mon, 9 May 2022 15:24:01 -0700 Subject: [PATCH] re-add grace period to Upload completer (again) --- lib/events/auditlog.go | 4 ++++ lib/events/complete.go | 16 ++++++++++++++++ lib/events/complete_test.go | 17 ++++++++++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go index 709e2c7a23d95..5cb3ab744c31d 100644 --- a/lib/events/auditlog.go +++ b/lib/events/auditlog.go @@ -111,6 +111,10 @@ const ( // AbandonedUploadPollingRate defines how often to check for // abandoned uploads which need to be completed. AbandonedUploadPollingRate = apidefaults.SessionTrackerTTL / 6 + + // UploadCompleterGracePeriod is the default period after which an upload's + // session tracker will be checked to see if it's an abandoned upload. + UploadCompleterGracePeriod = 24 * time.Hour ) var ( diff --git a/lib/events/complete.go b/lib/events/complete.go index b335363e667c2..c97643eab2ef9 100644 --- a/lib/events/complete.go +++ b/lib/events/complete.go @@ -19,6 +19,7 @@ package events import ( + "cmp" "context" "fmt" "os" @@ -59,6 +60,11 @@ type UploadCompleterConfig struct { Component string // CheckPeriod is a period for checking the upload CheckPeriod time.Duration + // GracePeriod is the period after which an upload's session tracker will be + // checked to see if it's an abandoned upload. A duration of zero will + // result in a sensible default, any negative value will result in no grace + // period. + GracePeriod time.Duration // Clock is used to override clock in tests Clock clockwork.Clock // ClusterName identifies the originating teleport cluster @@ -221,11 +227,21 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error { } }() + gracePeriod := cmp.Or(u.cfg.GracePeriod, UploadCompleterGracePeriod) incompleteSessionUploads.Set(float64(len(uploads))) // Complete upload for any uploads without an active session tracker for _, upload := range uploads { log := u.log.WithField("upload", upload.ID).WithField("session", upload.SessionID) + if gracePeriod > 0 && u.cfg.Clock.Since(upload.Initiated) <= gracePeriod { + log.Debug("Found incomplete upload within grace period, terminating check early.") + // not only we can skip this upload, but since uploads are sorted by + // Initiated oldest-to-newest, we can actually just stop checking as + // all further uploads will be closer in time to now and thus they + // will all be within the grace period + break + } + switch _, err := u.cfg.SessionTracker.GetSessionTracker(ctx, upload.SessionID.String()); { case err == nil: // session is still in progress, continue to other uploads log.Debug("session has active tracker and is not ready to be uploaded") diff --git a/lib/events/complete_test.go b/lib/events/complete_test.go index e21b20e1e2058..fae1d12bc343c 100644 --- a/lib/events/complete_test.go +++ b/lib/events/complete_test.go @@ -71,6 +71,7 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { SessionTracker: sessionTrackerService, Clock: clock, ClusterName: "teleport-cluster", + GracePeriod: 24 * time.Hour, }) require.NoError(t, err) @@ -81,7 +82,18 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) { require.NoError(t, err) require.False(t, mu.IsCompleted(upload.ID)) - clock.Advance(1 * time.Hour) + // enough to expire the session tracker, not enough to pass the grace period + clock.Advance(2 * time.Hour) + + err = uc.CheckUploads(context.Background()) + require.NoError(t, err) + require.False(t, mu.IsCompleted(upload.ID)) + + trackers, err := sessionTrackerService.GetActiveSessionTrackers(context.Background()) + require.NoError(t, err) + require.Empty(t, trackers) + + clock.Advance(22*time.Hour + time.Nanosecond) err = uc.CheckUploads(context.Background()) require.NoError(t, err) @@ -147,6 +159,7 @@ func TestUploadCompleterAcquiresSemaphore(t *testing.T) { }, acquireErr: nil, }, + GracePeriod: -1, }) require.NoError(t, err) @@ -193,6 +206,7 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) { Clock: clock, SessionTracker: &mockSessionTrackerService{}, ClusterName: "teleport-cluster", + GracePeriod: -1, }) require.NoError(t, err) @@ -286,6 +300,7 @@ func TestCheckUploadsContinuesOnError(t *testing.T) { SessionTracker: sessionTrackerService, Clock: clock, ClusterName: "teleport-cluster", + GracePeriod: -1, }) require.NoError(t, err)