Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix session corruption #45786

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/events/s3sessions/s3handler_thirdparty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func TestThirdpartyStreams(t *testing.T) {
t.Run("StreamManyParts", func(t *testing.T) {
test.Stream(t, handler)
})
t.Run("StreamWithPadding", func(t *testing.T) {
test.StreamWithPadding(t, handler)
})
t.Run("UploadDownload", func(t *testing.T) {
test.UploadDownload(t, handler)
})
Expand Down
7 changes: 6 additions & 1 deletion lib/events/sessionlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newGzipWriter(writer io.WriteCloser) *gzipWriter {
// gzipReader wraps file, on close close both gzip writer and file
type gzipReader struct {
io.ReadCloser
inner io.Closer
inner io.ReadCloser
}

// Close closes file and gzip writer
Expand All @@ -130,6 +130,11 @@ func newGzipReader(reader io.ReadCloser) (*gzipReader, error) {
if err != nil {
return nil, trace.Wrap(err)
}
// older bugged versions of teleport would sometimes incorrectly inject padding bytes into
// the gzip section of the archive. this causes gzip readers with multistream enabled (the
// default behavior) to fail. we disable multistream here in order to ensure that the gzip
// reader halts when it reaches the end of the current (only) valid gzip entry.
gzReader.Multistream(false)
return &gzipReader{
ReadCloser: gzReader,
inner: reader,
Expand Down
51 changes: 44 additions & 7 deletions lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type ProtoStreamerConfig struct {
MinUploadBytes int64
// ConcurrentUploads sets concurrent uploads per stream
ConcurrentUploads int
// ForceFlush is used in tests to force a flush of an in-progress slice. Note that
// sending on this channel just forces a single flush for whichever upload happens
// to receive the signal first, so this may not be suitable for concurrent tests.
ForceFlush chan struct{}
}

// CheckAndSetDefaults checks and sets streamer defaults
Expand Down Expand Up @@ -141,6 +145,7 @@ func (s *ProtoStreamer) CreateAuditStreamForUpload(ctx context.Context, sid sess
Uploader: s.cfg.Uploader,
MinUploadBytes: s.cfg.MinUploadBytes,
ConcurrentUploads: s.cfg.ConcurrentUploads,
ForceFlush: s.cfg.ForceFlush,
})
}

Expand Down Expand Up @@ -191,6 +196,10 @@ type ProtoStreamConfig struct {
// after which streamer flushes the data to the uploader
// to avoid data loss
InactivityFlushPeriod time.Duration
// ForceFlush is used in tests to force a flush of an in-progress slice. Note that
// sending on this channel just forces a single flush for whichever upload happens
// to receive the signal first, so this may not be suitable for concurrent tests.
ForceFlush chan struct{}
// Clock is used to override time in tests
Clock clockwork.Clock
// ConcurrentUploads sets concurrent uploads per stream
Expand Down Expand Up @@ -548,6 +557,12 @@ func (w *sliceWriter) receiveAndUpload() error {

delete(w.activeUploads, part.Number)
w.updateCompletedParts(*part, upload.lastEventIndex)
case <-w.proto.cfg.ForceFlush:
if w.current != nil {
if err := w.startUploadCurrentSlice(); err != nil {
return trace.Wrap(err)
}
}
case <-flushCh:
now := clock.Now().UTC()
inactivityPeriod := now.Sub(lastEvent)
Expand Down Expand Up @@ -737,14 +752,18 @@ func (w *sliceWriter) startUpload(partNumber int64, slice *slice) (*activeUpload
})

var retry retryutils.Retry

// create reader once before the retry loop. in the event of an error, the reader must
// be reset via Seek rather than recreated.
reader, err := slice.reader()
if err != nil {
activeUpload.setError(err)
return
}

for i := 0; i < defaults.MaxIterationLimit; i++ {
log := log.WithField("attempt", i)

reader, err := slice.reader()
if err != nil {
activeUpload.setError(err)
return
}
part, err := w.proto.cfg.Uploader.UploadPart(w.proto.cancelCtx, w.proto.cfg.Upload, partNumber, reader)
if err == nil {
activeUpload.setPart(*part)
Expand Down Expand Up @@ -774,10 +793,13 @@ func (w *sliceWriter) startUpload(partNumber int64, slice *slice) (*activeUpload
}
}
retry.Inc()

// reset reader to the beginning of the slice so it can be re-read
if _, err := reader.Seek(0, 0); err != nil {
activeUpload.setError(err)
return
}

select {
case <-retry.After():
log.WithError(err).Debugf("Back off period for retry has passed. Retrying")
Expand Down Expand Up @@ -835,8 +857,9 @@ type slice struct {
lastEventIndex int64
}

// reader returns a reader for the bytes written,
// no writes should be done after this method is called
// reader returns a reader for the bytes written, no writes should be done after this
// method is called and this method should be called at most once per slice, otherwise
// the resulting recording will be corrupted.
func (s *slice) reader() (io.ReadSeeker, error) {
if err := s.writer.Close(); err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -1073,6 +1096,20 @@ func (r *ProtoReader) Read(ctx context.Context) (apievents.AuditEvent, error) {
if !errors.Is(err, io.EOF) {
return nil, r.setError(trace.ConvertSystemError(err))
}

// due to a bug in older versions of teleport it was possible that padding
// bytes would end up inside of the gzip section of the archive. we should
// skip any dangling data in the gzip secion.
n, err := io.CopyBuffer(io.Discard, r.gzipReader.inner, r.messageBytes[:])
if err != nil {
return nil, r.setError(trace.ConvertSystemError(err))
}

if n != 0 {
// log the number of bytes that were skipped
log.WithField("length", n).Debug("skipped dangling data in session recording section")
}

// reached the end of the current part, but not necessarily
// the end of the stream
if err := r.gzipReader.Close(); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions lib/events/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package events_test
import (
"context"
"errors"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -202,6 +203,26 @@ func TestProtoStreamLargeEvent(t *testing.T) {
require.NoError(t, stream.Complete(ctx))
}

// TestReadCorruptedRecording tests that the streamer can successfully decode the kind of corrupted
// recordings that some older bugged versions of teleport might end up producing when under heavy load/throttling.
func TestReadCorruptedRecording(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

f, err := os.Open("testdata/corrupted-session")
require.NoError(t, err)
defer f.Close()

reader := events.NewProtoReader(f)
defer reader.Close()

events, err := reader.ReadAll(ctx)
require.NoError(t, err)

// verify that the expected number of events are extracted
require.Len(t, events, 12)
}

func makeQueryEvent(id string, query string) *apievents.DatabaseSessionQuery {
return &apievents.DatabaseSessionQuery{
Metadata: apievents.Metadata{
Expand Down
105 changes: 100 additions & 5 deletions lib/events/test/streamsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,57 @@ package test

import (
"context"
"fmt"
"io"
"os"
"sync"
"testing"
"time"

"github.com/gravitational/trace"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/session"
)

type flakyHandler struct {
events.MultipartHandler
mu sync.Mutex
shouldFlake bool
flakedParts map[int64]bool
}

func newFlakyHandler(handler events.MultipartHandler) *flakyHandler {
return &flakyHandler{
MultipartHandler: handler,
flakedParts: make(map[int64]bool),
}
}

func (f *flakyHandler) UploadPart(ctx context.Context, upload events.StreamUpload, partNumber int64, partBody io.ReadSeeker) (*events.StreamPart, error) {
var shouldFlake bool
f.mu.Lock()
if f.shouldFlake && !f.flakedParts[partNumber] {
shouldFlake = true
f.flakedParts[partNumber] = true
}
f.mu.Unlock()

if shouldFlake {
return nil, trace.Errorf("flakeity flake flake")
}

return f.MultipartHandler.UploadPart(ctx, upload, partNumber, partBody)
}

func (f *flakyHandler) setFlakeUpload(flake bool) {
f.mu.Lock()
defer f.mu.Unlock()
f.shouldFlake = flake
}

// StreamParams configures parameters of a stream test suite
type StreamParams struct {
// PrintEvents is amount of print events to generate
Expand All @@ -39,19 +79,34 @@ type StreamParams struct {
ConcurrentUploads int
// MinUploadBytes is minimum required upload bytes
MinUploadBytes int64
// Flaky is a flag that indicates that the handler should be flaky
Flaky bool
// ForceFlush is a flag that indicates that the handler should be forced to flush
// partially filled slices during event input.
ForceFlush bool
}

// StreamSinglePart tests stream upload and subsequent download and reads the results
func StreamSinglePart(t *testing.T, handler events.MultipartHandler) {
StreamWithParameters(t, handler, StreamParams{
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 1024,
MinUploadBytes: 1024 * 1024,
})
}

// StreamWithPadding tests stream upload in a case where significant padding must be added. Note that
// in practice padding is only necessarily added in the 'ForceFlush' permutation as single-slice uploads
// do not require padding.
func StreamWithPadding(t *testing.T, handler events.MultipartHandler) {
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 10,
MinUploadBytes: 1024 * 1024,
})
}

// Stream tests stream upload and subsequent download and reads the results
func Stream(t *testing.T, handler events.MultipartHandler) {
StreamWithParameters(t, handler, StreamParams{
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 1024,
MinUploadBytes: 1024,
ConcurrentUploads: 2,
Expand All @@ -60,7 +115,7 @@ func Stream(t *testing.T, handler events.MultipartHandler) {

// StreamManyParts tests stream upload and subsequent download and reads the results
func StreamManyParts(t *testing.T, handler events.MultipartHandler) {
StreamWithParameters(t, handler, StreamParams{
StreamWithPermutedParameters(t, handler, StreamParams{
PrintEvents: 8192,
MinUploadBytes: 1024,
ConcurrentUploads: 64,
Expand All @@ -77,17 +132,43 @@ func StreamResumeManyParts(t *testing.T, handler events.MultipartHandler) {
})
}

// StreamWithPermutedParameters tests stream upload and subsequent download and reads the results, repeating
// the process with various permutations of flake and flush parameters in order to better cover padding and
// retry logic, which are easy to accidentally fail to cover.
func StreamWithPermutedParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
cases := []struct{ Flaky, ForceFlush bool }{
{Flaky: false, ForceFlush: false},
{Flaky: true, ForceFlush: false},
{Flaky: false, ForceFlush: true},
{Flaky: true, ForceFlush: true},
}

for _, cc := range cases {
t.Run(fmt.Sprintf("Flaky=%v,ForceFlush=%v", cc.Flaky, cc.ForceFlush), func(t *testing.T) {
pc := params
pc.Flaky = cc.Flaky
pc.ForceFlush = cc.ForceFlush
StreamWithParameters(t, handler, pc)
})
}
}

// StreamWithParameters tests stream upload and subsequent download and reads the results
func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params StreamParams) {
ctx := context.TODO()

inEvents := eventstest.GenerateTestSession(eventstest.SessionParams{PrintEvents: params.PrintEvents})
sid := session.ID(inEvents[0].(events.SessionMetadataGetter).GetSessionID())

forceFlush := make(chan struct{})

wrappedHandler := newFlakyHandler(handler)

streamer, err := events.NewProtoStreamer(events.ProtoStreamerConfig{
Uploader: handler,
Uploader: wrappedHandler,
MinUploadBytes: params.MinUploadBytes,
ConcurrentUploads: params.ConcurrentUploads,
ForceFlush: forceFlush,
})
require.NoError(t, err)

Expand All @@ -101,7 +182,21 @@ func StreamWithParameters(t *testing.T, handler events.MultipartHandler, params
t.Fatalf("Timed out waiting for status update.")
}

for _, event := range inEvents {
// if enabled, flake causes the first upload attempt for each multipart upload part
// to fail. necessary in order to cover upload retry logic, which has historically been
// a source of bugs.
wrappedHandler.setFlakeUpload(params.Flaky)

timeout := time.After(time.Minute)

for i, event := range inEvents {
if params.ForceFlush && i%(len(inEvents)/3) == 0 {
select {
case forceFlush <- struct{}{}:
case <-timeout:
t.Fatalf("Timed out waiting for force flush.")
}
}
err := stream.RecordEvent(ctx, eventstest.PrepareEvent(event))
require.NoError(t, err)
}
Expand Down
Binary file added lib/events/testdata/corrupted-session
Binary file not shown.
Loading