Skip to content

Commit

Permalink
feat(irs-api):[#246] First level supply chain changed to show only lo…
Browse files Browse the repository at this point in the history
…west level of hops, code clean up, added test
  • Loading branch information
ds-psosnowski committed Nov 16, 2023
1 parent 34370b2 commit 214f291
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 92 deletions.
2 changes: 1 addition & 1 deletion docs/src/api/irs-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,7 @@ components:
hops:
type: integer
format: int32
parentBpn:
bpn:
type: string
result:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -65,14 +65,13 @@ public BpnInvestigationJob(final Jobs jobSnapshot, final List<String> incidentBp
this(jobSnapshot, incidentBpns, new ArrayList<>(), new ArrayList<>(), JobState.RUNNING);
}

public BpnInvestigationJob update(final Jobs jobSnapshot, final SupplyChainImpacted newSupplyChain,
final String bpn) {
public BpnInvestigationJob update(final Jobs jobSnapshot, final SupplyChainImpacted newSupplyChain) {
final Optional<SupplyChainImpacted> previousSupplyChain = getSupplyChainImpacted();

final SupplyChainImpacted supplyChainImpacted = previousSupplyChain.map(
prevSupplyChain -> prevSupplyChain.or(newSupplyChain)).orElse(newSupplyChain);

this.jobSnapshot = extendJobWithSupplyChainSubmodel(jobSnapshot, supplyChainImpacted, bpn);
this.jobSnapshot = extendJobWithSupplyChainSubmodel(jobSnapshot, supplyChainImpacted);
this.jobSnapshot = extendSummary(this.jobSnapshot);
this.jobSnapshot = updateLastModified(this.jobSnapshot, ZonedDateTime.now(ZoneOffset.UTC));
return this;
Expand All @@ -85,21 +84,21 @@ public BpnInvestigationJob withUnansweredNotifications(final List<Notification>

public BpnInvestigationJob withAnsweredNotification(
final EdcNotification<ResponseNotificationContent> notification) {
final Optional<String> parentBpn = getParentBpn(notification);
final Optional<String> bpn = getChildBpn(notification);
removeFromUnansweredNotification(notification);
notification.getContent().setParentBpn(parentBpn.orElse(null));
notification.getContent().setBpn(bpn.orElse(null));
notification.getContent().incrementHops();
this.answeredNotifications.add(notification);

return this;
}

private Optional<String> getParentBpn(final EdcNotification<ResponseNotificationContent> notification) {
private Optional<String> getChildBpn(final EdcNotification<ResponseNotificationContent> notification) {
return this.unansweredNotifications.stream()
.filter(unansweredNotification -> unansweredNotification.notificationId()
.equals(notification.getHeader()
.getOriginalNotificationId()))
.map(Notification::bpn)
.map(Notification::childBpn)
.findAny();
}

Expand All @@ -125,40 +124,41 @@ public BpnInvestigationJob complete() {
.findFirst();
}

private Jobs extendJobWithSupplyChainSubmodel(final Jobs irsJob, final SupplyChainImpacted supplyChainImpacted,
final String bpn) {
log.debug(bpn);
private Jobs extendJobWithSupplyChainSubmodel(final Jobs irsJob, final SupplyChainImpacted supplyChainImpacted) {
final SupplyChainImpactedAspect.SupplyChainImpactedAspectBuilder supplyChainImpactedAspectBuilder = SupplyChainImpactedAspect.builder()
.supplyChainImpacted(
supplyChainImpacted);

if (getUnansweredNotifications().isEmpty()) {
final List<ResponseNotificationContent> incidentWithMinHopsValue = getAnsweredNotifications().stream()
.map(EdcNotification::getContent)
.filter(ResponseNotificationContent::thereIsIncident)
.toList();

final List<SupplyChainImpactedAspect.ImpactedSupplierFirstLevel> suppliersFirstLevel = incidentWithMinHopsValue.stream()
.map(impacted -> new SupplyChainImpactedAspect.ImpactedSupplierFirstLevel(
impacted.getParentBpn(),
impacted.getHops()))
.toList();

supplyChainImpactedAspectBuilder.impactedSuppliersOnFirstTier(Set.copyOf(suppliersFirstLevel));
final Optional<SupplyChainImpactedAspect.ImpactedSupplierFirstLevel> impactedSupplierWithLowestHopsNumber = getImpactedSupplierWithLowestHopsNumber();
supplyChainImpactedAspectBuilder.impactedSuppliersOnFirstTier(
impactedSupplierWithLowestHopsNumber.orElse(null));
}

final Submodel supplyChainImpactedSubmodel = Submodel.builder()
.aspectType(SUPPLY_CHAIN_ASPECT_TYPE)
.payload(new JsonUtil().asMap(
supplyChainImpactedAspectBuilder.build()))
.build();

return irsJob.toBuilder()
.clearSubmodels()
.submodels(Collections.singletonList(supplyChainImpactedSubmodel))
.submodels(Collections.singletonList(
createSupplyChainImpactedSubmodel(supplyChainImpactedAspectBuilder)))
.build();
}

private Optional<SupplyChainImpactedAspect.ImpactedSupplierFirstLevel> getImpactedSupplierWithLowestHopsNumber() {
return getAnsweredNotifications().stream()
.map(EdcNotification::getContent)
.filter(ResponseNotificationContent::thereIsIncident)
.min(Comparator.comparing(ResponseNotificationContent::getHops))
.map(impacted -> new SupplyChainImpactedAspect.ImpactedSupplierFirstLevel(
impacted.getBpn(), impacted.getHops()));
}

private static Submodel createSupplyChainImpactedSubmodel(
final SupplyChainImpactedAspect.SupplyChainImpactedAspectBuilder supplyChainImpactedAspectBuilder) {
return Submodel.builder()
.aspectType(SUPPLY_CHAIN_ASPECT_TYPE)
.payload(new JsonUtil().asMap(supplyChainImpactedAspectBuilder.build()))
.build();
}

private Jobs updateLastModified(final Jobs irsJob, final ZonedDateTime lastModifiedOn) {
final Job job = irsJob.getJob().toBuilder().completedOn(lastModifiedOn).lastModifiedOn(lastModifiedOn).build();
return irsJob.toBuilder().job(job).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void sendEdcNotification(final EdcNotification<InvestigationNotificationC
final NotificationContent notificationContent = ResponseNotificationContent.builder()
.result(supplyChainImpacted.getDescription())
.hops(hops)
.parentBpn(bpn)
.bpn(bpn)
.build();
final EdcNotification<NotificationContent> responseNotification = edcRequest(notificationId,
originalNotificationId, essLocalEdcEndpoint, localBpn, recipientBpn, notificationContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
********************************************************************************/
package org.eclipse.tractusx.irs.ess.service;

import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotification;
import org.eclipse.tractusx.irs.edc.client.model.notification.ResponseNotificationContent;
import org.springframework.stereotype.Service;

/**
Expand All @@ -42,22 +39,21 @@
@Slf4j
public class EssRecursiveNotificationHandler {

private static final Integer FIRST_HOP = 0;

private final RelatedInvestigationJobsCache relatedInvestigationJobsCache;
private final BpnInvestigationJobCache bpnInvestigationJobCache;
private final EdcNotificationSender edcNotificationSender;

/* package */ void handleNotification(final UUID finishedJobId, final SupplyChainImpacted supplyChainImpacted, final String bpn,
final Integer hops) {
/* package */ void handleNotification(final UUID finishedJobId, final SupplyChainImpacted supplyChainImpacted,
final String bpn, final Integer hops) {

final Optional<RelatedInvestigationJobs> relatedJobsId = relatedInvestigationJobsCache.findByRecursiveRelatedJobId(
finishedJobId);

relatedJobsId.ifPresentOrElse(relatedJobs -> {
if (SupplyChainImpacted.YES.equals(supplyChainImpacted)) {
log.debug("SupplyChain is impacted. Sending notification back to requestor.");
edcNotificationSender.sendEdcNotification(relatedJobs.originalNotification(), supplyChainImpacted, hops, bpn);
edcNotificationSender.sendEdcNotification(relatedJobs.originalNotification(), supplyChainImpacted, hops,
bpn);
relatedInvestigationJobsCache.remove(
relatedJobs.originalNotification().getHeader().getNotificationId());
} else {
Expand All @@ -76,28 +72,19 @@ private void sendNotificationAfterAllCompleted(final RelatedInvestigationJobs re
.map(bpnInvestigationJobCache::findByJobId)
.flatMap(Optional::stream)
.toList();

if (checkAllFinished(allInvestigationJobs)) {
final SupplyChainImpacted finalResult = allInvestigationJobs.stream()
.map(BpnInvestigationJob::getSupplyChainImpacted)
.flatMap(Optional::stream)
.reduce(SupplyChainImpacted.NO,
SupplyChainImpacted::or);

edcNotificationSender.sendEdcNotification(relatedInvestigationJobs.originalNotification(), finalResult, hops, bpn);
edcNotificationSender.sendEdcNotification(relatedInvestigationJobs.originalNotification(), finalResult,
hops, bpn);
}
}

private Integer getMinHops(final List<BpnInvestigationJob> allInvestigationJobs) {
return allInvestigationJobs
.stream()
.flatMap(investigationJobs -> investigationJobs.getAnsweredNotifications().stream())
.map(EdcNotification::getContent)
.filter(ResponseNotificationContent::thereIsIncident)
.min(Comparator.comparing(ResponseNotificationContent::getHops))
.map(ResponseNotificationContent::getHops)
.orElse(FIRST_HOP);
}

private boolean checkAllFinished(final List<BpnInvestigationJob> allInvestigationJobs) {
return allInvestigationJobs.stream()
.allMatch(bpnInvestigationJob -> bpnInvestigationJob.getSupplyChainImpacted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public void handleNotificationCallback(final EdcNotification<ResponseNotificatio
}

bpnInvestigationJobCache.store(jobId,
job.update(job.getJobSnapshot(), supplyChainImpacted, notification.getContent().getParentBpn()));
job.update(job.getJobSnapshot(), supplyChainImpacted));
recursiveNotificationHandler.handleNotification(jobId, supplyChainImpacted,
notification.getContent().getParentBpn(), notification.getContent().getHops());
notification.getContent().getBpn(), notification.getContent().getHops());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class InvestigationJobProcessingEventListener {
private final String localEdcEndpoint;
private final List<String> mockRecursiveEdcAssets;
private final EssRecursiveNotificationHandler recursiveNotificationHandler;
private static final int FIRST_HOP = 0;

/* package */ InvestigationJobProcessingEventListener(final IrsItemGraphQueryService irsItemGraphQueryService,
final ConnectorEndpointsService connectorEndpointsService, final EdcSubmodelFacade edcSubmodelFacade,
Expand Down Expand Up @@ -111,14 +112,14 @@ public void handleJobProcessingFinishedEvent(final JobProcessingFinishedEvent jo
completedJobId);

final BpnInvestigationJob investigationJobUpdate = investigationJob.update(
investigationResult.completedJob(), investigationResult.supplyChainImpacted(), job.getJobParameter().getBpn());
investigationResult.completedJob(), investigationResult.supplyChainImpacted());

if (leafNodeIsReached(investigationResult.completedJob()) || supplyChainIsImpacted(
investigationResult.supplyChainImpacted())) {
bpnInvestigationJobCache.store(completedJobId, investigationJobUpdate.complete());
recursiveNotificationHandler.handleNotification(investigationJob.getJobSnapshot().getJob().getId(),
investigationResult.supplyChainImpacted(), job.getJobParameter().getBpn(),
0);
FIRST_HOP);
} else {
triggerInvestigationOnNextLevel(investigationResult.completedJob(), investigationJobUpdate, job.getJobParameter().getBpn());
bpnInvestigationJobCache.store(completedJobId, investigationJobUpdate);
Expand Down Expand Up @@ -155,8 +156,9 @@ private void triggerInvestigationOnNextLevel(final Jobs completedJob,
log.debug("Triggering investigation on the next level.");
if (anyBpnIsMissingFromRelationship(completedJob)) {
log.error("One or more Relationship items did not contain a BPN.");
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, jobBpn);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN);
}
// Map<BPN, List<GlobalAssetID>>
final Map<String, List<String>> bpns = getBPNsFromRelationships(completedJob.getRelationships());
log.debug("Extracted BPNs '{}'", bpns);

Expand All @@ -171,14 +173,14 @@ private void triggerInvestigationOnNextLevel(final Jobs completedJob,
log.debug("BPNs '{}' could not be resolved to an EDC address using DiscoveryService.", unresolvedBPNs);
log.info("Some EDC addresses could not be resolved with DiscoveryService. "
+ "Updating SupplyChainImpacted to {}", SupplyChainImpacted.UNKNOWN);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, jobBpn);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN);
recursiveNotificationHandler.handleNotification(investigationJobUpdate.getJobSnapshot().getJob().getId(),
SupplyChainImpacted.UNKNOWN, jobBpn, 0);
SupplyChainImpacted.UNKNOWN, jobBpn, FIRST_HOP);
} else if (resolvedBPNs.isEmpty()) {
log.info("No BPNs could not be found. Updating SupplyChainImpacted to {}", SupplyChainImpacted.UNKNOWN);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, jobBpn);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN);
recursiveNotificationHandler.handleNotification(investigationJobUpdate.getJobSnapshot().getJob().getId(),
SupplyChainImpacted.UNKNOWN, jobBpn, 0);
SupplyChainImpacted.UNKNOWN, jobBpn, FIRST_HOP);
} else {
log.debug("Sending notification for BPNs '{}'", bpns);
sendNotifications(completedJob, investigationJobUpdate, bpns);
Expand All @@ -192,7 +194,7 @@ private void sendNotifications(final Jobs completedJob, final BpnInvestigationJo
if (edcBaseUrl.isEmpty()) {
log.warn("No EDC URL found for BPN '{}'. Setting investigation result to '{}'", bpn,
SupplyChainImpacted.UNKNOWN);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, bpn);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN);
}
edcBaseUrl.forEach(url -> {
try {
Expand All @@ -201,7 +203,7 @@ private void sendNotifications(final Jobs completedJob, final BpnInvestigationJo
investigationJobUpdate.withUnansweredNotifications(Collections.singletonList(new Notification(notificationId, bpn)));
} catch (final EdcClientException e) {
log.error("Exception during sending EDC notification.", e);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, bpn);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN);
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
********************************************************************************/
package org.eclipse.tractusx.irs.ess.service;

import java.util.Set;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -41,7 +39,7 @@
@Builder
public class SupplyChainImpactedAspect {
private SupplyChainImpacted supplyChainImpacted;
private Set<ImpactedSupplierFirstLevel> impactedSuppliersOnFirstTier;
private ImpactedSupplierFirstLevel impactedSuppliersOnFirstTier;

/**
* BPNLs on first tier level where infection was detected
Expand Down
Loading

0 comments on commit 214f291

Please sign in to comment.