Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rest Api Compatibility] Typed endpoint for bulk api #73571

Merged
merged 9 commits into from
Jun 7, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultRouting,
null, defaultPipeline, defaultRequireAlias, true, request.getXContentType());
null, defaultPipeline, defaultRequireAlias, true, request.getXContentType(),
pgomulka marked this conversation as resolved.
Show resolved Hide resolved
request.getRestApiVersion());

// short circuit the call to the transport layer
return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,8 @@ XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationH
XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
InputStream is, RestApiVersion restApiVersion) throws IOException;

XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data, int offset, int length,
RestApiVersion restApiVersion) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,11 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
byte[] data, int offset, int length, RestApiVersion restApiVersion)
throws IOException {
return new CborXContentParser(xContentRegistry, deprecationHandler, cborFactory.createParser(data, offset, length), restApiVersion);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,11 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
byte[] data, int offset, int length, RestApiVersion restApiVersion)
throws IOException {
return new JsonXContentParser(xContentRegistry, deprecationHandler, jsonFactory.createParser(data, offset, length), restApiVersion);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,12 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
RestApiVersion restApiVersion) throws IOException {
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler,
byte[] data, int offset, int length, RestApiVersion restApiVersion)
throws IOException {
return new SmileXContentParser(xContentRegistry, deprecationHandler, smileFactory.createParser(data, offset, length),
restApiVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,13 @@ public XContentParser createParserForCompatibility(NamedXContentRegistry xConten
return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(is), restApiVersion);
}

@Override
public XContentParser createParserForCompatibility(NamedXContentRegistry xContentRegistry,
DeprecationHandler deprecationHandler, byte[] data, int offset, int length,
RestApiVersion restApiVersion) throws IOException {
return new YamlXContentParser(xContentRegistry, deprecationHandler, yamlFactory.createParser(data, offset, length),
restApiVersion);
}


}
11 changes: 0 additions & 11 deletions rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,6 @@ tasks.named("yamlRestCompatTest").configure {
//type information is not stored, hence the the index will be found
'termvectors/50_mix_typeless_typeful/Term vectors with typeless API on an index that has types',
// 85 - 13 = 72 tests won't be fixed
'bulk/11_basic_with_types/Array of objects',
'bulk/11_basic_with_types/Empty _id',
'bulk/11_basic_with_types/Empty _id with op_type create',
'bulk/11_basic_with_types/empty action',
'bulk/21_list_of_strings_with_types/List of strings',
'bulk/31_big_string_with_types/One big string',
'bulk/41_source_with_types/Source filtering',
'bulk/51_refresh_with_types/refresh=empty string immediately makes changes are visible in search',
'bulk/51_refresh_with_types/refresh=true immediately makes changes are visible in search',
'bulk/51_refresh_with_types/refresh=wait_for waits until changes are visible in search',
'bulk/81_cas_with_types/Compare And Swap Sequence Numbers',
'cluster.voting_config_exclusions/10_basic/Throw exception when adding voting config exclusion and specifying both node_ids and node_names',
'cluster.voting_config_exclusions/10_basic/Throw exception when adding voting config exclusion without specifying nodes',
'count/11_basic_with_types/count body without query element',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -64,6 +65,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS, response.status().getStatus());
} else {
builder.field(_INDEX, failure.getIndex());
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}

builder.field(_ID, failure.getId());
builder.field(STATUS, failure.getStatus().getStatus());
builder.startObject(ERROR);
Expand Down Expand Up @@ -313,6 +318,9 @@ public boolean isAborted() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_FIELD, index);
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}
if (id != null) {
builder.field(ID_FIELD, id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -398,7 +399,7 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex,
try {
ensureOpen();
bulkRequest.add(data, defaultIndex, null, null, defaultPipeline, null,
true, xContentType);
true, xContentType, RestApiVersion.current());
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -227,26 +228,26 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, null, null, null, null, true, xContentType);
return add(data, defaultIndex, null, null, null, null, true, xContentType, RestApiVersion.current());
}

/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
return add(data, defaultIndex, null, null, null, null, allowExplicitIndex, xContentType);
return add(data, defaultIndex, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());

}

public BulkRequest add(BytesReference data, @Nullable String defaultIndex,
@Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline, @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex,
XContentType xContentType) throws IOException {
XContentType xContentType, RestApiVersion restApiVersion) throws IOException {
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
new BulkRequestParser(true).parse(data, defaultIndex, routing, defaultFetchSourceContext, pipeline, requireAlias,
new BulkRequestParser(true, restApiVersion).parse(data, defaultIndex, routing, defaultFetchSourceContext, pipeline, requireAlias,
allowExplicitIndex, xContentType, (indexRequest, type) -> internalAdd(indexRequest), this::internalAdd, this::add);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.RestApiVersion;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -23,6 +25,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
Expand All @@ -38,6 +41,7 @@
* Helper to parse bulk requests. This should be considered an internal class.
*/
public final class BulkRequestParser {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(BulkRequestParser.class);

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
Expand All @@ -55,15 +59,19 @@ public final class BulkRequestParser {
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;
// for CompatibleApi V7 this means to deprecate on type, for V8+ it means to throw an error
private final boolean deprecateOrErrorOnType;
private RestApiVersion restApiVersion;

/**
* Create a new parser.
*
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
* @param deprecateOrErrorOnType whether to allow _type information in the index line; used by BulkMonitoring
* @param restApiVersion
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiVersion) {
this.deprecateOrErrorOnType = deprecateOrErrorOnType;
this.restApiVersion = restApiVersion;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
Expand Down Expand Up @@ -114,6 +122,8 @@ public void parse(
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
final Map<String, String> stringDeduplicator = new HashMap<>();
boolean typesDeprecationLogged = false;

while (true) {
int nextMarker = findNextMarker(marker, from, data);
if (nextMarker == -1) {
Expand All @@ -122,7 +132,7 @@ public void parse(
line++;

// now parse the action
try (XContentParser parser = createParser(data, xContent, from, nextMarker)) {
try (XContentParser parser = createParser(data, xContent, from, nextMarker, restApiVersion)) {
// move pointers
from = nextMarker + 1;

Expand Down Expand Up @@ -174,9 +184,17 @@ public void parse(
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
if (parser.getRestApiVersion().matches(RestApiVersion.equalTo(RestApiVersion.V_7))) {
// for bigger bulks, deprecation throttling might not be enough
if (deprecateOrErrorOnType && typesDeprecationLogged == false) {
deprecationLogger.compatibleApiWarning("bulk_with_types",
RestBulkAction.TYPES_DEPRECATION_MESSAGE);
typesDeprecationLogged = true;
}
} else if (parser.getRestApiVersion().matches(RestApiVersion.onOrAfter(RestApiVersion.V_8))
&& deprecateOrErrorOnType) {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
+ currentFieldName + "]");
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -279,7 +297,7 @@ public void parse(
.setRequireAlias(requireAlias)
.routing(routing);
try (XContentParser sliceParser = createParser(
sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent)) {
sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContent, restApiVersion)) {
updateRequest.fromXContent(sliceParser);
}
if (fetchSourceContext != null) {
Expand All @@ -299,36 +317,40 @@ public void parse(
}
}

private static XContentParser createParser(BytesReference data, XContent xContent) throws IOException {
private static XContentParser createParser(BytesReference data, XContent xContent, RestApiVersion restApiVersion) throws IOException {
if (data.hasArray()) {
return parseBytesArray(xContent, data, 0, data.length());
return parseBytesArray(xContent, data, 0, data.length(), restApiVersion);
} else {
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, data.streamInput());
return xContent.createParserForCompatibility(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
data.streamInput(), restApiVersion);
}
}

// Create an efficient parser of the given bytes, trying to directly parse a byte array if possible and falling back to stream wrapping
// otherwise.
private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker) throws IOException {
private static XContentParser createParser(BytesReference data, XContent xContent, int from, int nextMarker,
RestApiVersion restApiVersion) throws IOException {
if (data.hasArray()) {
return parseBytesArray(xContent, data, from, nextMarker);
return parseBytesArray(xContent, data, from, nextMarker, restApiVersion);
} else {
final int length = nextMarker - from;
final BytesReference slice = data.slice(from, length);
if (slice.hasArray()) {
return parseBytesArray(xContent, slice, 0, length);
return parseBytesArray(xContent, slice, 0, length, restApiVersion);
} else {
// EMPTY is safe here because we never call namedObject
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, slice.streamInput());
return xContent.createParserForCompatibility(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
slice.streamInput(), restApiVersion);
}
}
}

private static XContentParser parseBytesArray(XContent xContent, BytesReference array, int from, int nextMarker) throws IOException {
private static XContentParser parseBytesArray(XContent xContent, BytesReference array, int from, int nextMarker,
RestApiVersion restApiVersion) throws IOException {
assert array.hasArray();
final int offset = array.arrayOffset();
// EMPTY is safe here because we never call namedObject
return xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(),
offset + from, nextMarker - from);
return xContent.createParserForCompatibility(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, array.array(),
offset + from, nextMarker - from, restApiVersion);
}
}
Loading