Skip to content

Commit

Permalink
Fix term stats when talking to ES 6 (elastic#75735)
Browse files Browse the repository at this point in the history
In a mixed 6.x and 7.x cluster, a search that uses dfs_query_then_fetch can cause transport serialization errors.

This is related to https://issues.apache.org/jira/browse/LUCENE-8007, a change introduced in Lucene 8 that adds stricter validity checks to TermStatistics and CollectionStatistics, and https://issues.apache.org/jira/browse/LUCENE-8020, a change introduced in Lucene 8 that avoids emitting bogus term stats (e.g. docFreq=0).

Co-authored-by: Julie Tibshirani [email protected]

Closes elastic#75349
  • Loading branch information
ywelsch committed Jul 28, 2021
1 parent 21b5a2d commit 8082bbb
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"Perform a dfs_query_then_fetch search on a keyword field":
- do:
search:
search_type: dfs_query_then_fetch
index: keyword_index
rest_total_hits_as_int: true
body:
query:
match:
field:
query: value

- match: { hits.total: 3 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
"Perform a dfs_query_then_fetch search on a keyword field":
- do:
indices.create:
index: keyword_index
body:
mappings:
properties:
field:
type: keyword
- do:
index:
index: keyword_index
body:
field: value
refresh: true

- do:
index:
index: keyword_index
body:
field: value
refresh: true

- do:
index:
index: keyword_index
body:
field: value
refresh: true

- do:
search:
search_type: dfs_query_then_fetch
index: keyword_index
rest_total_hits_as_int: true
body:
query:
match:
field:
query: value

- match: { hits.total: 3 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"Perform a dfs_query_then_fetch search on a keyword field":
- do:
search:
search_type: dfs_query_then_fetch
index: keyword_index
rest_total_hits_as_int: true
body:
query:
match:
field:
query: value

- match: { hits.total: 3 }
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -32,9 +34,19 @@ public AggregatedDfs(StreamInput in) throws IOException {
termStatistics = HppcMaps.newMap(size);
for (int i = 0; i < size; i++) {
Term term = new Term(in.readString(), in.readBytesRef());
TermStatistics stats = new TermStatistics(in.readBytesRef(),
in.readVLong(),
DfsSearchResult.subOne(in.readVLong()));
BytesRef term2 = in.readBytesRef();
final long docFreq = in.readVLong();
assert docFreq >= 0;
long totalTermFreq = DfsSearchResult.subOne(in.readVLong());
if (in.getVersion().before(Version.V_7_0_0)) {
if (totalTermFreq == -1L) {
// Lucene 7 and earlier used -1 to denote that this information wasn't stored by the codec
// or that this field omitted term frequencies and positions. It used docFreq as fallback in that case
// when calculating similarities. See LUCENE-8007 for more information.
totalTermFreq = docFreq;
}
}
TermStatistics stats = new TermStatistics(term2, docFreq, totalTermFreq);
termStatistics.put(term, stats);
}
fieldStatistics = DfsSearchResult.readFieldStats(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static void writeFieldStats(StreamOutput out, ObjectObjectHashMap<String,
for (ObjectObjectCursor<String, CollectionStatistics> c : fieldStatistics) {
out.writeString(c.key);
CollectionStatistics statistics = c.value;
assert statistics.maxDoc() >= 0;
assert statistics.maxDoc() > 0;
out.writeVLong(statistics.maxDoc());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
// stats are always positive numbers
Expand Down Expand Up @@ -156,8 +156,8 @@ static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamIn
final String field = in.readString();
assert field != null;
final long maxDoc = in.readVLong();
final long docCount;
final long sumTotalTermFreq;
long docCount;
long sumTotalTermFreq;
final long sumDocFreq;
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
// stats are always positive numbers
Expand All @@ -168,6 +168,26 @@ static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamIn
docCount = subOne(in.readVLong());
sumTotalTermFreq = subOne(in.readVLong());
sumDocFreq = subOne(in.readVLong());
if (sumTotalTermFreq == -1L) {
// Lucene 7 and earlier used -1 to denote that this information wasn't stored by the codec
// or that this field omitted term frequencies and positions. It used docFreq as fallback in that case
// when calculating similarities. See LUCENE-8007 for more information.
sumTotalTermFreq = sumDocFreq;
}
if (docCount == -1L) {
// Lucene 7 and earlier used -1 to denote that this information wasn't stored by the codec
// It used maxDoc as fallback in that case when calculating similarities. See LUCENE-8007 for more information.
docCount = maxDoc;
}
if (docCount == 0L) {
// empty stats object (LUCENE-8020)
assert maxDoc == 0 && docCount == 0 && sumTotalTermFreq == 0 && sumDocFreq == 0:
" maxDoc:" + maxDoc +
" docCount:" + docCount +
" sumTotalTermFreq:" + sumTotalTermFreq +
" sumDocFreq:" + sumDocFreq;
continue;
}
}
CollectionStatistics stats = new CollectionStatistics(field, maxDoc, docCount, sumTotalTermFreq, sumDocFreq);
fieldStatistics.put(field, stats);
Expand All @@ -187,10 +207,18 @@ static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOExc
BytesRef term = terms[i].bytes();
final long docFreq = in.readVLong();
assert docFreq >= 0;
final long totalTermFreq = subOne(in.readVLong());
long totalTermFreq = subOne(in.readVLong());
if (docFreq == 0) {
continue;
}
if (in.getVersion().before(Version.V_7_0_0)) {
if (totalTermFreq == -1L) {
// Lucene 7 and earlier used -1 to denote that this information isn't stored by the codec
// or that this field omits term frequencies and positions. It used docFreq as fallback in that case
// when calculating similarities. See LUCENE-8007 for more information.
totalTermFreq = docFreq;
}
}
termStatistics[i] = new TermStatistics(term, docFreq, totalTermFreq);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.dfs;

import com.carrotsearch.hppc.ObjectObjectHashMap;

import org.apache.lucene.search.CollectionStatistics;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;

public class DfsSearchResultTests extends ESTestCase {

/**
* checks inputs from 6.x that are difficult to simulate in a BWC mixed-cluster test, in particular the case
* where docCount == -1L which does not occur with the codecs that we typically use.
*/
public void test6xSerialization() throws IOException {
Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_8_0, Version.V_6_8_18);
BytesStreamOutput os = new BytesStreamOutput();
os.setVersion(version);
os.writeVInt(1);
String field = randomAlphaOfLength(10);
os.writeString(field);
long maxDoc = randomIntBetween(1, 5);
os.writeVLong(maxDoc);
long docCount = randomBoolean() ? -1 : randomIntBetween(1, (int) maxDoc);
os.writeVLong(DfsSearchResult.addOne(docCount));
long sumTotalTermFreq = randomBoolean() ? -1 : randomIntBetween(20, 30);
os.writeVLong(DfsSearchResult.addOne(sumTotalTermFreq));
long sumDocFreq = sumTotalTermFreq == -1 ? randomIntBetween(20, 30) : randomIntBetween(20, (int) sumTotalTermFreq);
os.writeVLong(DfsSearchResult.addOne(sumDocFreq));

try (StreamInput input = StreamInput.wrap(BytesReference.toBytes(os.bytes()))) {
input.setVersion(version);
ObjectObjectHashMap<String, CollectionStatistics> stats = DfsSearchResult.readFieldStats(input);
assertEquals(stats.size(), 1);
assertNotNull(stats.get(field));
CollectionStatistics cs = stats.get(field);
assertEquals(field, cs.field());
assertEquals(maxDoc, cs.maxDoc());
assertEquals(docCount == -1 ? maxDoc : docCount, cs.docCount());
assertEquals(sumDocFreq, cs.sumDocFreq());
assertEquals(sumTotalTermFreq == -1 ? sumDocFreq : sumTotalTermFreq, cs.sumTotalTermFreq());
}
}
}

0 comments on commit 8082bbb

Please sign in to comment.