TSDB: fix wrong initial value of tsidOrd in TimeSeriesIndexSearcher #85713

Merged 4 commits on Apr 13, 2022
Changes from 3 commits

docs/changelog/85713.yaml (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
+pr: 85713
+summary: "TSDB: fix wrong initial value of tsidOrd in TimeSeriesIndexSearcher"
+area: TSDB
+type: bug
+issues:
+ - 85711

TimeSeriesIndexSearcher.java

@@ -208,7 +208,8 @@ int nextDoc() throws IOException {
         }
 
         BytesRef getTsid() throws IOException {
-            scratch.copyBytes(tsids.lookupOrd(tsids.ordValue()));
+            tsidOrd = tsids.ordValue();
+            scratch.copyBytes(tsids.lookupOrd(tsidOrd));
             return scratch.get();
         }
 
@@ -221,13 +222,11 @@ private boolean isInvalidDoc(int docId) throws IOException {

         // true if the TSID ord has changed since the last time we checked
         boolean shouldPop() throws IOException {
-            if (tsidOrd == -1) {
-                tsidOrd = tsids.ordValue();
-            } else if (tsidOrd != tsids.ordValue()) {
-                tsidOrd = tsids.ordValue();
+            if (tsidOrd != tsids.ordValue()) {
                 return true;
+            } else {
+                return false;
             }
-            return false;
         }
     }
 }
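
In short, the fix moves the bookkeeping of tsidOrd out of shouldPop() and into getTsid(): the ordinal is recorded at the moment the _tsid value is actually read, and shouldPop() then only compares the current doc-values ordinal against that recorded value, with no special case for the initial -1. With the removed version, the very first call to shouldPop() returned false and updated tsidOrd even when getTsid() had never been called for that ordinal. The snippet below is a minimal, self-contained sketch of the record-then-compare pattern, not the Elasticsearch class itself; TsidTracker and its advanceTo() helper are invented for illustration, and getTsid() here returns the ordinal rather than a BytesRef to keep the example small.

// A minimal sketch (not the actual LeafWalker) of the pattern after this change:
// record the ordinal when the tsid is read, compare against it in shouldPop().
class TsidTracker {
    private int tsidOrd = -1;    // -1: no tsid has been read yet
    private int currentOrd;      // stand-in for tsids.ordValue()

    // Simulates the doc-values iterator moving to a doc with the given tsid ordinal.
    void advanceTo(int ord) {
        this.currentOrd = ord;
    }

    // Records the ordinal that was handed out, like the patched getTsid().
    int getTsid() {
        tsidOrd = currentOrd;
        return tsidOrd;
    }

    // True only if the ordinal moved past the one recorded by getTsid(),
    // with no special case for the initial -1 value.
    boolean shouldPop() {
        return tsidOrd != currentOrd;
    }
}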

TimeSeriesIndexSearcherTests.java

@@ -17,6 +17,7 @@
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.sandbox.search.DocValuesTermsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ScoreMode;
@@ -50,26 +51,18 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
     // Collection should be in order
 
     public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException {
-
         Directory dir = newDirectory();
-        IndexWriterConfig iwc = newIndexWriterConfig();
-        boolean tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
-        boolean timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
-        Sort sort = new Sort(
-            new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, tsidReverse),
-            new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG, timestampReverse)
-        );
-        iwc.setIndexSort(sort);
-        RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);
+        RandomIndexWriter iw = getIndexWriter(dir);
 
         AtomicInteger clock = new AtomicInteger(0);
 
         final int THREADS = 5;
+        final int docCounts = 500;
         ExecutorService indexer = Executors.newFixedThreadPool(THREADS);
         for (int i = 0; i < THREADS; i++) {
             indexer.submit(() -> {
                 Document doc = new Document();
-                for (int j = 0; j < 500; j++) {
+                for (int j = 0; j < docCounts; j++) {
                     String tsid = "tsid" + randomIntBetween(0, 30);
                     long time = clock.addAndGet(randomIntBetween(0, 10));
                     doc.clear();
@@ -92,8 +85,98 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException {

         TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
 
-        BucketCollector collector = new BucketCollector() {
+        BucketCollector collector = getBucketCollector(THREADS * docCounts);
+
+        indexSearcher.search(new MatchAllDocsQuery(), collector);
+        collector.postCollection();
+
+        reader.close();
+        dir.close();
+    }
+
+    /**
+     * This test covers the fix for the wrong initial value of tsidOrd.
+     * See https://github.com/elastic/elasticsearch/issues/85711
+     */
+    public void testCollectFromMiddle() throws IOException {
+        Directory dir = newDirectory();
+        RandomIndexWriter iw = getIndexWriter(dir);
+
+        Document doc = new Document();
+        final int docCounts = 500;
+
+        // segment 1
+        // pre-add a document whose tsid is excluded by the query below
+        doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef("tsid")));
+        doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, 1));
+        iw.addDocument(doc);
+
+        // add documents to segment 1; their timestamps are all larger than those in segment 2
+        for (int j = 0; j < docCounts; j++) {
+            String tsid = "tsid" + randomIntBetween(0, 1);
+            doc.clear();
+            doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
+            doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, randomIntBetween(20, 25)));
+            try {
+                iw.addDocument(doc);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+        iw.commit();
+
+        // segment 2
+        // pre-add a document whose tsid is excluded by the query below
+        doc.clear();
+        doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef("tsid")));
+        doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, 1));
+        iw.addDocument(doc);
+        for (int j = 0; j < docCounts; j++) {
+            String tsid = "tsid" + randomIntBetween(0, 1);
+            doc.clear();
+            doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
+            doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, randomIntBetween(10, 15)));
+            try {
+                iw.addDocument(doc);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        iw.close();
+        IndexReader reader = DirectoryReader.open(dir);
+        IndexSearcher searcher = new IndexSearcher(reader);
+
+        TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
+
+        BucketCollector collector = getBucketCollector(2 * docCounts);
+
+        // skip the first doc of segments 1 and 2
+        indexSearcher.search(new DocValuesTermsQuery("_tsid", List.of(new BytesRef("tsid0"), new BytesRef("tsid1"))), collector);
+        collector.postCollection();
+
+        reader.close();
+        dir.close();
+    }
+
+    private RandomIndexWriter getIndexWriter(Directory dir) throws IOException {
+
+        IndexWriterConfig iwc = newIndexWriterConfig();
+        boolean tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
+        boolean timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
+        Sort sort = new Sort(
+            new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, tsidReverse),
+            new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG, timestampReverse)
+        );
+        iwc.setIndexSort(sort);
+        return new RandomIndexWriter(random(), dir, iwc);
+    }
+
+    private BucketCollector getBucketCollector(long totalCount) {
+        return new BucketCollector() {
 
+            boolean tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
+            boolean timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
             BytesRef currentTSID = null;
             long currentTimestamp = 0;
             long total = 0;
@@ -139,20 +222,13 @@ public void preCollection() throws IOException {

             @Override
             public void postCollection() throws IOException {
-                assertEquals(2500, total);
+                assertEquals(totalCount, total);
             }
 
             @Override
             public ScoreMode scoreMode() {
                 return ScoreMode.COMPLETE;
             }
         };
-
-        indexSearcher.search(new MatchAllDocsQuery(), collector);
-        collector.postCollection();
-
-        reader.close();
-        dir.close();
-
     }
 }
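
A note on the query used in testCollectFromMiddle: DocValuesTermsQuery, from the org.apache.lucene.sandbox.search package per the import added above, matches documents whose sorted doc value for a field equals one of the given terms. That is how the test excludes the two "tsid" documents and forces collection to start from the middle of each segment. The snippet below is a small standalone illustration of that behaviour, assuming only Lucene core plus the sandbox module on the classpath; DocValuesTermsQueryDemo is a hypothetical class for illustration and is not part of this PR.

import java.util.List;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.sandbox.search.DocValuesTermsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;

public class DocValuesTermsQueryDemo {
    public static void main(String[] args) throws Exception {
        Directory dir = new ByteBuffersDirectory();
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            // One doc per tsid; only the doc values field is populated, as in the test.
            for (String tsid : List.of("tsid", "tsid0", "tsid1")) {
                Document doc = new Document();
                doc.add(new SortedDocValuesField("_tsid", new BytesRef(tsid)));
                writer.addDocument(doc);
            }
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            // Matches only docs whose _tsid doc value is tsid0 or tsid1,
            // so the "tsid" document is skipped.
            int hits = searcher.count(
                new DocValuesTermsQuery("_tsid", List.of(new BytesRef("tsid0"), new BytesRef("tsid1")))
            );
            System.out.println(hits); // prints 2
        }
        dir.close();
    }
}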