Skip to content

Commit

Permalink
receive: randomize TSDB compaction times
Browse files Browse the repository at this point in the history
Don't compact all TSDBs at the same time because that leads to latency
spikes. Add initial randomized latency (between 0 and 60 seconds) to
spread out the load over time.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Oct 10, 2024
1 parent f265c3b commit edca98c
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
1 change: 1 addition & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
1 change: 1 addition & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -698,6 +699,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -800,6 +802,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
1 change: 1 addition & 0 deletions docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down
75 changes: 73 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math/rand/v2"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -257,6 +258,10 @@ type tenant struct {

// For tests.
blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc

stopCompactionLoop func()

logger log.Logger
}

func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
Expand All @@ -282,10 +287,11 @@ func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
return deletable
}

func newTenant() *tenant {
func newTenant(logger log.Logger) *tenant {
return &tenant{
readyS: &ReadyStorage{},
mtx: &sync.RWMutex{},
logger: logger,
}
}

Expand Down Expand Up @@ -317,6 +323,64 @@ func (t *tenant) shipper() *shipper.Shipper {
return t.ship
}

func exponential(d, minD, maxD time.Duration) time.Duration {
d *= 2
if d < minD {
d = minD
}
if d > maxD {
d = maxD
}
return d
}

func (t *tenant) startCompactionLoop() {
wg := &sync.WaitGroup{}

wg.Add(1)

ctx, cancel := context.WithCancel(context.Background())

t.stopCompactionLoop = func() {
cancel()
wg.Wait()
}

initialWait := time.Duration(rand.IntN(61) * int(time.Second))

backoff := time.Duration(0)

go func() {
defer wg.Done()

select {
case <-time.After(initialWait):
case <-ctx.Done():
return
}

for {
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}

select {
case <-ctx.Done():
return
case <-time.After(1 * time.Minute):
if err := t.tsdb.Compact(ctx); err != nil {
level.Error(t.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
}
}
}
}()
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
Expand Down Expand Up @@ -414,6 +478,8 @@ func (t *MultiTSDB) Close() error {
level.Error(t.logger).Log("msg", "closing TSDB failed; not ready", "tenant", id)
continue
}

tenant.stopCompactionLoop()
level.Info(t.logger).Log("msg", "closing TSDB", "tenant", id)
merr.Add(db.Close())
}
Expand Down Expand Up @@ -550,6 +616,7 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
}
}

tenantInstance.stopCompactionLoop()
if err := tdb.Close(); err != nil {
return false, err
}
Expand Down Expand Up @@ -762,6 +829,10 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.mtx.Unlock()
return err
}

s.DisableCompactions()
tenant.startCompactionLoop()

var ship *shipper.Shipper
if t.bucket != nil {
ship = shipper.New(
Expand Down Expand Up @@ -809,7 +880,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
return tenant, nil
}

tenant = newTenant()
tenant = newTenant(log.WithSuffix(t.logger, "tenant", tenantID))
t.tenants[tenantID] = tenant
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
Expand Down

0 comments on commit edca98c

Please sign in to comment.