Skip to content

Commit

Permalink
feat(irs-api):[#246] Changed hops incrementation pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
ds-psosnowski committed Nov 15, 2023
1 parent de621fe commit 34370b2
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,29 @@ public class EssRecursiveNotificationHandler {
private final BpnInvestigationJobCache bpnInvestigationJobCache;
private final EdcNotificationSender edcNotificationSender;

/* package */ void handleNotification(final UUID finishedJobId, final SupplyChainImpacted supplyChainImpacted, final String bpn) {
/* package */ void handleNotification(final UUID finishedJobId, final SupplyChainImpacted supplyChainImpacted, final String bpn,
final Integer hops) {

final Optional<RelatedInvestigationJobs> relatedJobsId = relatedInvestigationJobsCache.findByRecursiveRelatedJobId(
finishedJobId);

relatedJobsId.ifPresentOrElse(relatedJobs -> {
if (SupplyChainImpacted.YES.equals(supplyChainImpacted)) {
log.debug("SupplyChain is impacted. Sending notification back to requestor.");
edcNotificationSender.sendEdcNotification(relatedJobs.originalNotification(), supplyChainImpacted, FIRST_HOP, bpn);
edcNotificationSender.sendEdcNotification(relatedJobs.originalNotification(), supplyChainImpacted, hops, bpn);
relatedInvestigationJobsCache.remove(
relatedJobs.originalNotification().getHeader().getNotificationId());
} else {
log.debug(
"SupplyChainImpacted in state '{}'. Waiting for Jobs to complete to send notification back to requestor.",
supplyChainImpacted);
sendNotificationAfterAllCompleted(relatedJobs, bpn);
sendNotificationAfterAllCompleted(relatedJobs, bpn, hops);
}
}, () -> log.debug("No RelatedInvestigationJob found for id '{}'.", finishedJobId));
}

private void sendNotificationAfterAllCompleted(final RelatedInvestigationJobs relatedInvestigationJobs,
final String bpn) {
final String bpn, final Integer hops) {
final List<BpnInvestigationJob> allInvestigationJobs = relatedInvestigationJobs.recursiveRelatedJobIds()
.stream()
.map(bpnInvestigationJobCache::findByJobId)
Expand All @@ -82,7 +83,7 @@ private void sendNotificationAfterAllCompleted(final RelatedInvestigationJobs re
.reduce(SupplyChainImpacted.NO,
SupplyChainImpacted::or);

edcNotificationSender.sendEdcNotification(relatedInvestigationJobs.originalNotification(), finalResult, getMinHops(allInvestigationJobs), bpn);
edcNotificationSender.sendEdcNotification(relatedInvestigationJobs.originalNotification(), finalResult, hops, bpn);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void handleNotificationCallback(final EdcNotification<ResponseNotificatio
bpnInvestigationJobCache.store(jobId,
job.update(job.getJobSnapshot(), supplyChainImpacted, notification.getContent().getParentBpn()));
recursiveNotificationHandler.handleNotification(jobId, supplyChainImpacted,
notification.getContent().getParentBpn());
notification.getContent().getParentBpn(), notification.getContent().getHops());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public void handleJobProcessingFinishedEvent(final JobProcessingFinishedEvent jo
investigationResult.supplyChainImpacted())) {
bpnInvestigationJobCache.store(completedJobId, investigationJobUpdate.complete());
recursiveNotificationHandler.handleNotification(investigationJob.getJobSnapshot().getJob().getId(),
investigationResult.supplyChainImpacted(), job.getJobParameter().getBpn());
investigationResult.supplyChainImpacted(), job.getJobParameter().getBpn(),
0);
} else {
triggerInvestigationOnNextLevel(investigationResult.completedJob(), investigationJobUpdate, job.getJobParameter().getBpn());
bpnInvestigationJobCache.store(completedJobId, investigationJobUpdate);
Expand Down Expand Up @@ -172,12 +173,12 @@ private void triggerInvestigationOnNextLevel(final Jobs completedJob,
+ "Updating SupplyChainImpacted to {}", SupplyChainImpacted.UNKNOWN);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, jobBpn);
recursiveNotificationHandler.handleNotification(investigationJobUpdate.getJobSnapshot().getJob().getId(),
SupplyChainImpacted.UNKNOWN, jobBpn);
SupplyChainImpacted.UNKNOWN, jobBpn, 0);
} else if (resolvedBPNs.isEmpty()) {
log.info("No BPNs could not be found. Updating SupplyChainImpacted to {}", SupplyChainImpacted.UNKNOWN);
investigationJobUpdate.update(completedJob, SupplyChainImpacted.UNKNOWN, jobBpn);
recursiveNotificationHandler.handleNotification(investigationJobUpdate.getJobSnapshot().getJob().getId(),
SupplyChainImpacted.UNKNOWN, jobBpn);
SupplyChainImpacted.UNKNOWN, jobBpn, 0);
} else {
log.debug("Sending notification for BPNs '{}'", bpns);
sendNotifications(completedJob, investigationJobUpdate, bpns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class EssRecursiveNotificationHandlerTest {
@Test
void shouldDoNothingWhenThereIsNoInvestigationJob() {
// when
cut.handleNotification(UUID.randomUUID(), SupplyChainImpacted.UNKNOWN, "bpn");
cut.handleNotification(UUID.randomUUID(), SupplyChainImpacted.UNKNOWN, "bpn",
0);

// then
verifyNoInteractions(edcNotificationSender);
Expand All @@ -70,7 +71,7 @@ void shouldReplyWhenJobIsPresentAndSupplyChainIsImpacted() {
relatedInvestigationJobsCache.store("notification-id", createRelatedJobsWith(List.of(jobId)));

// when
cut.handleNotification(jobId, SupplyChainImpacted.YES, "bpn");
cut.handleNotification(jobId, SupplyChainImpacted.YES, "bpn", 0);

// then
verify(edcNotificationSender).sendEdcNotification(any(), eq(SupplyChainImpacted.YES), eq(hops), eq("bpn"));
Expand All @@ -90,7 +91,7 @@ void shouldReplyOnlyWhenAllJobsAreCompleted() {
when(pastBpnInvestigationJob.getSupplyChainImpacted()).thenReturn(Optional.of(SupplyChainImpacted.NO));

// when
cut.handleNotification(jobId, SupplyChainImpacted.NO, bpn);
cut.handleNotification(jobId, SupplyChainImpacted.NO, bpn, 0);

// then
verify(edcNotificationSender, times(1)).sendEdcNotification(any(), eq(SupplyChainImpacted.NO), eq(hops), eq(bpn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ void shouldStopProcessingIfNoRelationshipContainsBPN() throws EdcClientException
jobProcessingEventListener.handleJobProcessingFinishedEvent(jobProcessingFinishedEvent);

// then
verify(this.recursiveNotificationHandler, times(1)).handleNotification(any(), eq(SupplyChainImpacted.UNKNOWN), eq("bpn"));
verify(this.recursiveNotificationHandler, times(1)).handleNotification(any(), eq(SupplyChainImpacted.UNKNOWN), eq("bpn"),
eq(0));
}

@Test
Expand Down Expand Up @@ -226,7 +227,8 @@ void shouldSendCallbackIfNoMoreRelationshipsAreFound() throws EdcClientException

// then
verify(this.edcSubmodelFacade, times(0)).sendNotification(anyString(), anyString(), any(EdcNotification.class));
verify(this.recursiveNotificationHandler, times(1)).handleNotification(any(), eq(SupplyChainImpacted.NO), eq("bpn"));
verify(this.recursiveNotificationHandler, times(1)).handleNotification(any(), eq(SupplyChainImpacted.NO), eq("bpn"),
eq(0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ public class ResponseNotificationContent implements NotificationContent {
/**
* Incrementing hops value
*
* @return updated object
*/
public ResponseNotificationContent incrementHops() {
public void incrementHops() {
this.hops += 1;
return this;
}

/**
Expand Down

0 comments on commit 34370b2

Please sign in to comment.