Skip to content

Commit

Permalink
Merge pull request #439 from catenax-ng/fix/add-correct-bpn-handling
Browse files Browse the repository at this point in the history
fix(recursion): Fix BPN handling in delegates
  • Loading branch information
ds-jkreutzfeld authored Jul 20, 2023
2 parents 9bbf1f2 + 7319bf0 commit 12b1ad1
Show file tree
Hide file tree
Showing 20 changed files with 214 additions and 107 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- BPN is now passed on correctly while traversing the item graph
- Tombstone is created if no BPN is available for a child item

## [3.2.1] - 2023-07-19
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.connector.job.MultiTransferJob;
import org.eclipse.tractusx.irs.connector.job.RecursiveJobHandler;

Expand All @@ -44,7 +45,9 @@ public AASRecursiveJobHandler(final TreeRecursiveLogic logic) {
public Stream<ItemDataRequest> initiate(final MultiTransferJob job) {
log.info("Initiating request for job {}", job.getJobIdString());
final var partId = job.getGlobalAssetId();
final var dataRequest = ItemDataRequest.rootNode(partId);
final var bpn = job.getJobParameter().getBpn();
final var dataRequest = ItemDataRequest.rootNode(
PartChainIdentificationKey.builder().globalAssetId(partId).bpn(bpn).build());
return Stream.of(dataRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.connector.job.TransferProcess;

/**
Expand All @@ -40,12 +41,12 @@
@ToString
public class AASTransferProcess implements TransferProcess {

private final List<String> idsToProcess = new ArrayList<>();
private final List<PartChainIdentificationKey> idsToProcess = new ArrayList<>();
@SuppressWarnings("PMD.ShortVariable")
private String id;
private Integer depth;

public void addIdsToProcess(final List<String> childIds) {
public void addIdsToProcess(final List<PartChainIdentificationKey> childIds) {
idsToProcess.addAll(childIds);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.tractusx.irs.common.persistence.BlobPersistence;
import org.eclipse.tractusx.irs.common.persistence.BlobPersistenceException;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.connector.job.ResponseStatus;
import org.eclipse.tractusx.irs.connector.job.TransferInitiateResponse;
import org.eclipse.tractusx.irs.connector.job.TransferProcessManager;
Expand Down Expand Up @@ -80,7 +81,7 @@ private Runnable getRunnable(final ItemDataRequest dataRequest,
return () -> {
final AASTransferProcess aasTransferProcess = new AASTransferProcess(processId, dataRequest.getDepth());

final String itemId = dataRequest.getItemId();
final PartChainIdentificationKey itemId = dataRequest.getItemId();

log.info("Starting processing Digital Twin Registry with itemId {}", itemId);
final ItemContainer itemContainer = abstractDelegate.process(ItemContainer.builder(), jobData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package org.eclipse.tractusx.irs.aaswrapper.job;

import lombok.Value;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.connector.job.DataRequest;

/**
Expand All @@ -31,14 +32,14 @@
@Value
public class ItemDataRequest implements DataRequest {

private final String itemId;
private final PartChainIdentificationKey itemId;
private final Integer depth;

public static ItemDataRequest rootNode(final String itemId) {
public static ItemDataRequest rootNode(final PartChainIdentificationKey itemId) {
return new ItemDataRequest(itemId, 0);
}

public static ItemDataRequest nextDepthNode(final String itemId, final Integer currentDepth) {
public static ItemDataRequest nextDepthNode(final PartChainIdentificationKey itemId, final Integer currentDepth) {
return new ItemDataRequest(itemId, currentDepth + 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.tractusx.irs.aaswrapper.job.AASTransferProcess;
import org.eclipse.tractusx.irs.aaswrapper.job.ItemContainer;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.component.assetadministrationshell.Endpoint;
import org.eclipse.tractusx.irs.edc.client.EdcSubmodelFacade;
import org.eclipse.tractusx.irs.edc.client.ItemNotFoundInCatalogException;
Expand Down Expand Up @@ -62,7 +63,7 @@ public abstract class AbstractDelegate {
* and Tombstones (if requests fail).
*/
public abstract ItemContainer process(ItemContainer.ItemContainerBuilder itemContainerBuilder, JobParameter jobData,
AASTransferProcess aasTransferProcess, String itemId);
AASTransferProcess aasTransferProcess, PartChainIdentificationKey itemId);

/**
* Delegates processing to next step if exists or returns filled {@link ItemContainer}
Expand All @@ -75,7 +76,8 @@ public abstract ItemContainer process(ItemContainer.ItemContainerBuilder itemCon
* @return item container with filled data
*/
protected ItemContainer next(final ItemContainer.ItemContainerBuilder itemContainerBuilder,
final JobParameter jobData, final AASTransferProcess aasTransferProcess, final String itemId) {
final JobParameter jobData, final AASTransferProcess aasTransferProcess,
final PartChainIdentificationKey itemId) {
if (this.nextStep != null) {
return this.nextStep.process(itemContainerBuilder, jobData, aasTransferProcess, itemId);
}
Expand All @@ -91,7 +93,11 @@ protected String requestSubmodelAsString(final EdcSubmodelFacade submodelFacade,
for (final String connectorEndpoint : connectorEndpoints) {
addSubmodelToList(submodelFacade, endpoint, submodelPayload, connectorEndpoint);
}
return submodelPayload.stream().findFirst().orElseThrow();
return submodelPayload.stream()
.findFirst()
.orElseThrow(() -> new EdcClientException(String.format(
"Called %s connectorEndpoints but did not get any submodels. Connectors: '%s'",
connectorEndpoints.size(), String.join(", ", connectorEndpoints))));
}

private void addSubmodelToList(final EdcSubmodelFacade submodelFacade, final Endpoint endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.tractusx.irs.bpdm.BpdmFacade;
import org.eclipse.tractusx.irs.component.Bpn;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.component.Tombstone;
import org.eclipse.tractusx.irs.component.enums.ProcessStep;
import org.springframework.web.client.RestClientException;
Expand All @@ -53,7 +54,7 @@ public BpdmDelegate(final AbstractDelegate nextStep, final BpdmFacade bpdmFacade

@Override
public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder,
final JobParameter jobData, final AASTransferProcess aasTransferProcess, final String itemId) {
final JobParameter jobData, final AASTransferProcess aasTransferProcess, final PartChainIdentificationKey itemId) {

if (jobData.isLookupBPNs()) {
log.debug("BPN Lookup enabled, collecting BPN information");
Expand All @@ -65,12 +66,12 @@ public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContai
try {
itemContainerBuilder.build()
.getBpns()
.forEach(bpn -> lookupBPN(itemContainerBuilder, itemId, bpn,
.forEach(bpn -> lookupBPN(itemContainerBuilder, itemId.getGlobalAssetId(), bpn,
requestMetric));
} catch (final RestClientException e) {
log.info("Business Partner endpoint could not be retrieved for Item: {}. Creating Tombstone.", itemId);
requestMetric.incrementFailed();
itemContainerBuilder.tombstone(Tombstone.from(itemId, null, e, retryCount, ProcessStep.BPDM_REQUEST));
itemContainerBuilder.tombstone(Tombstone.from(itemId.getGlobalAssetId(), null, e, retryCount, ProcessStep.BPDM_REQUEST));
}
} else {
log.debug("BPN lookup disabled, no BPN information will be collected.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.tractusx.irs.aaswrapper.job.AASTransferProcess;
import org.eclipse.tractusx.irs.aaswrapper.job.ItemContainer;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.component.Tombstone;
import org.eclipse.tractusx.irs.component.enums.ProcessStep;
import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryKey;
Expand All @@ -52,15 +53,15 @@ public DigitalTwinDelegate(final AbstractDelegate nextStep,

@Override
public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder, final JobParameter jobData,
final AASTransferProcess aasTransferProcess, final String itemId) {
final AASTransferProcess aasTransferProcess, final PartChainIdentificationKey itemId) {

try {
itemContainerBuilder.shell(digitalTwinRegistryService.fetchShells(
List.of(new DigitalTwinRegistryKey(itemId, jobData.getBpn()))
List.of(new DigitalTwinRegistryKey(itemId.getGlobalAssetId(), itemId.getBpn()))
).stream().findFirst().orElseThrow());
} catch (final RestClientException | RegistryServiceException e) {
log.info("Shell Endpoint could not be retrieved for Item: {}. Creating Tombstone.", itemId);
itemContainerBuilder.tombstone(Tombstone.from(itemId, null, e, retryCount, ProcessStep.DIGITAL_TWIN_REQUEST));
itemContainerBuilder.tombstone(Tombstone.from(itemId.getGlobalAssetId(), null, e, retryCount, ProcessStep.DIGITAL_TWIN_REQUEST));
}

if (expectedDepthOfTreeIsNotReached(jobData.getDepth(), aasTransferProcess.getDepth())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.tractusx.irs.aaswrapper.job.AASTransferProcess;
import org.eclipse.tractusx.irs.aaswrapper.job.ItemContainer;
import org.eclipse.tractusx.irs.component.Bpn;
import org.eclipse.tractusx.irs.component.GlobalAssetIdentification;
import org.eclipse.tractusx.irs.component.JobParameter;
import org.eclipse.tractusx.irs.component.LinkedItem;
import org.eclipse.tractusx.irs.component.PartChainIdentificationKey;
import org.eclipse.tractusx.irs.component.Relationship;
import org.eclipse.tractusx.irs.component.Tombstone;
import org.eclipse.tractusx.irs.component.assetadministrationshell.Endpoint;
Expand Down Expand Up @@ -66,7 +66,8 @@ public RelationshipDelegate(final AbstractDelegate nextStep, final EdcSubmodelFa

@Override
public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder,
final JobParameter jobData, final AASTransferProcess aasTransferProcess, final String itemId) {
final JobParameter jobData, final AASTransferProcess aasTransferProcess,
final PartChainIdentificationKey itemId) {

final RelationshipAspect relationshipAspect = RelationshipAspect.from(jobData.getBomLifecycle(),
jobData.getDirection());
Expand All @@ -76,24 +77,34 @@ public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContai
.findFirst()
.ifPresent(shell -> shell.findRelationshipEndpointAddresses(
AspectType.fromValue(relationshipAspect.getName()))
.forEach(endpoint -> processEndpoint(endpoint, jobData,
relationshipAspect, aasTransferProcess,
itemContainerBuilder, itemId)));
.forEach(endpoint -> processEndpoint(endpoint, relationshipAspect,
aasTransferProcess, itemContainerBuilder, itemId)));

return next(itemContainerBuilder, jobData, aasTransferProcess, itemId);
}

private void processEndpoint(final Endpoint endpoint, final JobParameter jobData,
final RelationshipAspect relationshipAspect, final AASTransferProcess aasTransferProcess,
final ItemContainer.ItemContainerBuilder itemContainerBuilder, final String itemId) {
private void processEndpoint(final Endpoint endpoint, final RelationshipAspect relationshipAspect,
final AASTransferProcess aasTransferProcess, final ItemContainer.ItemContainerBuilder itemContainerBuilder,
final PartChainIdentificationKey itemId) {

if (StringUtils.isBlank(itemId.getBpn())) {
log.warn("Could not process item with id {} because no BPN was provided. Creating Tombstone.",
itemId.getGlobalAssetId());
itemContainerBuilder.tombstone(
Tombstone.from(itemId.getGlobalAssetId(), endpoint.getProtocolInformation().getHref(),
"Can't get relationship without a BPN", retryCount, ProcessStep.SUBMODEL_REQUEST));
return;
}

try {
final String submodelRawPayload = requestSubmodelAsString(submodelFacade, connectorEndpointsService,
endpoint, jobData.getBpn());
endpoint, itemId.getBpn());

final var relationships = jsonUtil.fromString(submodelRawPayload, relationshipAspect.getSubmodelClazz())
.asRelationships();

final List<String> idsToProcess = getIdsToProcess(relationships, relationshipAspect.getDirection());
final List<PartChainIdentificationKey> idsToProcess = getIdsToProcess(relationships,
relationshipAspect.getDirection());

log.info("Processing Relationships with {} items", idsToProcess.size());

Expand All @@ -104,39 +115,46 @@ private void processEndpoint(final Endpoint endpoint, final JobParameter jobData
log.info("Submodel Endpoint could not be retrieved for Endpoint: {}. Creating Tombstone.",
endpoint.getProtocolInformation().getHref());
itemContainerBuilder.tombstone(
Tombstone.from(itemId, endpoint.getProtocolInformation().getHref(), e, retryCount,
ProcessStep.SUBMODEL_REQUEST));
Tombstone.from(itemId.getGlobalAssetId(), endpoint.getProtocolInformation().getHref(), e,
retryCount, ProcessStep.SUBMODEL_REQUEST));
} catch (final JsonParseException e) {
log.info("Submodel payload did not match the expected AspectType. Creating Tombstone.");
itemContainerBuilder.tombstone(
Tombstone.from(itemId, endpoint.getProtocolInformation().getHref(), e, retryCount,
ProcessStep.SUBMODEL_REQUEST));
Tombstone.from(itemId.getGlobalAssetId(), endpoint.getProtocolInformation().getHref(), e,
retryCount, ProcessStep.SUBMODEL_REQUEST));
}
}

private static List<Bpn> getBpnsFrom(final List<Relationship> relationships) {
return relationships.stream().map(Relationship::getBpn).map(Bpn::withManufacturerId).toList();
}

private List<String> getIdsToProcess(final List<Relationship> relationships, final Direction direction) {
private List<PartChainIdentificationKey> getIdsToProcess(final List<Relationship> relationships,
final Direction direction) {
return switch (direction) {
case DOWNWARD -> getChildIds(relationships);
case UPWARD -> getParentIds(relationships);
};
}

private List<String> getParentIds(final List<Relationship> relationships) {
private List<PartChainIdentificationKey> getParentIds(final List<Relationship> relationships) {
return relationships.stream()
.map(Relationship::getCatenaXId)
.map(GlobalAssetIdentification::getGlobalAssetId)
.map(relationship -> PartChainIdentificationKey.builder()
.globalAssetId(relationship.getCatenaXId()
.getGlobalAssetId())
.bpn(relationship.getBpn())
.build())
.toList();
}

private List<String> getChildIds(final List<Relationship> relationships) {
private List<PartChainIdentificationKey> getChildIds(final List<Relationship> relationships) {
return relationships.stream()
.map(Relationship::getLinkedItem)
.map(LinkedItem::getChildCatenaXId)
.map(GlobalAssetIdentification::getGlobalAssetId)
.map(relationship -> PartChainIdentificationKey.builder()
.globalAssetId(relationship.getLinkedItem()
.getChildCatenaXId()
.getGlobalAssetId())
.bpn(relationship.getBpn())
.build())
.toList();
}
}
Loading

0 comments on commit 12b1ad1

Please sign in to comment.