Skip to content

Commit

Permalink
fix session corruption
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall committed Aug 26, 2024
1 parent f1083d8 commit 4d5631f
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 13 deletions.
3 changes: 3 additions & 0 deletions lib/events/s3sessions/s3handler_thirdparty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func TestThirdpartyStreams(t *testing.T) {
t.Run("StreamManyParts", func(t *testing.T) {
test.Stream(t, handler)
})
t.Run("StreamWithPadding", func(t *testing.T) {
test.StreamWithPadding(t, handler)
})
t.Run("UploadDownload", func(t *testing.T) {
test.UploadDownload(t, handler)
})
Expand Down
7 changes: 6 additions & 1 deletion lib/events/sessionlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newGzipWriter(writer io.WriteCloser) *gzipWriter {
// gzipReader wraps file, on close close both gzip writer and file
type gzipReader struct {
io.ReadCloser
inner io.Closer
inner io.ReadCloser
}

// Close closes file and gzip writer
Expand All @@ -130,6 +130,11 @@ func newGzipReader(reader io.ReadCloser) (*gzipReader, error) {
if err != nil {
return nil, trace.Wrap(err)
}
// older bugged versions of teleport would sometimes incorrectly inject padding bytes into
// the gzip section of the archive. this causes gzip readers with multistream enabled (the
// default behavior) to fail. we disable multistream here in order to ensure that the gzip
// reader halts when it reaches the end of the current (only) valid gzip entry.
gzReader.Multistream(false)
return &gzipReader{
ReadCloser: gzReader,
inner: reader,
Expand Down
53 changes: 46 additions & 7 deletions lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type ProtoStreamerConfig struct {
MinUploadBytes int64
// ConcurrentUploads sets concurrent uploads per stream
ConcurrentUploads int
// ForceFlush is used in tests to force a flush of an in-progress slice. Note that
// sending on this channel just forces a single flush for whichever upload happens
// to receive the signal first, so this may not be suitable for concurrent tests.
ForceFlush chan struct{}
}

// CheckAndSetDefaults checks and sets streamer defaults
Expand Down Expand Up @@ -141,6 +145,7 @@ func (s *ProtoStreamer) CreateAuditStreamForUpload(ctx context.Context, sid sess
Uploader: s.cfg.Uploader,
MinUploadBytes: s.cfg.MinUploadBytes,
ConcurrentUploads: s.cfg.ConcurrentUploads,
ForceFlush: s.cfg.ForceFlush,
})
}

Expand Down Expand Up @@ -191,6 +196,10 @@ type ProtoStreamConfig struct {
// after which streamer flushes the data to the uploader
// to avoid data loss
InactivityFlushPeriod time.Duration
// ForceFlush is used in tests to force a flush of an in-progress slice. Note that
// sending on this channel just forces a single flush for whichever upload happens
// to receive the signal first, so this may not be suitable for concurrent tests.
ForceFlush chan struct{}
// Clock is used to override time in tests
Clock clockwork.Clock
// ConcurrentUploads sets concurrent uploads per stream
Expand Down Expand Up @@ -548,6 +557,12 @@ func (w *sliceWriter) receiveAndUpload() error {

delete(w.activeUploads, part.Number)
w.updateCompletedParts(*part, upload.lastEventIndex)
case <-w.proto.cfg.ForceFlush:
if w.current != nil {
if err := w.startUploadCurrentSlice(); err != nil {
return trace.Wrap(err)
}
}
case <-flushCh:
now := clock.Now().UTC()
inactivityPeriod := now.Sub(lastEvent)
Expand Down Expand Up @@ -737,14 +752,18 @@ func (w *sliceWriter) startUpload(partNumber int64, slice *slice) (*activeUpload
})

var retry retryutils.Retry

// create reader once before the retry loop. in the event of an error, the reader must
// be reset via Seek rather than recreated.
reader, err := slice.reader()
if err != nil {
activeUpload.setError(err)
return
}

for i := 0; i < defaults.MaxIterationLimit; i++ {
log := log.WithField("attempt", i)

reader, err := slice.reader()
if err != nil {
activeUpload.setError(err)
return
}
part, err := w.proto.cfg.Uploader.UploadPart(w.proto.cancelCtx, w.proto.cfg.Upload, partNumber, reader)
if err == nil {
activeUpload.setPart(*part)
Expand Down Expand Up @@ -774,10 +793,13 @@ func (w *sliceWriter) startUpload(partNumber int64, slice *slice) (*activeUpload
}
}
retry.Inc()

// reset reader to the beginning of the slice so it can be re-read
if _, err := reader.Seek(0, 0); err != nil {
activeUpload.setError(err)
return
}

select {
case <-retry.After():
log.WithError(err).Debugf("Back off period for retry has passed. Retrying")
Expand Down Expand Up @@ -835,8 +857,9 @@ type slice struct {
lastEventIndex int64
}

// reader returns a reader for the bytes written,
// no writes should be done after this method is called
// reader returns a reader for the bytes written, no writes should be done after this
// method is called and this method should be called at most once per slice, otherwise
// the resulting recording will be corrupted.
func (s *slice) reader() (io.ReadSeeker, error) {
if err := s.writer.Close(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -1073,6 +1096,22 @@ func (r *ProtoReader) Read(ctx context.Context) (apievents.AuditEvent, error) {
if !errors.Is(err, io.EOF) {
return nil, r.setError(trace.ConvertSystemError(err))
}

// due to a bug in older versions of teleport it was possible that padding
// bytes would end up inside of the gzip section of the archive. we should
// skip any dangling data in the gzip secion.
n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, r.messageBytes[:])
if err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}

if n != 0 {
// log the number of bytes that were skipped
log.WithFields(log.Fields{
"length": n,
}).Debug("skipped dangling data in session recording section")
}

// reached the end of the current part, but not necessarily
// the end of the stream
if err := r.gzipReader.Close(); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions lib/events/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package events_test
import (
"context"
"errors"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -202,6 +203,26 @@ func TestProtoStreamLargeEvent(t *testing.T) {
require.NoError(t, stream.Complete(ctx))
}

// TestReadCorruptedRecording tests that the streamer can successfully decode the kind of corrupted
// recordings that some older bugged versions of teleport might end up producing when under heavy load/throttling.
func TestReadCorruptedRecording(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

f, err := os.Open("testdata/corrupted-session")
require.NoError(t, err)
defer f.Close()

reader := events.NewProtoReader(f)
defer reader.Close()

events, err := reader.ReadAll(ctx)
require.NoError(t, err)

// verify that the expected number of events are extracted
require.Len(t, events, 12)
}

func makeQueryEvent(id string, query string) *apievents.DatabaseSessionQuery {
return &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
Expand Down
105 changes: 100 additions & 5 deletions lib/events/test/streamsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,57 @@ package test

import (
"context"
"fmt"
"io"
"os"
"sync"
"testing"
"time"

"github.com/gravitational/trace"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/session"
)

type flakyHandler struct {
events.MultipartHandler
mu sync.Mutex
shouldFlake bool
flakedParts map[int64]bool
}

func newFlakyHandler(handler events.MultipartHandler) *flakyHandler {
return &flakyHandler{
MultipartHandler: handler,
flakedParts: make(map[int64]bool),
}
}

func (f *flakyHandler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
var shouldFlake bool
f.mu.Lock()
if f.shouldFlake && !f.flakedParts[partNumber] {
shouldFlake = true
f.flakedParts[partNumber] = true
}
f.mu.Unlock()

if shouldFlake {
return nil, trace.Errorf("flakeity flake flake")
}

return f.MultipartHandler.UploadPart(ctx, upload, partNumber, partBody)
}

func (f *flakyHandler) setFlakeUpload(flake bool) {
f.mu.Lock()
defer f.mu.Unlock()
f.shouldFlake = flake
}

// StreamParams configures parameters of a stream test suite
type StreamParams struct {
// PrintEvents is amount of print events to generate
Expand All @@ -39,19 +79,34 @@ type StreamParams struct {
ConcurrentUploads int
// MinUploadBytes is minimum required upload bytes
MinUploadBytes int64
// Flaky is a flag that indicates that the handler should be flaky
Flaky bool
// ForceFlush is a flag that indicates that the handler should be forced to flush
// partially filled slices during event input.
ForceFlush bool
}

// StreamSinglePart tests stream upload and subsequent download and reads the results
func StreamSinglePart(t *testing.T, handler events.MultipartHandler) {
StreamWithParameters(t, handler, StreamParams{
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 1024,
MinUploadBytes: 1024 * 1024,
})
}

// StreamWithPadding tests stream upload in a case where significant padding must be added. Note that
// in practice padding is only necessarily added in the 'ForceFlush' permutation as single-slice uploads
// do not require padding.
func StreamWithPadding(t *testing.T, handler events.MultipartHandler) {
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 10,
MinUploadBytes: 1024 * 1024,
})
}

// Stream tests stream upload and subsequent download and reads the results
func Stream(t *testing.T, handler events.MultipartHandler) {
StreamWithParameters(t, handler, StreamParams{
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 1024,
MinUploadBytes: 1024,
ConcurrentUploads: 2,
Expand All @@ -60,7 +115,7 @@ func Stream(t *testing.T, handler events.MultipartHandler) {

// StreamManyParts tests stream upload and subsequent download and reads the results
func StreamManyParts(t *testing.T, handler events.MultipartHandler) {
StreamWithParameters(t, handler, StreamParams{
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 8192,
MinUploadBytes: 1024,
ConcurrentUploads: 64,
Expand All @@ -77,17 +132,43 @@ func StreamResumeManyParts(t *testing.T, handler events.MultipartHandler) {
})
}

// StreamWithPermutedParameters tests stream upload and subsequent download and reads the results, repeating
// the process with various permutations of flake and flush parameters in order to better cover padding and
// retry logic, which are easy to accidentally fail to cover.
func StreamWithPermutedParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
cases := []struct{ Flaky, ForceFlush bool }{
{Flaky: false, ForceFlush: false},
{Flaky: true, ForceFlush: false},
{Flaky: false, ForceFlush: true},
{Flaky: true, ForceFlush: true},
}

for _, cc := range cases {
t.Run(fmt.Sprintf("Flaky=%v,ForceFlush=%v", cc.Flaky, cc.ForceFlush), func(t *testing.T) {
pc := params
pc.Flaky = cc.Flaky
pc.ForceFlush = cc.ForceFlush
StreamWithParameters(t, handler, pc)
})
}
}

// StreamWithParameters tests stream upload and subsequent download and reads the results
func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
ctx := context.TODO()

inEvents := eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: params.PrintEvents})
sid := session.ID(inEvents[0].(events.SessionMetadataGetter).GetSessionID())

forceFlush := make(chan struct{})

wrappedHandler := newFlakyHandler(handler)

streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
Uploader: handler,
Uploader: wrappedHandler,
MinUploadBytes: params.MinUploadBytes,
ConcurrentUploads: params.ConcurrentUploads,
ForceFlush: forceFlush,
})
require.NoError(t, err)

Expand All @@ -101,7 +182,21 @@ func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params
t.Fatalf("Timed out waiting for status update.")
}

for _, event := range inEvents {
// if enabled, flake causes the first upload attempt for each multipart upload part
// to fail. necessary in order to cover upload retry logic, which has historically been
// a source of bugs.
wrappedHandler.setFlakeUpload(params.Flaky)

timeout := time.After(time.Minute)

for i, event := range inEvents {
if params.ForceFlush && i%(len(inEvents)/3) == 0 {
select {
case forceFlush <- struct{}{}:
case <-timeout:
t.Fatalf("Timed out waiting for force flush.")
}
}
err := stream.RecordEvent(ctx, eventstest.PrepareEvent(event))
require.NoError(t, err)
}
Expand Down
Binary file added lib/events/testdata/corrupted-session
Binary file not shown.

0 comments on commit 4d5631f

Please sign in to comment.