From 7872365e95f0fcb25d0d9ababe3472accd16598e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 18 Dec 2018 10:56:02 +0100 Subject: [PATCH] Expose Sequence Number based Optimistic Concurrency Control in the rest layer (#36721) Relates #36148 Relates #10708 --- .../resources/rest-api-spec/api/delete.json | 8 +++ .../resources/rest-api-spec/api/index.json | 8 +++ .../rest-api-spec/test/index/30_cas.yml | 50 +++++++++++++++++++ .../elasticsearch/action/get/GetResponse.java | 2 +- .../elasticsearch/index/get/GetResult.java | 2 +- .../action/document/RestDeleteAction.java | 4 ++ .../rest/action/document/RestIndexAction.java | 4 ++ 7 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json b/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json index bbe30d3a8484c..2e75465bf601e 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json @@ -43,6 +43,14 @@ "type" : "time", "description" : "Explicit operation timeout" }, + "if_seq_no_match" : { + "type" : "number", + "description" : "only perform the delete operation if the last operation that has changed the document has the specified sequence number" + }, + "if_primary_term_match" : { + "type" : "number", + "description" : "only perform the delete operation if the last operation that has changed the document has the specified primary term" + }, "version" : { "type" : "number", "description" : "Explicit version number for concurrency control" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json index 574206a0dc3ed..155707bbdcf14 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json @@ -57,6 +57,14 @@ "options" : ["internal", "external", "external_gte", "force"], "description" : "Specific version type" }, + "if_seq_no_match" : { + "type" : "number", + "description" : "only perform the index operation if the last operation that has changed the document has the specified sequence number" + }, + "if_primary_term_match" : { + "type" : "number", + "description" : "only perform the index operation if the last operation that has changed the document has the specified primary term" + }, "pipeline" : { "type" : "string", "description" : "The pipeline id to preprocess incoming documents with" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml new file mode 100644 index 0000000000000..b8c60e5a7cf8b --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml @@ -0,0 +1,50 @@ +--- +"Compare And Swap Sequence Numbers": + + - skip: + version: " - 6.99.99" + reason: cas ops are introduced in 7.0.0 + + - do: + index: + index: test_1 + id: 1 + body: { foo: bar } + - match: { _version: 1} + - set: { _seq_no: seqno } + - set: { _primary_term: primary_term } + + - do: + get: + index: test_1 + id: 1 + - match: { _seq_no: $seqno } + - match: { _primary_term: $primary_term } + + - do: + catch: conflict + index: + index: test_1 + id: 1 + if_seq_no_match: 10000 + if_primary_term_match: $primary_term + body: { foo: bar2 } + + - do: + catch: conflict + index: + index: test_1 + id: 1 + if_seq_no_match: $seqno + if_primary_term_match: 1000 + body: { foo: bar2 } + + - do: + index: + index: test_1 + id: 1 + if_seq_no_match: $seqno + if_primary_term_match: $primary_term + body: { foo: bar2 } + + - match: { _version: 2 } diff --git a/server/src/main/java/org/elasticsearch/action/get/GetResponse.java b/server/src/main/java/org/elasticsearch/action/get/GetResponse.java index fbcb47b5fad36..b9383785678b7 100644 --- a/server/src/main/java/org/elasticsearch/action/get/GetResponse.java +++ b/server/src/main/java/org/elasticsearch/action/get/GetResponse.java @@ -91,7 +91,7 @@ public long getVersion() { } /** - * The sequence number assigned to the last operation to have changed this document, if found. + * The sequence number assigned to the last operation that has changed this document, if found. */ public long getSeqNo() { return getResult.getSeqNo(); diff --git a/server/src/main/java/org/elasticsearch/index/get/GetResult.java b/server/src/main/java/org/elasticsearch/index/get/GetResult.java index b98d766dd4e3f..2b3b1b8f4f231 100644 --- a/server/src/main/java/org/elasticsearch/index/get/GetResult.java +++ b/server/src/main/java/org/elasticsearch/index/get/GetResult.java @@ -131,7 +131,7 @@ public long getVersion() { } /** - * The sequence number assigned to the last operation to have changed this document, if found. + * The sequence number assigned to the last operation that has changed this document, if found. */ public long getSeqNo() { return seqNo; diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestDeleteAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestDeleteAction.java index 87cc7a0fb41a4..1891b29d175c9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestDeleteAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestDeleteAction.java @@ -66,6 +66,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC deleteRequest.setRefreshPolicy(request.param("refresh")); deleteRequest.version(RestActions.parseVersion(request)); deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType())); + deleteRequest.setIfMatch( + request.paramAsLong("if_seq_no_match", deleteRequest.ifSeqNoMatch()), + request.paramAsLong("if_primary_term_match", deleteRequest.ifPrimaryTermMatch()) + ); String waitForActiveShards = request.param("wait_for_active_shards"); if (waitForActiveShards != null) { diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index 619fd811e6a7c..2a072560272bf 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -93,6 +93,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC indexRequest.setRefreshPolicy(request.param("refresh")); indexRequest.version(RestActions.parseVersion(request)); indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType())); + indexRequest.ifMatch( + request.paramAsLong("if_seq_no_match", indexRequest.ifSeqNoMatch()), + request.paramAsLong("if_primary_term_match", indexRequest.ifPrimaryTermMatch()) + ); String sOpType = request.param("op_type"); String waitForActiveShards = request.param("wait_for_active_shards"); if (waitForActiveShards != null) {