Skip to content

Commit

Permalink
feat(rest): make LockAndFetchHandler blocking queue's capacity config…
Browse files Browse the repository at this point in the history
…urable

Related to #4885
  • Loading branch information
joaquinfelici committed Jan 15, 2025
1 parent 27a6f35 commit 23c3e23
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@

<display-name>Camunda Platform rest api</display-name>

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
12 changes: 10 additions & 2 deletions engine-rest/assembly/src/main/runtime/was/webapp/WEB-INF/web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@

<display-name>Camunda Platform REST API</display-name>

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@

<display-name>Camunda Platform rest api</display-name>

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
12 changes: 10 additions & 2 deletions engine-rest/assembly/src/main/runtime/wls/webapp/WEB-INF/web.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@

<display-name>Camunda Platform rest api</display-name>

<!-- Fetch And Lock Handler (long polling): Unique Worker Request (default value: false) -->
<!--
<!-- Fetch And Lock Handler (long polling): -->

<!-- Unique Worker Request (default value: false)
<context-param>
<param-name>fetch-and-lock-unique-worker-request</param-name>
<param-value>true</param-value>
</context-param>
-->

<!-- Queue capacity (default value: 200)
<context-param>
<param-name>fetch-and-lock-queue-capacity</param-name>
<param-value>250</param-value>
</context-param>
-->

<!-- rest bootstrap listener -->
<listener>
<listener-class>org.camunda.bpm.engine.rest.impl.web.bootstrap.RestContainerBootstrap</listener-class>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ public class FetchAndLockHandlerImpl implements Runnable, FetchAndLockHandler {
private static final Logger LOG = Logger.getLogger(FetchAndLockHandlerImpl.class.getName());

protected static final String UNIQUE_WORKER_REQUEST_PARAM_NAME = "fetch-and-lock-unique-worker-request";
protected static final String BLOCKING_QUEUE_CAPACITY_PARAM_NAME = "fetch-and-lock-queue-capacity";

protected static final long PENDING_REQUEST_FETCH_INTERVAL = 30L * 1000;
protected static final long MAX_BACK_OFF_TIME = Long.MAX_VALUE;
protected static final long MAX_REQUEST_TIMEOUT = 1800000; // 30 minutes
protected static final int DEFAULT_BLOCKING_QUEUE_CAPACITY = 200;

protected SingleConsumerCondition condition;

protected BlockingQueue<FetchAndLockRequest> queue = new ArrayBlockingQueue<>(200);
protected BlockingQueue<FetchAndLockRequest> queue;
protected List<FetchAndLockRequest> pendingRequests = new ArrayList<>();
protected List<FetchAndLockRequest> newRequests = new ArrayList<>();

Expand Down Expand Up @@ -180,6 +182,9 @@ public void start() {

isRunning = true;
handlerThread.start();
if (queue == null) {
initializeQueue(DEFAULT_BLOCKING_QUEUE_CAPACITY);
}

ProcessEngineImpl.EXT_TASK_CONDITIONS.addConsumer(condition);
}
Expand Down Expand Up @@ -337,15 +342,19 @@ public void addPendingRequest(FetchExternalTasksExtendedDto dto, AsyncResponse a
}

public void contextInitialized(ServletContextEvent servletContextEvent) {
ServletContext servletContext = null;
ServletContext servletContext;
int queueCapacity = DEFAULT_BLOCKING_QUEUE_CAPACITY;

if (servletContextEvent != null) {
servletContext = servletContextEvent.getServletContext();

if (servletContext != null) {
parseUniqueWorkerRequestParam(servletContext.getInitParameter(UNIQUE_WORKER_REQUEST_PARAM_NAME));
queueCapacity = parseBlockingQueueCapacityParam(servletContext.getInitParameter(BLOCKING_QUEUE_CAPACITY_PARAM_NAME));
}
}

initializeQueue(queueCapacity);
}

protected void parseUniqueWorkerRequestParam(String uniqueWorkerRequestParam) {
Expand All @@ -356,6 +365,23 @@ protected void parseUniqueWorkerRequestParam(String uniqueWorkerRequestParam) {
}
}

protected void initializeQueue(int capacity) {
LOG.log(Level.FINEST, "Initializing queue with capacity [{0}]", capacity);
queue = new ArrayBlockingQueue<>(capacity);
}

private static int parseBlockingQueueCapacityParam(String queueSizeRequestParam) {
int capacity = DEFAULT_BLOCKING_QUEUE_CAPACITY;
if (queueSizeRequestParam != null) {
try {
capacity = Integer.parseInt(queueSizeRequestParam);
} catch (NumberFormatException e) {
LOG.log(Level.WARNING, "Invalid blocking queue capacity parameter: [" + queueSizeRequestParam + "], falling back to default value", e);
}
}
return capacity;
}

public List<FetchAndLockRequest> getPendingRequests() {
return pendingRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.camunda.bpm.engine.rest.impl;

import static org.camunda.bpm.engine.rest.impl.FetchAndLockHandlerImpl.BLOCKING_QUEUE_CAPACITY_PARAM_NAME;
import static org.camunda.bpm.engine.rest.impl.FetchAndLockHandlerImpl.DEFAULT_BLOCKING_QUEUE_CAPACITY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.ArgumentMatchers.anyBoolean;
Expand All @@ -37,6 +39,8 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response.Status;
import org.camunda.bpm.engine.ExternalTaskService;
Expand Down Expand Up @@ -82,6 +86,12 @@ public class FetchAndLockHandlerTest {
@Mock
protected FetchAndLockBuilder fetchAndLockBuilder;

@Mock
private ServletContext servletContext;

@Mock
private ServletContextEvent servletContextEvent;

@Spy
protected FetchAndLockHandlerImpl handler;

Expand Down Expand Up @@ -109,6 +119,9 @@ public void initMocks() {
doReturn(processEngine).when(handler).getProcessEngine(any(FetchAndLockRequest.class));

lockedExternalTaskMock = MockProvider.createMockLockedExternalTask();

when(servletContextEvent.getServletContext()).thenReturn(servletContext);
handler.contextInitialized(servletContextEvent);
}

@Before
Expand Down Expand Up @@ -428,6 +441,43 @@ public void shouldRejectRequestDueToShutdown() {
assertThat(argumentCaptor.getValue().getMessage(), is("Request rejected due to shutdown of application server."));
}

@Test
public void shouldInitialiseQueueWithSpecifiedParam() {
// given
String queueSizeParamValue = "5";
when(servletContext.getInitParameter(BLOCKING_QUEUE_CAPACITY_PARAM_NAME)).then(invocation -> queueSizeParamValue);

// when
handler.contextInitialized(servletContextEvent);

// then
assertThat(handler.queue.remainingCapacity(), is(5));
}

@Test
public void shouldInitialiseQueueWithDefaultCapacityWhenAbsentParam() {
// given no parameter

// when
handler.contextInitialized(servletContextEvent);

// then
assertThat(handler.queue.remainingCapacity(), is(DEFAULT_BLOCKING_QUEUE_CAPACITY));
}

@Test
public void shouldInitialiseQueueWithDefaultCapacityIfInvalidParam() {
// given
String queueSizeParamValue = "invalid";
when(servletContext.getInitParameter(BLOCKING_QUEUE_CAPACITY_PARAM_NAME)).then(invocation -> queueSizeParamValue);

// when
handler.contextInitialized(servletContextEvent);

// then
assertThat(handler.queue.remainingCapacity(), is(DEFAULT_BLOCKING_QUEUE_CAPACITY));
}

protected FetchExternalTasksExtendedDto createDto(Long responseTimeout, String workerId) {
FetchExternalTasksExtendedDto externalTask = new FetchExternalTasksExtendedDto();

Expand Down

0 comments on commit 23c3e23

Please sign in to comment.