diff --git a/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java b/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java index 27d08284a4..04d1020e7e 100644 --- a/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java +++ b/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java @@ -54,11 +54,11 @@ public class ResultFinder { public CompletableFuture getFastestResult(final List> futures) { if (futures == null || futures.isEmpty()) { - log.warn("Called getFastestResult with empty list of futures"); + log.warn("getFastestResult#0 Called getFastestResult with empty list of futures"); return CompletableFuture.completedFuture(null); } - log.debug("Trying to get fastest result from list of futures"); + log.debug("getFastestResult#1 Trying to get fastest result from list of futures"); final CompletableFuture fastestResultPromise = new CompletableFuture<>(); @@ -69,17 +69,19 @@ public CompletableFuture getFastestResult(final List .handle(completingOnFirstSuccessful(fastestResultPromise))) .toList(); + log.debug("getFastestResult#2"); + allOf(toArray(futuresList)).whenComplete((value, ex) -> { - log.debug("List of futures completed"); + log.debug("getFastestResult#3 List of futures completed"); if (ex != null) { - log.error("Exception occurred: " + ex.getMessage(), ex); + log.error("getFastestResult#4 Exception occurred: " + ex.getMessage(), ex); fastestResultPromise.completeExceptionally(new CompletionExceptions(exceptions)); } else if (fastestResultPromise.isDone()) { - log.debug("Fastest result already found, ignoring the others"); + log.debug("getFastestResult#5 Fastest result already found, ignoring the others"); } else { - log.debug("Completing"); + log.debug("getFastestResult#6 Completing"); fastestResultPromise.complete(null); } }); @@ -96,20 +98,26 @@ private static BiFunction completingOnFirstSuccessful return (value, throwable) -> { + log.debug("completingOnFirstSuccessful#1 value: {}, throwable: {}", value, throwable); + final boolean notFinishedByOtherFuture = !resultPromise.isDone(); + log.debug("completingOnFirstSuccessful#2 notFinishedByOtherFuture {} ", notFinishedByOtherFuture); + final boolean currentFutureSuccessful = throwable == null && value != null; if (notFinishedByOtherFuture && currentFutureSuccessful) { // first future that completes successfully completes the overall future - log.debug("First future that completed successfully"); + log.debug("completingOnFirstSuccessful#3 First future that completed successfully"); resultPromise.complete(value); return true; } else { if (throwable != null) { - log.warn("Exception occurred: " + throwable.getMessage(), throwable); + log.warn("completingOnFirstSuccessful#4 Exception occurred: " + throwable.getMessage(), throwable); throw new CompletionException(throwable.getMessage(), throwable); + } else { + log.debug("completingOnFirstSuccessful#5 log just for debugging"); } return false; } @@ -118,7 +126,7 @@ private static BiFunction completingOnFirstSuccessful private static Function collectingExceptionsAndThrow(final List exceptions) { return t -> { - log.error("Exception occurred: " + t.getMessage(), t); + log.error("collectingExceptionsAndThrow -- Exception occurred: " + t.getMessage(), t); exceptions.add(t); throw new CompletionException(t); }; diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java index bd43f9f800..3e83edfcaf 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java @@ -81,17 +81,25 @@ private static Stream>> groupKeys public Collection fetchShells(final Collection keys) throws RegistryServiceException { - log.info("Fetching shell(s) for {} key(s)", keys.size()); + try { + + log.info("fetchShells#1 Fetching shell(s) for {} key(s)", keys.size()); - final var calledEndpoints = new HashSet(); - final var collectedShells = groupKeysByBpn(keys).flatMap( - entry -> fetchShellDescriptors(calledEndpoints, entry.getKey(), entry.getValue()).stream()).toList(); + final var calledEndpoints = new HashSet(); + final var collectedShells = groupKeysByBpn(keys).flatMap( + entry -> fetchShellDescriptors(calledEndpoints, entry.getKey(), entry.getValue()).stream()) + .toList(); - if (collectedShells.isEmpty()) { - throw new ShellNotFoundException("Unable to find any of the requested shells", calledEndpoints); - } else { - log.info("Found {} shell(s) for {} key(s)", collectedShells.size(), keys.size()); - return collectedShells; + if (collectedShells.isEmpty()) { + log.debug("fetchShells#2 no shells found"); + throw new ShellNotFoundException("Unable to find any of the requested shells", calledEndpoints); + } else { + log.info("fetchShells#3 Found {} shell(s) for {} key(s)", collectedShells.size(), keys.size()); + return collectedShells; + } + + } finally { + log.debug("fetchShells#4 finally"); } } @@ -99,11 +107,11 @@ public Collection fetchShells(final Collecti private List fetchShellDescriptors(final Set calledEndpoints, final String bpn, final List keys) { - log.info("Fetching {} shells for bpn {}", keys.size(), bpn); + log.info("fetchShellDescriptors#1 Fetching {} shells for bpn {}", keys.size(), bpn); final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); - log.debug("Found {} connector endpoints for bpn {}", connectorEndpoints.size(), bpn); + log.debug("fetchShellDescriptors#2 Found {} connector endpoints for bpn {}", connectorEndpoints.size(), bpn); calledEndpoints.addAll(connectorEndpoints); @@ -113,6 +121,8 @@ private List fetchShellDescriptors(final Set private List fetchShellDescriptorsForConnectorEndpoints(final String bpn, final List keys, final List connectorEndpoints) { + log.debug("fetchShellDescriptorsForConnectorEndpoints#1"); + final EndpointDataForConnectorsService service = endpointDataForConnectorsService; try { final var futures = service.createFindEndpointDataForConnectorsFutures(connectorEndpoints) @@ -121,19 +131,33 @@ private List fetchShellDescriptorsForConnect edr -> supplyAsync(() -> fetchShellDescriptorsForKey(keys, edr)))) .toList(); + log.debug("fetchShellDescriptorsForConnectorEndpoints#2"); + return resultFinder.getFastestResult(futures).get(); } catch (InterruptedException e) { - log.debug("InterruptedException occurred while fetching shells for bpn '%s'".formatted(bpn), e); + log.debug( + "fetchShellDescriptorsForConnectorEndpoints#3 InterruptedException occurred while fetching shells for bpn '%s'".formatted( + bpn), e); Thread.currentThread().interrupt(); return emptyList(); - } catch (ResultFinder.CompletionExceptions | ExecutionException e) { - // TODO do not log and throw, this is just for temporary debugging - log.error(e.getMessage(), e); + } catch (ResultFinder.CompletionExceptions e) { + + log.debug("fetchShellDescriptorsForConnectorEndpoints#4 " + e.getMessage(), e); + e.getCauses() + .forEach(ex -> log.debug("fetchShellDescriptorsForConnectorEndpoints#5 " + ex.getMessage(), ex)); + throw new RegistryServiceRuntimeException( "Exception occurred while fetching shells for bpn '%s'".formatted(bpn), e); + + } catch (ExecutionException e) { + + log.error("fetchShellDescriptorsForConnectorEndpoints#6 " + e.getMessage(), e); + throw new RegistryServiceRuntimeException( + "Exception occurred while fetching shells for bpn '%s'".formatted(bpn), e); + } finally { - log.debug("End fetchShellDescriptorsForConnectorEndpoints"); + log.debug("fetchShellDescriptorsForConnectorEndpoints#7 finally"); } } @@ -145,11 +169,15 @@ private List fetchShellDescriptorsForKey( private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDataReference endpointDataReference, final DigitalTwinRegistryKey key) { - log.info("Retrieving AAS Identification for DigitalTwinRegistryKey: {}", key); - final String aaShellIdentification = mapToShellId(endpointDataReference, key.shellId()); - log.debug("aaShellIdentification: {}", aaShellIdentification); - return decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(endpointDataReference, - aaShellIdentification); + try { + log.info("fetchShellDescriptor#1 Retrieving AAS Identification for DigitalTwinRegistryKey: {}", key); + final String aaShellIdentification = mapToShellId(endpointDataReference, key.shellId()); + log.debug("fetchShellDescriptor#2 aaShellIdentification: {}", aaShellIdentification); + return decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(endpointDataReference, + aaShellIdentification); + } finally { + log.debug("fetchShellDescriptor#3 finally"); + } } /** @@ -163,25 +191,47 @@ private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDa */ @NotNull private String mapToShellId(final EndpointDataReference endpointDataReference, final String key) { - final var identifierKeyValuePair = IdentifierKeyValuePair.builder().name("globalAssetId").value(key).build(); - final var aaShellIdentification = decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( - endpointDataReference, List.of(identifierKeyValuePair)).getResult().stream().findFirst().orElse(key); - - if (key.equals(aaShellIdentification)) { - log.info("Found shell with shellId {} in registry", aaShellIdentification); - } else { - log.info("Retrieved shellId {} for globalAssetId {}", aaShellIdentification, key); + + log.debug("mapToShellId#1"); + + try { + + final var identifierKeyValuePair = IdentifierKeyValuePair.builder() + .name("globalAssetId") + .value(key) + .build(); + final var aaShellIdentification = decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( + endpointDataReference, List.of(identifierKeyValuePair)) + .getResult() + .stream() + .findFirst() + .orElse(key); + + log.debug("mapToShellId#2"); + + if (key.equals(aaShellIdentification)) { + log.info("mapToShellId#3 Found shell with shellId {} in registry", aaShellIdentification); + } else { + log.info("mapToShellId#4 Retrieved shellId {} for globalAssetId {}", aaShellIdentification, key); + } + return aaShellIdentification; + } finally { + log.debug("mapToShellId#5 finally"); } - return aaShellIdentification; } private Collection lookupShellIds(final String bpn) { - log.info("Looking up shell ids for bpn {}", bpn); + + log.info("lookupShellIds#1 Looking up shell ids for bpn {}", bpn); + final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); - log.debug("Looking up shell ids for bpn {} with connector endpoints {}", bpn, connectorEndpoints); + log.debug("lookupShellIds#2 Looking up shell ids for bpn {} with connector endpoints {}", bpn, + connectorEndpoints); final var endpointDataReferenceFutures = endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures( connectorEndpoints); + log.debug("lookupShellIds#3 created futures"); + try { final var futures = endpointDataReferenceFutures.stream() .map(edrFuture -> edrFuture.thenCompose( @@ -189,25 +239,40 @@ private Collection lookupShellIds(final String bpn) { .toList(); final var shellIds = resultFinder.getFastestResult(futures).get(); - log.info("Found {} shell id(s) in total", shellIds.size()); + log.info("lookupShellIds#4 Found {} shell id(s) in total", shellIds.size()); return shellIds; } catch (InterruptedException e) { - log.debug("InterruptedException occurred while looking up shells ids for bpn '%s'".formatted(bpn), e); + log.debug( + "lookupShellIds#5 InterruptedException occurred while looking up shells ids for bpn '%s'".formatted( + bpn), e); Thread.currentThread().interrupt(); return emptyList(); - } catch (ResultFinder.CompletionExceptions | ExecutionException e) { - // TODO do not log and throw, this is just for temporary debugging - log.error(e.getMessage(), e); + } catch (ResultFinder.CompletionExceptions e) { + + log.debug("lookupShellIds#6" + e.getMessage(), e); + e.getCauses().forEach(ex -> log.debug("lookupShellIds#7 " + ex.getMessage(), ex)); + + throw new RegistryServiceRuntimeException( + "Exception occurred while looking up shell ids for bpn '%s'".formatted(bpn), e); + + } catch (ExecutionException e) { + log.error("lookupShellIds#8 " + e.getMessage(), e); throw new RegistryServiceRuntimeException( "Exception occurred while looking up shell ids for bpn '%s'".formatted(bpn), e); } } private List lookupShellIds(final String bpn, final EndpointDataReference endpointDataReference) { - log.debug("lookupShellIds for bpn {} with endpointDataReference {}", bpn, endpointDataReference); - return decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(endpointDataReference, - List.of(IdentifierKeyValuePair.builder().name("manufacturerId").value(bpn).build())).getResult(); + log.debug("lookupShellIds#1 look up shell IDs for bpn {} with endpointDataReference {}", bpn, + endpointDataReference); + try { + return decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( + endpointDataReference, + List.of(IdentifierKeyValuePair.builder().name("manufacturerId").value(bpn).build())).getResult(); + } finally { + log.debug("lookupShellIds#2 finally"); + } } @Override diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java index 5fbad38a31..28a7edd321 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java @@ -25,6 +25,7 @@ import static java.util.concurrent.CompletableFuture.supplyAsync; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -49,25 +50,37 @@ public class EndpointDataForConnectorsService { public List> createFindEndpointDataForConnectorsFutures( final List connectorEndpoints) { - log.debug("createFindEndpointDataForConnectorsFutures for connector endpoints: {}", connectorEndpoints); - return connectorEndpoints.stream() - .map(connectorEndpoint -> supplyAsync( - () -> getEndpointReferenceForAsset(connectorEndpoint))) - .toList(); + List> futures = Collections.emptyList(); + try { + log.debug( + "createFindEndpointDataForConnectorsFutures#1 creating futures to get EndpointDataReferences for endpoints: {}", + connectorEndpoints); + futures = connectorEndpoints.stream() + .map(connectorEndpoint -> supplyAsync( + () -> getEndpointReferenceForAsset(connectorEndpoint))) + .toList(); + return futures; + } finally { + log.debug("createFindEndpointDataForConnectorsFutures#2 created {} futures", futures.size()); + } } private EndpointDataReference getEndpointReferenceForAsset(final String connector) { - log.info("Trying to retrieve EndpointDataReference for connector {}", connector); + log.info("getEndpointReferenceForAsset#1 Trying to retrieve EndpointDataReference for connector {}", connector); try { final var endpointDataReference = edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE, DT_REGISTRY_ASSET_VALUE); - log.debug("Got EndpointDataReference for connector {}", connector); + log.debug("getEndpointReferenceForAsset#2 Got EndpointDataReference for connector {}", connector); return endpointDataReference; } catch (EdcRetrieverException e) { - log.warn("Exception occurred when retrieving EndpointDataReference from connector {}", connector, e); + log.warn( + "getEndpointReferenceForAsset#3 Exception occurred when retrieving EndpointDataReference from connector {}", + connector, e); throw new CompletionException(e.getMessage(), e); + } finally { + log.debug("getEndpointReferenceForAsset#4 finally"); } }