Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Improve robustness of checkpointing #80984

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.security.support.Automatons;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -99,7 +100,8 @@ public final class IndexPrivilege extends Privilege {
GetDataStreamAction.NAME,
ResolveIndexAction.NAME,
FieldCapabilitiesAction.NAME + "*",
GetRollupIndexCapsAction.NAME + "*"
GetRollupIndexCapsAction.NAME + "*",
GetCheckpointAction.NAME + "*" // transform internal action
);
private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(
PutFollowAction.NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/**
* Transform internal API (no REST layer) to retrieve index checkpoints.
*/
public class GetCheckpointAction extends ActionType<GetCheckpointAction.Response> {

public static final GetCheckpointAction INSTANCE = new GetCheckpointAction();

// note: this is an index action and requires `view_index_metadata`
public static final String NAME = "indices:internal/transform/checkpoint";

private GetCheckpointAction() {
super(NAME, GetCheckpointAction.Response::new);
}

public static class Request extends ActionRequest implements IndicesRequest.Replaceable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndicesRequest.Replaceable has a default method boolean allowsRemoteIndices() that returns false, which means the indices field cannot contain remote index names of the format cluster_alias:index. My reading of the code seems to suggest that all indices here have their cluster_alias prefix stripped (if any). So it should be OK for the class to rely on the default implementation of allowsRemoteIndices(). But it would be great if you could confirm on this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's correct. CCS is handled before calling this action: the source (indices to query) is split per cluster, get checkpoint is called separately for every cluster. This has been implemented like this for the original checkpointing, which as explained above is kept for BWC. It would be quite complex to separate the 2. Once the old style checkpointing got removed somewhen in future, we might revisit.


private String[] indices;
private final IndicesOptions indicesOptions;

public Request(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
przemekwitek marked this conversation as resolved.
Show resolved Hide resolved
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

public Request(String[] indices, IndicesOptions indicesOptions) {
this.indices = indices != null ? indices : Strings.EMPTY_ARRAY;
this.indicesOptions = indicesOptions;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request that = (Request) obj;

return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(indices), indicesOptions);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}

@Override
public IndicesRequest indices(String... indices) {
this.indices = indices;
return this;
}

// this action does not allow remote indices, but they have to be resolved upfront, see {@link DefaultCheckpointProvider}
@Override
public boolean allowsRemoteIndices() {
return false;
}
}

public static class Response extends ActionResponse {

private final Map<String, long[]> checkpoints;

public Response(Map<String, long[]> checkpoints) {
this.checkpoints = checkpoints;
}

public Response(StreamInput in) throws IOException {
this.checkpoints = in.readOrderedMap(StreamInput::readString, StreamInput::readLongArray);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(getCheckpoints(), StreamOutput::writeString, StreamOutput::writeLongArray);
}

public Map<String, long[]> getCheckpoints() {
return Collections.unmodifiableMap(checkpoints);
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Response that = (Response) obj;

return this.checkpoints.size() == that.checkpoints.size()
&& this.checkpoints.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), that.checkpoints.get(e.getKey())));
przemekwitek marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public int hashCode() {
int hash = 1;

for (Entry<String, long[]> e : checkpoints.entrySet()) {
hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue()));
}

return hash;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

public class GetCheckpointNodeAction extends ActionType<GetCheckpointNodeAction.Response> {

public static final GetCheckpointNodeAction INSTANCE = new GetCheckpointNodeAction();

// note: this is an index action and requires `view_index_metadata`
public static final String NAME = GetCheckpointAction.NAME + "[n]";

private GetCheckpointNodeAction() {
super(NAME, GetCheckpointNodeAction.Response::new);
}

public static class Response extends ActionResponse {
private final Map<String, long[]> checkpoints;

public Response(Map<String, long[]> checkpoints) {
this.checkpoints = checkpoints;
}

public Response(StreamInput in) throws IOException {
this.checkpoints = in.readOrderedMap(StreamInput::readString, StreamInput::readLongArray);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(getCheckpoints(), StreamOutput::writeString, StreamOutput::writeLongArray);
}

public Map<String, long[]> getCheckpoints() {
return checkpoints;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Response that = (Response) obj;

return this.checkpoints.size() == that.checkpoints.size()
&& this.checkpoints.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), that.checkpoints.get(e.getKey())));
}

@Override
public int hashCode() {
int hash = 1;

for (Entry<String, long[]> e : checkpoints.entrySet()) {
hash = 31 * hash + Objects.hash(e.getKey(), Arrays.hashCode(e.getValue()));
}

return hash;
}
}

public static class Request extends ActionRequest implements IndicesRequest {

private final Set<ShardId> shards;
private final OriginalIndices originalIndices;

public Request(Set<ShardId> shards, OriginalIndices originalIndices) {
this.shards = shards;
this.originalIndices = originalIndices;
}

public Request(StreamInput in) throws IOException {
super(in);
this.shards = Collections.unmodifiableSet(in.readSet(ShardId::new));
this.originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeCollection(shards);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}

public Set<ShardId> getShards() {
return shards;
}

public OriginalIndices getOriginalIndices() {
return originalIndices;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request that = (Request) obj;

return Objects.equals(shards, that.shards) && Objects.equals(originalIndices, that.originalIndices);
}

@Override
public int hashCode() {
return Objects.hash(shards, originalIndices);
}

@Override
public String[] indices() {
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction.Request;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class GetCheckpointActionRequestTests extends AbstractWireSerializingTestCase<Request> {

@Override
protected Request createTestInstance() {
return new Request(
randomBoolean() ? null : generateRandomStringArray(10, 10, false, false),
IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
Boolean.toString(randomBoolean()),
SearchRequest.DEFAULT_INDICES_OPTIONS
)
);
}

@Override
protected Reader<Request> instanceReader() {
return Request::new;
}

@Override
protected Request mutateInstance(Request instance) throws IOException {
List<String> indices = instance.indices() != null ? new ArrayList<>(Arrays.asList(instance.indices())) : new ArrayList<>();
IndicesOptions indicesOptions = instance.indicesOptions();

switch (between(0, 1)) {
case 0:
indices.add(randomAlphaOfLengthBetween(1, 20));
break;
case 1:
indicesOptions = IndicesOptions.fromParameters(
randomFrom(IndicesOptions.WildcardStates.values()).name().toLowerCase(Locale.ROOT),
Boolean.toString(instance.indicesOptions().ignoreUnavailable() == false),
Boolean.toString(instance.indicesOptions().allowNoIndices() == false),
Boolean.toString(instance.indicesOptions().ignoreThrottled() == false),
SearchRequest.DEFAULT_INDICES_OPTIONS
);
break;
default:
throw new AssertionError("Illegal randomization branch");
}

return new Request(indices.toArray(new String[0]), indicesOptions);
}
}
Loading