Skip to content

Commit

Permalink
chore: disable stack trace range chunking (#3583)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Sep 24, 2024
1 parent 66ccaa3 commit 784d4de
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 424 deletions.
3 changes: 0 additions & 3 deletions pkg/experiment/ingester/memdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func NewHead(metrics *HeadMetrics) *Head {
metrics: metrics,
symbols: symdb.NewPartitionWriter(0, &symdb.Config{
Version: symdb.FormatV3,
Stacktraces: symdb.StacktracesConfig{
MaxNodesPerChunk: 4 << 20,
},
}),
totalSamples: atomic.NewUint64(0),
minTimeNanos: math.MaxInt64,
Expand Down
5 changes: 1 addition & 4 deletions pkg/phlaredb/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,7 @@ func newSymbolsCompactor(path string, version symdb.FormatVersion) *symbolsCompa
version: symdb.FormatV2,
w: symdb.NewSymDB(symdb.DefaultConfig().
WithVersion(symdb.FormatV2).
WithDirectory(dst).
WithParquetConfig(symdb.ParquetConfig{
MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount,
})),
WithDirectory(dst)),
dst: dst,
rewriters: make(map[BlockReader]*symdb.Rewriter),
}
Expand Down
42 changes: 16 additions & 26 deletions pkg/phlaredb/symdb/block_writer_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"

Expand All @@ -15,7 +16,6 @@ import (
"github.com/grafana/pyroscope/pkg/phlaredb/block"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/util/build"
"github.com/grafana/pyroscope/pkg/util/math"
)

type writerV2 struct {
Expand Down Expand Up @@ -138,30 +138,20 @@ func (w *writerV2) Flush() (err error) {
}

func (w *writerV2) writeStacktraces(partition *PartitionWriter) (err error) {
for ci, c := range partition.stacktraces.chunks {
stacks := c.stacks
if stacks == 0 {
stacks = uint32(len(partition.stacktraces.hashToIdx))
}
h := StacktraceBlockHeader{
Offset: w.stacktraces.w.offset,
Size: 0, // Set later.
Partition: partition.header.Partition,
BlockIndex: uint16(ci),
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: stacks,
StacktraceNodes: c.tree.len(),
StacktraceMaxDepth: 0, // TODO
StacktraceMaxNodes: c.partition.maxNodesPerChunk,
CRC: 0, // Set later.
}
crc := crc32.New(castagnoli)
if h.Size, err = c.WriteTo(io.MultiWriter(crc, w.stacktraces)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
partition.header.Stacktraces = append(partition.header.Stacktraces, h)
h := StacktraceBlockHeader{
Offset: w.stacktraces.w.offset,
Partition: partition.header.Partition,
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: uint32(len(partition.stacktraces.hashToIdx)),
StacktraceNodes: partition.stacktraces.tree.len(),
StacktraceMaxNodes: math.MaxUint32,
}
crc := crc32.New(castagnoli)
if h.Size, err = partition.stacktraces.WriteTo(io.MultiWriter(crc, w.stacktraces)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
partition.header.Stacktraces = append(partition.header.Stacktraces, h)
return nil
}

Expand Down Expand Up @@ -220,7 +210,7 @@ func (s *parquetWriter[M, P]) init(dir string, c ParquetConfig) (err error) {
return err
}
s.rowsBatch = make([]parquet.Row, 0, 128)
s.buffer = parquet.NewBuffer(s.persister.Schema(), parquet.ColumnBufferCapacity(s.config.MaxBufferRowCount))
s.buffer = parquet.NewBuffer(s.persister.Schema())
s.writer = parquet.NewGenericWriter[P](s.file, s.persister.Schema(),
parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision),
parquet.PageBufferSize(3*1024*1024),
Expand Down Expand Up @@ -265,7 +255,7 @@ func (s *parquetWriter[M, P]) writeRows(values []M) (r RowRangeReference, err er
}

func (s *parquetWriter[M, P]) fillBatch(values []M) int {
m := math.Min(len(values), cap(s.rowsBatch))
m := min(len(values), cap(s.rowsBatch))
s.rowsBatch = s.rowsBatch[:m]
for i := 0; i < m; i++ {
row := s.rowsBatch[i][:0]
Expand Down
36 changes: 16 additions & 20 deletions pkg/phlaredb/symdb/block_writer_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"

Expand Down Expand Up @@ -88,27 +89,22 @@ func writePartitionV3(w *writerOffset, e *encodersV3, p *PartitionWriter) (err e
if p.header.V3.Locations, err = writeSymbolsBlock(w, p.locations.slice, e.locationsEncoder); err != nil {
return err
}
for ci, c := range p.stacktraces.chunks {
stacks := c.stacks
if stacks == 0 {
stacks = uint32(len(p.stacktraces.hashToIdx))
}
h := StacktraceBlockHeader{
Offset: w.offset,
Partition: p.header.Partition,
BlockIndex: uint16(ci),
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: stacks,
StacktraceNodes: c.tree.len(),
StacktraceMaxNodes: c.partition.maxNodesPerChunk,
}
crc := crc32.New(castagnoli)
if h.Size, err = c.WriteTo(io.MultiWriter(crc, w)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
p.header.Stacktraces = append(p.header.Stacktraces, h)

h := StacktraceBlockHeader{
Offset: w.offset,
Partition: p.header.Partition,
Encoding: StacktraceEncodingGroupVarint,
Stacktraces: uint32(len(p.stacktraces.hashToIdx)),
StacktraceNodes: p.stacktraces.tree.len(),
StacktraceMaxNodes: math.MaxUint32,
}
crc := crc32.New(castagnoli)
if h.Size, err = p.stacktraces.WriteTo(io.MultiWriter(crc, w)); err != nil {
return fmt.Errorf("writing stacktrace chunk data: %w", err)
}
h.CRC = crc.Sum32()
p.header.Stacktraces = append(p.header.Stacktraces, h)

return nil
}

Expand Down
156 changes: 24 additions & 132 deletions pkg/phlaredb/symdb/partition_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"io"
"sync"

"github.com/grafana/pyroscope/pkg/iter"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
)

type PartitionWriter struct {
header PartitionHeader

stacktraces *stacktracesPartition
stacktraces *stacktraces
strings deduplicatingSlice[string, string, *stringsHelper]
mappings deduplicatingSlice[schemav1.InMemoryMapping, mappingsKey, *mappingsHelper]
functions deduplicatingSlice[schemav1.InMemoryFunction, functionsKey, *functionsHelper]
Expand All @@ -32,99 +31,36 @@ func (p *PartitionWriter) ResolveStacktraceLocations(_ context.Context, dst Stac

func (p *PartitionWriter) LookupLocations(dst []uint64, stacktraceID uint32) []uint64 {
dst = dst[:0]
if len(p.stacktraces.chunks) == 0 {
if stacktraceID == 0 {
return dst
}
chunkID := stacktraceID / p.stacktraces.maxNodesPerChunk
localSID := stacktraceID % p.stacktraces.maxNodesPerChunk
if localSID == 0 || int(chunkID) > len(p.stacktraces.chunks) {
return dst
}
return p.stacktraces.chunks[chunkID].tree.resolveUint64(dst, localSID)
}

type stacktracesPartition struct {
maxNodesPerChunk uint32

m sync.RWMutex
hashToIdx map[uint64]uint32
chunks []*stacktraceChunk
return p.stacktraces.tree.resolveUint64(dst, stacktraceID)
}

func (p *PartitionWriter) SplitStacktraceIDRanges(appender *SampleAppender) iter.Iterator[*StacktraceIDRange] {
if len(p.stacktraces.chunks) == 0 {
return iter.NewEmptyIterator[*StacktraceIDRange]()
}
var n int
samples := appender.Samples()
ranges := SplitStacktraces(samples.StacktraceIDs, p.stacktraces.maxNodesPerChunk)
for _, sr := range ranges {
c := p.stacktraces.chunks[sr.chunk]
sr.ParentPointerTree = c.tree
sr.Samples = samples.Range(n, n+len(sr.IDs))
n += len(sr.IDs)
func newStacktraces() *stacktraces {
p := &stacktraces{
hashToIdx: make(map[uint64]uint32),
tree: newStacktraceTree(defaultStacktraceTreeSize),
}
return iter.NewSliceIterator(ranges)
return p
}

func newStacktracesPartition(maxNodesPerChunk uint32) *stacktracesPartition {
p := &stacktracesPartition{
maxNodesPerChunk: maxNodesPerChunk,
hashToIdx: make(map[uint64]uint32, defaultStacktraceTreeSize/2),
}
p.chunks = append(p.chunks, &stacktraceChunk{
tree: newStacktraceTree(defaultStacktraceTreeSize),
partition: p,
})
return p
type stacktraces struct {
m sync.RWMutex
hashToIdx map[uint64]uint32
tree *stacktraceTree
stacks uint32
}

func (p *stacktracesPartition) size() uint64 {
func (p *stacktraces) size() uint64 {
p.m.RLock()
// TODO: map footprint isn't accounted
v := 0
for _, c := range p.chunks {
v += stacktraceTreeNodeSize * cap(c.tree.nodes)
}
v := stacktraceTreeNodeSize * cap(p.tree.nodes)
p.m.RUnlock()
return uint64(v)
}

// stacktraceChunkForInsert returns a chunk for insertion:
// if the existing one has capacity, or a new one, if the former is full.
// Must be called with the stracktraces mutex write lock held.
func (p *stacktracesPartition) stacktraceChunkForInsert(x int) *stacktraceChunk {
c := p.currentStacktraceChunk()
if n := c.tree.len() + uint32(x); p.maxNodesPerChunk > 0 && n >= p.maxNodesPerChunk {
// Calculate number of stacks in the chunk.
s := uint32(len(p.hashToIdx))
c.stacks = s - c.stacks
c = &stacktraceChunk{
partition: p,
tree: newStacktraceTree(defaultStacktraceTreeSize),
stid: c.stid + p.maxNodesPerChunk,
stacks: s,
}
p.chunks = append(p.chunks, c)
}
return c
}

// stacktraceChunkForRead returns a chunk for reads.
// Must be called with the stracktraces mutex read lock held.
func (p *stacktracesPartition) stacktraceChunkForRead(i int) (*stacktraceChunk, bool) {
if i < len(p.chunks) {
return p.chunks[i], true
}
return nil, false
}

func (p *stacktracesPartition) currentStacktraceChunk() *stacktraceChunk {
// Assuming there is at least one chunk.
return p.chunks[len(p.chunks)-1]
}

func (p *stacktracesPartition) append(dst []uint32, s []*schemav1.Stacktrace) {
func (p *stacktraces) append(dst []uint32, s []*schemav1.Stacktrace) {
if len(s) == 0 {
return
}
Expand Down Expand Up @@ -160,29 +96,16 @@ func (p *stacktracesPartition) append(dst []uint32, s []*schemav1.Stacktrace) {

p.m.Lock()
defer p.m.Unlock()
chunk := p.currentStacktraceChunk()

m := int(p.maxNodesPerChunk)
t, j := chunk.tree, chunk.stid
for i, v := range dst[:len(s)] {
if v != 0 {
// Already resolved. ID 0 is reserved
// as it is the tree root.
continue
}

x := s[i].LocationIDs
if m > 0 && len(t.nodes)+len(x) >= m {
// If we're close to the max nodes limit and can
// potentially exceed it, we take the next chunk,
// even if there are some space.
chunk = p.stacktraceChunkForInsert(len(x))
t, j = chunk.tree, chunk.stid
}

// Tree insertion is idempotent,
// we don't need to check the map.
id = t.insert(x) + j
id = p.tree.insert(x)
h := hashLocations(x)
p.hashToIdx[h] = id
dst[i] = id
Expand All @@ -205,30 +128,9 @@ func (p *stacktraceLocationsPool) put(x []int32) {
stacktraceLocations.Put(x)
}

func (p *stacktracesPartition) resolve(dst StacktraceInserter, stacktraces []uint32) (err error) {
for _, sr := range SplitStacktraces(stacktraces, p.maxNodesPerChunk) {
if err = p.ResolveChunk(dst, sr); err != nil {
return err
}
}
return nil
}

// NOTE(kolesnikovae):
// Caller is able to split a range of stacktrace IDs into chunks
// with SplitStacktraces, and then resolve them concurrently:
// StacktraceInserter could be implemented as a dense set, map,
// slice, or an n-ary tree: the stacktraceTree should be one of
// the options, the package provides.

func (p *stacktracesPartition) ResolveChunk(dst StacktraceInserter, sr *StacktraceIDRange) error {
func (p *stacktraces) resolve(dst StacktraceInserter, stacktraces []uint32) (err error) {
p.m.RLock()
c, found := p.stacktraceChunkForRead(int(sr.chunk))
if !found {
p.m.RUnlock()
return ErrInvalidStacktraceRange
}
t := stacktraceTree{nodes: c.tree.nodes}
t := stacktraceTree{nodes: p.tree.nodes}
// tree.resolve is thread safe: only the parent node index (p)
// and the reference to location (r) node fields are accessed,
// which are never modified after insertion.
Expand All @@ -239,25 +141,16 @@ func (p *stacktracesPartition) ResolveChunk(dst StacktraceInserter, sr *Stacktra
// the call.
p.m.RUnlock()
s := stacktraceLocations.get()
// Restore the original stacktrace ID.
off := sr.Offset()
for _, sid := range sr.IDs {
for _, sid := range stacktraces {
s = t.resolve(s, sid)
dst.InsertStacktrace(off+sid, s)
dst.InsertStacktrace(sid, s)
}
stacktraceLocations.put(s)
return nil
}

type stacktraceChunk struct {
partition *stacktracesPartition
tree *stacktraceTree
stid uint32 // Initial stack trace ID.
stacks uint32 //
}

func (s *stacktraceChunk) WriteTo(dst io.Writer) (int64, error) {
return s.tree.WriteTo(dst)
func (p *stacktraces) WriteTo(dst io.Writer) (int64, error) {
return p.tree.WriteTo(dst)
}

func (p *PartitionWriter) AppendLocations(dst []uint32, locations []schemav1.InMemoryLocation) {
Expand Down Expand Up @@ -288,8 +181,7 @@ func (p *PartitionWriter) Symbols() *Symbols {

func (p *PartitionWriter) WriteStats(s *PartitionStats) {
p.stacktraces.m.RLock()
c := p.stacktraces.currentStacktraceChunk()
s.MaxStacktraceID = int(c.stid + c.tree.len())
s.MaxStacktraceID = int(p.stacktraces.tree.len())
s.StacktracesTotal = len(p.stacktraces.hashToIdx)
p.stacktraces.m.RUnlock()

Expand Down
Loading

0 comments on commit 784d4de

Please sign in to comment.