diff --git a/lib/events/s3sessions/s3handler_thirdparty_test.go b/lib/events/s3sessions/s3handler_thirdparty_test.go
index d4aba60f10197..1eede3cb49108 100644
--- a/lib/events/s3sessions/s3handler_thirdparty_test.go
+++ b/lib/events/s3sessions/s3handler_thirdparty_test.go
@@ -61,6 +61,9 @@ func TestThirdpartyStreams(t *testing.T) {
     t.Run("StreamManyParts", func(t *testing.T) {
         test.Stream(t, handler)
     })
+    t.Run("StreamWithPadding", func(t *testing.T) {
+        test.StreamWithPadding(t, handler)
+    })
     t.Run("UploadDownload", func(t *testing.T) {
         test.UploadDownload(t, handler)
     })
diff --git a/lib/events/sessionlog.go b/lib/events/sessionlog.go
index a9a0f15b8b4b9..f239a1a110e4a 100644
--- a/lib/events/sessionlog.go
+++ b/lib/events/sessionlog.go
@@ -106,7 +106,7 @@ func newGzipWriter(writer io.WriteCloser) *gzipWriter {
 // gzipReader wraps file, on close close both gzip writer and file
 type gzipReader struct {
     io.ReadCloser
-    inner io.Closer
+    inner io.ReadCloser
 }

 // Close closes file and gzip writer
@@ -128,6 +128,11 @@ func newGzipReader(reader io.ReadCloser) (*gzipReader, error) {
     if err != nil {
         return nil, trace.Wrap(err)
     }
+    // older bugged versions of teleport would sometimes incorrectly inject padding bytes into
+    // the gzip section of the archive. this causes gzip readers with multistream enabled (the
+    // default behavior) to fail. we disable multistream here in order to ensure that the gzip
+    // reader halts when it reaches the end of the current (only) valid gzip entry.
+    gzReader.Multistream(false)
     return &gzipReader{
         ReadCloser: gzReader,
         inner:      reader,
diff --git a/lib/events/stream.go b/lib/events/stream.go
index df5d3ccc5ab05..f92bb01996db1 100644
--- a/lib/events/stream.go
+++ b/lib/events/stream.go
@@ -91,6 +91,10 @@ type ProtoStreamerConfig struct {
     MinUploadBytes int64
     // ConcurrentUploads sets concurrent uploads per stream
     ConcurrentUploads int
+    // ForceFlush is used in tests to force a flush of an in-progress slice. Note that
+    // sending on this channel just forces a single flush for whichever upload happens
+    // to receive the signal first, so this may not be suitable for concurrent tests.
+    ForceFlush chan struct{}
 }

 // CheckAndSetDefaults checks and sets streamer defaults
@@ -139,6 +143,7 @@ func (s *ProtoStreamer) CreateAuditStreamForUpload(ctx context.Context, sid sess
         Uploader:          s.cfg.Uploader,
         MinUploadBytes:    s.cfg.MinUploadBytes,
         ConcurrentUploads: s.cfg.ConcurrentUploads,
+        ForceFlush:        s.cfg.ForceFlush,
     })
 }

@@ -189,6 +194,10 @@ type ProtoStreamConfig struct {
     // after which streamer flushes the data to the uploader
     // to avoid data loss
     InactivityFlushPeriod time.Duration
+    // ForceFlush is used in tests to force a flush of an in-progress slice. Note that
+    // sending on this channel just forces a single flush for whichever upload happens
+    // to receive the signal first, so this may not be suitable for concurrent tests.
+    ForceFlush chan struct{}
     // Clock is used to override time in tests
     Clock clockwork.Clock
     // ConcurrentUploads sets concurrent uploads per stream
@@ -546,6 +555,12 @@ func (w *sliceWriter) receiveAndUpload() error {

             delete(w.activeUploads, part.Number)
             w.updateCompletedParts(*part, upload.lastEventIndex)
+        case <-w.proto.cfg.ForceFlush:
+            if w.current != nil {
+                if err := w.startUploadCurrentSlice(); err != nil {
+                    return trace.Wrap(err)
+                }
+            }
         case <-flushCh:
             now := clock.Now().UTC()
             inactivityPeriod := now.Sub(lastEvent)
@@ -735,14 +750,18 @@ func (w *sliceWriter) startUpload(partNumber int64, slice *slice) (*activeUpload
     })

     var retry retryutils.Retry
+
+    // create reader once before the retry loop. in the event of an error, the reader must
+    // be reset via Seek rather than recreated.
+    reader, err := slice.reader()
+    if err != nil {
+        activeUpload.setError(err)
+        return
+    }
+
     for i := 0; i < defaults.MaxIterationLimit; i++ {
         log := log.WithField("attempt", i)
-        reader, err := slice.reader()
-        if err != nil {
-            activeUpload.setError(err)
-            return
-        }

         part, err := w.proto.cfg.Uploader.UploadPart(w.proto.cancelCtx, w.proto.cfg.Upload, partNumber, reader)
         if err == nil {
             activeUpload.setPart(*part)
@@ -772,10 +791,13 @@
             }
         }
         retry.Inc()
+
+        // reset reader to the beginning of the slice so it can be re-read
         if _, err := reader.Seek(0, 0); err != nil {
             activeUpload.setError(err)
             return
         }
+
         select {
         case <-retry.After():
             log.WithError(err).Debugf("Back off period for retry has passed. Retrying")
@@ -833,8 +855,9 @@ type slice struct {
     lastEventIndex int64
 }
-// reader returns a reader for the bytes written,
-// no writes should be done after this method is called
+// reader returns a reader for the bytes written, no writes should be done after this
+// method is called and this method should be called at most once per slice, otherwise
+// the resulting recording will be corrupted.
 func (s *slice) reader() (io.ReadSeeker, error) {
     if err := s.writer.Close(); err != nil {
         return nil, trace.Wrap(err)
     }
@@ -1071,6 +1094,20 @@ func (r *ProtoReader) Read(ctx context.Context) (apievents.AuditEvent, error) {
             if err != io.EOF {
                 return nil, r.setError(trace.ConvertSystemError(err))
             }
+
+            // due to a bug in older versions of teleport it was possible that padding
+            // bytes would end up inside the gzip section of the archive. we should
+            // skip any dangling data in the gzip section.
+            n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, r.messageBytes[:])
+            if err != nil {
+                return nil, r.setError(trace.ConvertSystemError(err))
+            }
+
+            if n != 0 {
+                // log the number of bytes that were skipped
+                log.WithField("length", n).Debug("skipped dangling data in session recording section")
+            }
+
             // reached the end of the current part, but not necessarily
             // the end of the stream
             if err := r.gzipReader.Close(); err != nil {
diff --git a/lib/events/stream_test.go b/lib/events/stream_test.go
index e420098f8eff4..94f8a1c445aa8 100644
--- a/lib/events/stream_test.go
+++ b/lib/events/stream_test.go
@@ -17,6 +17,7 @@ package events_test
 import (
     "context"
     "errors"
+    "os"
     "strings"
     "testing"
     "time"
@@ -198,6 +199,26 @@
     require.NoError(t, stream.Complete(ctx))
 }

+// TestReadCorruptedRecording tests that the streamer can successfully decode the kind of corrupted
+// recordings that some older bugged versions of teleport might end up producing when under heavy load/throttling.
+func TestReadCorruptedRecording(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    f, err := os.Open("testdata/corrupted-session")
+    require.NoError(t, err)
+    defer f.Close()
+
+    reader := events.NewProtoReader(f)
+    defer reader.Close()
+
+    events, err := reader.ReadAll(ctx)
+    require.NoError(t, err)
+
+    // verify that the expected number of events are extracted
+    require.Len(t, events, 12)
+}
+
 func makeQueryEvent(id string, query string) *apievents.DatabaseSessionQuery {
     return &apievents.DatabaseSessionQuery{
         Metadata: apievents.Metadata{
diff --git a/lib/events/test/streamsuite.go b/lib/events/test/streamsuite.go
index 7b9e8b241204c..fca49071373d0 100644
--- a/lib/events/test/streamsuite.go
+++ b/lib/events/test/streamsuite.go
@@ -16,10 +16,14 @@ package test

 import (
     "context"
+    "fmt"
+    "io"
     "os"
+    "sync"
     "testing"
     "time"

+    "github.com/gravitational/trace"
     "github.com/stretchr/testify/require"

     "github.com/gravitational/teleport/lib/events"
@@ -27,6 +31,42 @@
     "github.com/gravitational/teleport/lib/session"
 )

+type flakyHandler struct {
+    events.MultipartHandler
+    mu          sync.Mutex
+    shouldFlake bool
+    flakedParts map[int64]bool
+}
+
+func newFlakyHandler(handler events.MultipartHandler) *flakyHandler {
+    return &flakyHandler{
+        MultipartHandler: handler,
+        flakedParts:      make(map[int64]bool),
+    }
+}
+
+func (f *flakyHandler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
+    var shouldFlake bool
+    f.mu.Lock()
+    if f.shouldFlake && !f.flakedParts[partNumber] {
+        shouldFlake = true
+        f.flakedParts[partNumber] = true
+    }
+    f.mu.Unlock()
+
+    if shouldFlake {
+        return nil, trace.Errorf("flakeity flake flake")
+    }
+
+    return f.MultipartHandler.UploadPart(ctx, upload, partNumber, partBody)
+}
+
+func (f *flakyHandler) setFlakeUpload(flake bool) {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    f.shouldFlake = flake
+}
+
 // StreamParams configures parameters of a stream test suite
 type StreamParams struct {
     // PrintEvents is amount of print events to generate
@@ -35,19 +75,34 @@ type StreamParams struct {
     ConcurrentUploads int
     // MinUploadBytes is minimum required upload bytes
     MinUploadBytes int64
+    // Flaky is a flag that indicates that the handler should be flaky
+    Flaky bool
+    // ForceFlush is a flag that indicates that the stream should be forced to flush
+    // partially filled slices during event input.
+    ForceFlush bool
 }

 // StreamSinglePart tests stream upload and subsequent download and reads the results
 func StreamSinglePart(t *testing.T, handler events.MultipartHandler) {
-    StreamWithParameters(t, handler, StreamParams{
+    StreamWithPermutedParameters(t, handler, StreamParams{
         PrintEvents:    1024,
         MinUploadBytes: 1024 * 1024,
     })
 }

+// StreamWithPadding tests stream upload in a case where significant padding must be added. Note that
+// in practice padding is only added in the 'ForceFlush' permutation, as single-slice uploads
+// do not require padding.
+func StreamWithPadding(t *testing.T, handler events.MultipartHandler) {
+    StreamWithPermutedParameters(t, handler, StreamParams{
+        PrintEvents:    10,
+        MinUploadBytes: 1024 * 1024,
+    })
+}
+
 // Stream tests stream upload and subsequent download and reads the results
 func Stream(t *testing.T, handler events.MultipartHandler) {
-    StreamWithParameters(t, handler, StreamParams{
+    StreamWithPermutedParameters(t, handler, StreamParams{
         PrintEvents:       1024,
         MinUploadBytes:    1024,
         ConcurrentUploads: 2,
@@ -56,7 +111,7 @@

 // StreamManyParts tests stream upload and subsequent download and reads the results
 func StreamManyParts(t *testing.T, handler events.MultipartHandler) {
-    StreamWithParameters(t, handler, StreamParams{
+    StreamWithPermutedParameters(t, handler, StreamParams{
         PrintEvents:       8192,
         MinUploadBytes:    1024,
         ConcurrentUploads: 64,
@@ -73,6 +128,27 @@
     })
 }

+// StreamWithPermutedParameters tests stream upload and subsequent download and reads the results, repeating
+// the process with various permutations of flake and flush parameters in order to better cover padding and
+// retry logic, which are otherwise easy to leave untested.
+func StreamWithPermutedParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
+    cases := []struct{ Flaky, ForceFlush bool }{
+        {Flaky: false, ForceFlush: false},
+        {Flaky: true, ForceFlush: false},
+        {Flaky: false, ForceFlush: true},
+        {Flaky: true, ForceFlush: true},
+    }
+
+    for _, cc := range cases {
+        t.Run(fmt.Sprintf("Flaky=%v,ForceFlush=%v", cc.Flaky, cc.ForceFlush), func(t *testing.T) {
+            pc := params
+            pc.Flaky = cc.Flaky
+            pc.ForceFlush = cc.ForceFlush
+            StreamWithParameters(t, handler, pc)
+        })
+    }
+}
+
 // StreamWithParameters tests stream upload and subsequent download and reads the results
 func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
     ctx := context.TODO()
@@ -80,10 +156,15 @@
     inEvents := eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: params.PrintEvents})
     sid := session.ID(inEvents[0].(events.SessionMetadataGetter).GetSessionID())

+    forceFlush := make(chan struct{})
+
+    wrappedHandler := newFlakyHandler(handler)
+
     streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
-        Uploader:          handler,
+        Uploader:          wrappedHandler,
         MinUploadBytes:    params.MinUploadBytes,
         ConcurrentUploads: params.ConcurrentUploads,
+        ForceFlush:        forceFlush,
     })
     require.Nil(t, err)

@@ -97,7 +178,21 @@
         t.Fatalf("Timed out waiting for status update.")
     }

-    for _, event := range inEvents {
+    // if enabled, flake causes the first upload attempt for each multipart upload part
+    // to fail. this is necessary in order to cover upload retry logic, which has
+    // historically been a source of bugs.
+    wrappedHandler.setFlakeUpload(params.Flaky)
+
+    timeout := time.After(time.Minute)
+
+    for i, event := range inEvents {
+        if params.ForceFlush && i%(len(inEvents)/3) == 0 {
+            select {
+            case forceFlush <- struct{}{}:
+            case <-timeout:
+                t.Fatalf("Timed out waiting for force flush.")
+            }
+        }
         err := stream.RecordEvent(ctx, eventstest.PrepareEvent(event))
         require.Nil(t, err)
     }
diff --git a/lib/events/testdata/corrupted-session b/lib/events/testdata/corrupted-session
new file mode 100644
index 0000000000000..da33fc8a28de2
Binary files /dev/null and b/lib/events/testdata/corrupted-session differ
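Reviewer note (not part of the patch): the sketch below is a minimal, standalone illustration of the failure mode described by the new comments in sessionlog.go and stream.go. Everything in it (the file name, variable names, the sample payload, the 64 bytes of padding) is made up for illustration; it only exercises the standard library compress/gzip API that the patch relies on.

// padding_sketch.go - illustrative only, not Teleport code.
package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

func main() {
    // Build a payload shaped like a corrupted recording section: one valid
    // gzip stream followed by injected zero-byte padding.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    if _, err := zw.Write([]byte("audit events go here")); err != nil {
        panic(err)
    }
    if err := zw.Close(); err != nil {
        panic(err)
    }
    buf.Write(make([]byte, 64)) // padding lands after the gzip trailer

    // Default (multistream) behavior: once the first stream is exhausted the
    // reader tries to parse the padding as another gzip header and fails.
    zr, err := gzip.NewReader(bytes.NewReader(buf.Bytes()))
    if err != nil {
        panic(err)
    }
    if _, err := io.ReadAll(zr); err != nil {
        fmt.Println("multistream enabled:", err) // the padding is not a valid gzip header
    }

    // With multistream disabled the reader halts cleanly at the end of the
    // first (and only) valid gzip entry instead of choking on the padding.
    src := bytes.NewReader(buf.Bytes())
    zr, err = gzip.NewReader(src)
    if err != nil {
        panic(err)
    }
    zr.Multistream(false)
    data, err := io.ReadAll(zr)
    fmt.Printf("multistream disabled: %q err=%v\n", data, err) // err == nil

    // The dangling padding is still sitting unread in the underlying reader
    // and can simply be drained and discarded. (bytes.Reader implements
    // io.ByteReader, so the gzip reader has not read past the trailer.)
    skipped, _ := io.Copy(io.Discard, src)
    fmt.Println("padding bytes skipped:", skipped)
}

This mirrors the two-step approach taken in the patch itself: newGzipReader calls gzReader.Multistream(false) so the reader halts at the end of the only valid gzip entry, and ProtoReader.Read then discards whatever dangling padding remains in the inner reader via io.CopyBuffer(io.Discard, ...) before closing the current part.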