Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix failures in BulkProcessorIT#testGlobalParametersAndBulkProcessor. #38129

Merged
merged 2 commits into from
Feb 5, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.action.document.RestBulkAction;
Expand Down Expand Up @@ -75,12 +76,12 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT,
bulkListener), listener);
}

private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener), listener);
}
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -383,22 +384,22 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
{
//Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -414,20 +415,19 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}

@SuppressWarnings("unchecked")
Expand All @@ -438,8 +438,8 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
.<Matcher<SearchHit>>toArray(Matcher[]::new);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
Expand All @@ -448,33 +448,41 @@ private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, S
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);

if (localType != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line troubled me. Bulk requests can pass a type on each request line (localType var here) or pass a default fallback choice which is inherited by any request lines that don't declare a localType (the fallback is called globalType).
So here, I assumed the test's logic should ideally be:

 if (localType !=null || globalType != null) {

These are the conditions under which a user has sent a choice of custom type to the server.
Sadly this doesn't work. It looks like in the server-side BulkRequest that only types expressed at the line level are flagged. It assumes any "global" references to type are already flagged by the main RestBulkAction which we are not exercising in this test. What we are squashing with these assertWarnings checks in BulkProcessorIT are the ThreadContext values normally set up on the server side by the server-side objects like BulkRequest which we are (questionably) re-using in HLRC.
These warning checks are not really the same as simply checking that the response from a _bulk REST interaction has the expected warning if you use happen to use global or local type references in the request. That contract is exercised in BulkRequestWithGlobalParametersIT so I think we're good there.

// If the payload contains types, parsing it into a bulk request results in a warning.
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
return multiGetRequest;
}

private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
String action = Strings.toString(jsonBuilder()
.startObject()
.startObject("index")
.field("_index", localIndex)
.field("_type", localType)
.field("_id", Integer.toString(id))
.endObject()
.endObject()
);
String source = Strings.toString(jsonBuilder()
XContentBuilder action = jsonBuilder().startObject().startObject("index");

if (localIndex != null) {
action.field("_index", localIndex);
}

if (localType != null) {
action.field("_type", localType);
}

action.field("_id", Integer.toString(id));
action.endObject().endObject();

XContentBuilder source = jsonBuilder()
.startObject()
.field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
.endObject()
);
.endObject();

String request = action + "\n" + source + "\n";
String request = Strings.toString(action) + "\n" + Strings.toString(source) + "\n";
return new BytesArray(request);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
}

Expand Down