Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Name BulkItemResponse ctors #76439

Merged
merged 4 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}

private static class BulkRestBuilderListener extends RestBuilderListener<BulkRequest> {
private final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE,
private final BulkItemResponse ITEM_RESPONSE = BulkItemResponse.success(1, DocWriteRequest.OpType.UPDATE,
new UpdateResponse(new ShardId("mock", "", 1), "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));

private final RestRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.transport.TransportService;

public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private static final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE,
private static final BulkItemResponse ITEM_RESPONSE = BulkItemResponse.success(1, DocWriteRequest.OpType.UPDATE,
new UpdateResponse(new ShardId("mock", "", 1), "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
ShardId shardId = new ShardId(new Index("name", "uid"), 0);
if (rarely()) {
versionConflicts++;
responses[i] = new BulkItemResponse(i, randomFrom(DocWriteRequest.OpType.values()),
responses[i] = BulkItemResponse.failure(i, randomFrom(DocWriteRequest.OpType.values()),
new Failure(shardId.getIndexName(), "id" + i,
new VersionConflictEngineException(shardId, "id", "test")));
continue;
Expand Down Expand Up @@ -307,7 +307,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
final int primaryTerm = randomIntBetween(1, 16);
final IndexResponse response =
new IndexResponse(shardId, "id" + i, seqNo, primaryTerm, randomInt(), createdResponse);
responses[i] = new BulkItemResponse(i, opType, response);
responses[i] = BulkItemResponse.success(i, opType, response);
}
assertExactlyOnce(onSuccess ->
new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0),
Expand Down Expand Up @@ -389,8 +389,10 @@ public void testSearchTimeoutsAbortRequest() throws Exception {
public void testBulkFailuresAbortRequest() throws Exception {
Failure failure = new Failure("index", "id", new RuntimeException("test"));
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
{new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
BulkResponse bulkResponse = new BulkResponse(
new BulkItemResponse[] { BulkItemResponse.failure(0, DocWriteRequest.OpType.CREATE, failure) },
randomLong()
);
action.onBulkResponse(bulkResponse, Assert::fail);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), contains(failure));
Expand Down Expand Up @@ -974,10 +976,10 @@ void doExecute(ActionType<Response> action, Request request, ActionListener<Resp
throw new RuntimeException("Unknown request: " + item);
}
if (i == toReject) {
responses[i] = new BulkItemResponse(i, item.opType(),
responses[i] = BulkItemResponse.failure(i, item.opType(),
new Failure(response.getIndex(), response.getId(), new EsRejectedExecutionException()));
} else {
responses[i] = new BulkItemResponse(i, item.opType(), response);
responses[i] = BulkItemResponse.success(i, item.opType(), response);
}
}
listener.onResponse((Response) new BulkResponse(responses, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ void setPrimaryResponse(BulkItemResponse primaryResponse) {
*/
public void abort(String index, Exception cause) {
if (primaryResponse == null) {
final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(),
Objects.requireNonNull(cause), true);
setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure));
final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.id(), Objects.requireNonNull(cause), true);
setPrimaryResponse(BulkItemResponse.failure(id, request.opType(), failure));
} else {
assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()
: "response [" + Strings.toString(primaryResponse) + "]; cause [" + cause + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ public static BulkItemResponse fromXContent(XContentParser parser, int id) throw
BulkItemResponse bulkItemResponse;
if (exception != null) {
Failure failure = new Failure(builder.getShardId().getIndexName(), builder.getId(), exception, status);
bulkItemResponse = new BulkItemResponse(id, opType, failure);
bulkItemResponse = BulkItemResponse.failure(id, opType, failure);
} else {
bulkItemResponse = new BulkItemResponse(id, opType, builder.build());
bulkItemResponse = BulkItemResponse.success(id, opType, builder.build());
}
return bulkItemResponse;
}
Expand Down Expand Up @@ -341,66 +341,48 @@ public String toString() {
}
}

private int id;
private final int id;

private OpType opType;
private final OpType opType;

private DocWriteResponse response;
private final DocWriteResponse response;

private Failure failure;

BulkItemResponse() {}
private final Failure failure;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all started with me wanting to make these final to make them easier to reason about.


BulkItemResponse(ShardId shardId, StreamInput in) throws IOException {
id = in.readVInt();
opType = OpType.fromId(in.readByte());

byte type = in.readByte();
if (type == 0) {
response = new IndexResponse(shardId, in);
} else if (type == 1) {
response = new DeleteResponse(shardId, in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse(shardId, in);
} else if (type != 2) {
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}

if (in.readBoolean()) {
failure = new Failure(in);
}
response = readResponse(shardId, in);
failure = in.readBoolean() ? new Failure(in) : null;
assertConsistent();
}

BulkItemResponse(StreamInput in) throws IOException {
id = in.readVInt();
opType = OpType.fromId(in.readByte());

byte type = in.readByte();
if (type == 0) {
response = new IndexResponse(in);
} else if (type == 1) {
response = new DeleteResponse(in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse(in);
} else if (type != 2) {
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}

if (in.readBoolean()) {
failure = new Failure(in);
}
response = readResponse(in);
failure = in.readBoolean() ? new Failure(in) : null;
assertConsistent();
}

public BulkItemResponse(int id, OpType opType, DocWriteResponse response) {
private BulkItemResponse(int id, OpType opType, DocWriteResponse response, Failure failure) {
this.id = id;
this.response = response;
this.opType = opType;
this.failure = failure;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to add an assertion here that either response or failure needs to be set, but not both.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

assertConsistent();
}

public BulkItemResponse(int id, OpType opType, Failure failure) {
this.id = id;
this.opType = opType;
this.failure = failure;
private void assertConsistent() {
assert (response == null) ^ (failure == null) : "only one of response or failure may be set";
}

public static BulkItemResponse success(int id, OpType opType, DocWriteResponse response) {
return new BulkItemResponse(id, opType, response, null);
}

public static BulkItemResponse failure(int id, OpType opType, Failure failure) {
return new BulkItemResponse(id, opType, null, failure);
}

/**
Expand Down Expand Up @@ -528,4 +510,36 @@ private void writeResponseType(StreamOutput out) throws IOException {
throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]");
}
}

private static DocWriteResponse readResponse(ShardId shardId, StreamInput in) throws IOException {
int type = in.readByte();
switch (type) {
case 0:
return new IndexResponse(shardId, in);
case 1:
return new DeleteResponse(shardId, in);
case 2:
return null;
case 3:
return new UpdateResponse(shardId, in);
default:
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}
}

private static DocWriteResponse readResponse(StreamInput in) throws IOException {
int type = in.readByte();
switch (type) {
case 0:
return new IndexResponse(in);
case 1:
return new DeleteResponse(in);
case 2:
return null;
case 3:
return new UpdateResponse(in);
default:
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void resetForExecutionForRetry() {
/** completes the operation without doing anything on the primary */
public void markOperationAsNoOp(DocWriteResponse response) {
assertInvariants(ItemProcessingState.INITIAL);
executionResult = new BulkItemResponse(getCurrentItem().id(), getCurrentItem().request().opType(), response);
executionResult = BulkItemResponse.success(getCurrentItem().id(), getCurrentItem().request().opType(), response);
currentItemState = ItemProcessingState.EXECUTED;
assertInvariants(ItemProcessingState.EXECUTED);
}
Expand All @@ -226,7 +226,7 @@ public void failOnMappingUpdate(Exception cause) {
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
currentItemState = ItemProcessingState.EXECUTED;
final DocWriteRequest<?> docWriteRequest = getCurrentItem().request();
executionResult = new BulkItemResponse(getCurrentItem().id(), docWriteRequest.opType(),
executionResult = BulkItemResponse.failure(getCurrentItem().id(), docWriteRequest.opType(),
// Make sure to use getCurrentItem().index() here, if you use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(getCurrentItem().index(), docWriteRequest.id(), cause));
Expand All @@ -253,13 +253,13 @@ public void markOperationAsExecuted(Engine.Result result) {
} else {
throw new AssertionError("unknown result type :" + result.getResultType());
}
executionResult = new BulkItemResponse(current.id(), current.request().opType(), response);
executionResult = BulkItemResponse.success(current.id(), current.request().opType(), response);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
break;
case FAILURE:
executionResult = new BulkItemResponse(current.id(), docWriteRequest.opType(),
executionResult = BulkItemResponse.failure(current.id(), docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ void createIndex(String index,
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request,
String index, Exception e) {
if (index.equals(request.index())) {
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(),
request.id(), e)));
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), e);
responses.set(idx, BulkItemResponse.failure(idx, request.opType(), failure));
return true;
}
return false;
Expand Down Expand Up @@ -469,9 +469,8 @@ protected void doRun() {
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(),
docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
Expand Down Expand Up @@ -518,8 +517,8 @@ public void onFailure(Exception e) {
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)));
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e);
responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
}
if (counter.decrementAndGet() == 0) {
finishHim();
Expand Down Expand Up @@ -616,7 +615,7 @@ private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int i
private void addFailure(DocWriteRequest<?> request, int idx, Exception unavailableException) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(),
unavailableException);
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure);
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(idx, request.opType(), failure);
responses.set(idx, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(idx, null);
Expand Down Expand Up @@ -788,7 +787,7 @@ synchronized void markItemAsDropped(int slot) {
failedSlots.set(slot);
final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
itemResponses.add(
new BulkItemResponse(slot, indexRequest.opType(),
BulkItemResponse.success(slot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0),
id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Expand All @@ -806,7 +805,7 @@ synchronized void markItemAsFailed(int slot, Exception e) {
// 3) Continue with the next request in the bulk.
failedSlots.set(slot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,11 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update
BulkItemResponse operationResponse, final UpdateHelper.Result translate) {
final BulkItemResponse response;
if (operationResponse.isFailed()) {
response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, operationResponse.getFailure());
response = BulkItemResponse.failure(
operationResponse.getItemId(),
DocWriteRequest.OpType.UPDATE,
operationResponse.getFailure()
);
} else {
final DocWriteResponse.Result translatedResult = translate.getResponseResult();
final UpdateResponse updateResponse;
Expand Down Expand Up @@ -407,7 +411,7 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update
} else {
throw new IllegalArgumentException("unknown operation type: " + translatedResult);
}
response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse);
response = BulkItemResponse.success(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse);
}
return response;
}
Expand Down
Loading