diff --git a/CHANGELOG.md b/CHANGELOG.md index 74f559818a..b699044e73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5642](https://github.com/thanos-io/thanos/pull/5642) Receive: Log labels correctly in writer debug messages. - [#5655](https://github.com/thanos-io/thanos/pull/5655) Receive: Fix recreating already pruned tenants. - [#5702](https://github.com/thanos-io/thanos/pull/5702) Store: Upgrade minio-go/v7 to fix panic caused by leaked goroutines. +- [#5736](https://github.com/thanos-io/thanos/pull/5736) Compact: Fix crash in GatherNoCompactionMarkFilter.NoCompactMarkedBlocks. ### Added * [#5654](https://github.com/thanos-io/thanos/pull/5654) Query: add `--grpc-compression` flag that controls the compression used in gRPC client. With the flag it is now possible to compress the traffic between Query and StoreAPI nodes - you get lower network usage in exchange for a bit higher CPU/RAM usage. diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index e17ea0f385..551e600b09 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1361,6 +1361,7 @@ type GatherNoCompactionMarkFilter struct { bkt objstore.InstrumentedBucketReader noCompactMarkedMap map[ulid.ULID]*metadata.NoCompactMark concurrency int + mtx sync.Mutex } // NewGatherNoCompactionMarkFilter creates GatherNoCompactionMarkFilter. @@ -1374,12 +1375,21 @@ func NewGatherNoCompactionMarkFilter(logger log.Logger, bkt objstore.Instrumente // NoCompactMarkedBlocks returns block ids that were marked for no compaction. func (f *GatherNoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]*metadata.NoCompactMark { - return f.noCompactMarkedMap + f.mtx.Lock() + copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(f.noCompactMarkedMap)) + for k, v := range f.noCompactMarkedMap { + copiedNoCompactMarked[k] = v + } + f.mtx.Unlock() + + return copiedNoCompactMarked } // Filter passes all metas, while gathering no compact markers. func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error { + f.mtx.Lock() f.noCompactMarkedMap = make(map[ulid.ULID]*metadata.NoCompactMark) + f.mtx.Unlock() // Make a copy of block IDs to check, in order to avoid concurrency issues // between the scheduler and workers. @@ -1389,9 +1399,8 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli } var ( - eg errgroup.Group - ch = make(chan ulid.ULID, f.concurrency) - mtx sync.Mutex + eg errgroup.Group + ch = make(chan ulid.ULID, f.concurrency) ) for i := 0; i < f.concurrency; i++ { @@ -1413,9 +1422,9 @@ func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[uli continue } - mtx.Lock() + f.mtx.Lock() f.noCompactMarkedMap[id] = m - mtx.Unlock() + f.mtx.Unlock() synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc() }