Skip to content

Commit

Permalink
Reverted addition of deletion mark for partial uploads. (#2472)
Browse files Browse the repository at this point in the history
Fixes #2459 (quick fix).

This keeps the logic from the 0.11.0 which was good enough.

Some improvement for future: #2470

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Apr 20, 2020
1 parent 7250c6b commit 2654a10
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 24 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Fixed

- [#2411](https://github.com/thanos-io/thanos/pull/2411) Query: fix a bug where queries might not time out sometimes due to issues with one or more StoreAPIs
- [#2411](https://github.com/thanos-io/thanos/pull/2411) Query: fix a bug where queries might not time out sometimes due to issues with one or more StoreAPIs.
- [#2474](https://github.com/thanos-io/thanos/pull/2474) Store: fix a panic caused by concurrent memory access during block filtering.
- [#2472](https://github.com/thanos-io/thanos/pull/2472) compact: Fixed bug where partial blocks were never deleted and causing spam of warnings.

## [v0.12.0](https://github.com/thanos-io/thanos/releases/tag/v0.12.0) - 2020.04.15

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func runCompact(
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/compact/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func BestEffortCleanAbortedPartialUploads(
partial map[ulid.ULID]error,
bkt objstore.Bucket,
deleteAttempts prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blockCleanups prometheus.Counter,
blockCleanupFailures prometheus.Counter,
) {
level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")

Expand All @@ -45,10 +46,15 @@ func BestEffortCleanAbortedPartialUploads(

deleteAttempts.Inc()
level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id)
if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
return
// We don't gather any information about deletion marks for partial blocks, so let's simply remove it. We waited
// long PartialUploadThresholdAge already.
// TODO(bwplotka): Fix some edge cases: https://github.com/thanos-io/thanos/issues/2470 .
if err := block.Delete(ctx, logger, bkt, id); err != nil {
blockCleanupFailures.Inc()
level.Warn(logger).Log("msg", "failed to delete aborted partial upload; will retry in next iteration", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
continue
}
blockCleanups.Inc()
level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge)
}
level.Info(logger).Log("msg", "cleaning of aborted partial uploads done")
Expand Down
15 changes: 6 additions & 9 deletions pkg/compact/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,19 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001"), &fakeChunk))

deleteAttempts := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

blockCleanups := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockCleanupFailures := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
_, partial, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)

BestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteAttempts, blocksMarkedForDeletion)
BestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteAttempts, blockCleanups, blockCleanupFailures)
testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts))
testutil.Equals(t, 1.0, promtest.ToFloat64(blockCleanups))
testutil.Equals(t, 0.0, promtest.ToFloat64(blockCleanupFailures))

exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)

exists, err = bkt.Exists(ctx, path.Join(shouldDeleteID.String(), metadata.DeletionMarkFilename))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
testutil.Equals(t, 1.0, promtest.ToFloat64(blocksMarkedForDeletion))
testutil.Equals(t, false, exists)

exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001"))
testutil.Ok(t, err)
Expand Down
21 changes: 12 additions & 9 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
// We expect no ops.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
Expand All @@ -504,11 +505,11 @@ func TestCompactWithStoreGateway(t *testing.T) {
// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) // This should be 1 [BUG no 1]. TODO: fix.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3+1), "thanos_compactor_blocks_marked_for_deletion_total")) // 17.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
// TODO(bwplotka): This is confusing, should be either normal compaction or vertical not both (?)
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_started_total"))
Expand All @@ -519,7 +520,8 @@ func TestCompactWithStoreGateway(t *testing.T) {

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(
len(rawBlockIDs)+7+
5, // 5 compactions, 5 newly added blocks.
5+ // 5 compactions, 5 newly added blocks.
-2, // Partial block removed.
)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

Expand All @@ -539,7 +541,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
expectedEndVector,
)
// Store view:
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-2)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))
})
Expand All @@ -553,8 +555,9 @@ func TestCompactWithStoreGateway(t *testing.T) {
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
Expand All @@ -564,7 +567,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total"))

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-16)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-16-2)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted"))
Expand All @@ -584,7 +587,7 @@ func TestCompactWithStoreGateway(t *testing.T) {
)

// Store view:
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-16+5)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-16+5-2)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))
})
Expand Down

0 comments on commit 2654a10

Please sign in to comment.