Skip to content

Commit

Permalink
feat(imp):[#214] cleanup logging
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmf committed Feb 1, 2024
1 parent f94838e commit b5e29e1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@
@Slf4j
public class ResultFinder {

// TODO (mfischer): Remove when #214 is tested
public static final String LOGPREFIX_TO_BE_REMOVED_LATER = "#214@ ";

/**
* Returns a new {@link CompletableFuture} which completes
* when at least one of the given futures completes successfully or all fail.
Expand All @@ -58,14 +55,12 @@ public class ResultFinder {
*/
public <T> CompletableFuture<T> getFastestResult(final List<CompletableFuture<T>> futures) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "getFastestResult - ";

if (futures == null || futures.isEmpty()) {
log.warn(logPrefix + "Called getFastestResult with empty list of futures");
log.warn("Called getFastestResult with empty list of futures");
return CompletableFuture.completedFuture(null);
}

log.info(logPrefix + "Trying to get fastest result from list of futures");
log.debug("Trying to get fastest result from list of futures");

final CompletableFuture<T> fastestResultPromise = new CompletableFuture<>();

Expand All @@ -78,22 +73,17 @@ public <T> CompletableFuture<T> getFastestResult(final List<CompletableFuture<T>

allOf(toArray(futuresList)).whenComplete((value, ex) -> {

log.info(logPrefix + "All of the futures completed");
log.debug("All of the futures completed");

if (ex != null) {

log.warn("All failed: " + System.lineSeparator() //
+ exceptions.stream()
.map(ExceptionUtils::getStackTrace)
.collect(Collectors.joining(System.lineSeparator())), ex);

fastestResultPromise.completeExceptionally(
new CompletionExceptions(LOGPREFIX_TO_BE_REMOVED_LATER + "None successful", exceptions));

} else if (fastestResultPromise.isDone()) {
log.info(logPrefix + "Fastest result already found, ignoring the others");
fastestResultPromise.completeExceptionally(new CompletionExceptions("None successful", exceptions));
} else {
log.info(logPrefix + "Completing");
log.debug("Completing");
fastestResultPromise.complete(null);
}
});
Expand All @@ -108,27 +98,25 @@ private static <T> CompletableFuture<T>[] toArray(final List<CompletableFuture<T
private static <T> BiFunction<T, Throwable, Boolean> completingOnFirstSuccessful(
final CompletableFuture<T> resultPromise) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "completingOnFirstSuccessful - ";

return (value, throwable) -> {

log.info(logPrefix + "value: '{}', throwable: {}", value, throwable);
log.debug("value: '{}', throwable: {}", value, throwable);

final boolean notFinishedByOtherFuture = !resultPromise.isDone();
log.info(logPrefix + "notFinishedByOtherFuture {} ", notFinishedByOtherFuture);
log.debug("notFinishedByOtherFuture {} ", notFinishedByOtherFuture);

final boolean currentFutureSuccessful = throwable == null && value != null;

if (notFinishedByOtherFuture && currentFutureSuccessful) {

// first future that completes successfully completes the overall future
log.info(logPrefix + "First future that completed successfully");
log.debug("First future that completed successfully");
resultPromise.complete(value);
return true;

} else {
if (throwable != null) {
log.warn(logPrefix + "Exception occurred: " + throwable.getMessage(), throwable);
log.warn("Exception occurred: " + throwable.getMessage(), throwable);
throw new CompletionException(throwable.getMessage(), throwable);
}
return false;
Expand All @@ -138,8 +126,7 @@ private static <T> BiFunction<T, Throwable, Boolean> completingOnFirstSuccessful

private static <T> Function<Throwable, T> collectingExceptionsAndThrow(final List<Throwable> exceptions) {
return t -> {
log.error(LOGPREFIX_TO_BE_REMOVED_LATER + "collectingExceptionsAndThrow - " + "Exception occurred: "
+ t.getMessage(), t);
log.error("Exception occurred: " + t.getMessage(), t);
exceptions.add(t);
throw new CompletionException(t);
};
Expand All @@ -153,7 +140,7 @@ private static <T> Function<Throwable, T> collectingExceptionsAndThrow(final Lis
public static class CompletionExceptions extends CompletionException {

private final List<Throwable> causes;

public CompletionExceptions(final String msg, final List<Throwable> causes) {
super(msg);
this.causes = causes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
********************************************************************************/
package org.eclipse.tractusx.irs.registryclient.decentral;

import static org.eclipse.tractusx.irs.common.util.concurrent.ResultFinder.LOGPREFIX_TO_BE_REMOVED_LATER;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -57,6 +55,8 @@
@SuppressWarnings("PMD.TooManyMethods")
public class DecentralDigitalTwinRegistryService implements DigitalTwinRegistryService {

private static final String TOOK_MS = "{} took {} ms";

private final ConnectorEndpointsService connectorEndpointsService;
private final EndpointDataForConnectorsService endpointDataForConnectorsService;
private final DecentralDigitalTwinRegistryClient decentralDigitalTwinRegistryClient;
Expand Down Expand Up @@ -84,7 +84,7 @@ public Collection<AssetAdministrationShellDescriptor> fetchShells(final Collecti
throws RegistryServiceException {

final var watch = new StopWatch();
final String msg = LOGPREFIX_TO_BE_REMOVED_LATER + "Fetching shell(s) for %s key(s)".formatted(keys.size());
final String msg = "Fetching shell(s) for %s key(s)".formatted(keys.size());
watch.start(msg);
log.info(msg);

Expand All @@ -105,17 +105,16 @@ public Collection<AssetAdministrationShellDescriptor> fetchShells(final Collecti
}).toList();

if (collectedShells.isEmpty()) {
log.info(LOGPREFIX_TO_BE_REMOVED_LATER + "No shells found");
log.info("No shells found");
throw new ShellNotFoundException("Unable to find any of the requested shells", calledEndpoints);
} else {
log.info(LOGPREFIX_TO_BE_REMOVED_LATER + "Found {} shell(s) for {} key(s)", collectedShells.size(),
keys.size());
log.info("Found {} shell(s) for {} key(s)", collectedShells.size(), keys.size());
return collectedShells;
}

} finally {
watch.stop();
log.info("{} took {} ms", watch.getLastTaskName(), watch.getLastTaskTimeMillis());
log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis());
}
}

Expand All @@ -142,68 +141,60 @@ private CompletableFuture<List<AssetAdministrationShellDescriptor>> fetchShellDe
final Set<String> calledEndpoints, final String bpn, final List<DigitalTwinRegistryKey> keys) {

final var watch = new StopWatch();
final String msg =
LOGPREFIX_TO_BE_REMOVED_LATER + "Fetching %s shells for bpn '%s'".formatted(keys.size(), bpn);
final String msg = "Fetching %s shells for bpn '%s'".formatted(keys.size(), bpn);
watch.start(msg);
log.info(msg);

try {
final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn);

log.info(LOGPREFIX_TO_BE_REMOVED_LATER + "Found {} connector endpoints for bpn '{}'",
connectorEndpoints.size(), bpn);
log.info("Found {} connector endpoints for bpn '{}'", connectorEndpoints.size(), bpn);
calledEndpoints.addAll(connectorEndpoints);

return fetchShellDescriptorsForConnectorEndpoints(keys, connectorEndpoints);

} finally {
watch.stop();
log.info("{} took {} ms", watch.getLastTaskName(), watch.getLastTaskTimeMillis());
log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis());
}
}

private CompletableFuture<List<AssetAdministrationShellDescriptor>> fetchShellDescriptorsForConnectorEndpoints(
final List<DigitalTwinRegistryKey> keys, final List<String> connectorEndpoints) {

final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "fetchShellDescriptorsForConnectorEndpoints - ";

final var service = endpointDataForConnectorsService;
final var futures = service.createFindEndpointDataForConnectorsFutures(connectorEndpoints)
.stream()
.map(edrFuture -> edrFuture.thenCompose(edr -> CompletableFuture.supplyAsync(
() -> fetchShellDescriptorsForKey(keys, edr))))
.toList();

log.info(logPrefix + " Created {} futures", futures.size());
log.debug("Created {} futures", futures.size());

return resultFinder.getFastestResult(futures);
}

private List<AssetAdministrationShellDescriptor> fetchShellDescriptorsForKey(
final List<DigitalTwinRegistryKey> keys, final EndpointDataReference endpointDataReference) {

final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + " fetchShellDescriptorsForKey - ";

final var watch = new StopWatch();
final String msg = logPrefix + "Fetching shell descriptors for keys %s from endpoint '%s'".formatted(keys,
final String msg = "Fetching shell descriptors for keys %s from endpoint '%s'".formatted(keys,
endpointDataReference.getEndpoint());
watch.start(msg);
log.info(msg);
try {
return keys.stream().map(key -> fetchShellDescriptor(endpointDataReference, key)).toList();
} finally {
watch.stop();
log.info("{} took {} ms", watch.getLastTaskName(), watch.getLastTaskTimeMillis());
log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis());
}
}

private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDataReference endpointDataReference,
final DigitalTwinRegistryKey key) {

final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + " fetchShellDescriptor - ";

final var watch = new StopWatch();
final String msg = logPrefix + "Retrieving AAS identification for DigitalTwinRegistryKey: '%s'".formatted(key);
final String msg = "Retrieving AAS identification for DigitalTwinRegistryKey: '%s'".formatted(key);
watch.start(msg);
log.info(msg);
try {
Expand All @@ -212,7 +203,7 @@ private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDa
aaShellIdentification);
} finally {
watch.stop();
log.info("{} took {} ms", watch.getLastTaskName(), watch.getLastTaskTimeMillis());
log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis());
}
}

Expand All @@ -228,10 +219,8 @@ private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDa
@NotNull
private String mapToShellId(final EndpointDataReference endpointDataReference, final String key) {

final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "mapToShellId - ";

final var watch = new StopWatch();
final String msg = logPrefix + "Mapping '%s' to shell ID for endpoint '%s'".formatted(key,
final String msg = "Mapping '%s' to shell ID for endpoint '%s'".formatted(key,
endpointDataReference.getEndpoint());
watch.start(msg);
log.info(msg);
Expand All @@ -250,50 +239,48 @@ private String mapToShellId(final EndpointDataReference endpointDataReference, f
.orElse(key);

if (key.equals(aaShellIdentification)) {
log.info(logPrefix + "Found shell with shellId {} in registry", aaShellIdentification);
log.info("Found shell with shellId {} in registry", aaShellIdentification);
} else {
log.info(logPrefix + "Retrieved shellId {} for globalAssetId {}", aaShellIdentification, key);
log.info("Retrieved shellId {} for globalAssetId {}", aaShellIdentification, key);
}

return aaShellIdentification;

} finally {
watch.stop();
log.info("{} took {} ms", watch.getLastTaskName(), watch.getLastTaskTimeMillis());
log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis());
}
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private Collection<String> lookupShellIds(final String bpn) throws RegistryServiceException {

final var logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "lookupShellIds - ";
log.info(logPrefix + "Looking up shell ids for bpn {}", bpn);
log.info("Looking up shell ids for bpn {}", bpn);

try {

final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn);
log.info(logPrefix + "Looking up shell ids for bpn '{}' with connector endpoints {}", bpn,
connectorEndpoints);
log.info("Looking up shell ids for bpn '{}' with connector endpoints {}", bpn, connectorEndpoints);

final var endpointDataReferenceFutures = endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures(
connectorEndpoints);
log.info(logPrefix + "Created endpointDataReferenceFutures");
log.debug("Created endpointDataReferenceFutures");

return lookupShellIds(bpn, endpointDataReferenceFutures, logPrefix);
return lookupShellIds(bpn, endpointDataReferenceFutures);

} catch (RuntimeException e) {
// catching generic exception is intended here,
// otherwise Jobs stay in state RUNNING forever
log.error(logPrefix + e.getMessage(), e);
final var msg = logPrefix + e.getClass().getSimpleName()
+ " occurred while looking up shell ids for bpn '%s'".formatted(bpn);
throw new RegistryServiceException(msg, e);
log.error(e.getMessage(), e);
throw new RegistryServiceException(
"%s occurred while looking up shell ids for bpn '%s'".formatted(e.getClass().getSimpleName(), bpn),
e);
}
}

@NotNull
private Collection<String> lookupShellIds(final String bpn,
final List<CompletableFuture<EndpointDataReference>> endpointDataReferenceFutures, final String logPrefix)
final List<CompletableFuture<EndpointDataReference>> endpointDataReferenceFutures)
throws RegistryServiceException {

try {
Expand All @@ -304,29 +291,26 @@ private Collection<String> lookupShellIds(final String bpn,
.toList();
final var shellIds = resultFinder.getFastestResult(futures).get();

log.info(logPrefix + "Found {} shell id(s) in total", shellIds.size());
log.info("Found {} shell id(s) in total", shellIds.size());
return shellIds;

} catch (InterruptedException e) {
log.error(logPrefix + "InterruptedException occurred while looking up shells ids for bpn '%s': ".formatted(
bpn) + e.getMessage(), e);
Thread.currentThread().interrupt();
throw new RegistryServiceException(LOGPREFIX_TO_BE_REMOVED_LATER + e.getClass().getSimpleName()
+ " occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);
throw new RegistryServiceException(
"%s occurred while looking up shell ids for bpn '%s'".formatted(e.getClass().getSimpleName(), bpn),
e);
} catch (ExecutionException e) {
log.error(logPrefix + e.getMessage(), e);
throw new RegistryServiceException(LOGPREFIX_TO_BE_REMOVED_LATER + e.getClass().getSimpleName()
+ " occurred while looking up shell ids for bpn '%s'".formatted(bpn), e);
throw new RegistryServiceException(
"%s occurred while looking up shell ids for bpn '%s'".formatted(e.getClass().getSimpleName(), bpn),
e);
}
}

private Collection<String> lookupShellIds(final String bpn, final EndpointDataReference endpointDataReference) {

final String logPrefix = LOGPREFIX_TO_BE_REMOVED_LATER + "lookupShellIds - ";
final var watch = new StopWatch();
final String msg =
logPrefix + "Looking up shell IDs for bpn '%s' with endpointDataReference '%s'".formatted(bpn,
endpointDataReference);
final String msg = "Looking up shell IDs for bpn '%s' with endpointDataReference '%s'".formatted(bpn,
endpointDataReference);
watch.start(msg);
log.info(msg);

Expand All @@ -336,7 +320,7 @@ private Collection<String> lookupShellIds(final String bpn, final EndpointDataRe
List.of(IdentifierKeyValuePair.builder().name("manufacturerId").value(bpn).build())).getResult();
} finally {
watch.stop();
log.info("{} took {} ms", watch.getLastTaskName(), watch.getLastTaskTimeMillis());
log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis());
}
}

Expand Down
Loading

0 comments on commit b5e29e1

Please sign in to comment.