Skip to content

Commit

Permalink
Remove support for internal versioning for concurrency control (#38254)
Browse files Browse the repository at this point in the history
Elasticsearch has long [supported](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning) compare and set (a.k.a optimistic concurrency control) operations using internal document versioning. Sadly that approach is flawed and can sometime do the wrong thing. Here's the relevant excerpt from the resiliency status page:

> When a primary has been partitioned away from the cluster there is a short period of time until it detects this. During that time it will continue indexing writes locally, thereby updating document versions. When it tries to replicate the operation, however, it will discover that it is partitioned away. It won’t acknowledge the write and will wait until the partition is resolved to negotiate with the master on how to proceed. The master will decide to either fail any replicas which failed to index the operations on the primary or tell the primary that it has to step down because a new primary has been chosen in the meantime. Since the old primary has already written documents, clients may already have read from the old primary before it shuts itself down. The version numbers of these reads may not be unique if the new primary has already accepted writes for the same document 

We recently [introduced](https://www.elastic.co/guide/en/elasticsearch/reference/6.x/optimistic-concurrency-control.html) a new sequence number based approach that doesn't suffer from this dirty reads problem. 

This commit removes support for internal versioning as a concurrency control mechanism in favor of the sequence number approach.

Relates to #1078
  • Loading branch information
bleskes authored Feb 5, 2019
1 parent b03d138 commit 033ba72
Show file tree
Hide file tree
Showing 46 changed files with 225 additions and 805 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ static Request putWatch(PutWatchRequest putWatchRequest) {

Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;

Expand All @@ -43,11 +42,9 @@ public final class PutWatchRequest implements Validatable {
private final BytesReference source;
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
if (isValidId(id) == false) {
Expand Down Expand Up @@ -95,14 +92,6 @@ public XContentType xContentType() {
return xContentType;
}

public long getVersion() {
return version;
}

public void setVersion(long version) {
this.version = version;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,6 @@ public void testUpdate() throws IOException {
}
}
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
setRandomVersion(updateRequest, expectedParams);
setRandomVersionType(updateRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
if (randomBoolean()) {
int retryOnConflict = randomIntBetween(0, 5);
Expand Down Expand Up @@ -911,14 +909,7 @@ public void testBulk() throws IOException {
if (randomBoolean()) {
docWriteRequest.routing(randomAlphaOfLength(10));
}
if (randomBoolean()) {
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
}
} else if (randomBoolean()) {
if (opType != DocWriteRequest.OpType.UPDATE && randomBoolean()) {
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.ExecuteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
Expand Down Expand Up @@ -88,9 +88,12 @@ public void testPutWatch() throws Exception {
}

if (randomBoolean()) {
long version = randomLongBetween(10, 100);
putWatchRequest.setVersion(version);
expectedParams.put("version", String.valueOf(version));
long seqNo = randomNonNegativeLong();
long ifPrimaryTerm = randomLongBetween(1, 200);
putWatchRequest.setIfSeqNo(seqNo);
putWatchRequest.setIfPrimaryTerm(ifPrimaryTerm);
expectedParams.put("if_seq_no", String.valueOf(seqNo));
expectedParams.put("if_primary_term", String.valueOf(ifPrimaryTerm));
}

Request request = WatcherRequestConverters.putWatch(putWatchRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public void testIndex() throws Exception {
// tag::index-response
String index = indexResponse.getIndex();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// <1>
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
Expand Down Expand Up @@ -220,7 +219,8 @@ public void testIndex() throws Exception {
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.version(1);
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
Expand Down Expand Up @@ -432,7 +432,8 @@ public void testUpdate() throws Exception {
// tag::update-conflict
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("field", "value")
.version(1);
.setIfSeqNo(101L)
.setIfPrimaryTerm(200L);
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
Expand Down Expand Up @@ -499,9 +500,10 @@ public void testUpdate() throws Exception {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // <1>
request.setRefreshPolicy("wait_for"); // <2>
// end::update-request-refresh
// tag::update-request-version
request.version(2); // <1>
// end::update-request-version
// tag::update-request-cas
request.setIfSeqNo(2L); // <1>
request.setIfPrimaryTerm(1L); // <2>
// end::update-request-request-cas
// tag::update-request-detect-noop
request.detectNoop(false); // <1>
// end::update-request-detect-noop
Expand Down Expand Up @@ -630,7 +632,7 @@ public void testDelete() throws Exception {
// tag::delete-conflict
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").version(2),
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
Expand Down
5 changes: 3 additions & 2 deletions docs/java-rest/high-level/document/update.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ include-tagged::{doc-tests-file}[{api}-request-source-exclude]

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-version]
include-tagged::{doc-tests-file}[{api}-request-cas]
--------------------------------------------------
<1> Version
<1> ifSeqNo
<2> ifPrimaryTerm

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions docs/reference/migration/migrate_7_0/api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@
[[breaking_70_api_changes]]
=== API changes

[float]
==== Internal Versioning is no longer supported for optimistic concurrency control

Elasticsearch maintains a numeric version field for each document it stores. That field
is incremented by one with every change to the document. Until 7.0.0 the API allowed using
that field for optimistic concurrency control, i.e., making a write operation conditional
on the current document version. Sadly, that approach is flawed because the value of the
version doesn't always uniquely represent a change to the document. If a primary fails
while handling a write operation, it may expose a version that will then be reused by the
new primary.

Due to that issue, internal versioning can no longer be used and is replaced by a new
method based on sequence numbers. See <<optimistic-concurrency-control>> for more details.

Note that the `external` versioning type is still fully supported.

[float]
==== Camel case and underscore parameters deprecated in 6.x have been removed
A number of duplicate parameters deprecated in 6.x have been removed from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
package org.elasticsearch.index.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -33,18 +31,10 @@
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {

private final boolean useSeqNoForCAS;

public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, action, request, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
super(task, false, true, logger, client, threadPool, action, request, listener);
}

@Override
Expand All @@ -60,12 +50,8 @@ protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc
delete.index(doc.getIndex());
delete.type(doc.getType());
delete.id(doc.getId());
if (useSeqNoForCAS) {
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
delete.version(doc.getVersion());
}
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
return wrap(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
ClusterState state = clusterService.state();
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService,
listener).start();
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
Expand Down Expand Up @@ -113,13 +112,8 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
index.type(doc.getType());
index.id(doc.getId());
index.source(doc.getSource(), doc.getXContentType());
if (useSeqNoForCAS) {
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
}
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
index.setPipeline(mainRequest.getPipeline());
return wrap(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ public void testUpdateWhileReindexing() throws Exception {
IndexRequestBuilder index = client().prepareIndex("test", "test", "test").setSource("test", value.get())
.setRefreshPolicy(IMMEDIATE);
/*
* Update by query increments the version number so concurrent
* Update by query changes the document so concurrent
* indexes might get version conflict exceptions so we just
* blindly retry.
*/
int attempts = 0;
while (true) {
attempts++;
try {
index.setVersion(get.getVersion()).get();
index.setIfSeqNo(get.getSeqNo()).setIfPrimaryTerm(get.getPrimaryTerm()).get();
break;
} catch (VersionConflictEngineException e) {
if (attempts >= MAX_ATTEMPTS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@
"if_primary_term" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"version": {
"type": "number",
"description": "Explicit version number for concurrency control"
},
"version_type": {
"type": "enum",
"options": ["internal", "force"],
"description": "Specific version type"
}
}
},
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
id: 1
body: { foo: bar }

- match: { _version: 1}
- match: { _seq_no: 0 }

- do:
catch: conflict
delete:
index: test_1
id: 1
version: 2
if_seq_no: 2
if_primary_term: 1

- do:
delete:
index: test_1
id: 1
version: 1
if_seq_no: 0
if_primary_term: 1

- match: { _version: 2 }
- match: { _seq_no: 1 }
Loading

0 comments on commit 033ba72

Please sign in to comment.