Skip to content

Commit

Permalink
Enable RestHighLevel-Client to set parameter require_alias for bulk i…
Browse files Browse the repository at this point in the history
…ndex and reindex requests (opensearch-project#1604)

Signed-off-by: Jan Baudisch <[email protected]>

Co-authored-by: Jan Baudisch <[email protected]>
  • Loading branch information
jbaudisc and Jan Baudisch authored Nov 23, 2021
1 parent 41e320a commit dc14994
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ static Request index(IndexRequest indexRequest) {
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());
parameters.withRequireAlias(indexRequest.isRequireAlias());

BytesRef source = indexRequest.source().toBytesRef();
ContentType contentType = createContentType(indexRequest.getContentType());
Expand Down Expand Up @@ -391,6 +392,7 @@ static Request update(UpdateRequest updateRequest) throws IOException {
parameters.withRetryOnConflict(updateRequest.retryOnConflict());
parameters.withVersion(updateRequest.version());
parameters.withVersionType(updateRequest.versionType());
parameters.withRequireAlias(updateRequest.isRequireAlias());

// The Java API allows update requests with different content types
// set for the partial document and the upsert document. This client
Expand Down Expand Up @@ -618,6 +620,7 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond())
.withRequireAlias(reindexRequest.getDestination().isRequireAlias())
.withSlices(reindexRequest.getSlices());

if (reindexRequest.getScrollTime() != null) {
Expand Down Expand Up @@ -964,6 +967,13 @@ Params withRequestsPerSecond(float requestsPerSecond) {
}
}

Params withRequireAlias(boolean requireAlias) {
if (requireAlias) {
return putParam("require_alias", Boolean.TRUE.toString());
}
return this;
}

Params withRetryOnConflict(int retryOnConflict) {
if (retryOnConflict > 0) {
return putParam("retry_on_conflict", String.valueOf(retryOnConflict));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,21 @@ public void testIndex() throws IOException {
exception.getMessage()
);
}
{
OpenSearchStatusException exception = expectThrows(OpenSearchStatusException.class, () -> {
IndexRequest indexRequest = new IndexRequest("index").id("missing_alias").setRequireAlias(true);
indexRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());

execute(indexRequest, highLevelClient()::index, highLevelClient()::indexAsync);
});

assertEquals(RestStatus.NOT_FOUND, exception.status());
assertEquals(
"OpenSearch exception [type=index_not_found_exception, reason=no such index [index]"
+ " and [require_alias] request flag is [true] and [index] is not an alias]",
exception.getMessage()
);
}
{
IndexRequest indexRequest = new IndexRequest("index").id("external_version_type");
indexRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
Expand Down Expand Up @@ -926,6 +941,18 @@ public void testUpdate() throws IOException {
exception.getMessage()
);
}
{
OpenSearchException exception = expectThrows(OpenSearchException.class, () -> {
UpdateRequest updateRequest = new UpdateRequest("index", "require_alias").setRequireAlias(true);
updateRequest.doc(new IndexRequest().source(Collections.singletonMap("field", "doc"), XContentType.JSON));
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
});
assertEquals(RestStatus.NOT_FOUND, exception.status());
assertEquals(
"OpenSearch exception [type=index_not_found_exception, reason=no such index [index] and [require_alias] request flag is [true] and [index] is not an alias]",
exception.getMessage()
);
}
}

public void testUpdateWithTypes() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.client;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.bulk.BulkItemResponse;
Expand Down Expand Up @@ -102,6 +103,25 @@ public void testReindex() throws IOException {
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// set require_alias=true, but there exists no alias
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1"));
reindexRequest.setRefresh(true);
reindexRequest.setRequireAlias(true);

OpenSearchStatusException exception = expectThrows(
OpenSearchStatusException.class,
() -> { execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); }
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
assertEquals(
"OpenSearch exception [type=index_not_found_exception, reason=no such index [dest] and [require_alias] request flag is [true] and [dest] is not an alias]",
exception.getMessage()
);
}
}

public void testReindexTask() throws Exception {
Expand Down

0 comments on commit dc14994

Please sign in to comment.