perf(v2): tune parquet row iterator batching (#3726)
kolesnikovae authored Nov 28, 2024
1 parent a2ca38d commit ee162aa
Showing 3 changed files with 40 additions and 17 deletions.
26 changes: 15 additions & 11 deletions pkg/experiment/query_backend/query_profile_entry.go
@@ -13,6 +13,20 @@ import (
 	"github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index"
 )
 
+// As we expect rows to be very small, we want to fetch a bigger
+// batch of rows at once to amortize the latency of reading.
+const bigBatchSize = 2 << 10
+
+type ProfileEntry struct {
+	RowNum      int64
+	Timestamp   model.Time
+	Fingerprint model.Fingerprint
+	Labels      phlaremodel.Labels
+	Partition   uint64
+}
+
+func (e ProfileEntry) RowNumber() int64 { return e.RowNum }
+
 func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[ProfileEntry], error) {
 	series, err := getSeriesLabels(q.ds.Index(), q.req.matchers, groupBy...)
 	if err != nil {
@@ -28,7 +42,7 @@ func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[Pro
 
 	buf := make([][]parquet.Value, 3)
 	entries := iter.NewAsyncBatchIterator[*parquetquery.IteratorResult, ProfileEntry](
-		results, 128,
+		results, bigBatchSize,
 		func(r *parquetquery.IteratorResult) ProfileEntry {
 			buf = r.Columns(buf,
 				schemav1.SeriesIndexColumnName,
@@ -48,16 +62,6 @@ func profileEntryIterator(q *queryContext, groupBy ...string) (iter.Iterator[Pro
 	return entries, nil
 }
 
-type ProfileEntry struct {
-	RowNum      int64
-	Timestamp   model.Time
-	Fingerprint model.Fingerprint
-	Labels      phlaremodel.Labels
-	Partition   uint64
-}
-
-func (e ProfileEntry) RowNumber() int64 { return e.RowNum }
-
 type seriesLabels struct {
 	fingerprint model.Fingerprint
 	labels      phlaremodel.Labels
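
Side note on this file: iter.NewAsyncBatchIterator converts low-level parquet join results into the compact ProfileEntry values above, and the commit raises its batch from 128 to bigBatchSize (2 << 10 = 2048) because, per the new comment, the entries are tiny and a larger batch amortizes read latency. Below is a minimal, synchronous sketch of the batched-transform idea in Go; the names are hypothetical and it is deliberately simpler than pyroscope's asynchronous iterator:

package main

import "fmt"

// batchIterator drains up to size items from src, transforms them, and
// hands them out one at a time; the refill step is where a larger batch
// amortizes per-read latency.
type batchIterator[I, O any] struct {
	src       func() (I, bool) // pull one item; false when exhausted
	transform func(I) O
	batch     []O
	pos       int
	size      int
}

func newBatchIterator[I, O any](src func() (I, bool), size int, transform func(I) O) *batchIterator[I, O] {
	return &batchIterator[I, O]{src: src, transform: transform, size: size}
}

// Next returns the next transformed item, refilling the buffer when it runs dry.
func (it *batchIterator[I, O]) Next() (O, bool) {
	if it.pos == len(it.batch) {
		it.batch = it.batch[:0]
		for len(it.batch) < it.size {
			v, ok := it.src()
			if !ok {
				break
			}
			it.batch = append(it.batch, it.transform(v))
		}
		it.pos = 0
		if len(it.batch) == 0 {
			var zero O
			return zero, false
		}
	}
	o := it.batch[it.pos]
	it.pos++
	return o, true
}

func main() {
	n := 0
	src := func() (int, bool) { n++; return n, n <= 5 }
	it := newBatchIterator(src, 2, func(v int) string { return fmt.Sprintf("row-%d", v) })
	for s, ok := it.Next(); ok; s, ok = it.Next() {
		fmt.Println(s) // row-1 through row-5
	}
}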
2 changes: 1 addition & 1 deletion pkg/experiment/query_backend/query_time_series.go
@@ -40,7 +40,7 @@ func queryTimeSeries(q *queryContext, query *queryv1.Query) (r *queryv1.Report,
 		return nil, err
 	}
 
-	rows := parquetquery.NewRepeatedRowIterator(q.ctx, entries, q.ds.Profiles().RowGroups(), column.ColumnIndex)
+	rows := parquetquery.NewRepeatedRowIteratorBatchSize(q.ctx, entries, q.ds.Profiles().RowGroups(), bigBatchSize, column.ColumnIndex)
 	defer runutil.CloseWithErrCapture(&err, rows, "failed to close column iterator")
 
 	builder := phlaremodel.NewTimeSeriesBuilder(query.TimeSeries.GroupBy...)
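
A rough sanity check on why this call site can afford the large batch (the sizes here are illustrative assumptions, not measurements from the commit): a buffered time-series entry holds little more than a row number, timestamp, fingerprint, and label references (on the order of a hundred bytes), so a full batch is roughly 2048 × 100 B ≈ 200 KiB per query, cheap read-ahead. Batching 2048 full profile rows at tens of kilobytes each would instead pin tens of megabytes per concurrent query, which is the memory waste the defaultBatchSize comment in repeated.go warns about.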
29 changes: 24 additions & 5 deletions pkg/phlaredb/query/repeated.go
@@ -38,24 +38,43 @@ const (
 	//
 	// How many values we expect per a row, the upper boundary?
 	repeatedRowColumnIteratorReadSize = 2 << 10
+
+	// Batch size specifies how many rows to read from a column at once.
+	// Note that the batched rows are buffered in-memory but do not reference
+	// the pages from which they were read.
+	//
+	// The default value is extremely conservative, as in most cases, rows are
+	// quite large (e.g., profile samples). Given that we run many queries
+	// concurrently, the memory waste outweighs the benefits of the "read-ahead"
+	// optimization that batching is intended to provide.
+	//
+	// However, in cases where the rows are small (such as in time series),
+	// the value should be increased significantly.
+	defaultBatchSize = 4
 )
 
 func NewRepeatedRowIterator[T any](
 	ctx context.Context,
 	rows iter.Iterator[T],
 	rowGroups []parquet.RowGroup,
 	columns ...int,
+) iter.Iterator[RepeatedRow[T]] {
+	return NewRepeatedRowIteratorBatchSize(ctx, rows, rowGroups, defaultBatchSize, columns...)
+}
+
+func NewRepeatedRowIteratorBatchSize[T any](
+	ctx context.Context,
+	rows iter.Iterator[T],
+	rowGroups []parquet.RowGroup,
+	batchSize int64,
+	columns ...int,
 ) iter.Iterator[RepeatedRow[T]] {
 	rows, rowNumbers := iter.Tee(rows)
 	return &repeatedRowIterator[T]{
 		rows: rows,
 		columns: NewMultiColumnIterator(ctx,
 			WrapWithRowNumber(rowNumbers),
-			// Batch size specifies how many rows to be read
-			// from a column at once. Note that the batched rows
-			// are buffered in-memory, but not reference pages
-			// they were read from.
-			4,
+			int(batchSize),
 			rowGroups,
 			columns...,