Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17160: Time based tracking of core admin requests #2271

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 75 additions & 39 deletions solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Api;
import org.apache.solr.api.JerseyResource;
Expand Down Expand Up @@ -427,11 +427,15 @@ default boolean isExpensive() {
}

public static class CoreAdminAsyncTracker {
private static final int MAX_TRACKED_REQUESTS = 100;
public static final String RUNNING = "running";
public static final String COMPLETED = "completed";
public static final String FAILED = "failed";
public final Map<String, Map<String, TaskObject>> requestStatusMap;

private final long runningTimeoutNanos;
private final long completedTimeoutNanos;
private final long purgePeriodNanos;
private final Map<String, TaskObject> requestStatusMap;
private volatile long lastPurge;

// Executor for all standard tasks (the ones that are not flagged as expensive)
// We always keep 50 live threads
Expand All @@ -448,25 +452,47 @@ public static class CoreAdminAsyncTracker {
new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor"));

public CoreAdminAsyncTracker() {
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>()));
requestStatusMap = Collections.unmodifiableMap(map);
this(
TimeUnit.MINUTES.toNanos(
Long.getLong("solr.admin.requests.running.timeout.minutes", 60L)),
TimeUnit.MINUTES.toNanos(
Long.getLong("solr.admin.requests.completed.timeout.minutes", 5L)),
TimeUnit.MINUTES.toNanos(Long.getLong("solr.admin.requests.purge.minutes", 1L)));
}

/**
* Creates a tracker with explicit retention and purge settings, all expressed in nanoseconds.
*
* @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
* @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state
* after the status was polled.
* @param purgePeriodNanos The minimum delay between two purges of expired tasks; the purge is
* attempted when a new task is added to the tracker.
*/
CoreAdminAsyncTracker(
long runningTimeoutNanos, long completedTimeoutNanos, long purgePeriodNanos) {
this.runningTimeoutNanos = runningTimeoutNanos;
this.completedTimeoutNanos = completedTimeoutNanos;
this.purgePeriodNanos = purgePeriodNanos;
// Single map keyed by task id; the status now lives on the TaskObject itself.
requestStatusMap = Collections.synchronizedMap(new HashMap<>());
}

/**
* Shuts down both task executors (standard and expensive) through {@code
* ExecutorUtil.shutdownAndAwaitTermination}.
*/
public void shutdown() {
ExecutorUtil.shutdownAndAwaitTermination(standardExecutor);
ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor);
}

public Map<String, TaskObject> getRequestStatusMap(String key) {
return requestStatusMap.get(key);
/**
* Looks up a tracked async task in order to report its status.
*
* <p>As a side effect, when the task is no longer RUNNING, the time of this poll is recorded on
* the task.
*
* @param key the id of the task to look up
* @return the tracked task, or {@code null} if no task is tracked under this id
*/
public TaskObject getAsyncRequestForStatus(String key) {
TaskObject task = requestStatusMap.get(key);

if (task != null && !RUNNING.equals(task.status)) {
// Record the poll time: this is to reduce the retention time of this task (see isExpired).
// Once the task is completed and we polled the status, it is very likely we won't poll it
// again.
task.polledNanos = System.nanoTime();
}

return task;
}

public void submitAsyncTask(TaskObject taskObject) throws SolrException {
ensureTaskIdNotInUse(taskObject.taskId);
addTask(RUNNING, taskObject);
addTask(taskObject);

Runnable command =
() -> {
Expand Down Expand Up @@ -497,42 +523,45 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
}
}

/** Helper method to add a task to a tracking type. */
private void addTask(String type, TaskObject o, boolean limit) {
synchronized (getRequestStatusMap(type)) {
if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) {
String key = getRequestStatusMap(type).entrySet().iterator().next().getKey();
getRequestStatusMap(type).remove(key);
}
addTask(type, o);
private void addTask(TaskObject taskObject) {
// Ensure task ID is not already in use
if (requestStatusMap.containsKey(taskObject.taskId)) {
throw new SolrException(
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
}
}

private void addTask(String type, TaskObject o) {
synchronized (getRequestStatusMap(type)) {
getRequestStatusMap(type).put(o.taskId, o);
long currentTime = System.nanoTime();
if (currentTime > lastPurge + purgePeriodNanos) {
synchronized (requestStatusMap) {
// Check the condition a second time now we are in the synchronized block.
// If two threads entered the previous 'if' concurrently, only the first one in
// the synchronized block will actually do the purge.
if (currentTime > lastPurge + purgePeriodNanos) {
requestStatusMap.values().removeIf(t -> isExpired(t, currentTime));
lastPurge = currentTime;
}
}
}
}

/** Helper method to remove a task from a tracking map. */
private void removeTask(String map, String taskId) {
synchronized (getRequestStatusMap(map)) {
getRequestStatusMap(map).remove(taskId);
}
taskObject.status = RUNNING;
taskObject.startedNanos = System.nanoTime();
requestStatusMap.put(taskObject.taskId, taskObject);
}

private void ensureTaskIdNotInUse(String taskId) throws SolrException {
if (getRequestStatusMap(RUNNING).containsKey(taskId)
|| getRequestStatusMap(COMPLETED).containsKey(taskId)
|| getRequestStatusMap(FAILED).containsKey(taskId)) {
throw new SolrException(
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
}
/**
 * Check if a tracked async task is expired and so should not be tracked anymore.
 *
 * <p>Any task started more than {@link #runningTimeoutNanos} ago (an hour by default) is
 * considered expired, whatever its status. When the status of a completed or failed task has
 * been polled, the task is also expired {@link #completedTimeoutNanos} (5 minutes by default)
 * after the last poll of the status.
 *
 * <p>Elapsed times are computed as differences of {@link System#nanoTime()} readings, which is
 * the only comparison that stays correct if the counter is negative or wraps around.
 *
 * @param task the tracked task to check
 * @param currentNanos the current {@link System#nanoTime()} reading
 * @return {@code true} if the task should be removed from the tracker
 */
private boolean isExpired(TaskObject task, long currentNanos) {
  // Read each volatile timestamp once so that the "is set" test and the elapsed-time
  // computation see the same value even if another thread polls the task concurrently.
  final long started = task.startedNanos;
  final long polled = task.polledNanos;

  // 0 is the field default, meaning "never set". nanoTime() readings may legitimately be
  // negative, so the sign of the value must not be used to detect an unset timestamp.
  final boolean startedTooLongAgo =
      started != 0L && currentNanos - started > runningTimeoutNanos;

  // The completed/failed retention is counted from the last poll, not from the task start.
  final boolean polledTooLongAgo =
      polled != 0L && currentNanos - polled > completedTimeoutNanos;

  return startedTooLongAgo || polledTooLongAgo;
}

private void finishTask(TaskObject taskObject, boolean successful) {
removeTask(RUNNING, taskObject.taskId);
addTask(successful ? COMPLETED : FAILED, taskObject, true);
taskObject.status = successful ? COMPLETED : FAILED;
}

/**
Expand All @@ -546,6 +575,9 @@ public static class TaskObject {
final Callable<SolrQueryResponse> task;
public String rspInfo;
public Object operationRspInfo;
private volatile String status;
private volatile long startedNanos;
private volatile long polledNanos;

public TaskObject(
String taskId, String action, boolean expensive, Callable<SolrQueryResponse> task) {
Expand Down Expand Up @@ -574,6 +606,10 @@ public Object getOperationRspObject() {
/**
* Stores the payload of the given response ({@code rspObject.getResponse()}) as the operation
* response info of this task.
*
* @param rspObject the response whose payload is kept; must not be {@code null}
*/
public void setOperationRspObject(SolrQueryResponse rspObject) {
this.operationRspInfo = rspObject.getResponse();
}

/**
* Returns the current tracking status of this task.
*
* @return the status string assigned by the tracker (RUNNING, COMPLETED or FAILED), or {@code
* null} if no status has been assigned yet
*/
public String getStatus() {
return status;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING;

import jakarta.inject.Inject;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.client.api.model.GetNodeCommandStatusResponse;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.jersey.PermissionName;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
Expand All @@ -51,25 +51,24 @@ public GetNodeCommandStatus(
public GetNodeCommandStatusResponse getCommandStatus(String requestId) {
ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId);
var requestStatusResponse = new GetNodeCommandStatusResponse();
if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) {
requestStatusResponse.status = RUNNING;
} else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) {
requestStatusResponse.status = COMPLETED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject();
requestStatusResponse.response =
coreAdminAsyncTracker
.getRequestStatusMap(COMPLETED)
.get(requestId)
.getOperationRspObject();
} else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) {
requestStatusResponse.status = FAILED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject();
} else {

TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId);
String status = taskObject != null ? taskObject.getStatus() : null;

if (status == null) {
requestStatusResponse.status = "notfound";
requestStatusResponse.msg = "No task found in running, completed or failed tasks";
} else {
requestStatusResponse.status = status;

if (taskObject.getStatus().equals(COMPLETED)) {
requestStatusResponse.response = taskObject.getRspObject();
requestStatusResponse.response = taskObject.getOperationRspObject();
} else if (taskObject.getStatus().equals(FAILED)) {
requestStatusResponse.response = taskObject.getRspObject();
}
}

return requestStatusResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -528,4 +530,33 @@ public void testNonexistentCoreReload() throws Exception {
e.getMessage());
admin.close();
}

/**
 * Verifies that a completed task stops being tracked once it is expired and a purge is
 * triggered by the submission of another task.
 */
@Test
public void testTrackedRequestExpiration() throws Exception {
  // No delay to purge tracked tasks, and timeouts are set to 0, so we will try
  // to purge tasks each time. This is to keep test execution super short
  CoreAdminAsyncTracker asyncTracker = new CoreAdminAsyncTracker(0L, 0L, 0L);
  try {
    TaskObject task1 = new TaskObject("id1", "ACTION", false, SolrQueryResponse::new);
    TaskObject task2 = new TaskObject("id2", "ACTION", false, SolrQueryResponse::new);

    // Submit first task and wait until its status is changed to COMPLETED.
    // The wait is bounded so that a task which never completes (e.g. it ends up FAILED)
    // makes the test fail instead of hanging the whole test run.
    asyncTracker.submitAsyncTask(task1);
    final long waitStartNanos = System.nanoTime();
    final long maxWaitNanos = 30_000_000_000L; // 30 seconds
    while (!CoreAdminAsyncTracker.COMPLETED.equals(task1.getStatus())) {
      if (System.nanoTime() - waitStartNanos > maxWaitNanos) {
        throw new AssertionError(
            "Task id1 did not reach the COMPLETED status in time, status=" + task1.getStatus());
      }
      Thread.sleep(10L);
    }

    assertEquals(
        CoreAdminAsyncTracker.COMPLETED,
        asyncTracker.getAsyncRequestForStatus("id1").getStatus());

    // submit another task to force a purge of tracked requests.
    // Check status of the first request is not available anymore
    asyncTracker.submitAsyncTask(task2);
    assertNull(asyncTracker.getAsyncRequestForStatus("id1"));

  } finally {
    asyncTracker.shutdown();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;
import org.apache.solr.SolrTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -33,7 +33,6 @@
public class GetNodeCommandStatusTest extends SolrTestCase {
private CoreContainer mockCoreContainer;
private CoreAdminHandler.CoreAdminAsyncTracker mockAsyncTracker;
private CoreAdminHandler.CoreAdminAsyncTracker.TaskObject taskObject;
private GetNodeCommandStatusApi requestNodeCommandApi;

@BeforeClass
Expand All @@ -45,7 +44,6 @@ public static void ensureWorkingMockito() {
public void setupMocks() {
mockCoreContainer = mock(CoreContainer.class);
mockAsyncTracker = mock(CoreAdminHandler.CoreAdminAsyncTracker.class);
taskObject = new CoreAdminHandler.CoreAdminAsyncTracker.TaskObject(null, null, false, null);
requestNodeCommandApi =
new GetNodeCommandStatus(mockCoreContainer, mockAsyncTracker, null, null);
}
Expand Down Expand Up @@ -89,6 +87,9 @@ public void testReturnsStatusOfFailedCommandId() {
}

private void whenTaskExistsWithStatus(String taskId, String status) {
when(mockAsyncTracker.getRequestStatusMap(status)).thenReturn(Map.of(taskId, taskObject));
TaskObject taskObject = mock(TaskObject.class);
when(taskObject.getStatus()).thenReturn(status);

when(mockAsyncTracker.getAsyncRequestForStatus(taskId)).thenReturn(taskObject);
}
}
Loading