Skip to content

Commit

Permalink
Re-add grace period to Upload completer for backwards compatibility. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Joerger authored May 9, 2022
1 parent 40b9b52 commit dde7bb7
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 167 deletions.
5 changes: 5 additions & 0 deletions lib/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ const (
// AbandonedUploadPollingRate defines how often to check for
// abandoned uploads which need to be completed.
AbandonedUploadPollingRate = SessionTrackerTTL / 6

// UploadGracePeriod is a period after which non-completed
// upload is considered abandoned and will be completed by the reconciler
// DELETE IN 11.0.0
UploadGracePeriod = 24 * time.Hour
)

var (
Expand Down
5 changes: 4 additions & 1 deletion lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,14 @@ type StreamUpload struct {
ID string
// SessionID is a session ID of the upload
SessionID session.ID
// Initiated contains the timestamp of when the upload
// was initiated, not always initialized
Initiated time.Time
}

// String returns user friendly representation of the upload
func (u StreamUpload) String() string {
return fmt.Sprintf("Upload(session=%v, id=%v)", u.SessionID, u.ID)
return fmt.Sprintf("Upload(session=%v, id=%v, initiated=%v)", u.SessionID, u.ID, u.Initiated)
}

// CheckAndSetDefaults checks and sets default values
Expand Down
62 changes: 43 additions & 19 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type UploadCompleterConfig struct {
CheckPeriod time.Duration
// Clock is used to override clock in tests
Clock clockwork.Clock
// DELETE IN 11.0.0 in favor of SessionTrackerService
// GracePeriod is the period after which an upload's session
// tracker will be check to see if it's an abandoned upload.
GracePeriod time.Duration
}

// CheckAndSetDefaults checks and sets default values
Expand All @@ -73,20 +77,8 @@ func (cfg *UploadCompleterConfig) CheckAndSetDefaults() error {
return nil
}

// StartNewUploadCompleter starts an upload completer background process. It can
// be closed by closing the provided context.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := newUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.start(ctx)
return nil
}

// newUploadCompleter returns a new instance of the upload completer without
// starting it. Useful in tests.
func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
// NewUploadCompleter returns a new UploadCompleter.
func NewUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -95,19 +87,37 @@ func newUploadCompleter(cfg UploadCompleterConfig) (*UploadCompleter, error) {
log: log.WithFields(log.Fields{
trace.Component: teleport.Component(cfg.Component, "completer"),
}),
closeC: make(chan struct{}),
}
return u, nil
}

// StartNewUploadCompleter starts an upload completer background process that will
// will close once the provided ctx is closed.
func StartNewUploadCompleter(ctx context.Context, cfg UploadCompleterConfig) error {
uc, err := NewUploadCompleter(cfg)
if err != nil {
return trace.Wrap(err)
}
go uc.Serve(ctx)
return nil
}

// UploadCompleter periodically scans uploads that have not been completed
// and completes them
type UploadCompleter struct {
cfg UploadCompleterConfig
log *log.Entry
cfg UploadCompleterConfig
log *log.Entry
closeC chan struct{}
}

// Close stops the UploadCompleter
func (u *UploadCompleter) Close() {
close(u.closeC)
}

// start starts a goroutine to periodically check for and complete abandoned uploads
func (u *UploadCompleter) start(ctx context.Context) {
// Serve runs the upload completer until closed or until ctx is cancelled.
func (u *UploadCompleter) Serve(ctx context.Context) error {
periodic := interval.New(interval.Config{
Duration: u.cfg.CheckPeriod,
FirstDuration: utils.HalfJitter(u.cfg.CheckPeriod),
Expand All @@ -121,8 +131,10 @@ func (u *UploadCompleter) start(ctx context.Context) {
if err := u.checkUploads(ctx); err != nil {
u.log.WithError(err).Warningf("Failed to check uploads.")
}
case <-u.closeC:
return nil
case <-ctx.Done():
return
return trace.Wrap(ctx.Err(), "Context canceled")
}
}
}
Expand Down Expand Up @@ -153,6 +165,18 @@ func (u *UploadCompleter) checkUploads(ctx context.Context) error {

// Complete upload for any uploads without an active session tracker
for _, upload := range uploads {
// DELETE IN 11.0.0
// To support v9/v8 versions which do not use SessionTrackerService,
// sessions are only considered abandoned after the provided grace period.
if u.cfg.GracePeriod != 0 {
gracePoint := upload.Initiated.Add(u.cfg.GracePeriod)
if gracePoint.After(u.cfg.Clock.Now()) {
// uploads are ordered oldest to newest, stop checking
// once an upload doesn't exceed the grace point
return nil
}
}

if apiutils.SliceContainsStr(activeSessionIDs, upload.SessionID.String()) {
continue
}
Expand Down
98 changes: 92 additions & 6 deletions lib/events/complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"testing"
"time"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
)
Expand All @@ -52,12 +54,12 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
},
}

sessionTrackerService := &eventstest.MockSessionTrackerService{
Clock: clock,
MockTrackers: []types.SessionTracker{sessionTracker},
sessionTrackerService := &mockSessionTrackerService{
clock: clock,
trackers: []types.SessionTracker{sessionTracker},
}

uc, err := newUploadCompleter(UploadCompleterConfig{
uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Expand All @@ -79,6 +81,48 @@ func TestUploadCompleterCompletesAbandonedUploads(t *testing.T) {
require.True(t, mu.uploads[upload.ID].completed)
}

// TestUploadCompleterWithGracePeriod verifies that the upload completer
// completes uploads that have lived past the configured grace period.
// DELETE IN 11.0.0
func TestUploadCompleterWithGracePeriod(t *testing.T) {
clock := clockwork.NewFakeClock()
mu := NewMemoryUploader()
mu.Clock = clock

log := &mockAuditLog{}

sessionID := session.NewID()
sessionTrackerService := &mockSessionTrackerService{}

uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
SessionTracker: sessionTrackerService,
Clock: clock,
GracePeriod: 2 * time.Hour,
})
require.NoError(t, err)

upload, err := mu.CreateUpload(context.Background(), sessionID)
require.NoError(t, err)

err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

// Even if session tracker is not found, the completer
// should wait until the grace period to complete it
clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.False(t, mu.uploads[upload.ID].completed)

clock.Advance(1 * time.Hour)
err = uc.checkUploads(context.Background())
require.NoError(t, err)
require.True(t, mu.uploads[upload.ID].completed)
}

// TestUploadCompleterEmitsSessionEnd verifies that the upload completer
// emits session.end or windows.desktop.session.end events for sessions
// that are completed.
Expand All @@ -99,11 +143,11 @@ func TestUploadCompleterEmitsSessionEnd(t *testing.T) {
sessionEvents: []apievents.AuditEvent{test.startEvent},
}

uc, err := newUploadCompleter(UploadCompleterConfig{
uc, err := NewUploadCompleter(UploadCompleterConfig{
Uploader: mu,
AuditLog: log,
Clock: clock,
SessionTracker: &eventstest.MockSessionTrackerService{},
SessionTracker: &mockSessionTrackerService{},
})
require.NoError(t, err)

Expand Down Expand Up @@ -162,3 +206,45 @@ func (m *mockAuditLog) StreamSessionEvents(ctx context.Context, sid session.ID,
func (m *mockAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return m.emitter.EmitAuditEvent(ctx, event)
}

type mockSessionTrackerService struct {
clock clockwork.Clock
trackers []types.SessionTracker
}

func (m *mockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
var trackers []types.SessionTracker
for _, tracker := range m.trackers {
// mock session tracker expiration
if tracker.Expiry().After(m.clock.Now()) {
trackers = append(trackers, tracker)
}
}
return trackers, nil
}

func (m *mockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
for _, tracker := range m.trackers {
// mock session tracker expiration
if tracker.GetSessionID() == sessionID && tracker.Expiry().After(m.clock.Now()) {
return tracker, nil
}
}
return nil, trace.NotFound("tracker not found")
}

func (m *mockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) {
return nil, trace.NotImplemented("CreateSessionTracker is not implemented")
}

func (m *mockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error {
return trace.NotImplemented("UpdateSessionTracker is not implemented")
}

func (m *mockSessionTrackerService) RemoveSessionTracker(ctx context.Context, sessionID string) error {
return trace.NotImplemented("RemoveSessionTracker is not implemented")
}

func (m *mockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error {
return trace.NotImplemented("UpdatePresence is not implemented")
}
46 changes: 0 additions & 46 deletions lib/events/eventstest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ import (
"context"
"sync"

"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)

// MockEmitter is an emitter that stores all emitted events.
Expand Down Expand Up @@ -92,45 +88,3 @@ func (e *MockEmitter) Close(ctx context.Context) error {
func (e *MockEmitter) Complete(ctx context.Context) error {
return nil
}

type MockSessionTrackerService struct {
Clock clockwork.Clock
MockTrackers []types.SessionTracker
}

func (m *MockSessionTrackerService) GetActiveSessionTrackers(ctx context.Context) ([]types.SessionTracker, error) {
var trackers []types.SessionTracker
for _, tracker := range m.MockTrackers {
// mock session tracker expiration
if tracker.Expiry().After(m.Clock.Now()) {
trackers = append(trackers, tracker)
}
}
return trackers, nil
}

func (m *MockSessionTrackerService) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
for _, tracker := range m.MockTrackers {
// mock session tracker expiration
if tracker.GetSessionID() == sessionID && tracker.Expiry().After(m.Clock.Now()) {
return tracker, nil
}
}
return nil, trace.NotFound("tracker not found")
}

func (m *MockSessionTrackerService) CreateSessionTracker(ctx context.Context, req *proto.CreateSessionTrackerRequest) (types.SessionTracker, error) {
return nil, nil
}

func (m *MockSessionTrackerService) UpdateSessionTracker(ctx context.Context, req *proto.UpdateSessionTrackerRequest) error {
return nil
}

func (m *MockSessionTrackerService) RemoveSessionTracker(ctx context.Context, sessionID string) error {
return nil
}

func (m *MockSessionTrackerService) UpdatePresence(ctx context.Context, sessionID, user string) error {
return nil
}
Loading

0 comments on commit dde7bb7

Please sign in to comment.