Skip to content

Commit

Permalink
feat(imp):[#214] catching any runtime exception to avoid hanging jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmf committed Jan 25, 2024
1 parent e3bb440 commit 90995dd
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryKey;
import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryService;
import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceException;
import org.springframework.web.client.RestClientException;

/**
* Retrieves AAShell from Digital Twin Registry service and storing it inside {@link ItemContainer}.
Expand All @@ -55,8 +54,10 @@ public DigitalTwinDelegate(final AbstractDelegate nextStep,
}

@Override
public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder, final JobParameter jobData,
final AASTransferProcess aasTransferProcess, final PartChainIdentificationKey itemId) {
@SuppressWarnings("PMD.AvoidCatchingGenericException")
public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder,
final JobParameter jobData, final AASTransferProcess aasTransferProcess,
final PartChainIdentificationKey itemId) {

if (StringUtils.isBlank(itemId.getBpn())) {
log.warn("Could not process item with id {} because no BPN was provided. Creating Tombstone.",
Expand All @@ -65,7 +66,9 @@ public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContai
Tombstone.from(itemId.getGlobalAssetId(), null, "Can't get relationship without a BPN", 0,
ProcessStep.DIGITAL_TWIN_REQUEST)).build();
}

try {

final AssetAdministrationShellDescriptor shell = digitalTwinRegistryService.fetchShells(
List.of(new DigitalTwinRegistryKey(itemId.getGlobalAssetId(), itemId.getBpn())))
.stream()
Expand All @@ -79,9 +82,13 @@ public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContai
// filter submodel descriptors if next delegate will not be executed
itemContainerBuilder.shell(shell.withFilteredSubmodelDescriptors(jobData.getAspects()));
}
} catch (final RestClientException | RegistryServiceException e) {

} catch (final RegistryServiceException | RuntimeException e) {
// catching generic exception is intended here,
// otherwise Jobs stay in state RUNNING forever
log.info("Shell Endpoint could not be retrieved for Item: {}. Creating Tombstone.", itemId);
itemContainerBuilder.tombstone(Tombstone.from(itemId.getGlobalAssetId(), null, e, retryCount, ProcessStep.DIGITAL_TWIN_REQUEST));
itemContainerBuilder.tombstone(
Tombstone.from(itemId.getGlobalAssetId(), null, e, retryCount, ProcessStep.DIGITAL_TWIN_REQUEST));
}

if (expectedDepthOfTreeIsNotReached(jobData.getDepth(), aasTransferProcess.getDepth())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.apache.commons.lang3.exception.ExceptionUtils;

/**
* Helper class to find the relevant result from a list of futures.
Expand Down Expand Up @@ -80,8 +81,15 @@ public <T> CompletableFuture<T> getFastestResult(final List<CompletableFuture<T>
log.info(logPrefix + "All of the futures completed");

if (ex != null) {
log.error(logPrefix + "Exception occurred: " + ex.getMessage(), ex);
fastestResultPromise.completeExceptionally(new CompletionExceptions(exceptions));

log.warn("All failed: " + System.lineSeparator() //
+ exceptions.stream()
.map(ExceptionUtils::getStackTrace)
.collect(Collectors.joining(System.lineSeparator())), ex);

fastestResultPromise.completeExceptionally(
new CompletionExceptions(LOGPREFIX_TO_BE_REMOVED_LATER + "None successful", exceptions));

} else if (fastestResultPromise.isDone()) {
log.info(logPrefix + "Fastest result already found, ignoring the others");
} else {
Expand Down Expand Up @@ -145,20 +153,11 @@ private static <T> Function<Throwable, T> collectingExceptionsAndThrow(final Lis
public static class CompletionExceptions extends CompletionException {

private final List<Throwable> causes;

public CompletionExceptions(final List<Throwable> causes) {
super(LOGPREFIX_TO_BE_REMOVED_LATER + "All failing, use getCauses() for details");
public CompletionExceptions(final String msg, final List<Throwable> causes) {
super(msg);
this.causes = causes;
}

public void logWarn(final Logger log, final String prefix) {

log.warn("{}{}", prefix, this.getMessage(), this.getCause());

for (final Throwable cause : this.getCauses()) {
log.warn(prefix + cause.getMessage(), cause);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.eclipse.tractusx.irs.common.util.concurrent.ResultFinder.LOGPREFIX_TO_BE_REMOVED_LATER;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -47,7 +46,6 @@
import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryService;
import org.eclipse.tractusx.irs.registryclient.discovery.ConnectorEndpointsService;
import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceException;
import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceRuntimeException;
import org.eclipse.tractusx.irs.registryclient.exceptions.ShellNotFoundException;
import org.jetbrains.annotations.NotNull;
import org.springframework.util.StopWatch;
Expand Down Expand Up @@ -82,6 +80,7 @@ private static Stream<Map.Entry<String, List<DigitalTwinRegistryKey>>> groupKeys
}

@Override
@SuppressWarnings("PMD.AvoidCatchingGenericException")
public Collection<AssetAdministrationShellDescriptor> fetchShells(final Collection<DigitalTwinRegistryKey> keys)
throws RegistryServiceException {

Expand All @@ -90,11 +89,20 @@ public Collection<AssetAdministrationShellDescriptor> fetchShells(final Collecti
LOGPREFIX_TO_BE_REMOVED_LATER + "Fetching shell(s) for %s key(s)".formatted(keys.size()));

try {

final var calledEndpoints = new HashSet<String>();
final var collectedShells = groupKeysByBpn(keys).flatMap(
entry -> fetchShellDescriptors(calledEndpoints, entry.getKey(), entry.getValue()).stream())
.toList();

final var collectedShells = groupKeysByBpn(keys).flatMap(entry -> {

try {
return fetchShellDescriptors(entry, calledEndpoints);
} catch (RuntimeException e) {
// catching generic exception is intended here,
// otherwise Jobs stay in state RUNNING forever
log.warn(e.getMessage(), e);
return Stream.empty();
}

}).toList();

if (collectedShells.isEmpty()) {
log.info(LOGPREFIX_TO_BE_REMOVED_LATER + "No shells found");
Expand All @@ -110,9 +118,27 @@ public Collection<AssetAdministrationShellDescriptor> fetchShells(final Collecti
}
}

@NotNull
private List<AssetAdministrationShellDescriptor> fetchShellDescriptors(final Set<String> calledEndpoints,
final String bpn, final List<DigitalTwinRegistryKey> keys) {
private Stream<AssetAdministrationShellDescriptor> fetchShellDescriptors(
final Map.Entry<String, List<DigitalTwinRegistryKey>> entry, final Set<String> calledEndpoints) {

try {

final var futures = fetchShellDescriptors(calledEndpoints, entry.getKey(), entry.getValue());
final var shellDescriptors = futures.get();
return shellDescriptors.stream();

} catch (InterruptedException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
return Stream.empty();
} catch (ExecutionException e) {
log.warn(e.getMessage(), e);
return Stream.empty();
}
}

private CompletableFuture<List<AssetAdministrationShellDescriptor>> fetchShellDescriptors(
final Set<String> calledEndpoints, final String bpn, final List<DigitalTwinRegistryKey> keys) {

final var watch = new StopWatch();
StopwatchUtils.startWatch(log, watch,
Expand All @@ -125,56 +151,34 @@ private List<AssetAdministrationShellDescriptor> fetchShellDescriptors(final Set
connectorEndpoints.size(), bpn);
calledEndpoints.addAll(connectorEndpoints);

return fetchShellDescriptorsForConnectorEndpoints(bpn, keys, connectorEndpoints);
return fetchShellDescriptorsForConnectorEndpoints(keys, connectorEndpoints);

} finally {
StopwatchUtils.stopWatch(log, watch);
}
}

private List<AssetAdministrationShellDescriptor> fetchShellDescriptorsForConnectorEndpoints(final String bpn,
private CompletableFuture<List<AssetAdministrationShellDescriptor>> fetchShellDescriptorsForConnectorEndpoints(
final List<DigitalTwinRegistryKey> keys, final List<String> connectorEndpoints) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "fetchShellDescriptorsForConnectorEndpoints - ";

final EndpointDataForConnectorsService service = endpointDataForConnectorsService;
try {
final var futures = service.createFindEndpointDataForConnectorsFutures(connectorEndpoints)
.stream()
.map(edrFuture -> edrFuture.thenCompose(edr -> CompletableFuture.supplyAsync(
() -> fetchShellDescriptorsForKey(keys, edr))))
.toList();

log.info(logPrefix + " Created {} futures", futures.size());

return resultFinder.getFastestResult(futures).get();

} catch (InterruptedException e) {

handleInterruptedException(e,
logPrefix + "InterruptedException occurred while fetching shells for bpn '%s'".formatted(bpn));
return Collections.emptyList();
final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "fetchShellDescriptorsForConnectorEndpoints - ";

} catch (ResultFinder.CompletionExceptions e) {
final var service = endpointDataForConnectorsService;
final var futures = service.createFindEndpointDataForConnectorsFutures(connectorEndpoints)
.stream()
.map(edrFuture -> edrFuture.thenCompose(edr -> CompletableFuture.supplyAsync(
() -> fetchShellDescriptorsForKey(keys, edr))))
.toList();

e.logWarn(log, logPrefix);
log.info(logPrefix + " Created {} futures", futures.size());

throw new RegistryServiceRuntimeException(
"Exception occurred while fetching shells for bpn '%s'".formatted(bpn), e);

} catch (ExecutionException e) {

log.error(logPrefix + e.getMessage(), e);
throw new RegistryServiceRuntimeException(
"Exception occurred while fetching shells for bpn '%s'".formatted(bpn), e);

}
return resultFinder.getFastestResult(futures);
}

private List<AssetAdministrationShellDescriptor> fetchShellDescriptorsForKey(
final List<DigitalTwinRegistryKey> keys, final EndpointDataReference endpointDataReference) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + " fetchShellDescriptorsForKey - ";
final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + " fetchShellDescriptorsForKey - ";

final var watch = new StopWatch();
StopwatchUtils.startWatch(log, watch,
Expand All @@ -190,7 +194,7 @@ private List<AssetAdministrationShellDescriptor> fetchShellDescriptorsForKey(
private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDataReference endpointDataReference,
final DigitalTwinRegistryKey key) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + " fetchShellDescriptor - ";
final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + " fetchShellDescriptor - ";

final var watch = new StopWatch();
StopwatchUtils.startWatch(log, watch,
Expand All @@ -216,7 +220,7 @@ private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDa
@NotNull
private String mapToShellId(final EndpointDataReference endpointDataReference, final String key) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "mapToShellId - ";
final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "mapToShellId - ";

final var watch = new StopWatch();
StopwatchUtils.startWatch(log, watch, logPrefix + "Mapping '%s' to shell ID for endpoint '%s'".formatted(key,
Expand Down Expand Up @@ -248,9 +252,10 @@ private String mapToShellId(final EndpointDataReference endpointDataReference, f
}
}

private Collection<String> lookupShellIds(final String bpn) {
@SuppressWarnings("PMD.AvoidCatchingGenericException")
private Collection<String> lookupShellIds(final String bpn) throws RegistryServiceException {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "lookupShellIds - ";
final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "lookupShellIds - ";
log.info(logPrefix + "Looking up shell ids for bpn {}", bpn);

final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn);
Expand All @@ -260,6 +265,25 @@ private Collection<String> lookupShellIds(final String bpn) {
connectorEndpoints);
log.info(logPrefix + "Created endpointDataReferenceFutures");

try {

return lookupShellIds(bpn, endpointDataReferenceFutures, logPrefix);

} catch (RuntimeException e) {
// catching generic exception is intended here,
// otherwise Jobs stay in state RUNNING forever
log.error(logPrefix + e.getMessage(), e); // TODO (mfischer) #214 do not log and throw
final var msg = logPrefix + e.getClass().getSimpleName()
+ " occurred while looking up shell ids for bpn '%s'".formatted(bpn);
throw new RegistryServiceException(msg, e);
}
}

@NotNull
private Collection<String> lookupShellIds(final String bpn,
final List<CompletableFuture<EndpointDataReference>> endpointDataReferenceFutures, final String logPrefix)
throws RegistryServiceException {

try {
final var futures = endpointDataReferenceFutures.stream()
.map(edrFuture -> edrFuture.thenCompose(
Expand All @@ -272,27 +296,19 @@ private Collection<String> lookupShellIds(final String bpn) {
return shellIds;

} catch (InterruptedException e) {

handleInterruptedException(e,
logPrefix + "InterruptedException occurred while looking up shells ids for bpn '%s'".formatted(
bpn));
return Collections.emptyList();

} catch (ResultFinder.CompletionExceptions e) {

e.logWarn(log, logPrefix);

throw new RegistryServiceRuntimeException(LOGPREFIX_TO_BE_REMOVED_LATER
+ "Exception occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);

log.error(logPrefix + "InterruptedException occurred while looking up shells ids for bpn '%s': ".formatted(
bpn) + e.getMessage(), e); // #214 do not log and throw
Thread.currentThread().interrupt();
throw new RegistryServiceException(LOGPREFIX_TO_BE_REMOVED_LATER + e.getClass().getSimpleName()
+ " occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);
} catch (ExecutionException e) {
log.error(logPrefix + e.getMessage(), e);
throw new RegistryServiceRuntimeException(LOGPREFIX_TO_BE_REMOVED_LATER
+ "Exception occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);
log.error(logPrefix + e.getMessage(), e); // TODO (mfischer) #214 do not log and throw
throw new RegistryServiceException(LOGPREFIX_TO_BE_REMOVED_LATER + e.getClass().getSimpleName()
+ " occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);
}
}

private List<String> lookupShellIds(final String bpn, final EndpointDataReference endpointDataReference) {
private Collection<String> lookupShellIds(final String bpn, final EndpointDataReference endpointDataReference) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "lookupShellIds - ";
final var watch = new StopWatch();
Expand All @@ -310,12 +326,8 @@ private List<String> lookupShellIds(final String bpn, final EndpointDataReferenc
}

@Override
public Collection<DigitalTwinRegistryKey> lookupShellIdentifiers(final String bpn) {
public Collection<DigitalTwinRegistryKey> lookupShellIdentifiers(final String bpn) throws RegistryServiceException {
return lookupShellIds(bpn).stream().map(id -> new DigitalTwinRegistryKey(id, bpn)).toList();
}

private void handleInterruptedException(final InterruptedException interruptedException, final String msg) {
log.warn(msg, interruptedException);
Thread.currentThread().interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public class RegistryServiceException extends Exception {
public RegistryServiceException(final String msg) {
super(msg);
}

public RegistryServiceException(final String msg, final Throwable cause) {
super(msg, cause);
}
}

This file was deleted.

Loading

0 comments on commit 90995dd

Please sign in to comment.