Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed May 3, 2022
1 parent 849d1d3 commit 51ce82f
Show file tree
Hide file tree
Showing 6 changed files with 706 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@
public class SearchTransportService {

public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_PIT_ACTION_NAME = "indices:data/read/search[free_context/pit]";
public static final String FREE_PIT_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context/pit]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]";
public static final String DELETE_ALL_PIT_CONTEXTS_ACTION_NAME = "indices:data/read/search[delete_pit_contexts]";
public static final String FREE_ALL_PIT_CONTEXTS_ACTION_NAME = "indices:data/read/search[delete_pit_contexts]";
public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]";
public static final String QUERY_ACTION_NAME = "indices:data/read/search[phase/query]";
public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]";
Expand Down Expand Up @@ -144,20 +144,6 @@ public void sendFreeContext(
);
}

public void sendPitFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
transportService.sendRequest(
connection,
FREE_CONTEXT_PIT_ACTION_NAME,
new ScrollFreeContextRequest(contextId),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)
);
}

public void updatePitContext(
Transport.Connection connection,
UpdatePITContextRequest request,
Expand Down Expand Up @@ -214,13 +200,27 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac
);
}

public void sendDeleteAllPitContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
public void sendFreePITContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
transportService.sendRequest(
connection,
FREE_PIT_CONTEXT_ACTION_NAME,
new PITFreeContextRequest(contextId),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)
);
}

public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(
connection,
DELETE_ALL_PIT_CONTEXTS_ACTION_NAME,
FREE_ALL_PIT_CONTEXTS_ACTION_NAME,
TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE)
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)
);
}

Expand Down Expand Up @@ -389,6 +389,30 @@ public ShardSearchContextId id() {

}

static class PITFreeContextRequest extends TransportRequest {
private ShardSearchContextId contextId;

PITFreeContextRequest(ShardSearchContextId contextId) {
this.contextId = Objects.requireNonNull(contextId);
}

PITFreeContextRequest(StreamInput in) throws IOException {
super(in);
contextId = new ShardSearchContextId(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
contextId.writeTo(out);
}

public ShardSearchContextId id() {
return this.contextId;
}

}

static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
private OriginalIndices originalIndices;

Expand Down Expand Up @@ -465,15 +489,30 @@ public static void registerRequestHandler(TransportService transportService, Sea
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);

transportService.registerRequestHandler(
FREE_CONTEXT_PIT_ACTION_NAME,
FREE_PIT_CONTEXT_ACTION_NAME,
ThreadPool.Names.SAME,
ScrollFreeContextRequest::new,
PITFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeReaderContextIfFound(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
);
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_PIT_ACTION_NAME, SearchFreeContextResponse::new);
TransportActionProxy.registerProxyAction(transportService, FREE_PIT_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);

transportService.registerRequestHandler(
FREE_ALL_PIT_CONTEXTS_ACTION_NAME,
ThreadPool.Names.SAME,
TransportRequest.Empty::new,
(request, channel, task) -> {
boolean freed = searchService.freeAllPitContexts();
channel.sendResponse(new SearchFreeContextResponse(freed));
}
);
TransportActionProxy.registerProxyAction(
transportService,
FREE_ALL_PIT_CONTEXTS_ACTION_NAME,
(in) -> TransportResponse.Empty.INSTANCE
);

transportService.registerRequestHandler(
FREE_CONTEXT_ACTION_NAME,
Expand Down Expand Up @@ -658,21 +697,6 @@ public static void registerRequestHandler(TransportService transportService, Sea
);
TransportActionProxy.registerProxyAction(transportService, UPDATE_READER_CONTEXT_ACTION_NAME, UpdatePitContextResponse::new);

transportService.registerRequestHandler(
DELETE_ALL_PIT_CONTEXTS_ACTION_NAME,
ThreadPool.Names.SAME,
TransportRequest.Empty::new,
(request, channel, task) -> {
searchService.freeAllPitContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
);
TransportActionProxy.registerProxyAction(
transportService,
DELETE_ALL_PIT_CONTEXTS_ACTION_NAME,
(in) -> TransportResponse.Empty.INSTANCE
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand All @@ -22,7 +23,6 @@
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.search.SearchService;
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand All @@ -38,7 +38,6 @@
* Transport action for deleting pit reader context - supports deleting list and all pit contexts
*/
public class TransportDeletePITAction extends HandledTransportAction<DeletePITRequest, DeletePITResponse> {
private SearchService searchService;
private final NamedWriteableRegistry namedWriteableRegistry;
private TransportSearchAction transportSearchAction;
private final ClusterService clusterService;
Expand All @@ -47,7 +46,6 @@ public class TransportDeletePITAction extends HandledTransportAction<DeletePITRe

@Inject
public TransportDeletePITAction(
SearchService searchService,
TransportService transportService,
ActionFilters actionFilters,
NamedWriteableRegistry namedWriteableRegistry,
Expand All @@ -56,7 +54,6 @@ public TransportDeletePITAction(
SearchTransportService searchTransportService
) {
super(DeletePITAction.NAME, transportService, actionFilters, DeletePITRequest::new);
this.searchService = searchService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.transportSearchAction = transportSearchAction;
this.clusterService = clusterService;
Expand Down Expand Up @@ -95,11 +92,33 @@ protected void doExecute(Task task, DeletePITRequest request, ActionListener<Del
*/
void deleteAllPits(ActionListener<DeletePITResponse> listener) {
int size = clusterService.state().getNodes().getSize();
ActionListener groupedActionListener = getGroupedListener(listener, size);
ActionListener groupedActionListener = new GroupedActionListener<SearchTransportService.SearchFreeContextResponse>(
new ActionListener<>() {
@Override
public void onResponse(final Collection<SearchTransportService.SearchFreeContextResponse> responses) {
final SetOnce<Boolean> succeeded = new SetOnce<>();
for (SearchTransportService.SearchFreeContextResponse response : responses) {
if (!response.isFreed()) {
succeeded.set(false);
break;
}
}
succeeded.trySet(true);
listener.onResponse(new DeletePITResponse(succeeded.get()));
}

@Override
public void onFailure(final Exception e) {
logger.debug("Delete all PITs failed ", e);
listener.onResponse(new DeletePITResponse(false));
}
},
size
);
for (final DiscoveryNode node : clusterService.state().getNodes()) {
try {
Transport.Connection connection = searchTransportService.getConnection(null, node);
searchTransportService.sendDeleteAllPitContexts(connection, groupedActionListener);
searchTransportService.sendFreeAllPitContexts(connection, groupedActionListener);
} catch (Exception e) {
groupedActionListener.onFailure(e);
}
Expand All @@ -123,11 +142,11 @@ void deletePits(List<SearchContextIdForNode> contexts, ActionListener<Integer> l
for (SearchContextIdForNode contextId : contexts) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node == null) {
groupedListener.onFailure(new OpenSearchException("node not connected"));
groupedListener.onFailure(new OpenSearchException("node not found"));
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node);
searchTransportService.sendPitFreeContext(
searchTransportService.sendFreePITContext(
connection,
contextId.getSearchContextId(),
ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false))
Expand All @@ -154,19 +173,4 @@ private StepListener<BiFunction<String, String, DiscoveryNode>> getLookupListene
}
return lookupListener;
}

private ActionListener<DeletePITResponse> getGroupedListener(ActionListener<DeletePITResponse> deletePitListener, int size) {
return new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(final Collection<DeletePITResponse> responses) {
deletePitListener.onResponse(new DeletePITResponse(true));
}

@Override
public void onFailure(final Exception e) {
logger.debug("Delete all PITs failed ", e);
deletePitListener.onResponse(new DeletePITResponse(false));
}
}, size);
}
}
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.SetOnce;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
Expand Down Expand Up @@ -1034,16 +1035,21 @@ public boolean freeReaderContextIfFound(ShardSearchContextId contextId) {
return true;
}


/**
* Free all active pit contexts
*/
public void freeAllPitContexts() {
public boolean freeAllPitContexts() {
final SetOnce<Boolean> isFreed = new SetOnce<>();
for (ReaderContext readerContext : activeReaders.values()) {
if (readerContext instanceof PitReaderContext) {
freeReaderContextIfFound(readerContext.id());
final boolean succeeded = freeReaderContextIfFound(readerContext.id());
if (!succeeded) {
isFreed.trySet(false);
}
}
}
isFreed.trySet(true);
return isFreed.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setupData() {
node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
setPitId();
pitId = getPitId();
namedWriteableRegistry = new NamedWriteableRegistry(
Arrays.asList(
new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new),
Expand Down Expand Up @@ -431,7 +431,7 @@ public void onFailure(Exception e) {

}

QueryBuilder randomQueryBuilder() {
public static QueryBuilder randomQueryBuilder() {
if (randomBoolean()) {
return new TermQueryBuilder(randomAlphaOfLength(10), randomAlphaOfLength(10));
} else if (randomBoolean()) {
Expand All @@ -441,21 +441,21 @@ QueryBuilder randomQueryBuilder() {
}
}

private void setPitId() {
public static String getPitId() {
AtomicArray<SearchPhaseResult> array = new AtomicArray<>(3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(
new ShardSearchContextId("a", 1),
node1
null
);
testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(
new ShardSearchContextId("b", 12),
node2
null
);
testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(
new ShardSearchContextId("c", 42),
node3
null
);
testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
array.setOnce(0, testSearchPhaseResult1);
Expand All @@ -477,7 +477,7 @@ private void setPitId() {
aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter);
}
}
pitId = SearchContextId.encode(array.asList(), aliasFilters, version);
return SearchContextId.encode(array.asList(), aliasFilters, version);
}

}
Loading

0 comments on commit 51ce82f

Please sign in to comment.