From 85183414d944291ab7427159e2c57ee6d304a677 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Sun, 31 Jul 2022 22:50:12 -0400
Subject: [PATCH] Support multiple fields

---
 .../index/codec/PerFieldMapperCodec.java      |  21 ++-
 .../ES85BloomFilterPostingsFormat.java        | 176 +++++++++++++-----
 .../ES85BloomFilterPostingsFormatTests.java   |  12 +-
 3 files changed, 152 insertions(+), 57 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java b/server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java
index 30fb7e03806a5..9f39b05d15f0f 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java
@@ -33,9 +33,9 @@
  */
 public class PerFieldMapperCodec extends Lucene92Codec {
     private final MapperService mapperService;
-    private final BigArrays bigArrays;

     private final DocValuesFormat docValuesFormat = new Lucene90DocValuesFormat();
+    private final ES85BloomFilterPostingsFormat bloomFilterPostingsFormat;

     static {
         assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMapperCodec.class)
@@ -45,17 +45,18 @@ public class PerFieldMapperCodec extends Lucene92Codec {
     public PerFieldMapperCodec(Mode compressionMode, MapperService mapperService, BigArrays bigArrays) {
         super(compressionMode);
         this.mapperService = mapperService;
-        this.bigArrays = bigArrays;
+        this.bloomFilterPostingsFormat = new ES85BloomFilterPostingsFormat(bigArrays, this::internalGetPostingsFormatForField);
     }

     @Override
     public PostingsFormat getPostingsFormatForField(String field) {
-        if (IdFieldMapper.NAME.equals(field)
-            && mapperService.mappingLookup().isDataStreamTimestampFieldEnabled() == false
-            && IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.get(mapperService.getIndexSettings().getSettings())) {
-            return new ES85BloomFilterPostingsFormat(super.getPostingsFormatForField(field), bigArrays);
-
+        if (useBloomFilter(field)) {
+            return bloomFilterPostingsFormat;
         }
+        return internalGetPostingsFormatForField(field);
+    }
+
+    private PostingsFormat internalGetPostingsFormatForField(String field) {
         final PostingsFormat format = mapperService.mappingLookup().getPostingsFormat(field);
         if (format != null) {
             return format;
@@ -63,6 +64,12 @@ public PostingsFormat getPostingsFormatForField(String field) {
         return super.getPostingsFormatForField(field);
     }

+    private boolean useBloomFilter(String field) {
+        return IdFieldMapper.NAME.equals(field)
+            && mapperService.mappingLookup().isDataStreamTimestampFieldEnabled() == false
+            && IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.get(mapperService.getIndexSettings().getSettings());
+    }
+
     @Override
     public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
         Mapper mapper = mapperService.mappingLookup().getMapper(field);
diff --git a/server/src/main/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormat.java
index 144ff76727104..50b750823ee3a 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormat.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormat.java
@@ -24,8 +24,8 @@
 import org.apache.lucene.codecs.FieldsProducer;
 import org.apache.lucene.codecs.NormsProducer;
 import org.apache.lucene.codecs.PostingsFormat;
-import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
 import org.apache.lucene.index.BaseTermsEnum;
+import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.Fields;
 import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.ImpactsEnum;
@@ -56,10 +56,13 @@
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;

 /**
  * This implementation is forked from Lucene's BloomFilterPosting to support on-disk bloom filters.
@@ -74,17 +77,13 @@ public class ES85BloomFilterPostingsFormat extends PostingsFormat {
     static final String BLOOM_FILTER_META_FILE = "bfm";
     static final String BLOOM_FILTER_INDEX_FILE = "bfi";

-    private PostingsFormat postingsFormat;
+    private Function<String, PostingsFormat> postingsFormats;
     private BigArrays bigArrays;

-    public ES85BloomFilterPostingsFormat(PostingsFormat postingsFormat, BigArrays bigArrays) {
+    public ES85BloomFilterPostingsFormat(BigArrays bigArrays, Function<String, PostingsFormat> postingsFormats) {
         this();
-        this.postingsFormat = Objects.requireNonNull(postingsFormat);
         this.bigArrays = Objects.requireNonNull(bigArrays);
-        if (postingsFormat instanceof PerFieldPostingsFormat) {
-            assert false : "Can't use PerFieldPostingsFormat with " + BLOOM_CODEC_NAME;
-            throw new UnsupportedOperationException("Can't use PerFieldPostingsFormat with " + BLOOM_CODEC_NAME);
-        }
+        this.postingsFormats = Objects.requireNonNull(postingsFormats);
     }

     public ES85BloomFilterPostingsFormat() {
@@ -93,11 +92,11 @@ public ES85BloomFilterPostingsFormat() {

     @Override
     public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
-        if (postingsFormat == null || bigArrays == null) {
+        if (postingsFormats == null || bigArrays == null) {
             assert false : BLOOM_CODEC_NAME + " was initialized with a wrong constructor";
             throw new UnsupportedOperationException(BLOOM_CODEC_NAME + " was initialized with a wrong constructor");
         }
-        return new FieldsWriter(postingsFormat, state);
+        return new FieldsWriter(state);
     }

     @Override
@@ -107,7 +106,7 @@ public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException

     @Override
     public String toString() {
-        return BLOOM_CODEC_NAME + "(" + postingsFormat.getName() + ")";
+        return BLOOM_CODEC_NAME;
     }

     private static String metaFile(SegmentInfo si, String segmentSuffix) {
@@ -119,30 +118,63 @@ private static String indexFile(SegmentInfo si, String segmentSuffix) {
     }

     final class FieldsWriter extends FieldsConsumer {
-        private final FieldsConsumer termsWriter;
         private final SegmentWriteState state;
         private final IndexOutput indexOut;
         private final List<BloomFilter> bloomFilters = new ArrayList<>();
+        private final List<FieldsGroup> fieldsGroups = new ArrayList<>();
+        private final List<Closeable> toCloses = new ArrayList<>();
         private boolean closed;

-        FieldsWriter(PostingsFormat postingsFormat, SegmentWriteState state) throws IOException {
+        FieldsWriter(SegmentWriteState state) throws IOException {
             this.state = state;
-            final List<Closeable> toCloses = new ArrayList<>(2);
+            boolean success = false;
             try {
-                termsWriter = postingsFormat.fieldsConsumer(state);
-                toCloses.add(termsWriter);
                 indexOut = state.directory.createOutput(indexFile(state.segmentInfo, state.segmentSuffix), state.context);
                 toCloses.add(indexOut);
                 CodecUtil.writeIndexHeader(indexOut, BLOOM_CODEC_NAME, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
-                toCloses.clear();
+                success = true;
             } finally {
-                IOUtils.closeWhileHandlingException(toCloses);
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(toCloses);
+                }
             }
         }

         @Override
         public void write(Fields fields, NormsProducer norms) throws IOException {
-            termsWriter.write(fields, norms);
+            writePostings(fields, norms);
+            writeBloomFilters(fields);
+        }
+
+        private void writePostings(Fields fields, NormsProducer norms) throws IOException {
+            final Map<PostingsFormat, FieldsGroup> currentGroups = new HashMap<>();
+            for (String field : fields) {
+                final PostingsFormat postingsFormat = postingsFormats.apply(field);
+                if (postingsFormat == null) {
+                    throw new IllegalStateException("PostingsFormat for field [" + field + "] wasn't specified");
+                }
+                FieldsGroup group = currentGroups.get(postingsFormat);
+                if (group == null) {
+                    group = new FieldsGroup(postingsFormat, Integer.toString(fieldsGroups.size()), new ArrayList<>());
+                    currentGroups.put(postingsFormat, group);
+                    fieldsGroups.add(group);
+                }
+                group.fields.add(field);
+            }
+            for (FieldsGroup group : currentGroups.values()) {
+                final FieldsConsumer writer = group.postingsFormat.fieldsConsumer(new SegmentWriteState(state, group.suffix));
+                toCloses.add(writer);
+                final Fields maskedFields = new FilterLeafReader.FilterFields(fields) {
+                    @Override
+                    public Iterator<String> iterator() {
+                        return group.fields.iterator();
+                    }
+                };
+                writer.write(maskedFields, norms);
+            }
+        }
+
+        private void writeBloomFilters(Fields fields) throws IOException {
             for (String field : fields) {
                 final Terms terms = fields.terms(field);
                 if (terms == null) {
@@ -179,16 +211,19 @@ public void close() throws IOException {
             try {
                 CodecUtil.writeFooter(indexOut);
             } finally {
-                IOUtils.close(termsWriter, indexOut);
+                IOUtils.close(toCloses);
             }
             try (IndexOutput metaOut = state.directory.createOutput(metaFile(state.segmentInfo, state.segmentSuffix), state.context)) {
                 CodecUtil.writeIndexHeader(metaOut, BLOOM_CODEC_NAME, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
-                metaOut.writeString(postingsFormat.getName());
+                // write postings formats
+                metaOut.writeVInt(fieldsGroups.size());
+                for (FieldsGroup group : fieldsGroups) {
+                    group.writeTo(metaOut, state.fieldInfos);
+                }
+                // Write bloom filters
                 metaOut.writeVInt(bloomFilters.size());
-                for (BloomFilter field : bloomFilters) {
-                    metaOut.writeVInt(state.fieldInfos.fieldInfo(field.field).number);
-                    metaOut.writeVLong(field.startFilePointer);
-                    metaOut.writeVInt(field.bloomFilterSize);
+                for (BloomFilter bloomFilter : bloomFilters) {
+                    bloomFilter.writeTo(metaOut, state.fieldInfos);
                 }
                 CodecUtil.writeFooter(metaOut);
             }
@@ -196,16 +231,51 @@
     }

     private record BloomFilter(String field, long startFilePointer, int bloomFilterSize) {
+        void writeTo(IndexOutput out, FieldInfos fieldInfos) throws IOException {
+            out.writeVInt(fieldInfos.fieldInfo(field).number);
+            out.writeVLong(startFilePointer);
+            out.writeVInt(bloomFilterSize);
+        }
+
+        static BloomFilter readFrom(IndexInput in, FieldInfos fieldInfos) throws IOException {
+            final String fieldName = fieldInfos.fieldInfo(in.readVInt()).name;
+            final long startFilePointer = in.readVLong();
+            final int bloomFilterSize = in.readVInt();
+            return new BloomFilter(fieldName, startFilePointer, bloomFilterSize);
+        }
+    }
+
+    private record FieldsGroup(PostingsFormat postingsFormat, String suffix, List<String> fields) {
+        void writeTo(IndexOutput out, FieldInfos fieldInfos) throws IOException {
+            out.writeString(postingsFormat.getName());
+            out.writeString(suffix);
+            out.writeVInt(fields.size());
+            for (String field : fields) {
+                out.writeVInt(fieldInfos.fieldInfo(field).number);
+            }
+        }
+
+        static FieldsGroup readFrom(IndexInput in, FieldInfos fieldInfos) throws IOException {
+            final PostingsFormat postingsFormat = forName(in.readString());
+            final String suffix = in.readString();
+            final int numFields = in.readVInt();
+            final List<String> fields = new ArrayList<>();
+            for (int i = 0; i < numFields; i++) {
+                fields.add(fieldInfos.fieldInfo(in.readVInt()).name);
+            }
+            return new FieldsGroup(postingsFormat, suffix, fields);
+        }
     }

     static final class FieldsReader extends FieldsProducer {
-        private final FieldsProducer termsReader;
         private final Map<String, BloomFilter> bloomFilters;
+        private final List<Closeable> toCloses = new ArrayList<>();
+        private final Map<String, FieldsProducer> readerMap = new HashMap<>();
         private final IndexInput indexIn;

         FieldsReader(SegmentReadState state) throws IOException {
-            final List<Closeable> toCloses = new ArrayList<>(2);
+            boolean success = false;
             try (
                 ChecksumIndexInput metaIn = state.directory.openChecksumInput(
                     metaFile(state.segmentInfo, state.segmentSuffix),
@@ -220,21 +290,26 @@ static final class FieldsReader extends FieldsProducer {
                     state.segmentInfo.getId(),
                     state.segmentSuffix
                 );
-                final PostingsFormat postingsFormat = forName(metaIn.readString());
+                // read postings formats
+                final int numFieldsGroups = metaIn.readVInt();
+                for (int i = 0; i < numFieldsGroups; i++) {
+                    final FieldsGroup group = FieldsGroup.readFrom(metaIn, state.fieldInfos);
+                    final FieldsProducer reader = group.postingsFormat.fieldsProducer(new SegmentReadState(state, group.suffix));
+                    toCloses.add(reader);
+                    for (String field : group.fields) {
+                        readerMap.put(field, reader);
+                    }
+                }
+                // read bloom filters
                 final int numBloomFilters = metaIn.readVInt();
                 bloomFilters = new HashMap<>(numBloomFilters);
                 for (int i = 0; i < numBloomFilters; i++) {
-                    final int fieldNumber = metaIn.readVInt();
-                    final long startFilePointer = metaIn.readVLong();
-                    final int bloomFilterSize = metaIn.readVInt();
-                    assert bloomFilterSize == bloomFilterSize(state.segmentInfo.maxDoc())
-                        : "bloom_filter_size=" + bloomFilterSize + ", max_docs=" + state.segmentInfo.maxDoc();
-                    final String fieldName = state.fieldInfos.fieldInfo(fieldNumber).name;
-                    bloomFilters.put(fieldName, new BloomFilter(fieldName, startFilePointer, bloomFilterSize));
+                    final BloomFilter bloomFilter = BloomFilter.readFrom(metaIn, state.fieldInfos);
+                    assert bloomFilter.bloomFilterSize == bloomFilterSize(state.segmentInfo.maxDoc())
+                        : "bloom_filter=" + bloomFilter + ", max_docs=" + state.segmentInfo.maxDoc();
+                    bloomFilters.put(bloomFilter.field, bloomFilter);
                 }
                 CodecUtil.checkFooter(metaIn);
-                termsReader = postingsFormat.fieldsProducer(state);
-                toCloses.add(this.termsReader);
                 indexIn = state.directory.openInput(indexFile(state.segmentInfo, state.segmentSuffix), state.context);
                 toCloses.add(indexIn);
                 CodecUtil.checkIndexHeader(
@@ -246,25 +321,31 @@ static final class FieldsReader extends FieldsProducer {
                     state.segmentSuffix
                 );
                 CodecUtil.retrieveChecksum(indexIn);
-                toCloses.clear();
+                success = true;
             } finally {
-                IOUtils.closeWhileHandlingException(toCloses);
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(toCloses);
+                }
             }
         }

         @Override
         public Iterator<String> iterator() {
-            return termsReader.iterator();
+            return readerMap.keySet().iterator();
         }

         @Override
         public void close() throws IOException {
-            IOUtils.close(termsReader, indexIn);
+            IOUtils.close(toCloses);
         }

         @Override
         public Terms terms(String field) throws IOException {
-            final Terms terms = termsReader.terms(field);
+            final FieldsProducer reader = readerMap.get(field);
+            if (reader == null) {
+                return null;
+            }
+            final Terms terms = reader.terms(field);
             if (terms == null) {
                 return null;
             }
@@ -282,16 +363,21 @@ public Terms terms(String field) throws IOException {

         @Override
         public int size() {
-            return termsReader.size();
+            return readerMap.size();
         }

         @Override
         public void checkIntegrity() throws IOException {
             // already fully checked the meta file; let's fully checked the index file.
             CodecUtil.checksumEntireFile(indexIn);
-            termsReader.checkIntegrity();
+            // multiple fields can share the same reader
+            final Set<FieldsProducer> seenReaders = new HashSet<>();
+            for (FieldsProducer reader : readerMap.values()) {
+                if (seenReaders.add(reader)) {
+                    reader.checkIntegrity();
+                }
+            }
         }
-
     }

     private static class BloomFilterTerms extends FilterLeafReader.FilterTerms {
diff --git a/server/src/test/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormatTests.java
index 2496140e27dbc..6c3af18800741 100644
--- a/server/src/test/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormatTests.java
@@ -34,11 +34,13 @@ public class ES85BloomFilterPostingsFormatTests extends BasePostingsFormatTestCa

     @Override
     protected Codec getCodec() {
-        PostingsFormat postingsFormat = Codec.getDefault().postingsFormat();
-        if (postingsFormat instanceof PerFieldPostingsFormat) {
-            postingsFormat = TestUtil.getDefaultPostingsFormat();
-        }
-        return TestUtil.alwaysPostingsFormat(new ES85BloomFilterPostingsFormat(postingsFormat, BigArrays.NON_RECYCLING_INSTANCE));
+        return TestUtil.alwaysPostingsFormat(new ES85BloomFilterPostingsFormat(BigArrays.NON_RECYCLING_INSTANCE, field -> {
+            PostingsFormat postingsFormat = Codec.getDefault().postingsFormat();
+            if (postingsFormat instanceof PerFieldPostingsFormat) {
+                postingsFormat = TestUtil.getDefaultPostingsFormat();
+            }
+            return postingsFormat;
+        }));
     }

     public void testBloomFilterSize() {
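
A minimal usage sketch (illustrative only, not part of the patch), mirroring the test change above: the format now takes a resolver from field name to delegate PostingsFormat instead of a single wrapped format, so fields backed by different postings formats can share one bloom filter format instance. TestUtil, BigArrays, and ES85BloomFilterPostingsFormat are the same names the test uses.

    // Sketch: resolve every field to the default test delegate.
    // FieldsWriter groups fields by the PostingsFormat the resolver returns,
    // writes each group under its own segment suffix, and persists the group
    // layout in the "bfm" metadata file so FieldsReader can reopen each
    // group's producer and route per-field terms() calls to it.
    PostingsFormat bloomFormat = new ES85BloomFilterPostingsFormat(
        BigArrays.NON_RECYCLING_INSTANCE,
        field -> TestUtil.getDefaultPostingsFormat()
    );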