diff --git a/CHANGELOG.md b/CHANGELOG.md index 60fff385e8dfb..7b2db325c7b17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) - Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064)) - Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325) +- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388)) - Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342)) - Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343)) - Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344)) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java index ca8db8ce9e6f0..b3144a75d1445 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java @@ -498,6 +498,10 @@ static Request deleteAllPits() { return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all"); } + static Request getAllPits() { + return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all"); + } + static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException { Request request = new Request(HttpPost.METHOD_NAME, "/_msearch"); diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index 0c73c65f6175f..0a5880b778942 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -63,6 +63,7 @@ import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen ); } + /** + * Get all point in time searches using list all PITs API + * + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException { + return performRequestAndParseEntity( + new MainRequest(), + (request) -> RequestConverters.getAllPits(), + options, + GetAllPitNodesResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously get all point in time searches using list all PITs API + * + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return the response + */ + public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener listener) { + return performRequestAsyncAndParseEntity( + new MainRequest(), + (request) -> RequestConverters.getAllPits(), + options, + GetAllPitNodesResponse::fromXContent, + listener, + emptySet() + ); + } + /** * Clears one or more scroll ids using the Clear Scroll API. * diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java index 395ec6e46a7b3..cbb4db10cd519 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java @@ -18,12 +18,14 @@ import org.opensearch.action.search.DeletePitInfo; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.common.unit.TimeValue; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Tests point in time API with rest high level client @@ -52,21 +54,24 @@ public void indexDocuments() throws IOException { public void testCreateAndDeletePit() throws IOException { CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); - CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); - assertTrue(pitResponse.getId() != null); - assertEquals(1, pitResponse.getTotalShards()); - assertEquals(1, pitResponse.getSuccessfulShards()); - assertEquals(0, pitResponse.getFailedShards()); - assertEquals(0, pitResponse.getSkippedShards()); + CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + assertTrue(createPitResponse.getId() != null); + assertEquals(1, createPitResponse.getTotalShards()); + assertEquals(1, createPitResponse.getSuccessfulShards()); + assertEquals(0, createPitResponse.getFailedShards()); + assertEquals(0, createPitResponse.getSkippedShards()); + GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT); + List pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()); + assertTrue(pits.contains(createPitResponse.getId())); List pitIds = new ArrayList<>(); - pitIds.add(pitResponse.getId()); + pitIds.add(createPitResponse.getId()); DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds); DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); - assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId())); + assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId())); } - public void testDeleteAllPits() throws IOException { + public void testDeleteAllAndListAllPits() throws IOException { CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); @@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException { pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); assertTrue(pitResponse.getId() != null); assertTrue(pitResponse1.getId() != null); + GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT); + + List pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()); + assertTrue(pits.contains(pitResponse.getId())); + assertTrue(pits.contains(pitResponse1.getId())); ActionListener deletePitListener = new ActionListener<>() { @Override public void onResponse(DeletePitResponse response) { @@ -95,8 +105,27 @@ public void onFailure(Exception e) { } } }; + final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + + ActionListener getPitsListener = new ActionListener() { + @Override + public void onResponse(GetAllPitNodesResponse response) { + List pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList()); + assertTrue(pits.contains(pitResponse3.getId())); + } + + @Override + public void onFailure(Exception e) { + if (!(e instanceof OpenSearchStatusException)) { + throw new AssertionError("List all PITs failed", e); + } + } + }; + highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener); highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener); // validate no pits case + getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT); + assertTrue(getAllPitResponse.getPitInfos().size() == 0); highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener); } } diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index cdd63743f2644..ad8da7244eae0 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -135,6 +135,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase { "ping", "info", "delete_all_pits", + "get_all_pits", // security "security.get_ssl_certificates", "security.authenticate", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json new file mode 100644 index 0000000000000..544a8cb11b002 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json @@ -0,0 +1,19 @@ +{ + "get_all_pits":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Lists all active point in time searches." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_search/point_in_time/_all", + "methods":[ + "GET" + ] + } + ] + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml index c7f728b6daa4d..9572173f397d7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml @@ -79,6 +79,12 @@ - match: {hits.total: 3 } - length: {hits.hits: 1 } + - do: + get_all_pits: {} + + - match: {pits.0.pit_id: $pit_id} + - match: {pits.0.keep_alive: 82800000 } + - do: delete_pit: body: @@ -119,6 +125,12 @@ - set: {pit_id: pit_id} - match: { _shards.failed: 0} + - do: + get_all_pits: {} + + - match: {pits.0.pit_id: $pit_id} + - match: {pits.0.keep_alive: 82800000 } + - do: delete_all_pits: {} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index d27c165306519..b8cbba930951a 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -238,16 +238,14 @@ import org.opensearch.action.search.ClearScrollAction; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.DeletePitAction; -import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.NodesGetAllPitsAction; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; import org.opensearch.action.search.TransportGetAllPitsAction; -import org.opensearch.action.search.TransportNodesGetAllPitsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -385,6 +383,7 @@ import org.opensearch.rest.action.cat.RestClusterManagerAction; import org.opensearch.rest.action.cat.RestNodeAttrsAction; import org.opensearch.rest.action.cat.RestNodesAction; +import org.opensearch.rest.action.cat.RestPitSegmentsAction; import org.opensearch.rest.action.cat.RestPluginsAction; import org.opensearch.rest.action.cat.RestRepositoriesAction; import org.opensearch.rest.action.cat.RestSegmentsAction; @@ -413,6 +412,7 @@ import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestDeletePitAction; import org.opensearch.rest.action.search.RestExplainAction; +import org.opensearch.rest.action.search.RestGetAllPitsAction; import org.opensearch.rest.action.search.RestMultiSearchAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.rest.action.search.RestSearchScrollAction; @@ -678,10 +678,9 @@ public void reg // point in time actions actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class); - actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class); - actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class); + actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -858,6 +857,8 @@ public void initRestHandlers(Supplier nodesInCluster) { // Point in time API registerHandler.accept(new RestCreatePitAction()); registerHandler.accept(new RestDeletePitAction()); + registerHandler.accept(new RestGetAllPitsAction(nodesInCluster)); + registerHandler.accept(new RestPitSegmentsAction(nodesInCluster)); for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index 84f5e5ad6a1e8..de0d390cddc4a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -13,6 +13,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; @@ -84,4 +85,37 @@ public ActionRequestValidationException validate() { } return validationException; } + + public void fromXContent(XContentParser parser) throws IOException { + pitIds.clear(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Malformed content, must start with an object"); + } else { + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if ("pit_id".equals(currentFieldName)) { + if (token == XContentParser.Token.START_ARRAY) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token.isValue() == false) { + throw new IllegalArgumentException("pit_id array element should only contain PIT identifier"); + } + pitIds.add(parser.text()); + } + } else { + if (token.isValue() == false) { + throw new IllegalArgumentException("pit_id element should only contain PIT identifier"); + } + pitIds.add(parser.text()); + } + } else { + throw new IllegalArgumentException( + "Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] " + ); + } + } + } + } } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index f64dd3d7efae6..745139fd1f1e8 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -98,6 +98,11 @@ public void executeCreatePit( task.getParentTaskId(), Collections.emptyMap() ); + /** + * This is needed for cross cluster functionality to work with PITs and current ccsMinimizeRoundTrips is + * not supported for point in time + */ + searchRequest.setCcsMinimizeRoundtrips(false); /** * Phase 1 of create PIT */ @@ -193,6 +198,29 @@ void executeUpdatePitId( ); for (Map.Entry entry : contextId.shards().entrySet()) { DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); + if (node == null) { + node = this.clusterService.state().getNodes().get(entry.getValue().getNode()); + } + if (node == null) { + logger.error( + () -> new ParameterizedMessage( + "Create pit update phase for PIT ID [{}] failed " + "because node [{}] not found", + searchResponse.pointInTimeId(), + entry.getValue().getNode() + ) + ); + groupedActionListener.onFailure( + new OpenSearchException( + "Create pit update phase for PIT ID [" + + searchResponse.pointInTimeId() + + "] failed because node[" + + entry.getValue().getNode() + + "] " + + "not found" + ) + ); + return; + } try { final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node); searchTransportService.updatePitContext( @@ -206,11 +234,12 @@ void executeUpdatePitId( groupedActionListener ); } catch (Exception e) { + String nodeName = node.getName(); logger.error( () -> new ParameterizedMessage( "Create pit update phase failed for PIT ID [{}] on node [{}]", searchResponse.pointInTimeId(), - node + nodeName ), e ); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java index 340f9b842adbf..b4ad2f6641087 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -21,22 +21,11 @@ */ public class GetAllPitNodesRequest extends BaseNodesRequest { - // Security plugin intercepts and sets the response with permitted PIT contexts - private GetAllPitNodesResponse getAllPitNodesResponse; - @Inject public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) { super(concreteNodes); } - public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) { - this.getAllPitNodesResponse = getAllPitNodesResponse; - } - - public GetAllPitNodesResponse getGetAllPitNodesResponse() { - return getAllPitNodesResponse; - } - public GetAllPitNodesRequest(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java index 091447798cf5f..610520a4c1f9d 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -11,10 +11,13 @@ import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.ClusterName; +import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ConstructingObjectParser; import org.opensearch.common.xcontent.ToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; @@ -24,6 +27,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; + /** * This class transforms active PIT objects from all nodes to unique PIT objects */ @@ -40,13 +45,13 @@ public GetAllPitNodesResponse(StreamInput in) throws IOException { public GetAllPitNodesResponse( ClusterName clusterName, - List getAllPitNodeResponse, + List getAllPitNodeResponseList, List failures ) { - super(clusterName, getAllPitNodeResponse, failures); + super(clusterName, getAllPitNodeResponseList, failures); Set uniquePitIds = new HashSet<>(); pitInfos.addAll( - getAllPitNodeResponse.stream() + getAllPitNodeResponseList.stream() .flatMap(p -> p.getPitInfos().stream().filter(t -> uniquePitIds.add(t.getPitId()))) .collect(Collectors.toList()) ); @@ -60,14 +65,30 @@ public GetAllPitNodesResponse(List listPitInfos, GetAllPitNodesResp pitInfos.addAll(listPitInfos); } + public GetAllPitNodesResponse( + List listPitInfos, + ClusterName clusterName, + List getAllPitNodeResponseList, + List failures + ) { + super(clusterName, getAllPitNodeResponseList, failures); + pitInfos.addAll(listPitInfos); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startArray("pitInfos"); + builder.startArray("pits"); for (ListPitInfo pit : pitInfos) { pit.toXContent(builder, params); } builder.endArray(); + if (!failures().isEmpty()) { + builder.startArray("failures"); + for (FailedNodeException e : failures()) { + e.toXContent(builder, params); + } + } builder.endObject(); return builder; } @@ -85,4 +106,28 @@ public void writeNodesTo(StreamOutput out, List nodes) th public List getPitInfos() { return Collections.unmodifiableList(new ArrayList<>(pitInfos)); } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "get_all_pits_response", + true, + (Object[] parsedObjects) -> { + @SuppressWarnings("unchecked") + List listPitInfos = (List) parsedObjects[0]; + List failures = null; + if (parsedObjects.length > 1) { + failures = (List) parsedObjects[1]; + } + if (failures == null) { + failures = new ArrayList<>(); + } + return new GetAllPitNodesResponse(listPitInfos, new ClusterName(""), new ArrayList<>(), failures); + } + ); + static { + PARSER.declareObjectArray(constructorArg(), ListPitInfo.PARSER, new ParseField("pits")); + } + + public static GetAllPitNodesResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java index 16e65cb785a7d..8fe901add5e3a 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -11,7 +11,7 @@ import org.opensearch.action.ActionType; /** - * Action type for listing all PIT reader contexts + * Action type for retrieving all PIT reader contexts from nodes */ public class GetAllPitsAction extends ActionType { public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); diff --git a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java index 4499e7d6e8ef5..249b0a9ab3baa 100644 --- a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java @@ -8,14 +8,18 @@ package org.opensearch.action.search; +import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.ConstructingObjectParser; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; import java.io.IOException; +import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; + /** * This holds information about pit reader context such as pit id and creation time */ @@ -36,16 +40,6 @@ public ListPitInfo(StreamInput in) throws IOException { this.keepAlive = in.readLong(); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field("pitId", pitId); - builder.field("creationTime", creationTime); - builder.field("keepAlive", keepAlive); - builder.endObject(); - return builder; - } - public String getPitId() { return pitId; } @@ -60,4 +54,30 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(creationTime); out.writeLong(keepAlive); } + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "list_pit_info", + true, + args -> new ListPitInfo((String) args[0], (long) args[1], (long) args[2]) + ); + + private static final ParseField CREATION_TIME = new ParseField("creation_time"); + private static final ParseField PIT_ID = new ParseField("pit_id"); + private static final ParseField KEEP_ALIVE = new ParseField("keep_alive"); + static { + PARSER.declareString(constructorArg(), PIT_ID); + PARSER.declareLong(constructorArg(), CREATION_TIME); + PARSER.declareLong(constructorArg(), KEEP_ALIVE); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(PIT_ID.getPreferredName(), pitId); + builder.field(CREATION_TIME.getPreferredName(), creationTime); + builder.field(KEEP_ALIVE.getPreferredName(), keepAlive); + builder.endObject(); + return builder; + } + } diff --git a/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java deleted file mode 100644 index af41f7d49551c..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.ActionType; - -/** - * Action type for retrieving all PIT reader contexts from nodes - */ -public class NodesGetAllPitsAction extends ActionType { - public static final NodesGetAllPitsAction INSTANCE = new NodesGetAllPitsAction(); - public static final String NAME = "cluster:admin/point_in_time/read_from_nodes"; - - private NodesGetAllPitsAction() { - super(NAME, GetAllPitNodesResponse::new); - } -} diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index ff068397ad94e..f42d84477f9a3 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -93,7 +93,10 @@ public void deletePitContexts( for (Map.Entry> entry : nodeToContextsMap.entrySet()) { String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias(); - final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); + if (node == null) { + node = this.clusterService.state().getNodes().get(entry.getValue().get(0).getSearchContextIdForNode().getNode()); + } if (node == null) { logger.error( () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) @@ -108,7 +111,8 @@ public void deletePitContexts( final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node); searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener); } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e); + String nodeName = node.getName(); + logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", nodeName), e); List deletePitInfos = new ArrayList<>(); for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId())); @@ -173,10 +177,11 @@ public void getAllPits(ActionListener getAllPitsListener nodes.add(node); } DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]); + GetAllPitNodesRequest getAllPitNodesRequest = new GetAllPitNodesRequest(disNodesArr); transportService.sendRequest( transportService.getLocalNode(), - NodesGetAllPitsAction.NAME, - new GetAllPitNodesRequest(disNodesArr), + GetAllPitsAction.NAME, + getAllPitNodesRequest, new TransportResponseHandler() { @Override diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index 19abe2361290d..b85fe302a748f 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -11,7 +11,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.tasks.Task; @@ -28,9 +27,6 @@ */ public class TransportDeletePitAction extends HandledTransportAction { private final NamedWriteableRegistry namedWriteableRegistry; - private TransportSearchAction transportSearchAction; - private final ClusterService clusterService; - private final SearchTransportService searchTransportService; private final PitService pitService; @Inject @@ -38,16 +34,10 @@ public TransportDeletePitAction( TransportService transportService, ActionFilters actionFilters, NamedWriteableRegistry namedWriteableRegistry, - TransportSearchAction transportSearchAction, - ClusterService clusterService, - SearchTransportService searchTransportService, PitService pitService ) { super(DeletePitAction.NAME, transportService, actionFilters, DeletePitRequest::new); this.namedWriteableRegistry = namedWriteableRegistry; - this.transportSearchAction = transportSearchAction; - this.clusterService = clusterService; - this.searchTransportService = searchTransportService; this.pitService = pitService; } @@ -57,11 +47,7 @@ public TransportDeletePitAction( @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { List pitIds = request.getPitIds(); - // when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty - // and in this case return empty response - if (pitIds.isEmpty()) { - listener.onResponse(new DeletePitResponse(new ArrayList<>())); - } else if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { deleteAllPits(listener); } else { deletePits(listener, request); diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java index c8529c5b02bd4..39299f9a33b18 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -8,31 +8,79 @@ package org.opensearch.action.search; -import org.opensearch.action.ActionListener; +import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; -import org.opensearch.tasks.Task; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.io.IOException; +import java.util.List; + /** - * Transport action to get all active PIT contexts across the cluster + * Transport action to get all active PIT contexts across all nodes */ -public class TransportGetAllPitsAction extends HandledTransportAction { - private final PitService pitService; +public class TransportGetAllPitsAction extends TransportNodesAction< + GetAllPitNodesRequest, + GetAllPitNodesResponse, + GetAllPitNodeRequest, + GetAllPitNodeResponse> { + private final SearchService searchService; @Inject - public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) { - super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in)); - this.pitService = pitService; + public TransportGetAllPitsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService + ) { + super( + GetAllPitsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPitNodesRequest::new, + GetAllPitNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPitNodeResponse.class + ); + this.searchService = searchService; + } + + @Override + protected GetAllPitNodesResponse newResponse( + GetAllPitNodesRequest request, + List getAllPitNodeResponses, + List failures + ) { + return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeResponses, failures); + } + + @Override + protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { + return new GetAllPitNodeRequest(); + } + + @Override + protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPitNodeResponse(in); } - protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener listener) { - // If security plugin intercepts the request, it'll replace all PIT IDs with permitted PIT IDs - if (request.getGetAllPitNodesResponse() != null) { - listener.onResponse(request.getGetAllPitNodesResponse()); - } else { - pitService.getAllPits(listener); - } + /** + * This retrieves all active PITs in the node + */ + @Override + protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { + GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( + transportService.getLocalNode(), + searchService.getAllPITReaderContexts() + ); + return nodeResponse; } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java deleted file mode 100644 index 520830cd293f0..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.FailedNodeException; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.nodes.TransportNodesAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.search.SearchService; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; - -import java.io.IOException; -import java.util.List; - -/** - * Transport action to get all active PIT contexts across all nodes - */ -public class TransportNodesGetAllPitsAction extends TransportNodesAction< - GetAllPitNodesRequest, - GetAllPitNodesResponse, - GetAllPitNodeRequest, - GetAllPitNodeResponse> { - private final SearchService searchService; - - @Inject - public TransportNodesGetAllPitsAction( - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - SearchService searchService - ) { - super( - NodesGetAllPitsAction.NAME, - threadPool, - clusterService, - transportService, - actionFilters, - GetAllPitNodesRequest::new, - GetAllPitNodeRequest::new, - ThreadPool.Names.SAME, - GetAllPitNodeResponse.class - ); - this.searchService = searchService; - } - - @Override - protected GetAllPitNodesResponse newResponse( - GetAllPitNodesRequest request, - List getAllPitNodeRespons, - List failures - ) { - return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); - } - - @Override - protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { - return new GetAllPitNodeRequest(); - } - - @Override - protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { - return new GetAllPitNodeResponse(in); - } - - /** - * This retrieves all active PITs in the node - */ - @Override - protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { - GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( - transportService.getLocalNode(), - searchService.getAllPITReaderContexts() - ); - return nodeResponse; - } -} diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index 94043d5c3c89f..f20f0b4246cb6 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -64,6 +64,8 @@ import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; @@ -341,6 +343,11 @@ public interface Client extends OpenSearchClient, Releasable { */ void deletePits(DeletePitRequest deletePITRequest, ActionListener listener); + /** + * Get all active point in time searches + */ + void getAllPits(GetAllPitNodesRequest getAllPitNodesRequest, ActionListener listener); + /** * Get information of segments of one or more PITs */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index bc80a2ba92bf8..21cd01bf65a45 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -335,10 +335,13 @@ import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchRequestBuilder; import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchRequestBuilder; @@ -595,6 +598,11 @@ public void deletePits(final DeletePitRequest deletePITRequest, final ActionList execute(DeletePitAction.INSTANCE, deletePITRequest, listener); } + @Override + public void getAllPits(final GetAllPitNodesRequest getAllPitNodesRequest, final ActionListener listener) { + execute(GetAllPitsAction.INSTANCE, getAllPitNodesRequest, listener); + } + @Override public void pitSegments(final PitSegmentsRequest request, final ActionListener listener) { execute(PitSegmentsAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java new file mode 100644 index 0000000000000..ba9606e8eb444 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestPitSegmentsAction.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.cat; + +import org.opensearch.action.admin.indices.segments.IndexSegments; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.PitSegmentsAction; +import org.opensearch.action.admin.indices.segments.PitSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.Table; +import org.opensearch.index.engine.Segment; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for pit segments + */ +public class RestPitSegmentsAction extends AbstractCatAction { + private final Supplier nodesInCluster; + + public RestPitSegmentsAction(Supplier nodesInCluster) { + super(); + this.nodesInCluster = nodesInCluster; + } + + @Override + public List routes() { + return unmodifiableList(asList(new Route(GET, "/_cat/pit_segments/_all"), new Route(GET, "/_cat/pit_segments"))); + } + + @Override + public String getName() { + return "cat_pit_segments_action"; + } + + @Override + public boolean allowSystemIndexAccessByDefault() { + return true; + } + + @Override + protected BaseRestHandler.RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) { + String allPitIdsQualifier = "_all"; + final PitSegmentsRequest pitSegmentsRequest; + if (request.path().contains(allPitIdsQualifier)) { + pitSegmentsRequest = new PitSegmentsRequest(allPitIdsQualifier); + } else { + pitSegmentsRequest = new PitSegmentsRequest(); + try { + request.withContentOrSourceParamParserOrNull((xContentParser -> { + if (xContentParser != null) { + pitSegmentsRequest.fromXContent(xContentParser); + } + })); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse request body", e); + } + } + return channel -> client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest, new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentResponse) throws Exception { + final Map indicesSegments = indicesSegmentResponse.getIndices(); + Table tab = buildTable(request, indicesSegments); + return RestTable.buildResponse(tab, channel); + } + }); + } + + @Override + protected void documentation(StringBuilder sb) { + sb.append("/_cat/pit_segments\n"); + sb.append("/_cat/pit_segments/{pit_id}\n"); + } + + @Override + protected Table getTableWithHeader(RestRequest request) { + Table table = new Table(); + table.startHeaders(); + table.addCell("index", "default:true;alias:i,idx;desc:index name"); + table.addCell("shard", "default:true;alias:s,sh;desc:shard name"); + table.addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica"); + table.addCell("ip", "default:true;desc:ip of node where it lives"); + table.addCell("id", "default:false;desc:unique id of node where it lives"); + table.addCell("segment", "default:true;alias:seg;desc:segment name"); + table.addCell("generation", "default:true;alias:g,gen;text-align:right;desc:segment generation"); + table.addCell("docs.count", "default:true;alias:dc,docsCount;text-align:right;desc:number of docs in segment"); + table.addCell("docs.deleted", "default:true;alias:dd,docsDeleted;text-align:right;desc:number of deleted docs in segment"); + table.addCell("size", "default:true;alias:si;text-align:right;desc:segment size in bytes"); + table.addCell("size.memory", "default:true;alias:sm,sizeMemory;text-align:right;desc:segment memory in bytes"); + table.addCell("committed", "default:true;alias:ic,isCommitted;desc:is segment committed"); + table.addCell("searchable", "default:true;alias:is,isSearchable;desc:is segment searched"); + table.addCell("version", "default:true;alias:v,ver;desc:version"); + table.addCell("compound", "default:true;alias:ico,isCompound;desc:is segment compound"); + table.endHeaders(); + return table; + } + + private Table buildTable(final RestRequest request, Map indicesSegments) { + Table table = getTableWithHeader(request); + + DiscoveryNodes nodes = this.nodesInCluster.get(); + table.startRow(); + table.addCell("index", "default:true;alias:i,idx;desc:index name"); + table.addCell("shard", "default:true;alias:s,sh;desc:shard name"); + table.addCell("prirep", "alias:p,pr,primaryOrReplica;default:true;desc:primary or replica"); + table.addCell("ip", "default:true;desc:ip of node where it lives"); + table.addCell("id", "default:false;desc:unique id of node where it lives"); + table.addCell("segment", "default:true;alias:seg;desc:segment name"); + table.addCell("generation", "default:true;alias:g,gen;text-align:right;desc:segment generation"); + table.addCell("docs.count", "default:true;alias:dc,docsCount;text-align:right;desc:number of docs in segment"); + table.addCell("docs.deleted", "default:true;alias:dd,docsDeleted;text-align:right;desc:number of deleted docs in segment"); + table.addCell("size", "default:true;alias:si;text-align:right;desc:segment size in bytes"); + table.addCell("size.memory", "default:true;alias:sm,sizeMemory;text-align:right;desc:segment memory in bytes"); + table.addCell("committed", "default:true;alias:ic,isCommitted;desc:is segment committed"); + table.addCell("searchable", "default:true;alias:is,isSearchable;desc:is segment searched"); + table.addCell("version", "default:true;alias:v,ver;desc:version"); + table.addCell("compound", "default:true;alias:ico,isCompound;desc:is segment compound"); + table.endRow(); + for (IndexSegments indexSegments : indicesSegments.values()) { + Map shards = indexSegments.getShards(); + for (IndexShardSegments indexShardSegments : shards.values()) { + ShardSegments[] shardSegments = indexShardSegments.getShards(); + for (ShardSegments shardSegment : shardSegments) { + List segments = shardSegment.getSegments(); + for (Segment segment : segments) { + table.startRow(); + table.addCell(shardSegment.getShardRouting().getIndexName()); + table.addCell(shardSegment.getShardRouting().getId()); + table.addCell(shardSegment.getShardRouting().primary() ? "p" : "r"); + table.addCell(nodes.get(shardSegment.getShardRouting().currentNodeId()).getHostAddress()); + table.addCell(shardSegment.getShardRouting().currentNodeId()); + table.addCell(segment.getName()); + table.addCell(segment.getGeneration()); + table.addCell(segment.getNumDocs()); + table.addCell(segment.getDeletedDocs()); + table.addCell(segment.getSize()); + table.addCell(0L); + table.addCell(segment.isCommitted()); + table.addCell(segment.isSearch()); + table.addCell(segment.getVersion()); + table.addCell(segment.isCompound()); + table.endRow(); + + } + } + } + } + return table; + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java index 452e66f8f5018..b19a7505741cc 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java @@ -26,7 +26,6 @@ * Rest action for deleting PIT contexts */ public class RestDeletePitAction extends BaseRestHandler { - @Override public String getName() { return "delete_pit_action"; diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java new file mode 100644 index 0000000000000..0e1febe9d2a61 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java @@ -0,0 +1,90 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.action.RestBuilderListener; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Rest action for retrieving all active PIT IDs across all nodes + */ +public class RestGetAllPitsAction extends BaseRestHandler { + + private final Supplier nodesInCluster; + + public RestGetAllPitsAction(Supplier nodesInCluster) { + super(); + this.nodesInCluster = nodesInCluster; + } + + @Override + public String getName() { + return "get_all_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final List nodes = new ArrayList<>(); + for (DiscoveryNode node : nodesInCluster.get()) { + nodes.add(node); + } + DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]); + GetAllPitNodesRequest getAllPitNodesRequest = new GetAllPitNodesRequest(disNodesArr); + return channel -> client.getAllPits(getAllPitNodesRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(final GetAllPitNodesResponse getAllPITNodesResponse, XContentBuilder builder) + throws Exception { + builder.startObject(); + if (getAllPITNodesResponse.hasFailures()) { + builder.startArray("failures"); + for (int idx = 0; idx < getAllPITNodesResponse.failures().size(); idx++) { + builder.startObject(); + builder.field( + getAllPITNodesResponse.failures().get(idx).nodeId(), + getAllPITNodesResponse.failures().get(idx).getDetailedMessage() + ); + builder.endObject(); + } + builder.endArray(); + } + builder.field("pits", getAllPITNodesResponse.getPitInfos()); + builder.endObject(); + if (getAllPITNodesResponse.hasFailures() && getAllPITNodesResponse.getPitInfos().isEmpty()) { + return new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder); + } + return new BytesRestResponse(RestStatus.OK, builder); + } + }); + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList(new Route(GET, "/_search/point_in_time/_all"))); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index 60a31c62dc32d..3962a4a11fc90 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -30,6 +30,7 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.ShardSearchContextId; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -134,6 +135,22 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio Assert.assertEquals(0, getPitResponse.getPitInfos().size()); } + public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client, String pitId) { + PitSegmentsRequest pitSegmentsRequest; + pitSegmentsRequest = new PitSegmentsRequest(); + List pitIds = new ArrayList<>(); + pitIds.add(pitId); + pitSegmentsRequest.clearAndSetPitIds(pitIds); + IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest).actionGet(); + assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0); + assertEquals(indicesSegmentResponse.getIndices().isEmpty(), isEmpty); + if (!isEmpty) { + assertTrue(indicesSegmentResponse.getIndices().get(index) != null); + assertTrue(indicesSegmentResponse.getIndices().get(index).getIndex().equalsIgnoreCase(index)); + assertEquals(expectedShardSize, indicesSegmentResponse.getIndices().get(index).getShards().size()); + } + } + public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client) { PitSegmentsRequest pitSegmentsRequest = new PitSegmentsRequest("_all"); IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest).actionGet(); @@ -149,4 +166,8 @@ public static void assertSegments(boolean isEmpty, String index, long expectedSh public static void assertSegments(boolean isEmpty, Client client) { assertSegments(isEmpty, "index", 2, client); } + + public static void assertSegments(boolean isEmpty, Client client, String pitId) { + assertSegments(isEmpty, "index", 2, client, pitId); + } } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index bdc0440a89f69..d6de562d616fa 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -172,9 +172,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -250,9 +247,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); @@ -319,9 +313,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -378,9 +369,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -446,9 +434,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); @@ -526,9 +511,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); @@ -602,9 +584,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); @@ -682,9 +661,6 @@ public void getAllPits(ActionListener getAllPitsListener transportService, actionFilters, namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 9a28f1800847e..ae7f795f57ee7 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -32,6 +32,8 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -75,7 +77,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -105,7 +107,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), response.getId()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); @@ -126,7 +128,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -228,7 +230,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); validatePitStats("index", 1, 0, 0); @@ -295,14 +297,16 @@ public void testCreatePitMoreThanMaxOpenPitContexts() throws Exception { CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); SearchService service = getInstanceFromNode(SearchService.class); + List pitIds = new ArrayList<>(); try { for (int i = 0; i < 1000; i++) { - client().execute(CreatePitAction.INSTANCE, request).get(); + CreatePitResponse cpr = client().execute(CreatePitAction.INSTANCE, request).actionGet(); + if (cpr.getId() != null) pitIds.add(cpr.getId()); } } catch (Exception ex) { assertTrue( - ex.getMessage() + ((SearchPhaseExecutionException) ex).getDetailedMessage() .contains( "Trying to create too many Point In Time contexts. " + "Must be less than or equal to: [" @@ -315,7 +319,7 @@ public void testCreatePitMoreThanMaxOpenPitContexts() throws Exception { final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); validatePitStats("index", maxPitContexts, 0, 0); // deleteall - DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds.toArray(new String[pitIds.size()])); /** * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context @@ -567,7 +571,7 @@ public void testConcurrentSearches() throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); @@ -603,7 +607,6 @@ public void testConcurrentSearches() throws Exception { validatePitStats("index", 0, 1, 0); validatePitStats("index", 0, 1, 1); PitTestsUtil.assertGetAllPitsEmpty(client()); - assertSegments(true, client()); } public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException, diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index d29ccf5b97138..a23e4141a78e4 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -86,7 +86,7 @@ public void testPit() throws Exception { assertEquals(2, searchResponse.getTotalShards()); validatePitStats("index", 2, 2); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, client()); + assertSegments(false, client(), pitResponse.getId()); } public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { @@ -114,7 +114,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); - assertSegments(false, "index", 1, client()); + assertSegments(false, "index", 1, client(), pitResponse.getId()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index")