Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shipper: change upload compacted type from bool to a function #6526

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, false, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, nil, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

ctx, cancel := context.WithCancel(context.Background())

Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ func runSidecar(
return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}

uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted }
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lset },
metadata.ReceiveSource,
false,
nil,
t.allowOutOfOrderUpload,
t.hashFunc,
)
Expand Down
29 changes: 15 additions & 14 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type metrics struct {
uploadedCompacted prometheus.Gauge
}

func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
func newMetrics(reg prometheus.Registerer) *metrics {
var m metrics

m.dirSyncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand All @@ -59,15 +59,10 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
Name: "thanos_shipper_upload_failures_total",
Help: "Total number of block upload failures",
})
uploadCompactedGaugeOpts := prometheus.GaugeOpts{
m.uploadedCompacted = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_shipper_upload_compacted_done",
Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.",
}
if uploadCompacted {
m.uploadedCompacted = promauto.With(reg).NewGauge(uploadCompactedGaugeOpts)
} else {
m.uploadedCompacted = promauto.With(nil).NewGauge(uploadCompactedGaugeOpts)
}
})
return &m
}

Expand All @@ -80,7 +75,7 @@ type Shipper struct {
bucket objstore.Bucket
source metadata.SourceType

uploadCompacted bool
uploadCompactedFunc func() bool
allowOutOfOrderUploads bool
hashFunc metadata.HashFunc

Expand All @@ -98,7 +93,7 @@ func New(
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
uploadCompacted bool,
uploadCompactedFunc func() bool,
allowOutOfOrderUploads bool,
hashFunc metadata.HashFunc,
) *Shipper {
Expand All @@ -109,15 +104,20 @@ func New(
lbls = func() labels.Labels { return nil }
}

if uploadCompactedFunc == nil {
uploadCompactedFunc = func() bool {
return false
}
}
return &Shipper{
logger: logger,
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, uploadCompacted),
metrics: newMetrics(r),
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
uploadCompacted: uploadCompacted,
uploadCompactedFunc: uploadCompactedFunc,
hashFunc: hashFunc,
}
}
Expand Down Expand Up @@ -272,6 +272,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
uploadErrs int
)

uploadCompacted := s.uploadCompactedFunc()
metas, err := s.blockMetasFromOldest()
if err != nil {
return 0, err
Expand All @@ -292,7 +293,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {

// We only ship the first compacted block level as the normal flow.
if m.Compaction.Level > 1 {
if !s.uploadCompacted {
if !uploadCompacted {
continue
}
}
Expand Down Expand Up @@ -339,7 +340,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs)
}

if s.uploadCompacted {
if uploadCompacted {
s.metrics.uploadedCompacted.Set(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to reset this metric if uploadCompacted suddenly became false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I am not sure how this metric is designed and used. But I can set it to 0 if it is disabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure either, but I think we should call .Reset() instead (https://pkg.go.dev/github.com/prometheus/[email protected]/prometheus#MetricVec.Reset) to remove all previously set metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.Reset() is only available in MetricVec but this is a Gauge, not GaugeVec.

}
return uploaded, nil
Expand Down
8 changes: 5 additions & 3 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
dir := t.TempDir()

extLset := labels.FromStrings("prometheus", "prom-1")
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, nil, false, metadata.NoneFunc)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -219,7 +219,8 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger))

shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc)
uploadCompactedFunc := func() bool { return true }
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc)

// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
var (
Expand Down Expand Up @@ -374,8 +375,9 @@ func TestShipper_SyncOverlapBlocks_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger))

uploadCompactedFunc := func() bool { return true }
// Here, the allowOutOfOrderUploads flag is set to true, which allows blocks with overlaps to be uploaded.
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, true, metadata.NoneFunc)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, true, metadata.NoneFunc)

// Creating 2 overlapping blocks - both uploaded when OOO uploads allowed.
var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func TestShipperTimestamps(t *testing.T) {
dir := t.TempDir()

s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc)
s := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc)

// Missing thanos meta file.
_, _, err := s.Timestamps()
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestIterBlockMetas(t *testing.T) {
},
}.WriteToDir(log.NewNopLogger(), path.Join(dir, id3.String())))

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc)
metas, err := shipper.blockMetasFromOldest()
testutil.Ok(t, err)
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
Expand Down Expand Up @@ -153,7 +153,7 @@ func BenchmarkIterBlockMetas(b *testing.B) {
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc)

_, err := shipper.blockMetasFromOldest()
testutil.Ok(b, err)
Expand All @@ -165,7 +165,7 @@ func TestShipperAddsSegmentFiles(t *testing.T) {
inmemory := objstore.NewInMemBucket()

lbls := []labels.Label{{Name: "test", Value: "test"}}
s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false, metadata.NoneFunc)
s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, nil, false, metadata.NoneFunc)

id := ulid.MustNew(1, nil)
blockDir := path.Join(dir, id.String())
Expand Down