Skip to content

Commit

Permalink
Working using pipelines - recovering index metadata still doesn't work
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest committed Nov 20, 2023
1 parent 247c840 commit 4a53f97
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
ingestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
hasIndexRequestsWithPipelines |= ingestService.hasPipeline(indexRequest);
}

if (actionRequest instanceof IndexRequest ir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement

private String pipeline;
private String finalPipeline;
private String inferencePipeline;
private String pluginsPipeline;

private boolean isPipelineResolved;

Expand Down Expand Up @@ -191,7 +191,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
}
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SEMANTIC_TEXT_FIELD)) {
this.inferencePipeline = in.readOptionalString();
this.pluginsPipeline = in.readOptionalString();
}
}

Expand Down Expand Up @@ -274,10 +274,6 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("final pipeline cannot be an empty string", validationException);
}

if (inferencePipeline != null && inferencePipeline.isEmpty()) {
validationException = addValidationError("inference pipeline cannot be an empty string", validationException);
}

return validationException;
}

Expand Down Expand Up @@ -363,12 +359,13 @@ public String getFinalPipeline() {
return this.finalPipeline;
}

public String getInferencePipeline() {
return inferencePipeline;
public String getPluginsPipeline() {
return pluginsPipeline;
}

public void setInferencePipeline(String inferencePipeline) {
this.inferencePipeline = inferencePipeline;
public IndexRequest setPluginsPipeline(String pluginsPipeline) {
this.pluginsPipeline = pluginsPipeline;
return this;
}

/**
Expand Down Expand Up @@ -751,7 +748,7 @@ private void writeBody(StreamOutput out) throws IOException {
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SEMANTIC_TEXT_FIELD)) {
out.writeOptionalString(inferencePipeline);
out.writeOptionalString(pluginsPipeline);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.elasticsearch.TransportVersions.SEMANTIC_TEXT_FIELD;
import static org.elasticsearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
import static org.elasticsearch.cluster.metadata.Metadata.DEDUPLICATED_MAPPINGS_PARAM;
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
Expand Down Expand Up @@ -635,6 +636,7 @@ public Iterator<Setting<?>> settings() {
private final Double writeLoadForecast;
@Nullable
private final Long shardSizeInBytesForecast;
private final Map<String, List<String>> inferenceModelsForFields;

private IndexMetadata(
final Index index,
Expand Down Expand Up @@ -680,7 +682,8 @@ private IndexMetadata(
final IndexVersion indexCompatibilityVersion,
@Nullable final IndexMetadataStats stats,
@Nullable final Double writeLoadForecast,
@Nullable Long shardSizeInBytesForecast
@Nullable Long shardSizeInBytesForecast,
final Map<String, List<String>> inferenceModelsForFields
) {
this.index = index;
this.version = version;
Expand Down Expand Up @@ -736,6 +739,8 @@ private IndexMetadata(
this.writeLoadForecast = writeLoadForecast;
this.shardSizeInBytesForecast = shardSizeInBytesForecast;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
this.inferenceModelsForFields = inferenceModelsForFields;
assert inferenceModelsForFields != null;
}

IndexMetadata withMappingMetadata(MappingMetadata mapping) {
Expand Down Expand Up @@ -786,7 +791,8 @@ IndexMetadata withMappingMetadata(MappingMetadata mapping) {
this.indexCompatibilityVersion,
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast
this.shardSizeInBytesForecast,
this.inferenceModelsForFields
);
}

Expand Down Expand Up @@ -844,7 +850,8 @@ public IndexMetadata withInSyncAllocationIds(int shardId, Set<String> inSyncSet)
this.indexCompatibilityVersion,
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast
this.shardSizeInBytesForecast,
this.inferenceModelsForFields
);
}

Expand Down Expand Up @@ -900,7 +907,8 @@ public IndexMetadata withIncrementedPrimaryTerm(int shardId) {
this.indexCompatibilityVersion,
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast
this.shardSizeInBytesForecast,
this.inferenceModelsForFields
);
}

Expand Down Expand Up @@ -956,7 +964,8 @@ public IndexMetadata withTimestampRange(IndexLongFieldRange timestampRange) {
this.indexCompatibilityVersion,
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast
this.shardSizeInBytesForecast,
this.inferenceModelsForFields
);
}

Expand Down Expand Up @@ -1008,7 +1017,8 @@ public IndexMetadata withIncrementedVersion() {
this.indexCompatibilityVersion,
this.stats,
this.writeLoadForecast,
this.shardSizeInBytesForecast
this.shardSizeInBytesForecast,
this.inferenceModelsForFields
);
}

Expand Down Expand Up @@ -1212,6 +1222,10 @@ public OptionalLong getForecastedShardSizeInBytes() {
return shardSizeInBytesForecast == null ? OptionalLong.empty() : OptionalLong.of(shardSizeInBytesForecast);
}

public Map<String, List<String>> getInferenceModelsForFields() {
return inferenceModelsForFields;
}

public static final String INDEX_RESIZE_SOURCE_UUID_KEY = "index.resize.source.uuid";
public static final String INDEX_RESIZE_SOURCE_NAME_KEY = "index.resize.source.name";
public static final Setting<String> INDEX_RESIZE_SOURCE_UUID = Setting.simpleString(INDEX_RESIZE_SOURCE_UUID_KEY);
Expand Down Expand Up @@ -1702,6 +1716,9 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function<String,
builder.indexWriteLoadForecast(in.readOptionalDouble());
builder.shardSizeInBytesForecast(in.readOptionalLong());
}
if (in.getTransportVersion().onOrAfter(SEMANTIC_TEXT_FIELD)) {
builder.inferenceModelsForfields(in.readImmutableMap(StreamInput::readStringCollectionAsImmutableList));
}
return builder.build(true);
}

Expand Down Expand Up @@ -1748,6 +1765,9 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
out.writeOptionalDouble(writeLoadForecast);
out.writeOptionalLong(shardSizeInBytesForecast);
}
if (out.getTransportVersion().onOrAfter(SEMANTIC_TEXT_FIELD)) {
out.writeMap(inferenceModelsForFields, StreamOutput::writeStringCollection);
}
}

@Override
Expand Down Expand Up @@ -1797,6 +1817,7 @@ public static class Builder {
private IndexMetadataStats stats = null;
private Double indexWriteLoadForecast = null;
private Long shardSizeInBytesForecast = null;
private Map<String, List<String>> inferenceModelsForFields = Map.of();

public Builder(String index) {
this.index = index;
Expand Down Expand Up @@ -1828,6 +1849,7 @@ public Builder(IndexMetadata indexMetadata) {
this.stats = indexMetadata.stats;
this.indexWriteLoadForecast = indexMetadata.writeLoadForecast;
this.shardSizeInBytesForecast = indexMetadata.shardSizeInBytesForecast;
this.inferenceModelsForFields = indexMetadata.inferenceModelsForFields;
}

public Builder index(String index) {
Expand Down Expand Up @@ -2057,6 +2079,11 @@ public Builder shardSizeInBytesForecast(Long shardSizeInBytesForecast) {
return this;
}

public Builder inferenceModelsForfields(Map<String, List<String>> inferenceModelsForfields) {
this.inferenceModelsForFields = inferenceModelsForfields;
return this;
}

public IndexMetadata build() {
return build(false);
}
Expand Down Expand Up @@ -2251,7 +2278,8 @@ IndexMetadata build(boolean repair) {
SETTING_INDEX_VERSION_COMPATIBILITY.get(settings),
stats,
indexWriteLoadForecast,
shardSizeInBytesForecast
shardSizeInBytesForecast,
inferenceModelsForFields
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,7 @@ static IndexMetadata buildIndexMetadata(
if (mapper != null) {
MappingMetadata mappingMd = new MappingMetadata(mapper);
mappingsMetadata.put(mapper.type(), mappingMd);
indexMetadataBuilder.inferenceModelsForfields(mapper.mappers().fieldsForModels());
}

for (MappingMetadata mappingMd : mappingsMetadata.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ private static ClusterState applyRequest(
DocumentMapper mapper = mapperService.documentMapper();
if (mapper != null) {
indexMetadataBuilder.putMapping(new MappingMetadata(mapper));
indexMetadataBuilder.inferenceModelsForfields(mapper.mappers().fieldsForModels());
}
if (updatedMapping) {
indexMetadataBuilder.mappingVersion(1 + indexMetadataBuilder.mappingVersion());
Expand Down
Loading

0 comments on commit 4a53f97

Please sign in to comment.