Skip to content

Commit

Permalink
Compactor Memory Improvements/Bug fix (grafana#1130)
Browse files Browse the repository at this point in the history
* reworked compactor logic to be more efficient

Signed-off-by: Joe Elliott <[email protected]>

* fixed iterator tests

Signed-off-by: Joe Elliott <[email protected]>

* true compaction/combine test

Signed-off-by: Joe Elliott <[email protected]>

* Remove commented code

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Nov 22, 2021
1 parent 9822a8f commit c2c84ae
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 53 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
* [ENHANCEMENT] Jsonnet: add `$._config.namespace` to filter by namespace in cortex metrics [#1098](https://github.com/grafana/tempo/pull/1098) (@mapno)
* [ENHANCEMENT] Add middleware to compress frontend HTTP responses with gzip if requested [#1080](https://github.com/grafana/tempo/pull/1080) (@kvrhdn, @zalegrala)
* [ENHANCEMENT] Allow query disablement in vulture [#1117](https://github.com/grafana/tempo/pull/1117) (@zalegrala)
* [ENHANCEMENT] Improve memory efficiency of compaction and block cutting. [#1121](https://github.com/grafana/tempo/pull/1121) (@joe-elliott)
* [ENHANCEMENT] Improve memory efficiency of compaction and block cutting. [#1121](https://github.com/grafana/tempo/pull/1121) [#1130](https://github.com/grafana/tempo/pull/1130) (@joe-elliott)
* [ENHANCEMENT] Include metrics for configured limit overrides and defaults: tempo_limits_overrides, tempo_limits_defaults [#1089](https://github.com/grafana/tempo/pull/1089) (@zalegrala)
* [BUGFIX] Fix defaults for MaxBytesPerTrace (ingester.max-bytes-per-trace) and MaxSearchBytesPerTrace (ingester.max-search-bytes-per-trace) (@bitprocessor)
* [BUGFIX] Ignore empty objects during compaction [#1113](https://github.com/grafana/tempo/pull/1113) (@mdisibio)
* [BUGFIX] Add process name to vulture traces to work around display issues [#1127](https://github.com/grafana/tempo/pull/1127) (@mdisibio)
* [BUGFIX] Fixed issue where compaction sometimes dropped spans. [#1130](https://github.com/grafana/tempo/pull/1130) (@joe-elliott)

## v1.2.0 / 2021-11-05
* [CHANGE] **BREAKING CHANGE** Drop support for v0 and v1 blocks. See [1.1 changelog](https://github.com/grafana/tempo/releases/tag/v1.1.0) for details [#919](https://github.com/grafana/tempo/pull/919) (@joe-elliott)
Expand Down
92 changes: 59 additions & 33 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/blocklist"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/wal"
Expand All @@ -32,15 +34,7 @@ func (m *mockSharder) Owns(hash string) bool {
}

func (m *mockSharder) Combine(dataEncoding string, objs ...[]byte) ([]byte, bool, error) {
if len(objs) != 2 {
return nil, false, nil
}

if len(objs[0]) > len(objs[1]) {
return objs[0], true, nil
}

return objs[1], true, nil
return model.ObjectCombiner.Combine(dataEncoding, objs...)
}

type mockJobSharder struct{}
Expand Down Expand Up @@ -186,6 +180,8 @@ func TestCompaction(t *testing.T) {
}
}

// TestSameIDCompaction is a bit gross in that it has a bad dependency on the /pkg/model
// module to do a full e2e compaction/combination test.
func TestSameIDCompaction(t *testing.T) {
tempDir, err := os.MkdirTemp("/tmp", "")
defer os.RemoveAll(tempDir)
Expand Down Expand Up @@ -227,53 +223,83 @@ func TestSameIDCompaction(t *testing.T) {
assert.NoError(t, err)

blockCount := 5
blocksPerCompaction := (inputBlocks - outputBlocks)
recordCount := 100

// make a bunch of sharded requests
allReqs := make([][][]byte, 0, recordCount)
allIds := make([][]byte, 0, recordCount)
for i := 0; i < recordCount; i++ {
id := make([]byte, 16)
_, err = rand.Read(id)
require.NoError(t, err, "unexpected creating random id")

requestShards := rand.Intn(blockCount) + 1

reqs := make([][]byte, 0, requestShards)
for j := 0; j < requestShards; j++ {
buff, err := proto.Marshal(test.MakeRequest(1, id))
require.NoError(t, err)
reqs = append(reqs, buff)
}

allReqs = append(allReqs, reqs)
allIds = append(allIds, id)
}

// and write them to different blocks
for i := 0; i < blockCount; i++ {
blockID := uuid.New()
head, err := wal.NewBlock(blockID, testTenantID, "")
head, err := wal.NewBlock(blockID, testTenantID, model.TracePBEncoding)
require.NoError(t, err)
id := []byte{0x01, 0x02, 0x01, 0x02, 0x01, 0x02, 0x01, 0x02, 0x01, 0x02, 0x01, 0x02, 0x01, 0x02, 0x01, 0x02}

// Different content to ensure that object combination takes place
rec, _ := proto.Marshal(test.MakeTrace(1, id))
for j := 0; j < recordCount; j++ {
req := allReqs[j]
id := allIds[j]

err = head.Append(id, rec)
require.NoError(t, err, "unexpected error writing req")
if i < len(req) {
err = head.Append(id, req[i])
require.NoError(t, err, "unexpected error writing req")
}
}

_, err = w.CompleteBlock(head, &mockSharder{})
require.NoError(t, err)
}

rw := r.(*readerWriter)

combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
require.NoError(t, err)

// poll
// check blocklists, force compaction and check again
checkBlocklists(t, uuid.Nil, blockCount, 0, rw)

var blocks []*backend.BlockMeta
blocklist := rw.blocklist.Metas(testTenantID)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, 1024*1024*1024, defaultMinInputBlocks, 2)
list := rw.blocklist.Metas(testTenantID)
blockSelector := newTimeWindowBlockSelector(list, rw.compactorCfg.MaxCompactionRange, 10000, 1024*1024*1024, defaultMinInputBlocks, blockCount)
blocks, _ = blockSelector.BlocksToCompact()
assert.Len(t, blocks, inputBlocks)
assert.Len(t, blocks, blockCount)

err = rw.compact(blocks, testTenantID)
require.NoError(t, err)

checkBlocklists(t, uuid.Nil, blockCount-blocksPerCompaction, inputBlocks, rw)
checkBlocklists(t, uuid.Nil, 1, blockCount, rw)

// do we have the right number of records
var records int
for _, meta := range rw.blocklist.Metas(testTenantID) {
records += meta.TotalObjects
}
assert.Equal(t, blockCount-blocksPerCompaction, records)
// force clear compacted blocks to guarantee that we're only querying the new blocks that went through the combiner
metas := rw.blocklist.Metas(testTenantID)
rw.blocklist.ApplyPollResults(blocklist.PerTenant{testTenantID: metas}, blocklist.PerTenantCompacted{})

combinedFinish, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
assert.NoError(t, err)
assert.Equal(t, float64(1), combinedFinish-combinedStart)
// search for all ids
for i, id := range allIds {
b, _, failedBlocks, err := rw.Find(context.Background(), testTenantID, id, BlockIDMin, BlockIDMax)
assert.NoError(t, err)
assert.Nil(t, failedBlocks)

actualBytes, _, err := model.ObjectCombiner.Combine(model.TracePBEncoding, b...)
require.NoError(t, err)

expectedBytes, _, err := model.ObjectCombiner.Combine(model.TracePBEncoding, allReqs[i]...)
require.NoError(t, err)

assert.Equal(t, expectedBytes, actualBytes)
}
}

func TestCompactionUpdatesBlocklist(t *testing.T) {
Expand Down
31 changes: 17 additions & 14 deletions tempodb/encoding/iterator_multiblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (i *multiblockIterator) iterate(ctx context.Context) {

for !i.allDone(ctx) {
var lowestID []byte
var lowestObject []byte
var lowestBookmark *bookmark
var lowestObjects [][]byte
var lowestBookmarks []*bookmark

// find lowest ID of the new object
for _, b := range i.bookmarks {
Expand All @@ -116,20 +116,25 @@ func (i *multiblockIterator) iterate(ctx context.Context) {
comparison := bytes.Compare(currentID, lowestID)

if comparison == 0 {
lowestObject, _, err = i.combiner.Combine(i.dataEncoding, currentObject, lowestObject)
if err != nil {
i.err.Store(fmt.Errorf("error combining while Nexting: %w", err))
return
}
b.clear()
lowestObjects = append(lowestObjects, currentObject)
lowestBookmarks = append(lowestBookmarks, b)
} else if len(lowestID) == 0 || comparison == -1 {
lowestID = currentID
lowestObject = currentObject
lowestBookmark = b
lowestObjects = [][]byte{currentObject}
lowestBookmarks = []*bookmark{b}
}
}

if len(lowestID) == 0 || len(lowestObject) == 0 || lowestBookmark == nil {
lowestObject, _, err := i.combiner.Combine(i.dataEncoding, lowestObjects...)
if err != nil {
i.err.Store(fmt.Errorf("error combining while Nexting: %w", err))
return
}
for _, b := range lowestBookmarks {
b.clear()
}

if len(lowestID) == 0 || len(lowestObject) == 0 || len(lowestBookmarks) == 0 {
// Skip empty objects or when the bookmarks failed to return an object.
// This is intentional because we concluded that the bookmarks have already
// been skipping most empties (but not all) and there is no reason to treat the
Expand All @@ -140,7 +145,7 @@ func (i *multiblockIterator) iterate(ctx context.Context) {
// for the bookmarks to skip to, and lowestBookmark remains nil. Since we
// already skipped every other empty, skip the last (but not least) entry.
// (todo: research needed to determine how empties get in the block)
level.Warn(i.logger).Log("msg", "multiblock iterator skipping empty object", "id", hex.EncodeToString(lowestID), "obj", lowestObject, "bookmark", lowestBookmark)
level.Warn(i.logger).Log("msg", "multiblock iterator skipping empty object", "id", hex.EncodeToString(lowestID), "obj", lowestObject, "bookmark", lowestBookmarks)
continue
}

Expand All @@ -150,8 +155,6 @@ func (i *multiblockIterator) iterate(ctx context.Context) {
object: append([]byte(nil), lowestObject...),
}

lowestBookmark.clear()

select {

case <-ctx.Done():
Expand Down
10 changes: 5 additions & 5 deletions tempodb/encoding/iterator_multiblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestMultiblockSorts(t *testing.T) {
iterOdds.Add([]byte{3}, []byte{3}, nil)
iterOdds.Add([]byte{5}, []byte{5}, nil)

iter := NewMultiblockIterator(context.TODO(), []Iterator{iterEvens, iterOdds}, 10, nil, "", testLogger)
iter := NewMultiblockIterator(context.TODO(), []Iterator{iterEvens, iterOdds}, 10, &mockCombiner{}, "", testLogger)

count := 0
lastID := -1
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestMultiblockIteratorCanBeCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)

// Create iterator and cancel/close it after 100ms
iter := NewMultiblockIterator(ctx, []Iterator{inner}, recordCount/2, nil, "", testLogger)
iter := NewMultiblockIterator(ctx, []Iterator{inner}, recordCount/2, &mockCombiner{}, "", testLogger)
time.Sleep(100 * time.Millisecond)
if tc.close {
iter.Close()
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestMultiblockIteratorCanBeCancelled(t *testing.T) {
func TestMultiblockIteratorCanBeCancelledMultipleTimes(t *testing.T) {
inner := &testIterator{}

iter := NewMultiblockIterator(context.TODO(), []Iterator{inner}, 1, nil, "", testLogger)
iter := NewMultiblockIterator(context.TODO(), []Iterator{inner}, 1, &mockCombiner{}, "", testLogger)

iter.Close()
iter.Close()
Expand All @@ -183,7 +183,7 @@ func TestMultiblockIteratorPropogatesErrors(t *testing.T) {
inner2.Add([]byte{2}, []byte{2}, nil)
inner2.Add([]byte{3}, []byte{3}, nil)

iter := NewMultiblockIterator(ctx, []Iterator{inner, inner2}, 10, nil, "", testLogger)
iter := NewMultiblockIterator(ctx, []Iterator{inner, inner2}, 10, &mockCombiner{}, "", testLogger)

_, _, err := iter.Next(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestMultiblockIteratorSkipsEmptyObjects(t *testing.T) {
{nil, nil, io.EOF},
}

iter := NewMultiblockIterator(ctx, []Iterator{inner}, 10, nil, "", testLogger)
iter := NewMultiblockIterator(ctx, []Iterator{inner}, 10, &mockCombiner{}, "", testLogger)
for i := 0; i < len(expected); i++ {
id, obj, err := iter.Next(ctx)
require.Equal(t, expected[i].err, err)
Expand Down

0 comments on commit c2c84ae

Please sign in to comment.