Commit
[Type removal] Ignore _type field in bulk request (opensearch-project#3505)

Reverts "[Type removal] Remove type from BulkRequestParser" and, instead, ignores the _type field in bulk requests. This restores bulk API backwards compatibility (bwc) with external clients such as Beats and Logstash until a formal REST Version API mechanism is available for OpenSearch core.

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 authored Jun 6, 2022
1 parent 625d08a commit a93ff13
Showing 6 changed files with 251 additions and 30 deletions.
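For context before the file diffs: a minimal sketch of the behavior this commit restores. It is not part of the commit itself; the bulk body is illustrative and the package locations are assumed from the 2.x codebase, so they may differ by version.

import java.nio.charset.StandardCharsets;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.common.xcontent.XContentType;

public class TypedBulkBodyExample {
    public static void main(String[] args) throws Exception {
        // An action/metadata line that still carries the legacy _type field, as older
        // Beats and Logstash releases send it.
        String bulkBody = "{\"index\": {\"_index\": \"test_index\", \"_type\": \"_doc\", \"_id\": \"1\"}}\n"
            + "{\"f1\": \"v1\", \"f2\": 42}\n";

        BulkRequest bulkRequest = new BulkRequest();
        // With this commit the _type field is parsed and ignored; before it, this call was
        // rejected with an "unknown parameter [_type]" error on the action/metadata line.
        bulkRequest.add(bulkBody.getBytes(StandardCharsets.UTF_8), 0, bulkBody.length(), null, XContentType.JSON);

        System.out.println(bulkRequest.numberOfActions()); // prints 1
    }
}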
@@ -0,0 +1,137 @@
---
"Array of objects":
  - do:
      bulk:
        refresh: true
        body:
          - index:
              _index: test_index
              _type: test_type
              _id: test_id
          - f1: v1
            f2: 42
          - index:
              _index: test_index
              _type: test_type
              _id: test_id2
          - f1: v2
            f2: 47

  - do:
      count:
        index: test_index

  - match: {count: 2}

---
"Empty _id":
  - do:
      bulk:
        refresh: true
        body:
          - index:
              _index: test
              _type: type
              _id: ''
          - f: 1
          - index:
              _index: test
              _type: type
              _id: id
          - f: 2
          - index:
              _index: test
              _type: type
          - f: 3
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: illegal_argument_exception }
  - match: { items.0.index.error.reason: if _id is specified it must not be empty }
  - match: { items.1.index.result: created }
  - match: { items.2.index.result: created }

  - do:
      count:
        index: test

  - match: { count: 2 }

---
"Empty _id with op_type create":
  - skip:
      version: " - 7.4.99"
      reason: "auto id + op type create only supported since 7.5"

  - do:
      bulk:
        refresh: true
        body:
          - index:
              _index: test
              _type: type
              _id: ''
          - f: 1
          - index:
              _index: test
              _type: type
              _id: id
          - f: 2
          - index:
              _index: test
              _type: type
          - f: 3
          - create:
              _index: test
              _type: type
          - f: 4
          - index:
              _index: test
              _type: type
              op_type: create
          - f: 5
  - match: { errors: true }
  - match: { items.0.index.status: 400 }
  - match: { items.0.index.error.type: illegal_argument_exception }
  - match: { items.0.index.error.reason: if _id is specified it must not be empty }
  - match: { items.1.index.result: created }
  - match: { items.2.index.result: created }
  - match: { items.3.create.result: created }
  - match: { items.4.create.result: created }

  - do:
      count:
        index: test

  - match: { count: 4 }

---
"empty action":
  - skip:
      features: headers

  - do:
      catch: /Malformed action\/metadata line \[3\], expected FIELD_NAME but found \[END_OBJECT\]/
      headers:
        Content-Type: application/json
      bulk:
        body: |
          {"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id"}}
          {"f1": "v1", "f2": 42}
          {}

---
"List of strings":
  - do:
      bulk:
        refresh: true
        body:
          - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id"}}'
          - '{"f1": "v1", "f2": 42}'
          - '{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id2"}}'
          - '{"f1": "v2", "f2": 47}'

  - do:
      count:
        index: test_index

  - match: {count: 2}
@@ -170,6 +170,18 @@ public void testBulkWithGlobalDefaults() throws Exception {
        }
    }

    // TODO: This test is added to verify _type support in the bulk action. It should be removed once all clients
    // stop sending this parameter.
    // https://github.com/opensearch-project/OpenSearch/issues/3484
    public void testBulkWithTypes() throws Exception {
        String bulkAction = copyToStringFromClasspath("/org/opensearch/action/bulk/bulk-with-deprecated-types.json");
        {
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON);
            assertThat(bulkRequest.numberOfActions(), equalTo(5));
        }
    }

    private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
        XContentBuilder pipeline = jsonBuilder().startObject()
            .startArray("processors")
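The bulk-with-deprecated-types.json fixture loaded by the test above is one of the files not rendered in this view. Purely as an illustration (not the actual fixture contents), a newline-delimited body of the following shape, with five actions carrying _type, would satisfy the numberOfActions == 5 assertion:

{ "index": { "_index": "test_index", "_type": "test_type", "_id": "1" } }
{ "field": "value1" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" } }
{ "field": "value2" }
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "3" } }
{ "field": "value3" }
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "4" } }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "2" } }
{ "doc": { "field": "updated" } }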
@@ -287,7 +287,9 @@ public BulkRequest add(
        String routing = valueOrDefault(defaultRouting, globalRouting);
        String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
        Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
        new BulkRequestParser().parse(
        // https://github.com/opensearch-project/OpenSearch/issues/3484
        // Do not error on the _type field, which breaks compatibility with some external clients
        new BulkRequestParser(false).parse(
            data,
            defaultIndex,
            routing,
@@ -296,7 +298,7 @@
            requireAlias,
            allowExplicitIndex,
            xContentType,
            this::internalAdd,
            (indexRequest, type) -> internalAdd(indexRequest),
            this::internalAdd,
            this::add
        );
@@ -53,6 +53,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

@@ -66,6 +67,7 @@
public final class BulkRequestParser {

    private static final ParseField INDEX = new ParseField("_index");
    private static final ParseField TYPE = new ParseField("_type");
    private static final ParseField ID = new ParseField("_id");
    private static final ParseField ROUTING = new ParseField("routing");
    private static final ParseField OP_TYPE = new ParseField("op_type");
@@ -78,6 +80,17 @@ public final class BulkRequestParser {
    private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
    private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

    // TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
    private final boolean errorOnType;

    /**
     * Create a new parser.
     * @param errorOnType whether an explicit _type field in the action/metadata line should be rejected with an error;
     *                    when {@code false} the field is parsed and then ignored. Used by BulkMonitoring.
     */
    public BulkRequestParser(boolean errorOnType) {
        this.errorOnType = errorOnType;
    }

    private static int findNextMarker(byte marker, int from, BytesReference data) {
        final int res = data.indexOf(marker, from);
        if (res != -1) {
@@ -123,7 +136,7 @@ public void parse(
        @Nullable Boolean defaultRequireAlias,
        boolean allowExplicitIndex,
        XContentType xContentType,
        Consumer<IndexRequest> indexRequestConsumer,
        BiConsumer<IndexRequest, String> indexRequestConsumer,
        Consumer<UpdateRequest> updateRequestConsumer,
        Consumer<DeleteRequest> deleteRequestConsumer
    ) throws IOException {
@@ -179,6 +192,7 @@ public void parse(
                String action = parser.currentName();

                String index = defaultIndex;
                String type = null;
                String id = null;
                String routing = defaultRouting;
                FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
@@ -191,7 +205,7 @@
                String pipeline = defaultPipeline;
                boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

                // at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
                // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
                // or START_OBJECT which will have another set of parameters
                token = parser.nextToken();

@@ -206,6 +220,13 @@
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -301,7 +322,8 @@ public void parse(
                            .setIfSeqNo(ifSeqNo)
                            .setIfPrimaryTerm(ifPrimaryTerm)
                            .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                            .setRequireAlias(requireAlias)
                            .setRequireAlias(requireAlias),
                        type
                    );
                } else {
                    indexRequestConsumer.accept(
@@ -314,7 +336,8 @@
                            .setIfSeqNo(ifSeqNo)
                            .setIfPrimaryTerm(ifPrimaryTerm)
                            .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                            .setRequireAlias(requireAlias)
                            .setRequireAlias(requireAlias),
                        type
                    );
                }
            } else if ("create".equals(action)) {
@@ -328,7 +351,8 @@
                        .setIfSeqNo(ifSeqNo)
                        .setIfPrimaryTerm(ifPrimaryTerm)
                        .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                        .setRequireAlias(requireAlias)
                        .setRequireAlias(requireAlias),
                    type
                );
            } else if ("update".equals(action)) {
                if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
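To make the errorOnType switch concrete, here is a small sketch of the two parser modes. It is not part of the commit: the middle parse() arguments hidden behind the collapsed hunks (default fetch-source context and pipeline) are assumed from the pre-removal signature, and package locations may differ by version.

import org.opensearch.action.bulk.BulkRequestParser;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.xcontent.XContentType;

public class ParserModeExample {
    public static void main(String[] args) throws Exception {
        BytesArray data = new BytesArray(
            "{\"index\": {\"_index\": \"test\", \"_type\": \"_doc\", \"_id\": \"1\"}}\n" + "{\"f\": 1}\n"
        );

        // Lenient mode, as used by BulkRequest.add(): _type is parsed, handed to the
        // BiConsumer, and simply dropped by the caller.
        new BulkRequestParser(false).parse(
            data, "test", null, null, null, null, true, XContentType.JSON,
            (indexRequest, type) -> System.out.println("index request accepted, ignored _type = " + type),
            updateRequest -> {},
            deleteRequest -> {}
        );

        // Strict mode: the same body is rejected, mirroring the pre-revert behavior.
        try {
            new BulkRequestParser(true).parse(
                data, "test", null, null, null, null, true, XContentType.JSON,
                (indexRequest, type) -> {},
                updateRequest -> {},
                deleteRequest -> {}
            );
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // ... contains an unknown parameter [_type]
        }
    }
}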

