Skip to content

Commit

Permalink
First pass at adding support for blob time-to-live. (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kristoffer Langeland Knudsen authored Feb 8, 2022
1 parent 5b3d3ed commit e0c6a63
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 118 deletions.
7 changes: 5 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func BuildRouter(db core.MetadataDatabase, store core.BlobStore, logRequests boo
r.Post("/data", handler.CreateBlob)
r.Get("/", handler.SearchBlobs)
r.Get("/data/latest", handler.GetLatestBlobData)
r.Get("/{combined-id}", handler.MakeBlobEndpoint(handler.BlobMetadataResponse))
r.Get("/{combined-id}/data", handler.MakeBlobEndpoint(handler.BlobDataResponse))
r.Get("/{combined-id}", handler.MakeBlobEndpoint(handler.BlobMetadataResponse, 0 * time.Second))
r.Get("/{combined-id}/data", handler.MakeBlobEndpoint(handler.BlobDataResponse, 30 * time.Minute))
})
})

Expand Down Expand Up @@ -152,6 +152,9 @@ func CreateBlobInfo(r *http.Request, blob *core.BlobInfo) map[string]interface{}

info := make(map[string]interface{})
info["lastModified"] = blob.CreatedAt.UTC().Format(time.RFC3339Nano)
if blob.ExpiresAt != nil {
info["expires"] = blob.ExpiresAt.UTC().Format(time.RFC3339Nano)
}

info["subject"] = blob.Key.Subject
if blob.Tags.ContentType != nil {
Expand Down
33 changes: 27 additions & 6 deletions api/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"net/http"
"regexp"
"time"
)

const (
Expand All @@ -27,6 +28,7 @@ var (
commonTagValidator = CombineTagValidators(ValidateTagName, ValidateGenericTagValues)
systemTagValidator = CombineTagValidators(commonTagValidator, ValidateOnlyOneTag)
subjectTagValidator = CombineTagValidators(ValidateSubjectTagValue, systemTagValidator)
ttlTagValidator = CombineTagValidators(ValidateTimeToLive, ValidateGenericTagValues)
)

type TagValidator func(tagName string, tagValues []string) error
Expand Down Expand Up @@ -80,7 +82,7 @@ func (handler *Handler) CreateBlob(w http.ResponseWriter, r *http.Request) {
if err := handler.store.SaveBlob(r.Context(), r.Body, key); err != nil {
log.Errorf("Failed to save blob: %v", err)

err = handler.db.RevertStagedBlobMetadata(r.Context(), key)
err = handler.db.DeleteBlobMetadata(r.Context(), key)
if err != nil {
log.Errorf("Failed to revert staged blob metadata: %v", err)
}
Expand Down Expand Up @@ -138,6 +140,23 @@ func ValidateSubjectTagValue(tagName string, tagValues []string) error {
return nil
}

func ValidateTimeToLive(tagName string, tagValues []string) error {

for _, t := range tagValues {
duration, err := time.ParseDuration(t)

if err != nil {
return fmt.Errorf("%s: %s", tagName, err.Error())
}

if duration < 0*time.Second {
return fmt.Errorf("%s: time-to-live cannot be negative", tagName)
}
}

return nil
}

func CombineTagValidators(validators ...TagValidator) TagValidator {
return func(tagName string, tagValues []string) error {
for _, v := range validators {
Expand All @@ -150,8 +169,8 @@ func CombineTagValidators(validators ...TagValidator) TagValidator {
}
}

func ValidateAndStoreOptionalSystemTag(tagName string, tagValues []string, field **string) error {
if err := systemTagValidator(tagName, tagValues); err != nil {
func ValidateAndStoreOptionalSystemTag(tagName string, tagValues []string, field **string, validator TagValidator) error {
if err := validator(tagName, tagValues); err != nil {
return err
}

Expand All @@ -162,11 +181,13 @@ func ValidateAndStoreOptionalSystemTag(tagName string, tagValues []string, field
func ValidateAndStoreTag(tags *core.BlobTags, tagName string, tagValues []string) error {
switch tagName {
case "device":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Device)
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Device, systemTagValidator)
case "name":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Name)
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Name, systemTagValidator)
case "session":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Session)
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.Session, systemTagValidator)
case "_ttl":
return ValidateAndStoreOptionalSystemTag(tagName, tagValues, &tags.TimeToLive, ttlTagValidator)
default:
if err := commonTagValidator(tagName, tagValues); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion api/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestStorageWriteFailureRevertsStagedMetadata(t *testing.T) {
Return(nil, nil)

mockMetadataDatabase.EXPECT().
RevertStagedBlobMetadata(gomock.Any(), gomock.Any()).
DeleteBlobMetadata(gomock.Any(), gomock.Any()).
Return(nil)

mockBlobStore.EXPECT().
Expand Down
9 changes: 6 additions & 3 deletions api/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"errors"
"net/http"
"time"

"github.com/go-chi/chi/v5"
"github.com/ismrmrd/mrd-storage-server/core"
Expand All @@ -12,7 +13,7 @@ import (

type Responder func(http.ResponseWriter, *http.Request, *core.BlobInfo)

func (handler *Handler) MakeBlobEndpoint(responder Responder) http.HandlerFunc {
func (handler *Handler) MakeBlobEndpoint(responder Responder, grace time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

combinedId := chi.URLParam(r, "combined-id")
Expand All @@ -22,7 +23,7 @@ func (handler *Handler) MakeBlobEndpoint(responder Responder) http.HandlerFunc {
return
}

blobInfo, err := handler.db.GetBlobMetadata(r.Context(), key)
blobInfo, err := handler.db.GetBlobMetadata(r.Context(), key, time.Now().Add(-grace))
if err != nil {
if errors.Is(err, core.ErrRecordNotFound) {
w.WriteHeader(http.StatusNotFound)
Expand Down Expand Up @@ -56,8 +57,10 @@ func writeTagsAsHeaders(w http.ResponseWriter, blobInfo *core.BlobInfo) {
if blobInfo.Tags.ContentType == nil {
blobInfo.Tags.ContentType = pointer.String("application/octet-stream")
}
if blobInfo.ExpiresAt != nil {
w.Header().Add("Expires", blobInfo.ExpiresAt.Format(http.TimeFormat))
}
w.Header().Add("Content-Type", *blobInfo.Tags.ContentType)

w.Header().Add("Last-Modified", blobInfo.CreatedAt.Format(http.TimeFormat))

addSystemTagIfSet(w, "Device", blobInfo.Tags.Device)
Expand Down
4 changes: 2 additions & 2 deletions api/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (handler *Handler) SearchBlobs(w http.ResponseWriter, r *http.Request) {
return
}

results, ct, err := handler.db.SearchBlobMetadata(r.Context(), query, at, ct, pageSize)
results, ct, err := handler.db.SearchBlobMetadata(r.Context(), query, at, ct, pageSize, time.Now())

if err != nil {
if errors.Is(err, core.ErrInvalidContinuationToken) {
Expand Down Expand Up @@ -60,7 +60,7 @@ func (handler *Handler) GetLatestBlobData(w http.ResponseWriter, r *http.Request
return
}

results, _, err := handler.db.SearchBlobMetadata(r.Context(), query, at, nil, 1)
results, _, err := handler.db.SearchBlobMetadata(r.Context(), query, at, nil, 1, time.Now())

if err != nil {
log.Errorf("Failed to search blobs in DB: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions core/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// records in the medatada database.
func CollectGarbage(ctx context.Context, db MetadataDatabase, store BlobStore, olderThan time.Time) error {
for {
expiredKeys, err := db.GetPageOfExpiredStagedBlobMetadata(ctx, olderThan)
expiredKeys, err := db.GetPageOfExpiredBlobMetadata(ctx, olderThan)
if err != nil {
return err
}
Expand All @@ -39,7 +39,7 @@ func processExpiredKey(ctx context.Context, db MetadataDatabase, store BlobStore
return err
}

if err := db.RevertStagedBlobMetadata(ctx, key); err != nil && !errors.Is(err, ErrStagedRecordNotFound) {
if err := db.DeleteBlobMetadata(ctx, key); err != nil && !errors.Is(err, ErrBlobNotFound) {
return err
}

Expand Down
10 changes: 5 additions & 5 deletions core/garbage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/stretchr/testify/assert"
)

// Ensure garbage collection completes even when RevertStagedBlobMetadata
// returns ErrStagedRecordNotFound, which suggests that another instance
// Ensure garbage collection completes even when DeleteBlobMetadata
// returns ErrBlobNotFound, which suggests that another instance
// is performing garbage collection at the same time.
func TestConcurrentGarbageCollection(t *testing.T) {
mockCtrl := gomock.NewController(t)
Expand All @@ -24,13 +24,13 @@ func TestConcurrentGarbageCollection(t *testing.T) {
key := core.BlobKey{Subject: "s", Id: uuid.UUID{}}

db.EXPECT().
GetPageOfExpiredStagedBlobMetadata(gomock.Any(), gomock.Any()).
GetPageOfExpiredBlobMetadata(gomock.Any(), gomock.Any()).
Return([]core.BlobKey{key}, nil)

db.EXPECT().RevertStagedBlobMetadata(gomock.Any(), key).Return(core.ErrStagedRecordNotFound)
db.EXPECT().DeleteBlobMetadata(gomock.Any(), key).Return(core.ErrBlobNotFound)

db.EXPECT().
GetPageOfExpiredStagedBlobMetadata(gomock.Any(), gomock.Any()).
GetPageOfExpiredBlobMetadata(gomock.Any(), gomock.Any()).
Return([]core.BlobKey{}, nil)

store.EXPECT().DeleteBlob(gomock.Any(), key)
Expand Down
16 changes: 9 additions & 7 deletions core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ type BlobTags struct {
Device *string
Session *string
ContentType *string
TimeToLive *string
CustomTags map[string][]string
}

type BlobInfo struct {
Key BlobKey
CreatedAt time.Time
Tags BlobTags
Key BlobKey
Tags BlobTags
CreatedAt time.Time
ExpiresAt *time.Time
}

type ContinutationToken string
Expand All @@ -47,10 +49,10 @@ func UnixTimeMsToTime(timeValueMs int64) time.Time {
type MetadataDatabase interface {
StageBlobMetadata(ctx context.Context, key BlobKey, tags *BlobTags) (*BlobInfo, error)
CompleteStagedBlobMetadata(ctx context.Context, key BlobKey) error
RevertStagedBlobMetadata(ctx context.Context, key BlobKey) error
GetPageOfExpiredStagedBlobMetadata(ctx context.Context, olderThan time.Time) ([]BlobKey, error)
GetBlobMetadata(ctx context.Context, key BlobKey) (*BlobInfo, error)
SearchBlobMetadata(ctx context.Context, tags map[string][]string, at *time.Time, ct *ContinutationToken, pageSize int) ([]BlobInfo, *ContinutationToken, error)
DeleteBlobMetadata(ctx context.Context, key BlobKey) error
GetPageOfExpiredBlobMetadata(ctx context.Context, olderThan time.Time) ([]BlobKey, error)
GetBlobMetadata(ctx context.Context, key BlobKey, expiresAfter time.Time) (*BlobInfo, error)
SearchBlobMetadata(ctx context.Context, tags map[string][]string, at *time.Time, ct *ContinutationToken, pageSize int, expiresAfter time.Time) ([]BlobInfo, *ContinutationToken, error)
}

type BlobStore interface {
Expand Down
Loading

0 comments on commit e0c6a63

Please sign in to comment.