diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 8e31aa23d88cf..082f9ae688b92 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -235,14 +235,7 @@ import org.opensearch.action.ingest.SimulatePipelineTransportAction; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.ClearScrollAction; -import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.SearchAction; -import org.opensearch.action.search.SearchScrollAction; -import org.opensearch.action.search.TransportClearScrollAction; -import org.opensearch.action.search.TransportMultiSearchAction; -import org.opensearch.action.search.TransportSearchAction; -import org.opensearch.action.search.TransportSearchScrollAction; +import org.opensearch.action.search.*; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -398,12 +391,7 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.RestClearScrollAction; -import org.opensearch.rest.action.search.RestCountAction; -import org.opensearch.rest.action.search.RestExplainAction; -import org.opensearch.rest.action.search.RestMultiSearchAction; -import org.opensearch.rest.action.search.RestSearchAction; -import org.opensearch.rest.action.search.RestSearchScrollAction; +import org.opensearch.rest.action.search.*; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; @@ -660,6 +648,7 @@ public void reg actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -832,6 +821,10 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + + // Point in time API + registerHandler.accept(new RestCreatePITAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java index 22dfbafdf0b08..69b7b4bb2d61f 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java @@ -10,11 +10,11 @@ import org.opensearch.action.ActionType; -public class CreatePITAction extends ActionType { +public class CreatePITAction extends ActionType { public static final CreatePITAction INSTANCE = new CreatePITAction(); public static final String NAME = "indices:data/read/pit"; private CreatePITAction() { - super(NAME, PITResponse::new); + super(NAME, SearchResponse::new); } } diff --git a/server/src/main/java/org/opensearch/action/search/PITRequest.java b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java similarity index 78% rename from server/src/main/java/org/opensearch/action/search/PITRequest.java rename to server/src/main/java/org/opensearch/action/search/CreatePITRequest.java index c188dd3b41400..dad6c3b95fbd6 100644 --- a/server/src/main/java/org/opensearch/action/search/PITRequest.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java @@ -24,9 +24,44 @@ import java.util.Map; import java.util.Objects; -public class PITRequest extends ActionRequest implements IndicesRequest.Replaceable { - public PITRequest(TimeValue keepAlive) { +import static org.opensearch.action.ValidateActions.addValidationError; + +public class CreatePITRequest extends ActionRequest implements IndicesRequest.Replaceable { + + private TimeValue keepAlive; + private final boolean allowPartialPitCreation; + @Nullable + private String routing = null; + @Nullable + private String preference = null; + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + + public CreatePITRequest(TimeValue keepAlive, boolean allowPartialPitCreation) { this.keepAlive = keepAlive; + this.allowPartialPitCreation = allowPartialPitCreation; + } + + public CreatePITRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + routing = in.readOptionalString(); + preference = in.readOptionalString(); + keepAlive = in.readTimeValue(); + routing = in.readOptionalString(); + allowPartialPitCreation = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeOptionalString(preference); + out.writeTimeValue(keepAlive); + out.writeOptionalString(routing); + out.writeBoolean(allowPartialPitCreation); } public String getRouting() { @@ -49,58 +84,33 @@ public TimeValue getKeepAlive() { return keepAlive; } - private TimeValue keepAlive; - - public PITRequest(StreamInput in) throws IOException { - super(in); - indices = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - routing = in.readOptionalString(); - preference = in.readOptionalString(); - keepAlive = in.readTimeValue(); - routing = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArray(indices); - indicesOptions.writeIndicesOptions(out); - out.writeOptionalString(preference); - out.writeTimeValue(keepAlive); - out.writeOptionalString(routing); + public boolean isAllowPartialPitCreation() { + return allowPartialPitCreation; } public void setRouting(String routing) { this.routing = routing; } - @Nullable - private String routing = null; - public void setPreference(String preference) { this.preference = preference; } - @Nullable - private String preference = null; - public void setIndices(String[] indices) { this.indices = indices; } - private String[] indices = Strings.EMPTY_ARRAY; - public void setIndicesOptions(IndicesOptions indicesOptions) { this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null"); } - private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; - - @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (keepAlive == null) { + validationException = addValidationError("Keep alive is missing", validationException); + } + return validationException; } @Override @@ -113,8 +123,6 @@ public IndicesOptions indicesOptions() { return indicesOptions; } - - public void setKeepAlive(TimeValue keepAlive) { this.keepAlive = keepAlive; } @@ -128,7 +136,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, * Sets the indices the search will be executed on. */ @Override - public PITRequest indices(String... indices) { + public CreatePITRequest indices(String... indices) { SearchRequest.validateIndices(indices); this.indices = indices; return this; diff --git a/server/src/main/java/org/opensearch/action/search/PITResponse.java b/server/src/main/java/org/opensearch/action/search/PITResponse.java deleted file mode 100644 index 33df87bcb5112..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/PITResponse.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.ActionResponse; -import org.opensearch.common.ParseField; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.xcontent.StatusToXContentObject; -import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.rest.RestStatus; - -import java.io.IOException; - -public class PITResponse extends ActionResponse implements StatusToXContentObject { - - private static final ParseField ID = new ParseField("id"); - - public String getId() { - return id; - } - - private final String id; - - PITResponse(String id) { - this.id = id; - } - - public PITResponse(StreamInput streamInput) throws IOException { - id = streamInput.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); - - } - - @Override - public RestStatus status() { - return RestStatus.OK; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(ID.getPreferredName(), id); - builder.endObject(); - return builder; - } -} diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 6205bf0a05129..2ebececee7ddc 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -94,6 +94,7 @@ public class SearchTransportService { public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]"; + public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]"; private final TransportService transportService; private final BiFunction responseWrapper; @@ -141,6 +142,23 @@ public void sendFreeContext( ); } + public void updatePitContext( + Transport.Connection connection, + TransportCreatePITAction.UpdatePITContextRequest request, + ActionListener actionListener + ) { + transportService.sendRequest( + connection, + UPDATE_READER_CONTEXT_ACTION_NAME, + request, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler( + actionListener, + TransportCreatePITAction.UpdatePitContextResponse::new + ) + ); + } + public void sendCanMatch( Transport.Connection connection, final ShardSearchRequest request, @@ -546,12 +564,55 @@ public static void registerRequestHandler(TransportService transportService, Sea } ); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); - transportService.registerRequestHandler(CREATE_READER_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, TransportCreatePITAction.CreateReaderContextRequest::new, + transportService.registerRequestHandler( + CREATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.CreateReaderContextRequest::new, (request, channel, task) -> { - ChannelActionListener listener = new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); - searchService.openReaderContext(request.getShardId(), request.getKeepAlive(), ActionListener.wrap(r -> listener.onResponse(new TransportCreatePITAction.CreateReaderContextResponse(r)), listener::onFailure)); - }); - TransportActionProxy.registerProxyAction(transportService, CREATE_READER_CONTEXT_ACTION_NAME, TransportCreatePITAction.CreateReaderContextResponse::new); + ChannelActionListener< + TransportCreatePITAction.CreateReaderContextResponse, + TransportCreatePITAction.CreateReaderContextRequest> listener = new ChannelActionListener<>( + channel, + CREATE_READER_CONTEXT_ACTION_NAME, + request + ); + searchService.openReaderContext( + request.getShardId(), + request.getKeepAlive(), + ActionListener.wrap( + r -> listener.onResponse(new TransportCreatePITAction.CreateReaderContextResponse(r)), + listener::onFailure + ) + ); + } + ); + TransportActionProxy.registerProxyAction( + transportService, + CREATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.CreateReaderContextResponse::new + ); + + transportService.registerRequestHandler( + UPDATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.UpdatePITContextRequest::new, + (request, channel, task) -> { + ChannelActionListener< + TransportCreatePITAction.UpdatePitContextResponse, + TransportCreatePITAction.UpdatePITContextRequest> listener = new ChannelActionListener<>( + channel, + UPDATE_READER_CONTEXT_ACTION_NAME, + request + ); + searchService.updatePitIdAndKeepAlive(request, ActionListener.wrap(r -> listener.onResponse(r), listener::onFailure)); + } + ); + TransportActionProxy.registerProxyAction( + transportService, + UPDATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.UpdatePitContextResponse::new + ); + } /** diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java index 991e4719fb56d..e96b01c22ca50 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -9,90 +9,212 @@ package org.opensearch.action.search; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.shard.ShardId; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportRequest; -import org.opensearch.transport.TransportResponseHandler; -import org.opensearch.transport.TransportService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.*; import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Collectors; -public class TransportCreatePITAction extends HandledTransportAction { +/** + * Transport action for creating PIT reader context + */ +public class TransportCreatePITAction extends HandledTransportAction { public static final String CREATE_PIT = "create_pit"; - private SearchService searchService; + private final TimeValue CREATE_PIT_TEMPORARY_KEEP_ALIVE = new TimeValue(30, TimeUnit.SECONDS); + private final SearchService searchService; private final TransportService transportService; - private TransportSearchAction transportSearchAction; + private final SearchTransportService searchTransportService; + private final ClusterService clusterService; + private final TransportSearchAction transportSearchAction; + private final NamedWriteableRegistry namedWriteableRegistry; @Inject - public TransportCreatePITAction(SearchService searchService, - TransportService transportService, - ActionFilters actionFilters, - TransportSearchAction transportSearchAction) { - super(CreatePITAction.NAME, transportService, actionFilters, in -> new PITRequest(in)); + public TransportCreatePITAction( + SearchService searchService, + TransportService transportService, + ActionFilters actionFilters, + SearchTransportService searchTransportService, + ClusterService clusterService, + TransportSearchAction transportSearchAction, + NamedWriteableRegistry namedWriteableRegistry + ) { + super(CreatePITAction.NAME, transportService, actionFilters, in -> new CreatePITRequest(in)); this.searchService = searchService; this.transportService = transportService; + this.searchTransportService = searchTransportService; + this.clusterService = clusterService; this.transportSearchAction = transportSearchAction; + this.namedWriteableRegistry = namedWriteableRegistry; } - @Override - protected void doExecute(Task task, PITRequest request, ActionListener listener) { - SearchRequest sr = new SearchRequest(request.getIndices()); - sr.preference(request.getPreference()); - sr.routing(request.getRouting()); - sr.indicesOptions(request.getIndicesOptions()); - transportSearchAction.executeRequest(task, sr, CREATE_PIT, true, - (searchTask, target, connection, searchPhaseResultActionListener) -> - /*TODO set a timeout based on "awaitActive"*/ - transportService.sendChildRequest(connection, SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, - - new CreateReaderContextRequest(target.getShardId(), request.getKeepAlive()), searchTask, - - new TransportResponseHandler() { - @Override - public CreateReaderContextResponse read(StreamInput in) throws IOException { - return new CreateReaderContextResponse(in); - } + protected void doExecute(Task task, CreatePITRequest request, ActionListener listener) { + SearchRequest searchRequest = new SearchRequest(request.getIndices()); + searchRequest.preference(request.getPreference()); + searchRequest.routing(request.getRouting()); + searchRequest.indicesOptions(request.getIndicesOptions()); + searchRequest.allowPartialSearchResults(request.isAllowPartialPitCreation()); - @Override - public void handleResponse(CreateReaderContextResponse response) { - searchPhaseResultActionListener.onResponse(response); - } + final StepListener createPitListener = new StepListener<>(); - @Override - public void handleException(TransportException exp) { + final ActionListener updatePitIdListener = new ActionListener<>() { + @Override + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(searchResponse); + } - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }; + + /** + * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a + * temporary keep alive + */ + transportSearchAction.executeRequest(task, searchRequest, CREATE_PIT, true, new TransportSearchAction.SinglePhaseSearchAction() { + @Override + public void executeOnShardTarget( + SearchTask searchTask, + SearchShardTarget target, + Transport.Connection connection, + ActionListener searchPhaseResultActionListener + ) { + transportService.sendChildRequest( + connection, + SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, + new CreateReaderContextRequest(target.getShardId(), CREATE_PIT_TEMPORARY_KEEP_ALIVE), + searchTask, + new TransportResponseHandler() { + @Override + public CreateReaderContextResponse read(StreamInput in) throws IOException { + return new CreateReaderContextResponse(in); + } + + @Override + public void handleResponse(CreateReaderContextResponse response) { + searchPhaseResultActionListener.onResponse(response); + } - @Override - public String executor() { - return "generic"; //TODO + @Override + public void handleException(TransportException exp) { + searchPhaseResultActionListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } } - }), + ); + } + }, createPitListener); - new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - listener.onResponse(new PITResponse(searchResponse.pointInTimeId())); + /** + * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request + * Fail create pit operation if any of the updates in this phase are failed + */ + createPitListener.whenComplete(searchResponse -> { + SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, searchResponse.pointInTimeId()); + final StepListener> lookupListener = getConnectionLookupListener(contextId); + lookupListener.whenComplete(nodelookup -> { + final ActionListener groupedActionListener = getGroupedListener( + updatePitIdListener, + searchResponse, + contextId.shards().size() + ); + /** + * store the create time ( same create time for all PIT contexts across shards ) to be used + * for list PIT api + */ + TimeValue createTime = new TimeValue(System.currentTimeMillis()); + for (Map.Entry entry : contextId.shards().entrySet()) { + DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); + try { + final Transport.Connection connection = searchTransportService.getConnection( + entry.getValue().getClusterAlias(), + node + ); + searchTransportService.updatePitContext( + connection, + new UpdatePITContextRequest( + entry.getValue().getSearchContextId(), + searchResponse.pointInTimeId(), + request.getKeepAlive().millis(), + createTime + ), + groupedActionListener + ); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } } + }, updatePitIdListener::onFailure); + }, updatePitIdListener::onFailure); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + private StepListener> getConnectionLookupListener(SearchContextId contextId) { + ClusterState state = clusterService.state(); + + final Set clusters = contextId.shards() + .values() + .stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); + + final StepListener> lookupListener = new StepListener<>(); + if (clusters.isEmpty() == false) { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } else { + lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } + return lookupListener; + } + + private ActionListener getGroupedListener( + ActionListener updatePitIdListener, + SearchResponse searchResponse, + int size + ) { + return new GroupedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(final Collection responses) { + updatePitIdListener.onResponse(searchResponse); + } + + @Override + public void onFailure(final Exception e) { + updatePitIdListener.onFailure(e); + } + }, size); } public static class CreateReaderContextRequest extends TransportRequest { @@ -127,7 +249,6 @@ public void writeTo(StreamOutput out) throws IOException { } public static class CreateReaderContextResponse extends SearchPhaseResult { - public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) { this.contextId = shardSearchContextId; } @@ -141,7 +262,76 @@ public CreateReaderContextResponse(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); contextId.writeTo(out); + } + } + + public static class UpdatePITContextRequest extends TransportRequest { + private final String pitId; + private final long keepAlive; + + private final TimeValue createTime; + private final ShardSearchContextId searchContextId; + + UpdatePITContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, TimeValue createTime) { + this.pitId = pitId; + this.searchContextId = searchContextId; + this.keepAlive = keepAlive; + this.createTime = createTime; + } + + UpdatePITContextRequest(StreamInput in) throws IOException { + super(in); + pitId = in.readString(); + keepAlive = in.readLong(); + createTime = in.readTimeValue(); + searchContextId = new ShardSearchContextId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(pitId); + out.writeLong(keepAlive); + out.writeTimeValue(createTime); + searchContextId.writeTo(out); + } + + public ShardSearchContextId getSearchContextId() { + return searchContextId; + } + + public String getPitId() { + return pitId; + } + + public String id() { + return this.getPitId(); + } + + public TimeValue getCreateTime() { + return createTime; + } + } + + public static class UpdatePitContextResponse extends TransportResponse { + private final String pitId; + + UpdatePitContextResponse(StreamInput in) throws IOException { + super(in); + pitId = in.readString(); + } + + public UpdatePitContextResponse(String pitId) { + this.pitId = pitId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + } + public String getPitId() { + return pitId; } } } diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index 30ed4041f6b39..e2005c65ad3d7 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -292,7 +292,7 @@ public void onNewPitContext(ReaderContext readerContext) { public void onFreePitContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onNewPitContext(readerContext); + listener.onFreePitContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreePitContext listener [{}] failed", listener), e); } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java index 86b6e1887d75a..1df09e3766229 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -9,7 +9,7 @@ package org.opensearch.rest.action.search; import org.opensearch.action.search.CreatePITAction; -import org.opensearch.action.search.PITRequest; +import org.opensearch.action.search.CreatePITRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.node.NodeClient; import org.opensearch.common.Strings; @@ -33,21 +33,22 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - PITRequest pitRequest = new PITRequest(request.paramAsTime("keep_alive", null)); - pitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, pitRequest.indicesOptions())); - pitRequest.setPreference(request.param("preference")); - pitRequest.setRouting(request.param("routing")); - pitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); + boolean allowPartialPitCreation = request.paramAsBoolean("allow_partial_pit_creation", false); + + CreatePITRequest createPitRequest = new CreatePITRequest(request.paramAsTime("keep_alive", null), allowPartialPitCreation); + createPitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, createPitRequest.indicesOptions())); + createPitRequest.setPreference(request.param("preference")); + createPitRequest.setRouting(request.param("routing")); + createPitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); return channel -> { RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancelClient.execute(CreatePITAction.INSTANCE, pitRequest, new RestStatusToXContentListener<>(channel)); + cancelClient.execute(CreatePITAction.INSTANCE, createPitRequest, new RestStatusToXContentListener<>(channel)); }; } @Override public List routes() { - return unmodifiableList(Collections.singletonList( - new Route(POST, "/{index}/_pit"))); + return unmodifiableList(Collections.singletonList(new Route(POST, "/{index}/_pit"))); } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 221a2c9a1a1d7..8558f3b3555cd 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -41,9 +41,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchShardTask; -import org.opensearch.action.search.SearchType; +import org.opensearch.action.search.*; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; @@ -208,9 +206,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); - public static final Setting MAX_OPEN_PIT_CONTEXT = - Setting.intSetting( - "search.max_open_pit_context", 500, 0, Property.Dynamic, Property.NodeScope); + public static final Setting MAX_OPEN_PIT_CONTEXT = Setting.intSetting( + "search.max_open_pit_context", + 500, + 0, + Property.Dynamic, + Property.NodeScope + ); public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; @@ -464,7 +466,7 @@ public void executeQueryPhase( assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; final IndexShard shard = getShard(request); - rewriteAndFetchShardRequest(shard, request, new ActionListener() { + rewriteAndFetchShardRequest(shard, request, new ActionListener<>() { @Override public void onResponse(ShardSearchRequest orig) { // check if we can shortcut the query phase entirely. @@ -806,26 +808,36 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen Releasable decreasePitContexts = null; Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; + boolean success = false; try { decreasePitContexts = openPitContexts::decrementAndGet; if (openPitContexts.incrementAndGet() > maxOpenPitContext) { throw new OpenSearchRejectedExecutionException( - "Trying to create too many Point In Time contexts. Must be less than or equal to: [" + - maxOpenPitContext + "]. " + "This limit can be set by changing the [" - + MAX_OPEN_PIT_CONTEXT.getKey() + "] setting."); + "Trying to create too many Point In Time contexts. Must be less than or equal to: [" + + maxOpenPitContext + + "]. " + + "This limit can be set by changing the [" + + MAX_OPEN_PIT_CONTEXT.getKey() + + "] setting." + ); } searcherSupplier = shard.acquireSearcherSupplier(); List nonVerboseSegments = shard.segments(false); - shard.routingEntry(); final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); - readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false, - shard.routingEntry(),nonVerboseSegments); - readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false); + readerContext = new PitReaderContext( + id, + indexService, + shard, + searcherSupplier, + keepAlive.millis(), + false, + shard.routingEntry(), + nonVerboseSegments + ); final ReaderContext finalReaderContext = readerContext; searcherSupplier = null; // transfer ownership to reader context searchOperationListener.onNewReaderContext(readerContext); - searchOperationListener.onNewPitContext(finalReaderContext); readerContext.addOnClose(decreasePitContexts); decreasePitContexts = null; @@ -834,15 +846,18 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen searchOperationListener.onFreeReaderContext(finalReaderContext); searchOperationListener.onFreePitContext(finalReaderContext); }); - readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); putReaderContext(readerContext); readerContext = null; listener.onResponse(finalReaderContext.id()); + success = true; } catch (Exception exc) { - Releasables.closeWhileHandlingException(searcherSupplier, readerContext); listener.onFailure(exc); - } finally{ - Releasables.close(decreasePitContexts); + } finally { + if (success) { + Releasables.close(readerContext, searcherSupplier, decreasePitContexts); + } else { + Releasables.closeWhileHandlingException(searcherSupplier, readerContext, decreasePitContexts); + } } }); } @@ -960,6 +975,24 @@ public boolean freeReaderContext(ShardSearchContextId contextId) { return false; } + public void updatePitIdAndKeepAlive( + TransportCreatePITAction.UpdatePITContextRequest request, + ActionListener listener + ) { + try { + PitReaderContext readerContext = getPitReaderContext(request.getSearchContextId()); + if (readerContext == null) { + throw new SearchContextMissingException(request.getSearchContextId()); + } + readerContext.setPitId(request.getPitId()); + readerContext.tryUpdateKeepAlive(maxKeepAlive); + readerContext.setCreateTimestamp(request.getCreateTime()); + listener.onResponse(new TransportCreatePITAction.UpdatePitContextResponse(request.getPitId())); + } catch (Exception e) { + listener.onFailure(e); + } + } + public void freeAllScrollContexts() { for (ReaderContext readerContext : activeReaders.values()) { if (readerContext.scrollContext() != null) { @@ -1290,8 +1323,12 @@ public ResponseCollectorService getResponseCollectorService() { } public PitReaderContext getPitReaderContext(ShardSearchContextId id) { - ReaderContext readerContext = getReaderContext(id); - return (PitReaderContext) readerContext; + for (Map.Entry context : activeReaders.entrySet()) { + if (context.getValue() instanceof PitReaderContext) { + if (context.getKey() == id.getId()) return (PitReaderContext) context.getValue(); + } + } + return null; } class Reaper implements Runnable { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 103d5104a84c0..a85f355cc9cb1 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -9,11 +9,11 @@ package org.opensearch.search.internal; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.Segment; import org.opensearch.index.shard.IndexShard; - import java.util.List; public class PitReaderContext extends ReaderContext { @@ -24,11 +24,19 @@ public ShardRouting getShardRouting() { private final ShardRouting shardRouting; private final List segments; - - public PitReaderContext(ShardSearchContextId id, IndexService indexService, - IndexShard indexShard, Engine.SearcherSupplier searcherSupplier, - long keepAliveInMillis, boolean singleSession, - ShardRouting shardRouting, List nonVerboseSegments) { + private String pitId; + private TimeValue createTime; + + public PitReaderContext( + ShardSearchContextId id, + IndexService indexService, + IndexShard indexShard, + Engine.SearcherSupplier searcherSupplier, + long keepAliveInMillis, + boolean singleSession, + ShardRouting shardRouting, + List nonVerboseSegments + ) { super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession); this.shardRouting = shardRouting; segments = nonVerboseSegments; @@ -38,5 +46,15 @@ public List getSegments() { return segments; } + public String getPitId() { + return this.pitId; + } + + public void setPitId(final String pitId) { + this.pitId = pitId; + } + public void setCreateTimestamp(final TimeValue createTime) { + this.createTime = createTime; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 31d7a44d9bbd2..ca4a047d4adee 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -138,7 +138,7 @@ public Engine.Searcher acquireSearcher(String source) { return searcherSupplier.acquireSearcher(source); } - private void tryUpdateKeepAlive(long keepAlive) { + public void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive)); } diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java new file mode 100644 index 0000000000000..e52c02c7537da --- /dev/null +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.ActionFuture; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.concurrent.ExecutionException; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class PitSingleNodeTests extends OpenSearchSingleNodeTestCase { + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testCreatePIT() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2) + .setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))) + .get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + } + + public void testCreatePITOnValidIndexAndThenDeleteIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().admin().indices().prepareDelete("index").get(); + + IndexNotFoundException ex = expectThrows(IndexNotFoundException.class, () -> { + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2) + .setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))) + .get(); + }); + assertTrue(ex.getMessage().contains("no such index [index]")); + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnCloseIndex() { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().admin().indices().prepareClose("index").get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows(ExecutionException.class, execute::get); + + assertTrue(ex.getMessage().contains("IndexClosedException")); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithMultipleIndices() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + createIndex("index1", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index", "index1" }); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + assertEquals(4, response.getSuccessfulShards()); + assertEquals(4, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithNonExistentIndex() { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index", "index1" }); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows(ExecutionException.class, execute::get); + + assertTrue(ex.getMessage().contains("no such index [index1]")); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testMaxOpenPitContexts() throws Exception { + createIndex("index"); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + SearchService service = getInstanceFromNode(SearchService.class); + + for (int i = 0; i < SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); i++) { + client().execute(CreatePITAction.INSTANCE, request).get(); + } + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + ExecutionException ex = expectThrows(ExecutionException.class, execute::get); + + assertTrue( + ex.getMessage() + .contains( + "Trying to create too many Point In Time contexts. " + + "Must be less than or equal to: [" + + SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + + "]. " + + "This limit can be set by changing the [search.max_open_pit_context] setting." + ) + ); + + service.doClose(); + } + + public void testCreatePITWithShardReplicas() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2) + .setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))) + .get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); + } +}