diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index c7ec6e000c8..38231465f4d 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -150,6 +150,10 @@ Improvements * SOLR-16198: Introduce tabbed sections again in the Ref Guide. (Christos Malliaridis via Eric Pugh) +* SOLR-17160: Core Admin "async" request status tracking is no longer capped at 100; it's 10k. + Statuses are now removed 5 minutes after the read of a completed/failed status. Helps collection + async backup/restore and other operations scale to 100+ shards. (Pierre Salagnac, David Smiley) + Optimizations --------------------- * SOLR-17257: Both Minimize Cores and the Affinity replica placement strategies would over-gather diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index d2d823eb190..c35117fb088 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -21,20 +21,23 @@ import static org.apache.solr.security.PermissionNameProvider.Name.CORE_EDIT_PERM; import static org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; +import com.github.benmanes.caffeine.cache.Ticker; import java.io.File; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.solr.api.AnnotatedApi; import org.apache.solr.api.Api; import org.apache.solr.api.JerseyResource; @@ -47,6 +50,7 @@ import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.EnvUtils; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrNamedThreadFactory; @@ -427,11 +431,19 @@ default boolean isExpensive() { } public static class CoreAdminAsyncTracker { - private static final int MAX_TRACKED_REQUESTS = 100; + /** + * Max number of requests we track in the Caffeine cache. This limit is super high on purpose, + * we're not supposed to hit it. This is just a protection to grow in memory too much when + * receiving an abusive number of admin requests. + */ + private static final int MAX_TRACKED_REQUESTS = + EnvUtils.getPropertyAsInteger("solr.admin.async.max", 10_000); + public static final String RUNNING = "running"; public static final String COMPLETED = "completed"; public static final String FAILED = "failed"; - public final Map> requestStatusMap; + + private final Cache requestStatusCache; // key by ID // Executor for all standard tasks (the ones that are not flagged as expensive) // We always keep 50 live threads @@ -440,7 +452,7 @@ public static class CoreAdminAsyncTracker { 50, new SolrNamedThreadFactory("parallelCoreAdminAPIBaseExecutor")); // Executor for expensive tasks - // We keep the number number of max threads very low to have throttling for expensive tasks + // We keep the number of max threads very low to have throttling for expensive tasks private ExecutorService expensiveExecutor = ExecutorUtil.newMDCAwareCachedThreadPool( 5, @@ -448,11 +460,28 @@ public static class CoreAdminAsyncTracker { new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor")); public CoreAdminAsyncTracker() { - HashMap> map = new HashMap<>(3, 1.0f); - map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>())); - map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>())); - map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>())); - requestStatusMap = Collections.unmodifiableMap(map); + this( + Ticker.systemTicker(), + TimeUnit.MINUTES.toNanos( + EnvUtils.getPropertyAsLong("solr.admin.async.timeout.minutes", 60L)), + TimeUnit.MINUTES.toNanos( + EnvUtils.getPropertyAsLong("solr.admin.async.timeout.completed.minutes", 5L))); + } + + /** + * @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state. + * @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state + * after the status was polled. + */ + CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) { + + TaskExpiry expiry = new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos); + requestStatusCache = + Caffeine.newBuilder() + .ticker(ticker) + .maximumSize(MAX_TRACKED_REQUESTS) + .expireAfter(expiry) + .build(); } public void shutdown() { @@ -460,13 +489,22 @@ public void shutdown() { ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor); } - public Map getRequestStatusMap(String key) { - return requestStatusMap.get(key); + public TaskObject getAsyncRequestForStatus(String key) { + TaskObject task = requestStatusCache.getIfPresent(key); + + if (task != null && !RUNNING.equals(task.status) && !task.polledAfterCompletion) { + task.polledAfterCompletion = true; + // At the first time we retrieve the status of a completed request, do a second lookup in + // the cache. This is necessary to update the TTL of this request in the cache. + // Unfortunately, we can't force the expiration time to be refreshed without a lookup. + requestStatusCache.getIfPresent(key); + } + + return task; } public void submitAsyncTask(TaskObject taskObject) throws SolrException { - ensureTaskIdNotInUse(taskObject.taskId); - addTask(RUNNING, taskObject); + addTask(taskObject); Runnable command = () -> { @@ -497,42 +535,26 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException { } } - /** Helper method to add a task to a tracking type. */ - private void addTask(String type, TaskObject o, boolean limit) { - synchronized (getRequestStatusMap(type)) { - if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) { - String key = getRequestStatusMap(type).entrySet().iterator().next().getKey(); - getRequestStatusMap(type).remove(key); - } - addTask(type, o); - } - } - - private void addTask(String type, TaskObject o) { - synchronized (getRequestStatusMap(type)) { - getRequestStatusMap(type).put(o.taskId, o); - } - } - - /** Helper method to remove a task from a tracking map. */ - private void removeTask(String map, String taskId) { - synchronized (getRequestStatusMap(map)) { - getRequestStatusMap(map).remove(taskId); - } - } - - private void ensureTaskIdNotInUse(String taskId) throws SolrException { - if (getRequestStatusMap(RUNNING).containsKey(taskId) - || getRequestStatusMap(COMPLETED).containsKey(taskId) - || getRequestStatusMap(FAILED).containsKey(taskId)) { + private void addTask(TaskObject taskObject) { + // Ensure task ID is not already in use + TaskObject taskInCache = + requestStatusCache.get( + taskObject.taskId, + n -> { + taskObject.status = RUNNING; + return taskObject; + }); + + // If we get a different task instance, it means one was already in the cache with the + // same name. Just reject the new one. + if (taskInCache != taskObject) { throw new SolrException( ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found."); } } private void finishTask(TaskObject taskObject, boolean successful) { - removeTask(RUNNING, taskObject.taskId); - addTask(successful ? COMPLETED : FAILED, taskObject, true); + taskObject.status = successful ? COMPLETED : FAILED; } /** @@ -546,6 +568,13 @@ public static class TaskObject { final Callable task; public String rspInfo; public Object operationRspInfo; + private volatile String status; + + /** + * Flag set to true once the task is complete (can be in error) and the status was polled + * already once. Once set, the time we keep the task status is shortened. + */ + private volatile boolean polledAfterCompletion; public TaskObject( String taskId, String action, boolean expensive, Callable task) { @@ -574,6 +603,42 @@ public Object getOperationRspObject() { public void setOperationRspObject(SolrQueryResponse rspObject) { this.operationRspInfo = rspObject.getResponse(); } + + public String getStatus() { + return status; + } + } + + /** + * Expiration policy for Caffeine cache. Depending on whether the status of a completed task was + * already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}. + */ + private static class TaskExpiry implements Expiry { + + private final long runningTimeoutNanos; + private final long completedTimeoutNanos; + + private TaskExpiry(long runningTimeoutNanos, long completedTimeoutNanos) { + this.runningTimeoutNanos = runningTimeoutNanos; + this.completedTimeoutNanos = completedTimeoutNanos; + } + + @Override + public long expireAfterCreate(String key, TaskObject task, long currentTime) { + return runningTimeoutNanos; + } + + @Override + public long expireAfterUpdate( + String key, TaskObject task, long currentTime, long currentDuration) { + return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos; + } + + @Override + public long expireAfterRead( + String key, TaskObject task, long currentTime, long currentDuration) { + return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos; + } } } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/GetNodeCommandStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/api/GetNodeCommandStatus.java index c6627d012e3..258e83e16a0 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/GetNodeCommandStatus.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/GetNodeCommandStatus.java @@ -18,7 +18,6 @@ import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED; import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED; -import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING; import jakarta.inject.Inject; import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi; @@ -26,6 +25,7 @@ import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.admin.CoreAdminHandler; +import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject; import org.apache.solr.jersey.PermissionName; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; @@ -51,25 +51,24 @@ public GetNodeCommandStatus( public GetNodeCommandStatusResponse getCommandStatus(String requestId) { ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId); var requestStatusResponse = new GetNodeCommandStatusResponse(); - if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) { - requestStatusResponse.status = RUNNING; - } else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) { - requestStatusResponse.status = COMPLETED; - requestStatusResponse.response = - coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject(); - requestStatusResponse.response = - coreAdminAsyncTracker - .getRequestStatusMap(COMPLETED) - .get(requestId) - .getOperationRspObject(); - } else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) { - requestStatusResponse.status = FAILED; - requestStatusResponse.response = - coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject(); - } else { + + TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId); + String status = taskObject != null ? taskObject.getStatus() : null; + + if (status == null) { requestStatusResponse.status = "notfound"; requestStatusResponse.msg = "No task found in running, completed or failed tasks"; + } else { + requestStatusResponse.status = status; + + if (status.equals(COMPLETED)) { + requestStatusResponse.response = taskObject.getRspObject(); + requestStatusResponse.response = taskObject.getOperationRspObject(); + } else if (status.equals(FAILED)) { + requestStatusResponse.response = taskObject.getRspObject(); + } } + return requestStatusResponse; } } diff --git a/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminHandlerTest.java index 8a1bafde030..2dd9fce1224 100644 --- a/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminHandlerTest.java +++ b/solr/core/src/test/org/apache/solr/handler/admin/CoreAdminHandlerTest.java @@ -16,6 +16,8 @@ */ package org.apache.solr.handler.admin; +import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED; + import java.io.Reader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -24,7 +26,9 @@ import java.nio.file.StandardCopyOption; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.file.PathUtils; import org.apache.lucene.util.Constants; import org.apache.solr.SolrTestCaseJ4; @@ -44,6 +48,8 @@ import org.apache.solr.core.SolrCore; import org.apache.solr.embedded.JettyConfig; import org.apache.solr.embedded.JettySolrRunner; +import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker; +import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject; import org.apache.solr.response.SolrQueryResponse; import org.junit.BeforeClass; import org.junit.Test; @@ -532,4 +538,64 @@ public void testNonexistentCoreReload() throws Exception { e.getMessage()); admin.close(); } + + @Test + public void testTrackedRequestExpiration() throws Exception { + // Create a tracker with controlled clock, relative to 0 + AtomicLong clock = new AtomicLong(0L); + CoreAdminAsyncTracker asyncTracker = new CoreAdminAsyncTracker(clock::get, 100L, 10L); + try { + Set tasks = + Set.of( + new TaskObject("id1", "ACTION", false, SolrQueryResponse::new), + new TaskObject("id2", "ACTION", false, SolrQueryResponse::new)); + + // Submit all tasks and wait for internal status to be COMPLETED + tasks.forEach(asyncTracker::submitAsyncTask); + while (!tasks.stream().allMatch(t -> COMPLETED.equals(t.getStatus()))) { + Thread.sleep(10L); + } + + // Timeout for running tasks is 100n, so status can be retrieved after 20n. + // But timeout for complete tasks is 10n once we polled the status at least once, so status + // is not available anymore 20n later. + clock.set(20); + assertEquals(COMPLETED, asyncTracker.getAsyncRequestForStatus("id1").getStatus()); + clock.set(40L); + assertNull(asyncTracker.getAsyncRequestForStatus("id1")); + + // Move the clock after the running timeout. + // Status of second task is not available anymore, even if it wasn't retrieved yet + clock.set(110L); + assertNull(asyncTracker.getAsyncRequestForStatus("id2")); + + } finally { + asyncTracker.shutdown(); + } + } + + /** Check we reject a task is the async ID already exists. */ + @Test + public void testDuplicatedRequestId() { + + // Different tasks but with same ID + TaskObject task1 = new TaskObject("id1", "ACTION", false, null); + TaskObject task2 = new TaskObject("id1", "ACTION", false, null); + + CoreAdminAsyncTracker asyncTracker = new CoreAdminAsyncTracker(); + try { + asyncTracker.submitAsyncTask(task1); + try { + asyncTracker.submitAsyncTask(task2); + fail("Task should have been rejected."); + } catch (SolrException e) { + assertEquals("Duplicate request with the same requestid found.", e.getMessage()); + } + + assertNotNull(task1.getStatus()); + assertNull(task2.getStatus()); + } finally { + asyncTracker.shutdown(); + } + } } diff --git a/solr/core/src/test/org/apache/solr/handler/admin/api/GetNodeCommandStatusTest.java b/solr/core/src/test/org/apache/solr/handler/admin/api/GetNodeCommandStatusTest.java index 4a52383bf20..b22d3d4d4ec 100644 --- a/solr/core/src/test/org/apache/solr/handler/admin/api/GetNodeCommandStatusTest.java +++ b/solr/core/src/test/org/apache/solr/handler/admin/api/GetNodeCommandStatusTest.java @@ -19,12 +19,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.Map; import org.apache.solr.SolrTestCase; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi; import org.apache.solr.core.CoreContainer; import org.apache.solr.handler.admin.CoreAdminHandler; +import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -33,7 +33,6 @@ public class GetNodeCommandStatusTest extends SolrTestCase { private CoreContainer mockCoreContainer; private CoreAdminHandler.CoreAdminAsyncTracker mockAsyncTracker; - private CoreAdminHandler.CoreAdminAsyncTracker.TaskObject taskObject; private GetNodeCommandStatusApi requestNodeCommandApi; @BeforeClass @@ -45,7 +44,6 @@ public static void ensureWorkingMockito() { public void setupMocks() { mockCoreContainer = mock(CoreContainer.class); mockAsyncTracker = mock(CoreAdminHandler.CoreAdminAsyncTracker.class); - taskObject = new CoreAdminHandler.CoreAdminAsyncTracker.TaskObject(null, null, false, null); requestNodeCommandApi = new GetNodeCommandStatus(mockCoreContainer, mockAsyncTracker, null, null); } @@ -89,6 +87,9 @@ public void testReturnsStatusOfFailedCommandId() { } private void whenTaskExistsWithStatus(String taskId, String status) { - when(mockAsyncTracker.getRequestStatusMap(status)).thenReturn(Map.of(taskId, taskObject)); + TaskObject taskObject = mock(TaskObject.class); + when(taskObject.getStatus()).thenReturn(status); + + when(mockAsyncTracker.getAsyncRequestForStatus(taskId)).thenReturn(taskObject); } } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/EnvUtils.java b/solr/solrj/src/java/org/apache/solr/common/util/EnvUtils.java index 40bb7c0fdb3..6adac222ab7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/EnvUtils.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/EnvUtils.java @@ -111,6 +111,20 @@ private static String camelCaseToDotSeparated(String key) { } /** Get property as integer */ + public static Integer getPropertyAsInteger(String key) { + return getPropertyAsInteger(key, null); + } + + /** Get property as integer, or default value */ + public static Integer getPropertyAsInteger(String key, Integer defaultValue) { + String value = getProperty(key); + if (value == null) { + return defaultValue; + } + return Integer.parseInt(value); + } + + /** Get property as long */ public static Long getPropertyAsLong(String key) { return getPropertyAsLong(key, null); }