Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Seek() shouldn't return true when past maxt. Also add some tests to s… #327

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,9 @@ func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int6

func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
if t > it.maxt {
it.i = len(it.chunks)
}
if it.cur.Err() != nil || it.i >= len(it.chunks) {
return false
}

Expand All @@ -791,31 +794,46 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
t = it.mint
}

for ; it.chunks[it.i].MaxTime < t; it.i++ {
if it.i == len(it.chunks)-1 {
return false
}
// Return early if already past t.
if t0, _ := it.cur.At(); t0 >= t {
return t0 <= it.maxt
}

it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
iCur := it.i
for ; it.i < len(it.chunks); it.i++ {
if it.chunks[it.i].MaxTime >= t {
if it.i != iCur {
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
}

for it.cur.Next() {
t0, _ := it.cur.At()
if t0 >= t {
return true
for it.cur.Next() {
t0, _ := it.cur.At()
if t0 > it.maxt || it.cur.Err() != nil {
return false
}
if t0 >= t {
return true
}
}
}
}
return false
}

func (it *chunkSeriesIterator) At() (t int64, v float64) {
// Should it panic if it.cur.Err() != nil || it.i >= len(it.chunks)?
// What about before Next() or Seek() were called?
return it.cur.At()
}

func (it *chunkSeriesIterator) Next() bool {
if it.cur.Err() != nil || it.i >= len(it.chunks) {
return false
}

if it.cur.Next() {
t, _ := it.cur.At()

Expand All @@ -824,22 +842,18 @@ func (it *chunkSeriesIterator) Next() bool {
return false
}
t, _ = it.At()

return t <= it.maxt
}
if t > it.maxt {
return false
}
return true

return t <= it.maxt
}
if err := it.cur.Err(); err != nil {
return false
}
if it.i == len(it.chunks)-1 {
return false
}

it.i++
if it.i == len(it.chunks) {
return false
}
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
Expand Down
103 changes: 87 additions & 16 deletions querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,16 @@ func TestBlockQuerier(t *testing.T) {
},
},
},
{
lset: map[string]string{
"s": "s",
},
chunks: [][]sample{
{
{1, 2}, {10, 11},
},
},
},
},

queries: []query{
Expand Down Expand Up @@ -448,6 +458,18 @@ func TestBlockQuerier(t *testing.T) {
),
}),
},
{
mint: 2,
maxt: 9,
ms: []labels.Matcher{labels.NewEqualMatcher("s", "s")},
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"s": "s",
},
[]sample{},
),
}),
},
},
}

Expand Down Expand Up @@ -558,8 +580,8 @@ func TestBlockQuerierDelete(t *testing.T) {
},
},
tombstones: memTombstones{
1: Intervals{{1, 3}},
2: Intervals{{1, 3}, {6, 10}},
1: Intervals{{1, 2}},
2: Intervals{{2, 3}, {6, 10}},
3: Intervals{{6, 10}},
},

Expand All @@ -572,7 +594,7 @@ func TestBlockQuerierDelete(t *testing.T) {
newSeries(map[string]string{
"a": "a",
},
[]sample{{5, 2}, {6, 3}, {7, 4}},
[]sample{{3, 4}, {5, 2}, {6, 3}, {7, 4}},
),
newSeries(map[string]string{
"a": "a",
Expand Down Expand Up @@ -605,19 +627,36 @@ func TestBlockQuerierDelete(t *testing.T) {
maxt: 4,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"a": "a",
},
[]sample{{3, 4}},
),
newSeries(map[string]string{
"a": "a",
"b": "b",
},
[]sample{{4, 15}},
[]sample{{1, 1}, {4, 15}},
),
}),
},
{
mint: 1,
maxt: 3,
maxt: 2,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{}),
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"a": "a",
},
[]sample{},
),
newSeries(map[string]string{
"a": "a",
"b": "b",
},
[]sample{{1, 1}},
),
}),
},
},
}
Expand Down Expand Up @@ -866,7 +905,7 @@ func TestSeriesIterator(t *testing.T) {
b: []sample{},
c: []sample{},

seek: 0,
seek: 1,
success: false,
exp: nil,
},
Expand Down Expand Up @@ -987,7 +1026,7 @@ func TestSeriesIterator(t *testing.T) {
{10, 22}, {203, 3493},
},

seek: 203,
seek: 101,
success: false,
exp: nil,
mint: 2,
Expand Down Expand Up @@ -1038,10 +1077,10 @@ func TestSeriesIterator(t *testing.T) {
testutil.Assert(t, remaining == true, "")

for remaining {
sExp, eExp := exp.At()
sRes, eRes := res.At()
testutil.Equals(t, eExp, eRes)
testutil.Equals(t, sExp, sRes)
tExp, vExp := exp.At()
tRes, vRes := res.At()
testutil.Equals(t, tExp, tRes)
testutil.Equals(t, vExp, vRes)

remaining = exp.Next()
testutil.Equals(t, remaining, res.Next())
Expand Down Expand Up @@ -1084,10 +1123,10 @@ func TestSeriesIterator(t *testing.T) {
testutil.Assert(t, remaining == true, "")

for remaining {
sExp, eExp := exp.At()
sRes, eRes := res.At()
testutil.Equals(t, eExp, eRes)
testutil.Equals(t, sExp, sRes)
tExp, vExp := exp.At()
tRes, vRes := res.At()
testutil.Equals(t, tExp, tRes)
testutil.Equals(t, vExp, vRes)

remaining = exp.Next()
testutil.Equals(t, remaining, res.Next())
Expand Down Expand Up @@ -1116,6 +1155,24 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
testutil.Equals(t, float64(2), v)
}

// Regression for: https://github.com/prometheus/tsdb/pull/327
// Seek would go back within the same chunk.
func TestChunkSeriesIterator_DoubleSeekBackwards(t *testing.T) {
chkMetas := []chunks.Meta{
chunkFromSamples([]sample{}),
chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}),
chunkFromSamples([]sample{{4, 4}, {5, 5}}),
}

// Seek for 3, then 2. Iterator should remain positioned on 3.
res := newChunkSeriesIterator(chkMetas, nil, 2, 8)
testutil.Assert(t, res.Seek(3) == true, "")
testutil.Assert(t, res.Seek(2) == true, "")
ts, v := res.At()
testutil.Equals(t, int64(3), ts)
testutil.Equals(t, float64(3), v)
}

// Regression when seeked chunks were still found via binary search and we always
// skipped to the end when seeking a value in the current chunk.
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
Expand Down Expand Up @@ -1149,6 +1206,20 @@ func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
testutil.Assert(t, it.Next() == false, "")
}

// Regression for: https://github.com/prometheus/tsdb/pull/327
// Calling Seek() with a time between [mint, maxt] after the iterator had
// already passed the end would incorrectly return true.
func TestChunkSeriesIterator_SeekWithMinTime(t *testing.T) {
metas := []chunks.Meta{
chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}),
}

it := newChunkSeriesIterator(metas, nil, 2, 5)
testutil.Assert(t, it.Seek(6) == false, "")
// A second, within bounds Seek() used to succeed. Make sure it also returns false.
testutil.Assert(t, it.Seek(3) == false, "")
}

func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})}
chunkMetas := [][]chunks.Meta{
Expand Down