Skip to content

Commit

Permalink
Undo changes in elastic#114903 and alter ValuesSourceReaderOperator t…
Browse files Browse the repository at this point in the history
…o fully support synthetic source.
  • Loading branch information
martijnvg committed Oct 21, 2024
1 parent 8d9a0a4 commit fae9ffb
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
SourceValueFetcher fetcher = SourceValueFetcher.toString(blContext.sourcePaths(name()));
// MatchOnlyText never has norms, so we have to use the field names field
BlockSourceReader.LeafIteratorLookup lookup = BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name());
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, lookup, sourceMode);
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, lookup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
BlockSourceReader.LeafIteratorLookup lookup = isStored() || isIndexed()
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
: BlockSourceReader.lookupMatchingAll();
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
return new BlockSourceReader.DoublesBlockLoader(valueFetcher, lookup, sourceMode);
return new BlockSourceReader.DoublesBlockLoader(valueFetcher, lookup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
protected BlockLoader blockLoaderFromSource(BlockLoaderContext blContext) {
ValueFetcher fetcher = valueFetcher(blContext.sourcePaths(name()), nullValue, GeometryFormatterFactory.WKB);
// TODO consider optimization using BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
return new BlockSourceReader.GeometriesBlockLoader(fetcher, BlockSourceReader.lookupMatchingAll(), sourceMode);
return new BlockSourceReader.GeometriesBlockLoader(fetcher, BlockSourceReader.lookupMatchingAll());
}

protected abstract Object nullValueAsSource(T nullValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* Loads values from {@code _source}. This whole process is very slow and cast-tastic,
* so it doesn't really try to avoid megamorphic invocations. It's just going to be
* slow.
*/
public abstract class BlockSourceReader implements BlockLoader.RowStrideReader {

// _ignored_source is needed when source mode is synthetic.
static final StoredFieldsSpec NEEDS_SOURCE_AND_IGNORED_SOURCE = new StoredFieldsSpec(
true,
false,
Set.of(IgnoredSourceFieldMapper.NAME)
);

private final ValueFetcher fetcher;
private final List<Object> ignoredValues = new ArrayList<>();
private final DocIdSetIterator iter;
Expand Down Expand Up @@ -100,25 +91,10 @@ public interface LeafIteratorLookup {
private abstract static class SourceBlockLoader implements BlockLoader {
protected final ValueFetcher fetcher;
private final LeafIteratorLookup lookup;
private final StoredFieldsSpec storedFieldsSpec;

private SourceBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, StoredFieldsSpec storedFieldsSpec) {
private SourceBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
this.fetcher = fetcher;
this.lookup = lookup;
this.storedFieldsSpec = storedFieldsSpec;
}

private SourceBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
this(
fetcher,
lookup,
sourceMode == SourceFieldMapper.Mode.SYNTHETIC ? NEEDS_SOURCE_AND_IGNORED_SOURCE : StoredFieldsSpec.NEEDS_SOURCE
);
}

// Assumes synthetic source only
private SourceBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, Set<String> requiredStoredFields) {
this(fetcher, lookup, NEEDS_SOURCE_AND_IGNORED_SOURCE.merge(new StoredFieldsSpec(true, false, requiredStoredFields)));
}

@Override
Expand All @@ -128,7 +104,7 @@ public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context)

@Override
public final StoredFieldsSpec rowStrideStoredFieldSpec() {
return storedFieldsSpec;
return StoredFieldsSpec.NEEDS_SOURCE;
}

@Override
Expand Down Expand Up @@ -164,8 +140,8 @@ public final String toString() {
* Load {@code boolean}s from {@code _source}.
*/
public static class BooleansBlockLoader extends SourceBlockLoader {
public BooleansBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
super(fetcher, lookup, sourceMode);
public BooleansBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
super(fetcher, lookup);
}

@Override
Expand Down Expand Up @@ -204,12 +180,8 @@ public String toString() {
* Load {@link BytesRef}s from {@code _source}.
*/
public static class BytesRefsBlockLoader extends SourceBlockLoader {
public BytesRefsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
super(fetcher, lookup, sourceMode);
}

public BytesRefsBlockLoader(SourceValueFetcher fetcher, LeafIteratorLookup lookup, Set<String> requiredStoredFields) {
super(fetcher, lookup, requiredStoredFields);
public BytesRefsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
super(fetcher, lookup);
}

@Override
Expand All @@ -219,7 +191,7 @@ public final Builder builder(BlockFactory factory, int expectedCount) {

@Override
protected RowStrideReader rowStrideReader(LeafReaderContext context, DocIdSetIterator iter) throws IOException {
return new BytesRefs(fetcher, iter, null);
return new BytesRefs(fetcher, iter);
}

@Override
Expand All @@ -229,8 +201,8 @@ protected String name() {
}

public static class GeometriesBlockLoader extends SourceBlockLoader {
public GeometriesBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
super(fetcher, lookup, sourceMode);
public GeometriesBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
super(fetcher, lookup);
}

@Override
Expand All @@ -240,7 +212,7 @@ public final Builder builder(BlockFactory factory, int expectedCount) {

@Override
protected RowStrideReader rowStrideReader(LeafReaderContext context, DocIdSetIterator iter) {
return new Geometries(fetcher, iter, null);
return new Geometries(fetcher, iter);
}

@Override
Expand All @@ -252,7 +224,7 @@ protected String name() {
private static class BytesRefs extends BlockSourceReader {
private final BytesRef scratch = new BytesRef();

BytesRefs(ValueFetcher fetcher, DocIdSetIterator iter, SourceFieldMapper.Mode sourceMode) {
BytesRefs(ValueFetcher fetcher, DocIdSetIterator iter) {
super(fetcher, iter);
}

Expand All @@ -269,7 +241,7 @@ public String toString() {

private static class Geometries extends BlockSourceReader {

Geometries(ValueFetcher fetcher, DocIdSetIterator iter, SourceFieldMapper.Mode sourceMode) {
Geometries(ValueFetcher fetcher, DocIdSetIterator iter) {
super(fetcher, iter);
}

Expand All @@ -292,8 +264,8 @@ public String toString() {
* Load {@code double}s from {@code _source}.
*/
public static class DoublesBlockLoader extends SourceBlockLoader {
public DoublesBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
super(fetcher, lookup, sourceMode);
public DoublesBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
super(fetcher, lookup);
}

@Override
Expand Down Expand Up @@ -332,8 +304,8 @@ public String toString() {
* Load {@code int}s from {@code _source}.
*/
public static class IntsBlockLoader extends SourceBlockLoader {
public IntsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
super(fetcher, lookup, sourceMode);
public IntsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
super(fetcher, lookup);
}

@Override
Expand Down Expand Up @@ -372,8 +344,8 @@ public String toString() {
* Load {@code long}s from {@code _source}.
*/
public static class LongsBlockLoader extends SourceBlockLoader {
public LongsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup, SourceFieldMapper.Mode sourceMode) {
super(fetcher, lookup, sourceMode);
public LongsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
super(fetcher, lookup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
BlockSourceReader.LeafIteratorLookup lookup = isIndexed() || isStored()
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
: BlockSourceReader.lookupMatchingAll();
return new BlockSourceReader.BooleansBlockLoader(fetcher, lookup, blContext.indexSettings().getIndexMappingSourceMode());
return new BlockSourceReader.BooleansBlockLoader(fetcher, lookup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
BlockSourceReader.LeafIteratorLookup lookup = isStored() || isIndexed()
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
: BlockSourceReader.lookupMatchingAll();
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
return new BlockSourceReader.LongsBlockLoader(sourceValueFetcher(blContext.sourcePaths(name())), lookup, sourceMode);
return new BlockSourceReader.LongsBlockLoader(sourceValueFetcher(blContext.sourcePaths(name())), lookup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,33 +635,8 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (isStored()) {
return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name());
}
String parentField = blContext.parentField(name());
String field = parentField != null ? parentField : name();
SourceValueFetcher fetcher = sourceValueFetcher(blContext.sourcePaths(field));
return sourceBlockLoader(blContext, fetcher, sourceBlockLoaderLookup(blContext));
}

public BlockLoader sourceBlockLoader(
BlockLoaderContext blContext,
SourceValueFetcher fetcher,
BlockSourceReader.LeafIteratorLookup leafIteratorLookup
) {
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
if (sourceMode == SourceFieldMapper.Mode.SYNTHETIC && ignoreAbove() != Integer.MAX_VALUE) {
// Can't just read the original field, because only when ignore_above threshold is exceeded,
// then this stored fields gets added. In other cases this field does not exist. But we do need to include the field name:
String originalName = name() + "._original";
Set<String> requiredStoredFields;
if (isStored()) {
// in case field is stored and ignore_above is configured:
requiredStoredFields = Set.of(name(), originalName);
} else {
requiredStoredFields = Set.of(originalName);
}
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, leafIteratorLookup, requiredStoredFields);
} else {
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, leafIteratorLookup, sourceMode);
}
SourceValueFetcher fetcher = sourceValueFetcher(blContext.sourcePaths(name()));
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, sourceBlockLoaderLookup(blContext));
}

private BlockSourceReader.LeafIteratorLookup sourceBlockLoaderLookup(BlockLoaderContext blContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher, lookup);
}
},
FLOAT("float", NumericType.FLOAT) {
Expand Down Expand Up @@ -651,10 +650,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher, lookup);
}
},
DOUBLE("double", NumericType.DOUBLE) {
Expand Down Expand Up @@ -805,10 +803,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher, lookup);
}
},
BYTE("byte", NumericType.BYTE) {
Expand Down Expand Up @@ -922,10 +919,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.IntsBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.IntsBlockLoader(sourceValueFetcher, lookup);
}

private boolean isOutOfRange(Object value) {
Expand Down Expand Up @@ -1039,10 +1035,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.IntsBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.IntsBlockLoader(sourceValueFetcher, lookup);
}

private boolean isOutOfRange(Object value) {
Expand Down Expand Up @@ -1230,10 +1225,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.IntsBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.IntsBlockLoader(sourceValueFetcher, lookup);
}
},
LONG("long", NumericType.LONG) {
Expand Down Expand Up @@ -1381,10 +1375,9 @@ BlockLoader blockLoaderFromDocValues(String fieldName) {
@Override
BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
) {
return new BlockSourceReader.LongsBlockLoader(sourceValueFetcher, lookup, sourceMode);
return new BlockSourceReader.LongsBlockLoader(sourceValueFetcher, lookup);
}

private boolean isOutOfRange(Object value) {
Expand Down Expand Up @@ -1664,8 +1657,7 @@ protected void writeValue(XContentBuilder b, long value) throws IOException {

abstract BlockLoader blockLoaderFromSource(
SourceValueFetcher sourceValueFetcher,
BlockSourceReader.LeafIteratorLookup lookup,
SourceFieldMapper.Mode sourceMode
BlockSourceReader.LeafIteratorLookup lookup
);
}

Expand Down Expand Up @@ -1805,8 +1797,7 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
BlockSourceReader.LeafIteratorLookup lookup = isStored() || isIndexed()
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
: BlockSourceReader.lookupMatchingAll();
var sourceMode = blContext.indexSettings().getIndexMappingSourceMode();
return type.blockLoaderFromSource(sourceValueFetcher(blContext.sourcePaths(name())), lookup, sourceMode);
return type.blockLoaderFromSource(sourceValueFetcher(blContext.sourcePaths(name())), lookup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,20 +1012,8 @@ protected String delegatingTo() {
if (isStored()) {
return new BlockStoredFieldsReader.BytesFromStringsBlockLoader(name());
}
if (isSyntheticSource && syntheticSourceDelegate == null) {
/*
* When we're in synthetic source mode we don't currently
* support text fields that are not stored and are not children
* of perfect keyword fields. We'd have to load from the parent
* field and then convert the result to a string. In this case,
* even if we would synthesize the source, the current field
* would be missing.
*/
return null;
}

SourceValueFetcher fetcher = SourceValueFetcher.toString(blContext.sourcePaths(name()));
return syntheticSourceDelegate.sourceBlockLoader(blContext, fetcher, blockReaderDisiLookup(blContext));
return new BlockSourceReader.BytesRefsBlockLoader(fetcher, blockReaderDisiLookup(blContext));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testEmptyArray() throws IOException {
private void loadBlock(LeafReaderContext ctx, Consumer<TestBlock> test) throws IOException {
ValueFetcher valueFetcher = SourceValueFetcher.toString(Set.of("field"));
BlockSourceReader.LeafIteratorLookup lookup = BlockSourceReader.lookupFromNorms("field");
BlockLoader loader = new BlockSourceReader.BytesRefsBlockLoader(valueFetcher, lookup, null);
BlockLoader loader = new BlockSourceReader.BytesRefsBlockLoader(valueFetcher, lookup);
assertThat(loader.columnAtATimeReader(ctx), nullValue());
BlockLoader.RowStrideReader reader = loader.rowStrideReader(ctx);
assertThat(loader.rowStrideStoredFieldSpec(), equalTo(StoredFieldsSpec.NEEDS_SOURCE));
Expand Down
Loading

0 comments on commit fae9ffb

Please sign in to comment.