Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Jun 29, 2022
1 parent d4ecca2 commit beacfc1
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testCreatePit() throws IOException {
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded());
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
}

Expand All @@ -79,7 +79,7 @@ public void testDeleteAllPits() throws IOException {
highLevelClient()::deleteAllPitsAsync
);
for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) {
assertTrue(deletePitInfo.isSucceeded());
assertTrue(deletePitInfo.isSuccessful());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ public void testSearchWithPit() throws Exception {
highLevelClient()::deletePit,
highLevelClient()::deletePitAsync
);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded());
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class CreatePitController {
private final Task task;
private final ActionListener<CreatePitResponse> listener;
private final CreatePitRequest request;
private final PitService pitService;
private static final Logger logger = LogManager.getLogger(CreatePitController.class);
public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting(
"pit.init.keep_alive",
Expand All @@ -70,7 +71,8 @@ public CreatePitController(
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry,
Task task,
ActionListener<CreatePitResponse> listener
ActionListener<CreatePitResponse> listener,
PitService pitService
) {
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
Expand All @@ -79,6 +81,7 @@ public CreatePitController(
this.task = task;
this.listener = listener;
this.request = request;
this.pitService = pitService;
}

/**
Expand Down Expand Up @@ -263,14 +266,19 @@ private void cleanupContexts(Collection<SearchContextIdForNode> contexts, String
@Override
public void onResponse(DeletePitResponse response) {
// this is invoke and forget call
final StringBuilder failedPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> !r.isSuccessful())
.forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(","));
logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString()));
if (!logger.isDebugEnabled()) return;
for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) {
if (!deletePitInfo.isSucceeded()) {
logger.debug(() -> new ParameterizedMessage("Failed to delete PIT ID {}", deletePitInfo.getPitId()));
} else {
logger.debug(() -> new ParameterizedMessage("Deleted PIT with ID {}", deletePitInfo.getPitId()));
}
}
final StringBuilder successfulPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> r.isSuccessful())
.forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(","));
logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString()));
}

@Override
Expand All @@ -284,6 +292,6 @@ public void onFailure(Exception e) {
contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context));
nodeToContextsMap.put(context.getNode(), contextIdsForNode);
}
SearchUtils.deletePitContexts(nodeToContextsMap, deleteListener, clusterService.state(), searchTransportService);
pitService.deletePitContexts(nodeToContextsMap, deleteListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ public class DeletePitInfo extends TransportResponse implements Writeable, ToXCo
/**
* This will be true if PIT reader contexts are deleted ond also if contexts are not found.
*/
private final boolean succeeded;
private final boolean successful;

private final String pitId;

public DeletePitInfo(boolean succeeded, String pitId) {
this.succeeded = succeeded;
public DeletePitInfo(boolean successful, String pitId) {
this.successful = successful;
this.pitId = pitId;
}

public DeletePitInfo(StreamInput in) throws IOException {
succeeded = in.readBoolean();
successful = in.readBoolean();
pitId = in.readString();

}

public boolean isSucceeded() {
return succeeded;
public boolean isSuccessful() {
return successful;
}

public String getPitId() {
Expand All @@ -53,7 +53,7 @@ public String getPitId() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(succeeded);
out.writeBoolean(successful);
out.writeString(pitId);
}

Expand All @@ -64,17 +64,17 @@ public void writeTo(StreamOutput out) throws IOException {
);

static {
PARSER.declareBoolean(constructorArg(), new ParseField("succeeded"));
PARSER.declareBoolean(constructorArg(), new ParseField("successful"));
PARSER.declareString(constructorArg(), new ParseField("pitId"));
}

private static final ParseField SUCCEEDED = new ParseField("succeeded");
private static final ParseField SUCCESSFUL = new ParseField("successful");
private static final ParseField PIT_ID = new ParseField("pitId");

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SUCCEEDED.getPreferredName(), succeeded);
builder.field(SUCCESSFUL.getPreferredName(), successful);
builder.field(PIT_ID.getPreferredName(), pitId);
builder.endObject();
return builder;
Expand Down
132 changes: 132 additions & 0 deletions server/src/main/java/org/opensearch/action/search/PitService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.transport.Transport;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Service class for PIT reusable functions
*/
public class PitService {

private static final Logger logger = LogManager.getLogger(PitService.class);

private final ClusterService clusterService;
private final SearchTransportService searchTransportService;

@Inject
public PitService(ClusterService clusterService, SearchTransportService searchTransportService) {
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
}

/**
* Delete list of pit contexts. Returns the details of success of operation per PIT ID.
*/
public void deletePitContexts(
Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap,
ActionListener<DeletePitResponse> listener
) {
final Set<String> clusters = nodeToContextsMap.values()
.stream()
.flatMap(Collection::stream)
.filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false)
.map(c -> c.getSearchContextIdForNode().getClusterAlias())
.collect(Collectors.toSet());
StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = SearchUtils.getConnectionLookupListener(
searchTransportService.getRemoteClusterService(),
clusterService.state(),
clusters
);
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<DeletePitResponse> groupedListener = getDeletePitGroupedListener(
listener,
nodeToContextsMap.size()
);

for (Map.Entry<String, List<PitSearchContextIdForNode>> entry : nodeToContextsMap.entrySet()) {
String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias();
final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode());
if (node == null) {
logger.error(
() -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode())
);
List<DeletePitInfo> deletePitInfos = new ArrayList<>();
for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) {
deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId()));
}
groupedListener.onResponse(new DeletePitResponse(deletePitInfos));
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(clusterAlias, node);
searchTransportService.sendFreePITContexts(connection, entry.getValue(), groupedListener);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Delete PITs failed on node [{}]", node.getName()), e);
List<DeletePitInfo> deletePitInfos = new ArrayList<>();
for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) {
deletePitInfos.add(new DeletePitInfo(false, pitSearchContextIdForNode.getPitId()));
}
groupedListener.onResponse(new DeletePitResponse(deletePitInfos));
}
}
}
}, listener::onFailure);
}

public GroupedActionListener<DeletePitResponse> getDeletePitGroupedListener(ActionListener<DeletePitResponse> listener, int size) {
return new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(final Collection<DeletePitResponse> responses) {
Map<String, Boolean> pitIdToSucceededMap = new HashMap<>();
for (DeletePitResponse response : responses) {
for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) {
if (!pitIdToSucceededMap.containsKey(deletePitInfo.getPitId())) {
pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSuccessful());
}
if (!deletePitInfo.isSuccessful()) {
logger.debug(() -> new ParameterizedMessage("Deleting PIT with ID {} failed ", deletePitInfo.getPitId()));
pitIdToSucceededMap.put(deletePitInfo.getPitId(), deletePitInfo.isSuccessful());
}
}
}
List<DeletePitInfo> deletePitResults = new ArrayList<>();
for (Map.Entry<String, Boolean> entry : pitIdToSucceededMap.entrySet()) {
deletePitResults.add(new DeletePitInfo(entry.getValue(), entry.getKey()));
}
DeletePitResponse deletePitResponse = new DeletePitResponse(deletePitResults);
listener.onResponse(deletePitResponse);
}

@Override
public void onFailure(final Exception e) {
logger.error("Delete PITs failed", e);
listener.onFailure(e);
}
}, size);
}
}
Loading

0 comments on commit beacfc1

Please sign in to comment.