diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java index 7adb1e8ae2c3b..2b1f8e9ffaf1f 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesConsumer.java @@ -45,7 +45,7 @@ import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; import static org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat.SORTED_SET; -final class ES87TSDBDocValuesConsumer extends DocValuesConsumer { +class ES87TSDBDocValuesConsumer extends DocValuesConsumer { IndexOutput data, meta; final int maxDoc; @@ -98,14 +98,19 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, long maxOrd) throws IOException { int numDocsWithValue = 0; long numValues = 0; - SortedNumericDocValues values = valuesProducer.getSortedNumeric(field); - for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { - numDocsWithValue++; - final int count = values.docValueCount(); - numValues += count; + if (values instanceof WithDocValuesCount wc) { + numDocsWithValue = wc.numDocsWithValues(); + numValues = wc.totalValuesCount(); + } else if (DocValues.unwrapSingleton(values) instanceof WithDocValuesCount wc) { + numDocsWithValue = wc.numDocsWithValues(); + numValues = wc.totalValuesCount(); + } else { + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + numDocsWithValue++; + numValues += values.docValueCount(); + } } - if (numDocsWithValue == 0) { // meta[-2, 0]: No documents with values meta.writeLong(-2); // docsWithFieldOffset meta.writeLong(0L); // docsWithFieldLength diff --git 
a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java index c5f597f27eb98..79a2edafe4c53 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormat.java @@ -48,7 +48,7 @@ public ES87TSDBDocValuesFormat() { @Override public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { - return new ES87TSDBDocValuesConsumer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION); + return new ES87TSDBDocValuesMerger(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesMerger.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesMerger.java new file mode 100644 index 0000000000000..9d3a7d7736ae3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesMerger.java @@ -0,0 +1,992 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.index.codec.tsdb; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.EmptyDocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilteredTermsEnum; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OrdinalMap; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.AttributeSource; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LongBitSet; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.packed.PackedInts; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +class ES87TSDBDocValuesMerger extends ES87TSDBDocValuesConsumer { + + ES87TSDBDocValuesMerger(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) + throws IOException { + super(state, dataCodec, dataExtension, metaCodec, metaExtension); + } + + @Override + public void merge(MergeState mergeState) throws IOException { + for (DocValuesProducer docValuesProducer : mergeState.docValuesProducers) { + if (docValuesProducer != null) { + docValuesProducer.checkIntegrity(); + } + } + + for (FieldInfo mergeFieldInfo : mergeState.mergeFieldInfos) { + DocValuesType 
type = mergeFieldInfo.getDocValuesType(); + if (type != DocValuesType.NONE) { + if (type == DocValuesType.NUMERIC) { + mergeNumericField(mergeFieldInfo, mergeState); + } else if (type == DocValuesType.BINARY) { + mergeBinaryField(mergeFieldInfo, mergeState); + } else if (type == DocValuesType.SORTED) { + mergeSortedField(mergeFieldInfo, mergeState); + } else if (type == DocValuesType.SORTED_SET) { + mergeSortedSetField(mergeFieldInfo, mergeState); + } else if (type == DocValuesType.SORTED_NUMERIC) { + mergeSortedNumericField(mergeFieldInfo, mergeState); + } else { + throw new AssertionError("type=" + type); + } + } + } + } + + /** Tracks state of one numeric sub-reader that we are merging */ + private static class NumericDocValuesSub extends DocIDMerger.Sub { + + final NumericDocValues values; + + public NumericDocValuesSub(MergeState.DocMap docMap, NumericDocValues values) { + super(docMap); + this.values = values; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the numeric docvalues from MergeState. + * + *
<p>
The default implementation calls {@link #addNumericField}, passing a DocValuesProducer that + * merges and filters deleted documents on the fly. + */ + public void mergeNumericField(final FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + addNumericField(mergeFieldInfo, new EmptyDocValuesProducer() { + + @Override + public NumericDocValues getNumeric(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong fieldInfo"); + } + + List subs = new ArrayList<>(); + assert mergeState.docMaps.length == mergeState.docValuesProducers.length; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + NumericDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.NUMERIC) { + values = docValuesProducer.getNumeric(readerFieldInfo); + } + } + if (values != null) { + subs.add(new NumericDocValuesSub(mergeState.docMaps[i], values)); + } + } + + return mergeNumericValues(mergeState, fieldInfo, subs, mergeState.needsIndexSort); + } + }); + } + + private static NumericDocValues mergeNumericValues( + MergeState mergeState, + FieldInfo field, + List subs, + boolean indexIsSorted + ) throws IOException { + long cost = 0; + for (NumericDocValuesSub sub : subs) { + cost += sub.values.cost(); + } + final long finalCost = cost; + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, indexIsSorted); + + return new NumericDocValuesWithCount() { + private int docID = -1; + private NumericDocValuesSub current; + private int numDocsWithValue = -1; + + private int getNumDocsWithValues() throws IOException { + int numDocs = 0; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = 
mergeState.docValuesProducers[i]; + if (docValuesProducer == null) { + continue; + } + final NumericDocValues numeric; + if (field.getDocValuesType() == DocValuesType.NUMERIC) { + numeric = docValuesProducer.getNumeric(field); + } else { + // TODO: fix me + numeric = DocValues.unwrapSingleton(docValuesProducer.getSortedNumeric(field)); + assert numeric != null; + } + if (numeric == null) { + continue; + } + Bits liveDocs = mergeState.liveDocs[i]; + if (liveDocs == null && numeric instanceof WithDocValuesCount wc) { + numDocs += wc.numDocsWithValues(); + } else { + int docID; + while ((docID = numeric.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs == null || liveDocs.get(docID)) { + numDocs++; + } + } + } + } + return numDocs; + } + + @Override + public int numDocsWithValues() throws IOException { + if (numDocsWithValue == -1) { + numDocsWithValue = getNumDocsWithValues(); + } + return numDocsWithValue; + } + + @Override + public long totalValuesCount() throws IOException { + return numDocsWithValues(); + } + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID = NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public long longValue() throws IOException { + return current.values.longValue(); + } + }; + } + + /** Tracks state of one binary sub-reader that we are merging */ + private static class BinaryDocValuesSub extends DocIDMerger.Sub { + + final BinaryDocValues values; + + public BinaryDocValuesSub(MergeState.DocMap docMap, BinaryDocValues values) { + super(docMap); + this.values = values; + assert 
values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the binary docvalues from MergeState. + * + *
<p>
The default implementation calls {@link #addBinaryField}, passing a DocValuesProducer that + * merges and filters deleted documents on the fly. + */ + public void mergeBinaryField(FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + addBinaryField(mergeFieldInfo, new EmptyDocValuesProducer() { + @Override + public BinaryDocValues getBinary(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong fieldInfo"); + } + + List subs = new ArrayList<>(); + + long cost = 0; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + BinaryDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.BINARY) { + values = docValuesProducer.getBinary(readerFieldInfo); + } + } + if (values != null) { + cost += values.cost(); + subs.add(new BinaryDocValuesSub(mergeState.docMaps[i], values)); + } + } + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + final long finalCost = cost; + + return new BinaryDocValues() { + private BinaryDocValuesSub current; + private int docID = -1; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID = NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public BytesRef binaryValue() throws IOException { + return 
current.values.binaryValue(); + } + }; + } + }); + } + + /** Tracks state of one sorted numeric sub-reader that we are merging */ + private static class SortedNumericDocValuesSub extends DocIDMerger.Sub { + + final SortedNumericDocValues values; + + public SortedNumericDocValuesSub(MergeState.DocMap docMap, SortedNumericDocValues values) { + super(docMap); + this.values = values; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the sorted docvalues from toMerge. + * + *
<p>
The default implementation calls {@link #addSortedNumericField}, passing iterables that + * filter deleted documents. + */ + public void mergeSortedNumericField(FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + + addSortedNumericField(mergeFieldInfo, new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + List subs = new ArrayList<>(); + long cost = 0; + boolean allSingletons = true; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + SortedNumericDocValues values = null; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_NUMERIC) { + values = docValuesProducer.getSortedNumeric(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedNumeric(); + } + cost += values.cost(); + if (allSingletons && DocValues.unwrapSingleton(values) == null) { + allSingletons = false; + } + subs.add(new SortedNumericDocValuesSub(mergeState.docMaps[i], values)); + } + + if (allSingletons) { + // All subs are single-valued. + // We specialize for that case since it makes it easier for codecs to optimize + // for single-valued fields. 
+ List singleValuedSubs = new ArrayList<>(); + for (SortedNumericDocValuesSub sub : subs) { + final NumericDocValues singleValuedValues = DocValues.unwrapSingleton(sub.values); + assert singleValuedValues != null; + singleValuedSubs.add(new NumericDocValuesSub(sub.docMap, singleValuedValues)); + } + return DocValues.singleton(mergeNumericValues(mergeState, fieldInfo, singleValuedSubs, mergeState.needsIndexSort)); + } + + final long finalCost = cost; + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + + return new SortedNumericDocValues() { + + private int docID = -1; + private SortedNumericDocValuesSub currentSub; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + currentSub = docIDMerger.next(); + if (currentSub == null) { + docID = NO_MORE_DOCS; + } else { + docID = currentSub.mappedDocID; + } + + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docValueCount() { + return currentSub.values.docValueCount(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public long nextValue() throws IOException { + return currentSub.values.nextValue(); + } + }; + } + }); + } + + /** + * A merged {@link TermsEnum}. This helps avoid relying on the default terms enum, which calls + * {@link SortedDocValues#lookupOrd(int)} or {@link SortedSetDocValues#lookupOrd(long)} on every + * call to {@link TermsEnum#next()}. 
+ */ + private static class MergedTermsEnum extends TermsEnum { + + private final TermsEnum[] subs; + private final OrdinalMap ordinalMap; + private final long valueCount; + private long ord = -1; + private BytesRef term; + + MergedTermsEnum(OrdinalMap ordinalMap, TermsEnum[] subs) { + this.ordinalMap = ordinalMap; + this.subs = subs; + this.valueCount = ordinalMap.getValueCount(); + } + + @Override + public BytesRef term() throws IOException { + return term; + } + + @Override + public long ord() throws IOException { + return ord; + } + + @Override + public BytesRef next() throws IOException { + if (++ord >= valueCount) { + return null; + } + final int subNum = ordinalMap.getFirstSegmentNumber(ord); + final TermsEnum sub = subs[subNum]; + final long subOrd = ordinalMap.getFirstSegmentOrd(ord); + do { + term = sub.next(); + } while (sub.ord() < subOrd); + assert sub.ord() == subOrd; + return term; + } + + @Override + public AttributeSource attributes() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(long ord) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(BytesRef term, TermState state) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long totalTermFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new 
UnsupportedOperationException(); + } + + @Override + public TermState termState() throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** Tracks state of one sorted sub-reader that we are merging */ + private static class SortedDocValuesSub extends DocIDMerger.Sub { + + final SortedDocValues values; + final LongValues map; + + public SortedDocValuesSub(MergeState.DocMap docMap, SortedDocValues values, LongValues map) { + super(docMap); + this.values = values; + this.map = map; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the sorted docvalues from toMerge. + * + *
<p>
The default implementation calls {@link #addSortedField}, passing an Iterable that merges + * ordinals and values and filters deleted documents . + */ + public void mergeSortedField(FieldInfo fieldInfo, final MergeState mergeState) throws IOException { + List toMerge = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) { + values = docValuesProducer.getSorted(fieldInfo); + } + } + if (values == null) { + values = DocValues.emptySorted(); + } + toMerge.add(values); + } + + final int numReaders = toMerge.size(); + final SortedDocValues[] dvs = toMerge.toArray(new SortedDocValues[numReaders]); + + // step 1: iterate thru each sub and mark terms still in use + TermsEnum[] liveTerms = new TermsEnum[dvs.length]; + long[] weights = new long[liveTerms.length]; + for (int sub = 0; sub < numReaders; sub++) { + SortedDocValues dv = dvs[sub]; + Bits liveDocs = mergeState.liveDocs[sub]; + if (liveDocs == null) { + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); + } else { + LongBitSet bitset = new LongBitSet(dv.getValueCount()); + int docID; + while ((docID = dv.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs.get(docID)) { + int ord = dv.ordValue(); + if (ord >= 0) { + bitset.set(ord); + } + } + } + liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); + weights[sub] = bitset.cardinality(); + } + } + + // step 2: create ordinal map (this conceptually does the "merging") + final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT); + + // step 3: add field + addSortedField(fieldInfo, new EmptyDocValuesProducer() { + @Override + public SortedDocValues getSorted(FieldInfo 
fieldInfoIn) throws IOException { + if (fieldInfoIn != fieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + + List subs = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) { + values = docValuesProducer.getSorted(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySorted(); + } + + subs.add(new SortedDocValuesSub(mergeState.docMaps[i], values, map.getGlobalOrds(i))); + } + + return mergeSortedValues(mergeState, fieldInfo, subs, mergeState.needsIndexSort, map); + } + }); + } + + private static SortedDocValues mergeSortedValues( + MergeState mergeState, + FieldInfo field, + List subs, + boolean indexIsSorted, + OrdinalMap map + ) throws IOException { + long cost = 0; + for (SortedDocValuesSub sub : subs) { + cost += sub.values.cost(); + } + final long finalCost = cost; + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, indexIsSorted); + + return new SortedDocValuesWithCount() { + private int docID = -1; + private SortedDocValuesSub current; + private int numDocsWithValue = -1; + + @Override + public int docID() { + return docID; + } + + private int getNumDocsWithValues() throws IOException { + int numDocs = 0; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer == null) { + continue; + } + SortedDocValues sorted = docValuesProducer.getSorted(field); + if (sorted == null) { + continue; + } + Bits liveDocs = mergeState.liveDocs[i]; + if (liveDocs == null && sorted instanceof WithDocValuesCount 
wc) { + numDocs += wc.numDocsWithValues(); + } else { + int docID; + while ((docID = sorted.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs == null || liveDocs.get(docID)) { + numDocs++; + } + } + } + } + return numDocs; + } + + @Override + public int numDocsWithValues() throws IOException { + if (numDocsWithValue == -1) { + numDocsWithValue = getNumDocsWithValues(); + } + return numDocsWithValue; + } + + @Override + public long totalValuesCount() throws IOException { + return numDocsWithValues(); + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID = NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int ordValue() throws IOException { + int subOrd = current.values.ordValue(); + assert subOrd != -1; + return (int) current.map.get(subOrd); + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public int getValueCount() { + return (int) map.getValueCount(); + } + + @Override + public BytesRef lookupOrd(int ord) throws IOException { + int segmentNumber = map.getFirstSegmentNumber(ord); + int segmentOrd = (int) map.getFirstSegmentOrd(ord); + return subs.get(segmentNumber).values.lookupOrd(segmentOrd); + } + + @Override + public TermsEnum termsEnum() throws IOException { + TermsEnum[] termsEnurmSubs = new TermsEnum[subs.size()]; + for (int sub = 0; sub < termsEnurmSubs.length; ++sub) { + termsEnurmSubs[sub] = subs.get(sub).values.termsEnum(); + } + return new MergedTermsEnum(map, termsEnurmSubs); + } + }; + } + + /** Tracks state of one sorted set sub-reader that we are merging */ + private static class SortedSetDocValuesSub extends DocIDMerger.Sub { + + final SortedSetDocValues values; + final 
LongValues map; + + public SortedSetDocValuesSub(MergeState.DocMap docMap, SortedSetDocValues values, LongValues map) { + super(docMap); + this.values = values; + this.map = map; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + + @Override + public String toString() { + return "SortedSetDocValuesSub(mappedDocID=" + mappedDocID + " values=" + values + ")"; + } + } + + /** + * Merges the sortedset docvalues from toMerge. + * + *
<p>
The default implementation calls {@link #addSortedSetField}, passing an Iterable that merges + * ordinals and values and filters deleted documents . + */ + public void mergeSortedSetField(FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + + List toMerge = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedSetDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (fieldInfo != null && fieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) { + values = docValuesProducer.getSortedSet(fieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedSet(); + } + toMerge.add(values); + } + + // step 1: iterate thru each sub and mark terms still in use + TermsEnum[] liveTerms = new TermsEnum[toMerge.size()]; + long[] weights = new long[liveTerms.length]; + for (int sub = 0; sub < liveTerms.length; sub++) { + SortedSetDocValues dv = toMerge.get(sub); + Bits liveDocs = mergeState.liveDocs[sub]; + if (liveDocs == null) { + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); + } else { + LongBitSet bitset = new LongBitSet(dv.getValueCount()); + int docID; + while ((docID = dv.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs.get(docID)) { + for (int i = 0; i < dv.docValueCount(); i++) { + bitset.set(dv.nextOrd()); + } + } + } + liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); + weights[sub] = bitset.cardinality(); + } + } + + // step 2: create ordinal map (this conceptually does the "merging") + final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT); + + // step 3: add field + addSortedSetField(mergeFieldInfo, new EmptyDocValuesProducer() { + @Override + public SortedSetDocValues getSortedSet(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { 
+ throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + List subs = new ArrayList<>(); + + long cost = 0; + boolean allSingletons = true; + + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedSetDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) { + values = docValuesProducer.getSortedSet(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedSet(); + } + cost += values.cost(); + if (allSingletons && DocValues.unwrapSingleton(values) == null) { + allSingletons = false; + } + subs.add(new SortedSetDocValuesSub(mergeState.docMaps[i], values, map.getGlobalOrds(i))); + } + + if (allSingletons) { + // All subs are single-valued. + // We specialize for that case since it makes it easier for codecs to optimize + // for single-valued fields. 
+ List singleValuedSubs = new ArrayList<>(); + for (SortedSetDocValuesSub sub : subs) { + final SortedDocValues singleValuedValues = DocValues.unwrapSingleton(sub.values); + assert singleValuedValues != null; + singleValuedSubs.add(new SortedDocValuesSub(sub.docMap, singleValuedValues, sub.map)); + } + return DocValues.singleton(mergeSortedValues(mergeState, fieldInfo, singleValuedSubs, mergeState.needsIndexSort, map)); + } + + final DocIDMerger docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + + final long finalCost = cost; + + return new SortedSetDocValues() { + private int docID = -1; + private SortedSetDocValuesSub currentSub; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + currentSub = docIDMerger.next(); + if (currentSub == null) { + docID = NO_MORE_DOCS; + } else { + docID = currentSub.mappedDocID; + } + + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long nextOrd() throws IOException { + long subOrd = currentSub.values.nextOrd(); + if (subOrd == NO_MORE_ORDS) { + return NO_MORE_ORDS; + } + return currentSub.map.get(subOrd); + } + + @Override + public int docValueCount() { + return currentSub.values.docValueCount(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public BytesRef lookupOrd(long ord) throws IOException { + int segmentNumber = map.getFirstSegmentNumber(ord); + long segmentOrd = map.getFirstSegmentOrd(ord); + return toMerge.get(segmentNumber).lookupOrd(segmentOrd); + } + + @Override + public long getValueCount() { + return map.getValueCount(); + } + + @Override + public TermsEnum termsEnum() throws IOException { + TermsEnum[] subs = new TermsEnum[toMerge.size()]; + for (int sub = 0; sub < 
subs.length; ++sub) { + subs[sub] = toMerge.get(sub).termsEnum(); + } + return new MergedTermsEnum(map, subs); + } + }; + } + }); + } + + // TODO: seek-by-ord to nextSetBit + static class BitsFilteredTermsEnum extends FilteredTermsEnum { + final LongBitSet liveTerms; + + BitsFilteredTermsEnum(TermsEnum in, LongBitSet liveTerms) { + super(in, false); // <-- not passing false here wasted about 3 hours of my time!!!!!!!!!!!!! + assert liveTerms != null; + this.liveTerms = liveTerms; + } + + @Override + protected AcceptStatus accept(BytesRef term) throws IOException { + if (liveTerms.get(ord())) { + return AcceptStatus.YES; + } else { + return AcceptStatus.NO; + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java index b6e1bb503045c..290bcff4ad3ab 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesProducer.java @@ -284,7 +284,18 @@ public SortedDocValues getSorted(FieldInfo field) throws IOException { private SortedDocValues getSorted(SortedEntry entry) throws IOException { final NumericDocValues ords = getNumeric(entry.ordsEntry, entry.termsDictEntry.termsDictSize); + long numValues = entry.ordsEntry.numValues; + return new BaseSortedDocValues(entry) { + @Override + public int numDocsWithValues() throws IOException { + return Math.toIntExact(numValues); + } + + @Override + public long totalValuesCount() throws IOException { + return numValues; + } @Override public int ordValue() throws IOException { @@ -318,7 +329,7 @@ public long cost() { }; } - private abstract class BaseSortedDocValues extends SortedDocValues { + private abstract class BaseSortedDocValues extends SortedDocValues implements WithDocValuesCount { final SortedEntry entry; final TermsEnum termsEnum; @@ -858,7 +869,6 @@ private 
abstract static class NumericValues { private NumericDocValues getNumeric(NumericEntry entry, long maxOrd) throws IOException { if (entry.docsWithFieldOffset == -2) { - // empty return DocValues.emptyNumeric(); } @@ -1082,6 +1092,10 @@ public long longValue() throws IOException { } } + /* NOTE(review): a top-level class named NumericDocValuesWithCount is also added to this package by the same change; this nested declaration looks redundant - confirm which one is intended */ abstract class NumericDocValuesWithCount extends NumericDocValues implements WithDocValuesCount { + + } + private NumericValues getValues(NumericEntry entry, final long maxOrd) throws IOException { assert entry.numValues > 0; final RandomAccessInput indexSlice = data.randomAccessSlice(entry.indexOffset, entry.indexLength); diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/NumericDocValuesWithCount.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/NumericDocValuesWithCount.java new file mode 100644 index 0000000000000..dca672541508c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/NumericDocValuesWithCount.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.index.codec.tsdb; + +import org.apache.lucene.index.NumericDocValues; + +/** {@link NumericDocValues} base class that additionally implements {@link WithDocValuesCount}; it declares no members of its own. */ abstract class NumericDocValuesWithCount extends NumericDocValues implements WithDocValuesCount { + +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/SortedDocValuesWithCount.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/SortedDocValuesWithCount.java new file mode 100644 index 0000000000000..99b45d32f32db --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/SortedDocValuesWithCount.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.index.codec.tsdb; + +import org.apache.lucene.index.SortedDocValues; + +/** {@link SortedDocValues} base class that additionally implements {@link WithDocValuesCount}; it declares no members of its own. */ abstract class SortedDocValuesWithCount extends SortedDocValues implements WithDocValuesCount { + +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/WithDocValuesCount.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/WithDocValuesCount.java new file mode 100644 index 0000000000000..118ea0a00e4e9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/WithDocValuesCount.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.index.codec.tsdb; + +import java.io.IOException; + +/** Implemented by doc values that can report their counts directly; ES87TSDBDocValuesConsumer#writeField reads these instead of iterating every document to count them. */ interface WithDocValuesCount { + /** Returns the number of documents that have a value; the consumer uses it as its numDocsWithValue. */ int numDocsWithValues() throws IOException; + + /** Returns the total number of values across all documents; the consumer uses it as its numValues. */ long totalValuesCount() throws IOException; +}