Skip to content

Commit

Permalink
Merge pull request #93 from vinted/pull_pr_7166
Browse files Browse the repository at this point in the history
Pull pr 7166
  • Loading branch information
GiedriusS authored Apr 3, 2024
2 parents 734a005 + 7f6e49f commit aa493ea
Showing 4 changed files with 148 additions and 22 deletions.
64 changes: 55 additions & 9 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -157,7 +158,34 @@ type tenant struct {
exemplarsTSDB *exemplars.TSDB
ship *shipper.Shipper

mtx *sync.RWMutex
mtx *sync.RWMutex
tsdb *tsdb.DB

// For tests.
blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc
}

func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
t.mtx.RLock()
defer t.mtx.RUnlock()

if t.tsdb == nil {
return nil
}

deletable := t.blocksToDeleteFn(t.tsdb)(blocks)
if t.ship == nil {
return deletable
}

uploaded := t.ship.UploadedBlocks()
for deletableID := range deletable {
if _, ok := uploaded[deletableID]; !ok {
delete(deletable, deletableID)
}
}

return deletable
}

func newTenant() *tenant {
@@ -205,14 +233,15 @@ func (t *tenant) shipper() *shipper.Shipper {
func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.setComponents(storeTSDB, ship, exemplarsTSDB)
t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB)
t.mtx.Unlock()
}

func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) {
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
t.tsdb = tenantTSDB
}

func (t *MultiTSDB) Open() error {
@@ -356,7 +385,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {

// pruneTSDB removes a TSDB if its past the retention period.
// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) {
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) {
tenantTSDB := tenantInstance.readyStorage()
if tenantTSDB == nil {
return false, nil
@@ -386,9 +415,21 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
tenantTSDB.mtx.Lock()
defer tenantTSDB.mtx.Unlock()

// Lock the entire tenant to make sure the shipper is not running in parallel.
// Make sure the shipper is not running in parallel.
tenantInstance.mtx.Lock()
defer tenantInstance.mtx.Unlock()
shipper := tenantInstance.ship
tenantInstance.ship = nil
tenantInstance.mtx.Unlock()

defer func() {
if pruned {
return
}
// If the tenant was not pruned, re-enable the shipper.
tenantInstance.mtx.Lock()
tenantInstance.ship = shipper
tenantInstance.mtx.Unlock()
}()

sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
if sinceLastAppendMillis <= compactThreshold {
@@ -405,8 +446,9 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
}

level.Info(logger).Log("msg", "Pruning tenant")
if tenantInstance.ship != nil {
uploaded, err := tenantInstance.ship.Sync(ctx)
if shipper != nil {
// No other code can reach this shipper anymore so enable it again to be able to sync manually.
uploaded, err := shipper.Sync(ctx)
if err != nil {
return false, err
}
@@ -424,8 +466,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
return false, err
}

tenantInstance.mtx.Lock()
tenantInstance.readyS.set(nil)
tenantInstance.setComponents(nil, nil, nil)
tenantInstance.setComponents(nil, nil, nil, nil)
tenantInstance.mtx.Unlock()

return true, nil
}
@@ -582,6 +626,8 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant

level.Info(logger).Log("msg", "opening TSDB")
opts := *t.tsdbOpts
opts.BlocksToDelete = tenant.blocksToDelete
tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete

// NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL
// which gets later converted into a block. Without setting this flag to false, the block would get compacted
69 changes: 69 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
@@ -9,17 +9,21 @@ import (
"math"
"os"
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -28,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -466,8 +471,16 @@ func TestMultiTSDBPrune(t *testing.T) {
}

testutil.Ok(t, m.Prune(ctx))
if test.bucket != nil {
_, err := m.Sync(ctx)
testutil.Ok(t, err)
}

testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients()))
var shippedBlocks int
if test.bucket == nil && shippedBlocks > 0 {
t.Fatal("can't expect uploads when there is no bucket")
}
if test.bucket != nil {
testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error {
shippedBlocks++
@@ -829,3 +842,59 @@ func BenchmarkMultiTSDB(b *testing.B) {
_, _ = a.Append(0, l, int64(i), float64(i))
}
}

func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) {
tenant := &tenant{
mtx: &sync.RWMutex{},
}

t.Run("no blocks", func(t *testing.T) {
require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil))
})

tenant.tsdb = &tsdb.DB{}

mockBlockIDs := []ulid.ULID{
ulid.MustNew(1, nil),
ulid.MustNew(2, nil),
}

t.Run("no shipper", func(t *testing.T) {
tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc {
return func(_ []*tsdb.Block) map[ulid.ULID]struct{} {
return map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}
}
}

require.Equal(t, map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}, tenant.blocksToDelete(nil))
})

t.Run("some blocks uploaded", func(t *testing.T) {
tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc {
return func(_ []*tsdb.Block) map[ulid.ULID]struct{} {
return map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}
}
}

td := t.TempDir()

require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{mockBlockIDs[0]},
}))

tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketRepairSource, nil, false, metadata.NoneFunc, "")
require.Equal(t, map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
}, tenant.blocksToDelete(nil))
})
}
4 changes: 1 addition & 3 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
@@ -225,9 +225,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
numExemplarsLabelLength++
level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength)
default:
if err != nil {
level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err)
}
level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err)
}
}
}
33 changes: 23 additions & 10 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
@@ -136,13 +136,6 @@ func (s *Shipper) SetLabels(lbls labels.Labels) {
s.labels = func() labels.Labels { return lbls }
}

func (s *Shipper) getLabels() labels.Labels {
s.mtx.RLock()
defer s.mtx.RUnlock()

return s.labels()
}

// Timestamps returns the minimum timestamp for which data is available and the highest timestamp
// of blocks that were successfully uploaded.
func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
@@ -254,6 +247,9 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo
//
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
s.mtx.Lock()
defer s.mtx.Unlock()

meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
// If we encounter any error, proceed with an empty meta file and overwrite it later.
@@ -275,7 +271,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
meta.Uploaded = nil

var (
checker = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels)
checker = newLazyOverlapChecker(s.logger, s.bucket, func() labels.Labels { return s.labels() })
uploadErrs int
)

@@ -355,6 +351,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, nil
}

func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} {
meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
// NOTE(GiedriusS): Sync() will inform users about any problems.
return nil
}

ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
for _, id := range meta.Uploaded {
ret[id] = struct{}{}
}

return ret
}

// sync uploads the block if not exists in remote storage.
// TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error.
func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
@@ -382,8 +393,10 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
return errors.Wrap(err, "hard link block")
}
// Attach current labels and write a new meta file with Thanos extensions.
if lset := s.getLabels(); !lset.IsEmpty() {
meta.Thanos.Labels = lset.Map()
if lset := s.labels(); !lset.IsEmpty() {
lset.Range(func(l labels.Label) {
meta.Thanos.Labels[l.Name] = l.Value
})
}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)

0 comments on commit aa493ea

Please sign in to comment.