diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 5e4b601dcff..e3e127a88d4 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -4,7 +4,9 @@ package e2e_test import ( + "bytes" "context" + "encoding/json" "fmt" "net/http" "os" @@ -17,9 +19,12 @@ import ( e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/s3" @@ -29,18 +34,23 @@ import ( "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) +type blockDesc struct { + series []labels.Labels + extLset labels.Labels + mint int64 + maxt int64 +} + +func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration) (ulid.ULID, error) { + return e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0) +} + func TestCompactWithStoreGateway(t *testing.T) { t.Parallel() - l := log.NewLogfmtLogger(os.Stdout) - type blockDesc struct { - series []labels.Labels - extLset labels.Labels - mint int64 - maxt int64 - } + logger := log.NewLogfmtLogger(os.Stdout) - delay := 30 * time.Minute + justAfterConsistencyDelay := 30 * time.Minute // Make sure to take realistic timestamp for start. This is to align blocks as if they would be aligned on Prometheus. // To have deterministic compaction, let's have fixed date: now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z") @@ -266,7 +276,7 @@ func TestCompactWithStoreGateway(t *testing.T) { m := e2edb.NewMinio(8080, bucket) testutil.Ok(t, s.StartAndWaitReady(m)) - bkt, err := s3.NewBucketWithConfig(l, s3.Config{ + bkt, err := s3.NewBucketWithConfig(logger, s3.Config{ Bucket: bucket, AccessKey: e2edb.MinioAccessKey, SecretKey: e2edb.MinioSecretKey, @@ -280,11 +290,73 @@ func TestCompactWithStoreGateway(t *testing.T) { rawBlockIDs := map[ulid.ULID]struct{}{} for _, b := range blocks { - id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, 120, b.mint, b.maxt, delay, b.extLset, 0) + id, err := b.Create(ctx, dir, justAfterConsistencyDelay) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) rawBlockIDs[id] = struct{}{} } + { + // On top of that, add couple of other tricky cases with different meta. + malformedBase := blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("case", "malformed-things", "replica", "101"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + } + + // New Partial block. + id, err := malformedBase.Create(ctx, dir, 0*time.Second) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // New Partial block + deletion mark. + id, err = malformedBase.Create(ctx, dir, 0*time.Second) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, prometheus.NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after consistency delay. + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after consistency delay + deletion mark. + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, prometheus.NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after consistency delay + old deletion mark ready to be deleted. + id, err = malformedBase.Create(ctx, dir, justAfterConsistencyDelay) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + deletionMark, err := json.Marshal(metadata.DeletionMark{ + ID: id, + // Deletion threshold is usually 2 days. + DeletionTime: time.Now().Add(-50 * time.Hour).Unix(), + Version: metadata.DeletionMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(dir, id.String(), metadata.DeletionMarkFilename), bytes.NewBuffer(deletionMark))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after delete threshold. + id, err = malformedBase.Create(ctx, dir, 50*time.Hour) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + + // Partial block after delete threshold + deletion mark. + id, err = malformedBase.Create(ctx, dir, 50*time.Hour) + testutil.Ok(t, err) + testutil.Ok(t, os.Remove(path.Join(dir, id.String(), metadata.MetaFilename))) + testutil.Ok(t, block.MarkForDeletion(ctx, logger, bkt, id, prometheus.NewCounter(prometheus.CounterOpts{}))) + testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) + } svcConfig := client.BucketConfig{ Type: client.S3,