Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added job cancellation capability for IRS #947

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
import static org.eclipse.tractusx.irs.configuration.JobConfiguration.JOB_BLOB_PERSISTENCE;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.tractusx.irs.aaswrapper.job.delegate.AbstractDelegate;
import org.eclipse.tractusx.irs.common.persistence.BlobPersistence;
import org.eclipse.tractusx.irs.common.persistence.BlobPersistenceException;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.connector.job.JobException;
import org.eclipse.tractusx.irs.connector.job.ResponseStatus;
import org.eclipse.tractusx.irs.connector.job.TransferInitiateResponse;
import org.eclipse.tractusx.irs.connector.job.TransferProcessManager;
Expand All @@ -55,6 +60,9 @@ public class AASTransferProcessManager implements TransferProcessManager<ItemDat

private final AbstractDelegate abstractDelegate;

@Getter
private final Map<String, Future<?>> futures = new ConcurrentHashMap<>();

public AASTransferProcessManager(final AbstractDelegate abstractDelegate, final ExecutorService executor,
@Qualifier(JOB_BLOB_PERSISTENCE) final BlobPersistence blobStore) {
this.abstractDelegate = abstractDelegate;
Expand All @@ -66,29 +74,50 @@ public AASTransferProcessManager(final AbstractDelegate abstractDelegate, final
public TransferInitiateResponse initiateRequest(final ItemDataRequest dataRequest,
final Consumer<String> preExecutionHandler, final Consumer<AASTransferProcess> completionCallback,
final JobParameter jobData) {

final String processId = UUID.randomUUID().toString();
preExecutionHandler.accept(processId);

executor.execute(getRunnable(dataRequest, completionCallback, processId, jobData));
if (Thread.currentThread().isInterrupted()) {
log.debug("Returning from initiateRequest due to interrupt");

return new TransferInitiateResponse(processId, ResponseStatus.NOT_STARTED_JOB_CANCELLED);
}
final Future<?> future = executor.submit(getRunnable(dataRequest, completionCallback, processId, jobData));
futures.put(processId, future);

return new TransferInitiateResponse(processId, ResponseStatus.OK);
}

@Override
public void cancelRequest(final String processId) {
final Future<?> future = futures.get(processId);
if (future == null) {
throw new JobException(CANCELLATION_IMPOSSIBLE_FUTURE_NOT_FOUND.formatted(processId), processId);
}

future.cancel(true);

if (future.isDone()) {
futures.remove(processId);
} else {
throw new JobException(CANCELLATION_FAILED.formatted(processId), processId);
}
}

private Runnable getRunnable(final ItemDataRequest dataRequest,
final Consumer<AASTransferProcess> transferProcessCompleted, final String processId,
final JobParameter jobData) {

return () -> {
final AASTransferProcess aasTransferProcess = new AASTransferProcess(processId, dataRequest.getDepth());

final PartChainIdentificationKey itemId = dataRequest.getItemId();

log.info("Starting processing Digital Twin Registry with itemId {}", itemId);
log.info("Starting processing Digital Twin Registry with itemId {} for process {}", itemId, processId);

final ItemContainer itemContainer = abstractDelegate.process(ItemContainer.builder(), jobData,
aasTransferProcess, itemId);
storeItemContainer(processId, itemContainer);

storeItemContainer(processId, itemContainer);
transferProcessCompleted.accept(aasTransferProcess);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ private <T> T writeLock(final Supplier<T> work) {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new JobException("Job Interrupted", e);
log.debug("Returning from writeLock due to interrupt");
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ public JobException(final String message) {
.build();
}

public JobException(final String message, final String detail) {
super(message);
jobErrorDetails = JobErrorDetails.builder()
.exception(message)
.errorDetail(detail)
.exceptionDate(ZonedDateTime.now(ZoneOffset.UTC))
.build();
}

public JobException(final String message, final Throwable cause) {
super(message, cause);
jobErrorDetails = JobErrorDetails.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -147,6 +149,57 @@ public JobInitiateResponse startJob(final String globalAssetId, final JobParamet
return JobInitiateResponse.builder().jobId(multiJob.getJobIdString()).status(ResponseStatus.OK).build();
}

public void cancelJob(final MultiTransferJob job) {
final Collection<String> transferProcessIds = job.getTransferProcessIds();

final List<String> futureNotFoundImpossibleCancellations = new ArrayList<>();
final List<String> failedCancellations = new ArrayList<>();

transferProcessIds.forEach(transferProcessId -> {
try {
processManager.cancelRequest(transferProcessId);
} catch (JobException e) {
if (e.getMessage()
.equals(TransferProcessManager.CANCELLATION_IMPOSSIBLE_FUTURE_NOT_FOUND.formatted(
transferProcessId))) {
futureNotFoundImpossibleCancellations.add(transferProcessId);
} else if (e.getMessage()
.equals(TransferProcessManager.CANCELLATION_FAILED.formatted(transferProcessId))) {
failedCancellations.add(transferProcessId);
}
}
});

final boolean anyImpossibleOrFailed = !(futureNotFoundImpossibleCancellations.isEmpty()
&& failedCancellations.isEmpty());

if (anyImpossibleOrFailed) {
reactIfAnyProcessNotCancelled(job, futureNotFoundImpossibleCancellations, failedCancellations);
return;
}

meterService.incrementJobCancelled();
}

private void reactIfAnyProcessNotCancelled(final MultiTransferJob job,
final List<String> futureNotFoundImpossibleCancellations, final List<String> failedCancellations) {
meterService.incrementException();
final StringBuilder stringBuilder = new StringBuilder();

if (!futureNotFoundImpossibleCancellations.isEmpty()) {
stringBuilder.append("Cancellation impossible because no Future(s) were found for PID(s) ")
.append(String.join(", ", futureNotFoundImpossibleCancellations));
}
if (!failedCancellations.isEmpty()) {
if (!stringBuilder.toString().isEmpty()) {
stringBuilder.append(" and ");
}
stringBuilder.append("Cancellation failed for PID(s) ").append(String.join(", ", failedCancellations));
}

markJobInError(job, new JobException(stringBuilder.toString()), "Error cancelling job");
}

/**
* Callback invoked when a transfer has completed.
*
Expand Down Expand Up @@ -251,9 +304,7 @@ private void markJobInError(final MultiTransferJob job, final Throwable exceptio

private void publishJobProcessingFinishedEventIfFinished(final String jobId) {
jobStore.find(jobId).ifPresent(job -> {
if (job.getJob().getState().equals(JobState.COMPLETED) || job.getJob()
.getState()
.equals(JobState.ERROR)) {
if (job.getJob().getState().equals(JobState.COMPLETED) || job.getJob().getState().equals(JobState.ERROR)) {
applicationEventPublisher.publishEvent(
new JobProcessingFinishedEvent(job.getJobIdString(), job.getJob().getState().name(),
job.getJobParameter().getCallbackUrl(), job.getBatchId()));
Expand All @@ -273,7 +324,8 @@ private TransferInitiateResponse startTransfer(final MultiTransferJob job,
transferId -> jobStore.addTransferProcess(job.getJobIdString(), transferId),
this::transferProcessCompleted, jobData);

if (response.getStatus() != ResponseStatus.OK) {
if (response.getStatus() != ResponseStatus.OK
&& response.getStatus() != ResponseStatus.NOT_STARTED_JOB_CANCELLED) {
throw new JobException(response.getStatus().toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@
public enum ResponseStatus {
OK,
ERROR_RETRY,
FATAL_ERROR;
FATAL_ERROR,
NOT_STARTED_JOB_CANCELLED
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
*/
public interface TransferProcessManager<T extends DataRequest, P extends TransferProcess> {

String CANCELLATION_IMPOSSIBLE_FUTURE_NOT_FOUND = "Cancellation impossible for transfer process %s: Future not found";
String CANCELLATION_FAILED = "Cancellation failed for transfer process %s";

/**
* Starts a data request asynchronously.
*
Expand All @@ -46,4 +49,6 @@ public interface TransferProcessManager<T extends DataRequest, P extends Transfe
*/
TransferInitiateResponse initiateRequest(T dataRequest, Consumer<String> transferProcessStarted,
Consumer<P> transferProcessCompleted, JobParameter jobData);

void cancelRequest(String processId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,15 @@ private void validateAspectTypeValues(final List<String> aspectTypeValues) {
public Job cancelJobById(final @NonNull UUID jobId) {
final String idAsString = String.valueOf(jobId);

final Optional<MultiTransferJob> canceled = this.jobStore.cancelJob(idAsString);
canceled.ifPresent(cancelledJob -> applicationEventPublisher.publishEvent(
new JobProcessingFinishedEvent(cancelledJob.getJobIdString(), cancelledJob.getJob().getState().name(),
cancelledJob.getJobParameter().getCallbackUrl(), cancelledJob.getBatchId())));
return canceled.orElseThrow(
final Optional<MultiTransferJob> cancelled = this.jobStore.cancelJob(idAsString);
cancelled.ifPresent(cancelledJob -> {
orchestrator.cancelJob(cancelledJob);
applicationEventPublisher.publishEvent(new JobProcessingFinishedEvent(cancelledJob.getJobIdString(),
cancelledJob.getJob().getState().name(), cancelledJob.getJobParameter().getCallbackUrl(),
cancelledJob.getBatchId()));
});

return cancelled.orElseThrow(
() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "No job exists with id " + jobId)).getJob();
}

Expand Down
1 change: 1 addition & 0 deletions irs-api/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<Logger name="org.apache.coyote.http11.Http11NioProtocol" level="warn" />
<Logger name="org.apache.sshd.common.util.SecurityUtils" level="warn"/>
<Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="warn" />
<Logger name="okhttp3.OkHttpClient" level="trace" />
</Loggers>

</Configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -25,57 +25,73 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.tractusx.irs.util.TestMother.jobParameter;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.tractusx.irs.InMemoryBlobStore;
import org.eclipse.tractusx.irs.aaswrapper.job.delegate.DigitalTwinDelegate;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.connector.job.JobException;
import org.eclipse.tractusx.irs.connector.job.ResponseStatus;
import org.eclipse.tractusx.irs.connector.job.TransferInitiateResponse;
import org.eclipse.tractusx.irs.util.TestMother;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

@Slf4j
@ExtendWith(MockitoExtension.class)
class AASTransferProcessManagerTest {

final ItemDataRequest itemDataRequest = ItemDataRequest.rootNode(
PartChainIdentificationKey.builder().globalAssetId(UUID.randomUUID().toString()).bpn("bpn123").build());
private final TestMother generate = new TestMother();

DigitalTwinDelegate digitalTwinProcessor = mock(DigitalTwinDelegate.class);
ExecutorService pool = mock(ExecutorService.class);

final AASTransferProcessManager manager = new AASTransferProcessManager(digitalTwinProcessor, pool,
private final DigitalTwinDelegate digitalTwinProcessor = mock(DigitalTwinDelegate.class);
private final ExecutorService pool = mock(ExecutorService.class);
private final AASTransferProcessManager manager = new AASTransferProcessManager(digitalTwinProcessor, pool,
new InMemoryBlobStore());

@Test
void shouldExecuteThreadForProcessing() {
// given
final ItemDataRequest itemDataRequest = ItemDataRequest.rootNode(
PartChainIdentificationKey.builder().globalAssetId(UUID.randomUUID().toString()).bpn("bpn123").build());

// when
when(pool.submit(any(Runnable.class))).thenReturn(new CompletableFuture<>());

manager.initiateRequest(itemDataRequest, s -> {
}, aasTransferProcess -> {
}, jobParameter());

// then
verify(pool, times(1)).execute(any(Runnable.class));
verify(pool, times(1)).submit(any(Runnable.class));
}

@Test
void shouldInitiateProcessingAndReturnOkStatus() {
// given
final ItemDataRequest itemDataRequest = ItemDataRequest.rootNode(
PartChainIdentificationKey.builder().globalAssetId(UUID.randomUUID().toString()).bpn("bpn123").build());
void shouldInitiateProcessingAndAddFutureToMap() {
// when
when(pool.submit(any(Runnable.class))).thenReturn(new CompletableFuture<>());

final TransferInitiateResponse initiateResponse = manager.initiateRequest(itemDataRequest, s -> {
}, aasTransferProcess -> {
}, jobParameter());

// then
assertThat(manager.getFutures()).containsKey(initiateResponse.getTransferId());
assertThat(manager.getFutures().get(initiateResponse.getTransferId())).isNotNull();
}

@Test
void shouldInitiateProcessingAndReturnOkResponse() {
// when
when(pool.submit(any(Runnable.class))).thenReturn(new CompletableFuture<>());

final TransferInitiateResponse initiateResponse = manager.initiateRequest(itemDataRequest, s -> {
}, aasTransferProcess -> {
}, jobParameter());
Expand All @@ -85,4 +101,27 @@ void shouldInitiateProcessingAndReturnOkStatus() {
assertThat(initiateResponse.getStatus()).isEqualTo(ResponseStatus.OK);
}

@Test
void shouldThrowJobExceptionWhenNoFutureFound() {
assertThrows(JobException.class, () -> manager.cancelRequest("non-existent"));
}

@Test
void shouldCancelFutureAndRemoveFromMap() {
// when
when(pool.submit(any(Runnable.class))).thenReturn(new CompletableFuture<>());

// then
final TransferInitiateResponse initiateResponse = manager.initiateRequest(itemDataRequest, s -> {
}, aasTransferProcess -> {
}, jobParameter());

assertThat(manager.getFutures()).containsKey(initiateResponse.getTransferId());
assertThat(manager.getFutures().get(initiateResponse.getTransferId())).isNotNull();

manager.cancelRequest(initiateResponse.getTransferId());

assertThat(manager.getFutures()).doesNotContainKey(initiateResponse.getTransferId());
}

}
Loading