diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index 779443eb25251..ba71eaefa5c7a 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -59,20 +59,20 @@ public class CreatePitController { private final PitService pitService; private static final Logger logger = LogManager.getLogger(CreatePitController.class); public static final Setting PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting( - "pit.init.keep_alive", - timeValueSeconds(30), - Setting.Property.NodeScope + "pit.init.keep_alive", + timeValueSeconds(30), + Setting.Property.NodeScope ); public CreatePitController( - CreatePitRequest request, - SearchTransportService searchTransportService, - ClusterService clusterService, - TransportSearchAction transportSearchAction, - NamedWriteableRegistry namedWriteableRegistry, - Task task, - ActionListener listener, - PitService pitService + CreatePitRequest request, + SearchTransportService searchTransportService, + ClusterService clusterService, + TransportSearchAction transportSearchAction, + NamedWriteableRegistry namedWriteableRegistry, + Task task, + ActionListener listener, + PitService pitService ) { this.searchTransportService = searchTransportService; this.clusterService = clusterService; @@ -94,11 +94,11 @@ public void executeCreatePit(StepListener createPitListener, Act searchRequest.indicesOptions(request.getIndicesOptions()); searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation()); SearchTask searchTask = searchRequest.createTask( - task.getId(), - task.getType(), - task.getAction(), - task.getParentTaskId(), - Collections.emptyMap() + task.getId(), + task.getType(), + task.getAction(), + task.getParentTaskId(), + Collections.emptyMap() ); /** * Phase 1 of create PIT @@ -109,8 +109,8 @@ public void executeCreatePit(StepListener createPitListener, Act * Phase 2 of create PIT where we update pit id in pit contexts */ createPitListener.whenComplete( - searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); }, - updatePitIdListener::onFailure + searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); }, + updatePitIdListener::onFailure ); } @@ -119,33 +119,33 @@ public void executeCreatePit(StepListener createPitListener, Act */ void executeCreatePit(Task task, SearchRequest searchRequest, StepListener createPitListener) { logger.debug( - () -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices())) + () -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices())) ); transportSearchAction.executeRequest( - task, - searchRequest, - TransportCreatePitAction.CREATE_PIT_ACTION, - true, - new TransportSearchAction.SinglePhaseSearchAction() { - @Override - public void executeOnShardTarget( - SearchTask searchTask, - SearchShardTarget target, - Transport.Connection connection, - ActionListener searchPhaseResultActionListener - ) { - searchTransportService.createPitContext( - connection, - new TransportCreatePitAction.CreateReaderContextRequest( - target.getShardId(), - PIT_INIT_KEEP_ALIVE.get(clusterService.getSettings()) - ), - searchTask, - ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure) - ); - } - }, - createPitListener + task, + searchRequest, + TransportCreatePitAction.CREATE_PIT_ACTION, + true, + new TransportSearchAction.SinglePhaseSearchAction() { + @Override + public void executeOnShardTarget( + SearchTask searchTask, + SearchShardTarget target, + Transport.Connection connection, + ActionListener searchPhaseResultActionListener + ) { + searchTransportService.createPitContext( + connection, + new TransportCreatePitAction.CreateReaderContextRequest( + target.getShardId(), + PIT_INIT_KEEP_ALIVE.get(clusterService.getSettings()) + ), + searchTask, + ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure) + ); + } + }, + createPitListener ); } @@ -153,16 +153,16 @@ public void executeOnShardTarget( * Updates PIT ID, keep alive and createdTime of PIT reader context */ void executeUpdatePitId( - CreatePitRequest request, - SearchRequest searchRequest, - SearchResponse searchResponse, - ActionListener updatePitIdListener + CreatePitRequest request, + SearchRequest searchRequest, + SearchResponse searchResponse, + ActionListener updatePitIdListener ) { logger.debug( - () -> new ParameterizedMessage( - "Updating PIT context with PIT ID [{}], creation time and keep alive", - searchResponse.pointInTimeId() - ) + () -> new ParameterizedMessage( + "Updating PIT context with PIT ID [{}], creation time and keep alive", + searchResponse.pointInTimeId() + ) ); /** * store the create time ( same create time for all PIT contexts across shards ) to be used @@ -170,57 +170,57 @@ void executeUpdatePitId( */ final long relativeStartNanos = System.nanoTime(); final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( - searchRequest.getOrCreateAbsoluteStartMillis(), - relativeStartNanos, - System::nanoTime + searchRequest.getOrCreateAbsoluteStartMillis(), + relativeStartNanos, + System::nanoTime ); final long creationTime = timeProvider.getAbsoluteStartMillis(); CreatePitResponse createPITResponse = new CreatePitResponse( - searchResponse.pointInTimeId(), - creationTime, - searchResponse.getTotalShards(), - searchResponse.getSuccessfulShards(), - searchResponse.getSkippedShards(), - searchResponse.getFailedShards(), - searchResponse.getShardFailures() + searchResponse.pointInTimeId(), + creationTime, + searchResponse.getTotalShards(), + searchResponse.getSuccessfulShards(), + searchResponse.getSkippedShards(), + searchResponse.getFailedShards(), + searchResponse.getShardFailures() ); SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId()); final StepListener> lookupListener = getConnectionLookupListener(contextId); lookupListener.whenComplete(nodelookup -> { final ActionListener groupedActionListener = getGroupedListener( - updatePitIdListener, - createPITResponse, - contextId.shards().size(), - contextId.shards().values() + updatePitIdListener, + createPITResponse, + contextId.shards().size(), + contextId.shards().values() ); for (Map.Entry entry : contextId.shards().entrySet()) { DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); try { final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node); searchTransportService.updatePitContext( - connection, - new UpdatePitContextRequest( - entry.getValue().getSearchContextId(), - createPITResponse.getId(), - request.getKeepAlive().millis(), - creationTime - ), - groupedActionListener + connection, + new UpdatePitContextRequest( + entry.getValue().getSearchContextId(), + createPITResponse.getId(), + request.getKeepAlive().millis(), + creationTime + ), + groupedActionListener ); } catch (Exception e) { logger.error( - () -> new ParameterizedMessage( - "Create pit update phase failed for PIT ID [{}] on node [{}]", - searchResponse.pointInTimeId(), - node - ), - e + () -> new ParameterizedMessage( + "Create pit update phase failed for PIT ID [{}] on node [{}]", + searchResponse.pointInTimeId(), + node + ), + e ); groupedActionListener.onFailure( - new OpenSearchException( - "Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]", - e - ) + new OpenSearchException( + "Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]", + e + ) ); } } @@ -230,19 +230,19 @@ void executeUpdatePitId( private StepListener> getConnectionLookupListener(SearchContextId contextId) { ClusterState state = clusterService.state(); final Set clusters = contextId.shards() - .values() - .stream() - .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) - .map(SearchContextIdForNode::getClusterAlias) - .collect(Collectors.toSet()); + .values() + .stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); return SearchUtils.getConnectionLookupListener(searchTransportService.getRemoteClusterService(), state, clusters); } private ActionListener getGroupedListener( - ActionListener updatePitIdListener, - CreatePitResponse createPITResponse, - int size, - Collection contexts + ActionListener updatePitIdListener, + CreatePitResponse createPITResponse, + int size, + Collection contexts ) { return new GroupedActionListener<>(new ActionListener<>() { @Override @@ -268,16 +268,16 @@ public void onResponse(DeletePitResponse response) { // this is invoke and forget call final StringBuilder failedPitsStringBuilder = new StringBuilder(); response.getDeletePitResults() - .stream() - .filter(r -> !r.isSuccessful()) - .forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(",")); + .stream() + .filter(r -> !r.isSuccessful()) + .forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(",")); logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString())); if (!logger.isDebugEnabled()) return; final StringBuilder successfulPitsStringBuilder = new StringBuilder(); response.getDeletePitResults() - .stream() - .filter(r -> r.isSuccessful()) - .forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(",")); + .stream() + .filter(r -> r.isSuccessful()) + .forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(",")); logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString())); } diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java index e8842cc34cc64..407345952f611 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java @@ -21,4 +21,4 @@ public class DeletePitAction extends ActionType { private DeletePitAction() { super(NAME, DeletePitResponse::new); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java index d230b81487565..943199812771a 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java @@ -58,9 +58,9 @@ public void writeTo(StreamOutput out) throws IOException { } static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "delete_pit_info", - true, - args -> new DeletePitInfo((boolean) args[0], (String) args[1]) + "delete_pit_info", + true, + args -> new DeletePitInfo((boolean) args[0], (String) args[1]) ); static { diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java index f1ca7963b422c..945fcfd17eb6c 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java @@ -111,7 +111,7 @@ public void fromXContent(XContentParser parser) throws IOException { } } else { throw new IllegalArgumentException( - "Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] " + "Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] " ); } } diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java b/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java index 181a0f046e360..60137496871b1 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitResponse.java @@ -83,13 +83,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par } private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - "delete_pit_response", - true, - (Object[] parsedObjects) -> { - @SuppressWarnings("unchecked") - List deletePitInfoList = (List) parsedObjects[0]; - return new DeletePitResponse(deletePitInfoList); - } + "delete_pit_response", + true, + (Object[] parsedObjects) -> { + @SuppressWarnings("unchecked") + List deletePitInfoList = (List) parsedObjects[0]; + return new DeletePitResponse(deletePitInfoList); + } ); static { PARSER.declareObjectArray(constructorArg(), DeletePitInfo.PARSER, new ParseField("pits")); diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index e3f80a43fdfc3..6fe5a44aa12e1 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -49,24 +49,24 @@ public PitService(ClusterService clusterService, SearchTransportService searchTr * Delete list of pit contexts. Returns the details of success of operation per PIT ID. */ public void deletePitContexts( - Map> nodeToContextsMap, - ActionListener listener + Map> nodeToContextsMap, + ActionListener listener ) { final Set clusters = nodeToContextsMap.values() - .stream() - .flatMap(Collection::stream) - .filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false) - .map(c -> c.getSearchContextIdForNode().getClusterAlias()) - .collect(Collectors.toSet()); + .stream() + .flatMap(Collection::stream) + .filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false) + .map(c -> c.getSearchContextIdForNode().getClusterAlias()) + .collect(Collectors.toSet()); StepListener> lookupListener = SearchUtils.getConnectionLookupListener( - searchTransportService.getRemoteClusterService(), - clusterService.state(), - clusters + searchTransportService.getRemoteClusterService(), + clusterService.state(), + clusters ); lookupListener.whenComplete(nodeLookup -> { final GroupedActionListener groupedListener = getDeletePitGroupedListener( - listener, - nodeToContextsMap.size() + listener, + nodeToContextsMap.size() ); for (Map.Entry> entry : nodeToContextsMap.entrySet()) { @@ -74,7 +74,7 @@ public void deletePitContexts( final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode()); if (node == null) { logger.error( - () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) + () -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode()) ); List deletePitInfos = new ArrayList<>(); for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index d7c657c33bcd3..23515dd28b329 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -205,26 +205,26 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac } public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { transportService.sendRequest( - connection, - FREE_PIT_CONTEXT_ACTION_NAME, - new PitFreeContextsRequest(contextIds), - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) + connection, + FREE_PIT_CONTEXT_ACTION_NAME, + new PitFreeContextsRequest(contextIds), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) ); } public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { transportService.sendRequest( - connection, - FREE_ALL_PIT_CONTEXTS_ACTION_NAME, - TransportRequest.Empty.INSTANCE, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) + connection, + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, + TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, DeletePitResponse::new) ); } @@ -521,18 +521,18 @@ public static void registerRequestHandler(TransportService transportService, Sea TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler( - FREE_PIT_CONTEXT_ACTION_NAME, - ThreadPool.Names.SAME, - PitFreeContextsRequest::new, - (request, channel, task) -> { channel.sendResponse(searchService.freeReaderContextsIfFound(request.getContextIds())); } + FREE_PIT_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + PitFreeContextsRequest::new, + (request, channel, task) -> { channel.sendResponse(searchService.freeReaderContextsIfFound(request.getContextIds())); } ); TransportActionProxy.registerProxyAction(transportService, FREE_PIT_CONTEXT_ACTION_NAME, DeletePitResponse::new); transportService.registerRequestHandler( - FREE_ALL_PIT_CONTEXTS_ACTION_NAME, - ThreadPool.Names.SAME, - TransportRequest.Empty::new, - (request, channel, task) -> { channel.sendResponse(searchService.freeAllPitContexts()); } + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, + ThreadPool.Names.SAME, + TransportRequest.Empty::new, + (request, channel, task) -> { channel.sendResponse(searchService.freeAllPitContexts()); } ); TransportActionProxy.registerProxyAction(transportService, FREE_ALL_PIT_CONTEXTS_ACTION_NAME, DeletePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index b1e24227bfa31..d67979d1c87c5 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -36,13 +36,13 @@ public class TransportDeletePitAction extends HandledTransportAction known } private MockTransportService startTransport( - final String id, - final List knownNodes, - final Version version, - final Settings settings + final String id, + final List knownNodes, + final Version version, + final Settings settings ) { return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); } @@ -97,41 +97,41 @@ public void setupData() { node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); pitId = getPitId(); namedWriteableRegistry = new NamedWriteableRegistry( - Arrays.asList( - new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), - new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), - new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) - ) + Arrays.asList( + new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) + ) ); nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); transportSearchAction = mock(TransportSearchAction.class); task = new Task( - randomLong(), - "transport", - SearchAction.NAME, - "description", - new TaskId(randomLong() + ":" + randomLong()), - Collections.emptyMap() + randomLong(), + "transport", + SearchAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() ); InternalSearchResponse response = new InternalSearchResponse( - new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), - InternalAggregations.EMPTY, - null, - null, - false, - null, - 1 + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 ); searchResponse = new SearchResponse( - response, - null, - 3, - 3, - 0, - 100, - ShardSearchFailure.EMPTY_ARRAY, - SearchResponse.Clusters.EMPTY, - pitId + response, + null, + 3, + 3, + 0, + 100, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + pitId ); createPitListener = new ActionListener() { @Override @@ -165,29 +165,29 @@ public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener ) { updateNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); @@ -199,9 +199,9 @@ public void updatePitContext( */ @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); @@ -221,14 +221,14 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener, - pitService + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener, + pitService ); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -261,29 +261,29 @@ public void testUpdatePitAfterCreatePitFailure() throws InterruptedException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener ) { updateNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); @@ -297,9 +297,9 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); @@ -313,14 +313,14 @@ public void sendFreePITContexts( request.setIndices(new String[] { "index" }); PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener, - pitService + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener, + pitService ); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -357,20 +357,20 @@ public void testUpdatePitFailureForNodeDrop() throws InterruptedException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -378,9 +378,9 @@ public void testUpdatePitFailureForNodeDrop() throws InterruptedException { SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener ) { updateNodesInvoked.add(connection.getNode()); @@ -395,9 +395,9 @@ public void updatePitContext( @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); @@ -414,14 +414,14 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod request.setIndices(new String[] { "index" }); PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener, - pitService + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener, + pitService ); CountDownLatch latch = new CountDownLatch(1); @@ -456,29 +456,29 @@ public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException List deleteNodesInvoked = new CopyOnWriteArrayList<>(); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener ) { updateNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); @@ -487,9 +487,9 @@ public void updatePitContext( @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onResponse(new DeletePitResponse(new ArrayList<>()))); @@ -505,14 +505,14 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod request.setIndices(new String[] { "index" }); PitService pitService = new PitService(clusterServiceMock, searchTransportService); CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener, - pitService + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener, + pitService ); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index e87f9d7e8ab50..e2db4fdbc97ef 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -78,10 +78,10 @@ private MockTransportService startTransport(String id, List known } private MockTransportService startTransport( - final String id, - final List knownNodes, - final Version version, - final Settings settings + final String id, + final List knownNodes, + final Version version, + final Settings settings ) { return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); } @@ -93,30 +93,30 @@ public void setupData() { node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); pitId = getPitId(); namedWriteableRegistry = new NamedWriteableRegistry( - Arrays.asList( - new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), - new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), - new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) - ) + Arrays.asList( + new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) + ) ); nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); transportSearchAction = mock(TransportSearchAction.class); task = new Task( - randomLong(), - "transport", - SearchAction.NAME, - "description", - new TaskId(randomLong() + ":" + randomLong()), - Collections.emptyMap() + randomLong(), + "transport", + SearchAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() ); InternalSearchResponse response = new InternalSearchResponse( - new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), - InternalAggregations.EMPTY, - null, - null, - false, - null, - 1 + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 ); clusterServiceMock = mock(ClusterService.class); @@ -140,20 +140,20 @@ public void testDeletePitSuccess() throws InterruptedException, ExecutionExcepti when(actionFilters.filters()).thenReturn(new ActionFilter[0]); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -161,9 +161,9 @@ public void testDeletePitSuccess() throws InterruptedException, ExecutionExcepti @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); DeletePitInfo deletePitInfo = new DeletePitInfo(true, "pitId"); @@ -180,13 +180,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -206,20 +206,20 @@ public void testDeleteAllPITSuccess() throws InterruptedException, ExecutionExce when(actionFilters.filters()).thenReturn(new ActionFilter[0]); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -240,13 +240,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); @@ -266,20 +266,20 @@ public void testDeletePitWhenNodeIsDown() throws InterruptedException, Execution when(actionFilters.filters()).thenReturn(new ActionFilter[0]); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -287,9 +287,9 @@ public void testDeletePitWhenNodeIsDown() throws InterruptedException, Execution @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); @@ -309,13 +309,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -333,28 +333,28 @@ public void testDeletePitWhenAllNodesAreDown() { when(actionFilters.filters()).thenReturn(new ActionFilter[0]); List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextIds, - ActionListener listener + Transport.Connection connection, + List contextIds, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); @@ -368,13 +368,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -393,20 +393,20 @@ public void testDeletePitFailure() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -414,9 +414,9 @@ public void testDeletePitFailure() { @Override public void sendFreePITContexts( - Transport.Connection connection, - List contextId, - ActionListener listener + Transport.Connection connection, + List contextId, + ActionListener listener ) { deleteNodesInvoked.add(connection.getNode()); @@ -436,13 +436,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); PlainActionFuture future = newFuture(); @@ -461,20 +461,20 @@ public void testDeleteAllPitWhenNodeIsDown() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -498,13 +498,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); @@ -523,20 +523,20 @@ public void testDeleteAllPitWhenAllNodesAreDown() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -556,13 +556,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); @@ -581,20 +581,20 @@ public void testDeleteAllPitFailure() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); - MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) ) { knownNodes.add(cluster1Transport.getLocalDiscoNode()); knownNodes.add(cluster2Transport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - Version.CURRENT, - threadPool, - null - ) + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { transportService.start(); transportService.acceptIncomingRequests(); @@ -618,13 +618,13 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; PitService pitService = new PitService(clusterServiceMock, searchTransportService); TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService, - pitService + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService, + pitService ); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); PlainActionFuture future = newFuture(); diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 7e493c66b8ee0..79e4a86b3dba9 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -252,10 +252,10 @@ public void testDeleteWhileSearch() throws Exception { latch.await(); for (int j = 0; j < 30; j++) { client().prepareSearch() - .setSize(2) - .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) - .execute() - .get(); + .setSize(2) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) + .execute() + .get(); } } catch (Exception e) { /** diff --git a/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java b/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java index 19ecc7fbca293..5944e2a35b14a 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitResponseTests.java @@ -50,8 +50,8 @@ public void testDeletePitResponseToAndFromXContent() throws IOException { parsedResponse = DeletePitResponse.fromXContent(parser); } assertEquals( - originalResponse.getDeletePitResults().get(0).isSuccessful(), - parsedResponse.getDeletePitResults().get(0).isSuccessful() + originalResponse.getDeletePitResults().get(0).isSuccessful(), + parsedResponse.getDeletePitResults().get(0).isSuccessful() ); assertEquals(originalResponse.getDeletePitResults().get(0).getPitId(), parsedResponse.getDeletePitResults().get(0).getPitId()); BytesReference parsedBytes = XContentHelper.toXContent(parsedResponse, xContentType, randomBoolean()); diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index c81e44440ac6b..ecc28470b0eb2 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1436,8 +1436,8 @@ public void testDeletePitReaderContext() { List contextIds = new ArrayList<>(); ShardSearchContextId shardSearchContextId = future.actionGet(); PitSearchContextIdForNode pitSearchContextIdForNode = new PitSearchContextIdForNode( - "1", - new SearchContextIdForNode(null, "node1", shardSearchContextId) + "1", + new SearchContextIdForNode(null, "node1", shardSearchContextId) ); contextIds.add(pitSearchContextIdForNode);