Skip to content

Commit

Permalink
Check parts' last modified time for the grace period
Browse files Browse the repository at this point in the history
  • Loading branch information
espadolini committed Aug 1, 2024
1 parent b9ff9d9 commit 32809ff
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 20 deletions.
3 changes: 3 additions & 0 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,9 @@ type StreamPart struct {
Number int64
// ETag is a part e-tag
ETag string
// LastModified is the time of last modification of this part (if
// available).
LastModified time.Time
}

// StreamUpload represents stream multipart upload
Expand Down
21 changes: 17 additions & 4 deletions lib/events/azsessions/azsessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"slices"
"strconv"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
Expand Down Expand Up @@ -450,7 +451,8 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa

// our parts are just over 5 MiB (events.MinUploadPartSizeBytes) so we can
// upload them in one shot
if _, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil)); err != nil {
response, err := cErr(partBlob.Upload(ctx, streaming.NopCloser(partBody), nil))
if err != nil {
return nil, trace.Wrap(err)
}
h.log.WithFields(logrus.Fields{
Expand All @@ -459,7 +461,11 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
fieldPartNumber: partNumber,
}).Debug("Uploaded part.")

return &events.StreamPart{Number: partNumber}, nil
var lastModified time.Time
if response.LastModified != nil {
lastModified = *response.LastModified
}
return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil
}

// ListParts implements [events.MultipartUploader].
Expand Down Expand Up @@ -492,8 +498,15 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
if err != nil {
continue
}

parts = append(parts, events.StreamPart{Number: partNumber})
var lastModified time.Time
if b.Properties != nil &&
b.Properties.LastModified != nil {
lastModified = *b.Properties.LastModified
}
parts = append(parts, events.StreamPart{
Number: partNumber,
LastModified: lastModified,
})
}
}

Expand Down
10 changes: 10 additions & 0 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,16 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {
}
return trace.Wrap(err, "listing parts")
}
var lastModified time.Time
for _, part := range parts {
if part.LastModified.After(lastModified) {
lastModified = part.LastModified
}
}
if u.cfg.Clock.Since(lastModified) <= gracePeriod {
log.Debug("Found incomplete upload with recently uploaded part, skipping.")
continue
}

log.Debugf("upload has %d parts", len(parts))

Expand Down
31 changes: 23 additions & 8 deletions lib/events/eventstest/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type MemoryUploader struct {
objects map[session.ID][]byte
eventsC chan events.UploadEvent

// Clock is an optional [clockwork.Clock] to determine the time to associate
// with uploads and parts.
Clock clockwork.Clock
}

Expand All @@ -63,16 +65,21 @@ type MemoryUpload struct {
// id is the upload ID
id string
// parts is the upload parts
parts map[int64][]byte
parts map[int64]part
// sessionID is the session ID associated with the upload
sessionID session.ID
//completed specifies upload as completed
// completed specifies upload as completed
completed bool
// Initiated contains the timestamp of when the upload
// was initiated, not always initialized
Initiated time.Time
}

type part struct {
data []byte
lastModified time.Time
}

func (m *MemoryUploader) trySendEvent(event events.UploadEvent) {
if m.eventsC == nil {
return
Expand All @@ -98,14 +105,15 @@ func (m *MemoryUploader) CreateUpload(ctx context.Context, sessionID session.ID)
upload := &events.StreamUpload{
ID: uuid.New().String(),
SessionID: sessionID,
Initiated: time.Now(),
}
if m.Clock != nil {
upload.Initiated = m.Clock.Now()
}
m.uploads[upload.ID] = &MemoryUpload{
id: upload.ID,
sessionID: sessionID,
parts: make(map[int64][]byte),
parts: make(map[int64]part),
Initiated: upload.Initiated,
}
return upload, nil
Expand All @@ -127,11 +135,11 @@ func (m *MemoryUploader) CompleteUpload(ctx context.Context, upload events.Strea
partsSet := make(map[int64]bool, len(parts))
for _, part := range parts {
partsSet[part.Number] = true
data, ok := up.parts[part.Number]
upPart, ok := up.parts[part.Number]
if !ok {
return trace.NotFound("part %v has not been uploaded", part.Number)
}
result = append(result, data...)
result = append(result, upPart.data...)
}
// exclude parts that are not requested to be completed
for number := range up.parts {
Expand All @@ -157,8 +165,15 @@ func (m *MemoryUploader) UploadPart(ctx context.Context, upload events.StreamUpl
if !ok {
return nil, trace.NotFound("upload %q is not found", upload.ID)
}
up.parts[partNumber] = data
return &events.StreamPart{Number: partNumber}, nil
lastModified := time.Now()
if m.Clock != nil {
lastModified = m.Clock.Now()
}
up.parts[partNumber] = part{
data: data,
lastModified: lastModified,
}
return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil
}

// ListUploads lists uploads that have been initiated but not completed with
Expand Down Expand Up @@ -199,7 +214,7 @@ func (m *MemoryUploader) GetParts(uploadID string) ([][]byte, error) {
return partNumbers[i] < partNumbers[j]
})
for _, partNumber := range partNumbers {
sortedParts = append(sortedParts, up.parts[partNumber])
sortedParts = append(sortedParts, up.parts[partNumber].data)
}
return sortedParts, nil
}
Expand Down
14 changes: 11 additions & 3 deletions lib/events/filesessions/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,19 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
}

// Rename reservation to part file.
err = os.Rename(reservationPath, h.partPath(upload, partNumber))
partPath := h.partPath(upload, partNumber)
err = os.Rename(reservationPath, partPath)
if err != nil {
return nil, trace.ConvertSystemError(err)
}

return &events.StreamPart{Number: partNumber}, nil
var lastModified time.Time
fi, err := os.Stat(partPath)
if err == nil {
lastModified = fi.ModTime()
}

return &events.StreamPart{Number: partNumber, LastModified: lastModified}, nil
}

// CompleteUpload completes the upload
Expand Down Expand Up @@ -254,7 +261,8 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
return nil
}
parts = append(parts, events.StreamPart{
Number: part,
Number: part,
LastModified: info.ModTime(),
})
return nil
})
Expand Down
3 changes: 2 additions & 1 deletion lib/events/gcssessions/gcsstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
if err != nil {
return nil, convertGCSError(err)
}
return &events.StreamPart{Number: partNumber}, nil
return &events.StreamPart{Number: partNumber, LastModified: writer.Attrs().Created}, nil
}

// CompleteUpload completes the upload
Expand Down Expand Up @@ -249,6 +249,7 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
if err != nil {
return nil, trace.Wrap(err)
}
part.LastModified = attrs.Updated
parts = append(parts, *part)
}
return parts, nil
Expand Down
8 changes: 4 additions & 4 deletions lib/events/s3sessions/s3stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (h *Handler) UploadPart(ctx context.Context, upload events.StreamUpload, pa
}

log.Infof("Uploaded part %v in %v", partNumber, time.Since(start))
return &events.StreamPart{ETag: aws.StringValue(resp.ETag), Number: partNumber}, nil
return &events.StreamPart{ETag: aws.StringValue(resp.ETag), Number: partNumber, LastModified: time.Now()}, nil
}

func (h *Handler) abortUpload(ctx context.Context, upload events.StreamUpload) error {
Expand Down Expand Up @@ -205,10 +205,10 @@ func (h *Handler) ListParts(ctx context.Context, upload events.StreamUpload) ([]
return nil, awsutils.ConvertS3Error(err)
}
for _, part := range re.Parts {

parts = append(parts, events.StreamPart{
Number: aws.Int64Value(part.PartNumber),
ETag: aws.StringValue(part.ETag),
Number: aws.Int64Value(part.PartNumber),
ETag: aws.StringValue(part.ETag),
LastModified: aws.TimeValue(part.LastModified),
})
}
if !aws.BoolValue(re.IsTruncated) {
Expand Down

0 comments on commit 32809ff

Please sign in to comment.