Skip to content

Commit

Permalink
Remove SLMGetExpiredSnapshotsAction extends ActionType (elastic#104115
Browse files Browse the repository at this point in the history
)

Another unnecessary `ActionType` subclass that can just be a plain
instance.
  • Loading branch information
DaveCTurner authored Jan 9, 2024
1 parent 089435c commit 7dae3a2
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(ExecuteSnapshotLifecycleAction.INSTANCE, TransportExecuteSnapshotLifecycleAction.class),
new ActionHandler<>(GetSnapshotLifecycleStatsAction.INSTANCE, TransportGetSnapshotLifecycleStatsAction.class),
new ActionHandler<>(ExecuteSnapshotRetentionAction.INSTANCE, TransportExecuteSnapshotRetentionAction.class),
new ActionHandler<>(SLMGetExpiredSnapshotsAction.INSTANCE, SLMGetExpiredSnapshotsAction.LocalAction.class),
new ActionHandler<>(TransportSLMGetExpiredSnapshotsAction.INSTANCE, TransportSLMGetExpiredSnapshotsAction.class),
new ActionHandler<>(StartSLMAction.INSTANCE, TransportStartSLMAction.class),
new ActionHandler<>(StopSLMAction.INSTANCE, TransportStopSLMAction.class),
new ActionHandler<>(GetSLMStatusAction.INSTANCE, TransportGetSLMStatusAction.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ void getSnapshotsEligibleForDeletion(
ActionListener<Map<String, List<Tuple<SnapshotId, String>>>> listener
) {
client.execute(
SLMGetExpiredSnapshotsAction.INSTANCE,
new SLMGetExpiredSnapshotsAction.Request(repositories, policies),
TransportSLMGetExpiredSnapshotsAction.INSTANCE,
new TransportSLMGetExpiredSnapshotsAction.Request(repositories, policies),
listener.delegateFailureAndWrap((l, m) -> l.onResponse(m.snapshotsToDelete()))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
Expand Down Expand Up @@ -55,98 +54,96 @@
* Computes the expired snapshots for SLM. Called by {@link SnapshotRetentionTask}, but made into a separate (local-only) transport action
* so that it can access the {@link RepositoriesService} directly.
*/
public class SLMGetExpiredSnapshotsAction extends ActionType<SLMGetExpiredSnapshotsAction.Response> {
public class TransportSLMGetExpiredSnapshotsAction extends TransportAction<
TransportSLMGetExpiredSnapshotsAction.Request,
TransportSLMGetExpiredSnapshotsAction.Response> {

public static final SLMGetExpiredSnapshotsAction INSTANCE = new SLMGetExpiredSnapshotsAction();
public static final ActionType<Response> INSTANCE = ActionType.localOnly("cluster:admin/slm/execute/get_expired_snapshots");

private static final Logger logger = LogManager.getLogger(SLMGetExpiredSnapshotsAction.class);
private static final Logger logger = LogManager.getLogger(TransportSLMGetExpiredSnapshotsAction.class);

private SLMGetExpiredSnapshotsAction() {
super("cluster:admin/slm/execute/get_expired_snapshots", Writeable.Reader.localOnly());
}
private final RepositoriesService repositoriesService;
private final Executor retentionExecutor;

public static class LocalAction extends TransportAction<Request, Response> {
private final RepositoriesService repositoriesService;
private final Executor retentionExecutor;
@Inject
public TransportSLMGetExpiredSnapshotsAction(
TransportService transportService,
RepositoriesService repositoriesService,
ActionFilters actionFilters
) {
super(INSTANCE.name(), actionFilters, transportService.getTaskManager());
this.repositoriesService = repositoriesService;
this.retentionExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
}

private static final Logger logger = SLMGetExpiredSnapshotsAction.logger;
private static class ResultsBuilder {
private final Map<String, List<Tuple<SnapshotId, String>>> resultsByRepository = ConcurrentCollections.newConcurrentMap();

@Inject
public LocalAction(TransportService transportService, RepositoriesService repositoriesService, ActionFilters actionFilters) {
super(INSTANCE.name(), actionFilters, transportService.getTaskManager());
this.repositoriesService = repositoriesService;
this.retentionExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
Response getResponse() {
// copyOf just so we aren't returning the CHM
return new Response(Map.copyOf(resultsByRepository));
}

private static class ResultsBuilder {
private final Map<String, List<Tuple<SnapshotId, String>>> resultsByRepository = ConcurrentCollections.newConcurrentMap();

Response getResponse() {
// copyOf just so we aren't returning the CHM
return new Response(Map.copyOf(resultsByRepository));
void addResult(String repository, List<Tuple<SnapshotId, String>> snapshotsToDelete) {
// snapshotsToDelete is immutable because it comes from a Stream#toList() so no further copying needed
if (snapshotsToDelete.isEmpty()) {
assert resultsByRepository.containsKey(repository) == false;
} else {
final var previousValue = resultsByRepository.put(repository, snapshotsToDelete);
assert previousValue == null : repository + ": " + previousValue + " vs " + snapshotsToDelete;
}
}
}

void addResult(String repository, List<Tuple<SnapshotId, String>> snapshotsToDelete) {
// snapshotsToDelete is immutable because it comes from a Stream#toList() so no further copying needed
if (snapshotsToDelete.isEmpty()) {
assert resultsByRepository.containsKey(repository) == false;
} else {
final var previousValue = resultsByRepository.put(repository, snapshotsToDelete);
assert previousValue == null : repository + ": " + previousValue + " vs " + snapshotsToDelete;
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final var resultsBuilder = new ResultsBuilder();
try (var refs = new RefCountingRunnable(() -> listener.onResponse(resultsBuilder.getResponse()))) {
for (final var repositoryName : request.repositories()) {

final Repository repository;
try {
repository = repositoriesService.repository(repositoryName);
} catch (RepositoryMissingException e) {
logger.debug("[{}]: repository not found", repositoryName);
continue;
}
}
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final var resultsBuilder = new ResultsBuilder();
try (var refs = new RefCountingRunnable(() -> listener.onResponse(resultsBuilder.getResponse()))) {
for (final var repositoryName : request.repositories()) {

final Repository repository;
try {
repository = repositoriesService.repository(repositoryName);
} catch (RepositoryMissingException e) {
logger.debug("[{}]: repository not found", repositoryName);
continue;
}
if (repository.isReadOnly()) {
logger.debug("[{}]: skipping readonly repository", repositoryName);
continue;
}

if (repository.isReadOnly()) {
logger.debug("[{}]: skipping readonly repository", repositoryName);
continue;
}
retentionExecutor.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {}

retentionExecutor.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {
logger.debug(Strings.format("[%s]: could not compute expired snapshots", repositoryName), e);
}
}, refs.acquire()),
perRepositoryListener -> SubscribableListener

// Get repository data
.<RepositoryData>newForked(l -> repository.getRepositoryData(retentionExecutor, l))

// Collect snapshot details by policy, and get any missing details by reading SnapshotInfo
.<SnapshotDetailsByPolicy>andThen(
(l, repositoryData) -> getSnapshotDetailsByPolicy(retentionExecutor, repository, repositoryData, l)
)

// Compute snapshots to delete for each (relevant) policy
.andThenAccept(snapshotDetailsByPolicy -> {
resultsBuilder.addResult(
repositoryName,
getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
);
})

// And notify this repository's listener on completion
.addListener(perRepositoryListener)
));
}
@Override
public void onFailure(Exception e) {
logger.debug(Strings.format("[%s]: could not compute expired snapshots", repositoryName), e);
}
}, refs.acquire()),
perRepositoryListener -> SubscribableListener

// Get repository data
.<RepositoryData>newForked(l -> repository.getRepositoryData(retentionExecutor, l))

// Collect snapshot details by policy, and get any missing details by reading SnapshotInfo
.<SnapshotDetailsByPolicy>andThen(
(l, repositoryData) -> getSnapshotDetailsByPolicy(retentionExecutor, repository, repositoryData, l)
)

// Compute snapshots to delete for each (relevant) policy
.andThenAccept(snapshotDetailsByPolicy -> {
resultsBuilder.addResult(
repositoryName,
getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
);
})

// And notify this repository's listener on completion
.addListener(perRepositoryListener)
));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
Request request,
ActionListener<Response> listener
) {
if (action == SLMGetExpiredSnapshotsAction.INSTANCE) {
if (action == TransportSLMGetExpiredSnapshotsAction.INSTANCE) {
logger.info("--> called");
listener.onResponse((Response) new SLMGetExpiredSnapshotsAction.Response(Map.of()));
listener.onResponse((Response) new TransportSLMGetExpiredSnapshotsAction.Response(Map.of()));
} else {
super.doExecute(action, request, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SLMGetExpiredSnapshotsActionTests extends ESTestCase {
public class TransportSLMGetExpiredSnapshotsActionTests extends ESTestCase {

public void testEmpty() {
runActionTest(List.of(), Set.of());
Expand Down Expand Up @@ -137,8 +137,8 @@ private static void runActionTest(List<SnapshotInfo> snapshotInfos, Set<String>
}
});

final var action = new SLMGetExpiredSnapshotsAction.LocalAction(transportService, repositoriesService, new ActionFilters(Set.of()));
final var task = new Task(1, "direct", SLMGetExpiredSnapshotsAction.INSTANCE.name(), "", TaskId.EMPTY_TASK_ID, Map.of());
final var action = new TransportSLMGetExpiredSnapshotsAction(transportService, repositoriesService, new ActionFilters(Set.of()));
final var task = new Task(1, "direct", TransportSLMGetExpiredSnapshotsAction.INSTANCE.name(), "", TaskId.EMPTY_TASK_ID, Map.of());

final var policyMap = createPolicies(
snapshotInfos.stream()
Expand All @@ -148,8 +148,8 @@ private static void runActionTest(List<SnapshotInfo> snapshotInfos, Set<String>
snapshotsToDelete
);

final var responseFuture = new PlainActionFuture<SLMGetExpiredSnapshotsAction.Response>();
action.doExecute(task, new SLMGetExpiredSnapshotsAction.Request(List.of(REPO_NAME), policyMap), responseFuture);
final var responseFuture = new PlainActionFuture<TransportSLMGetExpiredSnapshotsAction.Response>();
action.doExecute(task, new TransportSLMGetExpiredSnapshotsAction.Request(List.of(REPO_NAME), policyMap), responseFuture);
deterministicTaskQueue.runAllTasks();
assertTrue(responseFuture.isDone());
final var deletedSnapshots = responseFuture.actionGet().snapshotsToDelete();
Expand Down Expand Up @@ -179,8 +179,13 @@ record SeenSnapshotInfo(SnapshotId snapshotId, String policyId) {}

.<RepositoryData>newForked(l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l))

.<SLMGetExpiredSnapshotsAction.SnapshotDetailsByPolicy>andThen(
(l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(EsExecutors.DIRECT_EXECUTOR_SERVICE, repository, rd, l)
.<TransportSLMGetExpiredSnapshotsAction.SnapshotDetailsByPolicy>andThen(
(l, rd) -> TransportSLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(
EsExecutors.DIRECT_EXECUTOR_SERVICE,
repository,
rd,
l
)
)

.andThenAccept(snapshotDetailsByPolicy -> {
Expand All @@ -197,39 +202,55 @@ record SeenSnapshotInfo(SnapshotId snapshotId, String policyId) {}
}

public void testGetSnapshotsToDelete() {
final var snapshotDetailsByPolicy = new SLMGetExpiredSnapshotsAction.SnapshotDetailsByPolicy();
final var snapshotDetailsByPolicy = new TransportSLMGetExpiredSnapshotsAction.SnapshotDetailsByPolicy();

assertEquals(
List.of(),
SLMGetExpiredSnapshotsAction.getSnapshotsToDelete(REPO_NAME, createPolicies(Set.of(), Set.of()), snapshotDetailsByPolicy)
TransportSLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
REPO_NAME,
createPolicies(Set.of(), Set.of()),
snapshotDetailsByPolicy
)
);

snapshotDetailsByPolicy.add(mkId("snapshot-with-unknown-policy"), mkDetails("unknown-policy-id"));

assertEquals(
List.of(),
SLMGetExpiredSnapshotsAction.getSnapshotsToDelete(REPO_NAME, createPolicies(Set.of(), Set.of()), snapshotDetailsByPolicy)
TransportSLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
REPO_NAME,
createPolicies(Set.of(), Set.of()),
snapshotDetailsByPolicy
)
);

snapshotDetailsByPolicy.add(mkId("no-retention"), mkDetails(NO_RETENTION_POLICY_ID));

assertEquals(
List.of(),
SLMGetExpiredSnapshotsAction.getSnapshotsToDelete(REPO_NAME, createPolicies(Set.of(), Set.of()), snapshotDetailsByPolicy)
TransportSLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
REPO_NAME,
createPolicies(Set.of(), Set.of()),
snapshotDetailsByPolicy
)
);

snapshotDetailsByPolicy.add(mkId("other-repo-policy"), mkDetails(OTHER_REPO_POLICY_ID));

assertEquals(
List.of(),
SLMGetExpiredSnapshotsAction.getSnapshotsToDelete(REPO_NAME, createPolicies(Set.of(), Set.of()), snapshotDetailsByPolicy)
TransportSLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
REPO_NAME,
createPolicies(Set.of(), Set.of()),
snapshotDetailsByPolicy
)
);

snapshotDetailsByPolicy.add(mkId("expiry-candidate"), mkDetails(POLICY_ID));

assertEquals(
List.of(),
SLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
TransportSLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
REPO_NAME,
createPolicies(Set.of(mkId("expiry-candidate")), Set.of()),
snapshotDetailsByPolicy
Expand All @@ -238,7 +259,7 @@ public void testGetSnapshotsToDelete() {

assertEquals(
List.of(Tuple.tuple(mkId("expiry-candidate"), POLICY_ID)),
SLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
TransportSLMGetExpiredSnapshotsAction.getSnapshotsToDelete(
REPO_NAME,
createPolicies(Set.of(mkId("expiry-candidate")), Set.of(mkId("expiry-candidate"))),
snapshotDetailsByPolicy
Expand Down

0 comments on commit 7dae3a2

Please sign in to comment.