diff --git a/go.mod b/go.mod
index c5bbedaaea..54b3f822a4 100644
--- a/go.mod
+++ b/go.mod
@@ -55,10 +55,10 @@ require (
 	github.com/prometheus/alertmanager v0.27.0
 	github.com/prometheus/client_golang v1.19.1
 	github.com/prometheus/client_model v0.6.1
-	github.com/prometheus/common v0.54.0
+	github.com/prometheus/common v0.54.1-0.20240615204547-04635d2962f9
 	github.com/prometheus/exporter-toolkit v0.11.0
 	// Prometheus maps version 2.x.y to tags v0.x.y.
-	github.com/prometheus/prometheus v0.52.2-0.20240606174736-edd558884b24
+	github.com/prometheus/prometheus v0.52.2-0.20240614130246-4c1e71fa0b3d
 	github.com/sony/gobreaker v0.5.0
 	github.com/stretchr/testify v1.9.0
 	github.com/thanos-io/objstore v0.0.0-20240613135658-39f40b8d97f7
@@ -274,9 +274,6 @@ replace (
 	// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
 	github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
 
-	// Temporarily pinning prometheus common: see https://github.com/thanos-io/thanos/pull/7416#issuecomment-2150585994
-	github.com/prometheus/common v0.54.0 => github.com/prometheus/common v0.53.0
-
 	// Pin kuberesolver/v5 to support new grpc version. Need to upgrade kuberesolver version on weaveworks/common.
 	github.com/sercand/kuberesolver/v4 => github.com/sercand/kuberesolver/v5 v5.1.1
 
diff --git a/go.sum b/go.sum
index 8e281c8805..f0cd7295d7 100644
--- a/go.sum
+++ b/go.sum
@@ -2131,8 +2131,8 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+
 github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
 github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
 github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
-github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE=
-github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
+github.com/prometheus/common v0.54.1-0.20240615204547-04635d2962f9 h1:WTZ/GBRTImL1HgRTEnJJcM2FuII7PXX1idCIEUJ8/r8=
+github.com/prometheus/common v0.54.1-0.20240615204547-04635d2962f9/go.mod h1:1Yn/UzXoahbVLk1sn6wsGiSiemz3XQejcaz9FIA1r+I=
 github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
 github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI=
 github.com/prometheus/exporter-toolkit v0.8.2/go.mod h1:00shzmJL7KxcsabLWcONwpyNEuWhREOnFqZW7vadFS0=
@@ -2150,8 +2150,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
 github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
 github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
 github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
-github.com/prometheus/prometheus v0.52.2-0.20240606174736-edd558884b24 h1:h7ScWoH/UHp/Fz6eo/cJva/CbsyQjYtMOtAgIhTUBBE=
-github.com/prometheus/prometheus v0.52.2-0.20240606174736-edd558884b24/go.mod h1:RZDkzs+ShMBDkAPQkLEaLBXpjmDcjhNxU2drUVPgKUU=
+github.com/prometheus/prometheus v0.52.2-0.20240614130246-4c1e71fa0b3d h1:FFA2droSZiHghpHgKHefUCZLJnJQo+ZrKkBkcgZ0xp4=
+github.com/prometheus/prometheus v0.52.2-0.20240614130246-4c1e71fa0b3d/go.mod h1:5AuUoK+n3jJUhhlc7C2DKdhI+dUx93eJvTYpvE0A3jE=
 github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
 github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
diff --git a/pkg/clientconfig/http.go b/pkg/clientconfig/http.go
index 1198ad7903..dc94938d75 100644
--- a/pkg/clientconfig/http.go
+++ b/pkg/clientconfig/http.go
@@ -162,21 +162,28 @@ func NewRoundTripperFromConfig(cfg config_util.HTTPClientConfig, transportConfig
 	// If an authorization_credentials is provided, create a round tripper that will set the
 	// Authorization header correctly on each request.
 	if cfg.Authorization != nil && len(cfg.Authorization.Credentials) > 0 {
-		rt = config_util.NewAuthorizationCredentialsRoundTripper(cfg.Authorization.Type, cfg.Authorization.Credentials, rt)
+		rt = config_util.NewAuthorizationCredentialsRoundTripper(cfg.Authorization.Type, config_util.NewInlineSecret(string(cfg.Authorization.Credentials)), rt)
 	} else if cfg.Authorization != nil && len(cfg.Authorization.CredentialsFile) > 0 {
-		rt = config_util.NewAuthorizationCredentialsFileRoundTripper(cfg.Authorization.Type, cfg.Authorization.CredentialsFile, rt)
+		rt = config_util.NewAuthorizationCredentialsRoundTripper(cfg.Authorization.Type, config_util.NewFileSecret(cfg.Authorization.CredentialsFile), rt)
 	}
 	// Backwards compatibility, be nice with importers who would not have
 	// called Validate().
 	if len(cfg.BearerToken) > 0 {
-		rt = config_util.NewAuthorizationCredentialsRoundTripper("Bearer", cfg.BearerToken, rt)
+		rt = config_util.NewAuthorizationCredentialsRoundTripper("Bearer", config_util.NewInlineSecret(string(cfg.BearerToken)), rt)
 	} else if len(cfg.BearerTokenFile) > 0 {
-		rt = config_util.NewAuthorizationCredentialsFileRoundTripper("Bearer", cfg.BearerTokenFile, rt)
+		rt = config_util.NewAuthorizationCredentialsRoundTripper("Bearer", config_util.NewFileSecret(cfg.BearerTokenFile), rt)
 	}
 
 	if cfg.BasicAuth != nil {
 		// TODO(yeya24): expose UsernameFile as a config.
-		rt = config_util.NewBasicAuthRoundTripper(cfg.BasicAuth.Username, cfg.BasicAuth.Password, "", cfg.BasicAuth.PasswordFile, rt)
+		username := config_util.NewInlineSecret(cfg.BasicAuth.Username)
+		var password config_util.SecretReader
+		if len(cfg.BasicAuth.PasswordFile) > 0 {
+			password = config_util.NewFileSecret(cfg.BasicAuth.PasswordFile)
+		} else {
+			password = config_util.NewInlineSecret(string(cfg.BasicAuth.Password))
+		}
+		rt = config_util.NewBasicAuthRoundTripper(username, password, rt)
 	}
 	// Return a new configured RoundTripper.
 	return rt, nil
@@ -193,9 +200,9 @@ func NewRoundTripperFromConfig(cfg config_util.HTTPClientConfig, transportConfig
 	}
 
 	return config_util.NewTLSRoundTripper(tlsConfig, config_util.TLSRoundTripperSettings{
-		CAFile:   cfg.TLSConfig.CAFile,
-		CertFile: cfg.TLSConfig.CertFile,
-		KeyFile:  cfg.TLSConfig.KeyFile,
+		CA:   config_util.NewFileSecret(cfg.TLSConfig.CAFile),
+		Cert: config_util.NewFileSecret(cfg.TLSConfig.CertFile),
+		Key:  config_util.NewFileSecret(cfg.TLSConfig.KeyFile),
 	}, newRT)
 }
 
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 4fa736433f..cabbed73c1 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -851,17 +851,19 @@ type Compactor interface {
 	// only be called concurrently with results of Plan().
 	// Can optionally pass a list of already open blocks,
 	// to avoid having to reopen them.
-	// When resulting Block has 0 samples
+	// Prometheus always returns one block or none. The interface allows returning more than one
+	// block so that downstream users can experiment with the compactor.
+	// When a resulting block has 0 samples:
 	//  * No block is written.
 	//  * The source dirs are marked Deletable.
-	//  * Returns empty ulid.ULID{}.
-	Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error)
-	CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (ulid.ULID, error)
+	//  * The block is not included in the result.
+	Compact(dest string, dirs []string, open []*tsdb.Block) ([]ulid.ULID, error)
+	CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) ([]ulid.ULID, error)
 }
 
 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
-func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback) (shouldRerun bool, compID ulid.ULID, rerr error) {
+func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
 	cg.compactionRunsStarted.Inc()
 
 	subDir := filepath.Join(dir, cg.Key())
@@ -878,7 +880,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
 	}()
 
 	if err := os.MkdirAll(subDir, 0750); err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
+		return false, nil, errors.Wrap(err, "create compaction group dir")
 	}
 
 	defer func() {
@@ -898,17 +900,17 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
 
 	errChan := make(chan error, 1)
 	err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
-		shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
+		shouldRerun, compIDs, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
 		return err
 	}, opentracing.Tags{"group.key": cg.Key()})
 	errChan <- err
 	close(errChan)
 	if err != nil {
 		cg.compactionFailures.Inc()
-		return false, ulid.ULID{}, err
+		return false, nil, err
 	}
 	cg.compactionRunsCompleted.Inc()
-	return shouldRerun, compID, nil
+	return shouldRerun, compIDs, nil
 }
 
 // Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -1114,7 +1116,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
 	return nil
 }
 
-func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) {
+func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (bool, []ulid.ULID, error) {
 	cg.mtx.Lock()
 	defer cg.mtx.Unlock()
 
@@ -1124,7 +1126,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		// TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked
 		// in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check.
 		if !cg.enableVerticalCompaction {
-			return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
+			return false, nil, halt(errors.Wrap(err, "pre compaction overlap check"))
 		}
 
 		overlappingBlocks = true
@@ -1135,11 +1137,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 		toCompact, e = planner.Plan(ctx, cg.metasByMinTime, errChan, cg.extensions)
 		return e
 	}); err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
+		return false, nil, errors.Wrap(err, "plan compaction")
 	}
 	if len(toCompact) == 0 {
 		// Nothing to do.
-		return false, ulid.ULID{}, nil
+		return false, nil, nil
 	}
 
 	level.Info(cg.logger).Log("msg", "compaction available and planned", "plan", fmt.Sprintf("%v", toCompact))
@@ -1149,7 +1151,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	begin := groupCompactionBegin
 
 	if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil {
-		return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
+		return false, nil, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact))
 	}
 	level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", fmt.Sprintf("%v", toCompact))
 
@@ -1206,25 +1208,26 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 	sourceBlockStr := fmt.Sprintf("%v", toCompactDirs)
 
 	if err := g.Wait(); err != nil {
-		return false, ulid.ULID{}, err
+		return false, nil, err
 	}
 	level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "plan", sourceBlockStr)
 
 	begin = time.Now()
+	var compIDs []ulid.ULID
 	if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) {
 		populateBlockFunc, e := compactionLifecycleCallback.GetBlockPopulator(ctx, cg.logger, cg)
 		if e != nil {
 			return e
 		}
-		compID, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc)
+		compIDs, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc)
 		return e
 	}); err != nil {
-		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
+		return false, nil, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
 	}
-	if compID == (ulid.ULID{}) {
-		// Prometheus compactor found that the compacted block would have no samples.
-		level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr)
+	if len(compIDs) == 0 {
+		// No resulting blocks means all compacted blocks had no samples and nothing was written.
+		level.Info(cg.logger).Log("msg", "no compacted blocks, deleting source blocks", "blocks", sourceBlockStr)
 		for _, meta := range toCompact {
 			if meta.Stats.NumSamples == 0 {
 				if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker); err != nil {
@@ -1232,99 +1235,103 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 				}
 			}
 		}
-		// Even though this block was empty, there may be more work to do.
-		return true, ulid.ULID{}, nil
+		// Even though no blocks were compacted, there may be more work to do.
+		return true, nil, nil
 	}
 	cg.compactions.Inc()
 	if overlappingBlocks {
 		cg.verticalCompactions.Inc()
 	}
-	level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID,
+	compIDStrings := make([]string, 0, len(compIDs))
+	for _, compID := range compIDs {
+		compIDStrings = append(compIDStrings, compID.String())
+	}
+	compIDStrs := fmt.Sprintf("%v", compIDStrings)
+	level.Info(cg.logger).Log("msg", "compacted blocks", "new", compIDStrs,
 		"duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "overlapping_blocks", overlappingBlocks, "blocks", sourceBlockStr)
 
-	bdir := filepath.Join(dir, compID.String())
-	index := filepath.Join(bdir, block.IndexFilename)
-
-	if err := os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
-	}
+	for _, compID := range compIDs {
+		bdir := filepath.Join(dir, compID.String())
+		index := filepath.Join(bdir, block.IndexFilename)
 
-	newMeta, err := metadata.ReadFromDir(bdir)
-	if err != nil {
-		return false, ulid.ULID{}, errors.Wrap(err, "read new meta")
-	}
+		if err := os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
+			return false, nil, errors.Wrap(err, "remove tombstones")
+		}
 
-	var stats block.HealthStats
-	// Ensure the output block is valid.
-	err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
-		stats, err = block.GatherIndexHealthStats(ctx, cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
+		newMeta, err := metadata.ReadFromDir(bdir)
 		if err != nil {
-			return err
+			return false, nil, errors.Wrap(err, "read new meta")
 		}
-		return stats.AnyErr()
-	})
-	if !cg.acceptMalformedIndex && err != nil {
-		return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
-	}
-
-	thanosMeta := metadata.Thanos{
-		Labels:       cg.labels.Map(),
-		Downsample:   metadata.ThanosDownsample{Resolution: cg.resolution},
-		Source:       metadata.CompactorSource,
-		SegmentFiles: block.GetSegmentFiles(bdir),
-		Extensions:   cg.extensions,
-	}
-	if stats.ChunkMaxSize > 0 {
-		thanosMeta.IndexStats.ChunkMaxSize = stats.ChunkMaxSize
-	}
-	if stats.SeriesMaxSize > 0 {
-		thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
-	}
-	newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
-	if err != nil {
-		return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
-	}
+		var stats block.HealthStats
+		// Ensure the output block is valid.
+		err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
+			stats, err = block.GatherIndexHealthStats(ctx, cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
+			if err != nil {
+				return err
+			}
+			return stats.AnyErr()
+		})
+		if !cg.acceptMalformedIndex && err != nil {
+			return false, nil, halt(errors.Wrapf(err, "invalid result block %s", bdir))
+		}
 
-	// Ensure the output block is not overlapping with anything else,
-	// unless vertical compaction is enabled.
-	if !cg.enableVerticalCompaction {
-		if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil {
-			return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+		thanosMeta := metadata.Thanos{
+			Labels:       cg.labels.Map(),
+			Downsample:   metadata.ThanosDownsample{Resolution: cg.resolution},
+			Source:       metadata.CompactorSource,
+			SegmentFiles: block.GetSegmentFiles(bdir),
+			Extensions:   cg.extensions,
+		}
+		if stats.ChunkMaxSize > 0 {
+			thanosMeta.IndexStats.ChunkMaxSize = stats.ChunkMaxSize
+		}
+		if stats.SeriesMaxSize > 0 {
+			thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
+		}
+		newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
+		if err != nil {
+			return false, nil, errors.Wrapf(err, "failed to finalize the block %s", bdir)
+		}
+		// Ensure the output block is not overlapping with anything else,
+		// unless vertical compaction is enabled.
+		if !cg.enableVerticalCompaction {
+			if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil {
+				return false, nil, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
+			}
 		}
-	}
 
-	begin = time.Now()
+		begin = time.Now()
 
-	err = tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error {
-		return block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency))
-	})
-	if err != nil {
-		return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
+		err = tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error {
+			return block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency))
+		})
+		if err != nil {
+			return false, nil, retry(errors.Wrapf(err, "upload of %s failed", compID))
+		}
+		level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
+		level.Info(cg.logger).Log("msg", "running post compaction callback", "result_block", compID)
+		if err := compactionLifecycleCallback.PostCompactionCallback(ctx, cg.logger, cg, compID); err != nil {
+			return false, nil, retry(errors.Wrapf(err, "failed to run post compaction callback for result block %s", compID))
+		}
+		level.Info(cg.logger).Log("msg", "finished running post compaction callback", "result_block", compID)
 	}
-	level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
 
 	// Mark for deletion the blocks we just compacted from the group and bucket so they do not get included
 	// into the next planning cycle.
 	// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
 	for _, meta := range toCompact {
-		err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
+		if err := tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
 			return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker)
-		}, opentracing.Tags{"block.id": meta.ULID})
-		if err != nil {
-			return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
+		}, opentracing.Tags{"block.id": meta.ULID}); err != nil {
+			return false, nil, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
 		}
 		cg.groupGarbageCollectedBlocks.Inc()
 	}
-	level.Info(cg.logger).Log("msg", "running post compaction callback", "result_block", compID)
-	if err := compactionLifecycleCallback.PostCompactionCallback(ctx, cg.logger, cg, compID); err != nil {
-		return false, ulid.ULID{}, retry(errors.Wrapf(err, "failed to run post compaction callback for result block %s", compID))
-	}
-	level.Info(cg.logger).Log("msg", "finished running post compaction callback", "result_block", compID)
-
 	level.Info(cg.logger).Log("msg", "finished compacting blocks", "duration", time.Since(groupCompactionBegin),
-		"duration_ms", time.Since(groupCompactionBegin).Milliseconds(), "result_block", compID, "source_blocks", sourceBlockStr)
-	return true, compID, nil
+		"duration_ms", time.Since(groupCompactionBegin).Milliseconds(), "result_blocks", compIDStrs, "source_blocks", sourceBlockStr)
+	return true, compIDs, nil
 }
 
 func (cg *Group) deleteBlock(id ulid.ULID, bdir string, blockDeletableChecker BlockDeletableChecker) error {
diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go
index fcf93073a3..4fd762c2d6 100644
--- a/pkg/store/acceptance_test.go
+++ b/pkg/store/acceptance_test.go
@@ -887,7 +887,7 @@ func TestBucketStore_Acceptance(t *testing.T) {
 			}
 
 			for _, replica := range []string{"r1", "r2"} {
-				id := createBlockFromHead(tt, auxDir, h)
+				id := storetestutil.CreateBlockFromHead(tt, auxDir, h)
 
 				auxBlockDir := filepath.Join(auxDir, id.String())
 				meta, err := metadata.ReadFromDir(auxBlockDir)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 7298e80230..11ef1b73c2 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -1197,7 +1197,7 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in
 	dir := filepath.Join(tmpDir, "tmp")
 	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
 
-	id := createBlockFromHead(t, dir, h)
+	id := storetestutil.CreateBlockFromHead(t, dir, h)
 	bdir := filepath.Join(dir, id.String())
 	meta, err := metadata.ReadFromDir(bdir)
 	testutil.Ok(t, err)
@@ -1238,18 +1238,6 @@ func appendTestData(t testing.TB, app storage.Appender, series int) {
 	testutil.Ok(t, app.Commit())
 }
 
-func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID {
-	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
-	testutil.Ok(t, err)
-	testutil.Ok(t, os.MkdirAll(dir, 0777))
-
-	// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
-	// Because of this block intervals are always +1 than the total samples it includes.
-	ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
-	testutil.Ok(t, err)
-	return ulid
-}
-
 // Very similar benchmark to ths: https://github.com/prometheus/prometheus/blob/1d1732bc25cc4b47f513cb98009a4eb91879f175/tsdb/querier_bench_test.go#L82,
 // but with postings results check when run as test.
 func benchmarkExpandedPostings(
@@ -1498,7 +1486,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
 		SkipChunks: t.IsBenchmark() || skipChunk,
 		SampleType: sampleType,
 	})
-	id := createBlockFromHead(t, blockDir, head)
+	id := storetestutil.CreateBlockFromHead(t, blockDir, head)
 	testutil.Ok(t, head.Close())
 	blockIDDir := filepath.Join(blockDir, id.String())
 	meta, err := metadata.ReadFromDir(blockIDDir)
@@ -1715,7 +1703,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	testutil.Ok(t, app.Commit())
 
 	blockDir := filepath.Join(tmpDir, "tmp")
-	id := createBlockFromHead(t, blockDir, h)
+	id := storetestutil.CreateBlockFromHead(t, blockDir, h)
 
 	meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil)
 	testutil.Ok(t, err)
@@ -1755,7 +1743,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	testutil.Ok(t, app.Commit())
 
 	blockDir := filepath.Join(tmpDir, "tmp2")
-	id := createBlockFromHead(t, blockDir, h)
+	id := storetestutil.CreateBlockFromHead(t, blockDir, h)
 
 	meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil)
 	testutil.Ok(t, err)
@@ -2049,7 +2037,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
 		testutil.Ok(t, app.Commit())
 	}
 
-	blk := createBlockFromHead(t, headOpts.ChunkDirRoot, h)
+	blk := storetestutil.CreateBlockFromHead(t, headOpts.ChunkDirRoot, h)
 
 	thanosMeta := metadata.Thanos{
 		Labels: labels.FromStrings("ext1", "1").Map(),
@@ -2362,7 +2350,7 @@ func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Lab
 	}
 	testutil.Ok(t, app.Commit())
 
-	return createBlockFromHead(t, dir, h)
+	return storetestutil.CreateBlockFromHead(t, dir, h)
 }
 
 func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) {
@@ -2399,7 +2387,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
 		PrependLabels: extLset,
 		Random:        random,
 	})
-	block1 := createBlockFromHead(t, bktDir, head)
+	block1 := storetestutil.CreateBlockFromHead(t, bktDir, head)
 	testutil.Ok(t, head.Close())
 	head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
 		TSDBDir:       filepath.Join(tmpDir, "1"),
@@ -2408,7 +2396,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
 		PrependLabels: extLset,
 		Random:        random,
 	})
-	block2 := createBlockFromHead(t, bktDir, head2)
+	block2 := storetestutil.CreateBlockFromHead(t, bktDir, head2)
 	testutil.Ok(t, head2.Close())
 
 	for _, blockID := range []ulid.ULID{block1, block2} {
@@ -2612,7 +2600,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) {
 	}
 	testutil.Ok(t, app.Commit())
 
-	blk := createBlockFromHead(t, headOpts.ChunkDirRoot, h)
+	blk := storetestutil.CreateBlockFromHead(t, headOpts.ChunkDirRoot, h)
 
 	thanosMeta := metadata.Thanos{
 		Labels: labels.FromStrings("ext1", "1").Map(),
@@ -2811,7 +2799,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck
 		Random:     rand.New(rand.NewSource(120)),
 		SkipChunks: true,
 	})
-	blockID := createBlockFromHead(b, tmpDir, head)
+	blockID := storetestutil.CreateBlockFromHead(b, tmpDir, head)
 
 	// Upload the block to the bucket.
 	thanosMeta := metadata.Thanos{
@@ -3476,7 +3464,7 @@ func TestExpandedPostingsRace(t *testing.T) {
 		Random:     rand.New(rand.NewSource(120)),
 		SkipChunks: true,
 	})
-	blockID := createBlockFromHead(t, tmpDir, head)
+	blockID := storetestutil.CreateBlockFromHead(t, tmpDir, head)
 
 	bucketBlocks := make([]*bucketBlock, 0, blockCount)
 
@@ -3612,7 +3600,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	id := createBlockFromHead(t, auxDir, h)
+	id := storetestutil.CreateBlockFromHead(t, auxDir, h)
 
 	auxBlockDir := filepath.Join(auxDir, id.String())
 	_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
@@ -3831,7 +3819,7 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
 	testutil.Ok(t, err)
 	testutil.Ok(t, app.Commit())
 
-	id := createBlockFromHead(t, auxDir, h)
+	id := storetestutil.CreateBlockFromHead(t, auxDir, h)
 
 	auxBlockDir := filepath.Join(auxDir, id.String())
 	_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go
index c5c4896bd7..d7d719e281 100644
--- a/pkg/store/storepb/testutil/series.go
+++ b/pkg/store/storepb/testutil/series.go
@@ -68,9 +68,10 @@ func CreateBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID {
 
 	// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
 	// Because of this block intervals are always +1 than the total samples it includes.
-	ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
+	ulids, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
 	testutil.Ok(t, err)
-	return ulid
+	testutil.Assert(t, len(ulids) > 0)
+	return ulids[0]
 }
 
 // CreateHeadWithSeries returns head filled with given samples and same series returned in separate list for assertion purposes.
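Editor's note on the test churn above: with the bumped prometheus/prometheus module, tsdb.(*LeveledCompactor).Write returns a slice of block ULIDs instead of a single (possibly zero) ULID, which is why the local createBlockFromHead helper in bucket_test.go was dropped in favour of the shared storetestutil.CreateBlockFromHead shown here. The Go sketch below only restates that caller-side pattern; the helper name writeBlocksFromHead and its t.Fatal-based error handling are illustrative and not part of this change (the repository uses storetestutil/testutil assertions instead).

package example

import (
	"context"
	"testing"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
)

// writeBlocksFromHead mirrors the updated CreateBlockFromHead: Write now returns
// zero or more block ULIDs, so callers must handle the empty case explicitly and,
// when a single block is expected, take the first element.
func writeBlocksFromHead(t testing.TB, dir string, head *tsdb.Head) []ulid.ULID {
	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
	if err != nil {
		t.Fatal(err)
	}

	// Block intervals are half-open, [b.MinTime, b.MaxTime), hence the +1 on maxt.
	ids, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
	if err != nil {
		t.Fatal(err)
	}
	if len(ids) == 0 {
		t.Fatal("compactor wrote no blocks; the head had no samples")
	}
	return ids
}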
diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go
index 285d8bc37a..acd9daf02a 100644
--- a/pkg/store/tsdb_test.go
+++ b/pkg/store/tsdb_test.go
@@ -257,7 +257,7 @@ func TestTSDBStore_SeriesAccessWithDelegateClosing(t *testing.T) {
 		Random:           random,
 		SkipChunks:       true,
 	})
-	_ = createBlockFromHead(t, tmpDir, head)
+	_ = storetestutil.CreateBlockFromHead(t, tmpDir, head)
 	testutil.Ok(t, head.Close())
 
 	head, _ = storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
@@ -426,7 +426,7 @@ func TestTSDBStore_SeriesAccessWithoutDelegateClosing(t *testing.T) {
 		Random:           random,
 		SkipChunks:       true,
 	})
-	_ = createBlockFromHead(t, tmpDir, head)
+	_ = storetestutil.CreateBlockFromHead(t, tmpDir, head)
 	testutil.Ok(t, head.Close())
 
 	head, _ = storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
@@ -566,7 +566,7 @@ func benchTSDBStoreSeries(t testutil.TB, totalSamples, totalSeries int) {
 			resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i]))
 		}
 
-		_ = createBlockFromHead(t, tmpDir, head)
+		_ = storetestutil.CreateBlockFromHead(t, tmpDir, head)
 		testutil.Ok(t, head.Close())
 	}
 
diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go
index 4faba4fafd..39aab94eab 100644
--- a/pkg/testutil/e2eutil/prometheus.go
+++ b/pkg/testutil/e2eutil/prometheus.go
@@ -605,14 +605,14 @@ func createBlock(
 		return id, errors.Wrap(err, "create compactor")
 	}
 
-	id, err = c.Write(dir, h, mint, maxt, nil)
+	ids, err := c.Write(dir, h, mint, maxt, nil)
 	if err != nil {
 		return id, errors.Wrap(err, "write block")
 	}
-
-	if id.Compare(ulid.ULID{}) == 0 {
+	if len(ids) == 0 {
 		return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
 	}
+	id = ids[0]
 
 	blockDir := filepath.Join(dir, id.String())
 	logger := log.NewNopLogger()
@@ -769,14 +769,15 @@ func CreateBlockWithChurn(
 		return id, errors.Wrap(err, "create compactor")
 	}
 
-	id, err = c.Write(dir, h, mint, maxt, nil)
+	ids, err := c.Write(dir, h, mint, maxt, nil)
 	if err != nil {
 		return id, errors.Wrap(err, "write block")
 	}
 
-	if id.Compare(ulid.ULID{}) == 0 {
+	if len(ids) == 0 {
 		return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
 	}
+	id = ids[0]
 
 	blockDir := filepath.Join(dir, id.String())
 	logger := log.NewNopLogger()
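Editor's note on the pkg/clientconfig/http.go hunk earlier in this patch: the bumped prometheus/common drops the *FileRoundTripper constructors and instead takes a config_util.SecretReader (built with NewInlineSecret or NewFileSecret). The Go sketch below restates that wiring for a bearer token only as an illustration; the helper name newBearerRoundTripper is hypothetical and the patch itself does this inline inside NewRoundTripperFromConfig.

package example

import (
	"net/http"

	config_util "github.com/prometheus/common/config"
)

// newBearerRoundTripper shows the secret-reader style used in pkg/clientconfig/http.go:
// the credential is wrapped in a SecretReader (inline or file-backed) and passed to the
// single NewAuthorizationCredentialsRoundTripper constructor.
func newBearerRoundTripper(token, tokenFile string, next http.RoundTripper) http.RoundTripper {
	var secret config_util.SecretReader
	if tokenFile != "" {
		// Wrap the file path so the round tripper fetches the credential through the SecretReader.
		secret = config_util.NewFileSecret(tokenFile)
	} else {
		secret = config_util.NewInlineSecret(token)
	}
	return config_util.NewAuthorizationCredentialsRoundTripper("Bearer", secret, next)
}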