Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17160: Time based tracking of core admin requests with Caffeine cache #2304

Merged
merged 16 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ Improvements

* SOLR-16198: Introduce tabbed sections again in the Ref Guide. (Christos Malliaridis via Eric Pugh)

* SOLR-17160: Core Admin "async" request status tracking is no longer capped at 100; it's 10k.
Statuses are now removed 5 minutes after the read of a completed/failed status. Helps collection
async backup/restore and other operations scale to 100+ shards. (Pierre Salagnac, David Smiley)

Optimizations
---------------------
* SOLR-17257: Both Minimize Cores and the Affinity replica placement strategies would over-gather
Expand Down
153 changes: 109 additions & 44 deletions solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_EDIT_PERM;
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Ticker;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Api;
import org.apache.solr.api.JerseyResource;
Expand All @@ -47,6 +50,7 @@
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
Expand Down Expand Up @@ -427,11 +431,19 @@ default boolean isExpensive() {
}

public static class CoreAdminAsyncTracker {
private static final int MAX_TRACKED_REQUESTS = 100;
/**
* Max number of requests we track in the Caffeine cache. This limit is super high on purpose,
* we're not supposed to hit it. This is just a protection to grow in memory too much when
* receiving an abusive number of admin requests.
*/
private static final int MAX_TRACKED_REQUESTS =
EnvUtils.getPropertyAsInteger("solr.admin.async.max", 10_000);

public static final String RUNNING = "running";
public static final String COMPLETED = "completed";
public static final String FAILED = "failed";
public final Map<String, Map<String, TaskObject>> requestStatusMap;

private final Cache<String, TaskObject> requestStatusCache; // key by ID

// Executor for all standard tasks (the ones that are not flagged as expensive)
// We always keep 50 live threads
Expand All @@ -440,33 +452,59 @@ public static class CoreAdminAsyncTracker {
50, new SolrNamedThreadFactory("parallelCoreAdminAPIBaseExecutor"));

// Executor for expensive tasks
// We keep the number number of max threads very low to have throttling for expensive tasks
// We keep the number of max threads very low to have throttling for expensive tasks
private ExecutorService expensiveExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(
5,
Integer.MAX_VALUE,
new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor"));

public CoreAdminAsyncTracker() {
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>()));
requestStatusMap = Collections.unmodifiableMap(map);
this(
Ticker.systemTicker(),
TimeUnit.MINUTES.toNanos(
EnvUtils.getPropertyAsLong("solr.admin.async.timeout.minutes", 60L)),
TimeUnit.MINUTES.toNanos(
EnvUtils.getPropertyAsLong("solr.admin.async.timeout.completed.minutes", 5L)));
}

/**
* @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
* @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state
* after the status was polled.
*/
CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) {

TaskExpiry expiry = new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos);
requestStatusCache =
Caffeine.newBuilder()
.ticker(ticker)
.maximumSize(MAX_TRACKED_REQUESTS)
.expireAfter(expiry)
.build();
}

public void shutdown() {
ExecutorUtil.shutdownAndAwaitTermination(standardExecutor);
ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor);
}

public Map<String, TaskObject> getRequestStatusMap(String key) {
return requestStatusMap.get(key);
public TaskObject getAsyncRequestForStatus(String key) {
TaskObject task = requestStatusCache.getIfPresent(key);

if (task != null && !RUNNING.equals(task.status) && !task.polledAfterCompletion) {
task.polledAfterCompletion = true;
// At the first time we retrieve the status of a completed request, do a second lookup in
// the cache. This is necessary to update the TTL of this request in the cache.
// Unfortunately, we can't force the expiration time to be refreshed without a lookup.
requestStatusCache.getIfPresent(key);
}

return task;
}

public void submitAsyncTask(TaskObject taskObject) throws SolrException {
ensureTaskIdNotInUse(taskObject.taskId);
addTask(RUNNING, taskObject);
addTask(taskObject);

Runnable command =
() -> {
Expand Down Expand Up @@ -497,42 +535,26 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
}
}

/** Helper method to add a task to a tracking type. */
private void addTask(String type, TaskObject o, boolean limit) {
synchronized (getRequestStatusMap(type)) {
if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) {
String key = getRequestStatusMap(type).entrySet().iterator().next().getKey();
getRequestStatusMap(type).remove(key);
}
addTask(type, o);
}
}

private void addTask(String type, TaskObject o) {
synchronized (getRequestStatusMap(type)) {
getRequestStatusMap(type).put(o.taskId, o);
}
}

/** Helper method to remove a task from a tracking map. */
private void removeTask(String map, String taskId) {
synchronized (getRequestStatusMap(map)) {
getRequestStatusMap(map).remove(taskId);
}
}

private void ensureTaskIdNotInUse(String taskId) throws SolrException {
if (getRequestStatusMap(RUNNING).containsKey(taskId)
|| getRequestStatusMap(COMPLETED).containsKey(taskId)
|| getRequestStatusMap(FAILED).containsKey(taskId)) {
private void addTask(TaskObject taskObject) {
// Ensure task ID is not already in use
TaskObject taskInCache =
requestStatusCache.get(
taskObject.taskId,
n -> {
taskObject.status = RUNNING;
return taskObject;
});

// If we get a different task instance, it means one was already in the cache with the
// same name. Just reject the new one.
if (taskInCache != taskObject) {
throw new SolrException(
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
}
}

private void finishTask(TaskObject taskObject, boolean successful) {
removeTask(RUNNING, taskObject.taskId);
addTask(successful ? COMPLETED : FAILED, taskObject, true);
taskObject.status = successful ? COMPLETED : FAILED;
}

/**
Expand All @@ -546,6 +568,13 @@ public static class TaskObject {
final Callable<SolrQueryResponse> task;
public String rspInfo;
public Object operationRspInfo;
private volatile String status;

/**
* Flag set to true once the task is complete (can be in error) and the status was polled
* already once. Once set, the time we keep the task status is shortened.
*/
private volatile boolean polledAfterCompletion;

public TaskObject(
String taskId, String action, boolean expensive, Callable<SolrQueryResponse> task) {
Expand Down Expand Up @@ -574,6 +603,42 @@ public Object getOperationRspObject() {
public void setOperationRspObject(SolrQueryResponse rspObject) {
this.operationRspInfo = rspObject.getResponse();
}

public String getStatus() {
return status;
}
}

/**
* Expiration policy for Caffeine cache. Depending on whether the status of a completed task was
* already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}.
*/
private static class TaskExpiry implements Expiry<String, TaskObject> {

private final long runningTimeoutNanos;
private final long completedTimeoutNanos;

private TaskExpiry(long runningTimeoutNanos, long completedTimeoutNanos) {
this.runningTimeoutNanos = runningTimeoutNanos;
this.completedTimeoutNanos = completedTimeoutNanos;
}

@Override
public long expireAfterCreate(String key, TaskObject task, long currentTime) {
return runningTimeoutNanos;
}

@Override
public long expireAfterUpdate(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
}

@Override
public long expireAfterRead(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING;

import jakarta.inject.Inject;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.client.api.model.GetNodeCommandStatusResponse;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.jersey.PermissionName;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
Expand All @@ -51,25 +51,24 @@ public GetNodeCommandStatus(
public GetNodeCommandStatusResponse getCommandStatus(String requestId) {
ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId);
var requestStatusResponse = new GetNodeCommandStatusResponse();
if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) {
requestStatusResponse.status = RUNNING;
} else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) {
requestStatusResponse.status = COMPLETED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject();
requestStatusResponse.response =
coreAdminAsyncTracker
.getRequestStatusMap(COMPLETED)
.get(requestId)
.getOperationRspObject();
} else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) {
requestStatusResponse.status = FAILED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject();
} else {

TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId);
String status = taskObject != null ? taskObject.getStatus() : null;

if (status == null) {
requestStatusResponse.status = "notfound";
requestStatusResponse.msg = "No task found in running, completed or failed tasks";
} else {
requestStatusResponse.status = status;

if (status.equals(COMPLETED)) {
requestStatusResponse.response = taskObject.getRspObject();
requestStatusResponse.response = taskObject.getOperationRspObject();
Comment on lines +65 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused; you are setting the response twice?

Copy link
Contributor Author

@psalagnac psalagnac Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good catch!
Something is definitely wrong here, but this is existing code. I did not notice that.
I think this deserves a a fix by itself. Values for msg and response in the V2 API response are not consistent depending on the status. And it seems to me they're not consistent with V1 either.
I think this was introduced with #2144.

Maybe I should put this PR on hold and fix this first?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah; didn't know it was existing code. Do in whichever order you want; it doesn't really matter.

} else if (status.equals(FAILED)) {
requestStatusResponse.response = taskObject.getRspObject();
}
}

return requestStatusResponse;
}
}
Loading
Loading