Skip to content

Commit

Permalink
Backport of elastic#38411: if_seq_no and if_primary_term paramete…
Browse files Browse the repository at this point in the history
…rs aren't wired correctly in REST Client's CRUD API
  • Loading branch information
bleskes authored Feb 5, 2019
1 parent 9dde53b commit 3e0819a
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ static Request delete(DeleteRequest deleteRequest) {
parameters.withTimeout(deleteRequest.timeout());
parameters.withVersion(deleteRequest.version());
parameters.withVersionType(deleteRequest.versionType());
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
return request;
Expand Down Expand Up @@ -193,6 +195,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
}
}

if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
metadata.field("if_seq_no", action.ifSeqNo());
metadata.field("if_primary_term", action.ifPrimaryTerm());
}

if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
IndexRequest indexRequest = (IndexRequest) action;
if (Strings.hasLength(indexRequest.getPipeline())) {
Expand Down Expand Up @@ -309,6 +316,8 @@ static Request index(IndexRequest indexRequest) {
parameters.withTimeout(indexRequest.timeout());
parameters.withVersion(indexRequest.version());
parameters.withVersionType(indexRequest.versionType());
parameters.withIfSeqNo(indexRequest.ifSeqNo());
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.client.core.MultiTermVectorsResponse;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -95,14 +96,20 @@
public class CrudIT extends ESRestHighLevelClientTestCase {

public void testDelete() throws IOException {
highLevelClient().indices().create(
new CreateIndexRequest("index").settings(Collections.singletonMap("index.number_of_shards", "1")),
RequestOptions.DEFAULT);
{
// Testing deletion
String docId = "id";
highLevelClient().index(
IndexResponse indexResponse = highLevelClient().index(
new IndexRequest("index", "type", docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId);
if (randomBoolean()) {
deleteRequest.version(1L);
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
} else {
deleteRequest.version(indexResponse.getVersion());
}
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync,
highLevelClient()::delete, highLevelClient()::deleteAsync);
Expand All @@ -127,13 +134,26 @@ public void testDelete() throws IOException {
String docId = "version_conflict";
highLevelClient().index(
new IndexRequest("index", "type", docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId).version(2);
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId);
final boolean seqNos = randomBoolean();
if (seqNos) {
deleteRequest.setIfSeqNo(2).setIfPrimaryTerm(2);
} else {
deleteRequest.version(2);
}

ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync,
highLevelClient()::delete, highLevelClient()::deleteAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][" + docId + "]: " +
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
if (seqNos) {
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][" + docId + "]: " +
"version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]",
exception.getMessage());
} else {
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][" + docId + "]: " +
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
}
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
Expand Down Expand Up @@ -453,18 +473,29 @@ public void testIndex() throws IOException {
assertEquals("type", indexResponse.getType());
assertEquals("id", indexResponse.getId());
assertEquals(2L, indexResponse.getVersion());
final boolean seqNosForConflict = randomBoolean();

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
IndexRequest wrongRequest = new IndexRequest("index", "type", "id");
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
wrongRequest.version(5L);
if (seqNosForConflict) {
wrongRequest.setIfSeqNo(2).setIfPrimaryTerm(2);
} else {
wrongRequest.version(5);
}

execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync,
highLevelClient()::index, highLevelClient()::indexAsync);
});
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: " +
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
if (seqNosForConflict) {
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: " +
"version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]",
exception.getMessage());
} else {
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: " +
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
}
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
Expand Down Expand Up @@ -763,7 +794,8 @@ public void testBulk() throws IOException {
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
bulkRequest.add(indexRequest);

Expand Down Expand Up @@ -1075,7 +1107,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(xContentType, "id", i);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
processor.add(indexRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.core.MultiTermVectorsRequest;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.MultiTermVectorsRequest;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -216,6 +216,7 @@ public void testDelete() {
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
setRandomVersion(deleteRequest, expectedParams);
setRandomVersionType(deleteRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(deleteRequest, expectedParams);

if (frequently()) {
if (randomBoolean()) {
Expand Down Expand Up @@ -545,6 +546,7 @@ public void testIndex() throws IOException {
} else {
setRandomVersion(indexRequest, expectedParams);
setRandomVersionType(indexRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(indexRequest, expectedParams);
}

if (frequently()) {
Expand Down Expand Up @@ -650,6 +652,7 @@ public void testUpdate() throws IOException {
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
setRandomVersion(updateRequest, expectedParams);
setRandomVersionType(updateRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
if (randomBoolean()) {
int retryOnConflict = randomIntBetween(0, 5);
updateRequest.retryOnConflict(retryOnConflict);
Expand Down Expand Up @@ -680,6 +683,7 @@ public void testUpdate() throws IOException {
assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert());
assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop());
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest);
assertEquals(updateRequest.script(), parsedUpdateRequest.script());
if (updateRequest.doc() != null) {
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
Expand All @@ -693,6 +697,22 @@ public void testUpdate() throws IOException {
}
}

private static void assertIfSeqNoAndTerm(DocWriteRequest<?>request, DocWriteRequest<?> parsedRequest) {
assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo());
assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
}

private static void setRandomIfSeqNoAndTerm(DocWriteRequest<?> request, Map<String, String> expectedParams) {
if (randomBoolean()) {
final long seqNo = randomNonNegativeLong();
request.setIfSeqNo(seqNo);
expectedParams.put("if_seq_no", Long.toString(seqNo));
final long primaryTerm = randomLongBetween(1, 200);
request.setIfPrimaryTerm(primaryTerm);
expectedParams.put("if_primary_term", Long.toString(primaryTerm));
}
}

public void testUpdateWithDifferentContentTypes() {
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> {
UpdateRequest updateRequest = new UpdateRequest();
Expand Down Expand Up @@ -767,10 +787,15 @@ public void testBulk() throws IOException {
docWriteRequest.routing(randomAlphaOfLength(10));
}
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
}
} else if (randomBoolean()) {
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
}
bulkRequest.add(docWriteRequest);
}
Expand Down Expand Up @@ -801,6 +826,8 @@ public void testBulk() throws IOException {
assertEquals(originalRequest.parent(), parsedRequest.parent());
assertEquals(originalRequest.version(), parsedRequest.version());
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo());
assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());

DocWriteRequest.OpType opType = originalRequest.opType();
if (opType == DocWriteRequest.OpType.INDEX) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
"Compare And Swap Sequence Numbers":

- skip:
version: " - 6.5.99"
reason: cas operations with sequence numbers was added in 6.6

- do:
index:
index: test_1
type: _doc
id: 1
body: { foo: bar }
- match: { _version: 1}
- set: { _seq_no: seqno }
- set: { _primary_term: primary_term }

- do:
bulk:
body:
- index:
_index: test_1
_type: _doc
_id: 1
if_seq_no: 10000
if_primary_term: $primary_term
- foo: bar2

- match: { errors: true }
- match: { items.0.index.status: 409 }
- match: { items.0.index.error.type: version_conflict_engine_exception }

- do:
bulk:
body:
- index:
_index: test_1
_type: _doc
_id: 1
if_seq_no: $seqno
if_primary_term: $primary_term
- foo: bar2

- match: { errors: false}
- match: { items.0.index.status: 200 }

0 comments on commit 3e0819a

Please sign in to comment.