Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve head chunk allocations when reading samples. #3968

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"hash"
"hash/crc32"
"io"
"sort"
"reflect"
"time"
"unsafe"

"github.com/cespare/xxhash/v2"
util_log "github.com/cortexproject/cortex/pkg/util/log"
Expand Down Expand Up @@ -912,14 +913,12 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
Labels: parsedLabels.String(),
Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
}
series[lhash] = s
}

// []byte here doesn't create allocation because Sum64 has go:noescape directive
// It specifies that the function does not allow any of the pointers passed as arguments to escape into the heap or into the values returned from the function.
h := xxhash.Sum64([]byte(e.s))
h := xxhash.Sum64(unsafeGetBytes(e.s))
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.t,
Value: value,
Expand All @@ -932,11 +931,22 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
}
seriesRes := make([]logproto.Series, 0, len(series))
for _, s := range series {
// todo(ctovena) not sure we need this sort.
sort.Sort(s)
seriesRes = append(seriesRes, *s)
}
return iter.NewMultiSeriesIterator(ctx, seriesRes)
return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error {
for _, s := range series {
SamplesPool.Put(s.Samples)
}
return nil
})
}

// unsafeGetBytes returns a []byte view of s without copying.
//
// The returned slice aliases the string's backing memory, which is
// immutable: callers must treat the bytes as read-only and must never
// append to, grow, or write through the slice. It exists to avoid the
// allocation of []byte(s) when feeding strings to xxhash.Sum64.
func unsafeGetBytes(s string) []byte {
	var buf []byte
	sliceHdr := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
	strHdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
	// Populate the slice header field by field, setting Cap before Len
	// so buf never transiently violates the len <= cap invariant. The
	// previous version reinterpreted the slice header as a string
	// header and patched Cap afterwards, briefly leaving Len > Cap.
	sliceHdr.Data = strHdr.Data
	sliceHdr.Cap = len(s)
	sliceHdr.Len = len(s)
	return buf
}

type bufferedIterator struct {
Expand Down
25 changes: 23 additions & 2 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,27 @@ func BenchmarkRead(b *testing.B) {
b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds())))
b.Log("n=", b.N)
})

b.Run(enc.String()+"_sample", func(b *testing.B) {
chunks, size := generateData(enc, 5)
b.ResetTimer()
bytesRead := uint64(0)
now := time.Now()
for n := 0; n < b.N; n++ {
for _, c := range chunks {
iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor)
for iterator.Next() {
_ = iterator.Sample()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
bytesRead += size
}
b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds())))
b.Log("n=", b.N)
})
}
}

Expand Down Expand Up @@ -733,7 +754,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
}

func BenchmarkHeadBlockSampleIterator(b *testing.B) {
for _, j := range []int{100000, 50000, 15000, 10000} {
for _, j := range []int{20000, 10000, 8000, 5000} {
b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) {
h := headBlock{}

Expand All @@ -751,6 +772,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
for iter.Next() {
_ = iter.Sample()
}
iter.Close()
}
})
}
Expand Down Expand Up @@ -1132,7 +1154,6 @@ func TestMemChunk_Rebound(t *testing.T) {

require.Equal(t, originalChunkItr.Entry(), newChunkItr.Entry())
}

})
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/chunkenc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/prometheus/prometheus/pkg/pool"

"github.com/grafana/loki/pkg/logproto"
)

// WriterPool is a pool of io.Writer
Expand Down Expand Up @@ -53,6 +55,9 @@ var (
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })

// SamplesPool is a pool of sample slices, with capacity buckets [512, 1024, ..., 16k].
SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })

// Pool of crc32 hash
crc32HashPool = sync.Pool{
New: func() interface{} {
Expand Down
23 changes: 10 additions & 13 deletions pkg/chunkenc/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/binary"
"io"
"math"
"sort"
"time"

"github.com/Workiva/go-datastructures/rangetree"
Expand All @@ -20,9 +19,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel/stats"
)

var (
noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
)
var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})

type unorderedHeadBlock struct {
// Opted for range tree over skiplist for space reduction.
Expand Down Expand Up @@ -86,7 +83,6 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) {

hb.size += len(line)
hb.lines++

}

// Implements rangetree.Interval
Expand Down Expand Up @@ -238,15 +234,13 @@ func (hb *unorderedHeadBlock) sampleIterator(
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
Labels: parsedLabels.String(),
Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
}
series[lhash] = s
}

// []byte here doesn't create allocation because Sum64 has go:noescape directive
// It specifies that the function does not allow any of the pointers passed as arguments
// to escape into the heap or into the values returned from the function.
h := xxhash.Sum64([]byte(line))
h := xxhash.Sum64(unsafeGetBytes(line))
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: ts,
Value: value,
Expand All @@ -261,11 +255,14 @@ func (hb *unorderedHeadBlock) sampleIterator(
}
seriesRes := make([]logproto.Series, 0, len(series))
for _, s := range series {
// todo(ctovena) not sure we need this sort.
sort.Sort(s)
seriesRes = append(seriesRes, *s)
}
return iter.NewMultiSeriesIterator(ctx, seriesRes)
return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error {
for _, s := range series {
SamplesPool.Put(s.Samples)
}
return nil
})
}

// nolint:unused
Expand Down
26 changes: 26 additions & 0 deletions pkg/iter/sample_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,32 @@ type seriesIterator struct {
labels string
}

// withCloseSampleIterator decorates a SampleIterator with an extra
// callback that runs when the iterator is closed, e.g. to return
// pooled buffers once iteration is finished.
type withCloseSampleIterator struct {
	closeFn func() error // invoked after the embedded iterator's Close
	SampleIterator
}

// Close first closes the underlying iterator, then runs the extra
// closeFn callback. Errors from both steps are collected and returned
// together; nil is returned only when both succeed.
func (w *withCloseSampleIterator) Close() error {
	var errs []error
	for _, err := range []error{w.SampleIterator.Close(), w.closeFn()} {
		if err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) > 0 {
		return util.MultiError(errs)
	}
	return nil
}

// SampleIteratorWithClose wraps it so that closeFn is invoked after the
// iterator's own Close when the returned iterator is closed. This is
// used to release pooled resources once iteration is done.
func SampleIteratorWithClose(it SampleIterator, closeFn func() error) SampleIterator {
	return &withCloseSampleIterator{
		SampleIterator: it,
		closeFn:        closeFn,
	}
}

// NewMultiSeriesIterator returns an iterator over multiple logproto.Series
func NewMultiSeriesIterator(ctx context.Context, series []logproto.Series) SampleIterator {
is := make([]SampleIterator, 0, len(series))
Expand Down