Merge remote-tracking branch 'elastic/master' into geosql
imotov committed Apr 9, 2019
2 parents 513eba1 + 1de2a25 commit ee64cc9
Showing 32 changed files with 613 additions and 388 deletions.
@@ -878,6 +878,18 @@ private String createExpiredData(String jobId) throws Exception {

waitForJobToClose(jobId);

long prevJobTimeStamp = System.currentTimeMillis() / 1000;

// Check that the current timestamp component, in seconds, differs from previously.
// Note that we used to use an 'awaitBusy(() -> false, 1, TimeUnit.SECONDS);'
// for the same purpose but the new approach...
// a) explicitly checks that the timestamps, in seconds, are actually different and
// b) is slightly more efficient since we may not need to wait an entire second for the timestamp to increment
assertBusy(() -> {
long timeNow = System.currentTimeMillis() / 1000;
assertFalse(prevJobTimeStamp >= timeNow);
});

// Update snapshot timestamp to force it out of snapshot retention window
long oneDayAgo = nowMillis - TimeValue.timeValueHours(24).getMillis() - 1;
updateModelSnapshotTimestamp(jobId, String.valueOf(oneDayAgo));
@@ -1418,6 +1430,7 @@ private void startDatafeed(String datafeedId, String start, String end) throws E
}

private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws Exception {

MachineLearningClient machineLearningClient = highLevelClient().machineLearning();

GetModelSnapshotsRequest getModelSnapshotsRequest = new GetModelSnapshotsRequest(jobId);
@@ -1435,9 +1448,6 @@ private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws
UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + jobId, "_doc", documentId);
updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON);
highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT);

// Wait a second to ensure subsequent model snapshots will have a different ID (it depends on epoch seconds)
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
}


39 changes: 32 additions & 7 deletions docs/reference/mapping/params/enabled.asciidoc
@@ -7,11 +7,11 @@ you are using Elasticsearch as a web session store. You may want to index the
session ID and last update time, but you don't need to query or run
aggregations on the session data itself.

The `enabled` setting, which can be applied only to the mapping type and to
<<object,`object`>> fields, causes Elasticsearch to skip parsing of the
contents of the field entirely. The JSON can still be retrieved from the
<<mapping-source-field,`_source`>> field, but it is not searchable or stored
in any other way:
The `enabled` setting, which can be applied only to the top-level mapping
definition and to <<object,`object`>> fields, causes Elasticsearch to skip
parsing of the contents of the field entirely. The JSON can still be retrieved
from the <<mapping-source-field,`_source`>> field, but it is not searchable or
stored in any other way:

[source,js]
--------------------------------------------------
@@ -26,6 +26,7 @@ PUT my_index
"type": "date"
},
"session_data": { <1>
"type": "object",
"enabled": false
}
}
@@ -55,7 +56,7 @@ PUT my_index/_doc/session_2
<2> Any arbitrary data can be passed to the `session_data` field as it will be entirely ignored.
<3> The `session_data` will also ignore values that are not JSON objects.

The entire mapping type may be disabled as well, in which case the document is
The entire mapping may be disabled as well, in which case the document is
stored in the <<mapping-source-field,`_source`>> field, which means it can be
retrieved, but none of its contents are indexed in any way:

@@ -84,10 +85,34 @@ GET my_index/_doc/session_1 <2>
GET my_index/_mapping <3>
--------------------------------------------------
// CONSOLE
<1> The entire mapping type is disabled.
<1> The entire mapping is disabled.
<2> The document can be retrieved.
<3> Checking the mapping reveals that no fields have been added.

TIP: The `enabled` setting can be updated on existing fields
using the <<indices-put-mapping,PUT mapping API>>.
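
For example, following the TIP above, an update of this setting might look like
the sketch below, which simply re-uses the `my_index` index and `session_data`
field from the earlier examples to illustrate the shape of the request:

[source,js]
--------------------------------------------------
PUT my_index/_mapping
{
  "properties": {
    "session_data": {
      "type": "object",
      "enabled": true <1>
    }
  }
}
--------------------------------------------------

<1> A hypothetical request that re-enables the `session_data` object, as
described in the TIP above.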

Note that because Elasticsearch completely skips parsing the field
contents, it is possible to add non-object data to a disabled field:

[source,js]
--------------------------------------------------
PUT my_index
{
"mappings": {
"properties": {
"session_data": {
"type": "object",
"enabled": false
}
}
}
}

PUT my_index/_doc/session_1
{
"session_data": "foo bar" <1>
}
--------------------------------------------------
// CONSOLE

<1> The document is added successfully, even though `session_data` contains non-object data.
@@ -154,7 +154,7 @@ public static void performOnPrimary(
private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

@Override
protected void doRun() {
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate,
ActionListener.wrap(v -> executor.execute(this), this::onRejection)) == false) {
@@ -168,12 +168,6 @@ protected void doRun() {
finishRequest();
}

@Override
public void onFailure(Exception e) {
assert false : "All exceptions should be handled by #executeBulkItemRequest";
onRejection(e);
}

@Override
public void onRejection(Exception e) {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
@@ -204,7 +198,7 @@ private void finishRequest() {
*/
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener) {
ActionListener<Void> itemDoneListener) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();

final UpdateHelper.Result updateResult;
@@ -252,55 +246,51 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
final IndexShard primary = context.getPrimary();
final long version = context.getRequestToExecute().version();
final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
try {
final Engine.Result result;
if (isDelete) {
final DeleteRequest request = context.getRequestToExecute();
result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
request.ifSeqNo(), request.ifPrimaryTerm());
} else {
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
context.getRequestToExecute().type(),
new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
waitForMappingUpdate.accept(
ActionListener.runAfter(new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
context.resetForExecutionForRetry();
}

@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null))
);
}

@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
return false;
} else {
onComplete(result, context, updateResult);
}
} catch (Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
final Engine.Result result;
if (isDelete) {
final DeleteRequest request = context.getRequestToExecute();
result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
request.ifSeqNo(), request.ifPrimaryTerm());
} else {
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
context.getRequestToExecute().type(),
new ActionListener<>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
waitForMappingUpdate.accept(
ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
context.resetForExecutionForRetry();
}

@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null))
);
}

@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
return false;
} else {
onComplete(result, context, updateResult);
}
return true;
}
@@ -342,7 +342,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
primaryRequest.getPrimaryTerm()),
transportOptions,
new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
new ActionListenerResponseHandler<>(onCompletionListener, reader) {
@Override
public void handleResponse(Response response) {
setPhase(replicationTask, "finished");
@@ -357,58 +357,54 @@ public void handleException(TransportException exp) {
});
} else {
setPhase(replicationTask, "primary");
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(primaryRequest.getRequest(),
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference)
.execute();
ActionListener.wrap(result -> result.respond(
new ActionListener<>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
handleException(primaryShardReference, e);
}
}), e -> handleException(primaryShardReference, e)
), primaryShardReference).execute();
}
} catch (Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
onFailure(e);
handleException(primaryShardReference, e);
}
}

private void handleException(PrimaryShardReference primaryShardReference, Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
onFailure(e);
}

@Override
public void onFailure(Exception e) {
setPhase(replicationTask, "finished");
onCompletionListener.onFailure(e);
}

private ActionListener<Response> createResponseListener(final PrimaryShardReference primaryShardReference) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync",
shard.shardId()),
e);
// intentionally swallow, a missed global checkpoint sync should not fail this operation
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onFailure(e);
}
};
}

protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
@@ -25,7 +25,6 @@
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -78,8 +77,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;


@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/40944")
public class SplitIndexIT extends ESIntegTestCase {

@Override