diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 833e79dfe3194..0d92c0a646d0d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -194,7 +194,6 @@ public int incrementalParse( line++; DocWriteRequest request = null; - String action; String type = null; FetchSourceContext fetchSourceContext = defaultFetchSourceContext; String pipeline = defaultPipeline; @@ -202,6 +201,7 @@ public int incrementalParse( // now parse the action try (XContentParser parser = createParser(xContent, data, from, nextMarker)) { + String action; String index = defaultIndex; String id = null; String routing = defaultRouting; @@ -441,8 +441,6 @@ public int incrementalParse( } line++; - // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks - // of index request. if (request instanceof IndexRequest indexRequest) { indexRequest.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType); indexRequestConsumer.accept(indexRequest, type); @@ -534,4 +532,320 @@ private XContentParser parseBytesArray(XContent xContent, BytesReference array, final int offset = array.arrayOffset(); return xContent.createParser(config, array.array(), offset + from, nextMarker - from); } + + private class IncrementalParser { + + // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to + // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it + // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request. + private final Map stringDeduplicator = new HashMap<>(); + + private final String defaultIndex; + private final String defaultRouting; + private final FetchSourceContext defaultFetchSourceContext; + private final String defaultPipeline; + private final Boolean defaultRequireAlias; + private final Boolean defaultRequireDataStream; + private final Boolean defaultListExecutedPipelines; + private final boolean allowExplicitIndex; + + private final XContentType xContentType; + private final BiConsumer indexRequestConsumer; + private final Consumer updateRequestConsumer; + private final Consumer deleteRequestConsumer; + + private int currentSearchOffset = 0; + private int currentLine = 0; + + private DocWriteRequest currentRequest = null; + private String currentType = null; + private FetchSourceContext currentFetchSourceContext = null; + private String currentPipeline = null; + private boolean currentListExecutedPipelines = false; + + private IncrementalParser( + @Nullable String defaultIndex, + @Nullable String defaultRouting, + @Nullable FetchSourceContext defaultFetchSourceContext, + @Nullable String defaultPipeline, + @Nullable Boolean defaultRequireAlias, + @Nullable Boolean defaultRequireDataStream, + @Nullable Boolean defaultListExecutedPipelines, + boolean allowExplicitIndex, + XContentType xContentType, + BiConsumer indexRequestConsumer, + Consumer updateRequestConsumer, + Consumer deleteRequestConsumer + ) { + this.defaultIndex = defaultIndex; + this.defaultRouting = defaultRouting; + this.defaultFetchSourceContext = defaultFetchSourceContext; + this.defaultPipeline = defaultPipeline; + this.defaultRequireAlias = defaultRequireAlias; + this.defaultRequireDataStream = defaultRequireDataStream; + this.defaultListExecutedPipelines = defaultListExecutedPipelines; + this.allowExplicitIndex = allowExplicitIndex; + this.xContentType = xContentType; + this.indexRequestConsumer = indexRequestConsumer; + this.updateRequestConsumer = updateRequestConsumer; + this.deleteRequestConsumer = deleteRequestConsumer; + } + + private boolean parseActionLine(BytesReference data, int from, int to) throws IOException { + assert currentRequest == null; + + // Reset the fields which must be set for document line parsing + currentType = null; + currentFetchSourceContext = defaultFetchSourceContext; + currentPipeline = defaultPipeline; + currentListExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines; + + + try (XContentParser parser = createParser(xContentType.xContent(), data, from, to)) { + String action; + String index = defaultIndex; + String id = null; + String routing = defaultRouting; + String opType = null; + long version = Versions.MATCH_ANY; + VersionType versionType = VersionType.INTERNAL; + long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + int retryOnConflict = 0; + boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; + boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream; + Map dynamicTemplates = Map.of(); + + // Move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token == null) { + return false; + } + if (token != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException( + "Malformed action/metadata line [" + + currentLine + + "], expected " + + XContentParser.Token.START_OBJECT + + " but found [" + + token + + "]" + ); + } + // Move to FIELD_NAME, that's the action + token = parser.nextToken(); + if (token != XContentParser.Token.FIELD_NAME) { + throw new IllegalArgumentException( + "Malformed action/metadata line [" + + currentLine + + "], expected " + + XContentParser.Token.FIELD_NAME + + " but found [" + + token + + "]" + ); + } + action = parser.currentName(); + if (SUPPORTED_ACTIONS.contains(action) == false) { + throw new IllegalArgumentException( + "Malformed action/metadata line [" + + currentLine + + "], expected field [create], [delete], [index] or [update] but found [" + + action + + "]" + ); + } + + // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) + // or START_OBJECT which will have another set of parameters + token = parser.nextToken(); + + if (token == XContentParser.Token.START_OBJECT) { + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (INDEX.match(currentFieldName, parser.getDeprecationHandler())) { + if (allowExplicitIndex == false) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + if (parser.getRestApiVersion().matches(RestApiVersion.equalTo(RestApiVersion.V_7))) { + // for bigger bulks, deprecation throttling might not be enough + // TODO: Fix + // if (deprecateOrErrorOnType && typesDeprecationLogged == false) { + // deprecationLogger.compatibleCritical("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE); + // typesDeprecationLogged = true; + // } + } else if (parser.getRestApiVersion().matches(RestApiVersion.onOrAfter(RestApiVersion.V_8)) + && deprecateOrErrorOnType) { + throw new IllegalArgumentException( + "Action/metadata line [" + + currentLine + + "] contains an unknown parameter [" + + currentFieldName + + "]" + ); + } + currentType = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { + id = parser.text(); + } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) { + routing = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + opType = parser.text(); + } else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) { + version = parser.longValue(); + } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + versionType = VersionType.fromString(parser.text()); + } else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) { + ifSeqNo = parser.longValue(); + } else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) { + ifPrimaryTerm = parser.longValue(); + } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) { + retryOnConflict = parser.intValue(); + } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { + currentPipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { + currentFetchSourceContext = FetchSourceContext.fromXContent(parser); + } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) { + requireAlias = parser.booleanValue(); + } else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) { + requireDataStream = parser.booleanValue(); + } else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) { + currentListExecutedPipelines = parser.booleanValue(); + } else { + throw new IllegalArgumentException( + "Action/metadata line [" + currentLine + "] contains an unknown parameter [" + currentFieldName + "]" + ); + } + } else if (token == XContentParser.Token.START_ARRAY) { + throw new IllegalArgumentException( + "Malformed action/metadata line [" + + currentLine + + "], expected a simple value for field [" + + currentFieldName + + "] but found [" + + token + + "]" + ); + } else if (token == XContentParser.Token.START_OBJECT + && DYNAMIC_TEMPLATES.match(currentFieldName, parser.getDeprecationHandler())) { + dynamicTemplates = parser.mapStrings(); + } else if (token == XContentParser.Token.START_OBJECT + && SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { + currentFetchSourceContext = FetchSourceContext.fromXContent(parser); + } else if (token != XContentParser.Token.VALUE_NULL) { + throw new IllegalArgumentException( + "Malformed action/metadata line [" + + currentLine + + "], expected a simple value for field [" + + currentFieldName + + "] but found [" + + token + + "]" + ); + } + } + } else if (token != XContentParser.Token.END_OBJECT) { + throw new IllegalArgumentException( + "Malformed action/metadata line [" + + currentLine + + "], expected " + + XContentParser.Token.START_OBJECT + + " or " + + XContentParser.Token.END_OBJECT + + " but found [" + + token + + "]" + ); + } + checkBulkActionIsProperlyClosed(parser); + + if ("delete".equals(action)) { + if (dynamicTemplates.isEmpty() == false) { + throw new IllegalArgumentException( + "Delete request in line [" + currentLine + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName() + ); + } + currentRequest = new DeleteRequest(index).id(id) + .routing(routing) + .version(version) + .versionType(versionType) + .setIfSeqNo(ifSeqNo) + .setIfPrimaryTerm(ifPrimaryTerm); + } else { + // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks + // of index request. + if ("index".equals(action) || "create".equals(action)) { + var indexRequest = new IndexRequest(index).id(id) + .routing(routing) + .version(version) + .versionType(versionType) + .setPipeline(currentPipeline) + .setIfSeqNo(ifSeqNo) + .setIfPrimaryTerm(ifPrimaryTerm) + .setDynamicTemplates(dynamicTemplates) + .setRequireAlias(requireAlias) + .setRequireDataStream(requireDataStream) + .setListExecutedPipelines(currentListExecutedPipelines); + if ("create".equals(action)) { + indexRequest = indexRequest.create(true); + } else if (opType != null) { + indexRequest = indexRequest.create("create".equals(opType)); + } + currentRequest = indexRequest; + } else if ("update".equals(action)) { + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new IllegalArgumentException( + "Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead" + ); + } + if (requireDataStream) { + throw new IllegalArgumentException( + "Update requests do not support the `require_data_stream` flag, " + + "as data streams do not support update operations" + ); + } + // TODO: support dynamic_templates in update requests + if (dynamicTemplates.isEmpty() == false) { + throw new IllegalArgumentException( + "Update request in line [" + currentLine + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName() + ); + } + currentRequest = new UpdateRequest().index(index) + .id(id) + .routing(routing) + .retryOnConflict(retryOnConflict) + .setIfSeqNo(ifSeqNo) + .setIfPrimaryTerm(ifPrimaryTerm) + .setRequireAlias(requireAlias) + .routing(routing); + } + } + } + + return true; + } + + private boolean parseDocumentLine() { + assert currentRequest == null; + + return false; + } + + private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) { + final int res = data.indexOf(marker, from); + if (res != -1) { + assert res >= 0; + return res; + } + if (from != data.length() && isIncremental == false) { + throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]"); + } + return res; + } + } }