Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add version-based validation to reindex requests #38504

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,9 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)
}
}

static ActionRequestValidationException validateSeqNoBasedCASParams(
DocWriteRequest request, ActionRequestValidationException validationException) {
final long version = request.version();
final VersionType versionType = request.versionType();
default ActionRequestValidationException validateVersionAndSeqNoBasedCASParams(ActionRequestValidationException validationException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to default. Nicer.

final long version = version();
final VersionType versionType = versionType();
if (versionType.validateVersionForWrites(version) == false) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["
+ versionType.name() + "]", validationException);
Expand All @@ -272,17 +271,36 @@ static ActionRequestValidationException validateSeqNoBasedCASParams(
"Please use `if_seq_no` and `if_primary_term` instead", validationException);
}

if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && (
if (ifSeqNo() != UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (request.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() != UNASSIGNED_SEQ_NO) {
if (ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && ifSeqNo() != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (request.ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() == UNASSIGNED_SEQ_NO) {
if (ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM && ifSeqNo() == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + request.ifPrimaryTerm() + "]", validationException);
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm() + "]", validationException);
}
if (opType() == OpType.CREATE) {
if (versionType != VersionType.INTERNAL) {
validationException = addValidationError("create operations only support internal versioning. use index instead",
validationException);
return validationException;
}

if (version != Versions.MATCH_DELETED) {
validationException = addValidationError("create operations do not support explicit versions. use index instead",
validationException);
return validationException;
}

if (ifSeqNo() != UNASSIGNED_SEQ_NO || ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM) {
validationException = addValidationError("create operations do not support compare and set. use index instead",
validationException);
return validationException;
}
}

return validationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("id is missing", validationException);
}

validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
validationException = validateVersionAndSeqNoBasedCASParams(validationException);

return validationException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,31 +164,13 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("content type is missing", validationException);
}
final long resolvedVersion = resolveVersionDefaults();
if (opType() == OpType.CREATE) {
if (versionType != VersionType.INTERNAL) {
validationException = addValidationError("create operations only support internal versioning. use index instead",
validationException);
return validationException;
}

if (resolvedVersion != Versions.MATCH_DELETED) {
validationException = addValidationError("create operations do not support explicit versions. use index instead",
validationException);
return validationException;
}

if (ifSeqNo != UNASSIGNED_SEQ_NO || ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM) {
validationException = addValidationError("create operations do not support compare and set. use index instead",
validationException);
return validationException;
}
}

if (opType() != OpType.INDEX && id == null) {
addValidationError("an id is required for a " + opType() + " operation", validationException);
}

validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
validationException = validateVersionAndSeqNoBasedCASParams(validationException);

if (id != null && id.getBytes(StandardCharsets.UTF_8).length > 512) {
validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("id is missing", validationException);
}

validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
validationException = validateVersionAndSeqNoBasedCASParams(validationException);

if (ifSeqNo != UNASSIGNED_SEQ_NO) {
if (retryOnConflict > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;
Expand All @@ -37,7 +36,6 @@
import java.io.IOException;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.VersionType.INTERNAL;

/**
* Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than
Expand Down Expand Up @@ -100,11 +98,9 @@ public ActionRequestValidationException validate() {
if (false == routingIsValid()) {
e = addValidationError("routing must be unset, [keep], [discard] or [=<some new value>]", e);
}
if (destination.versionType() == INTERNAL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is stronger than what validateVersionAndSeqNoBasedCASParams does. It basically says - if you use internal versioning, you shouldn't set a specific version. I think that's good? also, we probably want the same for external versioning (i.e., the version it self can't be set) and something ifSeqNo/ifPrimary term (which I missed).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the next line here says:

if (destination.version() != Versions.MATCH_ANY && destination.version() != Versions.MATCH_DELETED) {

and validateVersionAndSeqNoBasedCASParams already has

if (versionType == VersionType.INTERNAL && version != Versions.MATCH_ANY && version != Versions.MATCH_DELETED) {
    validationException = addValidationError("internal versioning can not be used for optimistic concurrency control. " +
        "Please use `if_seq_no` and `if_primary_term` instead", validationException);
}

These look equivalent to me?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and validateVersionAndSeqNoBasedCASParams already has

fair enough (those were folded away and I missed them). Also that would have meant this can't go to 6.x 🤷‍♂️

if (destination.version() != Versions.MATCH_ANY && destination.version() != Versions.MATCH_DELETED) {
e = addValidationError("unsupported version for internal versioning [" + destination.version() + ']', e);
}
}

e = destination.validateVersionAndSeqNoBasedCASParams(e);

if (getRemoteInfo() != null) {
if (getSearchRequest().source().query() != null) {
e = addValidationError("reindex from remote sources should use RemoteInfo's query instead of source's query", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.slice.SliceBuilder;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.Matchers.containsString;

/**
* Tests some of the validation of {@linkplain ReindexRequest}. See reindex's rest tests for much more.
Expand Down Expand Up @@ -57,6 +59,14 @@ public void testReindexFromRemoteDoesNotSupportSlices() {
e.getMessage());
}

public void testReindexShouldThrowErrorWhenCreateIsUsedWithExternalVersionType() {
ReindexRequest reindex = newRequest();
reindex.setDestOpType("create");
reindex.setDestVersionType(VersionType.EXTERNAL);
ActionRequestValidationException e = reindex.validate();
assertThat(e.getMessage(), containsString("create operations only support internal versioning. use index instead;"));
}

public void testNoSliceBuilderSetWithSlicedRequest() {
ReindexRequest reindex = newRequest();
reindex.getSearchRequest().source().slice(new SliceBuilder(0, 4));
Expand Down