Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull pr 7166 #93

Merged
merged 4 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 55 additions & 9 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -157,7 +158,34 @@ type tenant struct {
exemplarsTSDB *exemplars.TSDB
ship *shipper.Shipper

mtx *sync.RWMutex
mtx *sync.RWMutex
tsdb *tsdb.DB

// For tests.
blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc
}

func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
t.mtx.RLock()
defer t.mtx.RUnlock()

if t.tsdb == nil {
return nil
}

deletable := t.blocksToDeleteFn(t.tsdb)(blocks)
if t.ship == nil {
return deletable
}

uploaded := t.ship.UploadedBlocks()
for deletableID := range deletable {
if _, ok := uploaded[deletableID]; !ok {
delete(deletable, deletableID)
}
}

return deletable
}

func newTenant() *tenant {
Expand Down Expand Up @@ -205,14 +233,15 @@ func (t *tenant) shipper() *shipper.Shipper {
func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.setComponents(storeTSDB, ship, exemplarsTSDB)
t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB)
t.mtx.Unlock()
}

func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) {
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
t.tsdb = tenantTSDB
}

func (t *MultiTSDB) Open() error {
Expand Down Expand Up @@ -356,7 +385,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {

// pruneTSDB removes a TSDB if its past the retention period.
// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) {
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) {
tenantTSDB := tenantInstance.readyStorage()
if tenantTSDB == nil {
return false, nil
Expand Down Expand Up @@ -386,9 +415,21 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
tenantTSDB.mtx.Lock()
defer tenantTSDB.mtx.Unlock()

// Lock the entire tenant to make sure the shipper is not running in parallel.
// Make sure the shipper is not running in parallel.
tenantInstance.mtx.Lock()
defer tenantInstance.mtx.Unlock()
shipper := tenantInstance.ship
tenantInstance.ship = nil
tenantInstance.mtx.Unlock()

defer func() {
if pruned {
return
}
// If the tenant was not pruned, re-enable the shipper.
tenantInstance.mtx.Lock()
tenantInstance.ship = shipper
tenantInstance.mtx.Unlock()
}()

sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
if sinceLastAppendMillis <= compactThreshold {
Expand All @@ -405,8 +446,9 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
}

level.Info(logger).Log("msg", "Pruning tenant")
if tenantInstance.ship != nil {
uploaded, err := tenantInstance.ship.Sync(ctx)
if shipper != nil {
// No other code can reach this shipper anymore so enable it again to be able to sync manually.
uploaded, err := shipper.Sync(ctx)
if err != nil {
return false, err
}
Expand All @@ -424,8 +466,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
return false, err
}

tenantInstance.mtx.Lock()
tenantInstance.readyS.set(nil)
tenantInstance.setComponents(nil, nil, nil)
tenantInstance.setComponents(nil, nil, nil, nil)
tenantInstance.mtx.Unlock()

return true, nil
}
Expand Down Expand Up @@ -582,6 +626,8 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant

level.Info(logger).Log("msg", "opening TSDB")
opts := *t.tsdbOpts
opts.BlocksToDelete = tenant.blocksToDelete
tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete

// NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL
// which gets later converted into a block. Without setting this flag to false, the block would get compacted
Expand Down
69 changes: 69 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@ import (
"math"
"os"
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand All @@ -28,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -466,8 +471,16 @@ func TestMultiTSDBPrune(t *testing.T) {
}

testutil.Ok(t, m.Prune(ctx))
if test.bucket != nil {
_, err := m.Sync(ctx)
testutil.Ok(t, err)
}

testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients()))
var shippedBlocks int
if test.bucket == nil && shippedBlocks > 0 {
t.Fatal("can't expect uploads when there is no bucket")
}
if test.bucket != nil {
testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error {
shippedBlocks++
Expand Down Expand Up @@ -829,3 +842,59 @@ func BenchmarkMultiTSDB(b *testing.B) {
_, _ = a.Append(0, l, int64(i), float64(i))
}
}

func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) {
tenant := &tenant{
mtx: &sync.RWMutex{},
}

t.Run("no blocks", func(t *testing.T) {
require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil))
})

tenant.tsdb = &tsdb.DB{}

mockBlockIDs := []ulid.ULID{
ulid.MustNew(1, nil),
ulid.MustNew(2, nil),
}

t.Run("no shipper", func(t *testing.T) {
tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc {
return func(_ []*tsdb.Block) map[ulid.ULID]struct{} {
return map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}
}
}

require.Equal(t, map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}, tenant.blocksToDelete(nil))
})

t.Run("some blocks uploaded", func(t *testing.T) {
tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc {
return func(_ []*tsdb.Block) map[ulid.ULID]struct{} {
return map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}
}
}

td := t.TempDir()

require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{mockBlockIDs[0]},
}))

tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketRepairSource, nil, false, metadata.NoneFunc, "")
require.Equal(t, map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
}, tenant.blocksToDelete(nil))
})
}
4 changes: 1 addition & 3 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
numExemplarsLabelLength++
level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength)
default:
if err != nil {
level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err)
}
level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err)
}
}
}
Expand Down
33 changes: 23 additions & 10 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,6 @@ func (s *Shipper) SetLabels(lbls labels.Labels) {
s.labels = func() labels.Labels { return lbls }
}

func (s *Shipper) getLabels() labels.Labels {
s.mtx.RLock()
defer s.mtx.RUnlock()

return s.labels()
}

// Timestamps returns the minimum timestamp for which data is available and the highest timestamp
// of blocks that were successfully uploaded.
func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
Expand Down Expand Up @@ -254,6 +247,9 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo
//
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
s.mtx.Lock()
defer s.mtx.Unlock()

meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
// If we encounter any error, proceed with an empty meta file and overwrite it later.
Expand All @@ -275,7 +271,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
meta.Uploaded = nil

var (
checker = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels)
checker = newLazyOverlapChecker(s.logger, s.bucket, func() labels.Labels { return s.labels() })
uploadErrs int
)

Expand Down Expand Up @@ -355,6 +351,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, nil
}

func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} {
meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
// NOTE(GiedriusS): Sync() will inform users about any problems.
return nil
}

ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
for _, id := range meta.Uploaded {
ret[id] = struct{}{}
}

return ret
}

// sync uploads the block if not exists in remote storage.
// TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error.
func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
Expand Down Expand Up @@ -382,8 +393,10 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
return errors.Wrap(err, "hard link block")
}
// Attach current labels and write a new meta file with Thanos extensions.
if lset := s.getLabels(); !lset.IsEmpty() {
meta.Thanos.Labels = lset.Map()
if lset := s.labels(); !lset.IsEmpty() {
lset.Range(func(l labels.Label) {
meta.Thanos.Labels[l.Name] = l.Value
})
}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)
Expand Down
Loading