Skip to content

Commit

Permalink
Merge branch 'main' into downsample_ilm_better_naming
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Aug 15, 2023
2 parents a78c826 + 6587611 commit 9ea8a05
Show file tree
Hide file tree
Showing 38 changed files with 522 additions and 341 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/97967.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 97967
summary: Ensure frozen indices have correct tier preference
area: "Indices APIs"
type: bug
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/98459.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98459
summary: Unwrap IOException in `ContextIndexSearcher` concurrent code-path
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ beta::[]
<titleabbrev>Delete Analytics Collection</titleabbrev>
++++

////
[source,console]
----
PUT _application/analytics/my_analytics_collection
----
// TESTSETUP
////

Removes an Analytics Collection and its associated data stream.

[[delete-analytics-collection-request]]
Expand Down Expand Up @@ -44,4 +53,3 @@ The following example deletes the Analytics Collection named `my_analytics_colle
----
DELETE _application/analytics/my_analytics_collection/
----
// TEST[skip:TBD]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ beta::[]
<titleabbrev>List Analytics Collections</titleabbrev>
++++

////
[source,console]
----
PUT _application/analytics/my_analytics_collection
PUT _application/analytics/my_analytics_collection2
----
// TESTSETUP
[source,console]
----
DELETE _application/analytics/my_analytics_collection
DELETE _application/analytics/my_analytics_collection2
----
// TEARDOWN
////

Returns information about Analytics Collections.

[[list-analytics-collection-request]]
Expand Down Expand Up @@ -44,7 +60,6 @@ The following example lists all configured Analytics Collections:
----
GET _application/analytics/
----
// TEST[skip:TBD]

A sample response:

Expand All @@ -70,7 +85,6 @@ The following example returns the Analytics Collection that matches `my_analytic
----
GET _application/analytics/my_analytics_collection
----
// TEST[skip:TBD]

A sample response:

Expand All @@ -91,7 +105,6 @@ The following example returns all Analytics Collections prefixed with `my`:
----
GET _application/analytics/my*
----
// TEST[skip:TBD]

A sample response:

Expand All @@ -110,4 +123,3 @@ A sample response:
}
}
----
// TEST[skip:TBD]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ beta::[]
<titleabbrev>Post Analytics Collection Event</titleabbrev>
++++

////
[source,console]
----
PUT _application/analytics/my_analytics_collection
----
// TESTSETUP
[source,console]
----
DELETE _application/analytics/my_analytics_collection
----
// TEARDOWN
////

Post an event to an Analytics Collection.

[[post-analytics-collection-event-request]]
Expand Down Expand Up @@ -85,4 +99,3 @@ POST _application/analytics/my_analytics_collection/event/search_click
}
}
----
// TEST[skip:TBD]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ beta::[]
<titleabbrev>Put Analytics Collection</titleabbrev>
++++

////
[source,console]
----
DELETE _application/analytics/my_analytics_collection
----
// TEARDOWN
////

Creates an Analytics Collection.

[[put-analytics-collection-request]]
Expand Down Expand Up @@ -40,6 +48,4 @@ The following example creates a new Analytics Collection called `my_analytics_co
[source,console]
----
PUT _application/analytics/my_analytics_collection
----
// TEST[skip:TBD]
209 changes: 99 additions & 110 deletions server/src/main/java/org/elasticsearch/Version.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -127,7 +127,7 @@ private void sendShardAction(
masterNode,
actionName,
request,
new EmptyTransportResponseHandler(listener.delegateResponse((l, exp) -> {
TransportResponseHandler.empty(TransportResponseHandler.TRANSPORT_WORKER, listener.delegateResponse((l, exp) -> {
if (isMasterChannelException(exp)) {
waitForNewMasterAndRetry(actionName, observer, request, listener);
} else {
Expand Down Expand Up @@ -284,7 +284,10 @@ private static class ShardFailedTransportHandler implements TransportRequestHand
@Override
public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) throws Exception {
logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure);
var update = new FailedShardUpdateTask(request, new ChannelActionListener<>(channel));
var update = new FailedShardUpdateTask(
request,
new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)
);
taskQueue.submitTask(TASK_SOURCE, update, null);
}
}
Expand Down Expand Up @@ -316,7 +319,7 @@ public ClusterState execute(BatchExecutionContext<FailedShardUpdateTask> batchEx
entry,
entry.getShardId().getIndex()
);
taskContext.success(() -> task.listener().onResponse(TransportResponse.Empty.INSTANCE));
taskContext.success(task::onSuccess);
} else {
// The primary term is 0 if the shard failed itself. It is > 0 if a write was done on a primary but was failed to be
// replicated to the shard copy with the provided allocation id. In case where the shard failed itself, it's ok to just
Expand Down Expand Up @@ -375,7 +378,7 @@ public ClusterState execute(BatchExecutionContext<FailedShardUpdateTask> batchEx
} else {
// tasks that correspond to non-existent shards are marked as successful
logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", entry.getShardId(), entry);
taskContext.success(() -> task.listener().onResponse(TransportResponse.Empty.INSTANCE));
taskContext.success(task::onSuccess);
}
} else {
// failing a shard also possibly marks it as stale (see IndexMetadataUpdater)
Expand All @@ -392,7 +395,8 @@ public ClusterState execute(BatchExecutionContext<FailedShardUpdateTask> batchEx
// drop deprecation warnings arising from the computation (reroute etc).
maybeUpdatedState = applyFailedShards(initialState, failedShardsToBeApplied, staleShardsToBeApplied);
for (final var taskContext : tasksToBeApplied) {
taskContext.success(() -> taskContext.getTask().listener().onResponse(TransportResponse.Empty.INSTANCE));
final var task = taskContext.getTask();
taskContext.success(task::onSuccess);
}
} catch (Exception e) {
logger.warn(() -> format("failed to apply failed shards %s", failedShardsToBeApplied), e);
Expand Down Expand Up @@ -518,9 +522,11 @@ public int hashCode() {
}
}

public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener<TransportResponse.Empty> listener)
implements
ClusterStateTaskListener {
public record FailedShardUpdateTask(FailedShardEntry entry, ActionListener<Void> listener) implements ClusterStateTaskListener {
public void onSuccess() {
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
logger.log(
Expand Down Expand Up @@ -569,12 +575,13 @@ private static class ShardStartedTransportHandler implements TransportRequestHan
}

@Override
public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) throws Exception {
public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) {
logger.debug("{} received shard started for [{}]", request.shardId, request);
final ChannelActionListener<TransportResponse.Empty> listener = new ChannelActionListener<>(channel);

var update = new StartedShardUpdateTask(request, listener);
taskQueue.submitTask("shard-started " + request, update, null);
taskQueue.submitTask(
"shard-started " + request,
new StartedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
null
);
}
}

Expand Down Expand Up @@ -604,7 +611,7 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
// requests might still be in flight even after the shard has already been started or failed on the master. We just
// ignore these requests for now.
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry);
taskContext.success(() -> task.listener().onResponse(TransportResponse.Empty.INSTANCE));
taskContext.success(task::onSuccess);
} else {
if (matched.primary() && entry.primaryTerm > 0) {
final IndexMetadata indexMetadata = initialState.metadata().index(entry.shardId.getIndex());
Expand All @@ -625,7 +632,7 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
entry.primaryTerm,
currentPrimaryTerm
);
taskContext.success(() -> task.listener().onResponse(TransportResponse.Empty.INSTANCE));
taskContext.success(task::onSuccess);
continue;
}
}
Expand All @@ -638,7 +645,7 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
entry,
matched
);
taskContext.success(() -> task.listener().onResponse(TransportResponse.Empty.INSTANCE));
taskContext.success(task::onSuccess);
} else {
// remove duplicate actions as allocation service expects a clean list without duplicates
if (seenShardRoutings.contains(matched)) {
Expand Down Expand Up @@ -695,7 +702,8 @@ public ClusterState execute(BatchExecutionContext<StartedShardUpdateTask> batchE
assert assertStartedIndicesHaveCompleteTimestampRanges(maybeUpdatedState);

for (final var taskContext : tasksToBeApplied) {
taskContext.success(() -> taskContext.getTask().listener().onResponse(TransportResponse.Empty.INSTANCE));
final var task = taskContext.getTask();
taskContext.success(task::onSuccess);
}
} catch (Exception e) {
logger.warn(() -> format("failed to apply started shards %s", shardRoutingsToBeApplied), e);
Expand Down Expand Up @@ -804,9 +812,7 @@ public int hashCode() {
}
}

public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<TransportResponse.Empty> listener)
implements
ClusterStateTaskListener {
public record StartedShardUpdateTask(StartedShardEntry entry, ActionListener<Void> listener) implements ClusterStateTaskListener {

public StartedShardEntry getEntry() {
return entry;
Expand All @@ -824,6 +830,10 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}

public void onSuccess() {
listener.onResponse(null);
}

@Override
public String toString() {
return "StartedShardUpdateTask{entry=" + entry + ", listener=" + listener + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
Expand Down Expand Up @@ -74,6 +73,7 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -674,7 +674,7 @@ private void validateJoinRequest(JoinRequest joinRequest, ActionListener<Void> v
// - we have a healthy PING channel to the node

final ClusterState stateForJoinValidation = getStateForJoinValidationService();
final ListenableActionFuture<Empty> validateStateListener = new ListenableActionFuture<>();
final ListenableActionFuture<Void> validateStateListener = new ListenableActionFuture<>();
if (stateForJoinValidation != null) {
assert stateForJoinValidation.nodes().isLocalNodeElectedMaster();
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
Expand All @@ -693,8 +693,8 @@ private void validateJoinRequest(JoinRequest joinRequest, ActionListener<Void> v

sendJoinPing(joinRequest.getSourceNode(), TransportRequestOptions.Type.PING, new ActionListener<>() {
@Override
public void onResponse(Empty empty) {
validateStateListener.addListener(validateListener.map(ignored -> null));
public void onResponse(Void ignored) {
validateStateListener.addListener(validateListener);
}

@Override
Expand All @@ -703,7 +703,7 @@ public void onFailure(Exception e) {
// don't want lots of cluster states in flight.
validateStateListener.addListener(new ActionListener<>() {
@Override
public void onResponse(Empty empty) {
public void onResponse(Void ignored) {
validateListener.onFailure(e);
}

Expand All @@ -717,7 +717,7 @@ public void onFailure(Exception e2) {
});
}

private void sendJoinValidate(DiscoveryNode discoveryNode, ActionListener<Empty> listener) {
private void sendJoinValidate(DiscoveryNode discoveryNode, ActionListener<Void> listener) {
joinValidationService.validateJoin(discoveryNode, listener.delegateResponse((delegate, e) -> {
logger.warn(() -> "failed to validate incoming join request from node [" + discoveryNode + "]", e);
delegate.onFailure(
Expand All @@ -734,13 +734,13 @@ private void sendJoinValidate(DiscoveryNode discoveryNode, ActionListener<Empty>
}));
}

private void sendJoinPing(DiscoveryNode discoveryNode, TransportRequestOptions.Type channelType, ActionListener<Empty> listener) {
private void sendJoinPing(DiscoveryNode discoveryNode, TransportRequestOptions.Type channelType, ActionListener<Void> listener) {
transportService.sendRequest(
discoveryNode,
JoinHelper.JOIN_PING_ACTION_NAME,
TransportRequest.Empty.INSTANCE,
TransportRequestOptions.of(null, channelType),
new ActionListenerResponseHandler<>(listener.delegateResponse((l, e) -> {
TransportResponseHandler.empty(clusterCoordinationExecutor, listener.delegateResponse((l, e) -> {
logger.warn(() -> format("failed to ping joining node [%s] on channel type [%s]", discoveryNode, channelType), e);
listener.onFailure(
new IllegalStateException(
Expand All @@ -753,7 +753,7 @@ private void sendJoinPing(DiscoveryNode discoveryNode, TransportRequestOptions.T
e
)
);
}), i -> Empty.INSTANCE, clusterCoordinationExecutor)
}))
);
}

Expand Down Expand Up @@ -2147,7 +2147,7 @@ protected void sendPublishRequest(
protected void sendApplyCommit(
DiscoveryNode destination,
ApplyCommitRequest applyCommit,
ActionListener<Empty> responseActionListener
ActionListener<Void> responseActionListener
) {
assert transportService.getThreadPool().getThreadContext().isSystemContext();
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
Expand All @@ -2157,11 +2157,7 @@ protected void sendApplyCommit(
COMMIT_STATE_ACTION_NAME,
applyCommit,
COMMIT_STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(
wrapWithMutex(responseActionListener),
in -> Empty.INSTANCE,
clusterCoordinationExecutor
)
TransportResponseHandler.empty(clusterCoordinationExecutor, wrapWithMutex(responseActionListener))
);
} catch (Exception e) {
responseActionListener.onFailure(e);
Expand Down
Loading

0 comments on commit 9ea8a05

Please sign in to comment.