Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add immutable 'operator' metadata classes for cluster state #87763

Merged
merged 14 commits into from
Jun 29, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ default boolean isRestorable() {
private final ImmutableOpenMap<String, Set<Index>> aliasedIndices;
private final ImmutableOpenMap<String, IndexTemplateMetadata> templates;
private final ImmutableOpenMap<String, Custom> customs;
private final Map<String, OperatorMetadata> operatorMetadata;

private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
private final int totalOpenIndexShards;
Expand Down Expand Up @@ -248,7 +249,8 @@ private Metadata(
String[] visibleClosedIndices,
SortedMap<String, IndexAbstraction> indicesLookup,
Map<String, MappingMetadata> mappingsByHash,
Version oldestIndexVersion
Version oldestIndexVersion,
Map<String, OperatorMetadata> operatorMetadata
) {
this.clusterUUID = clusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
Expand All @@ -273,6 +275,7 @@ private Metadata(
this.indicesLookup = indicesLookup;
this.mappingsByHash = mappingsByHash;
this.oldestIndexVersion = oldestIndexVersion;
this.operatorMetadata = operatorMetadata;
}

public Metadata withIncrementedVersion() {
Expand All @@ -299,7 +302,8 @@ public Metadata withIncrementedVersion() {
visibleClosedIndices,
indicesLookup,
mappingsByHash,
oldestIndexVersion
oldestIndexVersion,
operatorMetadata
);
}

Expand Down Expand Up @@ -359,7 +363,8 @@ public Metadata withLifecycleState(final Index index, final LifecycleExecutionSt
visibleClosedIndices,
indicesLookup,
mappingsByHash,
oldestIndexVersion
oldestIndexVersion,
operatorMetadata
);
}

Expand Down Expand Up @@ -387,7 +392,8 @@ public Metadata withCoordinationMetadata(CoordinationMetadata coordinationMetada
visibleClosedIndices,
indicesLookup,
mappingsByHash,
oldestIndexVersion
oldestIndexVersion,
operatorMetadata
);
}

Expand Down Expand Up @@ -970,6 +976,15 @@ public Map<String, Custom> customs() {
return this.customs;
}

/**
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example in the related PR of how this will be used:

https://github.com/elastic/elasticsearch/pull/86224/files#diff-d017ebae07494c580da12a539a666fe1d87537f167e71761cf60451b8913d554R127

The related PR is out of date, it calls the API operatorState and it has the extra helper method to perform the additional .get.

Instead of:

OperatorMetadata existingMetadata = state.metadata().operatorState(namespace);

we'll be calling:

OperatorMetadata existingMetadata = state.metadata().operatorMetadata.get(namespace);

* Returns the full {@link OperatorMetadata} Map for all
* operator namespaces.
* @return a map of namespace to {@link OperatorMetadata}
*/
public Map<String, OperatorMetadata> operatorMetadata() {
return this.operatorMetadata;
}

/**
* The collection of index deletions in the cluster.
*/
Expand Down Expand Up @@ -1111,6 +1126,7 @@ private static class MetadataDiff implements Diff<Metadata> {
private final Diff<ImmutableOpenMap<String, IndexMetadata>> indices;
private final Diff<ImmutableOpenMap<String, IndexTemplateMetadata>> templates;
private final Diff<ImmutableOpenMap<String, Custom>> customs;
private final Diff<Map<String, OperatorMetadata>> operatorMetadata;

MetadataDiff(Metadata before, Metadata after) {
clusterUUID = after.clusterUUID;
Expand All @@ -1123,12 +1139,15 @@ private static class MetadataDiff implements Diff<Metadata> {
indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer());
templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer());
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
operatorMetadata = DiffableUtils.diff(before.operatorMetadata, after.operatorMetadata, DiffableUtils.getStringKeySerializer());
}

private static final DiffableUtils.DiffableValueReader<String, IndexMetadata> INDEX_METADATA_DIFF_VALUE_READER =
new DiffableUtils.DiffableValueReader<>(IndexMetadata::readFrom, IndexMetadata::readDiffFrom);
private static final DiffableUtils.DiffableValueReader<String, IndexTemplateMetadata> TEMPLATES_DIFF_VALUE_READER =
new DiffableUtils.DiffableValueReader<>(IndexTemplateMetadata::readFrom, IndexTemplateMetadata::readDiffFrom);
private static final DiffableUtils.DiffableValueReader<String, OperatorMetadata> OPERATOR_DIFF_VALUE_READER =
new DiffableUtils.DiffableValueReader<>(OperatorMetadata::readFrom, OperatorMetadata::readDiffFrom);

MetadataDiff(StreamInput in) throws IOException {
clusterUUID = in.readString();
Expand All @@ -1145,6 +1164,11 @@ private static class MetadataDiff implements Diff<Metadata> {
indices = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), INDEX_METADATA_DIFF_VALUE_READER);
templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), TEMPLATES_DIFF_VALUE_READER);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
if (in.getVersion().onOrAfter(Version.V_8_4_0)) {
operatorMetadata = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), OPERATOR_DIFF_VALUE_READER);
} else {
operatorMetadata = OperatorMetadata.EMPTY_DIFF;
}
}

@Override
Expand All @@ -1161,6 +1185,9 @@ public void writeTo(StreamOutput out) throws IOException {
indices.writeTo(out);
templates.writeTo(out);
customs.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_4_0)) {
operatorMetadata.writeTo(out);
}
}

@Override
Expand All @@ -1178,6 +1205,7 @@ public Metadata apply(Metadata part) {
builder.indices(indices.apply(part.indices));
builder.templates(templates.apply(part.templates));
builder.customs(customs.apply(part.customs));
builder.put(Collections.unmodifiableMap(operatorMetadata.apply(part.operatorMetadata)));
return builder.build();
}
}
Expand Down Expand Up @@ -1219,7 +1247,12 @@ public static Metadata readFrom(StreamInput in) throws IOException {
Custom customIndexMetadata = in.readNamedWriteable(Custom.class);
builder.putCustom(customIndexMetadata.getWriteableName(), customIndexMetadata);
}

if (in.getVersion().onOrAfter(Version.V_8_4_0)) {
int operatorSize = in.readVInt();
for (int i = 0; i < operatorSize; i++) {
builder.put(OperatorMetadata.readFrom(in));
}
}
return builder.build();
}

Expand All @@ -1246,6 +1279,9 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeCollection(templates.values());
VersionedNamedWriteable.writeVersionedWritables(out, customs);
if (out.getVersion().onOrAfter(Version.V_8_4_0)) {
out.writeCollection(operatorMetadata.values());
}
}

public static Builder builder() {
Expand Down Expand Up @@ -1280,6 +1316,8 @@ public static class Builder {

private SortedMap<String, IndexAbstraction> previousIndicesLookup;

private final Map<String, OperatorMetadata> operatorMetadata;

// If this is set to false we can skip checking #mappingsByHash for unused entries in #build(). Used as an optimization to save
// the rather expensive call to #purgeUnusedEntries when building from another instance and we know that no mappings can have
// become unused because no indices were updated or removed from this builder in a way that would cause unused entries in
Expand Down Expand Up @@ -1307,6 +1345,7 @@ public Builder() {
this.previousIndicesLookup = metadata.indicesLookup;
this.mappingsByHash = new HashMap<>(metadata.mappingsByHash);
this.checkForUnusedMappings = false;
this.operatorMetadata = new HashMap<>(metadata.operatorMetadata);
}

private Builder(Map<String, MappingMetadata> mappingsByHash) {
Expand All @@ -1315,6 +1354,7 @@ private Builder(Map<String, MappingMetadata> mappingsByHash) {
aliasedIndices = ImmutableOpenMap.builder();
templates = ImmutableOpenMap.builder();
customs = ImmutableOpenMap.builder();
operatorMetadata = new HashMap<>();
indexGraveyard(IndexGraveyard.builder().build()); // create new empty index graveyard to initialize
previousIndicesLookup = null;
this.mappingsByHash = new HashMap<>(mappingsByHash);
Expand Down Expand Up @@ -1667,6 +1707,26 @@ public Builder customs(Map<String, Custom> customs) {
return this;
}

/**
* Adds a map of namespace to {@link OperatorMetadata} into the metadata builder
* @param operatorMetadata a map of namespace to {@link OperatorMetadata}
* @return {@link Builder}
*/
public Builder put(Map<String, OperatorMetadata> operatorMetadata) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, I used to call this method putOperatorState, but it was too long and it didn't match the naming of other methods we have in Metadata, we simply call .put with different types. Here's the usage here:

https://github.com/elastic/elasticsearch/pull/86224/files#diff-8ca6818d706ec1720e045ea8a1f4f8e4b5dda47c5610aed2bf8e7d55121d8391R129

this.operatorMetadata.putAll(operatorMetadata);
return this;
}

/**
* Adds a {@link OperatorMetadata} for a given namespace to the metadata builder
* @param metadata an {@link OperatorMetadata}
* @return {@link Builder}
*/
public Builder put(OperatorMetadata metadata) {
operatorMetadata.put(metadata.namespace(), metadata);
return this;
}

public Builder indexGraveyard(final IndexGraveyard indexGraveyard) {
putCustom(IndexGraveyard.TYPE, indexGraveyard);
return this;
Expand Down Expand Up @@ -1868,7 +1928,8 @@ public Metadata build() {
visibleClosedIndicesArray,
indicesLookup,
Collections.unmodifiableMap(mappingsByHash),
Version.fromId(oldestIndexVersionId)
Version.fromId(oldestIndexVersionId),
Collections.unmodifiableMap(operatorMetadata)
);
}

Expand Down Expand Up @@ -2167,6 +2228,12 @@ public static void toXContent(Metadata metadata, XContentBuilder builder, ToXCon
}
}

builder.startObject("operator");
for (OperatorMetadata operatorMetadata : metadata.operatorMetadata().values()) {
operatorMetadata.toXContent(builder, params);
}
builder.endObject();

builder.endObject();
}

Expand Down Expand Up @@ -2210,6 +2277,10 @@ public static Metadata fromXContent(XContentParser parser) throws IOException {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(IndexTemplateMetadata.Builder.fromXContent(parser, parser.currentName()));
}
} else if ("operator".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(OperatorMetadata.fromXContent(parser));
}
} else {
try {
Custom custom = parser.namedObject(Custom.class, currentFieldName, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

/**
* A metadata class to hold error information about errors encountered
* while applying a cluster state update for a given namespace.
* <p>
* This information is held by the {@link OperatorMetadata} class.
*/
public record OperatorErrorMetadata(Long version, ErrorKind errorKind, List<String> errors)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error metadata is set when a file fails to get processed for some reason. In this case we launch a cluster state update task that will update this metadata. Example here:

https://github.com/elastic/elasticsearch/pull/86224/files#diff-a7da9bd5e6ce66769b5113745baa42c88b7c21b916a8ea290434048fc5af2c5fR29

The builder code is now removed, so the OperatorErrorMetadata will be directly created.

implements
SimpleDiffable<OperatorErrorMetadata>,
ToXContentFragment {

static final ParseField ERRORS = new ParseField("errors");
static final ParseField VERSION = new ParseField("version");
static final ParseField ERROR_KIND = new ParseField("error_kind");

/**
* Constructs an operator error metadata
*
* @param version the metadata version of the content which failed to apply
* @param errorKind the kind of error we encountered while processing
* @param errors the list of errors encountered during parsing and validation of the operator content
*/
public OperatorErrorMetadata {}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeString(errorKind.getKindValue());
out.writeCollection(errors, StreamOutput::writeString);
}

/**
* Reads an {@link OperatorErrorMetadata} from a {@link StreamInput}
*
* @param in the {@link StreamInput} to read from
* @return {@link OperatorErrorMetadata}
* @throws IOException
*/
public static OperatorErrorMetadata readFrom(StreamInput in) throws IOException {
return new OperatorErrorMetadata(in.readLong(), ErrorKind.of(in.readString()), in.readList(StreamInput::readString));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(VERSION.getPreferredName(), version);
builder.field(ERROR_KIND.getPreferredName(), errorKind.getKindValue());
builder.stringListField(ERRORS.getPreferredName(), errors);
builder.endObject();
return builder;
}

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<OperatorErrorMetadata, Void> PARSER = new ConstructingObjectParser<>(
"operator_error_metadata",
(a) -> new OperatorErrorMetadata((Long) a[0], ErrorKind.of((String) a[1]), (List<String>) a[2])
);

static {
PARSER.declareLong(constructorArg(), VERSION);
PARSER.declareString(constructorArg(), ERROR_KIND);
PARSER.declareStringArray(constructorArg(), ERRORS);
}

/**
* Reads an {@link OperatorErrorMetadata} from xContent
*
* @param parser {@link XContentParser}
* @return {@link OperatorErrorMetadata}
*/
public static OperatorErrorMetadata fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

/**
* Reads an {@link OperatorErrorMetadata} {@link Diff} from {@link StreamInput}
*
* @param in the {@link StreamInput} to read the diff from
* @return a {@link Diff} of {@link OperatorErrorMetadata}
* @throws IOException
*/
public static Diff<OperatorErrorMetadata> readDiffFrom(StreamInput in) throws IOException {
return SimpleDiffable.readDiffFrom(OperatorErrorMetadata::readFrom, in);
}

/**
* Enum for kinds of errors we might encounter while processing operator cluster state updates.
*/
public enum ErrorKind {
PARSING("parsing"),
VALIDATION("validation"),
TRANSIENT("transient");

private final String kind;

ErrorKind(String kind) {
this.kind = kind;
}

/**
* Returns the String value for this enum value
*
* @return the String value for the enum
*/
public String getKindValue() {
return kind;
}

/**
* Helper method to construct {@link ErrorKind} from a String.
*
* The JDK default implementation throws incomprehensible error.
* @param kind String value
* @return {@link ErrorKind}
*/
public static ErrorKind of(String kind) {
for (var report : values()) {
if (report.kind.equals(kind)) {
return report;
}
}
throw new IllegalArgumentException("kind not supported [" + kind + "]");
}
}
}
Loading