From 72bd39b99be17682c41f7712111fd94e5b979091 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 30 Mar 2022 15:01:40 +0530 Subject: [PATCH] two phase create pit Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 21 +- .../action/search/CreatePITAction.java | 4 +- ...{PITRequest.java => CreatePITRequest.java} | 75 ++--- .../opensearch/action/search/PITResponse.java | 57 ---- .../action/search/SearchTransportService.java | 63 ++++- .../search/TransportCreatePITAction.java | 262 ++++++++++++++---- .../action/search/RestCreatePITAction.java | 17 +- .../org/opensearch/search/SearchService.java | 31 ++- .../search/internal/PitReaderContext.java | 4 + .../search/internal/ReaderContext.java | 2 +- .../opensearch/search/PitSingleNodeTests.java | 180 ++++++++++++ 11 files changed, 536 insertions(+), 180 deletions(-) rename server/src/main/java/org/opensearch/action/search/{PITRequest.java => CreatePITRequest.java} (84%) delete mode 100644 server/src/main/java/org/opensearch/action/search/PITResponse.java create mode 100644 server/src/test/java/org/opensearch/search/PitSingleNodeTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 8e31aa23d88cf..082f9ae688b92 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -235,14 +235,7 @@ import org.opensearch.action.ingest.SimulatePipelineTransportAction; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.ClearScrollAction; -import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.SearchAction; -import org.opensearch.action.search.SearchScrollAction; -import org.opensearch.action.search.TransportClearScrollAction; -import org.opensearch.action.search.TransportMultiSearchAction; -import org.opensearch.action.search.TransportSearchAction; -import org.opensearch.action.search.TransportSearchScrollAction; +import org.opensearch.action.search.*; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -398,12 +391,7 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.RestClearScrollAction; -import org.opensearch.rest.action.search.RestCountAction; -import org.opensearch.rest.action.search.RestExplainAction; -import org.opensearch.rest.action.search.RestMultiSearchAction; -import org.opensearch.rest.action.search.RestSearchAction; -import org.opensearch.rest.action.search.RestSearchScrollAction; +import org.opensearch.rest.action.search.*; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; @@ -660,6 +648,7 @@ public void reg actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -832,6 +821,10 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + + // Point in time API + registerHandler.accept(new RestCreatePITAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java index 22dfbafdf0b08..69b7b4bb2d61f 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java @@ -10,11 +10,11 @@ import org.opensearch.action.ActionType; -public class CreatePITAction extends ActionType { +public class CreatePITAction extends ActionType { public static final CreatePITAction INSTANCE = new CreatePITAction(); public static final String NAME = "indices:data/read/pit"; private CreatePITAction() { - super(NAME, PITResponse::new); + super(NAME, SearchResponse::new); } } diff --git a/server/src/main/java/org/opensearch/action/search/PITRequest.java b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java similarity index 84% rename from server/src/main/java/org/opensearch/action/search/PITRequest.java rename to server/src/main/java/org/opensearch/action/search/CreatePITRequest.java index c188dd3b41400..eb077969fc0d7 100644 --- a/server/src/main/java/org/opensearch/action/search/PITRequest.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java @@ -24,9 +24,43 @@ import java.util.Map; import java.util.Objects; -public class PITRequest extends ActionRequest implements IndicesRequest.Replaceable { - public PITRequest(TimeValue keepAlive) { +public class CreatePITRequest extends ActionRequest implements IndicesRequest.Replaceable { + + private TimeValue keepAlive; + private final boolean allowPartialPitCreation; + @Nullable + private String routing = null; + @Nullable + private String preference = null; + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + + + public CreatePITRequest(TimeValue keepAlive, boolean allowPartialPitCreation) { this.keepAlive = keepAlive; + this.allowPartialPitCreation = allowPartialPitCreation; + } + + public CreatePITRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + routing = in.readOptionalString(); + preference = in.readOptionalString(); + keepAlive = in.readTimeValue(); + routing = in.readOptionalString(); + allowPartialPitCreation = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeOptionalString(preference); + out.writeTimeValue(keepAlive); + out.writeOptionalString(routing); + out.writeBoolean(allowPartialPitCreation); } public String getRouting() { @@ -49,55 +83,26 @@ public TimeValue getKeepAlive() { return keepAlive; } - private TimeValue keepAlive; - - public PITRequest(StreamInput in) throws IOException { - super(in); - indices = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - routing = in.readOptionalString(); - preference = in.readOptionalString(); - keepAlive = in.readTimeValue(); - routing = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArray(indices); - indicesOptions.writeIndicesOptions(out); - out.writeOptionalString(preference); - out.writeTimeValue(keepAlive); - out.writeOptionalString(routing); + public boolean isAllowPartialPitCreation() { + return allowPartialPitCreation; } public void setRouting(String routing) { this.routing = routing; } - @Nullable - private String routing = null; - public void setPreference(String preference) { this.preference = preference; } - @Nullable - private String preference = null; - public void setIndices(String[] indices) { this.indices = indices; } - private String[] indices = Strings.EMPTY_ARRAY; - public void setIndicesOptions(IndicesOptions indicesOptions) { this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null"); } - private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; - - @Override public ActionRequestValidationException validate() { return null; @@ -113,8 +118,6 @@ public IndicesOptions indicesOptions() { return indicesOptions; } - - public void setKeepAlive(TimeValue keepAlive) { this.keepAlive = keepAlive; } @@ -128,7 +131,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, * Sets the indices the search will be executed on. */ @Override - public PITRequest indices(String... indices) { + public CreatePITRequest indices(String... indices) { SearchRequest.validateIndices(indices); this.indices = indices; return this; diff --git a/server/src/main/java/org/opensearch/action/search/PITResponse.java b/server/src/main/java/org/opensearch/action/search/PITResponse.java deleted file mode 100644 index 33df87bcb5112..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/PITResponse.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.ActionResponse; -import org.opensearch.common.ParseField; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.xcontent.StatusToXContentObject; -import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.rest.RestStatus; - -import java.io.IOException; - -public class PITResponse extends ActionResponse implements StatusToXContentObject { - - private static final ParseField ID = new ParseField("id"); - - public String getId() { - return id; - } - - private final String id; - - PITResponse(String id) { - this.id = id; - } - - public PITResponse(StreamInput streamInput) throws IOException { - id = streamInput.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); - - } - - @Override - public RestStatus status() { - return RestStatus.OK; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(ID.getPreferredName(), id); - builder.endObject(); - return builder; - } -} diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 6205bf0a05129..ddadf25f75d14 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -94,6 +94,7 @@ public class SearchTransportService { public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]"; + public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]"; private final TransportService transportService; private final BiFunction responseWrapper; @@ -141,6 +142,20 @@ public void sendFreeContext( ); } + public void updatePitContext( + Transport.Connection connection, + TransportCreatePITAction.UpdatePITReaderRequest request, + ActionListener actionListener) { + transportService.sendRequest( + connection, + UPDATE_READER_CONTEXT_ACTION_NAME, + request, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler(actionListener, + TransportCreatePITAction.UpdatePitContextResponse::new) + ); + } + public void sendCanMatch( Transport.Connection connection, final ShardSearchRequest request, @@ -308,7 +323,7 @@ public Map getPendingSearchRequests() { return new HashMap<>(clientConnections); } - static class ScrollFreeContextRequest extends TransportRequest { + static class ScrollFreeContextRequest extends TransportRequest { private ShardSearchContextId contextId; ScrollFreeContextRequest(ShardSearchContextId contextId) { @@ -546,12 +561,50 @@ public static void registerRequestHandler(TransportService transportService, Sea } ); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); - transportService.registerRequestHandler(CREATE_READER_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, TransportCreatePITAction.CreateReaderContextRequest::new, + transportService.registerRequestHandler( + CREATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.CreateReaderContextRequest::new, (request, channel, task) -> { - ChannelActionListener listener = new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); - searchService.openReaderContext(request.getShardId(), request.getKeepAlive(), ActionListener.wrap(r -> listener.onResponse(new TransportCreatePITAction.CreateReaderContextResponse(r)), listener::onFailure)); + ChannelActionListener< + TransportCreatePITAction.CreateReaderContextResponse, + TransportCreatePITAction.CreateReaderContextRequest> + listener = + new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); + searchService.openReaderContext( + request.getShardId(), + request.getKeepAlive(), + ActionListener.wrap( + r -> + listener.onResponse( + new TransportCreatePITAction.CreateReaderContextResponse(r)), + listener::onFailure)); }); - TransportActionProxy.registerProxyAction(transportService, CREATE_READER_CONTEXT_ACTION_NAME, TransportCreatePITAction.CreateReaderContextResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + CREATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.CreateReaderContextResponse::new); + + transportService.registerRequestHandler( + UPDATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.UpdatePITReaderRequest::new, + (request, channel, task) -> { + ChannelActionListener listener = + new ChannelActionListener<>(channel, UPDATE_READER_CONTEXT_ACTION_NAME, request); + searchService.updatePitIdAndKeepAlive(request, + ActionListener.wrap( + r -> listener.onResponse(r), + listener::onFailure + )); + } + ); + TransportActionProxy.registerProxyAction( + transportService, + UPDATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.UpdatePitContextResponse::new); + } /** diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java index 991e4719fb56d..01b4104cb5ab9 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -9,115 +9,233 @@ package org.opensearch.action.search; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.shard.ShardId; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportRequest; -import org.opensearch.transport.TransportResponseHandler; -import org.opensearch.transport.TransportService; +import org.opensearch.transport.*; import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Collectors; -public class TransportCreatePITAction extends HandledTransportAction { +/** + * Transport action for creating PIT reader context + */ +public class TransportCreatePITAction extends HandledTransportAction { public static final String CREATE_PIT = "create_pit"; + private final TimeValue CREATE_PIT_TEMPORARY_KEEP_ALIVE = new TimeValue(30, TimeUnit.SECONDS); private SearchService searchService; private final TransportService transportService; + private final SearchTransportService searchTransportService; + private final ClusterService clusterService; private TransportSearchAction transportSearchAction; + private final NamedWriteableRegistry namedWriteableRegistry; @Inject - public TransportCreatePITAction(SearchService searchService, - TransportService transportService, - ActionFilters actionFilters, - TransportSearchAction transportSearchAction) { - super(CreatePITAction.NAME, transportService, actionFilters, in -> new PITRequest(in)); + public TransportCreatePITAction( + SearchService searchService, + TransportService transportService, + ActionFilters actionFilters, + SearchTransportService searchTransportService, + ClusterService clusterService, + TransportSearchAction transportSearchAction, + NamedWriteableRegistry namedWriteableRegistry) { + super(CreatePITAction.NAME, transportService, actionFilters, in -> new CreatePITRequest(in)); this.searchService = searchService; this.transportService = transportService; + this.searchTransportService = searchTransportService; + this.clusterService = clusterService; this.transportSearchAction = transportSearchAction; + this.namedWriteableRegistry = namedWriteableRegistry; } - @Override - protected void doExecute(Task task, PITRequest request, ActionListener listener) { - SearchRequest sr = new SearchRequest(request.getIndices()); - sr.preference(request.getPreference()); - sr.routing(request.getRouting()); - sr.indicesOptions(request.getIndicesOptions()); - transportSearchAction.executeRequest(task, sr, CREATE_PIT, true, - (searchTask, target, connection, searchPhaseResultActionListener) -> - /*TODO set a timeout based on "awaitActive"*/ - transportService.sendChildRequest(connection, SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, - - new CreateReaderContextRequest(target.getShardId(), request.getKeepAlive()), searchTask, - - new TransportResponseHandler() { + protected void doExecute(Task task, CreatePITRequest request, ActionListener listener) { + SearchRequest searchRequest = new SearchRequest(request.getIndices()); + searchRequest.preference(request.getPreference()); + searchRequest.routing(request.getRouting()); + searchRequest.indicesOptions(request.getIndicesOptions()); + searchRequest.allowPartialSearchResults(request.isAllowPartialPitCreation()); + + final StepListener createPitListener = new StepListener(); + + final ActionListener updatePitIdListener = + new ActionListener() { @Override - public CreateReaderContextResponse read(StreamInput in) throws IOException { - return new CreateReaderContextResponse(in); + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(searchResponse); } @Override - public void handleResponse(CreateReaderContextResponse response) { - searchPhaseResultActionListener.onResponse(response); + public void onFailure(Exception e) { + listener.onFailure(e); } + }; + /** + * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a + * temporary keep alive + */ + transportSearchAction.executeRequest( + task, + searchRequest, + CREATE_PIT, + true, + new TransportSearchAction.SinglePhaseSearchAction() { @Override - public void handleException(TransportException exp) { + public void executeOnShardTarget( + SearchTask searchTask, + SearchShardTarget target, + Transport.Connection connection, + ActionListener searchPhaseResultActionListener) { + transportService.sendChildRequest( + connection, + SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, + new CreateReaderContextRequest(target.getShardId(), CREATE_PIT_TEMPORARY_KEEP_ALIVE), + searchTask, + new TransportResponseHandler() { + @Override + public CreateReaderContextResponse read(StreamInput in) throws IOException { + return new CreateReaderContextResponse(in); + } - } + @Override + public void handleResponse(CreateReaderContextResponse response) { + searchPhaseResultActionListener.onResponse(response); + } - @Override - public String executor() { - return "generic"; //TODO + @Override + public void handleException(TransportException exp) { + searchPhaseResultActionListener.onFailure(exp); + } + + @Override + public String executor() { + return "generic"; // TODO + } + }); } - }), + }, + createPitListener); + + /** + * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request + * Fail create pit operation if any of the updates in this phase are failed + */ + createPitListener.whenComplete( + searchResponse -> { + SearchContextId contextId = + SearchContextId.decode(namedWriteableRegistry, searchResponse.pointInTimeId()); + final StepListener> lookupListener = + getConnectionLookupListener(contextId); + lookupListener.whenComplete( + nodelookup -> { + final ActionListener + groupedActionListener = + getGroupedListener( + updatePitIdListener, searchResponse, contextId.shards().size()); + for (Map.Entry entry : + contextId.shards().entrySet()) { + DiscoveryNode node = + nodelookup.apply( + entry.getValue().getClusterAlias(), entry.getValue().getNode()); + try { + final Transport.Connection connection = + searchTransportService.getConnection( + entry.getValue().getClusterAlias(), node); + searchTransportService.updatePitContext( + connection, + new UpdatePITReaderRequest( + entry.getValue().getSearchContextId(), + searchResponse.pointInTimeId(), + request.getKeepAlive().millis()), + groupedActionListener); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } + } + }, + updatePitIdListener::onFailure); + }, + updatePitIdListener::onFailure); + } - new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - listener.onResponse(new PITResponse(searchResponse.pointInTimeId())); - } + private StepListener> getConnectionLookupListener( + SearchContextId contextId) { + ClusterState state = clusterService.state(); - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + final Set clusters = + contextId.shards().values().stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); + final StepListener> lookupListener = + new StepListener<>(); + + if (clusters.isEmpty() == false) { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } else { + lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } + return lookupListener; + } + + private ActionListener getGroupedListener( + ActionListener updatePitIdListener, SearchResponse searchResponse, int size) { + return new GroupedActionListener( + new ActionListener>() { + @Override + public void onResponse(final Collection responses) { + updatePitIdListener.onResponse(searchResponse); + } + @Override + public void onFailure(final Exception e) { + updatePitIdListener.onFailure(e); + } + }, + size); } public static class CreateReaderContextRequest extends TransportRequest { private final ShardId shardId; private final TimeValue keepAlive; - public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) { this.shardId = shardId; this.keepAlive = keepAlive; } - public ShardId getShardId() { return shardId; } - public TimeValue getKeepAlive() { return keepAlive; } - public CreateReaderContextRequest(StreamInput in) throws IOException { super(in); this.shardId = new ShardId(in); this.keepAlive = in.readTimeValue(); } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -127,21 +245,61 @@ public void writeTo(StreamOutput out) throws IOException { } public static class CreateReaderContextResponse extends SearchPhaseResult { - public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) { this.contextId = shardSearchContextId; } - public CreateReaderContextResponse(StreamInput in) throws IOException { super(in); contextId = new ShardSearchContextId(in); } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); contextId.writeTo(out); + } + } + + public static class UpdatePITReaderRequest extends TransportRequest { + private String PitId; + private long keepAlive; + private ShardSearchContextId searchContextId; + UpdatePITReaderRequest(ShardSearchContextId searchContextId, String PitId, long keepAlive) { + this.PitId = PitId; + this.searchContextId = searchContextId; + this.keepAlive = keepAlive; + } + UpdatePITReaderRequest(StreamInput in) throws IOException { + super(in); + } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + public ShardSearchContextId getSearchContextId() { + return searchContextId; + } + public String getPitId() { + return PitId; + } + public String id() { + return this.getPitId(); + } + } + public static class UpdatePitContextResponse extends TransportResponse { + private String pitId; + UpdatePitContextResponse(StreamInput in) throws IOException { + pitId = in.readString(); + } + public UpdatePitContextResponse(String pitId) { + this.pitId = pitId; + } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + } + public String getPitId() { + return pitId; } } } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java index 86b6e1887d75a..198ad06d7fbff 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -9,7 +9,7 @@ package org.opensearch.rest.action.search; import org.opensearch.action.search.CreatePITAction; -import org.opensearch.action.search.PITRequest; +import org.opensearch.action.search.CreatePITRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.node.NodeClient; import org.opensearch.common.Strings; @@ -33,14 +33,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - PITRequest pitRequest = new PITRequest(request.paramAsTime("keep_alive", null)); - pitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, pitRequest.indicesOptions())); - pitRequest.setPreference(request.param("preference")); - pitRequest.setRouting(request.param("routing")); - pitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); + boolean allowPartialPitCreation = request.paramAsBoolean("allow_partial_pit_creation", false); + + CreatePITRequest createPitRequest = + new CreatePITRequest(request.paramAsTime("keep_alive", null), allowPartialPitCreation); + createPitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, createPitRequest.indicesOptions())); + createPitRequest.setPreference(request.param("preference")); + createPitRequest.setRouting(request.param("routing")); + createPitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); return channel -> { RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancelClient.execute(CreatePITAction.INSTANCE, pitRequest, new RestStatusToXContentListener<>(channel)); + cancelClient.execute(CreatePITAction.INSTANCE, createPitRequest, new RestStatusToXContentListener<>(channel)); }; } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 221a2c9a1a1d7..729cd2801d899 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -41,9 +41,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchShardTask; -import org.opensearch.action.search.SearchType; +import org.opensearch.action.search.*; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; @@ -820,7 +818,6 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false, shard.routingEntry(),nonVerboseSegments); - readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false); final ReaderContext finalReaderContext = readerContext; searcherSupplier = null; // transfer ownership to reader context @@ -960,6 +957,23 @@ public boolean freeReaderContext(ShardSearchContextId contextId) { return false; } + public void updatePitIdAndKeepAlive(TransportCreatePITAction.UpdatePITReaderRequest request, + ActionListener listener) { + if (getReaderContext(request.getSearchContextId()) != null) { + try { + PitReaderContext readerContext = getPitReaderContext(request.getSearchContextId()); + if(readerContext == null) { + throw new SearchContextMissingException(request.getSearchContextId()); + } + readerContext.updatePitId(request.getPitId()); + readerContext.tryUpdateKeepAlive(maxKeepAlive); + listener.onResponse(new TransportCreatePITAction.UpdatePitContextResponse(request.getPitId())); + } catch(Exception e) { + listener.onFailure(e); + } + } + } + public void freeAllScrollContexts() { for (ReaderContext readerContext : activeReaders.values()) { if (readerContext.scrollContext() != null) { @@ -1290,8 +1304,13 @@ public ResponseCollectorService getResponseCollectorService() { } public PitReaderContext getPitReaderContext(ShardSearchContextId id) { - ReaderContext readerContext = getReaderContext(id); - return (PitReaderContext) readerContext; + for(Map.Entry context : activeReaders.entrySet()) { + if(context.getValue() instanceof PitReaderContext) { + if(context.getKey() == id.getId()) return (PitReaderContext) context.getValue(); + } + } + + return null; } class Reaper implements Runnable { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 103d5104a84c0..e7a29f8c0b68b 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -24,6 +24,7 @@ public ShardRouting getShardRouting() { private final ShardRouting shardRouting; private final List segments; + private String pitId; public PitReaderContext(ShardSearchContextId id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier searcherSupplier, @@ -38,5 +39,8 @@ public List getSegments() { return segments; } + public void updatePitId(final String pitId) { + this.pitId = pitId; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 31d7a44d9bbd2..ca4a047d4adee 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -138,7 +138,7 @@ public Engine.Searcher acquireSearcher(String source) { return searcherSupplier.acquireSearcher(source); } - private void tryUpdateKeepAlive(long keepAlive) { + public void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive)); } diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java new file mode 100644 index 0000000000000..3f458865e5351 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.ActionFuture; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + + +import java.util.concurrent.ExecutionException; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class PitSingleNodeTests extends OpenSearchSingleNodeTestCase { + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testCreatePIT() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + } + + public void testCreatePITWithShardReplicas() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnValidIndexAndThenDeleteIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().admin().indices().prepareDelete("index").get(); + + IndexNotFoundException ex = expectThrows( + IndexNotFoundException.class, + () -> { + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + } + ); + assertTrue(ex.getMessage().contains("no such index [index]")); + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnValidIndexAndThenCloseIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().admin().indices().prepareClose("index").get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + SearchResponse response = execute.get(); + }); + + assertTrue(ex.getMessage().contains("IndexClosedException")); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithMultipleIndices() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + createIndex("index1", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index", "index1"}); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + assertEquals(4,response.getSuccessfulShards()); + assertEquals(4, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithNonExistentIndex() { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index", "index1"}); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + SearchResponse response = execute.get(); + }); + + assertTrue(ex.getMessage().contains("no such index [index1]")); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testMaxOpenPitContexts() throws Exception { + createIndex("index"); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + SearchService service = getInstanceFromNode(SearchService.class); + + SearchResponse pitResponse = null; + for (int i = 0; i < SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); i++) { + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + pitResponse = execute.get(); + logger.info(SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + " -- " + i); + } + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + }); + + assertTrue(ex.getMessage().contains("Trying to create too many Point In Time contexts. " + + "Must be less than or equal to: [" + SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + "]. " + + "This limit can be set by changing the [search.max_open_pit_context] setting.")); + + service.doClose(); + } +}