Skip to content

Commit

Permalink
Support multiple fields
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Aug 1, 2022
1 parent cfc05d6 commit 8518341
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
*/
public class PerFieldMapperCodec extends Lucene92Codec {
private final MapperService mapperService;
private final BigArrays bigArrays;

private final DocValuesFormat docValuesFormat = new Lucene90DocValuesFormat();
private final ES85BloomFilterPostingsFormat bloomFilterPostingsFormat;

static {
assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMapperCodec.class)
Expand All @@ -45,24 +45,31 @@ public class PerFieldMapperCodec extends Lucene92Codec {
public PerFieldMapperCodec(Mode compressionMode, MapperService mapperService, BigArrays bigArrays) {
super(compressionMode);
this.mapperService = mapperService;
this.bigArrays = bigArrays;
this.bloomFilterPostingsFormat = new ES85BloomFilterPostingsFormat(bigArrays, this::internalGetPostingsFormatForField);
}

@Override
public PostingsFormat getPostingsFormatForField(String field) {
if (IdFieldMapper.NAME.equals(field)
&& mapperService.mappingLookup().isDataStreamTimestampFieldEnabled() == false
&& IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.get(mapperService.getIndexSettings().getSettings())) {
return new ES85BloomFilterPostingsFormat(super.getPostingsFormatForField(field), bigArrays);

if (useBloomFilter(field)) {
return bloomFilterPostingsFormat;
}
return internalGetPostingsFormatForField(field);
}

private PostingsFormat internalGetPostingsFormatForField(String field) {
final PostingsFormat format = mapperService.mappingLookup().getPostingsFormat(field);
if (format != null) {
return format;
}
return super.getPostingsFormatForField(field);
}

private boolean useBloomFilter(String field) {
return IdFieldMapper.NAME.equals(field)
&& mapperService.mappingLookup().isDataStreamTimestampFieldEnabled() == false
&& IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.get(mapperService.getIndexSettings().getSettings());
}

@Override
public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
Mapper mapper = mapperService.mappingLookup().getMapper(field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
import org.apache.lucene.index.BaseTermsEnum;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.ImpactsEnum;
Expand Down Expand Up @@ -56,10 +56,13 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

/**
* This implementation is forked from Lucene's BloomFilterPosting to support on-disk bloom filters.
Expand All @@ -74,17 +77,13 @@ public class ES85BloomFilterPostingsFormat extends PostingsFormat {
static final String BLOOM_FILTER_META_FILE = "bfm";
static final String BLOOM_FILTER_INDEX_FILE = "bfi";

private PostingsFormat postingsFormat;
private Function<String, PostingsFormat> postingsFormats;
private BigArrays bigArrays;

public ES85BloomFilterPostingsFormat(PostingsFormat postingsFormat, BigArrays bigArrays) {
public ES85BloomFilterPostingsFormat(BigArrays bigArrays, Function<String, PostingsFormat> postingsFormats) {
this();
this.postingsFormat = Objects.requireNonNull(postingsFormat);
this.bigArrays = Objects.requireNonNull(bigArrays);
if (postingsFormat instanceof PerFieldPostingsFormat) {
assert false : "Can't use PerFieldPostingsFormat with " + BLOOM_CODEC_NAME;
throw new UnsupportedOperationException("Can't use PerFieldPostingsFormat with " + BLOOM_CODEC_NAME);
}
this.postingsFormats = Objects.requireNonNull(postingsFormats);
}

public ES85BloomFilterPostingsFormat() {
Expand All @@ -93,11 +92,11 @@ public ES85BloomFilterPostingsFormat() {

@Override
public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
if (postingsFormat == null || bigArrays == null) {
if (postingsFormats == null || bigArrays == null) {
assert false : BLOOM_CODEC_NAME + " was initialized with a wrong constructor";
throw new UnsupportedOperationException(BLOOM_CODEC_NAME + " was initialized with a wrong constructor");
}
return new FieldsWriter(postingsFormat, state);
return new FieldsWriter(state);
}

@Override
Expand All @@ -107,7 +106,7 @@ public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException

@Override
public String toString() {
return BLOOM_CODEC_NAME + "(" + postingsFormat.getName() + ")";
return BLOOM_CODEC_NAME;
}

private static String metaFile(SegmentInfo si, String segmentSuffix) {
Expand All @@ -119,30 +118,63 @@ private static String indexFile(SegmentInfo si, String segmentSuffix) {
}

final class FieldsWriter extends FieldsConsumer {
private final FieldsConsumer termsWriter;
private final SegmentWriteState state;
private final IndexOutput indexOut;
private final List<BloomFilter> bloomFilters = new ArrayList<>();
private final List<FieldsGroup> fieldsGroups = new ArrayList<>();
private final List<Closeable> toCloses = new ArrayList<>();
private boolean closed;

FieldsWriter(PostingsFormat postingsFormat, SegmentWriteState state) throws IOException {
FieldsWriter(SegmentWriteState state) throws IOException {
this.state = state;
final List<Closeable> toCloses = new ArrayList<>(2);
boolean success = false;
try {
termsWriter = postingsFormat.fieldsConsumer(state);
toCloses.add(termsWriter);
indexOut = state.directory.createOutput(indexFile(state.segmentInfo, state.segmentSuffix), state.context);
toCloses.add(indexOut);
CodecUtil.writeIndexHeader(indexOut, BLOOM_CODEC_NAME, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
toCloses.clear();
success = true;
} finally {
IOUtils.closeWhileHandlingException(toCloses);
if (success == false) {
IOUtils.closeWhileHandlingException(toCloses);
}
}
}

@Override
public void write(Fields fields, NormsProducer norms) throws IOException {
termsWriter.write(fields, norms);
writePostings(fields, norms);
writeBloomFilters(fields);
}

private void writePostings(Fields fields, NormsProducer norms) throws IOException {
final Map<PostingsFormat, FieldsGroup> currentGroups = new HashMap<>();
for (String field : fields) {
final PostingsFormat postingsFormat = postingsFormats.apply(field);
if (postingsFormat == null) {
throw new IllegalStateException("PostingsFormat for field [" + field + "] wasn't specified");
}
FieldsGroup group = currentGroups.get(postingsFormat);
if (group == null) {
group = new FieldsGroup(postingsFormat, Integer.toString(fieldsGroups.size()), new ArrayList<>());
currentGroups.put(postingsFormat, group);
fieldsGroups.add(group);
}
group.fields.add(field);
}
for (FieldsGroup group : currentGroups.values()) {
final FieldsConsumer writer = group.postingsFormat.fieldsConsumer(new SegmentWriteState(state, group.suffix));
toCloses.add(writer);
final Fields maskedFields = new FilterLeafReader.FilterFields(fields) {
@Override
public Iterator<String> iterator() {
return group.fields.iterator();
}
};
writer.write(maskedFields, norms);
}
}

private void writeBloomFilters(Fields fields) throws IOException {
for (String field : fields) {
final Terms terms = fields.terms(field);
if (terms == null) {
Expand Down Expand Up @@ -179,33 +211,71 @@ public void close() throws IOException {
try {
CodecUtil.writeFooter(indexOut);
} finally {
IOUtils.close(termsWriter, indexOut);
IOUtils.close(toCloses);
}
try (IndexOutput metaOut = state.directory.createOutput(metaFile(state.segmentInfo, state.segmentSuffix), state.context)) {
CodecUtil.writeIndexHeader(metaOut, BLOOM_CODEC_NAME, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
metaOut.writeString(postingsFormat.getName());
// write postings formats
metaOut.writeVInt(fieldsGroups.size());
for (FieldsGroup group : fieldsGroups) {
group.writeTo(metaOut, state.fieldInfos);
}
// Write bloom filters
metaOut.writeVInt(bloomFilters.size());
for (BloomFilter field : bloomFilters) {
metaOut.writeVInt(state.fieldInfos.fieldInfo(field.field).number);
metaOut.writeVLong(field.startFilePointer);
metaOut.writeVInt(field.bloomFilterSize);
for (BloomFilter bloomFilter : bloomFilters) {
bloomFilter.writeTo(metaOut, state.fieldInfos);
}
CodecUtil.writeFooter(metaOut);
}
}
}

private record BloomFilter(String field, long startFilePointer, int bloomFilterSize) {
void writeTo(IndexOutput out, FieldInfos fieldInfos) throws IOException {
out.writeVInt(fieldInfos.fieldInfo(field).number);
out.writeVLong(startFilePointer);
out.writeVInt(bloomFilterSize);
}

static BloomFilter readFrom(IndexInput in, FieldInfos fieldInfos) throws IOException {
final String fieldName = fieldInfos.fieldInfo(in.readVInt()).name;
final long startFilePointer = in.readVLong();
final int bloomFilterSize = in.readVInt();
return new BloomFilter(fieldName, startFilePointer, bloomFilterSize);
}
}

private record FieldsGroup(PostingsFormat postingsFormat, String suffix, List<String> fields) {
void writeTo(IndexOutput out, FieldInfos fieldInfos) throws IOException {
out.writeString(postingsFormat.getName());
out.writeString(suffix);
out.writeVInt(fields.size());
for (String field : fields) {
out.writeVInt(fieldInfos.fieldInfo(field).number);

}
}

static FieldsGroup readFrom(IndexInput in, FieldInfos fieldInfos) throws IOException {
final PostingsFormat postingsFormat = forName(in.readString());
final String suffix = in.readString();
final int numFields = in.readVInt();
final List<String> fields = new ArrayList<>();
for (int i = 0; i < numFields; i++) {
fields.add(fieldInfos.fieldInfo(in.readVInt()).name);
}
return new FieldsGroup(postingsFormat, suffix, fields);
}
}

static final class FieldsReader extends FieldsProducer {
private final FieldsProducer termsReader;
private final Map<String, BloomFilter> bloomFilters;
private final List<Closeable> toCloses = new ArrayList<>();
private final Map<String, FieldsProducer> readerMap = new HashMap<>();
private final IndexInput indexIn;

FieldsReader(SegmentReadState state) throws IOException {
final List<Closeable> toCloses = new ArrayList<>(2);
boolean success = false;
try (
ChecksumIndexInput metaIn = state.directory.openChecksumInput(
metaFile(state.segmentInfo, state.segmentSuffix),
Expand All @@ -220,21 +290,26 @@ static final class FieldsReader extends FieldsProducer {
state.segmentInfo.getId(),
state.segmentSuffix
);
final PostingsFormat postingsFormat = forName(metaIn.readString());
// read postings formats
final int numFieldsGroups = metaIn.readVInt();
for (int i = 0; i < numFieldsGroups; i++) {
final FieldsGroup group = FieldsGroup.readFrom(metaIn, state.fieldInfos);
final FieldsProducer reader = group.postingsFormat.fieldsProducer(new SegmentReadState(state, group.suffix));
toCloses.add(reader);
for (String field : group.fields) {
readerMap.put(field, reader);
}
}
// read bloom filters
final int numBloomFilters = metaIn.readVInt();
bloomFilters = new HashMap<>(numBloomFilters);
for (int i = 0; i < numBloomFilters; i++) {
final int fieldNumber = metaIn.readVInt();
final long startFilePointer = metaIn.readVLong();
final int bloomFilterSize = metaIn.readVInt();
assert bloomFilterSize == bloomFilterSize(state.segmentInfo.maxDoc())
: "bloom_filter_size=" + bloomFilterSize + ", max_docs=" + state.segmentInfo.maxDoc();
final String fieldName = state.fieldInfos.fieldInfo(fieldNumber).name;
bloomFilters.put(fieldName, new BloomFilter(fieldName, startFilePointer, bloomFilterSize));
final BloomFilter bloomFilter = BloomFilter.readFrom(metaIn, state.fieldInfos);
assert bloomFilter.bloomFilterSize == bloomFilterSize(state.segmentInfo.maxDoc())
: "bloom_filter=" + bloomFilter + ", max_docs=" + state.segmentInfo.maxDoc();
bloomFilters.put(bloomFilter.field, bloomFilter);
}
CodecUtil.checkFooter(metaIn);
termsReader = postingsFormat.fieldsProducer(state);
toCloses.add(this.termsReader);
indexIn = state.directory.openInput(indexFile(state.segmentInfo, state.segmentSuffix), state.context);
toCloses.add(indexIn);
CodecUtil.checkIndexHeader(
Expand All @@ -246,25 +321,31 @@ static final class FieldsReader extends FieldsProducer {
state.segmentSuffix
);
CodecUtil.retrieveChecksum(indexIn);
toCloses.clear();
success = true;
} finally {
IOUtils.closeWhileHandlingException(toCloses);
if (success == false) {
IOUtils.closeWhileHandlingException(toCloses);
}
}
}

@Override
public Iterator<String> iterator() {
return termsReader.iterator();
return readerMap.keySet().iterator();
}

@Override
public void close() throws IOException {
IOUtils.close(termsReader, indexIn);
IOUtils.close(toCloses);
}

@Override
public Terms terms(String field) throws IOException {
final Terms terms = termsReader.terms(field);
final FieldsProducer reader = readerMap.get(field);
if (reader == null) {
return null;
}
final Terms terms = reader.terms(field);
if (terms == null) {
return null;
}
Expand All @@ -282,16 +363,21 @@ public Terms terms(String field) throws IOException {

@Override
public int size() {
return termsReader.size();
return readerMap.size();
}

@Override
public void checkIntegrity() throws IOException {
// already fully checked the meta file; let's fully checked the index file.
CodecUtil.checksumEntireFile(indexIn);
termsReader.checkIntegrity();
// multiple fields can share the same reader
final Set<FieldsProducer> seenReaders = new HashSet<>();
for (FieldsProducer reader : readerMap.values()) {
if (seenReaders.add(reader)) {
reader.checkIntegrity();
}
}
}

}

private static class BloomFilterTerms extends FilterLeafReader.FilterTerms {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public class ES85BloomFilterPostingsFormatTests extends BasePostingsFormatTestCa

@Override
protected Codec getCodec() {
PostingsFormat postingsFormat = Codec.getDefault().postingsFormat();
if (postingsFormat instanceof PerFieldPostingsFormat) {
postingsFormat = TestUtil.getDefaultPostingsFormat();
}
return TestUtil.alwaysPostingsFormat(new ES85BloomFilterPostingsFormat(postingsFormat, BigArrays.NON_RECYCLING_INSTANCE));
return TestUtil.alwaysPostingsFormat(new ES85BloomFilterPostingsFormat(BigArrays.NON_RECYCLING_INSTANCE, field -> {
PostingsFormat postingsFormat = Codec.getDefault().postingsFormat();
if (postingsFormat instanceof PerFieldPostingsFormat) {
postingsFormat = TestUtil.getDefaultPostingsFormat();
}
return postingsFormat;
}));
}

public void testBloomFilterSize() {
Expand Down

0 comments on commit 8518341

Please sign in to comment.