Skip to content

Commit

Permalink
REST
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Aug 13, 2024
1 parent df9fc6c commit 88b5a40
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import java.util.ArrayList;
import java.util.List;

public class IncrementalBulkApplication {
public class IncrementalBulkHandler {

private final Client client;

public IncrementalBulkApplication(Client client) {
public IncrementalBulkHandler(Client client) {
this.client = client;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,30 @@
package org.elasticsearch.rest.action.document;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestParser;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.bulk.IncrementalBulkHandler;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
Expand All @@ -42,6 +51,7 @@ public class RestBulkAction extends BaseRestHandler {
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";

private final boolean allowExplicitIndex;
private volatile IncrementalBulkHandler bulkHandler;

public RestBulkAction(Settings settings) {
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
Expand All @@ -66,38 +76,124 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
// TODO: Move this to CTOR and hook everything up
if (bulkHandler == null) {
bulkHandler = new IncrementalBulkHandler(client);
}

if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
BulkRequest bulkRequest = new BulkRequest();
String defaultIndex = request.param("index");
String defaultRouting = request.param("routing");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));

return new ChunkHandler(request);
}

private class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {

private final RestRequest request;

private final Map<String, String> stringDeduplicator = new HashMap<>();
private final String defaultIndex;
private final String defaultRouting;
private final FetchSourceContext defaultFetchSourceContext;
private final String defaultPipeline;
private final boolean defaultListExecutedPipelines;
private final Boolean defaultRequireAlias;
private final boolean defaultRequireDataStream;
private final BulkRequestParser parser;
private final IncrementalBulkHandler.Handler handler;

private volatile RestChannel restChannel;
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);

private ChunkHandler(RestRequest request) {
this.request = request;
this.defaultIndex = request.param("index");
this.defaultRouting = request.param("routing");
this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
this.defaultPipeline = request.param("pipeline");
this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
this.parser = new BulkRequestParser(true, request.getRestApiVersion());
handler = bulkHandler.newBulkRequest(
request.param("wait_for_active_shards"),
request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT),
request.param("refresh")
);
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);

return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
@Override
public void accept(RestChannel restChannel) throws Exception {
// Called once when the response channel becomes available. The channel is stashed
// so handleChunk can assert it receives the same channel and can respond on it
// when the final chunk arrives.
this.restChannel = restChannel;
}

/**
 * Consumes one chunk of the request body. Chunks are buffered in unParsedChunks until
 * the parser has consumed their bytes; a partially parsed tail is retained for the next
 * call (see accountParsing). Parsed doc-write requests accumulate in {@code items} and
 * are forwarded to the incremental bulk handler, which requests the next body chunk via
 * the supplied continuation.
 */
@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
assert channel == restChannel;

final ReleasableBytesReference data;
try {
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDataStream in
// BulkRequest#add is fine

unParsedChunks.add(chunk);

if (unParsedChunks.size() > 1) {
// Stitch the buffered chunks into one composite view; closing the composite
// releases all of the underlying chunk buffers in one shot.
ReleasableBytesReference[] bytesReferences = unParsedChunks.toArray(new ReleasableBytesReference[0]);
data = new ReleasableBytesReference(
CompositeBytesReference.of(bytesReferences),
() -> Releasables.close(bytesReferences)
);
} else {
data = chunk;
}

// Parses as many complete bulk actions as the buffered bytes allow, appending each
// parsed request to items, and returns how many bytes were fully consumed; any
// trailing partial action stays buffered for the next chunk.
int bytesConsumed = parser.incrementalParse(
data,
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
(request, type) -> items.add(request),
items::add,
items::add,
stringDeduplicator
);

accountParsing(bytesConsumed);

} catch (IOException e) {
// TODO: Exception Handling
// NOTE(review): the buffered chunk references are not released on this path —
// confirm that channel-level error handling closes them, otherwise this leaks.
throw new UncheckedIOException(e);
}

if (isLast) {
// On the final chunk the parser must have consumed everything that was buffered.
assert unParsedChunks.isEmpty();
assert channel != null;
handler.lastItems(items, data, new RestRefCountedChunkedToXContentListener<>(channel));
} else {
// NOTE(review): items is a final field that is never cleared, so requests parsed
// from earlier chunks appear to be handed to addItems again on every call —
// confirm the handler tolerates this or that the list is meant to be reset here.
handler.addItems(items, data, () -> request.contentStream().next());
}
}

/**
 * Removes fully-consumed chunks from the front of unParsedChunks and, when the parser
 * stopped mid-chunk, puts a retained slice of the unread tail back at the front.
 *
 * Consumed references are removed but not closed here; release of the underlying
 * buffers is tied to the (composite) data reference handed to the bulk handler.
 */
private void accountParsing(int bytesConsumed) {
    int remaining = bytesConsumed;
    while (remaining > 0) {
        final ReleasableBytesReference head = unParsedChunks.removeFirst();
        final int length = head.length();
        if (remaining < length) {
            // Parser stopped inside this chunk: keep only the unread tail buffered.
            unParsedChunks.addFirst(head.retainedSlice(remaining, length - remaining));
            remaining = 0;
        } else {
            // Chunk fully consumed: drop it and keep accounting.
            remaining -= length;
        }
    }
}
}

@Override
Expand Down

0 comments on commit 88b5a40

Please sign in to comment.