From 033ba725af26b940226aaa9b1ce88349bc433af6 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 5 Feb 2019 20:53:35 +0100 Subject: [PATCH] Remove support for internal versioning for concurrency control (#38254) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Elasticsearch has long [supported](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning) compare and set (a.k.a optimistic concurrency control) operations using internal document versioning. Sadly that approach is flawed and can sometime do the wrong thing. Here's the relevant excerpt from the resiliency status page: > When a primary has been partitioned away from the cluster there is a short period of time until it detects this. During that time it will continue indexing writes locally, thereby updating document versions. When it tries to replicate the operation, however, it will discover that it is partitioned away. It won’t acknowledge the write and will wait until the partition is resolved to negotiate with the master on how to proceed. The master will decide to either fail any replicas which failed to index the operations on the primary or tell the primary that it has to step down because a new primary has been chosen in the meantime. Since the old primary has already written documents, clients may already have read from the old primary before it shuts itself down. The version numbers of these reads may not be unique if the new primary has already accepted writes for the same document We recently [introduced](https://www.elastic.co/guide/en/elasticsearch/reference/6.x/optimistic-concurrency-control.html) a new sequence number based approach that doesn't suffer from this dirty reads problem. This commit removes support for internal versioning as a concurrency control mechanism in favor of the sequence number approach. Relates to #1078 --- .../client/WatcherRequestConverters.java | 1 - .../client/watcher/PutWatchRequest.java | 11 - .../client/RequestConvertersTests.java | 11 +- .../client/WatcherRequestConvertersTests.java | 11 +- .../documentation/CRUDDocumentationIT.java | 16 +- .../high-level/document/update.asciidoc | 5 +- .../migration/migrate_7_0/api.asciidoc | 16 ++ .../reindex/AsyncDeleteByQueryAction.java | 22 +- .../reindex/TransportDeleteByQueryAction.java | 2 +- .../reindex/TransportUpdateByQueryAction.java | 10 +- .../UpdateByQueryWhileModifyingTests.java | 4 +- .../resources/rest-api-spec/api/update.json | 9 - .../test/create/30_internal_version.yml | 36 ---- .../create/31_internal_version_with_types.yml | 35 --- .../{20_internal_version.yml => 20_cas.yml} | 10 +- .../test/delete/21_cas_with_types.yml | 30 +++ .../delete/21_internal_version_with_types.yml | 28 --- .../test/index/30_internal_version.yml | 36 ---- .../index/31_internal_version_with_types.yml | 36 ---- .../test/update/30_internal_version.yml | 31 --- .../update/31_internal_version_with_types.yml | 30 --- .../test/update/35_other_versions.yml | 29 --- .../update/36_other_versions_with_types.yml | 27 --- .../elasticsearch/action/DocWriteRequest.java | 17 +- .../action/bulk/BulkRequest.java | 14 +- .../action/search/SearchRequestBuilder.java | 2 +- .../action/update/UpdateHelper.java | 2 +- .../action/update/UpdateRequest.java | 61 +++--- .../index/get/ShardGetService.java | 7 +- .../action/document/RestUpdateAction.java | 2 - .../action/bulk/BulkRequestTests.java | 11 +- .../action/bulk/BulkWithUpdatesIT.java | 27 +-- .../action/update/UpdateRequestTests.java | 8 +- .../index/shard/ShardGetServiceTests.java | 26 +-- .../indices/settings/UpdateSettingsIT.java | 18 +- .../versioning/SimpleVersioningIT.java | 84 ++------ .../ml/action/TransportOpenJobAction.java | 2 +- .../action/TransportUpdateDatafeedAction.java | 2 +- .../action/TransportUpdateFilterAction.java | 29 +-- .../persistence/DatafeedConfigProvider.java | 21 +- .../xpack/ml/job/JobManager.java | 4 +- .../ml/job/persistence/JobConfigProvider.java | 22 +- .../integration/DatafeedConfigProviderIT.java | 7 +- .../ml/integration/JobConfigProviderIT.java | 9 +- .../xpack/security/authc/TokenService.java | 8 +- .../80_put_get_watch_with_passwords.yml | 201 ------------------ 46 files changed, 225 insertions(+), 805 deletions(-) delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml rename rest-api-spec/src/main/resources/rest-api-spec/test/delete/{20_internal_version.yml => 20_cas.yml} (71%) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml delete mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java index 34fb826d62382..9718607d8b80e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java @@ -70,7 +70,6 @@ static Request putWatch(PutWatchRequest putWatchRequest) { Request request = new Request(HttpPut.METHOD_NAME, endpoint); RequestConverters.Params params = new RequestConverters.Params(request) - .withVersion(putWatchRequest.getVersion()) .withIfSeqNo(putWatchRequest.ifSeqNo()) .withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm()); if (putWatchRequest.isActive() == false) { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java index 8b83970723dd2..1d13a77e06cf9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java @@ -21,7 +21,6 @@ import org.elasticsearch.client.Validatable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -43,11 +42,9 @@ public final class PutWatchRequest implements Validatable { private final BytesReference source; private final XContentType xContentType; private boolean active = true; - private long version = Versions.MATCH_ANY; private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { Objects.requireNonNull(id, "watch id is missing"); if (isValidId(id) == false) { @@ -95,14 +92,6 @@ public XContentType xContentType() { return xContentType; } - public long getVersion() { - return version; - } - - public void setVersion(long version) { - this.version = version; - } - /** * only performs this put request if the watch's last modification was assigned the given * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 9364e2ce2d57c..e3807b6067960 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -768,8 +768,6 @@ public void testUpdate() throws IOException { } } setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams); - setRandomVersion(updateRequest, expectedParams); - setRandomVersionType(updateRequest::versionType, expectedParams); setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body if (randomBoolean()) { int retryOnConflict = randomIntBetween(0, 5); @@ -911,14 +909,7 @@ public void testBulk() throws IOException { if (randomBoolean()) { docWriteRequest.routing(randomAlphaOfLength(10)); } - if (randomBoolean()) { - if (randomBoolean()) { - docWriteRequest.version(randomNonNegativeLong()); - } - if (randomBoolean()) { - docWriteRequest.versionType(randomFrom(VersionType.values())); - } - } else if (randomBoolean()) { + if (opType != DocWriteRequest.OpType.UPDATE && randomBoolean()) { docWriteRequest.setIfSeqNo(randomNonNegativeLong()); docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200)); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java index a31206bee88cc..19483cc201a5f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java @@ -29,8 +29,8 @@ import org.elasticsearch.client.watcher.DeactivateWatchRequest; import org.elasticsearch.client.watcher.DeleteWatchRequest; import org.elasticsearch.client.watcher.ExecuteWatchRequest; -import org.elasticsearch.client.watcher.PutWatchRequest; import org.elasticsearch.client.watcher.GetWatchRequest; +import org.elasticsearch.client.watcher.PutWatchRequest; import org.elasticsearch.client.watcher.StartWatchServiceRequest; import org.elasticsearch.client.watcher.StopWatchServiceRequest; import org.elasticsearch.client.watcher.WatcherStatsRequest; @@ -88,9 +88,12 @@ public void testPutWatch() throws Exception { } if (randomBoolean()) { - long version = randomLongBetween(10, 100); - putWatchRequest.setVersion(version); - expectedParams.put("version", String.valueOf(version)); + long seqNo = randomNonNegativeLong(); + long ifPrimaryTerm = randomLongBetween(1, 200); + putWatchRequest.setIfSeqNo(seqNo); + putWatchRequest.setIfPrimaryTerm(ifPrimaryTerm); + expectedParams.put("if_seq_no", String.valueOf(seqNo)); + expectedParams.put("if_primary_term", String.valueOf(ifPrimaryTerm)); } Request request = WatcherRequestConverters.putWatch(putWatchRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 53524a6a5c215..1af9593e77f87 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -170,7 +170,6 @@ public void testIndex() throws Exception { // tag::index-response String index = indexResponse.getIndex(); String id = indexResponse.getId(); - long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { // <1> } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { @@ -220,7 +219,8 @@ public void testIndex() throws Exception { IndexRequest request = new IndexRequest("posts") .id("1") .source("field", "value") - .version(1); + .setIfSeqNo(10L) + .setIfPrimaryTerm(20); try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); } catch(ElasticsearchException e) { @@ -432,7 +432,8 @@ public void testUpdate() throws Exception { // tag::update-conflict UpdateRequest request = new UpdateRequest("posts", "1") .doc("field", "value") - .version(1); + .setIfSeqNo(101L) + .setIfPrimaryTerm(200L); try { UpdateResponse updateResponse = client.update( request, RequestOptions.DEFAULT); @@ -499,9 +500,10 @@ public void testUpdate() throws Exception { request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // <1> request.setRefreshPolicy("wait_for"); // <2> // end::update-request-refresh - // tag::update-request-version - request.version(2); // <1> - // end::update-request-version + // tag::update-request-cas + request.setIfSeqNo(2L); // <1> + request.setIfPrimaryTerm(1L); // <2> + // end::update-request-request-cas // tag::update-request-detect-noop request.detectNoop(false); // <1> // end::update-request-detect-noop @@ -630,7 +632,7 @@ public void testDelete() throws Exception { // tag::delete-conflict try { DeleteResponse deleteResponse = client.delete( - new DeleteRequest("posts", "1").version(2), + new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2), RequestOptions.DEFAULT); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { diff --git a/docs/java-rest/high-level/document/update.asciidoc b/docs/java-rest/high-level/document/update.asciidoc index 3112d85512217..35300512dfc3e 100644 --- a/docs/java-rest/high-level/document/update.asciidoc +++ b/docs/java-rest/high-level/document/update.asciidoc @@ -140,9 +140,10 @@ include-tagged::{doc-tests-file}[{api}-request-source-exclude] ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- -include-tagged::{doc-tests-file}[{api}-request-version] +include-tagged::{doc-tests-file}[{api}-request-cas] -------------------------------------------------- -<1> Version +<1> ifSeqNo +<2> ifPrimaryTerm ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/reference/migration/migrate_7_0/api.asciidoc b/docs/reference/migration/migrate_7_0/api.asciidoc index bb151edb778e2..6c1d03760f904 100644 --- a/docs/reference/migration/migrate_7_0/api.asciidoc +++ b/docs/reference/migration/migrate_7_0/api.asciidoc @@ -2,6 +2,22 @@ [[breaking_70_api_changes]] === API changes +[float] +==== Internal Versioning is no longer supported for optimistic concurrency control + +Elasticsearch maintains a numeric version field for each document it stores. That field +is incremented by one with every change to the document. Until 7.0.0 the API allowed using +that field for optimistic concurrency control, i.e., making a write operation conditional +on the current document version. Sadly, that approach is flawed because the value of the +version doesn't always uniquely represent a change to the document. If a primary fails +while handling a write operation, it may expose a version that will then be reused by the +new primary. + +Due to that issue, internal versioning can no longer be used and is replaced by a new +method based on sequence numbers. See <> for more details. + +Note that the `external` versioning type is still fully supported. + [float] ==== Camel case and underscore parameters deprecated in 6.x have been removed A number of duplicate parameters deprecated in 6.x have been removed from diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java index b317ea06d9f35..f7d8d037fcddb 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java @@ -20,11 +20,9 @@ package org.elasticsearch.index.reindex; import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.client.ParentTaskAssigningClient; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -33,18 +31,10 @@ */ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction { - private final boolean useSeqNoForCAS; - public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request, - ScriptService scriptService, ClusterState clusterState, ActionListener listener) { - super(task, - // not all nodes support sequence number powered optimistic concurrency control, we fall back to version - clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false, - // all nodes support sequence number powered optimistic concurrency control and we can use it - clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0), - logger, client, threadPool, action, request, listener); - useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0); + ScriptService scriptService, ActionListener listener) { + super(task, false, true, logger, client, threadPool, action, request, listener); } @Override @@ -60,12 +50,8 @@ protected RequestWrapper buildRequest(ScrollableHitSource.Hit doc delete.index(doc.getIndex()); delete.type(doc.getType()); delete.id(doc.getId()); - if (useSeqNoForCAS) { - delete.setIfSeqNo(doc.getSeqNo()); - delete.setIfPrimaryTerm(doc.getPrimaryTerm()); - } else { - delete.version(doc.getVersion()); - } + delete.setIfSeqNo(doc.getSeqNo()); + delete.setIfPrimaryTerm(doc.getPrimaryTerm()); return wrap(delete); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index 08538b335535d..d7959f0058974 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -61,7 +61,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener buildRequest(ScrollableHitSource.Hit doc) index.type(doc.getType()); index.id(doc.getId()); index.source(doc.getSource(), doc.getXContentType()); - if (useSeqNoForCAS) { - index.setIfSeqNo(doc.getSeqNo()); - index.setIfPrimaryTerm(doc.getPrimaryTerm()); - } else { - index.versionType(VersionType.INTERNAL); - index.version(doc.getVersion()); - } + index.setIfSeqNo(doc.getSeqNo()); + index.setIfPrimaryTerm(doc.getPrimaryTerm()); index.setPipeline(mainRequest.getPipeline()); return wrap(index); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java index 987830ddd3bc9..1c3456fe20c5f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWhileModifyingTests.java @@ -67,7 +67,7 @@ public void testUpdateWhileReindexing() throws Exception { IndexRequestBuilder index = client().prepareIndex("test", "test", "test").setSource("test", value.get()) .setRefreshPolicy(IMMEDIATE); /* - * Update by query increments the version number so concurrent + * Update by query changes the document so concurrent * indexes might get version conflict exceptions so we just * blindly retry. */ @@ -75,7 +75,7 @@ public void testUpdateWhileReindexing() throws Exception { while (true) { attempts++; try { - index.setVersion(get.getVersion()).get(); + index.setIfSeqNo(get.getSeqNo()).setIfPrimaryTerm(get.getPrimaryTerm()).get(); break; } catch (VersionConflictEngineException e) { if (attempts >= MAX_ATTEMPTS) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json index 92f1013a317c3..106b29b252ad3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json @@ -70,15 +70,6 @@ "if_primary_term" : { "type" : "number", "description" : "only perform the update operation if the last operation that has changed the document has the specified primary term" - }, - "version": { - "type": "number", - "description": "Explicit version number for concurrency control" - }, - "version_type": { - "type": "enum", - "options": ["internal", "force"], - "description": "Specific version type" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml deleted file mode 100644 index 52e8e464da094..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/create/30_internal_version.yml +++ /dev/null @@ -1,36 +0,0 @@ ---- -"Internal version": - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - do: - create: - index: test_1 - id: 1 - body: { foo: bar } - - - match: { _version: 1} - - - do: - catch: conflict - create: - index: test_1 - id: 1 - body: { foo: bar } - ---- -"Internal versioning with explicit version": - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - do: - catch: bad_request - create: - index: test - id: 3 - body: { foo: bar } - version: 5 - - - match: { status: 400 } - - match: { error.type: action_request_validation_exception } - - match: { error.reason: "Validation Failed: 1: create operations do not support explicit versions. use index instead;" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml deleted file mode 100644 index 83772828bc8f4..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/create/31_internal_version_with_types.yml +++ /dev/null @@ -1,35 +0,0 @@ ---- -"Internal version": - - - do: - create: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - - match: { _version: 1} - - - do: - catch: conflict - create: - index: test_1 - type: test - id: 1 - body: { foo: bar } - ---- -"Internal versioning with explicit version": - - - do: - catch: bad_request - create: - index: test - type: test - id: 3 - body: { foo: bar } - version: 5 - - - match: { status: 400 } - - match: { error.type: action_request_validation_exception } - - match: { error.reason: "Validation Failed: 1: create operations do not support explicit versions. use index instead;" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_cas.yml similarity index 71% rename from rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_internal_version.yml rename to rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_cas.yml index afe69b4fe82e5..f3c7b0acbcccd 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_internal_version.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/20_cas.yml @@ -11,19 +11,21 @@ id: 1 body: { foo: bar } - - match: { _version: 1} + - match: { _seq_no: 0 } - do: catch: conflict delete: index: test_1 id: 1 - version: 2 + if_seq_no: 2 + if_primary_term: 1 - do: delete: index: test_1 id: 1 - version: 1 + if_seq_no: 0 + if_primary_term: 1 - - match: { _version: 2 } + - match: { _seq_no: 1 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml new file mode 100644 index 0000000000000..ef352a9bad6b1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_cas_with_types.yml @@ -0,0 +1,30 @@ +--- +"Internal version": + + - do: + index: + index: test_1 + type: test + id: 1 + body: { foo: bar } + + - match: { _seq_no: 0 } + + - do: + catch: conflict + delete: + index: test_1 + type: test + id: 1 + if_seq_no: 2 + if_primary_term: 1 + + - do: + delete: + index: test_1 + type: test + id: 1 + if_seq_no: 0 + if_primary_term: 1 + + - match: { _seq_no: 1 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml deleted file mode 100644 index 3d9ddb79366f7..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/delete/21_internal_version_with_types.yml +++ /dev/null @@ -1,28 +0,0 @@ ---- -"Internal version": - - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - - match: { _version: 1} - - - do: - catch: conflict - delete: - index: test_1 - type: test - id: 1 - version: 2 - - - do: - delete: - index: test_1 - type: test - id: 1 - version: 1 - - - match: { _version: 2 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml deleted file mode 100644 index adc4f3f4b15c0..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_internal_version.yml +++ /dev/null @@ -1,36 +0,0 @@ ---- -"Internal version": - - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - - do: - index: - index: test_1 - id: 1 - body: { foo: bar } - - match: { _version: 1} - - - do: - index: - index: test_1 - id: 1 - body: { foo: bar } - - match: { _version: 2} - - - do: - catch: conflict - index: - index: test_1 - id: 1 - body: { foo: bar } - version: 1 - - do: - index: - index: test_1 - id: 1 - body: { foo: bar } - version: 2 - - - match: { _version: 3 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml deleted file mode 100644 index 1767fbebbf966..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/31_internal_version_with_types.yml +++ /dev/null @@ -1,36 +0,0 @@ ---- -"Internal version": - - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - match: { _version: 1} - - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - - match: { _version: 2} - - - do: - catch: conflict - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - version: 1 - - do: - index: - index: test_1 - type: test - id: 1 - body: { foo: bar } - version: 2 - - - match: { _version: 3 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml deleted file mode 100644 index 7b474d6bc09dc..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/30_internal_version.yml +++ /dev/null @@ -1,31 +0,0 @@ ---- -"Internal version": - - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - - do: - catch: missing - update: - index: test_1 - id: 1 - version: 1 - body: - doc: { foo: baz } - - - do: - index: - index: test_1 - id: 1 - body: - doc: { foo: baz } - - - do: - catch: conflict - update: - index: test_1 - id: 1 - version: 2 - body: - doc: { foo: baz } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml deleted file mode 100644 index 17c4806c693ac..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/31_internal_version_with_types.yml +++ /dev/null @@ -1,30 +0,0 @@ ---- -"Internal version": - - - do: - catch: missing - update: - index: test_1 - type: test - id: 1 - version: 1 - body: - doc: { foo: baz } - - - do: - index: - index: test_1 - type: test - id: 1 - body: - doc: { foo: baz } - - - do: - catch: conflict - update: - index: test_1 - type: test - id: 1 - version: 2 - body: - doc: { foo: baz } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml deleted file mode 100644 index 9740aa39edeb3..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/35_other_versions.yml +++ /dev/null @@ -1,29 +0,0 @@ ---- -"Not supported versions": - - - skip: - version: " - 6.99.99" - reason: types are required in requests before 7.0.0 - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - id: 1 - version: 2 - version_type: external - body: - doc: { foo: baz } - upsert: { foo: bar } - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - id: 1 - version: 2 - version_type: external_gte - body: - doc: { foo: baz } - upsert: { foo: bar } - diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml deleted file mode 100644 index c0ec082b91a4f..0000000000000 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/update/36_other_versions_with_types.yml +++ /dev/null @@ -1,27 +0,0 @@ ---- -"Not supported versions": - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - type: test - id: 1 - version: 2 - version_type: external - body: - doc: { foo: baz } - upsert: { foo: bar } - - - do: - catch: /Validation|Invalid/ - update: - index: test_1 - type: test - id: 1 - version: 2 - version_type: external_gte - body: - doc: { foo: baz } - upsert: { foo: bar } - diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index d8a9a3503a617..373dfaa5c7416 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -257,16 +257,23 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest request) static ActionRequestValidationException validateSeqNoBasedCASParams( DocWriteRequest request, ActionRequestValidationException validationException) { - if (request.versionType().validateVersionForWrites(request.version()) == false) { - validationException = addValidationError("illegal version value [" + request.version() + "] for version type [" - + request.versionType().name() + "]", validationException); + final long version = request.version(); + final VersionType versionType = request.versionType(); + if (versionType.validateVersionForWrites(version) == false) { + validationException = addValidationError("illegal version value [" + version + "] for version type [" + + versionType.name() + "]", validationException); } - if (request.versionType() == VersionType.FORCE) { + if (versionType == VersionType.FORCE) { validationException = addValidationError("version type [force] may no longer be used", validationException); } + if (versionType == VersionType.INTERNAL && version != Versions.MATCH_ANY && version != Versions.MATCH_DELETED) { + validationException = addValidationError("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead", validationException); + } + if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && ( - request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY + versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY )) { validationException = addValidationError("compare and write operations can not use versioning", validationException); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index b5c786ab2df6d..42f569c0a9bda 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -46,9 +46,9 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.action.document.RestBulkAction; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; @@ -501,8 +501,11 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null .create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); } else if ("update".equals(action)) { + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new IllegalArgumentException("Update requests do not support versioning. " + + "Please use `if_seq_no` and `if_primary_term` instead"); + } UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) - .version(version).versionType(versionType) .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) .routing(routing); // EMPTY is safe here because we never call namedObject @@ -516,15 +519,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null } IndexRequest upsertRequest = updateRequest.upsertRequest(); if (upsertRequest != null) { - upsertRequest.version(version); - upsertRequest.versionType(versionType); upsertRequest.setPipeline(defaultPipeline); } - IndexRequest doc = updateRequest.doc(); - if (doc != null) { - doc.version(version); - doc.versionType(versionType); - } internalAdd(updateRequest, payload); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 4e9c598ba9c00..96c93c974cabb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -224,7 +224,7 @@ public SearchRequestBuilder setVersion(boolean version) { sourceBuilder().version(version); return this; } - + /** * Should each {@link org.elasticsearch.search.SearchHit} be returned with the * sequence number and primary term of the last modification of the document. diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 8cd6146768fff..54cd38aa0b960 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -70,7 +70,7 @@ public UpdateHelper(ScriptService scriptService) { */ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { final GetResult getResult = indexShard.getService().getForUpdate( - request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm()); + request.type(), request.id(), request.ifSeqNo(), request.ifPrimaryTerm()); return prepare(indexShard.shardId(), request, getResult, nowInMillis); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index c85f73d90ec3d..3693975ddab08 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -108,8 +108,6 @@ public class UpdateRequest extends InstanceShardOperationRequest private FetchSourceContext fetchSourceContext; - private long version = Versions.MATCH_ANY; - private VersionType versionType = VersionType.INTERNAL; private int retryOnConflict = 0; private long ifSeqNo = UNASSIGNED_SEQ_NO; private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; @@ -150,9 +148,6 @@ public UpdateRequest(String index, String type, String id) { @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = super.validate(); - if (version != Versions.MATCH_ANY && upsertRequest != null) { - validationException = addValidationError("can't provide both upsert request and a version", validationException); - } if(upsertRequest != null && upsertRequest.version() != Versions.MATCH_ANY) { validationException = addValidationError("can't provide version in upsert request", validationException); } @@ -163,30 +158,20 @@ public ActionRequestValidationException validate() { validationException = addValidationError("id is missing", validationException); } - if (versionType != VersionType.INTERNAL) { - validationException = addValidationError("version type [" + versionType + "] is not supported by the update API", - validationException); - } else { + validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - if (version != Versions.MATCH_ANY && retryOnConflict > 0) { - validationException = addValidationError("can't provide both retry_on_conflict and a specific version", - validationException); + if (ifSeqNo != UNASSIGNED_SEQ_NO) { + if (retryOnConflict > 0) { + validationException = addValidationError("compare and write operations can not be retried", validationException); } - if (!versionType.validateVersionForWrites(version)) { - validationException = addValidationError("illegal version value [" + version + "] for version type [" + - versionType.name() + "]", validationException); + if (docAsUpsert) { + validationException = addValidationError("compare and write operations can not be used with upsert", validationException); + } + if (upsertRequest != null) { + validationException = + addValidationError("upsert requests don't support `if_seq_no` and `if_primary_term`", validationException); } - } - - validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException); - - if (ifSeqNo != UNASSIGNED_SEQ_NO && retryOnConflict > 0) { - validationException = addValidationError("compare and write operations can not be retried", validationException); - } - - if (ifSeqNo != UNASSIGNED_SEQ_NO && docAsUpsert) { - validationException = addValidationError("compare and write operations can not be used with upsert", validationException); } if (script == null && doc == null) { @@ -530,24 +515,22 @@ public int retryOnConflict() { @Override public UpdateRequest version(long version) { - this.version = version; - return this; + throw new UnsupportedOperationException("update requests do not support versioning"); } @Override public long version() { - return this.version; + return Versions.MATCH_ANY; } @Override public UpdateRequest versionType(VersionType versionType) { - this.versionType = versionType; - return this; + throw new UnsupportedOperationException("update requests do not support versioning"); } @Override public VersionType versionType() { - return this.versionType; + return VersionType.INTERNAL; } /** @@ -877,8 +860,14 @@ public void readFrom(StreamInput in) throws IOException { upsertRequest.readFrom(in); } docAsUpsert = in.readBoolean(); - version = in.readLong(); - versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().before(Version.V_7_0_0)) { + long version = in.readLong(); + VersionType versionType = VersionType.readFromStream(in); + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new UnsupportedOperationException( + "versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term"); + } + } ifSeqNo = in.readZLong(); ifPrimaryTerm = in.readVLong(); detectNoop = in.readBoolean(); @@ -930,8 +919,10 @@ public void writeTo(StreamOutput out) throws IOException { upsertRequest.writeTo(out); } out.writeBoolean(docAsUpsert); - out.writeLong(version); - out.writeByte(versionType.getValue()); + if (out.getVersion().before(Version.V_7_0_0)) { + out.writeLong(Versions.MATCH_ANY); + out.writeByte(VersionType.INTERNAL.getValue()); + } out.writeZLong(ifSeqNo); out.writeVLong(ifPrimaryTerm); out.writeBoolean(detectNoop); diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index 9fb1cb804946f..3c85fe40c5ba7 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; @@ -102,9 +103,9 @@ private GetResult get(String type, String id, String[] gFields, boolean realtime } } - public GetResult getForUpdate(String type, String id, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm) { - return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, version, versionType, ifSeqNo, ifPrimaryTerm, - FetchSourceContext.FETCH_SOURCE, true); + public GetResult getForUpdate(String type, String id, long ifSeqNo, long ifPrimaryTerm) { + return get(type, id, new String[]{RoutingFieldMapper.NAME}, true, + Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true); } /** diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 463a18ea6b802..804fa61fc53b2 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -83,8 +83,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); - updateRequest.version(RestActions.parseVersion(request)); - updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 75701e0685290..6d3e4c04c13d7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -311,7 +311,7 @@ public void testSmileIsSupported() throws IOException { assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } - public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOException { + public void testToValidateUpsertRequestAndCASInBulkRequest() throws IOException { XContentType xContentType = XContentType.SMILE; BytesReference data; try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -321,7 +321,8 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept builder.field("_index", "index"); builder.field("_type", "type"); builder.field("_id", "id"); - builder.field("version", 1L); + builder.field("if_seq_no", 1L); + builder.field("if_primary_term", 100L); builder.endObject(); builder.endObject(); } @@ -330,7 +331,8 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept builder.startObject(); builder.startObject("doc").endObject(); Map values = new HashMap<>(); - values.put("version", 2L); + values.put("if_seq_no", 1L); + values.put("if_primary_term", 100L); values.put("_index", "index"); values.put("_type", "type"); builder.field("upsert", values); @@ -341,8 +343,7 @@ public void testToValidateUpsertRequestAndVersionInBulkRequest() throws IOExcept } BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(data, null, null, xContentType); - assertThat(bulkRequest.validate().validationErrors(), contains("can't provide both upsert request and a version", - "can't provide version in upsert request")); + assertThat(bulkRequest.validate().validationErrors(), contains("upsert requests don't support `if_seq_no` and `if_primary_term`")); //This test's JSON contains outdated references to types assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 277c130cebb1b..f74137d4a418d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -195,8 +196,8 @@ public void testBulkUpdateSimple() throws Exception { assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L)); } - public void testBulkVersioning() throws Exception { - createIndex("test"); + public void testBulkWithCAS() throws Exception { + createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build()); ensureGreen(); BulkResponse bulkResponse = client().prepareBulk() .add(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field", "1")) @@ -204,20 +205,22 @@ public void testBulkVersioning() throws Exception { .add(client().prepareIndex("test", "type", "1").setSource("field", "2")).get(); assertEquals(DocWriteResponse.Result.CREATED, bulkResponse.getItems()[0].getResponse().getResult()); - assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[0].getResponse().getSeqNo(), equalTo(0L)); assertEquals(DocWriteResponse.Result.CREATED, bulkResponse.getItems()[1].getResponse().getResult()); - assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[1].getResponse().getSeqNo(), equalTo(1L)); assertEquals(DocWriteResponse.Result.UPDATED, bulkResponse.getItems()[2].getResponse().getResult()); - assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(2L)); + assertThat(bulkResponse.getItems()[2].getResponse().getSeqNo(), equalTo(2L)); bulkResponse = client().prepareBulk() - .add(client().prepareUpdate("test", "type", "1").setVersion(4L).setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2")) + .add(client().prepareUpdate("test", "type", "1").setIfSeqNo(40L).setIfPrimaryTerm(20) + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2")) .add(client().prepareUpdate("test", "type", "2").setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2")) - .add(client().prepareUpdate("test", "type", "1").setVersion(2L).setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3")).get(); + .add(client().prepareUpdate("test", "type", "1").setIfSeqNo(2L).setIfPrimaryTerm(1) + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3")).get(); assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict")); - assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L)); - assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(3L)); + assertThat(bulkResponse.getItems()[1].getResponse().getSeqNo(), equalTo(3L)); + assertThat(bulkResponse.getItems()[2].getResponse().getSeqNo(), equalTo(4L)); bulkResponse = client().prepareBulk() .add(client().prepareIndex("test", "type", "e1") @@ -237,9 +240,9 @@ public void testBulkVersioning() throws Exception { bulkResponse = client().prepareBulk() .add(client().prepareUpdate("test", "type", "e1") - .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2").setVersion(10)) // INTERNAL + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "2").setIfSeqNo(10L).setIfPrimaryTerm(1)) .add(client().prepareUpdate("test", "type", "e1") - .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3").setVersion(13).setVersionType(VersionType.INTERNAL)) + .setDoc(Requests.INDEX_CONTENT_TYPE, "field", "3").setIfSeqNo(20L).setIfPrimaryTerm(1)) .get(); assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict")); @@ -471,7 +474,7 @@ public void testFailingVersionedUpdatedOnBulk() throws Exception { return; } BulkRequestBuilder requestBuilder = client().prepareBulk(); - requestBuilder.add(client().prepareUpdate("test", "type", "1").setVersion(1) + requestBuilder.add(client().prepareUpdate("test", "type", "1").setIfSeqNo(0L).setIfPrimaryTerm(1) .setDoc(Requests.INDEX_CONTENT_TYPE, "field", threadID)); responses[threadID] = requestBuilder.get(); diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index 5a734352eafb2..642d14e2258cb 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -500,12 +500,14 @@ public void testToAndFromXContent() throws IOException { assertToXContentEquivalent(originalBytes, finalBytes, xContentType); } - public void testToValidateUpsertRequestAndVersion() { + public void testToValidateUpsertRequestAndCAS() { UpdateRequest updateRequest = new UpdateRequest("index", "type", "id"); - updateRequest.version(1L); + updateRequest.setIfSeqNo(1L); + updateRequest.setIfPrimaryTerm(1L); updateRequest.doc("{}", XContentType.JSON); updateRequest.upsert(new IndexRequest("index","type", "id")); - assertThat(updateRequest.validate().validationErrors(), contains("can't provide both upsert request and a version")); + assertThat(updateRequest.validate().validationErrors(), + contains("upsert requests don't support `if_seq_no` and `if_primary_term`")); } public void testToValidateUpsertRequestWithVersion() { diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java index 496221ca9fc4e..5492b8bf7c672 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; @@ -31,7 +30,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import static org.elasticsearch.common.lucene.uid.Versions.MATCH_ANY; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -51,8 +49,7 @@ public void testGetForUpdate() throws IOException { recoverShardFromStore(primary); Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet = primary.getService().getForUpdate( - "test", "0", test.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + GetResult testGet = primary.getService().getForUpdate("test", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { @@ -61,8 +58,7 @@ public void testGetForUpdate() throws IOException { Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - GetResult testGet1 = primary.getService().getForUpdate( - "test", "1", test1.getVersion(), VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + GetResult testGet1 = primary.getService().getForUpdate("test", "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); @@ -77,20 +73,19 @@ public void testGetForUpdate() throws IOException { // now again from the reader Engine.IndexResult test2 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); assertTrue(primary.getEngine().refreshNeeded()); - testGet1 = primary.getService().getForUpdate("test", "1", test2.getVersion(), VersionType.INTERNAL, - UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + testGet1 = primary.getService().getForUpdate("test", "1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); final long primaryTerm = primary.getOperationPrimaryTerm(); - testGet1 = primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm); + testGet1 = primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm); assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); expectThrows(VersionConflictEngineException.class, () -> - primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo() + 1, primaryTerm)); + primary.getService().getForUpdate("test", "1", test2.getSeqNo() + 1, primaryTerm)); expectThrows(VersionConflictEngineException.class, () -> - primary.getService().getForUpdate("test", "1", MATCH_ANY, VersionType.INTERNAL, test2.getSeqNo(), primaryTerm + 1)); + primary.getService().getForUpdate("test", "1", test2.getSeqNo(), primaryTerm + 1)); closeShards(primary); } @@ -108,16 +103,13 @@ public void testTypelessGetForUpdate() throws IOException { Engine.IndexResult indexResult = indexDoc(shard, "some_type", "0", "{\"foo\" : \"bar\"}"); assertTrue(indexResult.isCreated()); - GetResult getResult = shard.getService().getForUpdate( - "some_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + GetResult getResult = shard.getService().getForUpdate("some_type", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); - getResult = shard.getService().getForUpdate( - "some_other_type", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + getResult = shard.getService().getForUpdate("some_other_type", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertFalse(getResult.isExists()); - getResult = shard.getService().getForUpdate( - "_doc", "0", MATCH_ANY, VersionType.INTERNAL, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); + getResult = shard.getService().getForUpdate("_doc", "0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM); assertTrue(getResult.isExists()); closeShards(shard); diff --git a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java index fb3eac28b6793..d749ce367cf0b 100644 --- a/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Setting; @@ -436,17 +437,20 @@ public void testOpenCloseUpdateSettings() throws Exception { public void testEngineGCDeletesSetting() throws InterruptedException { createIndex("test"); - client().prepareIndex("test", "type", "1").setSource("f", 1).get(); // set version to 1 - client().prepareDelete("test", "type", "1").get(); // sets version to 2 - // delete is still in cache this should work & set version to 3 - client().prepareIndex("test", "type", "1").setSource("f", 2).setVersion(2).get(); + client().prepareIndex("test", "type", "1").setSource("f", 1).get(); + DeleteResponse response = client().prepareDelete("test", "type", "1").get(); + long seqNo = response.getSeqNo(); + long primaryTerm = response.getPrimaryTerm(); + // delete is still in cache this should work + client().prepareIndex("test", "type", "1").setSource("f", 2).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm).get(); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.gc_deletes", 0)).get(); - client().prepareDelete("test", "type", "1").get(); // sets version to 4 + response = client().prepareDelete("test", "type", "1").get(); + seqNo = response.getSeqNo(); Thread.sleep(300); // wait for cache time to change TODO: this needs to be solved better. To be discussed. // delete is should not be in cache - assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3) - .setVersion(4), VersionConflictEngineException.class); + assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setIfSeqNo(seqNo).setIfPrimaryTerm(primaryTerm), + VersionConflictEngineException.class); } diff --git a/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java b/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java index f562ace967820..ffd876383ad9e 100644 --- a/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java +++ b/server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java @@ -213,11 +213,11 @@ public void testRequireUnitsOnUpdateSettings() throws Exception { } } - public void testInternalVersioningInitialDelete() throws Exception { + public void testCompareAndSetInitialDelete() throws Exception { createIndex("test"); ensureGreen(); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(17).execute(), + assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(17).setIfPrimaryTerm(10).execute(), VersionConflictEngineException.class); IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1") @@ -225,63 +225,6 @@ public void testInternalVersioningInitialDelete() throws Exception { assertThat(indexResponse.getVersion(), equalTo(1L)); } - public void testInternalVersioning() throws Exception { - createIndex("test"); - ensureGreen(); - - IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(1L)); - - indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(2L)); - - assertThrows( - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), - VersionConflictEngineException.class); - - assertThrows( - client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), - VersionConflictEngineException.class); - - assertThrows( - client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(), - VersionConflictEngineException.class); - - - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); - - client().admin().indices().prepareRefresh().execute().actionGet(); - for (int i = 0; i < 10; i++) { - assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L)); - } - - // search with versioning - for (int i = 0; i < 10; i++) { - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).execute().actionGet(); - assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L)); - } - - // search without versioning - for (int i = 0; i < 10; i++) { - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); - assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(Versions.NOT_FOUND)); - } - - DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(2).execute().actionGet(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - assertThat(deleteResponse.getVersion(), equalTo(3L)); - - assertThrows(client().prepareDelete("test", "type", "1").setVersion(2).execute(), VersionConflictEngineException.class); - - - // This is intricate - the object was deleted but a delete transaction was with the right version. We add another one - // and thus the transaction is increased. - deleteResponse = client().prepareDelete("test", "type", "1").setVersion(3).execute().actionGet(); - assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult()); - assertThat(deleteResponse.getVersion(), equalTo(4L)); - } - public void testCompareAndSet() { createIndex("test"); ensureGreen(); @@ -290,7 +233,7 @@ public void testCompareAndSet() { assertThat(indexResponse.getSeqNo(), equalTo(0L)); assertThat(indexResponse.getPrimaryTerm(), equalTo(1L)); - indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setIfSeqNo(0L).setIfPrimaryTerm(1).get(); assertThat(indexResponse.getSeqNo(), equalTo(1L)); assertThat(indexResponse.getPrimaryTerm(), equalTo(1L)); @@ -353,25 +296,21 @@ public void testSimpleVersioningWithFlush() throws Exception { createIndex("test"); ensureGreen(); - IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(1L)); + IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").get(); + assertThat(indexResponse.getSeqNo(), equalTo(0L)); client().admin().indices().prepareFlush().execute().actionGet(); - indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); - assertThat(indexResponse.getVersion(), equalTo(2L)); + indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setIfSeqNo(0).setIfPrimaryTerm(1).get(); + assertThat(indexResponse.getSeqNo(), equalTo(1L)); client().admin().indices().prepareFlush().execute().actionGet(); - assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), - VersionConflictEngineException.class); - - assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(), + assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfSeqNo(0).setIfPrimaryTerm(1), VersionConflictEngineException.class); - assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(), + assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1"), VersionConflictEngineException.class); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); - assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class); + assertThrows(client().prepareDelete("test", "type", "1").setIfSeqNo(0).setIfPrimaryTerm(1), VersionConflictEngineException.class); for (int i = 0; i < 10; i++) { assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(2L)); @@ -380,10 +319,11 @@ public void testSimpleVersioningWithFlush() throws Exception { client().admin().indices().prepareRefresh().execute().actionGet(); for (int i = 0; i < 10; i++) { - SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true). + SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).seqNoAndPrimaryTerm(true). execute().actionGet(); assertHitCount(searchResponse, 1); assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L)); + assertThat(searchResponse.getHits().getAt(0).getSeqNo(), equalTo(1L)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index bfe0cdef41596..ac4f435da130d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -495,7 +495,7 @@ public void onTimeout(TimeValue timeout) { private void clearJobFinishedTime(String jobId, ActionListener listener) { JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( + jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( job -> listener.onResponse(new AcknowledgedResponse(true)), e -> { logger.error("[" + jobId + "] Failed to clear finished_time", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 443e66b81e785..09a8f219afcf4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -87,7 +87,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat CheckedConsumer updateConsumer = ok -> { datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers, - jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(), + jobConfigProvider::validateDatafeedJob, ActionListener.wrap( updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)), listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java index d8d5fe216b2a4..fe5ae7eb6e8bf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; @@ -54,7 +53,6 @@ public class TransportUpdateFilterAction extends HandledTransportAction) UpdateFilterAction.Request::new); this.client = client; this.jobManager = jobManager; - this.clusterService = clusterService; } @Override protected void doExecute(Task task, UpdateFilterAction.Request request, ActionListener listener) { - ActionListener filterListener = ActionListener.wrap(filterWithVersion -> { + ActionListener filterListener = ActionListener.wrap(filterWithVersion -> { updateFilter(filterWithVersion, request, listener); }, listener::onFailure); getFilterWithVersion(request.getFilterId(), filterListener); } - private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterAction.Request request, + private void updateFilter(FilterWithSeqNo filterWithVersion, UpdateFilterAction.Request request, ActionListener listener) { MlFilter filter = filterWithVersion.filter; @@ -100,19 +97,15 @@ private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterActio MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build(); indexUpdatedFilter( - updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener); + updatedFilter, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener); } - private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm, + private void indexUpdatedFilter(MlFilter filter, final long seqNo, final long primaryTerm, UpdateFilterAction.Request request, ActionListener listener) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId()); - if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) { - indexRequest.setIfSeqNo(seqNo); - indexRequest.setIfPrimaryTerm(primaryTerm); - } else { - indexRequest.version(version); - } + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { @@ -145,7 +138,7 @@ public void onFailure(Exception e) { }); } - private void getFilterWithVersion(String filterId, ActionListener listener) { + private void getFilterWithVersion(String filterId, ActionListener listener) { GetRequest getRequest = new GetRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId)); executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener() { @Override @@ -157,7 +150,7 @@ public void onResponse(GetResponse getDocResponse) { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build(); - listener.onResponse(new FilterWithVersion(filter, getDocResponse)); + listener.onResponse(new FilterWithSeqNo(filter, getDocResponse)); } } else { this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId))); @@ -174,16 +167,14 @@ public void onFailure(Exception e) { }); } - private static class FilterWithVersion { + private static class FilterWithSeqNo { private final MlFilter filter; - private final long version; private final long seqNo; private final long primaryTerm; - private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) { + private FilterWithSeqNo(MlFilter filter, GetResponse getDocResponse) { this.filter = filter; - this.version = getDocResponse.getVersion(); this.seqNo = getDocResponse.getSeqNo(); this.primaryTerm = getDocResponse.getPrimaryTerm(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 7d11173e258b1..7237ab0eb9818 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -8,7 +8,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -264,13 +263,11 @@ public void onFailure(Exception e) { * @param headers Datafeed headers applied with the update * @param validator BiConsumer that accepts the updated config and can perform * extra validations. {@code validator} must call the passed listener - * @param minClusterNodeVersion minimum version of nodes in cluster * @param updatedConfigListener Updated datafeed config listener */ public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map headers, - BiConsumer> validator, - Version minClusterNodeVersion, - ActionListener updatedConfigListener) { + BiConsumer> validator, + ActionListener updatedConfigListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); @@ -304,7 +301,7 @@ public void onResponse(GetResponse getResponse) { ActionListener validatedListener = ActionListener.wrap( ok -> { - indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap( + indexUpdatedConfig(updatedConfig, seqNo, primaryTerm, ActionListener.wrap( indexResponse -> { assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; updatedConfigListener.onResponse(updatedConfig); @@ -324,8 +321,8 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm, - Version minClusterNodeVersion, ActionListener listener) { + private void indexUpdatedConfig(DatafeedConfig updatedConfig, long seqNo, long primaryTerm, + ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), @@ -333,12 +330,8 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long .setSource(updatedSource) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { - indexRequest.setIfSeqNo(seqNo); - indexRequest.setIfPrimaryTerm(primaryTerm); - } else { - indexRequest.setVersion(version); - } + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 6696bfe1ad96a..ccd0d594eb382 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -333,7 +333,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener { jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit, - this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( + this::validate, ActionListener.wrap( updatedJob -> postJobUpdate(request, updatedJob, actionListener), actionListener::onFailure )); @@ -603,7 +603,7 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList .setModelSnapshotId(modelSnapshot.getSnapshotId()) .build(); - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(), + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap(job -> { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index e5ee8855969a3..9423768b8ed4f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -10,7 +10,6 @@ import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -227,11 +226,9 @@ public void onFailure(Exception e) { * @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null} * if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits} * are not changed. - * @param minClusterNodeVersion the minimum version of nodes in the cluster * @param updatedJobListener Updated job listener */ public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - Version minClusterNodeVersion, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -266,7 +263,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); + indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener); } @Override @@ -287,18 +284,17 @@ public interface UpdateValidator { } /** - * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but + * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but * with an extra validation step which is called before the updated is applied. * * @param jobId The Id of the job to update * @param update The job update * @param maxModelMemoryLimit The maximum model memory allowed * @param validator The job update validator - * @param minClusterNodeVersion the minimum version of a node ifn the cluster * @param updatedJobListener Updated job listener */ public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - UpdateValidator validator, Version minClusterNodeVersion, ActionListener updatedJobListener) { + UpdateValidator validator, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -334,7 +330,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); + indexUpdatedJob(updatedJob, seqNo, primaryTerm, updatedJobListener); }, updatedJobListener::onFailure )); @@ -347,7 +343,7 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion, + private void indexUpdatedJob(Job updatedJob, long seqNo, long primaryTerm, ActionListener updatedJobListener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -355,12 +351,8 @@ private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long prim ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId())) .setSource(updatedSource) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { - indexRequest.setIfSeqNo(seqNo); - indexRequest.setIfPrimaryTerm(primaryTerm); - } else { - indexRequest.setVersion(version); - } + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap( indexResponse -> { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 00d62b7e0a933..9496f4ca0d8f2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -7,7 +7,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; @@ -87,7 +86,7 @@ public void testCrud() throws InterruptedException { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), configHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(configHolder.get().getIndices(), equalTo(updateIndices)); @@ -168,7 +167,7 @@ public void testUpdateWhenApplyingTheUpdateThrows() throws Exception { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); assertNotNull(exceptionHolder.get()); @@ -194,7 +193,7 @@ public void testUpdateWithValidatorFunctionThatErrors() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - validateErrorFunction, Version.CURRENT, actionListener), + validateErrorFunction, actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 3e20bdd73de07..f6ff80edeec02 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -8,7 +8,6 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; @@ -149,7 +148,7 @@ public void testCrud() throws InterruptedException { AtomicReference updateJobResponseHolder = new AtomicReference<>(); blockingCall(actionListener -> jobConfigProvider.updateJob - (jobId, jobUpdate, new ByteSizeValue(32), Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); + (jobId, jobUpdate, new ByteSizeValue(32), actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription()); @@ -206,7 +205,7 @@ public void testUpdateWithAValidationError() throws Exception { .build(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), Version.CURRENT, + blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); assertNotNull(exceptionHolder.get()); @@ -231,7 +230,7 @@ public void testUpdateWithValidator() throws Exception { AtomicReference updateJobResponseHolder = new AtomicReference<>(); // update with the no-op validator blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation( - jobId, jobUpdate, new ByteSizeValue(32), validator, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); + jobId, jobUpdate, new ByteSizeValue(32), validator, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertNotNull(updateJobResponseHolder.get()); @@ -244,7 +243,7 @@ public void testUpdateWithValidator() throws Exception { updateJobResponseHolder.set(null); // Update with a validator that errors blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), - validatorWithAnError, Version.CURRENT, actionListener), + validatorWithAnError, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index 522b1a6d4b97a..8b9fda5b9c3f3 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -750,12 +750,8 @@ private void innerRefresh(String tokenDocId, Authentication userAuth, ActionList client.prepareUpdate(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, tokenDocId) .setDoc("refresh_token", Collections.singletonMap("refreshed", true)) .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) { - updateRequest.setIfSeqNo(response.getSeqNo()); - updateRequest.setIfPrimaryTerm(response.getPrimaryTerm()); - } else { - updateRequest.setVersion(response.getVersion()); - } + updateRequest.setIfSeqNo(response.getSeqNo()); + updateRequest.setIfPrimaryTerm(response.getPrimaryTerm()); executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(), ActionListener.wrap( updateResponse -> createUserToken(authentication, userAuth, listener, metadata, true), diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml index 74b3a20f5cff8..ebef6c87d7022 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml @@ -115,112 +115,6 @@ setup: } ---- -"Test putting a watch with a redacted password with old version returns an error": - - # version 1 - - do: - xpack.watcher.put_watch: - id: "watch_old_version" - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "pass" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - # version 2 - - do: - xpack.watcher.put_watch: - id: "watch_old_version" - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "pass" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - - # using optimistic concurrency control, this one will loose - # as if two users in the watch UI tried to update the same watch - - do: - catch: conflict - xpack.watcher.put_watch: - id: "watch_old_version" - version: 1 - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "::es_redacted::" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - --- "Test putting a watch with a redacted password with old seq no returns an error": - skip: @@ -390,98 +284,3 @@ setup: - match: { hits.hits.0._source.input.http.request.auth.basic.username: "new_user" } - match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" } ---- -"Test putting a watch with a redacted password with current version works": - - - do: - xpack.watcher.put_watch: - id: "my_watch_with_version" - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "pass" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - - match: { _id: "my_watch_with_version" } - - match: { _version: 1 } - - # this resembles the exact update from the UI and thus should work, no password change, any change in the watch - # but correct version provided - - do: - xpack.watcher.put_watch: - id: "my_watch_with_version" - version: 1 - body: > - { - "trigger": { - "schedule" : { "cron" : "0 0 0 1 * ? 2099" } - }, - "input": { - "http" : { - "request" : { - "host" : "host.domain", - "port" : 9200, - "path" : "/myservice", - "auth" : { - "basic" : { - "username" : "user", - "password" : "::es_redacted::" - } - } - } - } - }, - "actions": { - "logging": { - "logging": { - "text": "Log me Amadeus!" - } - } - } - } - - - match: { _id: "my_watch_with_version" } - - match: { _version: 2 } - - - do: - search: - rest_total_hits_as_int: true - index: .watches - body: > - { - "query": { - "term": { - "_id": { - "value": "my_watch_with_version" - } - } - } - } - - - match: { hits.total: 1 } - - match: { hits.hits.0._id: "my_watch_with_version" } - - match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" } -