Skip to content

Commit

Permalink
compact: atomically replace no compact marked map (thanos-io#6319)
Browse files Browse the repository at this point in the history
With lots of blocks it could take some time to fill this no compact
marked map hence replace it atomically. I believe this leads to problems
in the compaction planner where it picks up no compact marked blocks
because meta syncer does synchronizations concurrently.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Apr 26, 2023
1 parent 1967cd0 commit 9e00173
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,9 +1392,9 @@ func (f *GatherNoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]*me

// Filter passes all metas, while gathering no compact markers.
func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
f.mtx.Lock()
f.noCompactMarkedMap = make(map[ulid.ULID]*metadata.NoCompactMark)
f.mtx.Unlock()
var localNoCompactMapMtx sync.Mutex

noCompactMarkedMap := make(map[ulid.ULID]*metadata.NoCompactMark)

// Make a copy of block IDs to check, in order to avoid concurrency issues
// between the scheduler and workers.
Expand Down Expand Up @@ -1427,9 +1427,9 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli
continue
}

f.mtx.Lock()
f.noCompactMarkedMap[id] = m
f.mtx.Unlock()
localNoCompactMapMtx.Lock()
noCompactMarkedMap[id] = m
localNoCompactMapMtx.Unlock()
synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc()
}

Expand Down Expand Up @@ -1457,5 +1457,9 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli
return errors.Wrap(err, "filter blocks marked for no compaction")
}

f.mtx.Lock()
f.noCompactMarkedMap = noCompactMarkedMap
f.mtx.Unlock()

return nil
}
78 changes: 78 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/errutil"
Expand Down Expand Up @@ -595,3 +597,79 @@ func TestDownsampleProgressCalculate(t *testing.T) {
}
}
}

func TestNoMarkFilterAtomic(t *testing.T) {
ctx := context.TODO()
logger := log.NewLogfmtLogger(io.Discard)

m := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})

blocksNum := 200
bkt := objstore.NewInMemBucket()

metas := make(map[ulid.ULID]*metadata.Meta, blocksNum)

noMarkCounter := promauto.NewCounter(prometheus.CounterOpts{
Name: "coolcounter",
})

for i := 0; i < blocksNum; i++ {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ulid.MustNew(uint64(i), nil)
metas[meta.ULID] = &meta

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
if i%2 == 0 {
testutil.Ok(
t,
block.MarkForNoCompact(ctx, logger, bkt, meta.ULID, metadata.NoCompactReason("test"), "nodetails", noMarkCounter),
)
}
}

slowBucket := objstore.WithNoopInstr(objstore.WithDelay(bkt, time.Millisecond*200))
f := NewGatherNoCompactionMarkFilter(logger, slowBucket, 10)

ctx, cancel := context.WithCancel(ctx)

g := &run.Group{}

// Fill the map initially.
testutil.Ok(t, f.Filter(ctx, metas, m, nil))
testutil.Assert(t, len(f.NoCompactMarkedBlocks()) > 0, "expected to always have not compacted blocks")

g.Add(func() error {
for {
if ctx.Err() != nil {
return nil
}
if err := f.Filter(ctx, metas, m, nil); err != nil && !errors.Is(err, context.Canceled) {
testutil.Ok(t, err)
}
}
}, func(err error) {
cancel()
})

g.Add(func() error {
for {
if ctx.Err() != nil {
return nil
}

if len(f.NoCompactMarkedBlocks()) == 0 {
return fmt.Errorf("expected to always have not compacted blocks")
}
}
}, func(err error) {
cancel()
})

time.AfterFunc(10*time.Second, func() {
cancel()
})
testutil.Ok(t, g.Run())
}

0 comments on commit 9e00173

Please sign in to comment.