Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow downstream projects to customise the Partitioner #3808

Merged
merged 1 commit into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,11 +1815,11 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings

g, ctx := errgroup.WithContext(r.ctx)
for _, part := range parts {
i, j := part.elemRng[0], part.elemRng[1]
i, j := part.ElemRng[0], part.ElemRng[1]

start := int64(part.start)
start := int64(part.Start)
// We assume index does not have any ptrs that has 0 length.
length := int64(part.end) - start
length := int64(part.End) - start

// Fetch from object storage concurrently and update stats and posting list.
g.Go(func() error {
Expand Down Expand Up @@ -1975,8 +1975,8 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
})
g, ctx := errgroup.WithContext(r.ctx)
for _, p := range parts {
s, e := p.start, p.end
i, j := p.elemRng[0], p.elemRng[1]
s, e := p.Start, p.End
i, j := p.ElemRng[0], p.ElemRng[1]

g.Go(func() error {
return r.loadSeries(ctx, ids[i:j], false, s, e)
Expand Down Expand Up @@ -2028,19 +2028,19 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetc
return nil
}

type part struct {
start uint64
end uint64
type Part struct {
Start uint64
End uint64

elemRng [2]int
ElemRng [2]int
}

type Partitioner interface {
// Partition partitions length entries into n <= length ranges that cover all
// input ranges
// It supports overlapping ranges.
// NOTE: It expects range to be ted by start time.
Partition(length int, rng func(int) (uint64, uint64)) []part
Partition(length int, rng func(int) (uint64, uint64)) []Part
}

type gapBasedPartitioner struct {
Expand All @@ -2056,29 +2056,29 @@ func NewGapBasedPartitioner(maxGapSize uint64) Partitioner {
// Partition partitions length entries into n <= length ranges that cover all
// input ranges by combining entries that are separated by reasonably small gaps.
// It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones.
func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) {
func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []Part) {
j := 0
k := 0
for k < length {
j = k
k++

p := part{}
p.start, p.end = rng(j)
p := Part{}
p.Start, p.End = rng(j)

// Keep growing the range until the end or we encounter a large gap.
for ; k < length; k++ {
s, e := rng(k)

if p.end+g.maxGapSize < s {
if p.End+g.maxGapSize < s {
break
}

if p.end <= e {
p.end = e
if p.End <= e {
p.End = e
}
}
p.elemRng = [2]int{j, k}
p.ElemRng = [2]int{j, k}
parts = append(parts, p)
}
return parts
Expand Down Expand Up @@ -2239,8 +2239,8 @@ func (r *bucketChunkReader) preload() error {
offsets := offsets

for _, p := range parts {
s, e := uint32(p.start), uint32(p.end)
m, n := p.elemRng[0], p.elemRng[1]
s, e := uint32(p.Start), uint32(p.End)
m, n := p.ElemRng[0], p.ElemRng[1]

g.Go(func() error {
return r.loadChunks(ctx, offsets[m:n], seq, s, e)
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,10 @@ func TestBucketStore_e2e(t *testing.T) {

type naivePartitioner struct{}

func (g naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) {
func (g naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []Part) {
for i := 0; i < length; i++ {
s, e := rng(i)
parts = append(parts, part{start: s, end: e, elemRng: [2]int{i, i + 1}})
parts = append(parts, Part{Start: s, End: e, ElemRng: [2]int{i, i + 1}})
}
return parts
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,15 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {

for _, c := range []struct {
input [][2]int
expected []part
expected []Part
}{
{
input: [][2]int{{1, 10}},
expected: []part{{start: 1, end: 10, elemRng: [2]int{0, 1}}},
expected: []Part{{Start: 1, End: 10, ElemRng: [2]int{0, 1}}},
},
{
input: [][2]int{{1, 2}, {3, 5}, {7, 10}},
expected: []part{{start: 1, end: 10, elemRng: [2]int{0, 3}}},
expected: []Part{{Start: 1, End: 10, ElemRng: [2]int{0, 3}}},
},
{
input: [][2]int{
Expand All @@ -518,9 +518,9 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {
{20, 30},
{maxGapSize + 31, maxGapSize + 32},
},
expected: []part{
{start: 1, end: 30, elemRng: [2]int{0, 3}},
{start: maxGapSize + 31, end: maxGapSize + 32, elemRng: [2]int{3, 4}},
expected: []Part{
{Start: 1, End: 30, ElemRng: [2]int{0, 3}},
{Start: maxGapSize + 31, End: maxGapSize + 32, ElemRng: [2]int{3, 4}},
},
},
// Overlapping ranges.
Expand All @@ -532,9 +532,9 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {
{maxGapSize + 31, maxGapSize + 32},
{maxGapSize + 31, maxGapSize + 40},
},
expected: []part{
{start: 1, end: 30, elemRng: [2]int{0, 3}},
{start: maxGapSize + 31, end: maxGapSize + 40, elemRng: [2]int{3, 5}},
expected: []Part{
{Start: 1, End: 30, ElemRng: [2]int{0, 3}},
{Start: maxGapSize + 31, End: maxGapSize + 40, ElemRng: [2]int{3, 5}},
},
},
{
Expand All @@ -544,7 +544,7 @@ func TestGapBasedPartitioner_Partition(t *testing.T) {
{1, maxGapSize + 100},
{maxGapSize + 31, maxGapSize + 40},
},
expected: []part{{start: 1, end: maxGapSize + 100, elemRng: [2]int{0, 3}}},
expected: []Part{{Start: 1, End: maxGapSize + 100, ElemRng: [2]int{0, 3}}},
},
} {
res := gapBasedPartitioner{maxGapSize: maxGapSize}.Partition(len(c.input), func(i int) (uint64, uint64) {
Expand Down