diff --git a/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/ExternalTaskQueryDto.java b/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/ExternalTaskQueryDto.java index f0802d58792..e13cf2cd63f 100644 --- a/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/ExternalTaskQueryDto.java +++ b/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/ExternalTaskQueryDto.java @@ -16,51 +16,39 @@ */ package org.camunda.bpm.engine.rest.dto.externaltask; -import java.util.ArrayList; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; - +import java.util.function.Consumer; import javax.ws.rs.core.MultivaluedMap; - import org.camunda.bpm.engine.ProcessEngine; import org.camunda.bpm.engine.externaltask.ExternalTaskQuery; import org.camunda.bpm.engine.rest.dto.AbstractQueryDto; import org.camunda.bpm.engine.rest.dto.CamundaQueryParam; import org.camunda.bpm.engine.rest.dto.converter.BooleanConverter; import org.camunda.bpm.engine.rest.dto.converter.DateConverter; +import org.camunda.bpm.engine.rest.dto.converter.LongConverter; import org.camunda.bpm.engine.rest.dto.converter.StringListConverter; import org.camunda.bpm.engine.rest.dto.converter.StringSetConverter; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.camunda.bpm.engine.rest.dto.converter.LongConverter; - /** * @author Thorben Lindhauer * */ public class ExternalTaskQueryDto extends AbstractQueryDto { - public static final String SORT_BY_ID_VALUE = "id"; - public static final String SORT_BY_LOCK_EXPIRATION_TIME = "lockExpirationTime"; - public static final String SORT_BY_PROCESS_INSTANCE_ID = "processInstanceId"; - public static final String SORT_BY_PROCESS_DEFINITION_ID = "processDefinitionId"; - public static final String SORT_BY_PROCESS_DEFINITION_KEY = "processDefinitionKey"; - public static final String SORT_BY_TENANT_ID = "tenantId"; - public static final String SORT_BY_PRIORITY = "taskPriority"; - - public static final List VALID_SORT_BY_VALUES; - static { - VALID_SORT_BY_VALUES = new ArrayList<>(); - VALID_SORT_BY_VALUES.add(SORT_BY_ID_VALUE); - VALID_SORT_BY_VALUES.add(SORT_BY_LOCK_EXPIRATION_TIME); - VALID_SORT_BY_VALUES.add(SORT_BY_PROCESS_INSTANCE_ID); - VALID_SORT_BY_VALUES.add(SORT_BY_PROCESS_DEFINITION_ID); - VALID_SORT_BY_VALUES.add(SORT_BY_PROCESS_DEFINITION_KEY); - VALID_SORT_BY_VALUES.add(SORT_BY_TENANT_ID); - VALID_SORT_BY_VALUES.add(SORT_BY_PRIORITY); - } + public static final Map> SORT_METHODS_BY_FIELD = Map.of( + "id", ExternalTaskQuery::orderById, + "lockExpirationTime", ExternalTaskQuery::orderByLockExpirationTime, + "processInstanceId", ExternalTaskQuery::orderByProcessInstanceId, + "processDefinitionId", ExternalTaskQuery::orderByProcessDefinitionId, + "processDefinitionKey", ExternalTaskQuery::orderByProcessDefinitionKey, + "tenantId", ExternalTaskQuery::orderByTenantId, + "taskPriority", ExternalTaskQuery::orderByPriority, + "createTime", ExternalTaskQuery::orderByCreateTime + ); protected String externalTaskId; protected Set externalTaskIds; @@ -201,7 +189,7 @@ public void setPriorityLowerThanOrEquals(Long priorityLowerThanOrEquals) { @Override protected boolean isValidSortByValue(String value) { - return VALID_SORT_BY_VALUES.contains(value); + return SORT_METHODS_BY_FIELD.containsKey(value); } @Override @@ -278,24 +266,10 @@ protected void applyFilters(ExternalTaskQuery query) { @Override protected void applySortBy(ExternalTaskQuery query, String sortBy, Map parameters, ProcessEngine engine) { - if (SORT_BY_ID_VALUE.equals(sortBy)) { - query.orderById(); - } - else if (SORT_BY_LOCK_EXPIRATION_TIME.equals(sortBy)) { - query.orderByLockExpirationTime(); - } - else if (SORT_BY_PROCESS_DEFINITION_ID.equals(sortBy)) { - query.orderByProcessDefinitionId(); - } - else if (SORT_BY_PROCESS_DEFINITION_KEY.equals(sortBy)) { - query.orderByProcessDefinitionKey(); - } - else if (SORT_BY_PROCESS_INSTANCE_ID.equals(sortBy)) { - query.orderByProcessInstanceId(); - } else if (sortBy.equals(SORT_BY_TENANT_ID)) { - query.orderByTenantId(); - } else if (sortBy.equals(SORT_BY_PRIORITY)) { - query.orderByPriority(); + var sortByMethod = SORT_METHODS_BY_FIELD.get(sortBy); + + if (sortByMethod != null) { + sortByMethod.accept(query); } } } diff --git a/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/FetchExternalTasksDto.java b/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/FetchExternalTasksDto.java index f0ffd5a737d..e319a8e57ef 100644 --- a/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/FetchExternalTasksDto.java +++ b/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/dto/externaltask/FetchExternalTasksDto.java @@ -16,14 +16,20 @@ */ package org.camunda.bpm.engine.rest.dto.externaltask; -import org.camunda.bpm.engine.ProcessEngine; -import org.camunda.bpm.engine.externaltask.ExternalTaskQueryBuilder; -import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; - import static java.lang.Boolean.TRUE; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import org.camunda.bpm.engine.ExternalTaskService; +import org.camunda.bpm.engine.ProcessEngine; +import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; +import org.camunda.bpm.engine.externaltask.FetchAndLockBuilder; +import org.camunda.bpm.engine.impl.util.CollectionUtil; +import org.camunda.bpm.engine.rest.dto.SortingDto; /** * @author Thorben Lindhauer @@ -37,21 +43,28 @@ public class FetchExternalTasksDto { protected List topics; protected boolean includeExtensionProperties = false; + protected List sortings; + public int getMaxTasks() { return maxTasks; } + public void setMaxTasks(int maxTasks) { this.maxTasks = maxTasks; } + public String getWorkerId() { return workerId; } + public void setWorkerId(String workerId) { this.workerId = workerId; } + public List getTopics() { return topics; } + public void setTopics(List topics) { this.topics = topics; } @@ -64,6 +77,14 @@ public void setUsePriority(boolean usePriority) { this.usePriority = usePriority; } + public void setSortings(List sortings) { + this.sortings = sortings; + } + + public List getSortings() { + return this.sortings; + } + public boolean isIncludeExtensionProperties() { return includeExtensionProperties; } @@ -182,73 +203,136 @@ public void setIncludeExtensionProperties(boolean includeExtensionProperties) { } } - public ExternalTaskQueryBuilder buildQuery(ProcessEngine processEngine) { - ExternalTaskQueryBuilder fetchBuilder = processEngine - .getExternalTaskService() - .fetchAndLock(getMaxTasks(), getWorkerId(), isUsePriority()); + public ExternalTaskQueryTopicBuilder buildQuery(ProcessEngine processEngine) { + FetchAndLockBuilder fetchAndLockBuilder = getBuilder(processEngine); - if (getTopics() != null) { - for (FetchExternalTaskTopicDto topicDto : getTopics()) { - ExternalTaskQueryTopicBuilder topicFetchBuilder = - fetchBuilder.topic(topicDto.getTopicName(), topicDto.getLockDuration()); + return configureTopics(fetchAndLockBuilder); + } - if (topicDto.getBusinessKey() != null) { - topicFetchBuilder = topicFetchBuilder.businessKey(topicDto.getBusinessKey()); - } + protected ExternalTaskQueryTopicBuilder configureTopics(FetchAndLockBuilder builder) { + ExternalTaskQueryTopicBuilder topicBuilder = builder.subscribe(); - if (topicDto.getProcessDefinitionId() != null) { - topicFetchBuilder.processDefinitionId(topicDto.getProcessDefinitionId()); - } + if (CollectionUtil.isEmpty(topics)) { + return topicBuilder; + } - if (topicDto.getProcessDefinitionIdIn() != null) { - topicFetchBuilder.processDefinitionIdIn(topicDto.getProcessDefinitionIdIn()); - } + topics.forEach(topic -> { + topicBuilder.topic(topic.getTopicName(), topic.getLockDuration()); - if (topicDto.getProcessDefinitionKey() != null) { - topicFetchBuilder.processDefinitionKey(topicDto.getProcessDefinitionKey()); - } + if (topic.getBusinessKey() != null) { + topicBuilder.businessKey(topic.getBusinessKey()); + } - if (topicDto.getProcessDefinitionKeyIn() != null) { - topicFetchBuilder.processDefinitionKeyIn(topicDto.getProcessDefinitionKeyIn()); - } + if (topic.getProcessDefinitionId() != null) { + topicBuilder.processDefinitionId(topic.getProcessDefinitionId()); + } - if (topicDto.getVariables() != null) { - topicFetchBuilder = topicFetchBuilder.variables(topicDto.getVariables()); - } + if (topic.getProcessDefinitionIdIn() != null) { + topicBuilder.processDefinitionIdIn(topic.getProcessDefinitionIdIn()); + } - if (topicDto.getProcessVariables() != null) { - topicFetchBuilder = topicFetchBuilder.processInstanceVariableEquals(topicDto.getProcessVariables()); - } + if (topic.getProcessDefinitionKey() != null) { + topicBuilder.processDefinitionKey(topic.getProcessDefinitionKey()); + } - if (topicDto.isDeserializeValues()) { - topicFetchBuilder = topicFetchBuilder.enableCustomObjectDeserialization(); - } + if (topic.getProcessDefinitionKeyIn() != null) { + topicBuilder.processDefinitionKeyIn(topic.getProcessDefinitionKeyIn()); + } - if (topicDto.isLocalVariables()) { - topicFetchBuilder = topicFetchBuilder.localVariables(); - } + if (topic.getVariables() != null) { + topicBuilder.variables(topic.getVariables()); + } - if (TRUE.equals(topicDto.isWithoutTenantId())) { - topicFetchBuilder = topicFetchBuilder.withoutTenantId(); - } + if (topic.getProcessVariables() != null) { + topicBuilder.processInstanceVariableEquals(topic.getProcessVariables()); + } - if (topicDto.getTenantIdIn() != null) { - topicFetchBuilder = topicFetchBuilder.tenantIdIn(topicDto.getTenantIdIn()); - } + if (topic.isDeserializeValues()) { + topicBuilder.enableCustomObjectDeserialization(); + } - if(topicDto.getProcessDefinitionVersionTag() != null) { - topicFetchBuilder = topicFetchBuilder.processDefinitionVersionTag(topicDto.getProcessDefinitionVersionTag()); - } + if (topic.isLocalVariables()) { + topicBuilder.localVariables(); + } + + if (TRUE.equals(topic.isWithoutTenantId())) { + topicBuilder.withoutTenantId(); + } - if(topicDto.isIncludeExtensionProperties()) { - topicFetchBuilder = topicFetchBuilder.includeExtensionProperties(); - } + if (topic.getTenantIdIn() != null) { + topicBuilder.tenantIdIn(topic.getTenantIdIn()); + } + + if(topic.getProcessDefinitionVersionTag() != null) { + topicBuilder.processDefinitionVersionTag(topic.getProcessDefinitionVersionTag()); + } - fetchBuilder = topicFetchBuilder; + if(topic.isIncludeExtensionProperties()) { + topicBuilder.includeExtensionProperties(); } + }); + + return topicBuilder; + } + + protected FetchAndLockBuilder getBuilder(ProcessEngine engine) { + ExternalTaskService service = engine.getExternalTaskService(); + + FetchAndLockBuilder builder = service.fetchAndLock() + .workerId(workerId) + .maxTasks(maxTasks) + .usePriority(usePriority); + + SortMapper mapper = new SortMapper(sortings, builder); + + return mapper.getBuilderWithSortConfigs(); + } + + /** + * Encapsulates the mapping of sorting configurations (field, order) to the respective methods builder config methods + * and applies them. + *

+ * To achieve that, maps are used internally to map fields and orders to the corresponding builder method. + * It works with case-insensitive orders (e.g will work with "asc", "ASC"). + */ + static class SortMapper { + + protected static Map> FIELD_MAPPINGS = Map.of( + "createTime", FetchAndLockBuilder::orderByCreateTime + ); + + protected static Map> ORDER_MAPPINGS = Map.of( + "asc", FetchAndLockBuilder::asc, + "desc", FetchAndLockBuilder::desc + ); + + protected final List sortings; + protected final FetchAndLockBuilder builder; + + protected SortMapper(List sortings, FetchAndLockBuilder builder) { + this.sortings = (sortings == null) ? Collections.emptyList() : sortings; + this.builder = builder; } - return fetchBuilder; + /** + * Applies the sorting field mappings to the builder and returns it. + */ + protected FetchAndLockBuilder getBuilderWithSortConfigs() { + sortings.forEach(dto -> { + fieldMappingKey(dto).ifPresent(key -> FIELD_MAPPINGS.get(key).accept(builder)); + orderMappingKey(dto).ifPresent(key -> ORDER_MAPPINGS.get(key).accept(builder)); + }); + + return builder; + } + + protected Optional fieldMappingKey(SortingDto dto) { + return Optional.ofNullable(dto.getSortBy()); + } + + protected Optional orderMappingKey(SortingDto dto) { + return Optional.ofNullable(dto.getSortOrder()); + } } } diff --git a/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerImpl.java b/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerImpl.java index 50089c94e60..ebe5ae1f0ae 100644 --- a/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerImpl.java +++ b/engine-rest/engine-rest/src/main/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerImpl.java @@ -23,15 +23,13 @@ import java.util.concurrent.BlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; - import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response.Status; - import org.camunda.bpm.engine.IdentityService; import org.camunda.bpm.engine.ProcessEngine; -import org.camunda.bpm.engine.externaltask.ExternalTaskQueryBuilder; +import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; import org.camunda.bpm.engine.externaltask.LockedExternalTask; import org.camunda.bpm.engine.impl.ProcessEngineImpl; import org.camunda.bpm.engine.impl.identity.Authentication; @@ -264,8 +262,9 @@ protected FetchAndLockResult tryFetchAndLock(FetchAndLockRequest request) { } protected List executeFetchAndLock(FetchExternalTasksExtendedDto fetchingDto, ProcessEngine processEngine) { - ExternalTaskQueryBuilder fetchBuilder = fetchingDto.buildQuery(processEngine); + ExternalTaskQueryTopicBuilder fetchBuilder = fetchingDto.buildQuery(processEngine); List externalTasks = fetchBuilder.execute(); + return LockedExternalTaskDto.fromLockedExternalTasks(externalTasks); } diff --git a/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/ExternalTaskRestServiceInteractionTest.java b/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/ExternalTaskRestServiceInteractionTest.java index 629ba12d5ff..8e87d249e98 100644 --- a/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/ExternalTaskRestServiceInteractionTest.java +++ b/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/ExternalTaskRestServiceInteractionTest.java @@ -17,35 +17,36 @@ package org.camunda.bpm.engine.rest; import static io.restassured.RestAssured.given; -import static org.camunda.bpm.engine.rest.helper.MockProvider.createMockBatch; import static org.assertj.core.api.Assertions.assertThat; +import static org.camunda.bpm.engine.rest.helper.MockProvider.EXAMPLE_PRIMITIVE_VARIABLE_VALUE; +import static org.camunda.bpm.engine.rest.helper.MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME; +import static org.camunda.bpm.engine.rest.helper.MockProvider.createMockBatch; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.anyLong; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; -import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; +import io.restassured.http.ContentType; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - import javax.servlet.ServletContextEvent; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; - import org.camunda.bpm.engine.AuthorizationException; import org.camunda.bpm.engine.BadUserRequestException; import org.camunda.bpm.engine.ExternalTaskService; @@ -63,6 +64,8 @@ import org.camunda.bpm.engine.impl.HistoryServiceImpl; import org.camunda.bpm.engine.impl.ProcessInstanceQueryImpl; import org.camunda.bpm.engine.impl.RuntimeServiceImpl; +import org.camunda.bpm.engine.impl.externaltask.FetchAndLockBuilderImpl; +import org.camunda.bpm.engine.rest.dto.SortingDto; import org.camunda.bpm.engine.rest.dto.externaltask.ExternalTaskQueryDto; import org.camunda.bpm.engine.rest.dto.history.HistoricProcessInstanceQueryDto; import org.camunda.bpm.engine.rest.dto.runtime.ProcessInstanceQueryDto; @@ -84,8 +87,6 @@ import org.mockito.InOrder; import org.mockito.Mockito; -import io.restassured.http.ContentType; - /** * @author Thorben Lindhauer * @@ -117,6 +118,7 @@ public class ExternalTaskRestServiceInteractionTest extends AbstractRestServiceT protected LockedExternalTask lockedExternalTaskMock; protected ExternalTaskQueryTopicBuilder fetchTopicBuilder; + protected FetchAndLockBuilderImpl fetchAndLockBuilder; protected ExternalTask externalTaskMock; protected ExternalTaskQuery externalTaskQueryMock; @@ -159,6 +161,18 @@ public void setUpRuntimeData() { when(fetchTopicBuilder.tenantIdIn(Mockito.any())).thenReturn(fetchTopicBuilder); when(fetchTopicBuilder.includeExtensionProperties()).thenReturn(fetchTopicBuilder); + fetchAndLockBuilder = mock(FetchAndLockBuilderImpl.class); + when(fetchAndLockBuilder.orderByCreateTime()).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.asc()).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.desc()).thenReturn(fetchAndLockBuilder); + + when(fetchAndLockBuilder.workerId(anyString())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.maxTasks(anyInt())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.usePriority(anyBoolean())).thenReturn(fetchAndLockBuilder); + + when(fetchAndLockBuilder.subscribe()).thenReturn(fetchTopicBuilder); + when(externalTaskService.fetchAndLock()).thenReturn(fetchAndLockBuilder); + Batch batch = createMockBatch(); updateRetriesBuilder = mock(UpdateExternalTaskRetriesBuilder.class); when(externalTaskService.updateRetries()).thenReturn(updateRetriesBuilder); @@ -195,17 +209,25 @@ public void testFetchAndLock() { Map topicParameter = new HashMap<>(); topicParameter.put("topicName", "aTopicName"); topicParameter.put("lockDuration", 12354L); - topicParameter.put("variables", Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); parameters.put("topics", Arrays.asList(topicParameter)); executePost(parameters); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", true); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(true); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); - inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchTopicBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -223,18 +245,26 @@ public void testFetchAndLockWithBusinessKey() { topicParameter.put("topicName", "aTopicName"); topicParameter.put("businessKey", EXAMPLE_BUSINESS_KEY); topicParameter.put("lockDuration", 12354L); - topicParameter.put("variables", Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); parameters.put("topics", Arrays.asList(topicParameter)); executePost(parameters); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", true); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(true); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).businessKey(EXAMPLE_BUSINESS_KEY); - inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -259,15 +289,23 @@ public void testFetchAndLockWithProcessDefinition() { executePost(parameters); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", true); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(true); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).processDefinitionId(EXAMPLE_PROCESS_DEFINITION_ID); inOrder.verify(fetchTopicBuilder).processDefinitionIdIn(EXAMPLE_PROCESS_DEFINITION_ID); inOrder.verify(fetchTopicBuilder).processDefinitionKey(EXAMPLE_PROCESS_DEFINITION_KEY); inOrder.verify(fetchTopicBuilder).processDefinitionKeyIn(EXAMPLE_PROCESS_DEFINITION_KEY); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -275,7 +313,6 @@ public void testFetchAndLockWithVariableValue() { // given when(fetchTopicBuilder.execute()).thenReturn(Arrays.asList(lockedExternalTaskMock)); - // when Map parameters = new HashMap<>(); parameters.put("maxTasks", 5); parameters.put("workerId", "aWorkerId"); @@ -285,24 +322,177 @@ public void testFetchAndLockWithVariableValue() { topicParameter.put("topicName", "aTopicName"); topicParameter.put("businessKey", EXAMPLE_BUSINESS_KEY); topicParameter.put("lockDuration", 12354L); - topicParameter.put("variables", Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); Map variableValueParameter = new HashMap<>(); - variableValueParameter.put(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME, MockProvider.EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue()); + variableValueParameter.put(EXAMPLE_VARIABLE_INSTANCE_NAME, EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue()); topicParameter.put("processVariables", variableValueParameter); parameters.put("topics", Arrays.asList(topicParameter)); + // when executePost(parameters); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", true); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(true); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).businessKey(EXAMPLE_BUSINESS_KEY); - inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); inOrder.verify(fetchTopicBuilder).processInstanceVariableEquals(variableValueParameter); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + } + + @Test + public void testFetchAndLockWithCreateTimeAsc() { + // given + when(fetchTopicBuilder.execute()).thenReturn(Arrays.asList(lockedExternalTaskMock)); + + Map parameters = new HashMap<>(); + parameters.put("maxTasks", 5); + parameters.put("workerId", "aWorkerId"); + parameters.put("usePriority", false); + parameters.put("sortings", List.of(create("createTime", "asc"))); + + Map topicParameter = new HashMap<>(); + topicParameter.put("topicName", "aTopicName"); + topicParameter.put("businessKey", EXAMPLE_BUSINESS_KEY); + topicParameter.put("lockDuration", 12354L); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + + Map variableValueParameter = new HashMap<>(); + variableValueParameter.put(EXAMPLE_VARIABLE_INSTANCE_NAME, EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue()); + topicParameter.put("processVariables", variableValueParameter); + + parameters.put("topics", Arrays.asList(topicParameter)); + + // when + executePost(parameters); + + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + + // then + inOrder.verify(fetchAndLockBuilder).orderByCreateTime(); + inOrder.verify(fetchAndLockBuilder).asc(); + + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); + inOrder.verify(fetchTopicBuilder).businessKey(EXAMPLE_BUSINESS_KEY); + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(fetchTopicBuilder).processInstanceVariableEquals(variableValueParameter); + inOrder.verify(fetchTopicBuilder).execute(); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + } + + @Test + public void testFetchAndLockWithCreateTimeDesc() { + // given + when(fetchTopicBuilder.execute()).thenReturn(Arrays.asList(lockedExternalTaskMock)); + + Map parameters = new HashMap<>(); + parameters.put("maxTasks", 5); + parameters.put("workerId", "aWorkerId"); + parameters.put("usePriority", false); + parameters.put("sortings", List.of(create("createTime", "desc"))); + + Map topicParameter = new HashMap<>(); + topicParameter.put("topicName", "aTopicName"); + topicParameter.put("businessKey", EXAMPLE_BUSINESS_KEY); + topicParameter.put("lockDuration", 12354L); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + + Map variableValueParameter = new HashMap<>(); + variableValueParameter.put(EXAMPLE_VARIABLE_INSTANCE_NAME, EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue()); + topicParameter.put("processVariables", variableValueParameter); + + parameters.put("topics", Arrays.asList(topicParameter)); + + // when + executePost(parameters); + + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + + // then + inOrder.verify(fetchAndLockBuilder).orderByCreateTime(); + inOrder.verify(fetchAndLockBuilder).desc(); + + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); + inOrder.verify(fetchTopicBuilder).businessKey(EXAMPLE_BUSINESS_KEY); + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(fetchTopicBuilder).processInstanceVariableEquals(variableValueParameter); + inOrder.verify(fetchTopicBuilder).execute(); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + } + + @Test + public void testFetchAndLockWithCreateTimeWithoutOrder() { + // given + when(fetchTopicBuilder.execute()).thenReturn(Arrays.asList(lockedExternalTaskMock)); + + Map parameters = new HashMap<>(); + parameters.put("maxTasks", 5); + parameters.put("workerId", "aWorkerId"); + parameters.put("usePriority", false); + parameters.put("sortings", List.of(create("createTime", null))); + + Map topicParameter = new HashMap<>(); + topicParameter.put("topicName", "aTopicName"); + topicParameter.put("businessKey", EXAMPLE_BUSINESS_KEY); + topicParameter.put("lockDuration", 12354L); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + + Map variableValueParameter = new HashMap<>(); + variableValueParameter.put(EXAMPLE_VARIABLE_INSTANCE_NAME, EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue()); + topicParameter.put("processVariables", variableValueParameter); + + parameters.put("topics", Arrays.asList(topicParameter)); + + // when + executePost(parameters); + + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + // then + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).orderByCreateTime(); + inOrder.verify(fetchAndLockBuilder, never()).desc(); // no order call on builder + inOrder.verify(fetchAndLockBuilder, never()).asc(); + inOrder.verify(fetchAndLockBuilder).subscribe(); + + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); + inOrder.verify(fetchTopicBuilder).businessKey(EXAMPLE_BUSINESS_KEY); + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(fetchTopicBuilder).processInstanceVariableEquals(variableValueParameter); + inOrder.verify(fetchTopicBuilder).execute(); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -331,11 +521,19 @@ public void testFetchWithoutVariables() { .when() .post(FETCH_EXTERNAL_TASK_URL); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchTopicBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -359,13 +557,21 @@ public void testFetchAndLockWithTenant() { executePost(parameters); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", true); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(true); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).withoutTenantId(); inOrder.verify(fetchTopicBuilder).tenantIdIn("tenant2"); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -386,12 +592,20 @@ public void testFetchAndLockByProcessDefinitionVersionTag() { executePost(parameters); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).processDefinitionVersionTag("versionTag"); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -413,12 +627,19 @@ public void testFetchAndLockIncludeExtensionProperties() { executePost(parameters); // then - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).includeExtensionProperties(); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -434,7 +655,7 @@ public void testEnableCustomObjectDeserialization() { Map topicParameter = new HashMap<>(); topicParameter.put("topicName", "aTopicName"); topicParameter.put("lockDuration", 12354L); - topicParameter.put("variables", Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); topicParameter.put("deserializeValues", true); parameters.put("topics", Arrays.asList(topicParameter)); @@ -448,13 +669,22 @@ public void testEnableCustomObjectDeserialization() { .when() .post(FETCH_EXTERNAL_TASK_URL); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); - inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + + inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); inOrder.verify(fetchTopicBuilder).enableCustomObjectDeserialization(); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, externalTaskService); } @Test @@ -470,7 +700,7 @@ public void testLocalVariables() { Map topicParameter = new HashMap<>(); topicParameter.put("topicName", "aTopicName"); topicParameter.put("lockDuration", 12354L); - topicParameter.put("variables", Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); + topicParameter.put("variables", Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); topicParameter.put("localVariables", true); parameters.put("topics", Arrays.asList(topicParameter)); @@ -484,15 +714,24 @@ public void testLocalVariables() { .when() .post(FETCH_EXTERNAL_TASK_URL); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); - inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); - inOrder.verify(fetchTopicBuilder).variables(Arrays.asList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); - inOrder.verify(fetchTopicBuilder).localVariables(); + var topicBuilder = fetchAndLockBuilder.subscribe(); + + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).subscribe(); + + inOrder.verify(topicBuilder).topic("aTopicName", 12354L); + inOrder.verify(topicBuilder).variables(Arrays.asList(EXAMPLE_VARIABLE_INSTANCE_NAME)); + inOrder.verify(topicBuilder).localVariables(); + inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); - } + verifyNoMoreInteractions(fetchTopicBuilder, fetchTopicBuilder, externalTaskService); + } @Test public void testComplete() { @@ -1822,6 +2061,7 @@ protected void executePost(Map parameters) { .body(parameters) .header("accept", MediaType.APPLICATION_JSON) .then().expect().statusCode(Status.OK.getStatusCode()) + .body("[0].id", equalTo(MockProvider.EXTERNAL_TASK_ID)) .body("[0].topicName", equalTo(MockProvider.EXTERNAL_TASK_TOPIC_NAME)) .body("[0].workerId", equalTo(MockProvider.EXTERNAL_TASK_WORKER_ID)) @@ -1836,13 +2076,18 @@ protected void executePost(Map parameters) { .body("[0].retries", equalTo(MockProvider.EXTERNAL_TASK_RETRIES)) .body("[0].errorMessage", equalTo(MockProvider.EXTERNAL_TASK_ERROR_MESSAGE)) .body("[0].priority", equalTo(MockProvider.EXTERNAL_TASK_PRIORITY)) - .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME, - notNullValue()) - .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME + ".value", - equalTo(MockProvider.EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue())) - .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME + ".type", - equalTo("String")) + .body("[0].variables." + EXAMPLE_VARIABLE_INSTANCE_NAME, notNullValue()) + .body("[0].variables." + EXAMPLE_VARIABLE_INSTANCE_NAME + ".value", equalTo(EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue())) + .body("[0].variables." + EXAMPLE_VARIABLE_INSTANCE_NAME + ".type", equalTo("String")) + .when().post(FETCH_EXTERNAL_TASK_URL); } + public static SortingDto create(String sortBy, String sortOrder) { + var result = new SortingDto(); + result.setSortOrder(sortOrder); + result.setSortBy(sortBy); + return result; + } + } diff --git a/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerTest.java b/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerTest.java index 7d02bde1b06..d29257a7667 100644 --- a/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerTest.java +++ b/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockHandlerTest.java @@ -16,11 +16,35 @@ */ package org.camunda.bpm.engine.rest.impl; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.Response.Status; import org.camunda.bpm.engine.ExternalTaskService; import org.camunda.bpm.engine.IdentityService; import org.camunda.bpm.engine.ProcessEngine; import org.camunda.bpm.engine.ProcessEngineException; import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; +import org.camunda.bpm.engine.externaltask.FetchAndLockBuilder; import org.camunda.bpm.engine.externaltask.LockedExternalTask; import org.camunda.bpm.engine.impl.util.ClockUtil; import org.camunda.bpm.engine.rest.dto.externaltask.FetchExternalTasksExtendedDto; @@ -37,28 +61,6 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.core.Response.Status; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.hamcrest.MockitoHamcrest.argThat; - /** * @author Tassilo Weidner */ @@ -75,7 +77,10 @@ public class FetchAndLockHandlerTest { protected ExternalTaskService externalTaskService; @Mock - protected ExternalTaskQueryTopicBuilder fetchTopicBuilder; + protected ExternalTaskQueryTopicBuilder externalTaskQueryTopicBuilder; + + @Mock + protected FetchAndLockBuilder fetchAndLockBuilder; @Spy protected FetchAndLockHandlerImpl handler; @@ -86,14 +91,19 @@ public class FetchAndLockHandlerTest { @Before public void initMocks() { + when(fetchAndLockBuilder.workerId(anyString())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.maxTasks(anyInt())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.usePriority(anyBoolean())).thenReturn(fetchAndLockBuilder); + when(processEngine.getIdentityService()).thenReturn(identityService); when(processEngine.getExternalTaskService()).thenReturn(externalTaskService); when(processEngine.getName()).thenReturn("default"); - when(externalTaskService.fetchAndLock(anyInt(), any(String.class), any(Boolean.class))) - .thenReturn(fetchTopicBuilder); - when(fetchTopicBuilder.topic(any(String.class), anyLong())) - .thenReturn(fetchTopicBuilder); + when(externalTaskService.fetchAndLock()).thenReturn(fetchAndLockBuilder); + + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + + when(externalTaskQueryTopicBuilder.topic(any(String.class), anyLong())).thenReturn(externalTaskQueryTopicBuilder); doNothing().when(handler).suspend(anyLong()); doReturn(processEngine).when(handler).getProcessEngine(any(FetchAndLockRequest.class)); @@ -121,7 +131,10 @@ public void shouldResumeAsyncResponseDueToAvailableTasks() { // given List tasks = new ArrayList(); tasks.add(lockedExternalTaskMock); - doReturn(tasks).when(fetchTopicBuilder).execute(); + + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenReturn(tasks); + AsyncResponse asyncResponse = mock(AsyncResponse.class); handler.addPendingRequest(createDto(5000L), asyncResponse, processEngine); @@ -138,7 +151,8 @@ public void shouldResumeAsyncResponseDueToAvailableTasks() { @Test public void shouldNotResumeAsyncResponseDueToNoAvailableTasks() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenReturn(Collections.emptyList()); AsyncResponse asyncResponse = mock(AsyncResponse.class); handler.addPendingRequest(createDto(5000L), asyncResponse, processEngine); @@ -155,7 +169,8 @@ public void shouldNotResumeAsyncResponseDueToNoAvailableTasks() { @Test public void shouldResumeAsyncResponseDueToTimeoutExpired_1() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenReturn(Collections.emptyList()); AsyncResponse asyncResponse = mock(AsyncResponse.class); handler.addPendingRequest(createDto(5000L), asyncResponse, processEngine); @@ -167,7 +182,8 @@ public void shouldResumeAsyncResponseDueToTimeoutExpired_1() { List tasks = new ArrayList(); tasks.add(lockedExternalTaskMock); - doReturn(tasks).when(fetchTopicBuilder).execute(); + + when(externalTaskQueryTopicBuilder.execute()).thenReturn(tasks); addSecondsToClock(5); @@ -183,7 +199,8 @@ public void shouldResumeAsyncResponseDueToTimeoutExpired_1() { @Test public void shouldResumeAsyncResponseDueToTimeoutExpired_2() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenReturn(Collections.emptyList()); AsyncResponse asyncResponse = mock(AsyncResponse.class); handler.addPendingRequest(createDto(5000L), asyncResponse, processEngine); @@ -209,7 +226,8 @@ public void shouldResumeAsyncResponseDueToTimeoutExpired_2() { @Test public void shouldResumeAsyncResponseDueToTimeoutExpired_3() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenReturn(Collections.emptyList()); AsyncResponse asyncResponse = mock(AsyncResponse.class); handler.addPendingRequest(createDto(5000L), asyncResponse, processEngine); @@ -236,7 +254,8 @@ public void shouldResumeAsyncResponseDueToTimeoutExpired_3() { @Test public void shouldResumeAsyncResponseImmediatelyDueToProcessEngineException() { // given - doThrow(new ProcessEngineException()).when(fetchTopicBuilder).execute(); + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenThrow(new ProcessEngineException()); // when AsyncResponse asyncResponse = mock(AsyncResponse.class); @@ -251,7 +270,8 @@ public void shouldResumeAsyncResponseImmediatelyDueToProcessEngineException() { @Test public void shouldResumeAsyncResponseAfterBackoffDueToProcessEngineException() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + when(fetchAndLockBuilder.subscribe()).thenReturn(externalTaskQueryTopicBuilder); + when(externalTaskQueryTopicBuilder.execute()).thenReturn(Collections.emptyList()); AsyncResponse asyncResponse = mock(AsyncResponse.class); handler.addPendingRequest(createDto(5000L), asyncResponse, processEngine); @@ -262,7 +282,7 @@ public void shouldResumeAsyncResponseAfterBackoffDueToProcessEngineException() { verify(handler).suspend(5000L); // when - doThrow(new ProcessEngineException()).when(fetchTopicBuilder).execute(); + doThrow(new ProcessEngineException()).when(externalTaskQueryTopicBuilder).execute(); handler.acquire(); // then @@ -295,7 +315,7 @@ public void shouldResumeAsyncResponseDueToTimeoutExceeded() { @Test public void shouldPollPeriodicallyWhenRequestPending() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + doReturn(Collections.emptyList()).when(externalTaskQueryTopicBuilder).execute(); // when AsyncResponse asyncResponse = mock(AsyncResponse.class); @@ -318,7 +338,7 @@ public void shouldNotPollPeriodicallyWhenNotRequestsPending() { @Test public void shouldCancelPreviousPendingRequestWhenWorkerIdsEqual() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + doReturn(Collections.emptyList()).when(externalTaskQueryTopicBuilder).execute(); handler.parseUniqueWorkerRequestParam("true"); @@ -339,7 +359,7 @@ public void shouldCancelPreviousPendingRequestWhenWorkerIdsEqual() { @Test public void shouldNotCancelPreviousPendingRequestWhenWorkerIdsDiffer() { // given - doReturn(Collections.emptyList()).when(fetchTopicBuilder).execute(); + doReturn(Collections.emptyList()).when(externalTaskQueryTopicBuilder).execute(); handler.parseUniqueWorkerRequestParam("true"); @@ -402,6 +422,7 @@ public void shouldRejectRequestDueToShutdown() { // then ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(RestException.class); + verify(asyncResponse).resume(argumentCaptor.capture()); assertThat(argumentCaptor.getValue().getStatus(), is(Status.INTERNAL_SERVER_ERROR)); assertThat(argumentCaptor.getValue().getMessage(), is("Request rejected due to shutdown of application server.")); diff --git a/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockRestServiceInteractionTest.java b/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockRestServiceInteractionTest.java index 81c9df676b3..83affafab98 100644 --- a/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockRestServiceInteractionTest.java +++ b/engine-rest/engine-rest/src/test/java/org/camunda/bpm/engine/rest/impl/FetchAndLockRestServiceInteractionTest.java @@ -16,11 +16,39 @@ */ package org.camunda.bpm.engine.rest.impl; +import static io.restassured.RestAssured.given; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyList; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + import io.restassured.http.ContentType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import javax.servlet.ServletContextEvent; +import javax.ws.rs.core.Response.Status; import org.camunda.bpm.engine.ExternalTaskService; import org.camunda.bpm.engine.IdentityService; import org.camunda.bpm.engine.ProcessEngineException; import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; +import org.camunda.bpm.engine.externaltask.FetchAndLockBuilder; import org.camunda.bpm.engine.externaltask.LockedExternalTask; import org.camunda.bpm.engine.identity.Group; import org.camunda.bpm.engine.identity.Tenant; @@ -41,33 +69,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import javax.servlet.ServletContextEvent; -import javax.ws.rs.core.Response.Status; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static io.restassured.RestAssured.given; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.hamcrest.core.IsNull.notNullValue; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyList; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - /** * @author Tassilo Weidner */ @@ -85,6 +86,9 @@ public class FetchAndLockRestServiceInteractionTest extends AbstractRestServiceT @Mock private ExternalTaskQueryTopicBuilder fetchTopicBuilder; + @Mock + private FetchAndLockBuilder fetchAndLockBuilder; + @Mock private IdentityService identityServiceMock; @@ -95,28 +99,32 @@ public class FetchAndLockRestServiceInteractionTest extends AbstractRestServiceT @Before public void setUpRuntimeData() { - when(processEngine.getExternalTaskService()) - .thenReturn(externalTaskService); + when(processEngine.getExternalTaskService()).thenReturn(externalTaskService); lockedExternalTaskMock = MockProvider.createMockLockedExternalTask(); + when(externalTaskService.fetchAndLock(anyInt(), any(String.class), any(Boolean.class))) .thenReturn(fetchTopicBuilder); - when(fetchTopicBuilder.topic(any(String.class), anyLong())) - .thenReturn(fetchTopicBuilder); + when(externalTaskService.fetchAndLock()).thenReturn(fetchAndLockBuilder); - when(fetchTopicBuilder.variables(anyList())) - .thenReturn(fetchTopicBuilder); + // fetch and lock builder + when(fetchAndLockBuilder.workerId(anyString())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.maxTasks(anyInt())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.usePriority(anyBoolean())).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.orderByCreateTime()).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.asc()).thenReturn(fetchAndLockBuilder); + when(fetchAndLockBuilder.desc()).thenReturn(fetchAndLockBuilder); - when(fetchTopicBuilder.enableCustomObjectDeserialization()) - .thenReturn(fetchTopicBuilder); + when(fetchAndLockBuilder.subscribe()).thenReturn(fetchTopicBuilder); - when(fetchTopicBuilder.processDefinitionVersionTag(anyString())) - .thenReturn(fetchTopicBuilder); + when(fetchTopicBuilder.topic(anyString(), anyLong())).thenReturn(fetchTopicBuilder); + when(fetchTopicBuilder.variables(anyList())).thenReturn(fetchTopicBuilder); + when(fetchTopicBuilder.enableCustomObjectDeserialization()).thenReturn(fetchTopicBuilder); + when(fetchTopicBuilder.processDefinitionVersionTag(anyString())).thenReturn(fetchTopicBuilder); // for authentication - when(processEngine.getIdentityService()) - .thenReturn(identityServiceMock); + when(processEngine.getIdentityService()).thenReturn(identityServiceMock); List groupMocks = MockProvider.createMockGroups(); groupIds = groupMocks.stream().map(Group::getId).collect(Collectors.toList()); @@ -129,8 +137,7 @@ public void setUpRuntimeData() { @Test public void shouldFetchAndLock() { - when(fetchTopicBuilder.execute()) - .thenReturn(new ArrayList(Collections.singleton(lockedExternalTaskMock))); + when(fetchTopicBuilder.execute()).thenReturn(new ArrayList<>(Collections.singleton(lockedExternalTaskMock))); FetchExternalTasksExtendedDto fetchExternalTasksDto = createDto(null, true, true, false); given() @@ -153,26 +160,31 @@ public void shouldFetchAndLock() { .body("[0].errorMessage", equalTo(MockProvider.EXTERNAL_TASK_ERROR_MESSAGE)) .body("[0].errorMessage", equalTo(MockProvider.EXTERNAL_TASK_ERROR_MESSAGE)) .body("[0].priority", equalTo(MockProvider.EXTERNAL_TASK_PRIORITY)) - .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME, - notNullValue()) - .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME + ".value", - equalTo(MockProvider.EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue())) - .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME + ".type", - equalTo("String")) + .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME, notNullValue()) + .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME + ".value", equalTo(MockProvider.EXAMPLE_PRIMITIVE_VARIABLE_VALUE.getValue())) + .body("[0].variables." + MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME + ".type", equalTo("String")) .when().post(FETCH_EXTERNAL_TASK_URL); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", true); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService, times(1)).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(true); + + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).variables(Collections.singletonList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, externalTaskService); } @Test public void shouldFetchWithoutVariables() { - when(fetchTopicBuilder.execute()) - .thenReturn(new ArrayList(Collections.singleton(lockedExternalTaskMock))); + when(fetchTopicBuilder.execute()).thenReturn(new ArrayList<>(Collections.singleton(lockedExternalTaskMock))); FetchExternalTasksExtendedDto fetchExternalTasksDto = createDto(null); given() @@ -185,17 +197,25 @@ public void shouldFetchWithoutVariables() { .when() .post(FETCH_EXTERNAL_TASK_URL); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService, times(1)).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, externalTaskService); } @Test public void shouldFetchWithCustomObjectDeserializationEnabled() { when(fetchTopicBuilder.execute()) - .thenReturn(new ArrayList(Collections.singleton(lockedExternalTaskMock))); + .thenReturn(new ArrayList<>(Collections.singleton(lockedExternalTaskMock))); FetchExternalTasksExtendedDto fetchExternalTasksDto = createDto(null, false, true, true); given() @@ -207,13 +227,22 @@ public void shouldFetchWithCustomObjectDeserializationEnabled() { .when() .post(FETCH_EXTERNAL_TASK_URL); - InOrder inOrder = inOrder(fetchTopicBuilder, externalTaskService); - inOrder.verify(externalTaskService).fetchAndLock(5, "aWorkerId", false); + InOrder inOrder = inOrder(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); + + inOrder.verify(externalTaskService).fetchAndLock(); + + inOrder.verify(fetchAndLockBuilder).workerId("aWorkerId"); + inOrder.verify(fetchAndLockBuilder).maxTasks(5); + inOrder.verify(fetchAndLockBuilder).usePriority(false); + + inOrder.verify(fetchAndLockBuilder).subscribe(); + inOrder.verify(fetchTopicBuilder).topic("aTopicName", 12354L); inOrder.verify(fetchTopicBuilder).variables(Collections.singletonList(MockProvider.EXAMPLE_VARIABLE_INSTANCE_NAME)); inOrder.verify(fetchTopicBuilder).enableCustomObjectDeserialization(); inOrder.verify(fetchTopicBuilder).execute(); - verifyNoMoreInteractions(fetchTopicBuilder, externalTaskService); + + verifyNoMoreInteractions(fetchAndLockBuilder, fetchTopicBuilder, externalTaskService); } @Test @@ -279,7 +308,7 @@ public void shouldThrowProcessEngineExceptionNotDuringTimeout() { @Test public void shouldResponseImmediatelyDueToAvailableTasks() { when(fetchTopicBuilder.execute()) - .thenReturn(new ArrayList(Collections.singleton(lockedExternalTaskMock))); + .thenReturn(new ArrayList<>(Collections.singleton(lockedExternalTaskMock))); FetchExternalTasksExtendedDto fetchExternalTasksDto = createDto(500L); @@ -364,7 +393,7 @@ public void shouldFetchAndLockByProcessDefinitionVersionTag() { private FetchExternalTasksExtendedDto createDto(Long responseTimeout) { return createDto(responseTimeout, false, false, false); } - + private FetchExternalTasksExtendedDto createDto(Long responseTimeout, boolean usePriority, boolean withVariables, boolean withDeserialization) { FetchExternalTasksExtendedDto fetchExternalTasksDto = new FetchExternalTasksExtendedDto(); if (responseTimeout != null) { diff --git a/engine/src/main/java/org/camunda/bpm/engine/ExternalTaskService.java b/engine/src/main/java/org/camunda/bpm/engine/ExternalTaskService.java index 923c9595196..2231f9e20f3 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/ExternalTaskService.java +++ b/engine/src/main/java/org/camunda/bpm/engine/ExternalTaskService.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; - import org.camunda.bpm.engine.authorization.BatchPermissions; import org.camunda.bpm.engine.authorization.Permissions; import org.camunda.bpm.engine.authorization.Resources; @@ -27,6 +26,7 @@ import org.camunda.bpm.engine.externaltask.ExternalTask; import org.camunda.bpm.engine.externaltask.ExternalTaskQuery; import org.camunda.bpm.engine.externaltask.ExternalTaskQueryBuilder; +import org.camunda.bpm.engine.externaltask.FetchAndLockBuilder; import org.camunda.bpm.engine.externaltask.UpdateExternalTaskRetriesBuilder; import org.camunda.bpm.engine.externaltask.UpdateExternalTaskRetriesSelectBuilder; @@ -93,6 +93,14 @@ public interface ExternalTaskService { */ public ExternalTaskQueryBuilder fetchAndLock(int maxTasks, String workerId, boolean usePriority); + /** + * Fetch and Lock method which allows the configuration of all parameters through a Fluent API. + * Configuration options of the builder allow for extra sorting options such as sorting by createTime. + * + * @return a builder to define and execute an external task fetching operation + */ + public FetchAndLockBuilder fetchAndLock(); + /** *

Lock an external task on behalf of a worker. * Note: Attempting to lock an already locked external task with the same workerId diff --git a/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTask.java b/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTask.java index f616833e514..b505c94bf5c 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTask.java +++ b/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTask.java @@ -50,6 +50,12 @@ public interface ExternalTask { */ Date getLockExpirationTime(); + /** + * @return the absolute time at which the task was created + * @return + */ + Date getCreateTime(); + /** * @return the id of the process instance the task exists in */ diff --git a/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTaskQuery.java b/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTaskQuery.java index 16cf6109484..3bba6f68540 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTaskQuery.java +++ b/engine/src/main/java/org/camunda/bpm/engine/externaltask/ExternalTaskQuery.java @@ -178,4 +178,8 @@ public interface ExternalTaskQuery extends Querynull */ diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryImpl.java b/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryImpl.java index 0f2e29c3ed0..23c31b84165 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryImpl.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryImpl.java @@ -22,7 +22,6 @@ import java.util.Date; import java.util.List; import java.util.Set; - import org.camunda.bpm.engine.externaltask.ExternalTask; import org.camunda.bpm.engine.externaltask.ExternalTaskQuery; import org.camunda.bpm.engine.impl.interceptor.CommandContext; @@ -222,6 +221,12 @@ public ExternalTaskQuery orderByTenantId() { public ExternalTaskQuery orderByPriority() { return orderBy(ExternalTaskQueryProperty.PRIORITY); } + + @Override + public ExternalTaskQuery orderByCreateTime() { + return orderBy(ExternalTaskQueryProperty.CREATE_TIME); + } + @Override public long executeCount(CommandContext commandContext) { checkQueryOk(); diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryProperty.java b/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryProperty.java index 74d7153b19a..bf5ab84169b 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryProperty.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskQueryProperty.java @@ -24,12 +24,13 @@ */ public interface ExternalTaskQueryProperty { - public static final QueryProperty ID = new QueryPropertyImpl("ID_"); - public static final QueryProperty LOCK_EXPIRATION_TIME = new QueryPropertyImpl("LOCK_EXP_TIME_"); - public static final QueryProperty PROCESS_INSTANCE_ID = new QueryPropertyImpl("PROC_INST_ID_"); - public static final QueryProperty PROCESS_DEFINITION_ID = new QueryPropertyImpl("PROC_DEF_ID_"); - public static final QueryProperty PROCESS_DEFINITION_KEY = new QueryPropertyImpl("PROC_DEF_KEY_"); - public static final QueryProperty TENANT_ID = new QueryPropertyImpl("TENANT_ID_"); - public static final QueryProperty PRIORITY = new QueryPropertyImpl("PRIORITY_"); + QueryProperty ID = new QueryPropertyImpl("ID_"); + QueryProperty LOCK_EXPIRATION_TIME = new QueryPropertyImpl("LOCK_EXP_TIME_"); + QueryProperty PROCESS_INSTANCE_ID = new QueryPropertyImpl("PROC_INST_ID_"); + QueryProperty PROCESS_DEFINITION_ID = new QueryPropertyImpl("PROC_DEF_ID_"); + QueryProperty PROCESS_DEFINITION_KEY = new QueryPropertyImpl("PROC_DEF_KEY_"); + QueryProperty TENANT_ID = new QueryPropertyImpl("TENANT_ID_"); + QueryProperty PRIORITY = new QueryPropertyImpl("PRIORITY_"); + QueryProperty CREATE_TIME = new QueryPropertyImpl("CREATE_TIME_"); -} +} \ No newline at end of file diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskServiceImpl.java b/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskServiceImpl.java index 06ea1d4e985..83170e09161 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskServiceImpl.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/ExternalTaskServiceImpl.java @@ -18,13 +18,24 @@ import java.util.List; import java.util.Map; - import org.camunda.bpm.engine.ExternalTaskService; import org.camunda.bpm.engine.batch.Batch; import org.camunda.bpm.engine.externaltask.ExternalTaskQuery; import org.camunda.bpm.engine.externaltask.ExternalTaskQueryBuilder; +import org.camunda.bpm.engine.externaltask.FetchAndLockBuilder; +import org.camunda.bpm.engine.impl.externaltask.FetchAndLockBuilderImpl; import org.camunda.bpm.engine.externaltask.UpdateExternalTaskRetriesSelectBuilder; -import org.camunda.bpm.engine.impl.cmd.*; +import org.camunda.bpm.engine.impl.cmd.CompleteExternalTaskCmd; +import org.camunda.bpm.engine.impl.cmd.ExtendLockOnExternalTaskCmd; +import org.camunda.bpm.engine.impl.cmd.GetExternalTaskErrorDetailsCmd; +import org.camunda.bpm.engine.impl.cmd.GetTopicNamesCmd; +import org.camunda.bpm.engine.impl.cmd.HandleExternalTaskBpmnErrorCmd; +import org.camunda.bpm.engine.impl.cmd.HandleExternalTaskFailureCmd; +import org.camunda.bpm.engine.impl.cmd.LockExternalTaskCmd; +import org.camunda.bpm.engine.impl.cmd.SetExternalTaskPriorityCmd; +import org.camunda.bpm.engine.impl.cmd.SetExternalTaskRetriesCmd; +import org.camunda.bpm.engine.impl.cmd.UnlockExternalTaskCmd; +import org.camunda.bpm.engine.impl.cmd.UpdateExternalTaskRetriesBuilderImpl; import org.camunda.bpm.engine.impl.externaltask.ExternalTaskQueryTopicBuilderImpl; /** @@ -44,6 +55,11 @@ public ExternalTaskQueryBuilder fetchAndLock(int maxTasks, String workerId, bool return new ExternalTaskQueryTopicBuilderImpl(commandExecutor, workerId, maxTasks, usePriority); } + @Override + public FetchAndLockBuilder fetchAndLock() { + return new FetchAndLockBuilderImpl(commandExecutor); + } + @Override public void lock(String externalTaskId, String workerId, long lockDuration) { commandExecutor.execute(new LockExternalTaskCmd(externalTaskId, workerId, lockDuration)); @@ -156,4 +172,4 @@ public void extendLock(String externalTaskId, String workerId, long lockDuration commandExecutor.execute(new ExtendLockOnExternalTaskCmd(externalTaskId, workerId, lockDuration)); } -} +} \ No newline at end of file diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/cmd/FetchExternalTasksCmd.java b/engine/src/main/java/org/camunda/bpm/engine/impl/cmd/FetchExternalTasksCmd.java index 2f87271db66..807b58b56e6 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/cmd/FetchExternalTasksCmd.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/cmd/FetchExternalTasksCmd.java @@ -16,14 +16,17 @@ */ package org.camunda.bpm.engine.impl.cmd; +import static org.camunda.bpm.engine.impl.Direction.DESCENDING; +import static org.camunda.bpm.engine.impl.ExternalTaskQueryProperty.PRIORITY; + import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; - import org.camunda.bpm.engine.externaltask.LockedExternalTask; import org.camunda.bpm.engine.impl.ProcessEngineLogger; +import org.camunda.bpm.engine.impl.QueryOrderingProperty; import org.camunda.bpm.engine.impl.db.DbEntity; import org.camunda.bpm.engine.impl.db.EnginePersistenceLogger; import org.camunda.bpm.engine.impl.db.entitymanager.OptimisticLockingListener; @@ -49,18 +52,23 @@ public class FetchExternalTasksCmd implements Command> protected String workerId; protected int maxResults; - protected boolean usePriority; - protected Map fetchInstructions = new HashMap<>(); + protected List orderingProperties; + + protected Map fetchInstructions; public FetchExternalTasksCmd(String workerId, int maxResults, Map instructions) { - this(workerId, maxResults, instructions, false); + this(workerId, maxResults, instructions, false, Collections.emptyList()); } - public FetchExternalTasksCmd(String workerId, int maxResults, Map instructions, boolean usePriority) { + public FetchExternalTasksCmd(String workerId, + int maxResults, + Map instructions, + boolean usePriority, + List orderingProperties) { this.workerId = workerId; this.maxResults = maxResults; this.fetchInstructions = instructions; - this.usePriority = usePriority; + this.orderingProperties = orderingPropertiesWithPriority(usePriority, orderingProperties); } @Override @@ -73,7 +81,7 @@ public List execute(CommandContext commandContext) { List externalTasks = commandContext .getExternalTaskManager() - .selectExternalTasksForTopics(new ArrayList<>(fetchInstructions.values()), maxResults, usePriority); + .selectExternalTasksForTopics(new ArrayList<>(fetchInstructions.values()), maxResults, orderingProperties); final List result = new ArrayList<>(); @@ -86,8 +94,15 @@ public List execute(CommandContext commandContext) { if (execution != null) { entity.lock(workerId, fetchInstruction.getLockDuration()); - LockedExternalTaskImpl resultTask = LockedExternalTaskImpl.fromEntity(entity, fetchInstruction.getVariablesToFetch(), fetchInstruction.isLocalVariables(), - fetchInstruction.isDeserializeVariables(), fetchInstruction.isIncludeExtensionProperties()); + + LockedExternalTaskImpl resultTask = LockedExternalTaskImpl.fromEntity( + entity, + fetchInstruction.getVariablesToFetch(), + fetchInstruction.isLocalVariables(), + fetchInstruction.isDeserializeVariables(), + fetchInstruction.isIncludeExtensionProperties() + ); + result.add(resultTask); } else { LOG.logTaskWithoutExecution(workerId); @@ -172,4 +187,19 @@ protected void validateInput() { EnsureUtil.ensurePositive("lockTime", instruction.getLockDuration()); } } -} + + protected List orderingPropertiesWithPriority(boolean usePriority, + List queryOrderingProperties) { + List results = new ArrayList<>(); + + // Priority needs to be the first item in the list because it takes precedence over other sorting options + // Multi level ordering works by going through the list of ordering properties from first to last item + if (usePriority) { + results.add(new QueryOrderingProperty(PRIORITY, DESCENDING)); + } + + results.addAll(queryOrderingProperties); + + return results; + } +} \ No newline at end of file diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/ExternalTaskQueryTopicBuilderImpl.java b/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/ExternalTaskQueryTopicBuilderImpl.java index 2f04afed975..7364114db7c 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/ExternalTaskQueryTopicBuilderImpl.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/ExternalTaskQueryTopicBuilderImpl.java @@ -21,9 +21,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; import org.camunda.bpm.engine.externaltask.LockedExternalTask; +import org.camunda.bpm.engine.impl.QueryOrderingProperty; import org.camunda.bpm.engine.impl.cmd.FetchExternalTasksCmd; import org.camunda.bpm.engine.impl.interceptor.CommandExecutor; @@ -43,21 +43,71 @@ public class ExternalTaskQueryTopicBuilderImpl implements ExternalTaskQueryTopic */ protected boolean usePriority; + protected List orderingProperties; + protected Map instructions; protected TopicFetchInstruction currentInstruction; - public ExternalTaskQueryTopicBuilderImpl(CommandExecutor commandExecutor, String workerId, int maxTasks, boolean usePriority) { + /** + * All args constructor. + */ + public ExternalTaskQueryTopicBuilderImpl(CommandExecutor commandExecutor, + String workerId, + int maxTasks, + boolean usePriority, + List orderingProperties, + Map instructions, + TopicFetchInstruction currentInstruction) { this.commandExecutor = commandExecutor; this.workerId = workerId; this.maxTasks = maxTasks; this.usePriority = usePriority; - this.instructions = new HashMap(); + this.orderingProperties = orderingProperties; + this.instructions = instructions; + this.currentInstruction = currentInstruction; + } + + /** + * Constructor using priority & createTime. + */ + public ExternalTaskQueryTopicBuilderImpl(CommandExecutor commandExecutor, + String workerId, + int maxTasks, + boolean usePriority, + List orderingProperties) { + this(commandExecutor, workerId, maxTasks, usePriority, orderingProperties, new HashMap<>(), null); + } + + /** + * Constructor using priority. + */ + public ExternalTaskQueryTopicBuilderImpl(CommandExecutor commandExecutor, + String workerId, + int maxTasks, + boolean usePriority) { + this(commandExecutor, workerId, maxTasks, usePriority, new ArrayList<>(), new HashMap<>(), null); + } + + /** + * Copy constructor + */ + public ExternalTaskQueryTopicBuilderImpl(ExternalTaskQueryTopicBuilderImpl builder) { + this( + builder.commandExecutor, + builder.workerId, + builder.maxTasks, + builder.usePriority, + builder.orderingProperties, + builder.instructions, + builder.currentInstruction + ); } public List execute() { submitCurrentInstruction(); - return commandExecutor.execute(new FetchExternalTasksCmd(workerId, maxTasks, instructions, usePriority)); + return commandExecutor.execute( + new FetchExternalTasksCmd(workerId, maxTasks, instructions, usePriority, orderingProperties)); } public ExternalTaskQueryTopicBuilder topic(String topicName, long lockDuration) { diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/FetchAndLockBuilderImpl.java b/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/FetchAndLockBuilderImpl.java new file mode 100644 index 00000000000..cfb485cf771 --- /dev/null +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/FetchAndLockBuilderImpl.java @@ -0,0 +1,107 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.camunda.bpm.engine.impl.externaltask; + +import static org.camunda.bpm.engine.impl.Direction.ASCENDING; +import static org.camunda.bpm.engine.impl.Direction.DESCENDING; +import static org.camunda.bpm.engine.impl.ExternalTaskQueryProperty.CREATE_TIME; +import static org.camunda.bpm.engine.impl.util.CollectionUtil.getLastElement; +import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureNotNull; +import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureNull; + +import java.util.ArrayList; +import java.util.List; +import org.camunda.bpm.engine.exception.NotValidException; +import org.camunda.bpm.engine.externaltask.ExternalTaskQueryTopicBuilder; +import org.camunda.bpm.engine.externaltask.FetchAndLockBuilder; +import org.camunda.bpm.engine.impl.Direction; +import org.camunda.bpm.engine.impl.QueryOrderingProperty; +import org.camunda.bpm.engine.impl.interceptor.CommandExecutor; + +/** + * Implementation of {@link FetchAndLockBuilder}. + */ +public class FetchAndLockBuilderImpl implements FetchAndLockBuilder { + + protected final CommandExecutor commandExecutor; + + protected String workerId; + protected int maxTasks; + + protected boolean usePriority; + + protected List orderingProperties = new ArrayList<>(); + + public FetchAndLockBuilderImpl(CommandExecutor commandExecutor) { + this.commandExecutor = commandExecutor; + } + + public FetchAndLockBuilderImpl workerId(String workerId) { + this.workerId = workerId; + return this; + } + + public FetchAndLockBuilderImpl maxTasks(int maxTasks) { + this.maxTasks = maxTasks; + return this; + } + + public FetchAndLockBuilderImpl usePriority(boolean usePriority) { + this.usePriority = usePriority; + return this; + } + + public FetchAndLockBuilderImpl orderByCreateTime() { + orderingProperties.add(new QueryOrderingProperty(CREATE_TIME, null)); + return this; + } + + public FetchAndLockBuilderImpl asc() throws NotValidException { + configureLastOrderingPropertyDirection(ASCENDING); + return this; + } + + public FetchAndLockBuilderImpl desc() throws NotValidException { + configureLastOrderingPropertyDirection(DESCENDING); + return this; + } + + @Override + public ExternalTaskQueryTopicBuilder subscribe() { + checkQueryOk(); + return new ExternalTaskQueryTopicBuilderImpl(commandExecutor, workerId, maxTasks, usePriority, orderingProperties); + } + + protected void configureLastOrderingPropertyDirection(Direction direction) { + QueryOrderingProperty lastProperty = !orderingProperties.isEmpty() ? getLastElement(orderingProperties) : null; + + ensureNotNull(NotValidException.class, "You should call any of the orderBy methods first before specifying a direction", "currentOrderingProperty", lastProperty); + + if (lastProperty.getDirection() != null) { + ensureNull(NotValidException.class, "Invalid query: can specify only one direction desc() or asc() for an ordering constraint", "direction", direction); + } + + lastProperty.setDirection(direction); + } + + protected void checkQueryOk() { + for (QueryOrderingProperty orderingProperty : orderingProperties) { + ensureNotNull(NotValidException.class, "Invalid query: call asc() or desc() after using orderByXX()", "direction", orderingProperty.getDirection()); + } + } +} \ No newline at end of file diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/LockedExternalTaskImpl.java b/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/LockedExternalTaskImpl.java index aace347ff75..23ea6223fe9 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/LockedExternalTaskImpl.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/externaltask/LockedExternalTaskImpl.java @@ -39,6 +39,7 @@ public class LockedExternalTaskImpl implements LockedExternalTask { protected String topicName; protected String workerId; protected Date lockExpirationTime; + protected Date createTime; protected Integer retries; protected String errorMessage; protected String errorDetails; @@ -71,6 +72,10 @@ public Date getLockExpirationTime() { return lockExpirationTime; } + public Date getCreateTime() { + return createTime; + } + public Integer getRetries() { return retries; } @@ -155,6 +160,7 @@ public static LockedExternalTaskImpl fromEntity(ExternalTaskEntity externalTaskE result.topicName = externalTaskEntity.getTopicName(); result.workerId = externalTaskEntity.getWorkerId(); result.lockExpirationTime = externalTaskEntity.getLockExpirationTime(); + result.createTime = externalTaskEntity.getCreateTime(); result.retries = externalTaskEntity.getRetries(); result.errorMessage = externalTaskEntity.getErrorMessage(); result.errorDetails = externalTaskEntity.getErrorDetails(); diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/history/producer/DefaultHistoryEventProducer.java b/engine/src/main/java/org/camunda/bpm/engine/impl/history/producer/DefaultHistoryEventProducer.java index 45df14eb1d5..3effe4e95b2 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/history/producer/DefaultHistoryEventProducer.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/history/producer/DefaultHistoryEventProducer.java @@ -1226,7 +1226,8 @@ public HistoryEvent createHistoricExternalTaskLogDeletedEvt(ExternalTask task) { protected HistoricExternalTaskLogEntity initHistoricExternalTaskLog(ExternalTaskEntity entity, ExternalTaskState state) { HistoricExternalTaskLogEntity event = new HistoricExternalTaskLogEntity(); - event.setTimestamp(ClockUtil.getCurrentTime()); + + event.setTimestamp(getTimestamp(entity, state)); event.setExternalTaskId(entity.getId()); event.setTopicName(entity.getTopicName()); event.setWorkerId(entity.getWorkerId()); @@ -1256,6 +1257,10 @@ protected HistoricExternalTaskLogEntity initHistoricExternalTaskLog(ExternalTask return event; } + protected Date getTimestamp(ExternalTaskEntity entity, ExternalTaskState state) { + return state == ExternalTaskState.CREATED ? entity.getCreateTime() : ClockUtil.getCurrentTime(); + } + protected boolean isRootProcessInstance(HistoricProcessInstanceEventEntity evt) { return evt.getProcessInstanceId().equals(evt.getRootProcessInstanceId()); } diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskEntity.java b/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskEntity.java index 731118320ce..303464928ae 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskEntity.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskEntity.java @@ -72,6 +72,7 @@ public class ExternalTaskEntity implements ExternalTask, DbEntity, protected String topicName; protected String workerId; protected Date lockExpirationTime; + protected Date createTime; protected Integer retries; protected String errorMessage; @@ -123,9 +124,20 @@ public void setWorkerId(String workerId) { public Date getLockExpirationTime() { return lockExpirationTime; } + public void setLockExpirationTime(Date lockExpirationTime) { this.lockExpirationTime = lockExpirationTime; } + + @Override + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + @Override public String getExecutionId() { return executionId; @@ -564,6 +576,7 @@ public static ExternalTaskEntity createAndInsert(ExecutionEntity execution, Stri externalTask.setActivityInstanceId(execution.getActivityInstanceId()); externalTask.setTenantId(execution.getTenantId()); externalTask.setPriority(priority); + externalTask.setCreateTime(ClockUtil.getCurrentTime()); ProcessDefinitionEntity processDefinition = execution.getProcessDefinition(); externalTask.setProcessDefinitionKey(processDefinition.getKey()); diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskManager.java b/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskManager.java index d4d30cdb472..d27be6bf13e 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskManager.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/persistence/entity/ExternalTaskManager.java @@ -16,16 +16,17 @@ */ package org.camunda.bpm.engine.impl.persistence.entity; -import java.util.ArrayList; +import static org.camunda.bpm.engine.impl.db.sql.DbSqlSessionFactory.CRDB; +import static org.camunda.bpm.engine.impl.db.sql.DbSqlSessionFactory.POSTGRES; +import static org.camunda.bpm.engine.impl.util.DatabaseUtil.checkDatabaseType; + import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.camunda.bpm.engine.externaltask.ExternalTask; -import org.camunda.bpm.engine.impl.Direction; import org.camunda.bpm.engine.impl.ExternalTaskQueryImpl; -import org.camunda.bpm.engine.impl.ExternalTaskQueryProperty; import org.camunda.bpm.engine.impl.ProcessEngineImpl; import org.camunda.bpm.engine.impl.QueryOrderingProperty; import org.camunda.bpm.engine.impl.cfg.TransactionListener; @@ -33,12 +34,10 @@ import org.camunda.bpm.engine.impl.context.Context; import org.camunda.bpm.engine.impl.db.ListQueryParameterObject; import org.camunda.bpm.engine.impl.db.entitymanager.DbEntityManager; -import org.camunda.bpm.engine.impl.db.sql.DbSqlSessionFactory; import org.camunda.bpm.engine.impl.externaltask.TopicFetchInstruction; import org.camunda.bpm.engine.impl.interceptor.CommandContext; import org.camunda.bpm.engine.impl.persistence.AbstractManager; import org.camunda.bpm.engine.impl.util.ClockUtil; -import org.camunda.bpm.engine.impl.util.DatabaseUtil; import org.camunda.bpm.engine.impl.util.ImmutablePair; /** @@ -47,8 +46,6 @@ */ public class ExternalTaskManager extends AbstractManager { - public static QueryOrderingProperty EXT_TASK_PRIORITY_ORDERING_PROPERTY = new QueryOrderingProperty(ExternalTaskQueryProperty.PRIORITY, Direction.DESCENDING); - public ExternalTaskEntity findExternalTaskById(String id) { return getDbEntityManager().selectById(ExternalTaskEntity.class, id); } @@ -73,20 +70,22 @@ public List findExternalTasksByProcessInstanceId(String proc } @SuppressWarnings("unchecked") - public List selectExternalTasksForTopics(Collection queryFilters, int maxResults, boolean usePriority) { + public List selectExternalTasksForTopics(Collection queryFilters, + int maxResults, + List orderingProperties) { if (queryFilters.isEmpty()) { - return new ArrayList<>(); + return Collections.emptyList(); } - Map parameters = new HashMap<>(); - parameters.put("topics", queryFilters); - parameters.put("now", ClockUtil.getCurrentTime()); - parameters.put("applyOrdering", usePriority); - List orderingProperties = new ArrayList<>(); - orderingProperties.add(EXT_TASK_PRIORITY_ORDERING_PROPERTY); - parameters.put("orderingProperties", orderingProperties); - parameters.put("usesPostgres", - DatabaseUtil.checkDatabaseType(DbSqlSessionFactory.POSTGRES, DbSqlSessionFactory.CRDB)); + boolean shouldApplyOrdering = !orderingProperties.isEmpty(); + + Map parameters = Map.of( + "topics", queryFilters, + "now", ClockUtil.getCurrentTime(), + "applyOrdering", shouldApplyOrdering, + "orderingProperties", orderingProperties, + "usesPostgres", checkDatabaseType(POSTGRES, CRDB) + ); ListQueryParameterObject parameter = new ListQueryParameterObject(parameters, 0, maxResults); configureQuery(parameter); @@ -125,7 +124,9 @@ public List selectTopicNamesByQuery(ExternalTaskQueryImpl externalTaskQu } protected void updateExternalTaskSuspensionState(String processInstanceId, - String processDefinitionId, String processDefinitionKey, SuspensionState suspensionState) { + String processDefinitionId, + String processDefinitionKey, + SuspensionState suspensionState) { Map parameters = new HashMap<>(); parameters.put("processInstanceId", processInstanceId); parameters.put("processDefinitionId", processDefinitionId); @@ -172,13 +173,13 @@ protected ListQueryParameterObject configureParameterizedQuery(Object parameter) public void fireExternalTaskAvailableEvent() { Context.getCommandContext() - .getTransactionContext() - .addTransactionListener(TransactionState.COMMITTED, new TransactionListener() { - @Override - public void execute(CommandContext commandContext) { - ProcessEngineImpl.EXT_TASK_CONDITIONS.signalAll(); - } - }); + .getTransactionContext() + .addTransactionListener(TransactionState.COMMITTED, new TransactionListener() { + @Override + public void execute(CommandContext commandContext) { + ProcessEngineImpl.EXT_TASK_CONDITIONS.signalAll(); + } + }); } } diff --git a/engine/src/main/java/org/camunda/bpm/engine/impl/util/CollectionUtil.java b/engine/src/main/java/org/camunda/bpm/engine/impl/util/CollectionUtil.java index 3ac55d879c6..f69e2964712 100644 --- a/engine/src/main/java/org/camunda/bpm/engine/impl/util/CollectionUtil.java +++ b/engine/src/main/java/org/camunda/bpm/engine/impl/util/CollectionUtil.java @@ -122,6 +122,16 @@ public static List collectInList(Iterator iterator) { return result; } + public static T getLastElement(final Iterable elements) { + T lastElement = null; + + for (T element : elements) { + lastElement = element; + } + + return lastElement; + } + public static boolean isEmpty(Collection collection) { return collection == null || collection.isEmpty(); } diff --git a/engine/src/main/resources/org/camunda/bpm/engine/impl/mapping/entity/ExternalTask.xml b/engine/src/main/resources/org/camunda/bpm/engine/impl/mapping/entity/ExternalTask.xml index 12ec21df879..af480769d97 100644 --- a/engine/src/main/resources/org/camunda/bpm/engine/impl/mapping/entity/ExternalTask.xml +++ b/engine/src/main/resources/org/camunda/bpm/engine/impl/mapping/entity/ExternalTask.xml @@ -31,6 +31,7 @@ + @@ -57,6 +58,7 @@ WORKER_ID_, TOPIC_NAME_, LOCK_EXP_TIME_, + CREATE_TIME_, RETRIES_, ERROR_MSG_, ERROR_DETAILS_ID_, @@ -76,6 +78,7 @@ #{workerId, jdbcType=VARCHAR}, #{topicName, jdbcType=VARCHAR}, #{lockExpirationTime, jdbcType=TIMESTAMP}, + #{createTime, jdbcType=TIMESTAMP}, #{retries, jdbcType=INTEGER}, #{errorMessage, jdbcType=VARCHAR}, #{errorDetailsByteArrayId, jdbcType=VARCHAR}, @@ -442,6 +445,7 @@ RES.TOPIC_NAME_, RES.WORKER_ID_, RES.LOCK_EXP_TIME_, + RES.CREATE_TIME_, RES.RETRIES_, RES.ERROR_MSG_, RES.ERROR_DETAILS_ID_, diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskQueryByCreateTimeTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskQueryByCreateTimeTest.java new file mode 100644 index 00000000000..1aabe1ee46c --- /dev/null +++ b/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskQueryByCreateTimeTest.java @@ -0,0 +1,303 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.camunda.bpm.engine.test.api.externaltask; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Date; +import org.camunda.bpm.engine.CaseService; +import org.camunda.bpm.engine.HistoryService; +import org.camunda.bpm.engine.ManagementService; +import org.camunda.bpm.engine.ProcessEngine; +import org.camunda.bpm.engine.ProcessEngineConfiguration; +import org.camunda.bpm.engine.RepositoryService; +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.TaskService; +import org.camunda.bpm.engine.impl.util.ClockUtil; +import org.camunda.bpm.engine.test.ProcessEngineRule; +import org.camunda.bpm.engine.test.RequiredHistoryLevel; +import org.camunda.bpm.engine.test.util.ClockTestUtil; +import org.camunda.bpm.engine.test.util.ProcessEngineTestRule; +import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule; +import org.camunda.bpm.model.bpmn.Bpmn; +import org.camunda.bpm.model.bpmn.BpmnModelInstance; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; + +@RequiredHistoryLevel(ProcessEngineConfiguration.HISTORY_FULL) +public class ExternalTaskQueryByCreateTimeTest { + + public ProcessEngineRule engineRule = new ProvidedProcessEngineRule(); + public ProcessEngineTestRule testHelper = new ProcessEngineTestRule(engineRule); + + @Rule + public RuleChain chain = RuleChain.outerRule(engineRule).around(testHelper); + + protected ProcessEngine engine; + + protected RepositoryService repositoryService; + protected RuntimeService runtimeService; + protected ManagementService managementService; + protected HistoryService historyService; + protected TaskService taskService; + protected CaseService caseService; + + @Before + public void setup() { + engine = engineRule.getProcessEngine(); + repositoryService = engineRule.getRepositoryService(); + runtimeService = engineRule.getRuntimeService(); + managementService = engineRule.getManagementService(); + historyService = engineRule.getHistoryService(); + taskService = engineRule.getTaskService(); + caseService = engineRule.getCaseService(); + + // given four process definitions with one external task each, external tasks have priorities 4, 3, 0, and 0 + deployProcessesWithExternalTasks(); + } + + @After + public void tearDown() { + ClockUtil.reset(); + } + + @Test + public void shouldHaveNonNullCreateTime() { + // given + runtimeService.startProcessInstanceByKey("process1"); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + .list(); + + // then + assertThat(result).hasSize(1); + assertThat(result.get(0).getCreateTime()).isNotNull(); + } + + @Test + public void shouldProduceEventWithCreateTimeValue() { + // given + runtimeService.startProcessInstanceByKey("process1"); + + var extTask = engineRule.getExternalTaskService() + .createExternalTaskQuery() + .singleResult(); + + // when + var result = historyService.createHistoricExternalTaskLogQuery().list(); + + // then + assertThat(result.size()).isEqualTo(1); + + var historyEventTimestamp = result.get(0).getTimestamp(); + + assertThat(extTask.getCreateTime()).isEqualTo(historyEventTimestamp); + } + + @Test + public void shouldReturnTasksInDescOrder() { + // given + startProcessInstanceAfter("process1", 1); + startProcessInstanceAfter("process2", 1); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + + .orderByCreateTime().desc() + .list(); + + // then + assertThat(result.size()).isEqualTo(2); + + var extTask1 = result.get(0); + var extTask2 = result.get(1); + + assertThat(extTask2.getCreateTime()) + .isBefore(extTask1.getCreateTime()); + } + + @Test + public void shouldReturnTasksInAscOrder() { + // given + startProcessInstanceAfter("process1", 1); + startProcessInstanceAfter("process2", 1); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + + .orderByCreateTime().asc() + .list(); + + // then + assertThat(result.size()).isEqualTo(2); + + var extTask1 = result.get(0); + var extTask2 = result.get(1); + + assertThat(extTask1.getCreateTime()) + .isBefore(extTask2.getCreateTime()); + } + + // Multi-Level Sorting with CreateTime & Priority + + @Test + public void shouldReturnTasksInCreateTimeAscOrderOnPriorityEquality() { + // given + startProcessInstanceAfter("process1", 1); + startProcessInstanceAfter("process2", 1); + startProcessInstanceAfter("process3", 1); + startProcessInstanceAfter("process4", 1); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + + .orderByPriority().desc() + .orderByCreateTime().asc() + + .list(); + + // then + assertThat(result.size()).isEqualTo(4); + + assertThat(result.get(0).getActivityId()).isEqualTo("task1"); + assertThat(result.get(1).getActivityId()).isEqualTo("task2"); + assertThat(result.get(2).getActivityId()).isEqualTo("task3"); + assertThat(result.get(3).getActivityId()).isEqualTo("task4"); + } + + @Test + public void shouldReturnTasksInCreateTimeDescOrderOnPriorityEquality() { + // given + startProcessInstanceAfter("process1", 1); + startProcessInstanceAfter("process2", 1); + startProcessInstanceAfter("process3", 1); + startProcessInstanceAfter("process4", 1); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + + .orderByPriority().desc() + .orderByCreateTime().desc() + + .list(); + + // then + assertThat(result.size()).isEqualTo(4); + + assertThat(result.get(0).getActivityId()).isEqualTo("task1"); // due to priority DESC + assertThat(result.get(1).getActivityId()).isEqualTo("task2"); + assertThat(result.get(2).getActivityId()).isEqualTo("task4"); // due to CreateTime DESC + assertThat(result.get(3).getActivityId()).isEqualTo("task3"); + } + + @Test + public void shouldReturnTasksInPriorityAscOnCreateTimeEquality() { + var now = ClockTestUtil.setClockToDateWithoutMilliseconds(); + + // given + startProcessInstanceWithDate("process1", now); + startProcessInstanceWithDate("process2", now); + + startProcessInstanceAfter("process3", 1); + startProcessInstanceAfter("process4", 1); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + + .orderByCreateTime().asc() + .orderByPriority().asc() + + .list(); + + // then + assertThat(result.size()).isEqualTo(4); + + assertThat(result.get(0).getActivityId()).isEqualTo("task2"); // due to CreateTime Equality, priority ASC + assertThat(result.get(1).getActivityId()).isEqualTo("task1"); + + assertThat(result.get(2).getActivityId()).isEqualTo("task3"); + assertThat(result.get(3).getActivityId()).isEqualTo("task4"); + } + + @Test + public void shouldReturnTasksInPriorityDescOnCreateTimeEquality() { + var now = ClockTestUtil.setClockToDateWithoutMilliseconds(); + + // given + startProcessInstanceWithDate("process1", now); + startProcessInstanceWithDate("process2", now); + + startProcessInstanceAfter("process3", 1); + startProcessInstanceAfter("process4", 1); + + // when + var result = engineRule.getExternalTaskService() + .createExternalTaskQuery() + + .orderByCreateTime().asc() + .orderByPriority().desc() + + .list(); + + // then + assertThat(result.size()).isEqualTo(4); + + assertThat(result.get(0).getActivityId()).isEqualTo("task1"); // due to CreateTime equality, priority DESC + assertThat(result.get(1).getActivityId()).isEqualTo("task2"); + + assertThat(result.get(2).getActivityId()).isEqualTo("task3"); + assertThat(result.get(3).getActivityId()).isEqualTo("task4"); + } + + private void deployProcessesWithExternalTasks() { + var process1 = createProcessWithTask("process1", "task1", "topic1", "4"); + var process2 = createProcessWithTask("process2", "task2", "topic2", "3"); + var process3 = createProcessWithTask("process3", "task3", "topic3", "0"); + var process4 = createProcessWithTask("process4", "task4", "topic4", "0"); + + testHelper.deploy(process1, process2, process3, process4); + } + + private void startProcessInstanceWithDate(String processKey, Date fixedDate) { + ClockUtil.setCurrentTime(fixedDate); + runtimeService.startProcessInstanceByKey(processKey); + } + + private void startProcessInstanceAfter(String processKey, long minutes) { + ClockTestUtil.incrementClock(minutes * 60_000); + runtimeService.startProcessInstanceByKey(processKey); + } + + private BpmnModelInstance createProcessWithTask(String processId, String taskId, String topic, String priority) { + return Bpmn.createExecutableProcess(processId) + .startEvent() + .serviceTask(taskId).camundaExternalTask(topic).camundaTaskPriority(priority) + .endEvent() + .done(); + } +} diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskServiceTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskServiceTest.java index eca5d979015..236c46363b4 100644 --- a/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskServiceTest.java +++ b/engine/src/test/java/org/camunda/bpm/engine/test/api/externaltask/ExternalTaskServiceTest.java @@ -16,6 +16,7 @@ */ package org.camunda.bpm.engine.test.api.externaltask; +import static java.util.Comparator.reverseOrder; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.entry; @@ -39,9 +40,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.time.DateUtils; import org.apache.ibatis.jdbc.RuntimeSqlException; import org.camunda.bpm.engine.BadUserRequestException; import org.camunda.bpm.engine.ParseException; @@ -50,6 +50,7 @@ import org.camunda.bpm.engine.delegate.DelegateExecution; import org.camunda.bpm.engine.delegate.ExecutionListener; import org.camunda.bpm.engine.exception.NotFoundException; +import org.camunda.bpm.engine.exception.NotValidException; import org.camunda.bpm.engine.exception.NullValueException; import org.camunda.bpm.engine.externaltask.ExternalTask; import org.camunda.bpm.engine.externaltask.ExternalTaskQuery; @@ -70,6 +71,7 @@ import org.camunda.bpm.engine.test.Deployment; import org.camunda.bpm.engine.test.RequiredHistoryLevel; import org.camunda.bpm.engine.test.util.AssertUtil; +import org.camunda.bpm.engine.test.util.ClockTestUtil; import org.camunda.bpm.engine.test.util.PluggableProcessEngineTest; import org.camunda.bpm.engine.variable.VariableMap; import org.camunda.bpm.engine.variable.Variables; @@ -156,7 +158,6 @@ public void testFetch() { assertEquals(WORKER_ID, task.getWorkerId()); } - @Deployment(resources = "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml") @Test public void testFetchWithPriority() { @@ -192,6 +193,242 @@ public void testFetchWithPriority() { assertEquals(WORKER_ID, task.getWorkerId()); } + @Deployment(resources = { + "org/camunda/bpm/engine/test/api/externaltask/oneExternalTaskProcess.bpmn20.xml", + "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml" + }) + @Test + public void shouldFetchWithCreateTimeDESCAndPriority() { + // given + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("oneExternalTaskProcess"); // null priority + + // when + var result = externalTaskService.fetchAndLock() + .maxTasks(5) + .workerId(WORKER_ID) + + .orderByCreateTime().desc() + .usePriority(true) + + .subscribe() + .topic(TOPIC_NAME, LOCK_TIME) + .execute(); + + // then + assertThat(result.size()).isEqualTo(5); + + assertThat(result.get(0).getPriority()).isEqualTo(7); + assertThat(result.get(1).getPriority()).isEqualTo(7); + assertThat(result.get(2).getPriority()).isEqualTo(0); + assertThat(result.get(3).getPriority()).isEqualTo(0); + assertThat(result.get(4).getPriority()).isEqualTo(0); + + + // given the same priority, DESC date is applied + assertThat(result.get(0).getCreateTime()).isAfterOrEqualsTo(result.get(1).getCreateTime()); + + // given the rest of priorities, DESC date should apply between them + assertThat(result.get(2).getCreateTime()).isAfterOrEqualsTo(result.get(3).getCreateTime()); + assertThat(result.get(3).getCreateTime()).isAfterOrEqualsTo(result.get(4).getCreateTime()); + } + + @Deployment(resources = { + "org/camunda/bpm/engine/test/api/externaltask/oneExternalTaskProcess.bpmn20.xml", + "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml" + }) + @Test + public void shouldFetchWithCreateTimeASCAndPriority() { + // given + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("oneExternalTaskProcess"); // null priority + + // when + var result = externalTaskService.fetchAndLock() + .maxTasks(5) + .workerId(WORKER_ID) + + .orderByCreateTime().asc() + .usePriority(true) + + .subscribe() + .topic(TOPIC_NAME, LOCK_TIME) + .execute(); + + // then + assertThat(result.size()).isEqualTo(5); + + assertThat(result.get(0).getPriority()).isEqualTo(7); + assertThat(result.get(1).getPriority()).isEqualTo(7); + + assertThat(result.get(2).getPriority()).isEqualTo(0); + assertThat(result.get(3).getPriority()).isEqualTo(0); + assertThat(result.get(4).getPriority()).isEqualTo(0); + + + // given the same priority, ASC date is applied + assertThat(result.get(0).getCreateTime()).isBeforeOrEqualsTo(result.get(1).getCreateTime()); + + // given the rest of priorities, ASC date should apply between them + assertThat(result.get(2).getCreateTime()).isBeforeOrEqualsTo(result.get(3).getCreateTime()); + assertThat(result.get(3).getCreateTime()).isBeforeOrEqualsTo(result.get(4).getCreateTime()); + } + + @Deployment(resources = { + "org/camunda/bpm/engine/test/api/externaltask/oneExternalTaskProcess.bpmn20.xml", + "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml" + }) + @Test + public void shouldFetchWithCreateTimeASCWithoutPriority() { + // given + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("oneExternalTaskProcess"); // null priority + + // when + var result = externalTaskService.fetchAndLock() + .maxTasks(5) + .workerId(WORKER_ID) + + .orderByCreateTime().asc() + .usePriority(false) + + .subscribe() + .topic(TOPIC_NAME, LOCK_TIME) + .execute(); + + // then + assertThat(result.size()).isEqualTo(5); + assertThat(result).extracting("createTime", Date.class).isSorted(); + } + + @Deployment(resources = { + "org/camunda/bpm/engine/test/api/externaltask/oneExternalTaskProcess.bpmn20.xml", + "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml" + }) + @Test + public void shouldFetchWithCreateTimeDESCWithoutPriority() { + // given + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("oneExternalTaskProcess"); // null priority + + // when + var result = externalTaskService.fetchAndLock() + .maxTasks(5) + .workerId(WORKER_ID) + + .orderByCreateTime().desc() + .usePriority(false) + + .subscribe() + .topic(TOPIC_NAME, LOCK_TIME) + .execute(); + + // then + assertThat(result.size()).isEqualTo(5); + assertThat(result).extracting("createTime", Date.class).isSortedAccordingTo(reverseOrder()); + } + + @Deployment(resources = { + "org/camunda/bpm/engine/test/api/externaltask/oneExternalTaskProcess.bpmn20.xml", + "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml" + }) + @Test + public void shouldIgnoreCreateOrderingWhenCreateTimeIsNotConfigured() { + // given + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("oneExternalTaskProcess"); // null priority + + // when + var result = externalTaskService.fetchAndLock() + .maxTasks(5) + .workerId(WORKER_ID) + + // create time ordering is omitted + .usePriority(true) + + .subscribe() + .topic(TOPIC_NAME, LOCK_TIME) + .execute(); + + // then + assertThat(result.size()).isEqualTo(5); + // create time ordering will be ignored, only priority will be used + assertThat(result).extracting("priority", Long.class).isSortedAccordingTo(reverseOrder()); + } + + @Deployment(resources = { + "org/camunda/bpm/engine/test/api/externaltask/oneExternalTaskProcess.bpmn20.xml", + "org/camunda/bpm/engine/test/api/externaltask/twoExternalTaskWithPriorityProcess.bpmn20.xml" + }) + @Test + public void shouldIgnoreCreateTimeConfigWhenOrderIsNull() { + // given + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("twoExternalTaskWithPriorityProcess"); // priority 7 & null + ClockTestUtil.incrementClock(60_000); + runtimeService.startProcessInstanceByKey("oneExternalTaskProcess"); // null priority + + // when + var result = externalTaskService.fetchAndLock(6, WORKER_ID, true) + .topic(TOPIC_NAME, LOCK_TIME) + .execute(); + + // then + assertThat(result.size()).isEqualTo(5); + // create time ordering will be ignored, only priority will be used + assertThat(result).extracting("priority", Long.class).isSortedAccordingTo(reverseOrder()); + } + + @Test + public void shouldThrowExceptionOnSubscribeWithInvalidOrderConfig() { + // when + assertThatThrownBy(() -> externalTaskService.fetchAndLock() + .orderByCreateTime() + .subscribe() + .execute()) + // then + .isInstanceOf(NotValidException.class) + .hasMessage("Invalid query: call asc() or desc() after using orderByXX(): direction is null"); + } + + @Test + public void shouldThrowExceptionOnChainedSortingConfigs() { + // when + assertThatThrownBy(() -> externalTaskService.fetchAndLock() + .orderByCreateTime() + .desc() + .desc()) + // then + .isInstanceOf(NotValidException.class) + .hasMessage("Invalid query: can specify only one direction desc() or asc() for an ordering constraint: direction is not null"); + } + + @Test + public void shouldThrowExceptionOnUnspecifiedSortingField() { + // when + assertThatThrownBy(() -> externalTaskService.fetchAndLock() + .desc()) + // then + .isInstanceOf(NotValidException.class) + .hasMessage("You should call any of the orderBy methods first before specifying a direction: currentOrderingProperty is null"); + } + @Deployment(resources = "org/camunda/bpm/engine/test/api/externaltask/externalTaskPriorityProcess.bpmn20.xml") @Test public void testFetchProcessWithPriority() { @@ -202,9 +439,10 @@ public void testFetchProcessWithPriority() { List externalTasks = externalTaskService.fetchAndLock(2, WORKER_ID, true) .topic(TOPIC_NAME, LOCK_TIME) .execute(); - assertEquals(2, externalTasks.size()); // then + assertEquals(2, externalTasks.size()); + //task with no prio gets prio defined by process assertEquals(9, externalTasks.get(0).getPriority()); //task with own prio overrides prio defined by process @@ -222,6 +460,7 @@ public void testFetchProcessWithPriorityExpression() { List externalTasks = externalTaskService.fetchAndLock(2, WORKER_ID, true) .topic(TOPIC_NAME, LOCK_TIME) .execute(); + assertEquals(2, externalTasks.size()); // then @@ -4255,8 +4494,6 @@ public void testFetchAndLockWithoutExtensionProperties_shouldReturnEmptyMap() { assertThat(lockedExternalTasks.get(0).getExtensionProperties()).isEmpty(); } - - protected Date nowPlus(long millis) { return new Date(ClockUtil.getCurrentTime().getTime() + millis); } diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/api/multitenancy/query/history/MultiTenancyHistoricExternalTaskLogTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/api/multitenancy/query/history/MultiTenancyHistoricExternalTaskLogTest.java index a6b626b005f..13e27659b09 100644 --- a/engine/src/test/java/org/camunda/bpm/engine/test/api/multitenancy/query/history/MultiTenancyHistoricExternalTaskLogTest.java +++ b/engine/src/test/java/org/camunda/bpm/engine/test/api/multitenancy/query/history/MultiTenancyHistoricExternalTaskLogTest.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; - import org.camunda.bpm.engine.ExternalTaskService; import org.camunda.bpm.engine.HistoryService; import org.camunda.bpm.engine.IdentityService; @@ -385,7 +384,8 @@ protected void reportExternalTaskFailure(String externalTaskId) { reportExternalTaskFailure(externalTaskId, DEFAULT_TOPIC, WORKER_ID, 1, false, "This is an error!"); } - protected void reportExternalTaskFailure(String externalTaskId, String topic, String workerId, Integer retries, boolean usePriority, String errorMessage) { + protected void reportExternalTaskFailure(String externalTaskId, String topic, String workerId, Integer retries, + boolean usePriority, String errorMessage) { List list = externalTaskService.fetchAndLock(100, workerId, usePriority) .topic(topic, LOCK_DURATION) .execute(); diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQuerySortingTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQuerySortingTest.java index 4f826b21215..457c222b2d0 100644 --- a/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQuerySortingTest.java +++ b/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQuerySortingTest.java @@ -36,7 +36,6 @@ import java.util.Date; import java.util.LinkedList; import java.util.List; - import org.camunda.bpm.engine.ExternalTaskService; import org.camunda.bpm.engine.HistoryService; import org.camunda.bpm.engine.ProcessEngineConfiguration; @@ -538,7 +537,8 @@ protected void reportExternalTaskFailure(List taskLIst) { } } - protected void reportExternalTaskFailure(String externalTaskId, String topic, String workerId, Integer retries, boolean usePriority, String errorMessage) { + protected void reportExternalTaskFailure(String externalTaskId, String topic, String workerId, Integer retries, + boolean usePriority, String errorMessage) { List list = externalTaskService.fetchAndLock(100, workerId, usePriority) .topic(topic, LOCK_DURATION) .execute(); diff --git a/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQueryTest.java b/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQueryTest.java index fc881be4501..f7714198733 100644 --- a/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQueryTest.java +++ b/engine/src/test/java/org/camunda/bpm/engine/test/history/HistoricExternalTaskLogQueryTest.java @@ -27,7 +27,6 @@ import java.util.LinkedList; import java.util.List; - import org.camunda.bpm.engine.ExternalTaskService; import org.camunda.bpm.engine.HistoryService; import org.camunda.bpm.engine.ProcessEngineConfiguration; @@ -904,7 +903,8 @@ protected void reportExternalTaskFailure(String externalTaskId, String errorMess reportExternalTaskFailure(externalTaskId, DEFAULT_TOPIC, WORKER_ID, 1, false, errorMessage); } - protected void reportExternalTaskFailure(String externalTaskId, String topic, String workerId, Integer retries, boolean usePriority, String errorMessage) { + protected void reportExternalTaskFailure(String externalTaskId, String topic, String workerId, Integer retries, + boolean usePriority, String errorMessage) { List list = externalTaskService.fetchAndLock(100, workerId, usePriority) .topic(topic, LOCK_DURATION) .execute();