diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index bc1f049de6..346ae52fec 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -728,7 +728,7 @@ func runRule( } }() - s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, false, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) + s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, nil, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 4f8208ccde..116ae61888 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -341,8 +341,9 @@ func runSidecar( return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout) } + uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted } s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, - conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) + uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc)) return runutil.Repeat(30*time.Second, ctx.Done(), func() error { if uploaded, err := s.Sync(ctx); err != nil { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 349ceb98ef..2458e7123f 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -600,7 +600,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.bucket, func() labels.Labels { return lset }, metadata.ReceiveSource, - false, + nil, t.allowOutOfOrderUpload, t.hashFunc, ) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 2a2c04df2d..d3562d87b1 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -40,7 +40,7 @@ type metrics struct { uploadedCompacted prometheus.Gauge } -func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { +func newMetrics(reg prometheus.Registerer) *metrics { var m metrics m.dirSyncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -59,15 +59,10 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { Name: "thanos_shipper_upload_failures_total", Help: "Total number of block upload failures", }) - uploadCompactedGaugeOpts := prometheus.GaugeOpts{ + m.uploadedCompacted = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "thanos_shipper_upload_compacted_done", Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.", - } - if uploadCompacted { - m.uploadedCompacted = promauto.With(reg).NewGauge(uploadCompactedGaugeOpts) - } else { - m.uploadedCompacted = promauto.With(nil).NewGauge(uploadCompactedGaugeOpts) - } + }) return &m } @@ -80,7 +75,7 @@ type Shipper struct { bucket objstore.Bucket source metadata.SourceType - uploadCompacted bool + uploadCompactedFunc func() bool allowOutOfOrderUploads bool hashFunc metadata.HashFunc @@ -98,7 +93,7 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, - uploadCompacted bool, + uploadCompactedFunc func() bool, allowOutOfOrderUploads bool, hashFunc metadata.HashFunc, ) *Shipper { @@ -109,15 +104,20 @@ func New( lbls = func() labels.Labels { return nil } } + if uploadCompactedFunc == nil { + uploadCompactedFunc = func() bool { + return false + } + } return &Shipper{ logger: logger, dir: dir, bucket: bucket, labels: lbls, - metrics: newMetrics(r, uploadCompacted), + metrics: newMetrics(r), source: source, allowOutOfOrderUploads: allowOutOfOrderUploads, - uploadCompacted: uploadCompacted, + uploadCompactedFunc: uploadCompactedFunc, hashFunc: hashFunc, } } @@ -272,6 +272,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { uploadErrs int ) + uploadCompacted := s.uploadCompactedFunc() metas, err := s.blockMetasFromOldest() if err != nil { return 0, err @@ -292,7 +293,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { // We only ship of the first compacted block level as normal flow. if m.Compaction.Level > 1 { - if !s.uploadCompacted { + if !uploadCompacted { continue } } @@ -339,8 +340,10 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs) } - if s.uploadCompacted { + if uploadCompacted { s.metrics.uploadedCompacted.Set(1) + } else { + s.metrics.uploadedCompacted.Set(0) } return uploaded, nil } diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 985ee4329f..5b95a2059b 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -44,7 +44,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { dir := t.TempDir() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, nil, false, metadata.NoneFunc) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -219,7 +219,8 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger)) - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc) + uploadCompactedFunc := func() bool { return true } + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( @@ -374,8 +375,9 @@ func TestShipper_SyncOverlapBlocks_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger)) + uploadCompactedFunc := func() bool { return true } // Here, the allowOutOfOrderUploads flag is set to true, which allows blocks with overlaps to be uploaded. - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, true, metadata.NoneFunc) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, true, metadata.NoneFunc) // Creating 2 overlapping blocks - both uploaded when OOO uploads allowed. var ( diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 75a37afcc0..96ded414f9 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -29,7 +29,7 @@ import ( func TestShipperTimestamps(t *testing.T) { dir := t.TempDir() - s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) + s := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc) // Missing thanos meta file. _, _, err := s.Timestamps() @@ -122,7 +122,7 @@ func TestIterBlockMetas(t *testing.T) { }, }.WriteToDir(log.NewNopLogger(), path.Join(dir, id3.String()))) - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc) metas, err := shipper.blockMetasFromOldest() testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { @@ -153,7 +153,7 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc) _, err := shipper.blockMetasFromOldest() testutil.Ok(b, err) @@ -165,7 +165,7 @@ func TestShipperAddsSegmentFiles(t *testing.T) { inmemory := objstore.NewInMemBucket() lbls := []labels.Label{{Name: "test", Value: "test"}} - s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false, metadata.NoneFunc) + s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, nil, false, metadata.NoneFunc) id := ulid.MustNew(1, nil) blockDir := path.Join(dir, id.String())