From 5c0bc7e8b9826d71b31c1d020b477ce4fbabbb9f Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 17 Feb 2021 10:13:23 +0100 Subject: [PATCH] Allow downstream projects to customise the Partitioner Signed-off-by: Marco Pracucci --- pkg/store/bucket.go | 38 ++++++++++++++++++------------------ pkg/store/bucket_e2e_test.go | 4 ++-- pkg/store/bucket_test.go | 20 +++++++++---------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index c2bc919352..444a1069ac 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1815,11 +1815,11 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings g, ctx := errgroup.WithContext(r.ctx) for _, part := range parts { - i, j := part.elemRng[0], part.elemRng[1] + i, j := part.ElemRng[0], part.ElemRng[1] - start := int64(part.start) + start := int64(part.Start) // We assume index does not have any ptrs that has 0 length. - length := int64(part.end) - start + length := int64(part.End) - start // Fetch from object storage concurrently and update stats and posting list. g.Go(func() error { @@ -1975,8 +1975,8 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { }) g, ctx := errgroup.WithContext(r.ctx) for _, p := range parts { - s, e := p.start, p.end - i, j := p.elemRng[0], p.elemRng[1] + s, e := p.Start, p.End + i, j := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { return r.loadSeries(ctx, ids[i:j], false, s, e) @@ -2028,11 +2028,11 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc return nil } -type part struct { - start uint64 - end uint64 +type Part struct { + Start uint64 + End uint64 - elemRng [2]int + ElemRng [2]int } type Partitioner interface { @@ -2040,7 +2040,7 @@ type Partitioner interface { // input ranges // It supports overlapping ranges. // NOTE: It expects range to be sorted by start time. 
- Partition(length int, rng func(int) (uint64, uint64)) []part + Partition(length int, rng func(int) (uint64, uint64)) []Part } type gapBasedPartitioner struct { @@ -2056,29 +2056,29 @@ func NewGapBasedPartitioner(maxGapSize uint64) Partitioner { // Partition partitions length entries into n <= length ranges that cover all // input ranges by combining entries that are separated by reasonably small gaps. // It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones. -func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) { +func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []Part) { j := 0 k := 0 for k < length { j = k k++ - p := part{} - p.start, p.end = rng(j) + p := Part{} + p.Start, p.End = rng(j) // Keep growing the range until the end or we encounter a large gap. for ; k < length; k++ { s, e := rng(k) - if p.end+g.maxGapSize < s { + if p.End+g.maxGapSize < s { break } - if p.end <= e { - p.end = e + if p.End <= e { + p.End = e } } - p.elemRng = [2]int{j, k} + p.ElemRng = [2]int{j, k} parts = append(parts, p) } return parts @@ -2239,8 +2239,8 @@ func (r *bucketChunkReader) preload() error { offsets := offsets for _, p := range parts { - s, e := uint32(p.start), uint32(p.end) - m, n := p.elemRng[0], p.elemRng[1] + s, e := uint32(p.Start), uint32(p.End) + m, n := p.ElemRng[0], p.ElemRng[1] g.Go(func() error { return r.loadChunks(ctx, offsets[m:n], seq, s, e) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index d09bfc889b..ddcc6622d6 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -473,10 +473,10 @@ func TestBucketStore_e2e(t *testing.T) { type naivePartitioner struct{} -func (g naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) { +func (g naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []Part) { for i := 0; i < 
length; i++ { s, e := rng(i) - parts = append(parts, part{start: s, end: e, elemRng: [2]int{i, i + 1}}) + parts = append(parts, Part{Start: s, End: e, ElemRng: [2]int{i, i + 1}}) } return parts } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 59ff00c115..c845b2792c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -501,15 +501,15 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { for _, c := range []struct { input [][2]int - expected []part + expected []Part }{ { input: [][2]int{{1, 10}}, - expected: []part{{start: 1, end: 10, elemRng: [2]int{0, 1}}}, + expected: []Part{{Start: 1, End: 10, ElemRng: [2]int{0, 1}}}, }, { input: [][2]int{{1, 2}, {3, 5}, {7, 10}}, - expected: []part{{start: 1, end: 10, elemRng: [2]int{0, 3}}}, + expected: []Part{{Start: 1, End: 10, ElemRng: [2]int{0, 3}}}, }, { input: [][2]int{ @@ -518,9 +518,9 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { {20, 30}, {maxGapSize + 31, maxGapSize + 32}, }, - expected: []part{ - {start: 1, end: 30, elemRng: [2]int{0, 3}}, - {start: maxGapSize + 31, end: maxGapSize + 32, elemRng: [2]int{3, 4}}, + expected: []Part{ + {Start: 1, End: 30, ElemRng: [2]int{0, 3}}, + {Start: maxGapSize + 31, End: maxGapSize + 32, ElemRng: [2]int{3, 4}}, }, }, // Overlapping ranges. 
@@ -532,9 +532,9 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { {maxGapSize + 31, maxGapSize + 32}, {maxGapSize + 31, maxGapSize + 40}, }, - expected: []part{ - {start: 1, end: 30, elemRng: [2]int{0, 3}}, - {start: maxGapSize + 31, end: maxGapSize + 40, elemRng: [2]int{3, 5}}, + expected: []Part{ + {Start: 1, End: 30, ElemRng: [2]int{0, 3}}, + {Start: maxGapSize + 31, End: maxGapSize + 40, ElemRng: [2]int{3, 5}}, }, }, { @@ -544,7 +544,7 @@ func TestGapBasedPartitioner_Partition(t *testing.T) { {1, maxGapSize + 100}, {maxGapSize + 31, maxGapSize + 40}, }, - expected: []part{{start: 1, end: maxGapSize + 100, elemRng: [2]int{0, 3}}}, + expected: []Part{{Start: 1, End: maxGapSize + 100, ElemRng: [2]int{0, 3}}}, }, } { res := gapBasedPartitioner{maxGapSize: maxGapSize}.Partition(len(c.input), func(i int) (uint64, uint64) {