Skip to content

Commit

Permalink
receive: randomize TSDB compaction times
Browse files Browse the repository at this point in the history
Don't compact all TSDBs at the same time because that leads to latency
spikes. Add initial randomized latency (between 0 and 60 seconds) to
spread out the load over time.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Oct 10, 2024
1 parent 6623a3c commit a9f9a69
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
1 change: 1 addition & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
1 change: 1 addition & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -698,6 +699,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -800,6 +802,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
1 change: 1 addition & 0 deletions docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
76 changes: 74 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package receive
import (
"context"
"fmt"
"math/rand/v2"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -206,6 +207,10 @@ type tenant struct {

// For tests.
blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc

stopCompactionLoop func()

logger log.Logger
}

func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
Expand All @@ -231,10 +236,11 @@ func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
return deletable
}

func newTenant() *tenant {
func newTenant(logger log.Logger) *tenant {
return &tenant{
readyS: &ReadyStorage{},
mtx: &sync.RWMutex{},
logger: logger,
}
}

Expand Down Expand Up @@ -266,6 +272,64 @@ func (t *tenant) shipper() *shipper.Shipper {
return t.ship
}

func exponential(d, minD, maxD time.Duration) time.Duration {
d *= 2
if d < minD {
d = minD
}
if d > maxD {
d = maxD
}
return d
}

func (t *tenant) startCompactionLoop() {
wg := &sync.WaitGroup{}

wg.Add(1)

ctx, cancel := context.WithCancel(context.Background())

t.stopCompactionLoop = func() {
cancel()
wg.Wait()
}

initialWait := time.Duration(rand.IntN(61) * int(time.Second))

backoff := time.Duration(0)

go func() {
defer wg.Done()

select {
case <-time.After(initialWait):
case <-ctx.Done():
return
}

for {
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}

select {
case <-ctx.Done():
return
case <-time.After(1 * time.Minute):
if err := t.tsdb.Compact(ctx); err != nil {
level.Error(t.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
}
}
}
}()
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
Expand Down Expand Up @@ -363,6 +427,8 @@ func (t *MultiTSDB) Close() error {
level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id)
continue
}

tenant.stopCompactionLoop()
level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id)
merr.Add(db.Close())
}
Expand Down Expand Up @@ -507,6 +573,8 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
return false, err
}

tenantInstance.stopCompactionLoop()

tenantInstance.mtx.Lock()
tenantInstance.readyS.set(nil)
tenantInstance.setComponents(nil, nil, nil, nil)
Expand Down Expand Up @@ -711,6 +779,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.mtx.Unlock()
return err
}

s.DisableCompactions()
tenant.startCompactionLoop()

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
Expand Down Expand Up @@ -758,7 +830,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
return tenant, nil
}

tenant = newTenant()
tenant = newTenant(log.WithSuffix(t.logger, "tenant", tenantID))
t.tenants[tenantID] = tenant
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
Expand Down

0 comments on commit a9f9a69

Please sign in to comment.