Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for internal versioning for concurrency control #38254

Merged
merged 51 commits into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
44b87e5
Remove internal versioning as CAS
bleskes Jan 28, 2019
7ef53e9
fix upsert + seq numbers
bleskes Feb 1, 2019
0db0dd9
Merge branch 'master' into cas_remove_internal_versioning
bleskes Feb 2, 2019
16cc5a6
remove version params from the update rest api
bleskes Feb 2, 2019
9c18658
lint
bleskes Feb 2, 2019
11834f8
move ml to seq no cas
bleskes Feb 3, 2019
1622b47
Merge branch 'cas_job_config' into cas_remove_internal_versioning
bleskes Feb 3, 2019
a2bf755
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 3, 2019
d7564a0
fix testEngineGCDeletesSetting
bleskes Feb 3, 2019
a9f47c9
fix SimpleVersioningIT
bleskes Feb 3, 2019
fc97c1a
fix BulkWithUpdatesIT
bleskes Feb 3, 2019
3617f06
move DatafeedConfigProvider to seq No
bleskes Feb 3, 2019
a833522
Merge branch 'cas_job_config' into cas_remove_internal_versioning
bleskes Feb 3, 2019
de81b45
remove watch test with versions
bleskes Feb 3, 2019
575267d
fix testFailingVersionedUpdatedOnBulk
bleskes Feb 4, 2019
0852123
remove "Test putting a watch with a redacted password with current ve…
bleskes Feb 4, 2019
24f92bc
Move TokenService to seqno powered cas
bleskes Feb 4, 2019
820e055
Merge branch 'cas_token_service' into cas_remove_internal_versioning
bleskes Feb 4, 2019
4c00bd6
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 4, 2019
96c9bc1
remove duplicate method
bleskes Feb 4, 2019
03163ce
add docs
bleskes Feb 4, 2019
914da06
force one shard in testBulkWithCAS
bleskes Feb 4, 2019
1b00d76
fix doc reference
bleskes Feb 4, 2019
7974e3c
remove version yml tests for updates
bleskes Feb 4, 2019
d843842
remove version usage in TokenService
bleskes Feb 4, 2019
2690b88
remove version usage in TransportUpdateFilterAction
bleskes Feb 4, 2019
aa3d01b
remove cluster checks and potential usages of versions in ml
bleskes Feb 4, 2019
112af69
merge master
bleskes Feb 4, 2019
2c0ca27
remove duplicates post merge
bleskes Feb 4, 2019
515f95f
add create + update if_seq_no yml tests
bleskes Feb 4, 2019
7124c9f
remove rest internal version tests
bleskes Feb 4, 2019
f2bc6f7
fix documentation + rollback resiliency
bleskes Feb 4, 2019
0dddeb1
fix WatcherRequestConvertersTests.testPutWatch
bleskes Feb 4, 2019
e31f826
E -> e
bleskes Feb 4, 2019
892dead
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 4, 2019
c61a394
fix testUpdateWhileReindexing
bleskes Feb 5, 2019
e8aa73a
fix RequestConvertersTests.testBulk
bleskes Feb 5, 2019
313ec1a
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 5, 2019
a46a763
remove version updates from RequestConvertersTests.testUpdate
bleskes Feb 5, 2019
7e35372
Wire if_seq_no and if_primary_term in rest client bulk
bleskes Feb 5, 2019
2df14d4
wire index and delete
bleskes Feb 5, 2019
949ddb9
feedback
bleskes Feb 5, 2019
621457f
lint
bleskes Feb 5, 2019
74d6a9f
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 5, 2019
99f3fd2
merge cas_bulk_rest
bleskes Feb 5, 2019
6d81066
fix CRUDDocumentationIT
bleskes Feb 5, 2019
57bafed
update requests don't have parameters in url
bleskes Feb 5, 2019
f2a74d3
line lengths, again
bleskes Feb 5, 2019
0239d66
merge cas_bulk_rest
bleskes Feb 5, 2019
208211a
Merge remote-tracking branch 'upstream/master' into cas_remove_intern…
bleskes Feb 5, 2019
2c81f7e
merge master
bleskes Feb 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ static Request putWatch(PutWatchRequest putWatchRequest) {

Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;

Expand All @@ -43,11 +42,9 @@ public final class PutWatchRequest implements Validatable {
private final BytesReference source;
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
if (isValidId(id) == false) {
Expand Down Expand Up @@ -95,14 +92,6 @@ public XContentType xContentType() {
return xContentType;
}

public long getVersion() {
return version;
}

public void setVersion(long version) {
this.version = version;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,6 @@ public void testUpdate() throws IOException {
}
}
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
setRandomVersion(updateRequest, expectedParams);
setRandomVersionType(updateRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
if (randomBoolean()) {
int retryOnConflict = randomIntBetween(0, 5);
Expand Down Expand Up @@ -911,14 +909,7 @@ public void testBulk() throws IOException {
if (randomBoolean()) {
docWriteRequest.routing(randomAlphaOfLength(10));
}
if (randomBoolean()) {
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
}
} else if (randomBoolean()) {
if (opType != DocWriteRequest.OpType.UPDATE && randomBoolean()) {
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.ExecuteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
Expand Down Expand Up @@ -88,9 +88,12 @@ public void testPutWatch() throws Exception {
}

if (randomBoolean()) {
long version = randomLongBetween(10, 100);
putWatchRequest.setVersion(version);
expectedParams.put("version", String.valueOf(version));
long seqNo = randomNonNegativeLong();
long ifPrimaryTerm = randomLongBetween(1, 200);
putWatchRequest.setIfSeqNo(seqNo);
putWatchRequest.setIfPrimaryTerm(ifPrimaryTerm);
expectedParams.put("if_seq_no", String.valueOf(seqNo));
expectedParams.put("if_primary_term", String.valueOf(ifPrimaryTerm));
bleskes marked this conversation as resolved.
Show resolved Hide resolved
}

Request request = WatcherRequestConverters.putWatch(putWatchRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public void testIndex() throws Exception {
// tag::index-response
String index = indexResponse.getIndex();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// <1>
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
Expand Down Expand Up @@ -220,7 +219,8 @@ public void testIndex() throws Exception {
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.version(1);
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
Expand Down Expand Up @@ -432,7 +432,8 @@ public void testUpdate() throws Exception {
// tag::update-conflict
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("field", "value")
.version(1);
.setIfSeqNo(101L)
.setIfPrimaryTerm(200L);
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
Expand Down Expand Up @@ -499,9 +500,10 @@ public void testUpdate() throws Exception {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // <1>
request.setRefreshPolicy("wait_for"); // <2>
// end::update-request-refresh
// tag::update-request-version
request.version(2); // <1>
// end::update-request-version
// tag::update-request-cas
request.setIfSeqNo(2L); // <1>
request.setIfPrimaryTerm(1L); // <2>
// end::update-request-request-cas
// tag::update-request-detect-noop
request.detectNoop(false); // <1>
// end::update-request-detect-noop
Expand Down Expand Up @@ -630,7 +632,7 @@ public void testDelete() throws Exception {
// tag::delete-conflict
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").version(2),
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
Expand Down
5 changes: 3 additions & 2 deletions docs/java-rest/high-level/document/update.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ include-tagged::{doc-tests-file}[{api}-request-source-exclude]

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-version]
include-tagged::{doc-tests-file}[{api}-request-cas]
--------------------------------------------------
<1> Version
<1> ifSeqNo
<2> ifPrimaryTerm

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions docs/reference/migration/migrate_7_0/api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@
[[breaking_70_api_changes]]
=== API changes

[float]
==== Internal Versioning is no longer supported for optimistic concurrency control

Elasticsearch maintains a numeric version field for each document it stores. That field
is incremented by one with every change to the document. Until 7.0.0 the API allowed using
that field for optimistic concurrency control, i.e., making a write operation conditional
on the current document version. Sadly, that approach is flawed because the value of the
version doesn't always uniquely represent a change to the document. If a primary fails
while handling a write operation, it may expose a version that will then be reused by the
new primary.

Due to that issue, internal versioning can no longer be used and is replaced by a new
method based on sequence numbers. See <<optimistic-concurrency-control>> for more details.

Note that the `external` versioning type is still fully supported.

[float]
==== Camel case and underscore parameters deprecated in 6.x have been removed
A number of duplicate parameters deprecated in 6.x have been removed from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
package org.elasticsearch.index.reindex;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -33,18 +31,10 @@
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {

private final boolean useSeqNoForCAS;

public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task,
// not all nodes support sequence number powered optimistic concurrency control, we fall back to version
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0) == false,
// all nodes support sequence number powered optimistic concurrency control and we can use it
clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0),
logger, client, threadPool, action, request, listener);
useSeqNoForCAS = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
super(task, false, true, logger, client, threadPool, action, request, listener);
}

@Override
Expand All @@ -60,12 +50,8 @@ protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc
delete.index(doc.getIndex());
delete.type(doc.getType());
delete.id(doc.getId());
if (useSeqNoForCAS) {
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
delete.version(doc.getVersion());
}
delete.setIfSeqNo(doc.getSeqNo());
delete.setIfPrimaryTerm(doc.getPrimaryTerm());
return wrap(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
ClusterState state = clusterService.state();
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService,
listener).start();
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
Expand Down Expand Up @@ -113,13 +112,8 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
index.type(doc.getType());
index.id(doc.getId());
index.source(doc.getSource(), doc.getXContentType());
if (useSeqNoForCAS) {
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
} else {
index.versionType(VersionType.INTERNAL);
index.version(doc.getVersion());
}
index.setIfSeqNo(doc.getSeqNo());
index.setIfPrimaryTerm(doc.getPrimaryTerm());
index.setPipeline(mainRequest.getPipeline());
return wrap(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ public void testUpdateWhileReindexing() throws Exception {
IndexRequestBuilder index = client().prepareIndex("test", "test", "test").setSource("test", value.get())
.setRefreshPolicy(IMMEDIATE);
/*
* Update by query increments the version number so concurrent
* Update by query changes the document so concurrent
* indexes might get version conflict exceptions so we just
* blindly retry.
*/
int attempts = 0;
while (true) {
attempts++;
try {
index.setVersion(get.getVersion()).get();
index.setIfSeqNo(get.getSeqNo()).setIfPrimaryTerm(get.getPrimaryTerm()).get();
break;
} catch (VersionConflictEngineException e) {
if (attempts >= MAX_ATTEMPTS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@
"if_primary_term" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"version": {
"type": "number",
"description": "Explicit version number for concurrency control"
},
"version_type": {
"type": "enum",
"options": ["internal", "force"],
"description": "Specific version type"
}
}
},
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
id: 1
body: { foo: bar }

- match: { _version: 1}
- match: { _seq_no: 0 }

- do:
catch: conflict
delete:
index: test_1
id: 1
version: 2
if_seq_no: 2
if_primary_term: 1

- do:
delete:
index: test_1
id: 1
version: 1
if_seq_no: 0
if_primary_term: 1

- match: { _version: 2 }
- match: { _seq_no: 1 }
Loading