diff --git a/lib/events/s3sessions/s3handler_thirdparty_test.go b/lib/events/s3sessions/s3handler_thirdparty_test.go
index 7cbecb1e92ca8..ffb63b19ca4c2 100644
--- a/lib/events/s3sessions/s3handler_thirdparty_test.go
+++ b/lib/events/s3sessions/s3handler_thirdparty_test.go
@@ -83,6 +83,9 @@ func TestThirdpartyStreams(t *testing.T) {
 	t.Run("StreamManyParts", func(t *testing.T) {
 		test.Stream(t, handler)
 	})
+	t.Run("StreamWithPadding", func(t *testing.T) {
+		test.StreamWithPadding(t, handler)
+	})
 	t.Run("UploadDownload", func(t *testing.T) {
 		test.UploadDownload(t, handler)
 	})
diff --git a/lib/events/sessionlog.go b/lib/events/sessionlog.go
index f3d73d4f671f9..76f820d0d4195 100644
--- a/lib/events/sessionlog.go
+++ b/lib/events/sessionlog.go
@@ -108,7 +108,7 @@ func newGzipWriter(writer io.WriteCloser) *gzipWriter {
 // gzipReader wraps file, on close close both gzip writer and file
 type gzipReader struct {
 	io.ReadCloser
-	inner io.Closer
+	inner io.ReadCloser
 }
 
 // Close closes file and gzip writer
@@ -130,6 +130,11 @@ func newGzipReader(reader io.ReadCloser) (*gzipReader, error) {
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
+	// older bugged versions of teleport would sometimes incorrectly inject padding bytes into
+	// the gzip section of the archive. this causes gzip readers with multistream enabled (the
+	// default behavior) to fail. we disable multistream here in order to ensure that the gzip
+	// reader halts when it reaches the end of the current (only) valid gzip entry.
+	gzReader.Multistream(false)
 	return &gzipReader{
 		ReadCloser: gzReader,
 		inner:      reader,
diff --git a/lib/events/stream.go b/lib/events/stream.go
index 6c743010d9173..01332cd7a7bd7 100644
--- a/lib/events/stream.go
+++ b/lib/events/stream.go
@@ -93,6 +93,9 @@ type ProtoStreamerConfig struct {
 	MinUploadBytes int64
 	// ConcurrentUploads sets concurrent uploads per stream
 	ConcurrentUploads int
+	// InactivityFlushPeriod overrides the default period after which
+	// partial slices are flushed to upstream.
+	InactivityFlushPeriod time.Duration
 }
 
 // CheckAndSetDefaults checks and sets streamer defaults
@@ -135,12 +138,13 @@ type ProtoStreamer struct {
 // this function is useful in tests
 func (s *ProtoStreamer) CreateAuditStreamForUpload(ctx context.Context, sid session.ID, upload StreamUpload) (apievents.Stream, error) {
 	return NewProtoStream(ProtoStreamConfig{
-		Upload:            upload,
-		BufferPool:        s.bufferPool,
-		SlicePool:         s.slicePool,
-		Uploader:          s.cfg.Uploader,
-		MinUploadBytes:    s.cfg.MinUploadBytes,
-		ConcurrentUploads: s.cfg.ConcurrentUploads,
+		Upload:                upload,
+		BufferPool:            s.bufferPool,
+		SlicePool:             s.slicePool,
+		Uploader:              s.cfg.Uploader,
+		MinUploadBytes:        s.cfg.MinUploadBytes,
+		ConcurrentUploads:     s.cfg.ConcurrentUploads,
+		InactivityFlushPeriod: s.cfg.InactivityFlushPeriod,
 	})
 }
 
@@ -737,14 +741,18 @@ func (w *sliceWriter) startUpload(partNumber int64, slice *slice) (*activeUpload
 		})
 
 		var retry retryutils.Retry
+
+		// create reader once before the retry loop. in the event of an error, the reader must
+		// be reset via Seek rather than recreated.
+		reader, err := slice.reader()
+		if err != nil {
+			activeUpload.setError(err)
+			return
+		}
+
 		for i := 0; i < defaults.MaxIterationLimit; i++ {
 			log := log.WithField("attempt", i)
 
-			reader, err := slice.reader()
-			if err != nil {
-				activeUpload.setError(err)
-				return
-			}
 			part, err := w.proto.cfg.Uploader.UploadPart(w.proto.cancelCtx, w.proto.cfg.Upload, partNumber, reader)
 			if err == nil {
 				activeUpload.setPart(*part)
@@ -774,10 +782,13 @@
 				}
 			}
 			retry.Inc()
+
+			// reset reader to the beginning of the slice so it can be re-read
 			if _, err := reader.Seek(0, 0); err != nil {
 				activeUpload.setError(err)
 				return
 			}
+
 			select {
 			case <-retry.After():
 				log.WithError(err).Debugf("Back off period for retry has passed. Retrying")
@@ -835,8 +846,9 @@ type slice struct {
 	lastEventIndex int64
 }
-// reader returns a reader for the bytes written,
-// no writes should be done after this method is called
+// reader returns a reader for the bytes written, no writes should be done after this
+// method is called and this method should be called at most once per slice, otherwise
+// the resulting recording will be corrupted.
 func (s *slice) reader() (io.ReadSeeker, error) {
 	if err := s.writer.Close(); err != nil {
 		return nil, trace.Wrap(err)
 	}
@@ -1073,6 +1085,22 @@ func (r *ProtoReader) Read(ctx context.Context) (apievents.AuditEvent, error) {
 			if !errors.Is(err, io.EOF) {
 				return nil, r.setError(trace.ConvertSystemError(err))
 			}
+
+			// due to a bug in older versions of teleport it was possible that padding
+			// bytes would end up inside of the gzip section of the archive. we should
+			// skip any dangling data in the gzip section.
+			n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, r.messageBytes[:])
+			if err != nil {
+				return nil, r.setError(trace.ConvertSystemError(err))
+			}
+
+			if n != 0 {
+				// warn about the dangling data
+				log.WithFields(log.Fields{
+					"length": n,
+				}).Warn("skipped dangling data in session recording section")
+			}
+
 			// reached the end of the current part, but not necessarily
 			// the end of the stream
 			if err := r.gzipReader.Close(); err != nil {
diff --git a/lib/events/stream_test.go b/lib/events/stream_test.go
index 6b1dab52e6575..a9d0ee1c94f60 100644
--- a/lib/events/stream_test.go
+++ b/lib/events/stream_test.go
@@ -21,6 +21,7 @@ package events_test
 import (
 	"context"
 	"errors"
+	"os"
 	"strings"
 	"testing"
 	"time"
@@ -202,6 +203,26 @@ func TestProtoStreamLargeEvent(t *testing.T) {
 	require.NoError(t, stream.Complete(ctx))
 }
 
+// TestReadCorruptedRecording tests that the streamer can successfully decode the kind of corrupted
+// recordings that some older bugged versions of teleport might end up producing when under heavy load/throttling.
+func TestReadCorruptedRecording(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	f, err := os.Open("testdata/corrupted-session")
+	require.NoError(t, err)
+	defer f.Close()
+
+	reader := events.NewProtoReader(f)
+	defer reader.Close()
+
+	events, err := reader.ReadAll(ctx)
+	require.NoError(t, err)
+
+	// verify that the expected number of events are extracted
+	require.Len(t, events, 12)
+}
+
 func makeQueryEvent(id string, query string) *apievents.DatabaseSessionQuery {
 	return &apievents.DatabaseSessionQuery{
 		Metadata: apievents.Metadata{
diff --git a/lib/events/test/streamsuite.go b/lib/events/test/streamsuite.go
index 8d81aae0c2844..66c98b543fbb8 100644
--- a/lib/events/test/streamsuite.go
+++ b/lib/events/test/streamsuite.go
@@ -20,7 +20,10 @@ package test
 
 import (
 	"context"
+	"fmt"
+	"io"
 	"os"
+	"sync"
 	"testing"
 	"time"
 
@@ -29,8 +32,45 @@ import (
 	"github.com/gravitational/teleport/lib/events"
 	"github.com/gravitational/teleport/lib/events/eventstest"
 	"github.com/gravitational/teleport/lib/session"
+	"github.com/gravitational/trace"
 )
 
+type flakyHandler struct {
+	events.MultipartHandler
+	mu          sync.Mutex
+	shouldFlake bool
+	flakedParts map[int64]bool
+}
+
+func newFlakyHandler(handler events.MultipartHandler) *flakyHandler {
+	return &flakyHandler{
+		MultipartHandler: handler,
+		flakedParts:      make(map[int64]bool),
+	}
+}
+
+func (f *flakyHandler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
+	var shouldFlake bool
+	f.mu.Lock()
+	if f.shouldFlake && !f.flakedParts[partNumber] {
+		shouldFlake = true
+		f.flakedParts[partNumber] = true
+	}
+	f.mu.Unlock()
+
+	if shouldFlake {
+		return nil, trace.Errorf("flakeity flake flake")
+	}
+
+	return f.MultipartHandler.UploadPart(ctx, upload, partNumber, partBody)
+}
+
+func (f *flakyHandler) setFlakeUpload(flake bool) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.shouldFlake = flake
+}
+
 // StreamParams configures parameters of a stream test suite
 type StreamParams struct {
 	// PrintEvents is amount of print events to generate
@@ -39,19 +79,34 @@ type StreamParams struct {
 	ConcurrentUploads int
 	// MinUploadBytes is minimum required upload bytes
 	MinUploadBytes int64
+	// Flaky is a flag that indicates that the handler should be flaky
+	Flaky bool
+	// ForceFlush is a flag that indicates that the stream should be forced to flush
+	// partially filled slices during event input.
+	ForceFlush bool
 }
 
 // StreamSinglePart tests stream upload and subsequent download and reads the results
 func StreamSinglePart(t *testing.T, handler events.MultipartHandler) {
-	StreamWithParameters(t, handler, StreamParams{
+	StreamWithPermutedParameters(t, handler, StreamParams{
 		PrintEvents:    1024,
 		MinUploadBytes: 1024 * 1024,
 	})
 }
 
+// StreamWithPadding tests stream upload in a case where significant padding must be added. Note that
+// in practice padding is only added in the 'ForceFlush' permutations, as single-slice uploads
+// do not require padding.
+func StreamWithPadding(t *testing.T, handler events.MultipartHandler) {
+	StreamWithPermutedParameters(t, handler, StreamParams{
+		PrintEvents:    10,
+		MinUploadBytes: 1024 * 1024,
+	})
+}
+
 // Stream tests stream upload and subsequent download and reads the results
 func Stream(t *testing.T, handler events.MultipartHandler) {
-	StreamWithParameters(t, handler, StreamParams{
+	StreamWithPermutedParameters(t, handler, StreamParams{
 		PrintEvents:       1024,
 		MinUploadBytes:    1024,
 		ConcurrentUploads: 2,
@@ -60,7 +115,7 @@ func Stream(t *testing.T, handler events.MultipartHandler) {
 
 // StreamManyParts tests stream upload and subsequent download and reads the results
 func StreamManyParts(t *testing.T, handler events.MultipartHandler) {
-	StreamWithParameters(t, handler, StreamParams{
+	StreamWithPermutedParameters(t, handler, StreamParams{
 		PrintEvents:       8192,
 		MinUploadBytes:    1024,
 		ConcurrentUploads: 64,
@@ -77,17 +132,44 @@ func StreamResumeManyParts(t *testing.T, handler events.MultipartHandler) {
 	})
 }
 
+// StreamWithPermutedParameters tests stream upload and subsequent download and reads the results, repeating
+// the process with various permutations of flake and flush parameters in order to better cover padding and
+// retry logic, which are easy to accidentally fail to cover.
+func StreamWithPermutedParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
+	cases := []struct{ Flaky, ForceFlush bool }{
+		{Flaky: false, ForceFlush: false},
+		{Flaky: true, ForceFlush: false},
+		{Flaky: false, ForceFlush: true},
+		{Flaky: true, ForceFlush: true},
+	}
+
+	for _, cc := range cases {
+		t.Run(fmt.Sprintf("Flaky=%v,ForceFlush=%v", cc.Flaky, cc.ForceFlush), func(t *testing.T) {
+			pc := params
+			pc.Flaky = cc.Flaky
+			pc.ForceFlush = cc.ForceFlush
+			StreamWithParameters(t, handler, pc)
+		})
+	}
+}
+
 // StreamWithParameters tests stream upload and subsequent download and reads the results
 func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
+	const (
+		inactivityFlushPeriod = time.Millisecond * 50
+	)
 	ctx := context.TODO()
 
 	inEvents := eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: params.PrintEvents})
 	sid := session.ID(inEvents[0].(events.SessionMetadataGetter).GetSessionID())
 
+	wrappedHandler := newFlakyHandler(handler)
+
 	streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
-		Uploader:          handler,
-		MinUploadBytes:    params.MinUploadBytes,
-		ConcurrentUploads: params.ConcurrentUploads,
+		Uploader:              wrappedHandler,
+		MinUploadBytes:        params.MinUploadBytes,
+		ConcurrentUploads:     params.ConcurrentUploads,
+		InactivityFlushPeriod: inactivityFlushPeriod,
 	})
 	require.NoError(t, err)
 
@@ -101,7 +183,17 @@ func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params
 		t.Fatalf("Timed out waiting for status update.")
 	}
 
-	for _, event := range inEvents {
+	// if enabled, flake causes the first upload attempt for each multipart upload part
+	// to fail. necessary in order to cover upload retry logic, which has historically been
+	// a source of bugs.
+	wrappedHandler.setFlakeUpload(params.Flaky)
+
+	for i, event := range inEvents {
+		if params.ForceFlush && i%(len(inEvents)/3) == 0 {
+			// ensure that we flush at least some of the slices
+			// before they are full in order to cover padding logic.
+			time.Sleep(inactivityFlushPeriod * 2)
+		}
 		err := stream.RecordEvent(ctx, eventstest.PrepareEvent(event))
 		require.NoError(t, err)
 	}
diff --git a/lib/events/testdata/corrupted-session b/lib/events/testdata/corrupted-session
new file mode 100644
index 0000000000000..da33fc8a28de2
Binary files /dev/null and b/lib/events/testdata/corrupted-session differ
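Not part of the patch: below is a minimal, self-contained sketch of the recovery technique the sessionlog.go and stream.go hunks rely on, namely decoding a single gzip member with multistream disabled and then draining whatever dangling padding follows it. The readPaddedGzip helper and the sample payload are invented for illustration; only the standard library calls (gzip.Reader.Multistream, io.ReadAll, io.Copy, io.Discard) are real APIs.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

// readPaddedGzip decompresses a single gzip member and then discards any
// dangling bytes (such as zero padding) left in the same section, returning
// the payload and the number of bytes skipped.
func readPaddedGzip(section io.Reader) ([]byte, int64, error) {
	zr, err := gzip.NewReader(section)
	if err != nil {
		return nil, 0, err
	}
	// Without this call the reader treats trailing padding as the header of a
	// second gzip member and fails; with multistream disabled it stops cleanly
	// at the end of the first (and only) valid member.
	zr.Multistream(false)

	payload, err := io.ReadAll(zr)
	if err != nil {
		return nil, 0, err
	}
	if err := zr.Close(); err != nil {
		return nil, 0, err
	}

	// Drain the rest of the section so a caller that keeps reading the archive
	// starts at the next part, mirroring the io.CopyBuffer(io.Discard, ...) call
	// added to ProtoReader.Read above.
	skipped, err := io.Copy(io.Discard, section)
	if err != nil {
		return nil, 0, err
	}
	return payload, skipped, nil
}

func main() {
	// Build a gzip member followed by padding, roughly the shape of the
	// corrupted recordings the test data reproduces.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("session event payload")); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
	buf.Write(make([]byte, 64)) // dangling padding bytes

	payload, skipped, err := readPaddedGzip(&buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("payload=%q skippedPadding=%d\n", payload, skipped)
}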