Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17160: Time based tracking of core admin requests with Caffeine cache #2304

Merged
merged 16 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 103 additions & 42 deletions solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_EDIT_PERM;
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Ticker;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Api;
import org.apache.solr.api.JerseyResource;
Expand All @@ -47,6 +50,7 @@
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.EnvUtils;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
Expand Down Expand Up @@ -427,11 +431,18 @@ default boolean isExpensive() {
}

public static class CoreAdminAsyncTracker {
private static final int MAX_TRACKED_REQUESTS = 100;
/**
* Max number of requests we track in the Caffeine cache. This limit is super high on purpose,
* we're not supposed to hit it. This is just a protection to grow in memory too much when
* receiving an abusive number of admin requests.
*/
private static final int MAX_TRACKED_REQUESTS = 10_000;
psalagnac marked this conversation as resolved.
Show resolved Hide resolved

public static final String RUNNING = "running";
public static final String COMPLETED = "completed";
public static final String FAILED = "failed";
public final Map<String, Map<String, TaskObject>> requestStatusMap;

private final Cache<String, TaskObject> requestStatusCache; // key by ID

// Executor for all standard tasks (the ones that are not flagged as expensive)
// We always keep 50 live threads
Expand All @@ -448,25 +459,51 @@ public static class CoreAdminAsyncTracker {
new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor"));

public CoreAdminAsyncTracker() {
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>()));
requestStatusMap = Collections.unmodifiableMap(map);
this(
Ticker.systemTicker(),
TimeUnit.MINUTES.toNanos(
EnvUtils.getEnvAsLong("solr.admin.requests.running.timeout.minutes", 60L)),
psalagnac marked this conversation as resolved.
Show resolved Hide resolved
TimeUnit.MINUTES.toNanos(
EnvUtils.getEnvAsLong("solr.admin.requests.completed.timeout.minutes", 5L)));
}

/**
* @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
* @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state
* after the status was polled.
*/
CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) {

TaskExpiry expiry = new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos);
requestStatusCache =
Caffeine.newBuilder()
.ticker(ticker)
.maximumSize(MAX_TRACKED_REQUESTS)
.expireAfter(expiry)
.build();
}

public void shutdown() {
ExecutorUtil.shutdownAndAwaitTermination(standardExecutor);
ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor);
}

public Map<String, TaskObject> getRequestStatusMap(String key) {
return requestStatusMap.get(key);
public TaskObject getAsyncRequestForStatus(String key) {
TaskObject task = requestStatusCache.getIfPresent(key);

if (task != null && !RUNNING.equals(task.status) && !task.polledAfterCompletion) {
task.polledAfterCompletion = true;
// At the first time we retrieve the status of a completed request, do a second lookup in
// the cache. This is necessary to update the TTL of this request in the cache.
// Unfortunately, we can't force the expiration time to be refreshed without a lookup.
requestStatusCache.getIfPresent(key);
}

return task;
}

public void submitAsyncTask(TaskObject taskObject) throws SolrException {
ensureTaskIdNotInUse(taskObject.taskId);
addTask(RUNNING, taskObject);
addTask(taskObject);

Runnable command =
() -> {
Expand Down Expand Up @@ -497,42 +534,23 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
}
}

/** Helper method to add a task to a tracking type. */
private void addTask(String type, TaskObject o, boolean limit) {
synchronized (getRequestStatusMap(type)) {
if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) {
String key = getRequestStatusMap(type).entrySet().iterator().next().getKey();
getRequestStatusMap(type).remove(key);
}
addTask(type, o);
}
}
private void addTask(TaskObject taskObject) {
// Ensure task ID is not already in use
TaskObject taskInCache = requestStatusCache.get(taskObject.taskId, n -> taskObject);

private void addTask(String type, TaskObject o) {
synchronized (getRequestStatusMap(type)) {
getRequestStatusMap(type).put(o.taskId, o);
}
}

/** Helper method to remove a task from a tracking map. */
private void removeTask(String map, String taskId) {
synchronized (getRequestStatusMap(map)) {
getRequestStatusMap(map).remove(taskId);
}
}

private void ensureTaskIdNotInUse(String taskId) throws SolrException {
if (getRequestStatusMap(RUNNING).containsKey(taskId)
|| getRequestStatusMap(COMPLETED).containsKey(taskId)
|| getRequestStatusMap(FAILED).containsKey(taskId)) {
// If we get a different task instance, it means one was already in the cache with the
// same name. Just reject the new one.
if (taskInCache != taskObject) {
throw new SolrException(
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
}

taskObject.status = RUNNING;
psalagnac marked this conversation as resolved.
Show resolved Hide resolved
requestStatusCache.put(taskObject.taskId, taskObject);
psalagnac marked this conversation as resolved.
Show resolved Hide resolved
}

private void finishTask(TaskObject taskObject, boolean successful) {
removeTask(RUNNING, taskObject.taskId);
addTask(successful ? COMPLETED : FAILED, taskObject, true);
taskObject.status = successful ? COMPLETED : FAILED;
}

/**
Expand All @@ -546,6 +564,13 @@ public static class TaskObject {
final Callable<SolrQueryResponse> task;
public String rspInfo;
public Object operationRspInfo;
private volatile String status;

/**
* Flag set to true once the task is complete (can be in error) and the status was polled
* already once. Once set, the time we keep the task status is shortened.
*/
private volatile boolean polledAfterCompletion;

public TaskObject(
String taskId, String action, boolean expensive, Callable<SolrQueryResponse> task) {
Expand Down Expand Up @@ -574,6 +599,42 @@ public Object getOperationRspObject() {
public void setOperationRspObject(SolrQueryResponse rspObject) {
this.operationRspInfo = rspObject.getResponse();
}

public String getStatus() {
return status;
}
}

/**
* Expiration policy for Caffeine cache. Depending on whether the status of a completed task was
* already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}.
*/
private static class TaskExpiry implements Expiry<String, TaskObject> {

private final long runningTimeoutNanos;
private final long completedTimeoutNanos;

private TaskExpiry(long runningTimeoutNanos, long completedTimeoutNanos) {
this.runningTimeoutNanos = runningTimeoutNanos;
this.completedTimeoutNanos = completedTimeoutNanos;
}

@Override
public long expireAfterCreate(String key, TaskObject task, long currentTime) {
return runningTimeoutNanos;
}

@Override
public long expireAfterUpdate(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
}

@Override
public long expireAfterRead(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING;

import jakarta.inject.Inject;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.client.api.model.GetNodeCommandStatusResponse;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.jersey.PermissionName;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
Expand All @@ -51,25 +51,24 @@ public GetNodeCommandStatus(
public GetNodeCommandStatusResponse getCommandStatus(String requestId) {
ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId);
var requestStatusResponse = new GetNodeCommandStatusResponse();
if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) {
requestStatusResponse.status = RUNNING;
} else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) {
requestStatusResponse.status = COMPLETED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject();
requestStatusResponse.response =
coreAdminAsyncTracker
.getRequestStatusMap(COMPLETED)
.get(requestId)
.getOperationRspObject();
} else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) {
requestStatusResponse.status = FAILED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject();
} else {

TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId);
String status = taskObject != null ? taskObject.getStatus() : null;

if (status == null) {
requestStatusResponse.status = "notfound";
requestStatusResponse.msg = "No task found in running, completed or failed tasks";
} else {
requestStatusResponse.status = status;

if (status.equals(COMPLETED)) {
requestStatusResponse.response = taskObject.getRspObject();
requestStatusResponse.response = taskObject.getOperationRspObject();
Comment on lines +65 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused; you are setting the response twice?

Copy link
Contributor Author

@psalagnac psalagnac Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good catch!
Something is definitely wrong here, but this is existing code. I did not notice that.
I think this deserves a a fix by itself. Values for msg and response in the V2 API response are not consistent depending on the status. And it seems to me they're not consistent with V1 either.
I think this was introduced with #2144.

Maybe I should put this PR on hold and fix this first?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah; didn't know it was existing code. Do in whichever order you want; it doesn't really matter.

} else if (status.equals(FAILED)) {
requestStatusResponse.response = taskObject.getRspObject();
}
}

return requestStatusResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.solr.handler.admin;

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;

import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand All @@ -24,7 +26,9 @@
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.file.PathUtils;
import org.apache.lucene.util.Constants;
import org.apache.solr.SolrTestCaseJ4;
Expand All @@ -43,6 +47,8 @@
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -528,4 +534,61 @@ public void testNonexistentCoreReload() throws Exception {
e.getMessage());
admin.close();
}

@Test
public void testTrackedRequestExpiration() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonderful test

// Create a tracker with controlled clock, relative to 0
AtomicLong clock = new AtomicLong(0L);
CoreAdminAsyncTracker asyncTracker = new CoreAdminAsyncTracker(clock::get, 100L, 10L);
try {
Set<TaskObject> tasks =
Set.of(
new TaskObject("id1", "ACTION", false, SolrQueryResponse::new),
new TaskObject("id2", "ACTION", false, SolrQueryResponse::new));

// Submit all tasks and wait for internal status to be COMPLETED
tasks.forEach(asyncTracker::submitAsyncTask);
while (!tasks.stream().allMatch(t -> COMPLETED.equals(t.getStatus()))) {
Thread.sleep(10L);
}

// Timeout for running tasks is 100n, so status can be retrieved after 20n.
// But timeout for complete tasks is 10n once we polled the status at least once, so status
// is not available anymore 20n later.
clock.set(20);
assertEquals(COMPLETED, asyncTracker.getAsyncRequestForStatus("id1").getStatus());
clock.set(40L);
assertNull(asyncTracker.getAsyncRequestForStatus("id1"));

// Move the clock after the running timeout.
// Status of second task is not available anymore, even if it wasn't retrieved yet
clock.set(110L);
assertNull(asyncTracker.getAsyncRequestForStatus("id2"));

} finally {
asyncTracker.shutdown();
}
}

/** Check we reject a task is the async ID already exists. */
@Test
public void testDuplicatedRequestId() {

// Different tasks but with same ID
TaskObject task1 = new TaskObject("id1", "ACTION", false, null);
TaskObject task2 = new TaskObject("id1", "ACTION", false, null);

CoreAdminAsyncTracker asyncTracker = new CoreAdminAsyncTracker();
try {
asyncTracker.submitAsyncTask(task1);
try {
asyncTracker.submitAsyncTask(task2);
fail("Task should have been rejected.");
} catch (SolrException e) {
assertEquals("Duplicate request with the same requestid found.", e.getMessage());
}
} finally {
asyncTracker.shutdown();
}
}
}
Loading
Loading