SOLR-17160: Core admin async ID status, 10k limit and time expire (#2304)

Core Admin "async" request status tracking is no longer capped at 100; it's 10k.
Statuses are now removed 5 minutes after the read of a completed/failed status.
Helps collection async backup/restore and other operations scale to 100+ shards.

Co-authored-by: David Smiley <[email protected]>
(cherry picked from commit d3b4c2e)
psalagnac authored and dsmiley committed Jul 16, 2024
1 parent d684934 commit 338fbda
Showing 6 changed files with 214 additions and 65 deletions.
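
For context, a minimal SolrJ sketch of the client-side flow this commit affects: submit an async Collections API operation, then poll its status until it finishes. The SolrJ classes, collection name, backup name, and location are illustrative assumptions, not part of this commit; after this change, a node-level status entry is dropped five minutes after a completed or failed result is first read.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;

public class AsyncBackupStatusPollSketch {
  public static void main(String[] args) throws Exception {
    try (SolrClient client = new Http2SolrClient.Builder("http://localhost:8983/solr").build()) {
      // Submit the backup asynchronously; SolrJ returns the generated async request id.
      String asyncId =
          CollectionAdminRequest.backupCollection("techproducts", "nightly")
              .setLocation("/var/solr/backups") // hypothetical shared backup location
              .processAsync(client);

      // Poll the overall status until the request leaves the SUBMITTED/RUNNING states.
      RequestStatusState state;
      do {
        Thread.sleep(1000);
        state = CollectionAdminRequest.requestStatus(asyncId).process(client).getRequestStatus();
      } while (state == RequestStatusState.SUBMITTED || state == RequestStatusState.RUNNING);

      System.out.println("Async request " + asyncId + " finished as " + state);
    }
  }
}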
4 changes: 4 additions & 0 deletions solr/CHANGES.txt
@@ -48,6 +48,10 @@ Improvements

* SOLR-16198: Introduce tabbed sections again in the Ref Guide. (Christos Malliaridis via Eric Pugh)

* SOLR-17160: Core Admin "async" request status tracking is no longer capped at 100; it's 10k.
Statuses are now removed 5 minutes after the read of a completed/failed status. Helps collection
async backup/restore and other operations scale to 100+ shards. (Pierre Salagnac, David Smiley)

Optimizations
---------------------
* SOLR-17257: Both Minimize Cores and the Affinity replica placement strategies would over-gather
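
The limits in the SOLR-17160 entry above are read through EnvUtils, so they can be tuned per node. A hedged sketch of overriding them with system properties; the property names and defaults come from the handler diff below, while the embedded-startup context is an assumption (in a standalone install they would typically be passed as -D JVM options):

public class AsyncTrackerTuningSketch {
  public static void main(String[] args) {
    // Cap on tracked async statuses (default 10,000 after this change).
    System.setProperty("solr.admin.async.max", "20000");
    // Time-to-keep for running or not-yet-read statuses, in minutes (default 60).
    System.setProperty("solr.admin.async.timeout.minutes", "120");
    // Time-to-keep after a completed/failed status has been read, in minutes (default 5).
    System.setProperty("solr.admin.async.timeout.completed.minutes", "10");
    // ... start an embedded node or test fixture here ...
  }
}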
153 changes: 109 additions & 44 deletions solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -21,20 +21,23 @@
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_EDIT_PERM;
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Ticker;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Api;
import org.apache.solr.api.JerseyResource;
@@ -47,6 +50,7 @@
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -427,11 +431,19 @@ default boolean isExpensive() {
}

public static class CoreAdminAsyncTracker {
private static final int MAX_TRACKED_REQUESTS = 100;
/**
* Max number of requests we track in the Caffeine cache. This limit is deliberately very high;
* we're not supposed to hit it. It is only a safeguard against excessive memory growth when
* receiving an abusive number of admin requests.
*/
private static final int MAX_TRACKED_REQUESTS =
EnvUtils.getPropertyAsInteger("solr.admin.async.max", 10_000);

public static final String RUNNING = "running";
public static final String COMPLETED = "completed";
public static final String FAILED = "failed";
public final Map<String, Map<String, TaskObject>> requestStatusMap;

private final Cache<String, TaskObject> requestStatusCache; // key by ID

// Executor for all standard tasks (the ones that are not flagged as expensive)
// We always keep 50 live threads
@@ -440,33 +452,59 @@ public static class CoreAdminAsyncTracker {
50, new SolrNamedThreadFactory("parallelCoreAdminAPIBaseExecutor"));

// Executor for expensive tasks
// We keep the number number of max threads very low to have throttling for expensive tasks
// We keep the number of max threads very low to have throttling for expensive tasks
private ExecutorService expensiveExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(
5,
Integer.MAX_VALUE,
new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor"));

public CoreAdminAsyncTracker() {
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>()));
requestStatusMap = Collections.unmodifiableMap(map);
this(
Ticker.systemTicker(),
TimeUnit.MINUTES.toNanos(
EnvUtils.getPropertyAsLong("solr.admin.async.timeout.minutes", 60L)),
TimeUnit.MINUTES.toNanos(
EnvUtils.getPropertyAsLong("solr.admin.async.timeout.completed.minutes", 5L)));
}

/**
* @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
* @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state
* after the status was polled.
*/
CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) {

TaskExpiry expiry = new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos);
requestStatusCache =
Caffeine.newBuilder()
.ticker(ticker)
.maximumSize(MAX_TRACKED_REQUESTS)
.expireAfter(expiry)
.build();
}

public void shutdown() {
ExecutorUtil.shutdownAndAwaitTermination(standardExecutor);
ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor);
}

public Map<String, TaskObject> getRequestStatusMap(String key) {
return requestStatusMap.get(key);
public TaskObject getAsyncRequestForStatus(String key) {
TaskObject task = requestStatusCache.getIfPresent(key);

if (task != null && !RUNNING.equals(task.status) && !task.polledAfterCompletion) {
task.polledAfterCompletion = true;
// The first time we retrieve the status of a completed request, do a second lookup in
// the cache. This is necessary to update the TTL of this request in the cache.
// Unfortunately, we can't force the expiration time to be refreshed without a lookup.
requestStatusCache.getIfPresent(key);
}

return task;
}

public void submitAsyncTask(TaskObject taskObject) throws SolrException {
ensureTaskIdNotInUse(taskObject.taskId);
addTask(RUNNING, taskObject);
addTask(taskObject);

Runnable command =
() -> {
@@ -497,42 +535,26 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
}
}

/** Helper method to add a task to a tracking type. */
private void addTask(String type, TaskObject o, boolean limit) {
synchronized (getRequestStatusMap(type)) {
if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) {
String key = getRequestStatusMap(type).entrySet().iterator().next().getKey();
getRequestStatusMap(type).remove(key);
}
addTask(type, o);
}
}

private void addTask(String type, TaskObject o) {
synchronized (getRequestStatusMap(type)) {
getRequestStatusMap(type).put(o.taskId, o);
}
}

/** Helper method to remove a task from a tracking map. */
private void removeTask(String map, String taskId) {
synchronized (getRequestStatusMap(map)) {
getRequestStatusMap(map).remove(taskId);
}
}

private void ensureTaskIdNotInUse(String taskId) throws SolrException {
if (getRequestStatusMap(RUNNING).containsKey(taskId)
|| getRequestStatusMap(COMPLETED).containsKey(taskId)
|| getRequestStatusMap(FAILED).containsKey(taskId)) {
private void addTask(TaskObject taskObject) {
// Ensure task ID is not already in use
TaskObject taskInCache =
requestStatusCache.get(
taskObject.taskId,
n -> {
taskObject.status = RUNNING;
return taskObject;
});

// If we get a different task instance, it means one was already in the cache with the
// same name. Just reject the new one.
if (taskInCache != taskObject) {
throw new SolrException(
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
}
}

private void finishTask(TaskObject taskObject, boolean successful) {
removeTask(RUNNING, taskObject.taskId);
addTask(successful ? COMPLETED : FAILED, taskObject, true);
taskObject.status = successful ? COMPLETED : FAILED;
}

/**
Expand All @@ -546,6 +568,13 @@ public static class TaskObject {
final Callable<SolrQueryResponse> task;
public String rspInfo;
public Object operationRspInfo;
private volatile String status;

/**
* Flag set to true once the task is complete (possibly with an error) and its status has been
* polled at least once. Once set, the time we keep the task status is shortened.
*/
private volatile boolean polledAfterCompletion;

public TaskObject(
String taskId, String action, boolean expensive, Callable<SolrQueryResponse> task) {
@@ -574,6 +603,42 @@ public Object getOperationRspObject() {
public void setOperationRspObject(SolrQueryResponse rspObject) {
this.operationRspInfo = rspObject.getResponse();
}

public String getStatus() {
return status;
}
}

/**
* Expiration policy for Caffeine cache. Depending on whether the status of a completed task was
* already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}.
*/
private static class TaskExpiry implements Expiry<String, TaskObject> {

private final long runningTimeoutNanos;
private final long completedTimeoutNanos;

private TaskExpiry(long runningTimeoutNanos, long completedTimeoutNanos) {
this.runningTimeoutNanos = runningTimeoutNanos;
this.completedTimeoutNanos = completedTimeoutNanos;
}

@Override
public long expireAfterCreate(String key, TaskObject task, long currentTime) {
return runningTimeoutNanos;
}

@Override
public long expireAfterUpdate(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
}

@Override
public long expireAfterRead(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
}
}
}
}
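
To make the new expiry behavior concrete, here is a minimal sketch in the style of a unit test. It assumes a class placed in the org.apache.solr.handler.admin package (needed to reach the package-private test constructor) and a hypothetical FakeTicker; neither is part of this commit. It shows a completed status staying readable until it is polled once, then disappearing a few minutes after that first read.

package org.apache.solr.handler.admin;

import com.github.benmanes.caffeine.cache.Ticker;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.response.SolrQueryResponse;

class CoreAdminAsyncTrackerSketch {

  // Manually advanced ticker so the sketch does not have to wait in real time.
  static class FakeTicker implements Ticker {
    private final AtomicLong nanos = new AtomicLong();

    @Override
    public long read() {
      return nanos.get();
    }

    void advance(long duration, TimeUnit unit) {
      nanos.addAndGet(unit.toNanos(duration));
    }
  }

  public static void main(String[] args) throws Exception {
    FakeTicker ticker = new FakeTicker();
    CoreAdminAsyncTracker tracker =
        new CoreAdminAsyncTracker(
            ticker, TimeUnit.MINUTES.toNanos(60), TimeUnit.MINUTES.toNanos(5));
    try {
      TaskObject task = new TaskObject("my-task", "STATUS", false, SolrQueryResponse::new);
      tracker.submitAsyncTask(task);

      Thread.sleep(200); // crude: let the standard executor finish the trivial task

      // First read after completion flips polledAfterCompletion, shortening the TTL to 5 minutes.
      System.out.println(tracker.getAsyncRequestForStatus("my-task").getStatus()); // "completed"

      ticker.advance(6, TimeUnit.MINUTES);
      System.out.println(tracker.getAsyncRequestForStatus("my-task")); // null, entry expired
    } finally {
      tracker.shutdown();
    }
  }
}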
solr/core/src/java/org/apache/solr/handler/admin/api/GetNodeCommandStatus.java
@@ -18,14 +18,14 @@

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING;

import jakarta.inject.Inject;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.client.api.model.GetNodeCommandStatusResponse;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.jersey.PermissionName;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -51,25 +51,24 @@ public GetNodeCommandStatus(
public GetNodeCommandStatusResponse getCommandStatus(String requestId) {
ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId);
var requestStatusResponse = new GetNodeCommandStatusResponse();
if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) {
requestStatusResponse.status = RUNNING;
} else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) {
requestStatusResponse.status = COMPLETED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject();
requestStatusResponse.response =
coreAdminAsyncTracker
.getRequestStatusMap(COMPLETED)
.get(requestId)
.getOperationRspObject();
} else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) {
requestStatusResponse.status = FAILED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject();
} else {

TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId);
String status = taskObject != null ? taskObject.getStatus() : null;

if (status == null) {
requestStatusResponse.status = "notfound";
requestStatusResponse.msg = "No task found in running, completed or failed tasks";
} else {
requestStatusResponse.status = status;

if (status.equals(COMPLETED)) {
requestStatusResponse.response = taskObject.getRspObject();
requestStatusResponse.response = taskObject.getOperationRspObject();
} else if (status.equals(FAILED)) {
requestStatusResponse.response = taskObject.getRspObject();
}
}

return requestStatusResponse;
}
}
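
Finally, a hedged sketch of polling the node-level Core Admin REQUESTSTATUS action, which reports the same per-node status this class serves, using SolrJ's generic request. The base URL and request id are placeholders, and the response is printed as-is rather than assuming field names.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class CoreAdminRequestStatusSketch {
  public static void main(String[] args) throws Exception {
    try (SolrClient client = new Http2SolrClient.Builder("http://localhost:8983/solr").build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("action", "REQUESTSTATUS");
      params.set("requestid", "my-task"); // async id used when the core command was submitted
      NamedList<Object> rsp =
          client.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/cores", params));
      // Reading a completed or failed status starts the 5-minute removal countdown added here.
      System.out.println(rsp);
    }
  }
}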