Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Oct 3, 2024
1 parent 7b52d6e commit fb3255c
Showing 1 changed file with 317 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ public int incrementalParse(
line++;

DocWriteRequest<?> request = null;
String action;
String type = null;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
String pipeline = defaultPipeline;
boolean listExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;

// now parse the action
try (XContentParser parser = createParser(xContent, data, from, nextMarker)) {
String action;
String index = defaultIndex;
String id = null;
String routing = defaultRouting;
Expand Down Expand Up @@ -441,8 +441,6 @@ public int incrementalParse(
}
line++;

// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request.
if (request instanceof IndexRequest indexRequest) {
indexRequest.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType);
indexRequestConsumer.accept(indexRequest, type);
Expand Down Expand Up @@ -534,4 +532,320 @@ private XContentParser parseBytesArray(XContent xContent, BytesReference array,
final int offset = array.arrayOffset();
return xContent.createParser(config, array.array(), offset + from, nextMarker - from);
}

private class IncrementalParser {

    // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
    // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
    // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
    private final Map<String, String> stringDeduplicator = new HashMap<>();

    // Defaults applied to each request whenever the action line does not specify the parameter explicitly.
    private final String defaultIndex;
    private final String defaultRouting;
    private final FetchSourceContext defaultFetchSourceContext;
    private final String defaultPipeline;
    private final Boolean defaultRequireAlias;
    private final Boolean defaultRequireDataStream;
    private final Boolean defaultListExecutedPipelines;
    private final boolean allowExplicitIndex;

    private final XContentType xContentType;
    // Callbacks that receive each request once it has been fully parsed.
    private final BiConsumer<IndexRequest, String> indexRequestConsumer;
    private final Consumer<UpdateRequest> updateRequestConsumer;
    private final Consumer<DeleteRequest> deleteRequestConsumer;

    // Parse progress retained across incremental invocations.
    private int currentSearchOffset = 0;
    private int currentLine = 0;

    // State produced by parsing an action line and consumed when parsing the following document line.
    private DocWriteRequest<?> currentRequest = null;
    private String currentType = null;
    private FetchSourceContext currentFetchSourceContext = null;
    private String currentPipeline = null;
    private boolean currentListExecutedPipelines = false;

    private IncrementalParser(
        @Nullable String defaultIndex,
        @Nullable String defaultRouting,
        @Nullable FetchSourceContext defaultFetchSourceContext,
        @Nullable String defaultPipeline,
        @Nullable Boolean defaultRequireAlias,
        @Nullable Boolean defaultRequireDataStream,
        @Nullable Boolean defaultListExecutedPipelines,
        boolean allowExplicitIndex,
        XContentType xContentType,
        BiConsumer<IndexRequest, String> indexRequestConsumer,
        Consumer<UpdateRequest> updateRequestConsumer,
        Consumer<DeleteRequest> deleteRequestConsumer
    ) {
        this.defaultIndex = defaultIndex;
        this.defaultRouting = defaultRouting;
        this.defaultFetchSourceContext = defaultFetchSourceContext;
        this.defaultPipeline = defaultPipeline;
        this.defaultRequireAlias = defaultRequireAlias;
        this.defaultRequireDataStream = defaultRequireDataStream;
        this.defaultListExecutedPipelines = defaultListExecutedPipelines;
        this.allowExplicitIndex = allowExplicitIndex;
        this.xContentType = xContentType;
        this.indexRequestConsumer = indexRequestConsumer;
        this.updateRequestConsumer = updateRequestConsumer;
        this.deleteRequestConsumer = deleteRequestConsumer;
    }

    /**
     * Parses the action/metadata line located between {@code from} (inclusive) and {@code to} (exclusive) in
     * {@code data} and stores the resulting {@link DocWriteRequest} in {@link #currentRequest}.
     *
     * @return {@code true} if an action line was parsed, {@code false} if the slice contained no further tokens
     * @throws IOException if reading from the underlying content fails
     * @throws IllegalArgumentException if the action line is malformed or uses unsupported parameters
     */
    private boolean parseActionLine(BytesReference data, int from, int to) throws IOException {
        assert currentRequest == null;

        // Reset the fields which must be set for document line parsing
        currentType = null;
        currentFetchSourceContext = defaultFetchSourceContext;
        currentPipeline = defaultPipeline;
        currentListExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;

        try (XContentParser parser = createParser(xContentType.xContent(), data, from, to)) {
            String action;
            String index = defaultIndex;
            String id = null;
            String routing = defaultRouting;
            String opType = null;
            long version = Versions.MATCH_ANY;
            VersionType versionType = VersionType.INTERNAL;
            long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
            long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
            int retryOnConflict = 0;
            boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
            boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
            Map<String, String> dynamicTemplates = Map.of();

            // Move to START_OBJECT
            XContentParser.Token token = parser.nextToken();
            if (token == null) {
                // No content in this slice; nothing to parse.
                return false;
            }
            if (token != XContentParser.Token.START_OBJECT) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + currentLine
                        + "], expected "
                        + XContentParser.Token.START_OBJECT
                        + " but found ["
                        + token
                        + "]"
                );
            }
            // Move to FIELD_NAME, that's the action
            token = parser.nextToken();
            if (token != XContentParser.Token.FIELD_NAME) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + currentLine
                        + "], expected "
                        + XContentParser.Token.FIELD_NAME
                        + " but found ["
                        + token
                        + "]"
                );
            }
            action = parser.currentName();
            if (SUPPORTED_ACTIONS.contains(action) == false) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + currentLine
                        + "], expected field [create], [delete], [index] or [update] but found ["
                        + action
                        + "]"
                );
            }

            // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
            // or START_OBJECT which will have another set of parameters
            token = parser.nextToken();

            if (token == XContentParser.Token.START_OBJECT) {
                String currentFieldName = null;
                while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                    if (token == XContentParser.Token.FIELD_NAME) {
                        currentFieldName = parser.currentName();
                    } else if (token.isValue()) {
                        if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) {
                            if (allowExplicitIndex == false) {
                                throw new IllegalArgumentException("explicit index in bulk is not allowed");
                            }
                            index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                            if (parser.getRestApiVersion().matches(RestApiVersion.equalTo(RestApiVersion.V_7))) {
                                // for bigger bulks, deprecation throttling might not be enough
                                // TODO: Fix
                                // if (deprecateOrErrorOnType && typesDeprecationLogged == false) {
                                // deprecationLogger.compatibleCritical("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE);
                                // typesDeprecationLogged = true;
                                // }
                            } else if (parser.getRestApiVersion().matches(RestApiVersion.onOrAfter(RestApiVersion.V_8))
                                && deprecateOrErrorOnType) {
                                    throw new IllegalArgumentException(
                                        "Action/metadata line ["
                                            + currentLine
                                            + "] contains an unknown parameter ["
                                            + currentFieldName
                                            + "]"
                                    );
                                }
                            currentType = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
                            id = parser.text();
                        } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
                            routing = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                            opType = parser.text();
                        } else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
                            version = parser.longValue();
                        } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                            versionType = VersionType.fromString(parser.text());
                        } else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
                            ifSeqNo = parser.longValue();
                        } else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
                            ifPrimaryTerm = parser.longValue();
                        } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
                            retryOnConflict = parser.intValue();
                        } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
                            currentPipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                        } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
                            currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
                        } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
                            requireAlias = parser.booleanValue();
                        } else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) {
                            requireDataStream = parser.booleanValue();
                        } else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
                            currentListExecutedPipelines = parser.booleanValue();
                        } else {
                            throw new IllegalArgumentException(
                                "Action/metadata line [" + currentLine + "] contains an unknown parameter [" + currentFieldName + "]"
                            );
                        }
                    } else if (token == XContentParser.Token.START_ARRAY) {
                        throw new IllegalArgumentException(
                            "Malformed action/metadata line ["
                                + currentLine
                                + "], expected a simple value for field ["
                                + currentFieldName
                                + "] but found ["
                                + token
                                + "]"
                        );
                    } else if (token == XContentParser.Token.START_OBJECT
                        && DYNAMIC_TEMPLATES.match(currentFieldName, parser.getDeprecationHandler())) {
                            dynamicTemplates = parser.mapStrings();
                        } else if (token == XContentParser.Token.START_OBJECT
                            && SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
                                currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
                            } else if (token != XContentParser.Token.VALUE_NULL) {
                                throw new IllegalArgumentException(
                                    "Malformed action/metadata line ["
                                        + currentLine
                                        + "], expected a simple value for field ["
                                        + currentFieldName
                                        + "] but found ["
                                        + token
                                        + "]"
                                );
                            }
                }
            } else if (token != XContentParser.Token.END_OBJECT) {
                throw new IllegalArgumentException(
                    "Malformed action/metadata line ["
                        + currentLine
                        + "], expected "
                        + XContentParser.Token.START_OBJECT
                        + " or "
                        + XContentParser.Token.END_OBJECT
                        + " but found ["
                        + token
                        + "]"
                );
            }
            checkBulkActionIsProperlyClosed(parser);

            if ("delete".equals(action)) {
                if (dynamicTemplates.isEmpty() == false) {
                    throw new IllegalArgumentException(
                        "Delete request in line [" + currentLine + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
                    );
                }
                currentRequest = new DeleteRequest(index).id(id)
                    .routing(routing)
                    .version(version)
                    .versionType(versionType)
                    .setIfSeqNo(ifSeqNo)
                    .setIfPrimaryTerm(ifPrimaryTerm);
            } else {
                // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
                // of index request.
                if ("index".equals(action) || "create".equals(action)) {
                    var indexRequest = new IndexRequest(index).id(id)
                        .routing(routing)
                        .version(version)
                        .versionType(versionType)
                        .setPipeline(currentPipeline)
                        .setIfSeqNo(ifSeqNo)
                        .setIfPrimaryTerm(ifPrimaryTerm)
                        .setDynamicTemplates(dynamicTemplates)
                        .setRequireAlias(requireAlias)
                        .setRequireDataStream(requireDataStream)
                        .setListExecutedPipelines(currentListExecutedPipelines);
                    if ("create".equals(action)) {
                        indexRequest = indexRequest.create(true);
                    } else if (opType != null) {
                        indexRequest = indexRequest.create("create".equals(opType));
                    }
                    currentRequest = indexRequest;
                } else if ("update".equals(action)) {
                    if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
                        throw new IllegalArgumentException(
                            "Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
                        );
                    }
                    if (requireDataStream) {
                        throw new IllegalArgumentException(
                            "Update requests do not support the `require_data_stream` flag, "
                                + "as data streams do not support update operations"
                        );
                    }
                    // TODO: support dynamic_templates in update requests
                    if (dynamicTemplates.isEmpty() == false) {
                        throw new IllegalArgumentException(
                            "Update request in line [" + currentLine + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
                        );
                    }
                    // Note: the original chain called .routing(routing) a second time after setRequireAlias; the
                    // duplicate call was redundant (same value assigned twice) and has been removed.
                    currentRequest = new UpdateRequest().index(index)
                        .id(id)
                        .routing(routing)
                        .retryOnConflict(retryOnConflict)
                        .setIfSeqNo(ifSeqNo)
                        .setIfPrimaryTerm(ifPrimaryTerm)
                        .setRequireAlias(requireAlias);
                }
            }
        }

        return true;
    }

    /**
     * Placeholder for parsing the document line that follows an index/create/update action line.
     * Currently always returns {@code false} without consuming any data.
     */
    private boolean parseDocumentLine() {
        // NOTE(review): a document line normally follows an action line that has populated currentRequest, so this
        // assertion looks inverted — confirm the intended invariant before implementing this stub.
        assert currentRequest == null;

        return false;
    }

    /**
     * Returns the index of the next occurrence of {@code marker} in {@code data} at or after {@code from}, or -1 if
     * none is found. When no marker is found, the request must either be complete ({@code from} at the end of the
     * data) or incremental (more data may arrive later); otherwise the bulk body is missing its trailing newline.
     */
    private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
        final int res = data.indexOf(marker, from);
        if (res != -1) {
            assert res >= 0;
            return res;
        }
        if (from != data.length() && isIncremental == false) {
            throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
        }
        return res;
    }
}
}

0 comments on commit fb3255c

Please sign in to comment.