Skip to content

Commit

Permalink
Merge branch 'master' into retention-leases-recovery
Browse files Browse the repository at this point in the history
* master: (23 commits)
  Lift retention lease expiration to index shard (elastic#38380)
  Make Ccr recovery file chunk size configurable (elastic#38370)
  Prevent CCR recovery from missing documents (elastic#38237)
  re-enables awaitsfixed datemath tests (elastic#38376)
  Types removal fix FullClusterRestartIT warnings (elastic#38445)
  Make sure to reject mappings with type _doc when include_type_name is false. (elastic#38270)
  Updates the grok patterns to be consistent with logstash (elastic#27181)
  Ignore type-removal warnings in XPackRestTestHelper (elastic#38431)
  testHlrcFromXContent() should respect assertToXContentEquivalence() (elastic#38232)
  add basic REST test for geohash_grid (elastic#37996)
  Remove DiscoveryPlugin#getDiscoveryTypes (elastic#38414)
  Fix the clock resolution to millis in GetWatchResponseTests (elastic#38405)
  Throw AssertionError when no master (elastic#38432)
  `if_seq_no` and `if_primary_term` parameters aren't wired correctly in REST Client's CRUD API (elastic#38411)
  Enable CronEvalToolTest.testEnsureDateIsShownInRootLocale (elastic#38394)
  Fix failures in BulkProcessorIT#testGlobalParametersAndBulkProcessor. (elastic#38129)
  SQL: Implement CURRENT_DATE (elastic#38175)
  Mute testReadRequestsReturnLatestMappingVersion (elastic#38438)
  [ML] Report index unavailable instead of waiting for lazy node (elastic#38423)
  Update Rollup Caps to allow unknown fields (elastic#38339)
  ...
  • Loading branch information
jasontedor committed Feb 5, 2019
2 parents 439c1d0 + b03d138 commit d010f37
Show file tree
Hide file tree
Showing 147 changed files with 3,824 additions and 3,604 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ static Request delete(DeleteRequest deleteRequest) {
parameters.withTimeout(deleteRequest.timeout());
parameters.withVersion(deleteRequest.version());
parameters.withVersionType(deleteRequest.versionType());
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards());
return request;
Expand Down Expand Up @@ -191,6 +193,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
}
}

if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
metadata.field("if_seq_no", action.ifSeqNo());
metadata.field("if_primary_term", action.ifPrimaryTerm());
}

if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
IndexRequest indexRequest = (IndexRequest) action;
if (Strings.hasLength(indexRequest.getPipeline())) {
Expand Down Expand Up @@ -319,6 +326,8 @@ static Request index(IndexRequest indexRequest) {
parameters.withTimeout(indexRequest.timeout());
parameters.withVersion(indexRequest.version());
parameters.withVersionType(indexRequest.versionType());
parameters.withIfSeqNo(indexRequest.ifSeqNo());
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,

static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");

private final String remoteCluster;
private final String leaderIndex;
Expand All @@ -55,7 +54,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster);
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD;

public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject {

private final String followerIndex;
Expand All @@ -39,7 +37,6 @@ public ResumeFollowRequest(String followerIndex) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
toXContentFragment(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class RollableIndexCaps implements ToXContentFragment {
public static final Function<String, ConstructingObjectParser<RollableIndexCaps, Void>> PARSER = indexName -> {
@SuppressWarnings("unchecked")
ConstructingObjectParser<RollableIndexCaps, Void> p
= new ConstructingObjectParser<>(indexName,
= new ConstructingObjectParser<>(indexName, true,
a -> new RollableIndexCaps(indexName, (List<RollupJobCaps>) a[0]));

p.declareObjectArray(ConstructingObjectParser.constructorArg(), RollupJobCaps.PARSER::apply,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Represents the Rollup capabilities for a specific job on a single rollup index
Expand All @@ -45,15 +45,12 @@ public class RollupJobCaps implements ToXContentObject {
private static final ParseField FIELDS = new ParseField("fields");
private static final String NAME = "rollup_job_caps";

public static final ConstructingObjectParser<RollupJobCaps, Void> PARSER = new ConstructingObjectParser<>(NAME,
public static final ConstructingObjectParser<RollupJobCaps, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
a -> {
@SuppressWarnings("unchecked")
List<Tuple<String, RollupFieldCaps>> caps = (List<Tuple<String, RollupFieldCaps>>) a[3];
if (caps.isEmpty()) {
return new RollupJobCaps((String) a[0], (String) a[1], (String) a[2], Collections.emptyMap());
}
Map<String, RollupFieldCaps> mapCaps = new HashMap<>(caps.size());
caps.forEach(c -> mapCaps.put(c.v1(), c.v2()));
Map<String, RollupFieldCaps> mapCaps =
new HashMap<>(caps.stream().collect(Collectors.toMap(Tuple::v1, Tuple::v2)));
return new RollupJobCaps((String) a[0], (String) a[1], (String) a[2], mapCaps);
});

Expand Down Expand Up @@ -140,16 +137,6 @@ public static class RollupFieldCaps implements ToXContentFragment {
private static final String NAME = "rollup_field_caps";
private final List<Map<String, Object>> aggs;

public static final Function<String, ConstructingObjectParser<RollupFieldCaps, Void>> PARSER = fieldName -> {
@SuppressWarnings("unchecked")
ConstructingObjectParser<RollupFieldCaps, Void> parser
= new ConstructingObjectParser<>(NAME, a -> new RollupFieldCaps((List<Map<String, Object>>) a[0]));

parser.declareObjectArray(ConstructingObjectParser.constructorArg(),
(p, c) -> p.map(), new ParseField(fieldName));
return parser;
};

RollupFieldCaps(final List<Map<String, Object>> aggs) {
this.aggs = Collections.unmodifiableList(Objects.requireNonNull(aggs));
}
Expand All @@ -170,13 +157,12 @@ public static RollupFieldCaps fromXContent(XContentParser parser) throws IOExcep
List<Map<String, Object>> aggs = new ArrayList<>();
if (parser.nextToken().equals(XContentParser.Token.START_ARRAY)) {
while (parser.nextToken().equals(XContentParser.Token.START_OBJECT)) {
aggs.add(Collections.unmodifiableMap(parser.map()));
aggs.add(parser.map());
}
}
return new RollupFieldCaps(Collections.unmodifiableList(aggs));
return new RollupFieldCaps(aggs);
}


@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.action.document.RestBulkAction;
Expand Down Expand Up @@ -75,12 +76,12 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT,
bulkListener), listener);
}

private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener), listener);
}
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -383,22 +384,22 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
{
//Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -414,20 +415,19 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}

@SuppressWarnings("unchecked")
Expand All @@ -438,8 +438,8 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
.<Matcher<SearchHit>>toArray(Matcher[]::new);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
Expand All @@ -448,33 +448,41 @@ private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, S
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);

if (localType != null) {
// If the payload contains types, parsing it into a bulk request results in a warning.
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
return multiGetRequest;
}

private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
String action = Strings.toString(jsonBuilder()
.startObject()
.startObject("index")
.field("_index", localIndex)
.field("_type", localType)
.field("_id", Integer.toString(id))
.endObject()
.endObject()
);
String source = Strings.toString(jsonBuilder()
XContentBuilder action = jsonBuilder().startObject().startObject("index");

if (localIndex != null) {
action.field("_index", localIndex);
}

if (localType != null) {
action.field("_type", localType);
}

action.field("_id", Integer.toString(id));
action.endObject().endObject();

XContentBuilder source = jsonBuilder()
.startObject()
.field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
.endObject()
);
.endObject();

String request = action + "\n" + source + "\n";
String request = Strings.toString(action) + "\n" + Strings.toString(source) + "\n";
return new BytesArray(request);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ public void testDelete() throws IOException {
{
// Testing deletion
String docId = "id";
highLevelClient().index(
IndexResponse indexResponse = highLevelClient().index(
new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L));
DeleteRequest deleteRequest = new DeleteRequest("index", docId);
if (randomBoolean()) {
deleteRequest.version(1L);
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
}
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
assertEquals("index", deleteResponse.getIndex());
Expand All @@ -131,12 +133,13 @@ public void testDelete() throws IOException {
String docId = "version_conflict";
highLevelClient().index(
new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2);
DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " +
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
"version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]",
exception.getMessage());
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
Expand Down Expand Up @@ -519,13 +522,14 @@ public void testIndex() throws IOException {
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
IndexRequest wrongRequest = new IndexRequest("index").id("id");
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
wrongRequest.version(5L);
wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L);

execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync);
});
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " +
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
"version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]",
exception.getMessage());
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
Expand Down Expand Up @@ -820,7 +824,8 @@ public void testBulk() throws IOException {
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
bulkRequest.add(indexRequest);

Expand Down Expand Up @@ -1130,7 +1135,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
processor.add(indexRequest);

Expand Down
Loading

0 comments on commit d010f37

Please sign in to comment.