Skip to content

Commit

Permalink
Addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Jul 19, 2022
1 parent 5d8ec6b commit 987cb57
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -50,38 +51,35 @@ public class CreatePitController {
private final ClusterService clusterService;
private final TransportSearchAction transportSearchAction;
private final NamedWriteableRegistry namedWriteableRegistry;
private final Task task;
private final ActionListener<CreatePitResponse> listener;
private final CreatePitRequest request;
private static final Logger logger = LogManager.getLogger(CreatePitController.class);
public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting(
"pit.init.keep_alive",
"point_in_time.init.keep_alive",
timeValueSeconds(30),
Setting.Property.NodeScope
);

@Inject
public CreatePitController(
CreatePitRequest request,
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry,
Task task,
ActionListener<CreatePitResponse> listener
NamedWriteableRegistry namedWriteableRegistry
) {
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
this.transportSearchAction = transportSearchAction;
this.namedWriteableRegistry = namedWriteableRegistry;
this.task = task;
this.listener = listener;
this.request = request;
}

/**
* This method creates PIT reader context
*/
public void executeCreatePit(StepListener<SearchResponse> createPitListener, ActionListener<CreatePitResponse> updatePitIdListener) {
public void executeCreatePit(
CreatePitRequest request,
Task task,
StepListener<SearchResponse> createPitListener,
ActionListener<CreatePitResponse> updatePitIdListener
) {
SearchRequest searchRequest = new SearchRequest(request.getIndices());
searchRequest.preference(request.getPreference());
searchRequest.routing(request.getRouting());
Expand Down Expand Up @@ -229,7 +227,11 @@ private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLoo
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());
return SearchUtils.getConnectionLookupListener(searchTransportService.getRemoteClusterService(), state, clusters);
return (StepListener<BiFunction<String, String, DiscoveryNode>>) SearchUtils.getConnectionLookupListener(
searchTransportService.getRemoteClusterService(),
state,
clusters
);
}

private ActionListener<UpdatePitContextResponse> getGroupedListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.action.search;

import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -26,7 +27,7 @@ public SearchUtils() {}
/**
* Get connection lookup listener for list of clusters passed
*/
public static StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(
public static ActionListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(
RemoteClusterService remoteClusterService,
ClusterState state,
Set<String> clusters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TransportCreatePitAction extends HandledTransportAction<CreatePitRe
private final ClusterService clusterService;
private final TransportSearchAction transportSearchAction;
private final NamedWriteableRegistry namedWriteableRegistry;
private final CreatePitController createPitController;

@Inject
public TransportCreatePitAction(
Expand All @@ -48,27 +49,20 @@ public TransportCreatePitAction(
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry
NamedWriteableRegistry namedWriteableRegistry,
CreatePitController createPitController
) {
super(CreatePitAction.NAME, transportService, actionFilters, in -> new CreatePitRequest(in));
this.transportService = transportService;
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
this.transportSearchAction = transportSearchAction;
this.namedWriteableRegistry = namedWriteableRegistry;
this.createPitController = createPitController;
}

@Override
protected void doExecute(Task task, CreatePitRequest request, ActionListener<CreatePitResponse> listener) {
CreatePitController controller = new CreatePitController(
request,
searchTransportService,
clusterService,
transportSearchAction,
namedWriteableRegistry,
task,
listener
);
final StepListener<SearchResponse> createPitListener = new StepListener<>();
final ActionListener<CreatePitResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
logger.error(
Expand All @@ -79,7 +73,7 @@ protected void doExecute(Task task, CreatePitRequest request, ActionListener<Cre
);
listener.onFailure(e);
});
controller.executeCreatePit(createPitListener, updatePitIdListener);
createPitController.executeCreatePit(request, task, createPitListener, updatePitIdListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
* This setting will help validate the max keep alive that can be set during creation or extension for a PIT reader context
*/
public static final Setting<TimeValue> MAX_PIT_KEEPALIVE_SETTING = Setting.positiveTimeSetting(
"pit.max_keep_alive",
"point_in_time.max_keep_alive",
timeValueHours(24),
Property.NodeScope,
Property.Dynamic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId,
setPitId(pitId);
setCreationTime(createTime);
return Releasables.releaseOnce(() -> {
getLastAccessTime().updateAndGet(curr -> Math.max(curr, nowInMillis()));
updateLastAccessTime();
getRefCounted().decRef();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ protected AbstractRefCounted getRefCounted() {
return refCounted;
}

protected AtomicLong getLastAccessTime() {
return lastAccessTime;
protected void updateLastAccessTime() {
this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis()));
}

protected long nowInMillis() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,10 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod
request.setIndices(new String[] { "index" });

CreatePitController controller = new CreatePitController(
request,
searchTransportService,
clusterServiceMock,
transportSearchAction,
namedWriteableRegistry,
task,
createPitListener
namedWriteableRegistry
);

ActionListener<CreatePitResponse> updatelistener = new LatchedActionListener<>(new ActionListener<CreatePitResponse>() {
Expand All @@ -226,7 +223,7 @@ public void onFailure(Exception e) {
}, latch);

StepListener<SearchResponse> createListener = new StepListener<>();
controller.executeCreatePit(createListener, updatelistener);
controller.executeCreatePit(request, task, createListener, updatelistener);
createListener.onResponse(searchResponse);
latch.await();
assertEquals(3, updateNodesInvoked.size());
Expand Down Expand Up @@ -281,13 +278,10 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
request.setIndices(new String[] { "index" });
CreatePitController controller = new CreatePitController(
request,
searchTransportService,
clusterServiceMock,
transportSearchAction,
namedWriteableRegistry,
task,
createPitListener
namedWriteableRegistry
);

ActionListener<CreatePitResponse> updatelistener = new LatchedActionListener<>(new ActionListener<CreatePitResponse>() {
Expand All @@ -304,7 +298,7 @@ public void onFailure(Exception e) {

StepListener<SearchResponse> createListener = new StepListener<>();

controller.executeCreatePit(createListener, updatelistener);
controller.executeCreatePit(request, task, createListener, updatelistener);
createListener.onFailure(new Exception("Exception occurred in phase 1"));
latch.await();
assertEquals(0, updateNodesInvoked.size());
Expand Down Expand Up @@ -364,13 +358,10 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
request.setIndices(new String[] { "index" });
CreatePitController controller = new CreatePitController(
request,
searchTransportService,
clusterServiceMock,
transportSearchAction,
namedWriteableRegistry,
task,
createPitListener
namedWriteableRegistry
);

CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -388,7 +379,7 @@ public void onFailure(Exception e) {
}, latch);

StepListener<SearchResponse> createListener = new StepListener<>();
controller.executeCreatePit(createListener, updatelistener);
controller.executeCreatePit(request, task, createListener, updatelistener);
createListener.onResponse(searchResponse);
latch.await();
assertEquals(3, updateNodesInvoked.size());
Expand Down Expand Up @@ -437,13 +428,10 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
request.setIndices(new String[] { "index" });
CreatePitController controller = new CreatePitController(
request,
searchTransportService,
clusterServiceMock,
transportSearchAction,
namedWriteableRegistry,
task,
createPitListener
namedWriteableRegistry
);

CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -461,7 +449,7 @@ public void onFailure(Exception e) {
}, latch);

StepListener<SearchResponse> createListener = new StepListener<>();
controller.executeCreatePit(createListener, updatelistener);
controller.executeCreatePit(request, task, createListener, updatelistener);
createListener.onResponse(searchResponse);
latch.await();
assertEquals(3, updateNodesInvoked.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ public void testPitInvalidDefaultKeepAlive() {
() -> client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("pit.max_keep_alive", "1m").put("search.default_keep_alive", "2m"))
.setPersistentSettings(Settings.builder().put("point_in_time.max_keep_alive", "1m").put("search.default_keep_alive", "2m"))
.get()
);
assertThat(exc.getMessage(), containsString("was (2m > 1m)"));
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("search.default_keep_alive", "5m").put("pit.max_keep_alive", "5m"))
.setPersistentSettings(Settings.builder().put("search.default_keep_alive", "5m").put("point_in_time.max_keep_alive", "5m"))
.get()
);
assertAcked(
Expand All @@ -163,7 +163,7 @@ public void testPitInvalidDefaultKeepAlive() {
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("pit.max_keep_alive", "2m"))
.setPersistentSettings(Settings.builder().put("point_in_time.max_keep_alive", "2m"))
.get()
);
exc = expectThrows(
Expand All @@ -187,7 +187,7 @@ public void testPitInvalidDefaultKeepAlive() {
() -> client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put("pit.max_keep_alive", "30s"))
.setPersistentSettings(Settings.builder().put("point_in_time.max_keep_alive", "30s"))
.get()
);
assertThat(exc.getMessage(), containsString("was (1m > 30s)"));
Expand Down

0 comments on commit 987cb57

Please sign in to comment.