From cf98ac16136f8d844d699a6a30c608bdf45baaac Mon Sep 17 00:00:00 2001
From: weizijun
Date: Wed, 6 Apr 2022 16:41:25 +0800
Subject: [PATCH 1/3] fixup

---
 .../timeseries/TimeSeriesIndexSearcher.java   |  11 +-
 .../TimeSeriesIndexSearcherTests.java         | 113 ++++++++++++++----
 2 files changed, 98 insertions(+), 26 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
index 347a33238b751..5f6e423e1da1a 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java
@@ -193,7 +193,8 @@ int nextDoc() throws IOException {
         }
 
         BytesRef getTsid() throws IOException {
-            scratch.copyBytes(tsids.lookupOrd(tsids.ordValue()));
+            tsidOrd = tsids.ordValue();
+            scratch.copyBytes(tsids.lookupOrd(tsidOrd));
             return scratch.get();
         }
 
@@ -206,13 +207,11 @@ private boolean isInvalidDoc(int docId) throws IOException {
         }
         // true if the TSID ord has changed since the last time we checked
         boolean shouldPop() throws IOException {
-            if (tsidOrd == -1) {
-                tsidOrd = tsids.ordValue();
-            } else if (tsidOrd != tsids.ordValue()) {
-                tsidOrd = tsids.ordValue();
+            if (tsidOrd != tsids.ordValue()) {
                 return true;
+            } else {
+                return false;
             }
-            return false;
         }
     }
 }

diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java
index 0b960c4648962..b5d6c6cfb905e 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java
@@ -17,6 +17,7 @@
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.sandbox.search.DocValuesTermsQuery;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ScoreMode;
@@ -47,25 +48,18 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
 
     // Collection should be in order
     public void testCollectInOrderAcrossSegments() throws IOException, InterruptedException {
         Directory dir = newDirectory();
-        IndexWriterConfig iwc = newIndexWriterConfig();
-        iwc.setIndexSort(
-            new Sort(
-                new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING),
-                new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG)
-            )
-        );
-        RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);
+        RandomIndexWriter iw = getIndexWriter(dir);
 
         AtomicInteger clock = new AtomicInteger(0);
         final int THREADS = 5;
+        final int docCounts = 500;
         ExecutorService indexer = Executors.newFixedThreadPool(THREADS);
         for (int i = 0; i < THREADS; i++) {
             indexer.submit(() -> {
                 Document doc = new Document();
-                for (int j = 0; j < 500; j++) {
+                for (int j = 0; j < docCounts; j++) {
                     String tsid = "tsid" + randomIntBetween(0, 30);
                     long time = clock.addAndGet(randomIntBetween(0, 10));
                     doc.clear();
@@ -88,7 +82,94 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx
 
         TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
 
-        BucketCollector collector = new BucketCollector() {
+        BucketCollector collector = getBucketCollector(THREADS * docCounts);
+
+        indexSearcher.search(new MatchAllDocsQuery(), collector);
+        collector.postCollection();
+
+        reader.close();
+        dir.close();
+    }
+
+    /**
+     * This test covers the fix for the wrong initial value of tsidOrd.
+     * See https://github.com/elastic/elasticsearch/issues/85711
+     */
+    public void testCollectFromMiddle() throws IOException {
+        Directory dir = newDirectory();
+        RandomIndexWriter iw = getIndexWriter(dir);
+
+        Document doc = new Document();
+        final int docCounts = 500;
+
+        // segment 1
+        // pre-add a value
+        doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef("tsid")));
+        doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, 1));
+        iw.addDocument(doc);
+
+        // segment 1: add docs whose timestamps are all larger than those in segment 2
+        for (int j = 0; j < docCounts; j++) {
+            String tsid = "tsid" + randomIntBetween(0, 1);
+            doc.clear();
+            doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
+            doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, randomIntBetween(20, 25)));
+            try {
+                iw.addDocument(doc);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+        iw.commit();
+
+        // segment 2
+        // pre-add a value
+        doc.clear();
+        doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef("tsid")));
+        doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, 1));
+        iw.addDocument(doc);
+        for (int j = 0; j < docCounts; j++) {
+            String tsid = "tsid" + randomIntBetween(0, 1);
+            doc.clear();
+            doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
+            doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, randomIntBetween(10, 15)));
+            try {
+                iw.addDocument(doc);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+
+        iw.close();
+        IndexReader reader = DirectoryReader.open(dir);
+        IndexSearcher searcher = new IndexSearcher(reader);
+
+        TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
+
+        BucketCollector collector = getBucketCollector(2 * docCounts);
+
+        // skip the first doc of segments 1 and 2
+        indexSearcher.search(new DocValuesTermsQuery("_tsid", List.of(new BytesRef("tsid0"), new BytesRef("tsid1"))), collector);
+        collector.postCollection();
+
+        reader.close();
+        dir.close();
+    }
+
+    private RandomIndexWriter getIndexWriter(Directory dir) throws IOException {
+
+        IndexWriterConfig iwc = newIndexWriterConfig();
+        iwc.setIndexSort(
+            new Sort(
+                new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING),
+                new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG)
+            )
+        );
+        return new RandomIndexWriter(random(), dir, iwc);
+    }
+
+    private BucketCollector getBucketCollector(long totalCount) {
+        return new BucketCollector() {
             BytesRef currentTSID = null;
             long currentTimestamp = 0;
 
@@ -129,7 +210,7 @@ public void preCollection() throws IOException {
 
             @Override
             public void postCollection() throws IOException {
-                assertEquals(2500, total);
+                assertEquals(totalCount, total);
             }
 
             @Override
@@ -137,13 +218,5 @@ public ScoreMode scoreMode() {
                 return ScoreMode.COMPLETE;
             }
         };
-
-        indexSearcher.search(new MatchAllDocsQuery(), collector);
-        collector.postCollection();
-
-        reader.close();
-        dir.close();
-    }
-
+    }
 }
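The core of PATCH 1/3 is how tsidOrd is tracked. Before the change, tsidOrd started at -1 and was only updated inside shouldPop(), so the first shouldPop() call on a walker silently adopted the current ord instead of reporting a change; after the change, getTsid() records the ord of the TSID it returns and shouldPop() is a pure comparison. The following is a minimal standalone model of that difference, with made-up class names and plain int ords standing in for Lucene's SortedDocValues; it is an illustration, not the actual Elasticsearch sources:

    // Standalone model of the ord tracking in the per-segment walker
    // (hypothetical names; ints stand in for SortedDocValues.ordValue()).
    public class TsidOrdSketch {

        // Pre-fix behaviour: tsidOrd is lazily initialised inside shouldPop(),
        // so the first call never reports a change and silently adopts the new ord.
        static final class BuggyWalker {
            private int tsidOrd = -1;

            boolean shouldPop(int currentOrd) {
                if (tsidOrd == -1) {
                    tsidOrd = currentOrd;
                } else if (tsidOrd != currentOrd) {
                    tsidOrd = currentOrd;
                    return true;
                }
                return false;
            }
        }

        // Post-fix behaviour: the ord is recorded when the TSID is actually read
        // (getTsid() in the patch), and shouldPop() only compares.
        static final class FixedWalker {
            private int tsidOrd = -1;

            void recordTsid(int currentOrd) { // stands in for getTsid()
                tsidOrd = currentOrd;
            }

            boolean shouldPop(int currentOrd) {
                return tsidOrd != currentOrd;
            }
        }

        public static void main(String[] args) {
            // A walker whose ord was never checked moves onto ord 7 while the
            // searcher is still collecting the bucket for ord 5.
            BuggyWalker buggy = new BuggyWalker();
            System.out.println(buggy.shouldPop(7)); // false: the change is swallowed

            FixedWalker fixed = new FixedWalker();
            fixed.recordTsid(5);                    // ord recorded when the bucket was started
            System.out.println(fixed.shouldPop(7)); // true: the walker is reported as changed
        }
    }

In the real searcher the walker appears to feed a per-TSID collection loop, so a swallowed change lets documents of a different time series be collected under the previous TSID; the new testCollectFromMiddle provokes exactly that by filtering out the first document of every segment.
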
From 05b76b0be4903f8ac9ef133501226b610b312a64 Mon Sep 17 00:00:00 2001
From: weizijun
Date: Tue, 12 Apr 2022 09:13:16 +0800
Subject: [PATCH 2/3] add changelog

---
 docs/changelog/85713.yaml | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 docs/changelog/85713.yaml

diff --git a/docs/changelog/85713.yaml b/docs/changelog/85713.yaml
new file mode 100644
index 0000000000000..705942e960f3c
--- /dev/null
+++ b/docs/changelog/85713.yaml
@@ -0,0 +1,6 @@
+pr: 85713
+summary: "TSDB: fix wrong initial value of tsidOrd in TimeSeriesIndexSearcher"
+area: TSDB
+type: bug
+issues:
+ - 85711

From 314dacabd17d68c3dd26181a27b525c0b06950df Mon Sep 17 00:00:00 2001
From: weizijun
Date: Wed, 13 Apr 2022 17:41:02 +0800
Subject: [PATCH 3/3] constant

---
 .../timeseries/TimeSeriesIndexSearcherTests.java  | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java
index b59b50e1afe2a..a6053542eedeb 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java
@@ -57,12 +57,12 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx
 
         AtomicInteger clock = new AtomicInteger(0);
         final int THREADS = 5;
-        final int docCounts = 500;
+        final int DOC_COUNTS = 500;
         ExecutorService indexer = Executors.newFixedThreadPool(THREADS);
         for (int i = 0; i < THREADS; i++) {
             indexer.submit(() -> {
                 Document doc = new Document();
-                for (int j = 0; j < docCounts; j++) {
+                for (int j = 0; j < DOC_COUNTS; j++) {
                     String tsid = "tsid" + randomIntBetween(0, 30);
                     long time = clock.addAndGet(randomIntBetween(0, 10));
                     doc.clear();
@@ -85,7 +85,7 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx
 
         TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
 
-        BucketCollector collector = getBucketCollector(THREADS * docCounts);
+        BucketCollector collector = getBucketCollector(THREADS * DOC_COUNTS);
 
         indexSearcher.search(new MatchAllDocsQuery(), collector);
         collector.postCollection();
@@ -103,7 +103,7 @@ public void testCollectFromMiddle() throws IOException {
         RandomIndexWriter iw = getIndexWriter(dir);
 
         Document doc = new Document();
-        final int docCounts = 500;
+        final int DOC_COUNTS = 500;
 
         // segment 1
         // pre-add a value
@@ -112,7 +112,7 @@ public void testCollectFromMiddle() throws IOException {
         iw.addDocument(doc);
 
         // segment 1: add docs whose timestamps are all larger than those in segment 2
-        for (int j = 0; j < docCounts; j++) {
+        for (int j = 0; j < DOC_COUNTS; j++) {
             String tsid = "tsid" + randomIntBetween(0, 1);
             doc.clear();
             doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
@@ -131,7 +131,7 @@ public void testCollectFromMiddle() throws IOException {
         doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef("tsid")));
         doc.add(new NumericDocValuesField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, 1));
         iw.addDocument(doc);
-        for (int j = 0; j < docCounts; j++) {
+        for (int j = 0; j < DOC_COUNTS; j++) {
             String tsid = "tsid" + randomIntBetween(0, 1);
             doc.clear();
             doc.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, new BytesRef(tsid)));
@@ -149,7 +149,7 @@ public void testCollectFromMiddle() throws IOException {
 
         TimeSeriesIndexSearcher indexSearcher = new TimeSeriesIndexSearcher(searcher, List.of());
 
-        BucketCollector collector = getBucketCollector(2 * docCounts);
+        BucketCollector collector = getBucketCollector(2 * DOC_COUNTS);
 
         // skip the first doc of segments 1 and 2
         indexSearcher.search(new DocValuesTermsQuery("_tsid", List.of(new BytesRef("tsid0"), new BytesRef("tsid1"))), collector);
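
The query that makes testCollectFromMiddle start "from the middle" is a plain doc-values filter: each segment begins with a pre-added document whose _tsid is "tsid", and the query only matches the two real series, so that leading document is never visited. A small sketch of the same construction outside the test, using only names taken from the patch (DocValuesTermsQuery lives in Lucene's sandbox module, which is why PATCH 1/3 adds the org.apache.lucene.sandbox.search import):

    import org.apache.lucene.sandbox.search.DocValuesTermsQuery;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.util.BytesRef;

    import java.util.List;

    final class SkipLeadingTsidDoc {
        // Matches only the two real time series, so the "tsid" document that starts
        // each segment is skipped and collection begins mid-segment.
        static Query onlyRealSeries() {
            return new DocValuesTermsQuery("_tsid", List.of(new BytesRef("tsid0"), new BytesRef("tsid1")));
        }
    }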