diff --git a/CHANGELOG.md b/CHANGELOG.md index 25219e13a8..4796e2533a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added Integration Tests for the entire IRS flow using stubbed responses of Discovery Service, Semantic Hub, EDC, Digital Twin Registry and BPDM Pool ### Changed + +- Dataspace Discovery Service handles multiple EDC-Urls received for BPN now - Updated license header to "Copyright (c) 2021,2024 Contributors to the Eclipse Foundation" - Changed lookupGlobalAssetIds to lookupShellsByBPN, which provides full object. - Suppressed CVE-2024-20932 from graal-sdk-21.2.0.jar because this is not applicable for IRS. diff --git a/DEPENDENCIES b/DEPENDENCIES index 4e8646dda6..a951a92282 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -205,7 +205,6 @@ maven/mavencentral/net.minidev/accessors-smart/2.4.9, Apache-2.0, approved, #751 maven/mavencentral/net.minidev/json-smart/2.4.10, Apache-2.0, approved, #3288 maven/mavencentral/net.minidev/json-smart/2.4.11, Apache-2.0, approved, #3288 maven/mavencentral/net.sf.saxon/Saxon-HE/10.6, MPL-2.0 AND W3C, approved, #7945 -maven/mavencentral/org.apache.commons/commons-compress/1.23.0, Apache-2.0 AND BSD-3-Clause, approved, #7506 maven/mavencentral/org.apache.commons/commons-compress/1.24.0, Apache-2.0 AND BSD-3-Clause AND bzip2-1.0.6 AND LicenseRef-Public-Domain, approved, #10368 maven/mavencentral/org.apache.commons/commons-lang3/3.12.0, Apache-2.0, approved, clearlydefined maven/mavencentral/org.apache.commons/commons-pool2/2.11.1, Apache-2.0, approved, CQ23795 diff --git a/docs/src/docs/arc42/cross-cutting/discovery-DTR--EDC-with-multiple-DTRs.puml b/docs/src/docs/arc42/cross-cutting/discovery-DTR--EDC-with-multiple-DTRs.puml new file mode 100644 index 0000000000..0689cd80a5 --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-DTR--EDC-with-multiple-DTRs.puml @@ -0,0 +1,28 @@ +@startuml +participant IRS +participant DiscoveryService +participant "EDC Provider" as EDCProvider +participant "DTR 1" as DTR1 +participant "DTR 2" as DTR2 + +IRS ->> DiscoveryService: Get EDCs for BPN +DiscoveryService ->> IRS: Return list of 1 EDC +IRS ->> EDCProvider: Query for DTR contract offer +EDCProvider ->> IRS: 2 DTR contract offers + +par + group Query DTR 1 + IRS ->> EDCProvider: Negotiate contract + IRS ->> DTR1: Query for DT + DTR1 ->> IRS: no DT + end + + else + + group Query DTR 2 + IRS ->> EDCProvider: Negotiate contract + IRS ->> DTR2: Query for DT + DTR2 ->> IRS: DT + end +end +@enduml \ No newline at end of file diff --git a/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-DTs-with-the-same-globalAssedId-in-one-DTR.puml b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-DTs-with-the-same-globalAssedId-in-one-DTR.puml new file mode 100644 index 0000000000..5956218ecc --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-DTs-with-the-same-globalAssedId-in-one-DTR.puml @@ -0,0 +1,8 @@ +@startuml +actor IRS +participant DTR + +IRS -> DTR: /query for globalAssetId +DTR -> IRS: return list of two results +IRS -> IRS: use first +@enduml diff --git a/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-multiple-DTRs.puml b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-multiple-DTRs.puml new file mode 100644 index 0000000000..be20fd8964 --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-multiple-DTRs.puml @@ -0,0 +1,38 @@ +@startuml +participant IRS +participant DiscoveryService +participant "EDC Provider 1" as EDCProvider1 +participant "EDC Provider 2" as EDCProvider2 +participant "EDC Provider 3" as EDCProvider3 +participant "DTR" as DTR + +IRS ->> DiscoveryService: Get EDCs for BPN +DiscoveryService ->> IRS: Return list of 3 EDCs + +par + group CatalogRequestEDC1 + IRS ->> EDCProvider1: Query for DTR contract offer + EDCProvider1 ->> IRS: No offer + end + + else + + group CatalogRequestEDC2 + IRS ->> EDCProvider2: Query for DTR contract offer + EDCProvider2 ->> IRS: DTR contract offer + IRS -> EDCProvider2: Negotiate contract + IRS ->> DTR: Query for DT + DTR ->> IRS: DT + end + + else + + group CatalogRequestEDC3 + IRS ->> EDCProvider3: Query for DTR contract offer + EDCProvider3 ->> IRS: DTR contract offer + IRS -> EDCProvider3: Negotiate contract + IRS ->> DTR: Query for DT + DTR ->> IRS: No DT + end +end +@enduml diff --git a/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-no-DTRs.puml b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-no-DTRs.puml new file mode 100644 index 0000000000..aaed9497b2 --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-no-DTRs.puml @@ -0,0 +1,34 @@ +@startuml +actor IRS +actor "Discovery Service" as DiscoveryService + +participant "EDC 1" as EDCProvider1 +participant "EDC 2" as EDCProvider2 +participant "EDC 3" as EDCProvider3 + +IRS -> DiscoveryService: Get EDCs for BPN +DiscoveryService -> IRS: Return list of 3 EDCs + +par + group Catalog Request to EDC 1 + IRS -> EDCProvider1: Query for DTR contract offer + EDCProvider1 -> IRS: No offer + end + + else + + group Catalog Request to EDC 2 + IRS -> EDCProvider2: Query for DTR contract offer + EDCProvider2 -> IRS: No offer + end + + else + + group Catalog Request to EDC 3 + IRS -> EDCProvider3: Query for DTR contract offer + EDCProvider3 -> IRS: No offer + end +end + +IRS -> IRS: Tombstone +@enduml diff --git a/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-one-DTR.puml b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-one-DTR.puml new file mode 100644 index 0000000000..f70cc4ac49 --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-DTR--multiple-EDCs-with-one-DTR.puml @@ -0,0 +1,35 @@ +@startuml +participant IRS +participant DiscoveryService +participant "EDC Provider 1" as EDCProvider1 +participant "EDC Provider 2" as EDCProvider2 +participant "EDC Provider 3" as EDCProvider3 +participant DTR + +IRS ->> DiscoveryService: Get EDCs for BPN +DiscoveryService ->> IRS: Return list of 3 EDCs + +par + group CatalogRequestEDC1 + IRS ->> EDCProvider1: Query for DTR contract offer + EDCProvider1 ->> IRS: No offer + end + + else + + group CatalogRequestEDC2 + IRS ->> EDCProvider2: Query for DTR contract offer + EDCProvider2 ->> IRS: No offer + end + + else + + group CatalogRequestEDC3 + IRS ->> EDCProvider3: Query for DTR contract offer + EDCProvider3 ->> IRS: DTR contract offer + IRS -> EDCProvider3: Negotiate contract + IRS ->> DTR: Query for DT + DTR ->> IRS: DT + end +end +@enduml \ No newline at end of file diff --git a/docs/src/docs/arc42/cross-cutting/discovery-DTR--one-EDC-with-one-DTR.puml b/docs/src/docs/arc42/cross-cutting/discovery-DTR--one-EDC-with-one-DTR.puml new file mode 100644 index 0000000000..8b3277445d --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-DTR--one-EDC-with-one-DTR.puml @@ -0,0 +1,14 @@ +@startuml +participant IRS +participant DiscoveryService +participant "EDC Provider 3" as EDCProvider3 +participant DTR + +IRS ->> DiscoveryService: Get EDCs for BPN +DiscoveryService ->> IRS: Return list of 1 EDC +IRS ->> EDCProvider3: Query for DTR contract offer +EDCProvider3 ->> IRS: DTR contract offer +IRS -> EDCProvider3: Negotiate contract +IRS ->> DTR: Query for DT +DTR ->> IRS: DT +@enduml diff --git a/docs/src/docs/arc42/cross-cutting/discovery-process-dtr.adoc b/docs/src/docs/arc42/cross-cutting/discovery-process-dtr.adoc new file mode 100644 index 0000000000..10cea2a582 --- /dev/null +++ b/docs/src/docs/arc42/cross-cutting/discovery-process-dtr.adoc @@ -0,0 +1,67 @@ +The Dataspace Discovery Service handles multiple EDC-Urls received for BPN. +This applies to the following scenarios. + +__Please note that the expression "the first result" in the subsequent sections means the first successful answer.__ + +==== Scenario 1: EDC with multiple DTRs + +IRS queries all DTRs for the globalAssetId and will take the first result it gets. +If none of the DTRs return a result, IRS will create a tombstone. + +[plantuml,target=discovery-DTR--EDC-with-multiple-DTRs,format=svg] +.... +include::discovery-DTR--EDC-with-multiple-DTRs.puml[] +.... + +==== Scenario 2: Multiple EDCs with one DTR + +IRS starts a contract negotiation for all registry contract offers in parallel and queries the DTRs for all successful negotiations. +The first registry which responds with a DT will be the one used by IRS. + +[plantuml,target=discovery-DTR--multiple-EDCs-with-one-DTR,format=svg] +.... +include::discovery-DTR--multiple-EDCs-with-one-DTR.puml[] +.... + +==== Scenario 3: One EDC with one DTR + +Only one EDC found for BPN and the catalog only contains one offer for the DTR. +IRS will use this registry and will create a tombstone if no DT could be found for the globalAssetId. + +[plantuml,target=discovery-DTR--one-EDC-with-one-DTR,format=svg] +.... +include::discovery-DTR--one-EDC-with-one-DTR.puml[] +.... + +==== Scenario 4: Multiple EDCs with multiple DTRs + +IRS starts a contract negotiation for all the registry offers. + +[plantuml,target=discovery-DTR--multiple-EDCs-with-multiple-DTRs,format=svg] +.... +include::discovery-DTR--multiple-EDCs-with-multiple-DTRs.puml[] +.... + +==== Scenario 5: Multiple EDCs with no DTRs + +IRS starts a contract negotiation for all the registry offers and creates a tombstone since no DTR could be discovered. + +[plantuml,target=discovery-DTR--multiple-EDCs-with-no-DTRs,format=svg] +.... +include::discovery-DTR--multiple-EDCs-with-no-DTRs.puml[] +.... + +==== Special Scenario: Same DT in multiple DTRs + +IRS will use all registries to query for the globalAssetId and takes the first result which is returned. +If no DT could be found in any of the DTRs, IRS will create a tombstone. + +==== Special Scenario: Multiple DTs (with the same globalAssetId) in one DTR + +IRS uses the `/query` endpoint of the DTR to get the DT id based on the globalAssetId. +If more than one id is present for a globalAssetId, IRS will use the first of the list. + +[plantuml,target=discovery-DTR--multiple-DTs-with-the-same-globalAssedId-in-one-DTR,format=svg] +.... +include::discovery-DTR--multiple-DTs-with-the-same-globalAssedId-in-one-DTR.puml[] +.... diff --git a/docs/src/docs/arc42/cross-cutting/under-the-hood.adoc b/docs/src/docs/arc42/cross-cutting/under-the-hood.adoc index aaa8440e10..38419e28cc 100644 --- a/docs/src/docs/arc42/cross-cutting/under-the-hood.adoc +++ b/docs/src/docs/arc42/cross-cutting/under-the-hood.adoc @@ -1,82 +1,115 @@ = "Under-the-hood" concepts == Persistence + The IRS stores two types of data in a persistent way: - Job metadata - Job payloads, e.g. AAS shells or submodel data -All of this is data is stored in an object store. The currently used implementation is Minio (Amazon S3 compatible). -This reduces the complexity in storing and retrieving data. There also is no predefined model for the data, every document can be stored as it is. +All of this is data is stored in an object store. +The currently used implementation is Minio (Amazon S3 compatible). +This reduces the complexity in storing and retrieving data. +There also is no predefined model for the data, every document can be stored as it is. The downside of this approach is lack of query functionality, as we can only search through the keys of the entries but not based on the value data. In the future, another approach or an additional way to to index the data might be required. -To let the data survive system restarts, Minio needs to use a persistent volume for the data storage. A default configuration for this is provided in the Helm charts. +To let the data survive system restarts, Minio needs to use a persistent volume for the data storage. +A default configuration for this is provided in the Helm charts. == Transaction handling + There currently is no transaction management in the IRS. == Session handling + There is no session handling in the IRS, access is solely based on bearer tokens, the API is stateless. == Communication and integration -All interfaces to other systems are using RESTful calls over HTTP(S). Where central authentication is required, a common OAuth2 provider is used. + +All interfaces to other systems are using RESTful calls over HTTP(S). +Where central authentication is required, a common OAuth2 provider is used. For outgoing calls, the Spring RestTemplate mechanism is used and separate RestTemplates are created for the different ways of authentication. For incoming calls, we utilize the Spring REST Controller mechanism, annotating the interfaces accordingly and also documenting the endpoints using OpenAPI annotations. == Exception and error handling + There are two types of potential errors in the IRS: === Technical errors -Technical errors occur when there is a problem with the application itself, its configuration or directly connected infrastructure, e.g. the Minio persistence. Usually, the application cannot solve these problems by itself and requires some external support (manual work or automated recovery mechanisms, e.g. Kubernetes liveness probes). + +Technical errors occur when there is a problem with the application itself, its configuration or directly connected infrastructure, e.g. the Minio persistence. +Usually, the application cannot solve these problems by itself and requires some external support (manual work or automated recovery mechanisms, e.g. Kubernetes liveness probes). These errors are printed mainly to the application log and are relevant for the healthchecks. === Functional errors -Functional errors occur when there is a problem with the data that is being processed or external systems are unavailable and data cannot be sent / fetched as required for the process. While the system might not be able to provide the required function at that moment, it may work with a different dataset or as soon as the external systems recover. + +Functional errors occur when there is a problem with the data that is being processed or external systems are unavailable and data cannot be sent / fetched as required for the process. +While the system might not be able to provide the required function at that moment, it may work with a different dataset or as soon as the external systems recover. These errors are reported in the Job response and do not directly affect application health. === Rules for exception handling + ==== Throw or log, don't do both -When catching an exception, either log the exception and handle the problem or rethrow it, so it can be handled at a higher level of the code. By doing both, an exception might be written to the log multiple times, which can be confusing. + +When catching an exception, either log the exception and handle the problem or rethrow it, so it can be handled at a higher level of the code. +By doing both, an exception might be written to the log multiple times, which can be confusing. ==== Write own base exceptions for (internal) interfaces -By defining a common (checked) base exception for an interface, the caller is forced to handle potential errors, but can keep the logic simple. On the other hand, you still have the possibility to derive various, meaningful exceptions for different error cases, which can then be thrown via the API. + +By defining a common (checked) base exception for an interface, the caller is forced to handle potential errors, but can keep the logic simple. +On the other hand, you still have the possibility to derive various, meaningful exceptions for different error cases, which can then be thrown via the API. Of course, when using only RuntimeExceptions, this is not necessary - but those can be overlooked quite easily, so be careful there. ==== Central fallback exception handler -There will always be some exception that cannot be handled inside of the code correctly - or it may just have been unforeseen. A central fallback exception handler is required so all problems are visible in the log and the API always returns meaningful responses. In some cases, this is as simple as a HTTP 500. + +There will always be some exception that cannot be handled inside of the code correctly - or it may just have been unforeseen. +A central fallback exception handler is required so all problems are visible in the log and the API always returns meaningful responses. +In some cases, this is as simple as a HTTP 500. ==== Dont expose too much exception details over API -It's good to inform the user, why their request did not work, but only if they can do something about it (HTTP 4xx). So in case of application problems, you should not expose details of the problem to the caller. This way, we avoid opening potential attack vectors. -== Parallelization and threading -The heart of the IRS is the parallel execution of planned jobs. As almost each job requires multiple calls to various endpoints, those are done in parallel as well to reduce the total execution time for each job. +It's good to inform the user, why their request did not work, but only if they can do something about it (HTTP 4xx). +So in case of application problems, you should not expose details of the problem to the caller. +This way, we avoid opening potential attack vectors. -Tasks execution is orchestrated by the JobOrchestrator class. It utilizes a central ExecutorService, which manages the number of threads and schedules new Task as they come in. +== Parallelization and threading +The heart of the IRS is the parallel execution of planned jobs. +As almost each job requires multiple calls to various endpoints, those are done in parallel as well to reduce the total execution time for each job. +Tasks execution is orchestrated by the JobOrchestrator class. +It utilizes a central ExecutorService, which manages the number of threads and schedules new Task as they come in. == Plausibility checks and validation + Data validation happens at two points: -- IRS API: the data sent by the client is validated to match the model defined in the IRS. If the validation fails, the IRS sends a HTTP 400 response and indicates the problem to the caller. +- IRS API: the data sent by the client is validated to match the model defined in the IRS. +If the validation fails, the IRS sends a HTTP 400 response and indicates the problem to the caller. - Submodel payload: each time a submodel payload is requested from via EDC, the data is validated against the model defined in the SemanticHub for the matching aspect type. -- EDC Contract Offer Policy: each time IRS consumes data over the EDC, the policies of the offered contract will be validated. IDs of so-called "Rahmenverträgen" or Framework-Agreements can be added to the IRS Policy Store to be accepted by the IRS. If a Contract Offer does not match any of the IDs store in Policy Store, the contract offer will be declined and no data will be consumed. +- EDC Contract Offer Policy: each time IRS consumes data over the EDC, the policies of the offered contract will be validated. +IDs of so-called "Rahmenverträgen" or Framework-Agreements can be added to the IRS Policy Store to be accepted by the IRS. +If a Contract Offer does not match any of the IDs store in Policy Store, the contract offer will be declined and no data will be consumed. == Policy Store -The IRS gives its users the ability to manage, create and delete complex policies containing permissions and constraints in order to obtain the most precise control over access and use of data received from the edc provider. Policies stored in Policy Store will serve as input with allowed restriction and will be checked against every item from EDC Catalog. +The IRS gives its users the ability to manage, create and delete complex policies containing permissions and constraints in order to obtain the most precise control over access and use of data received from the edc provider. +Policies stored in Policy Store will serve as input with allowed restriction and will be checked against every item from EDC Catalog. -The structure of a Policy that can be stored in storage can be easily viewed by using Policy Store endpoints in the published API documentation. Each policy may contain more than one permission, which in turn consists of constraints linked together by AND or OR relationships. This model provides full flexibility and control over stored access and use policies. +The structure of a Policy that can be stored in storage can be easily viewed by using Policy Store endpoints in the published API documentation. +Each policy may contain more than one permission, which in turn consists of constraints linked together by AND or OR relationships. +This model provides full flexibility and control over stored access and use policies. == Digital Twin / EDC requirements -In order to work with the decentral network approach, IRS requires the Digital Twin to contain a `"subprotocolBody"` in each of the submodelDescriptor endpoints. This `"subprotocolBody"` has to contain the `"id"` of the EDC asset, as well as the `"dspEndpoint"` of the EDC, separated by a semicolon (e.g. `"subprotocolBody": "id=123;dspEndpoint=http://edc.control.plane/api/v1/dsp"`). +In order to work with the decentral network approach, IRS requires the Digital Twin to contain a `"subprotocolBody"` in each of the submodelDescriptor endpoints. +This `"subprotocolBody"` has to contain the `"id"` of the EDC asset, as well as the `"dspEndpoint"` of the EDC, separated by a semicolon (e.g. `"subprotocolBody": "id=123;dspEndpoint=http://edc.control.plane/api/v1/dsp"`). The `"dspEndpoint"` is used to request the EDC catalog of the dataprovider and the `"id"` to filter for the exact asset inside this catalog. @@ -94,7 +127,8 @@ Whenever a BPN is resolved via BPDM, the partner name is cached on IRS side, as === Semantics Hub -Whenever a semantic model schema is requested from the Semantic Hub, it is stored locally until the cache is evicted (configurable). The IRS can preload configured schema models on startup to reduce on demand call times. +Whenever a semantic model schema is requested from the Semantic Hub, it is stored locally until the cache is evicted (configurable). +The IRS can preload configured schema models on startup to reduce on demand call times. Additionally, models can be deployed with the system as a backup to the real Semantic Hub service. @@ -109,45 +143,54 @@ The time to live for both caches can be configured separately as described in th Further information on Discovery Service can be found in the chapter "System scope and context". +== Discovery Process + +=== Digital Twin Registry + +include::discovery-process-dtr.adoc[] + === EDC EndpointDataReferenceStorage is in-memory local storage that holds records (EndpointDataReferences) by either assetId or contractAgreementId. When EDC gets EndpointDataReference describing endpoint serving data it uses EndpointDataReferenceStorage and query it by assetId. -This allows reuse of already existing EndpointDataReference if it is present, valid, and it's token is not expired, -rather than starting whole new contract negotiation process. - -In case token is expired the process is also shortened. We don't have to start new contract negotiation process, -since we can obtain required contractAgreementId from present authCode. This improves request processing time. - -[source, mermaid] -.... -sequenceDiagram - autonumber - participant EdcSubmodelClient - participant ContractNegotiationService - participant EndpointDataReferenceStorage - participant EdcCallbackController - participant EdcDataPlaneClient - EdcSubmodelClient ->> EndpointDataReferenceStorage: Get EDR Token for EDC asset id - EndpointDataReferenceStorage ->> EdcSubmodelClient: Return Optional - alt Token is present and not expired - EdcSubmodelClient ->> EdcSubmodelClient: Optional.get - else - alt Token is expired - EdcSubmodelClient ->> ContractNegotiationService: Renew EDR Token based on existing Token - else Token is not present - EdcSubmodelClient ->> ContractNegotiationService: Negotiate new EDR Token - end - ContractNegotiationService -->> EdcCallbackController: EDC flow - EdcCallbackController ->> EndpointDataReferenceStorage: Store EDR token by EDC asset id after EDC callback - loop While EDR Token is not present - EdcSubmodelClient ->> EndpointDataReferenceStorage: Poll for EDR Token - end - EndpointDataReferenceStorage ->> EdcSubmodelClient: Return EDR Token +This allows reuse of already existing EndpointDataReference if it is present, valid, and it's token is not expired, rather than starting whole new contract negotiation process. + +In case token is expired the process is also shortened. +We don't have to start new contract negotiation process, since we can obtain required contractAgreementId from present authCode. +This improves request processing time. + +[plantuml,target=discovery-process-edc,format=svg] +---- +@startuml +autonumber +participant EdcSubmodelClient +participant ContractNegotiationService +participant EndpointDataReferenceStorage +participant EdcCallbackController +participant EdcDataPlaneClient + +EdcSubmodelClient ->> EndpointDataReferenceStorage: Get EDR Token for EDC asset id +EndpointDataReferenceStorage -->> EdcSubmodelClient: Return Optional + +alt Token is present and not expired + EdcSubmodelClient ->> EdcSubmodelClient: Optional.get +else + alt Token is expired + EdcSubmodelClient ->> ContractNegotiationService: Renew EDR Token based on existing Token + else Token is not present + EdcSubmodelClient ->> ContractNegotiationService: Negotiate new EDR Token + end + ContractNegotiationService -->> EdcCallbackController: EDC flow + EdcCallbackController ->> EndpointDataReferenceStorage: Store EDR token by EDC asset id after EDC callback + loop While EDR Token is not present + EdcSubmodelClient ->> EndpointDataReferenceStorage: Poll for EDR Token end - EdcSubmodelClient ->> EdcDataPlaneClient: Get data(EDR Token, Dataplane URL) - EdcDataPlaneClient ->> EdcSubmodelClient: Return data -.... + EndpointDataReferenceStorage -->> EdcSubmodelClient: Return EDR Token +end +EdcSubmodelClient ->> EdcDataPlaneClient: Get data(EDR Token, Dataplane URL) +EdcDataPlaneClient -->> EdcSubmodelClient: Return data +@enduml +---- diff --git a/docs/src/docs/arc42/glossary.adoc b/docs/src/docs/arc42/glossary.adoc index 9fb18f3da7..dbdb67a492 100644 --- a/docs/src/docs/arc42/glossary.adoc +++ b/docs/src/docs/arc42/glossary.adoc @@ -4,20 +4,32 @@ |Term |Description |AAS | Asset Administration Shell (Industry 4.0) -|Aspect servers (submodel endpoints) -|Companies participating in the interorganizational data exchange provides their data over aspect servers. The so called "submodel-descriptors" in the AAS shells are pointing to these AspectServers which provide the data-assets of the participating these companies in Catena-X. -|BoM |Bill of Materials -|Edge |see Traversal Aspect -|IRS |Item Relationship Service +|Aspect servers (submodel endpoints) | Companies participating in the interorganizational data exchange provides their data over aspect servers. The so called "submodel-descriptors" in the AAS shells are pointing to these AspectServers which provide the data-assets of the participating these companies in Catena-X. +|Bill of Materials (BoM) | A Bill of Materials is a comprehensive list of materials, components, sub-assemblies, and the quantities of each needed to manufacture or build a product. It serves as a structured document that provides information about the raw materials, parts, and components required for the production process. +|BPN | Business Partner Number +|Data Space|Data Spaces are the key concept for a large-scale, cross-border data economy. This is also the vision of the Gaia-X initiative for a data infrastructure in Europe. The International Data Space Association (IDSA) contributes significantly to this with the architectural model, interfaces, and standards. +|DT | Digital Twin +|DTR | Digital Twin Registry. The Digital Twin Registry is a registry which lists all digital twins and references their aspects including information about the underlying asset, asset manufacturer, and access options (e.g. aspect endpoints). +|Eclipse Dataspace Connector (EDC) | The Eclipse Data Space Connector (EDC) is a standard and policy-compliant connector that can be used within the scope of Catena-X, but also more generally as a connector for Data Spaces. It is split up into Control-Plane and Data-Plane, whereas the Control-Plane functions as administration layer and has responsibility of resource management, contract negotiation and administer data transfer. The Data-Plane does the heavy lifting of transferring and receiving data streams. For more information see: +https://github.com/eclipse-edc/Connector[EDC Connector] , https://github.com/eclipse-tractusx/tractusx-edc[Tractus-X EDC (Eclipse Dataspace Connector)] +|Edge | see Traversal Aspect +|IRS | Item Relationship Service |Item Graph |The result returned via the IRS. This corresponds to a tree structure in which each node represents a part of a virtual asset. -|MTPDC |Formerly known Service Name: Multi Tier Parts Data Chain -|PRS |Formerly known Service Name: Parts Relationship Service +|Managed Identity Wallet (MIW) +| The Managed Identity Wallets (MIW) service implements the Self-Sovereign-Identity (SSI) readiness by providing a wallet hosting platform including a decentralized identifier (DID) resolver, service endpoints and the company wallets itself. +For more information see: +https://github.com/eclipse-tractusx/managed-identity-wallet[eclipse-tractusx/managed-identity-wallet] , https://github.com/catenax-ng/tx-managed-identity-wallets[catenax-ng/tx-managed-identity-wallets] +|MTPDC | Formerly known Service Name: Multi Tier Parts Data Chain +|PolicyStore +| The Policy Store is an IRS component which provides an interface for getting, adding and deleting accepted IRS EDC policies. These policies will be used to validate EDC contract offers. EDC contract offers must include permissions that are equal to permission defined by an admin user in order to be allowed to use in IRS use cases. +For more information see: +https://github.com/eclipse-tractusx/ssi-docu/blob/main/docs/architecture/cx-3-2/edc/policy.definitions.md#0-introduction[Policy specification for Catena-X verifiable credentials] +|PRS | Formerly known Service Name: Parts Relationship Service +|Self-Sovereign Identity (SSI) +| For more information see: https://github.com/eclipse-tractusx/ssi-docu/tree/main/docs/architecture/cx-3-2[ssi-docu] |Traversal Aspect |aka Edge: Aspect which the IRS uses for traversal through the data chain. Identified by a parent-child or a child-parent relationship. Samples: SingleLevelBomAsPlanned, SingleLevelBomAsBuilt and SingleLevelUsageAsBuilt -|Verifiable Credential (VC) | For more information see: https://github.com/eclipse-tractusx/ssi-docu/tree/main/docs/architecture/cx-3-2/3.%20Verifiable%20Credentials[Verifiable Credentials] -|Eclipse Dataspace Connector (EDC) | For more information see: https://github.com/eclipse-tractusx/tractusx-edc -|Managed Identity Wallet (MIW) | For more information see: https://github.com/eclipse-tractusx/managed-identity-wallet -|Self-Sovereign Identity (SSI) | For more information see: https://github.com/eclipse-tractusx/ssi-docu/tree/main/docs/architecture/cx-3-2 -|PolicyStore | The Policy Store is an IRS component which provides an interface for getting, adding and deleting accepted IRS EDC policies. These policies will be used to validate EDC contract offers. EDC contract offers must include permissions that are equal to permission defined by an admin user in order to be allowed to use in IRS use cases. For more information see: https://github.com/eclipse-tractusx/ssi-docu/blob/main/docs/architecture/cx-3-2/edc/policy.definitions.md#0-introduction +|Verifiable Credential (VC) +| For more information see: https://github.com/eclipse-tractusx/ssi-docu/tree/main/docs/architecture/cx-3-2/3.%20Verifiable%20Credentials[Verifiable Credentials] |=== \ No newline at end of file diff --git a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/AbstractDelegate.java b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/AbstractDelegate.java index 2a033b8c6d..a999df7e53 100644 --- a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/AbstractDelegate.java +++ b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/AbstractDelegate.java @@ -86,30 +86,35 @@ protected ItemContainer next(final ItemContainer.ItemContainerBuilder itemContai } protected String requestSubmodelAsString(final EdcSubmodelFacade submodelFacade, - final ConnectorEndpointsService connectorEndpointsService, final Endpoint endpoint, final String bpn) + final ConnectorEndpointsService connectorEndpointsService, final Endpoint digitalTwinRegistryEndpoint, + final String bpn) throws EdcClientException { - final String subprotocolBody = endpoint.getProtocolInformation().getSubprotocolBody(); + + final String subprotocolBody = digitalTwinRegistryEndpoint.getProtocolInformation().getSubprotocolBody(); final Optional dspEndpoint = extractDspEndpoint(subprotocolBody); + if (dspEndpoint.isPresent()) { log.debug("Using dspEndpoint of subprotocolBody '{}' to get submodel payload", subprotocolBody); - return submodelFacade.getSubmodelRawPayload(dspEndpoint.get(), endpoint.getProtocolInformation().getHref(), + return submodelFacade.getSubmodelRawPayload(dspEndpoint.get(), + digitalTwinRegistryEndpoint.getProtocolInformation().getHref(), extractAssetId(subprotocolBody)); } else { log.info("SubprotocolBody does not contain '{}'. Using Discovery Service as fallback.", DSP_ENDPOINT); final List connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); - return getSubmodel(submodelFacade, endpoint, connectorEndpoints); + + return getSubmodel(submodelFacade, digitalTwinRegistryEndpoint, connectorEndpoints); } } - private String getSubmodel(final EdcSubmodelFacade submodelFacade, final Endpoint endpoint, + private String getSubmodel(final EdcSubmodelFacade submodelFacade, final Endpoint digitalTwinRegistryEndpoint, final List connectorEndpoints) throws EdcClientException { for (final String connectorEndpoint : connectorEndpoints) { try { return submodelFacade.getSubmodelRawPayload(connectorEndpoint, - endpoint.getProtocolInformation().getHref(), - extractAssetId(endpoint.getProtocolInformation().getSubprotocolBody())); + digitalTwinRegistryEndpoint.getProtocolInformation().getHref(), + extractAssetId(digitalTwinRegistryEndpoint.getProtocolInformation().getSubprotocolBody())); } catch (EdcClientException e) { - log.info("EdcClientException while accessing endpoint '{}'", connectorEndpoint, e); + log.info("EdcClientException while accessing digitalTwinRegistryEndpoint '{}'", connectorEndpoint, e); } } throw new EdcClientException( diff --git a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/DigitalTwinDelegate.java b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/DigitalTwinDelegate.java index af4392bfce..468a221ba6 100644 --- a/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/DigitalTwinDelegate.java +++ b/irs-api/src/main/java/org/eclipse/tractusx/irs/aaswrapper/job/delegate/DigitalTwinDelegate.java @@ -37,7 +37,6 @@ import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryKey; import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryService; import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceException; -import org.springframework.web.client.RestClientException; /** * Retrieves AAShell from Digital Twin Registry service and storing it inside {@link ItemContainer}. @@ -55,8 +54,10 @@ public DigitalTwinDelegate(final AbstractDelegate nextStep, } @Override - public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder, final JobParameter jobData, - final AASTransferProcess aasTransferProcess, final PartChainIdentificationKey itemId) { + @SuppressWarnings("PMD.AvoidCatchingGenericException") + public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContainerBuilder, + final JobParameter jobData, final AASTransferProcess aasTransferProcess, + final PartChainIdentificationKey itemId) { if (StringUtils.isBlank(itemId.getBpn())) { log.warn("Could not process item with id {} because no BPN was provided. Creating Tombstone.", @@ -65,7 +66,9 @@ public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContai Tombstone.from(itemId.getGlobalAssetId(), null, "Can't get relationship without a BPN", 0, ProcessStep.DIGITAL_TWIN_REQUEST)).build(); } + try { + final AssetAdministrationShellDescriptor shell = digitalTwinRegistryService.fetchShells( List.of(new DigitalTwinRegistryKey(itemId.getGlobalAssetId(), itemId.getBpn()))) .stream() @@ -79,9 +82,13 @@ public ItemContainer process(final ItemContainer.ItemContainerBuilder itemContai // filter submodel descriptors if next delegate will not be executed itemContainerBuilder.shell(shell.withFilteredSubmodelDescriptors(jobData.getAspects())); } - } catch (final RestClientException | RegistryServiceException e) { + + } catch (final RegistryServiceException | RuntimeException e) { + // catching generic exception is intended here, + // otherwise Jobs stay in state RUNNING forever log.info("Shell Endpoint could not be retrieved for Item: {}. Creating Tombstone.", itemId); - itemContainerBuilder.tombstone(Tombstone.from(itemId.getGlobalAssetId(), null, e, retryCount, ProcessStep.DIGITAL_TWIN_REQUEST)); + itemContainerBuilder.tombstone( + Tombstone.from(itemId.getGlobalAssetId(), null, e, retryCount, ProcessStep.DIGITAL_TWIN_REQUEST)); } if (expectedDepthOfTreeIsNotReached(jobData.getDepth(), aasTransferProcess.getDepth())) { diff --git a/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java b/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java new file mode 100644 index 0000000000..a8fd1fb069 --- /dev/null +++ b/irs-common/src/main/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinder.java @@ -0,0 +1,150 @@ +/******************************************************************************** + * Copyright (c) 2022,2024 + * 2022: ZF Friedrichshafen AG + * 2022: ISTOS GmbH + * 2022,2024: Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * 2022,2023: BOSCH AG + * Copyright (c) 2021,2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +package org.eclipse.tractusx.irs.common.util.concurrent; + +import static java.util.concurrent.CompletableFuture.allOf; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; + +/** + * Helper class to find the relevant result from a list of futures. + */ +@Slf4j +public class ResultFinder { + + /** + * Returns a new {@link CompletableFuture} which completes + * when at least one of the given futures completes successfully or all fail. + * The result from the fastest successful future is returned. The others are ignored. + * + * @param futures the futures + * @param the return type + * @return a {@link CompletableFuture} returning the fastest successful result or empty + */ + public CompletableFuture getFastestResult(final List> futures) { + + if (futures == null || futures.isEmpty()) { + log.warn("Called getFastestResult with empty list of futures"); + return CompletableFuture.completedFuture(null); + } + + log.debug("Trying to get fastest result from list of futures"); + + // The purpose of this overall future is to track when the first data request is successful. + // This way we do not need to wait for the others to complete. + final CompletableFuture overallFuture = new CompletableFuture<>(); + + final List exceptions = new ArrayList<>(); + + final var futuresList = futures.stream() + .map(future -> future.exceptionally(collectingExceptionsAndThrow(exceptions)) + .handle(completingOnFirstSuccessful(overallFuture))) + .toList(); + + allOf(toArray(futuresList)).whenComplete((value, ex) -> { + + log.debug("All of the futures completed"); + + if (ex != null) { + log.warn("All failed: " + System.lineSeparator() // + + exceptions.stream() + .map(ExceptionUtils::getStackTrace) + .collect(Collectors.joining(System.lineSeparator())), ex); + + overallFuture.completeExceptionally(new CompletionExceptions("None successful", exceptions)); + } else { + overallFuture.complete(null); + } + }); + + return overallFuture; + } + + private static CompletableFuture[] toArray(final List> handledFutures) { + return handledFutures.toArray(new CompletableFuture[0]); + } + + private static BiFunction completingOnFirstSuccessful( + final CompletableFuture overallFuture) { + + return (value, throwable) -> { + + log.debug("value: '{}', throwable: {}", value, throwable); + + final boolean notFinishedByOtherFuture = !overallFuture.isDone(); + log.debug("notFinishedByOtherFuture {} ", notFinishedByOtherFuture); + + final boolean currentFutureSuccessful = throwable == null && value != null; + + if (notFinishedByOtherFuture && currentFutureSuccessful) { + + log.debug("First future that completed successfully"); + overallFuture.complete(value); + return true; + + } else { + if (throwable != null) { + log.warn("Exception occurred: " + throwable.getMessage(), throwable); + throw new CompletionException(throwable.getMessage(), throwable); + } + return false; + } + }; + } + + private static Function collectingExceptionsAndThrow(final List exceptions) { + return t -> { + log.error("Exception occurred: " + t.getMessage(), t); + exceptions.add(t); + throw new CompletionException(t); + }; + } + + /** + * Helper exception that can hold multiple causes. + */ + @Getter + @ToString + public static class CompletionExceptions extends CompletionException { + + private final List causes; + + public CompletionExceptions(final String msg, final List causes) { + super(msg); + this.causes = causes; + } + + } +} diff --git a/irs-common/src/test/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinderTest.java b/irs-common/src/test/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinderTest.java new file mode 100644 index 0000000000..c98ea5e83c --- /dev/null +++ b/irs-common/src/test/java/org/eclipse/tractusx/irs/common/util/concurrent/ResultFinderTest.java @@ -0,0 +1,162 @@ +/******************************************************************************** + * Copyright (c) 2022,2024 + * 2022: ZF Friedrichshafen AG + * 2022: ISTOS GmbH + * 2022,2024: Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * 2022,2023: BOSCH AG + * Copyright (c) 2021,2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +package org.eclipse.tractusx.irs.common.util.concurrent; + +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullAndEmptySource; + +/** + * Test for {@link ResultFinder} + */ +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +@DisplayName("ResultFinder") +class ResultFinderTest { + + final ResultFinder sut = new ResultFinder(); + + @NullAndEmptySource + @ParameterizedTest + void with_null_or_empty_input_list_it_should_return_null(final List> list) + throws ExecutionException, InterruptedException { + final var result = sut.getFastestResult(list).get(); + assertThat(result).isNull(); + } + + @Test + void with_one_successful_CompletableFuture_it_should_return_the_successful_result() + throws ExecutionException, InterruptedException { + final var futures = List.of(supplyAsync(() -> "ok")); + final String result = sut.getFastestResult(futures).get(); + assertThat(result).isEqualTo("ok"); + } + + @Test + void with_only_successful_and_other_CompletableFutures_failing_it_should_return_the_successful_result() + throws ExecutionException, InterruptedException { + + // given + final List> futures = List.of( // + supplyAsync(() -> { + throw new RuntimeException("failing"); + }), // + supplyAsync(() -> "ok"), // + supplyAsync(() -> { + throw new RuntimeException("failing"); + })); + + // when + final String result = sut.getFastestResult(futures).get(); + + // then + assertThat(result).isEqualTo("ok"); + } + + @Test + void with_all_CompletableFutures_failing_it_should_throw() { + + // given + final List> futures = List.of( // + futureThrowAfterMillis(5000, () -> new RuntimeException("failing 1")), // + supplyAsync(() -> { + throw new RuntimeException("failing 2"); + }), // + futureThrowAfterMillis(1000, () -> new RuntimeException("failing 3"))); + + // when + final ThrowingCallable call = () -> sut.getFastestResult(futures).get(); + + // then + assertThatThrownBy(call).isInstanceOf(ExecutionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(ResultFinder.CompletionExceptions.class) + .extracting(collectedFailures -> (ResultFinder.CompletionExceptions) collectedFailures) + .extracting(ResultFinder.CompletionExceptions::getCauses) + .describedAs("should have collected all exceptions") + .satisfies(causes -> assertThat( + causes.stream().map(Throwable::getMessage).toList()).containsExactlyInAnyOrder( + "java.lang.RuntimeException: failing 1", + "java.lang.RuntimeException: failing 2", + "java.lang.RuntimeException: failing 3")); + + } + + @Test + void with_multiple_successful_CompletableFutures_it_should_return_fastest_successful_result() + throws ExecutionException, InterruptedException { + + // given + final List> futures = List.of( // + futureThrowAfterMillis(200, () -> new RuntimeException("slower failing")), // + futureReturnAfterMillis(1000, () -> "slowest success"), // + futureReturnAfterMillis(300, () -> "fastest success"), // + futureThrowAfterMillis(0, () -> new RuntimeException("failing immediately")) // + ); + + // when + final String result = sut.getFastestResult(futures).get(); + + // then + assertThat(result).isEqualTo("fastest success"); + } + + private static CompletableFuture futureThrowAfterMillis(final int sleepMillis, + final Supplier exceptionSupplier) { + return supplyAsync(() -> { + sleep(sleepMillis); + throw exceptionSupplier.get(); + }); + } + + private static CompletableFuture futureReturnAfterMillis(final int sleepMillis, + final Supplier resultSupplier) { + return supplyAsync(() -> { + sleep(sleepMillis); + return resultSupplier.get(); + }); + } + + private static void sleep(final int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file diff --git a/irs-edc-client/pom.xml b/irs-edc-client/pom.xml index 10d3049c1e..a767cde24d 100644 --- a/irs-edc-client/pom.xml +++ b/irs-edc-client/pom.xml @@ -168,6 +168,11 @@ org.eclipse.edc ${edc.version} + + org.eclipse.tractusx.irs + irs-common + ${revision} + org.eclipse.tractusx.irs diff --git a/irs-edc-client/src/test/java/org/eclipse/tractusx/irs/edc/client/EdcSubmodelFacadeTest.java b/irs-edc-client/src/test/java/org/eclipse/tractusx/irs/edc/client/EdcSubmodelFacadeTest.java index a0430a84d9..43146d4147 100644 --- a/irs-edc-client/src/test/java/org/eclipse/tractusx/irs/edc/client/EdcSubmodelFacadeTest.java +++ b/irs-edc-client/src/test/java/org/eclipse/tractusx/irs/edc/client/EdcSubmodelFacadeTest.java @@ -36,6 +36,8 @@ import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.tractusx.irs.edc.client.exceptions.EdcClientException; import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotificationResponse; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -55,134 +57,151 @@ class EdcSubmodelFacadeTest { @Mock private EdcSubmodelClient client; + @Nested + @DisplayName("getSubmodelRawPayload") + class GetSubmodelRawPayloadTests { + + @Test + void shouldThrowExecutionExceptionForSubmodel() throws EdcClientException { + // arrange + final ExecutionException e = new ExecutionException(new EdcClientException("test")); + final CompletableFuture future = CompletableFuture.failedFuture(e); + when(client.getSubmodelRawPayload(any(), any(), any())).thenReturn(future); + + // act + ThrowableAssert.ThrowingCallable action = () -> testee.getSubmodelRawPayload(CONNECTOR_ENDPOINT, + SUBMODEL_SUFIX, ASSET_ID); + + // assert + assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + } + + @Test + void shouldThrowEdcClientExceptionForSubmodel() throws EdcClientException { + // arrange + final EdcClientException e = new EdcClientException("test"); + when(client.getSubmodelRawPayload(any(), any(), any())).thenThrow(e); + + // act + ThrowableAssert.ThrowingCallable action = () -> testee.getSubmodelRawPayload(CONNECTOR_ENDPOINT, + SUBMODEL_SUFIX, ASSET_ID); + + // assert + assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + } + + @Test + void shouldRestoreInterruptOnInterruptExceptionForSubmodel() + throws EdcClientException, ExecutionException, InterruptedException { + // arrange + final CompletableFuture future = mock(CompletableFuture.class); + final InterruptedException e = new InterruptedException(); + when(future.get()).thenThrow(e); + when(client.getSubmodelRawPayload(any(), any(), any())).thenReturn(future); + + // act + testee.getSubmodelRawPayload(CONNECTOR_ENDPOINT, SUBMODEL_SUFIX, ASSET_ID); + + // assert + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } - @Test - void shouldThrowExecutionExceptionForSubmodel() throws EdcClientException { - // arrange - final ExecutionException e = new ExecutionException(new EdcClientException("test")); - final CompletableFuture future = CompletableFuture.failedFuture(e); - when(client.getSubmodelRawPayload(any(), any(), any())).thenReturn(future); - - // act - ThrowableAssert.ThrowingCallable action = () -> testee.getSubmodelRawPayload(CONNECTOR_ENDPOINT, SUBMODEL_SUFIX, ASSET_ID); - - // assert - assertThatThrownBy(action).isInstanceOf(EdcClientException.class); - } - - @Test - void shouldThrowEdcClientExceptionForSubmodel() throws EdcClientException { - // arrange - final EdcClientException e = new EdcClientException("test"); - when(client.getSubmodelRawPayload(any(), any(), any())).thenThrow(e); - - // act - ThrowableAssert.ThrowingCallable action = () -> testee.getSubmodelRawPayload(CONNECTOR_ENDPOINT, SUBMODEL_SUFIX, ASSET_ID); - - // assert - assertThatThrownBy(action).isInstanceOf(EdcClientException.class); - } - - @Test - void shouldRestoreInterruptOnInterruptExceptionForSubmodel() - throws EdcClientException, ExecutionException, InterruptedException { - // arrange - final CompletableFuture future = mock(CompletableFuture.class); - final InterruptedException e = new InterruptedException(); - when(future.get()).thenThrow(e); - when(client.getSubmodelRawPayload(any(), any(), any())).thenReturn(future); - - // act - testee.getSubmodelRawPayload(CONNECTOR_ENDPOINT, SUBMODEL_SUFIX, ASSET_ID); - - // assert - assertThat(Thread.currentThread().isInterrupted()).isTrue(); - } - - @Test - void shouldRestoreInterruptOnInterruptExceptionForNotification() - throws EdcClientException, ExecutionException, InterruptedException { - // arrange - final CompletableFuture future = mock(CompletableFuture.class); - final InterruptedException e = new InterruptedException(); - when(future.get()).thenThrow(e); - when(client.sendNotification(any(), any(), any())).thenReturn(future); - - // act - testee.sendNotification("", "notify-request-asset", null); - - // assert - assertThat(Thread.currentThread().isInterrupted()).isTrue(); - } - - @Test - void shouldThrowExecutionExceptionForNotification() throws EdcClientException { - // arrange - final ExecutionException e = new ExecutionException(new EdcClientException("test")); - final CompletableFuture future = CompletableFuture.failedFuture(e); - when(client.sendNotification(any(), any(), any())).thenReturn(future); - - // act - ThrowableAssert.ThrowingCallable action = () -> testee.sendNotification("", "notify-request-asset", null); - - // assert - assertThatThrownBy(action).isInstanceOf(EdcClientException.class); } - @Test - void shouldThrowEdcClientExceptionForNotification() throws EdcClientException { - // arrange - final EdcClientException e = new EdcClientException("test"); - when(client.sendNotification(any(), any(), any())).thenThrow(e); - - // act - ThrowableAssert.ThrowingCallable action = () -> testee.sendNotification("", "notify-request-asset", null); - - // assert - assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + @Nested + @DisplayName("sendNotification") + class SendNotificationTests { + + @Test + void shouldRestoreInterruptOnInterruptExceptionForNotification() + throws EdcClientException, ExecutionException, InterruptedException { + // arrange + final CompletableFuture future = mock(CompletableFuture.class); + final InterruptedException e = new InterruptedException(); + when(future.get()).thenThrow(e); + when(client.sendNotification(any(), any(), any())).thenReturn(future); + + // act + testee.sendNotification("", "notify-request-asset", null); + + // assert + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } + + @Test + void shouldThrowExecutionExceptionForNotification() throws EdcClientException { + // arrange + final ExecutionException e = new ExecutionException(new EdcClientException("test")); + final CompletableFuture future = CompletableFuture.failedFuture(e); + when(client.sendNotification(any(), any(), any())).thenReturn(future); + + // act + ThrowableAssert.ThrowingCallable action = () -> testee.sendNotification("", "notify-request-asset", null); + + // assert + assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + } + + @Test + void shouldThrowEdcClientExceptionForNotification() throws EdcClientException { + // arrange + final EdcClientException e = new EdcClientException("test"); + when(client.sendNotification(any(), any(), any())).thenThrow(e); + + // act + ThrowableAssert.ThrowingCallable action = () -> testee.sendNotification("", "notify-request-asset", null); + + // assert + assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + } } - @Test - void shouldThrowEdcClientExceptionForEndpointReference() throws EdcClientException { - // arrange - final EdcClientException e = new EdcClientException("test"); - when(client.getEndpointReferenceForAsset(any(), any(), any())).thenThrow(e); - - // act - ThrowableAssert.ThrowingCallable action = () -> testee.getEndpointReferenceForAsset("", "", ""); - - // assert - assertThatThrownBy(action).isInstanceOf(EdcClientException.class); - } - - @Test - void shouldThrowExecutionExceptionForEndpointReference() throws EdcClientException { - // arrange - final ExecutionException e = new ExecutionException(new EdcClientException("test")); - final CompletableFuture future = CompletableFuture.failedFuture(e); - when(client.getEndpointReferenceForAsset(any(), any(), any())).thenReturn(future); - - // act - ThrowableAssert.ThrowingCallable action = () -> testee.getEndpointReferenceForAsset("", "", ""); - - // assert - assertThatThrownBy(action).isInstanceOf(EdcClientException.class); - } - - @Test - void shouldRestoreInterruptOnInterruptExceptionForEndpointReference() - throws EdcClientException, ExecutionException, InterruptedException { - // arrange - final CompletableFuture future = mock(CompletableFuture.class); - final InterruptedException e = new InterruptedException(); - when(future.get()).thenThrow(e); - when(client.getEndpointReferenceForAsset(any(), any(), any())).thenReturn(future); - - // act - testee.getEndpointReferenceForAsset("", "", ""); - - // assert - assertThat(Thread.currentThread().isInterrupted()).isTrue(); + @Nested + @DisplayName("getEndpointReferenceForAsset") + class GetEndpointReferenceForAssetTests { + + @Test + void shouldThrowEdcClientExceptionForEndpointReference() throws EdcClientException { + // arrange + final EdcClientException e = new EdcClientException("test"); + when(client.getEndpointReferenceForAsset(any(), any(), any())).thenThrow(e); + + // act + ThrowableAssert.ThrowingCallable action = () -> testee.getEndpointReferenceForAsset("", "", ""); + + // assert + assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + } + + @Test + void shouldThrowExecutionExceptionForEndpointReference() throws EdcClientException { + // arrange + final ExecutionException e = new ExecutionException(new EdcClientException("test")); + final CompletableFuture future = CompletableFuture.failedFuture(e); + when(client.getEndpointReferenceForAsset(any(), any(), any())).thenReturn(future); + + // act + ThrowableAssert.ThrowingCallable action = () -> testee.getEndpointReferenceForAsset("", "", ""); + + // assert + assertThatThrownBy(action).isInstanceOf(EdcClientException.class); + } + + @Test + void shouldRestoreInterruptOnInterruptExceptionForEndpointReference() + throws EdcClientException, ExecutionException, InterruptedException { + // arrange + final CompletableFuture future = mock(CompletableFuture.class); + final InterruptedException e = new InterruptedException(); + when(future.get()).thenThrow(e); + when(client.getEndpointReferenceForAsset(any(), any(), any())).thenReturn(future); + + // act + testee.getEndpointReferenceForAsset("", "", ""); + + // assert + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + } } } \ No newline at end of file diff --git a/irs-registry-client/pom.xml b/irs-registry-client/pom.xml index 7c9a483a26..378022ac12 100644 --- a/irs-registry-client/pom.xml +++ b/irs-registry-client/pom.xml @@ -65,6 +65,11 @@ snakeyaml ${snakeyaml.version} + + org.eclipse.tractusx.irs + irs-common + ${revision} + org.eclipse.tractusx.irs irs-testing diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java index f372c21668..603feb0b84 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryService.java @@ -23,104 +23,188 @@ ********************************************************************************/ package org.eclipse.tractusx.irs.registryclient.decentral; -import java.time.Instant; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.tractusx.irs.common.util.concurrent.ResultFinder; import org.eclipse.tractusx.irs.component.assetadministrationshell.AssetAdministrationShellDescriptor; import org.eclipse.tractusx.irs.component.assetadministrationshell.IdentifierKeyValuePair; -import org.eclipse.tractusx.irs.edc.client.model.EDRAuthCode; import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryKey; import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryService; import org.eclipse.tractusx.irs.registryclient.discovery.ConnectorEndpointsService; import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceException; import org.eclipse.tractusx.irs.registryclient.exceptions.ShellNotFoundException; import org.jetbrains.annotations.NotNull; +import org.springframework.util.StopWatch; /** * Decentral implementation of DigitalTwinRegistryService */ @RequiredArgsConstructor @Slf4j +@SuppressWarnings("PMD.TooManyMethods") public class DecentralDigitalTwinRegistryService implements DigitalTwinRegistryService { + private static final String TOOK_MS = "{} took {} ms"; + private final ConnectorEndpointsService connectorEndpointsService; private final EndpointDataForConnectorsService endpointDataForConnectorsService; private final DecentralDigitalTwinRegistryClient decentralDigitalTwinRegistryClient; + private ResultFinder resultFinder = new ResultFinder(); + private static Stream>> groupKeysByBpn( final Collection keys) { return keys.stream().collect(Collectors.groupingBy(DigitalTwinRegistryKey::bpn)).entrySet().stream(); } + /** + * Package private setter in order to allow simulating {@link InterruptedException} + * and {@link ExecutionException} in tests. + * + * @param resultFinder the {@link ResultFinder} + */ + /* package */ void setResultFinder(final ResultFinder resultFinder) { + this.resultFinder = resultFinder; + } + @Override + @SuppressWarnings("PMD.AvoidCatchingGenericException") public Collection fetchShells(final Collection keys) throws RegistryServiceException { - log.info("Fetching shell(s) for {} key(s)", keys.size()); - final var calledEndpoints = new HashSet(); - final var collectedShells = groupKeysByBpn(keys).flatMap( - entry -> fetchShellDescriptors(calledEndpoints, entry.getKey(), entry.getValue())).toList(); - if (collectedShells.isEmpty()) { - throw new ShellNotFoundException("Unable to find any of the requested shells", calledEndpoints); - } else { - log.info("Found {} shell(s) for {} key(s)", collectedShells.size(), keys.size()); - return collectedShells; + + final var watch = new StopWatch(); + final String msg = "Fetching shell(s) for %s key(s)".formatted(keys.size()); + watch.start(msg); + log.info(msg); + + try { + final var calledEndpoints = new HashSet(); + + final var collectedShells = groupKeysByBpn(keys).flatMap(entry -> { + + try { + return fetchShellDescriptors(entry, calledEndpoints); + } catch (RuntimeException e) { + // catching generic exception is intended here, + // otherwise Jobs stay in state RUNNING forever + log.warn(e.getMessage(), e); + return Stream.empty(); + } + + }).toList(); + + if (collectedShells.isEmpty()) { + log.info("No shells found"); + throw new ShellNotFoundException("Unable to find any of the requested shells", calledEndpoints); + } else { + log.info("Found {} shell(s) for {} key(s)", collectedShells.size(), keys.size()); + return collectedShells; + } + + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); } } - @NotNull - private Stream fetchShellDescriptors(final Set calledEndpoints, - final String bpn, final List keys) { - log.info("Fetching {} shells for bpn {}", keys.size(), bpn); - final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); - calledEndpoints.addAll(connectorEndpoints); + private Stream fetchShellDescriptors( + final Map.Entry> entry, final Set calledEndpoints) { - final List descriptors = new ArrayList<>(); + try { - EndpointDataReference endpointDataReference = null; + final var futures = fetchShellDescriptors(calledEndpoints, entry.getKey(), entry.getValue()); + final var shellDescriptors = futures.get(); + return shellDescriptors.stream(); - for (final DigitalTwinRegistryKey key : keys) { - endpointDataReference = renewIfNecessary(endpointDataReference, connectorEndpoints); - descriptors.add(fetchShellDescriptor(endpointDataReference, key)); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + return Stream.empty(); + } catch (ExecutionException e) { + log.warn(e.getMessage(), e); + return Stream.empty(); } - - return descriptors.stream(); } - private EndpointDataReference renewIfNecessary(final EndpointDataReference endpointDataReference, - final List connectorEndpoints) { - if (endpointDataReference == null || endpointDataReference.getAuthCode() == null) { - return getEndpointDataReference(connectorEndpoints); - } else { - final var tokenExpirationInstant = extractTokenExpiration(endpointDataReference.getAuthCode()); - if (Instant.now().isAfter(tokenExpirationInstant)) { - log.info("EndpointDataReference token has expired, getting a new one."); - return getEndpointDataReference(connectorEndpoints); - } - return endpointDataReference; + private CompletableFuture> fetchShellDescriptors( + final Set calledEndpoints, final String bpn, final List keys) { + + final var watch = new StopWatch(); + final String msg = "Fetching %s shells for bpn '%s'".formatted(keys.size(), bpn); + watch.start(msg); + log.info(msg); + + try { + final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); + + log.info("Found {} connector endpoints for bpn '{}'", connectorEndpoints.size(), bpn); + calledEndpoints.addAll(connectorEndpoints); + + return fetchShellDescriptorsForConnectorEndpoints(keys, connectorEndpoints); + + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); } } - private Instant extractTokenExpiration(final String token) { - return Instant.ofEpochSecond(EDRAuthCode.fromAuthCodeToken(token).getExp()); + private CompletableFuture> fetchShellDescriptorsForConnectorEndpoints( + final List keys, final List connectorEndpoints) { + + final var service = endpointDataForConnectorsService; + final var futures = service.createFindEndpointDataForConnectorsFutures(connectorEndpoints) + .stream() + .map(edrFuture -> edrFuture.thenCompose(edr -> CompletableFuture.supplyAsync( + () -> fetchShellDescriptorsForKey(keys, edr)))) + .toList(); + + log.debug("Created {} futures", futures.size()); + + return resultFinder.getFastestResult(futures); + } + + private List fetchShellDescriptorsForKey( + final List keys, final EndpointDataReference endpointDataReference) { + + final var watch = new StopWatch(); + final String msg = "Fetching shell descriptors for keys %s from endpoint '%s'".formatted(keys, + endpointDataReference.getEndpoint()); + watch.start(msg); + log.info(msg); + try { + return keys.stream().map(key -> fetchShellDescriptor(endpointDataReference, key)).toList(); + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); + } } private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDataReference endpointDataReference, final DigitalTwinRegistryKey key) { - log.info("Retrieving AAS Identification for DigitalTwinRegistryKey: {}", key); - final String aaShellIdentification = mapToShellId(endpointDataReference, key.shellId()); - return decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(endpointDataReference, - aaShellIdentification); + final var watch = new StopWatch(); + final String msg = "Retrieving AAS identification for DigitalTwinRegistryKey: '%s'".formatted(key); + watch.start(msg); + log.info(msg); + try { + final String aaShellIdentification = mapToShellId(endpointDataReference, key.shellId()); + return decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(endpointDataReference, + aaShellIdentification); + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); + } } /** @@ -134,37 +218,114 @@ private AssetAdministrationShellDescriptor fetchShellDescriptor(final EndpointDa */ @NotNull private String mapToShellId(final EndpointDataReference endpointDataReference, final String key) { - final var identifierKeyValuePair = IdentifierKeyValuePair.builder().name("globalAssetId").value(key).build(); - final var aaShellIdentification = decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( - endpointDataReference, List.of(identifierKeyValuePair)).getResult().stream().findFirst().orElse(key); - - if (key.equals(aaShellIdentification)) { - log.info("Found shell with shellId {} in registry", aaShellIdentification); - } else { - log.info("Retrieved shellId {} for globalAssetId {}", aaShellIdentification, key); + + final var watch = new StopWatch(); + final String msg = "Mapping '%s' to shell ID for endpoint '%s'".formatted(key, + endpointDataReference.getEndpoint()); + watch.start(msg); + log.info(msg); + + try { + + final var identifierKeyValuePair = IdentifierKeyValuePair.builder() + .name("globalAssetId") + .value(key) + .build(); + final var aaShellIdentification = decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( + endpointDataReference, List.of(identifierKeyValuePair)) + .getResult() + .stream() + .findFirst() + .orElse(key); + + if (key.equals(aaShellIdentification)) { + log.info("Found shell with shellId {} in registry", aaShellIdentification); + } else { + log.info("Retrieved shellId {} for globalAssetId {}", aaShellIdentification, key); + } + + return aaShellIdentification; + + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); + } + } + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private Collection lookupShellIds(final String bpn) throws RegistryServiceException { + + log.info("Looking up shell ids for bpn {}", bpn); + + try { + + final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); + log.info("Looking up shell ids for bpn '{}' with connector endpoints {}", bpn, connectorEndpoints); + + final var endpointDataReferenceFutures = endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures( + connectorEndpoints); + log.debug("Created endpointDataReferenceFutures"); + + return lookupShellIds(bpn, endpointDataReferenceFutures); + + } catch (RuntimeException e) { + // catching generic exception is intended here, + // otherwise Jobs stay in state RUNNING forever + log.error(e.getMessage(), e); + throw new RegistryServiceException( + "%s occurred while looking up shell ids for bpn '%s'".formatted(e.getClass().getSimpleName(), bpn), + e); } - return aaShellIdentification; } @NotNull - private EndpointDataReference getEndpointDataReference(final List connectorEndpoints) { - return endpointDataForConnectorsService.findEndpointDataForConnectors(connectorEndpoints); + private Collection lookupShellIds(final String bpn, + final List> endpointDataReferenceFutures) + throws RegistryServiceException { + + try { + final var futures = endpointDataReferenceFutures.stream() + .map(edrFuture -> edrFuture.thenCompose( + edr -> CompletableFuture.supplyAsync( + () -> lookupShellIds(bpn, edr)))) + .toList(); + final var shellIds = resultFinder.getFastestResult(futures).get(); + + log.info("Found {} shell id(s) in total", shellIds.size()); + return shellIds; + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RegistryServiceException( + "%s occurred while looking up shell ids for bpn '%s'".formatted(e.getClass().getSimpleName(), bpn), + e); + } catch (ExecutionException e) { + throw new RegistryServiceException( + "%s occurred while looking up shell ids for bpn '%s'".formatted(e.getClass().getSimpleName(), bpn), + e); + } } - private Collection lookupShellIds(final String bpn) { - log.info("Looking up shell ids for bpn {}", bpn); - final var connectorEndpoints = connectorEndpointsService.fetchConnectorEndpoints(bpn); - final var endpointDataReference = getEndpointDataReference(connectorEndpoints); - - final var shellIds = decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( - endpointDataReference, - List.of(IdentifierKeyValuePair.builder().name("manufacturerId").value(bpn).build())).getResult(); - log.info("Found {} shell id(s) in total", shellIds.size()); - return shellIds; + private Collection lookupShellIds(final String bpn, final EndpointDataReference endpointDataReference) { + + final var watch = new StopWatch(); + final String msg = "Looking up shell IDs for bpn '%s' with endpointDataReference '%s'".formatted(bpn, + endpointDataReference); + watch.start(msg); + log.info(msg); + + try { + return decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink( + endpointDataReference, + List.of(IdentifierKeyValuePair.builder().name("manufacturerId").value(bpn).build())).getResult(); + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); + } } @Override - public Collection lookupShellIdentifiers(final String bpn) { + public Collection lookupShellIdentifiers(final String bpn) throws RegistryServiceException { return lookupShellIds(bpn).stream().map(id -> new DigitalTwinRegistryKey(id, bpn)).toList(); } diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java index e54e6ebfb1..f6442bdaf2 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsService.java @@ -23,12 +23,17 @@ ********************************************************************************/ package org.eclipse.tractusx.irs.registryclient.decentral; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; -import org.springframework.web.client.RestClientException; +import org.springframework.util.StopWatch; /** * Service that use edc client to make calls to edc connector endpoints @@ -40,22 +45,51 @@ public class EndpointDataForConnectorsService { private static final String DT_REGISTRY_ASSET_TYPE = "https://w3id.org/edc/v0.0.1/ns/type"; private static final String DT_REGISTRY_ASSET_VALUE = "data.core.digitalTwinRegistry"; + private static final String TOOK_MS = "{} took {} ms"; private final EdcEndpointReferenceRetriever edcSubmodelFacade; - public EndpointDataReference findEndpointDataForConnectors(final List connectorEndpoints) { - for (final String connector : connectorEndpoints) { - log.info("Trying to retrieve EndpointDataReference for connector {}", connector); - try { - return edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE, - DT_REGISTRY_ASSET_VALUE); - } catch (EdcRetrieverException e) { - log.warn("Exception occurred when retrieving EndpointDataReference from connector {}", connector, e); - } + public List> createFindEndpointDataForConnectorsFutures( + final List connectorEndpoints) { + + final var watch = new StopWatch(); + final String msg = "Creating futures to get EndpointDataReferences for endpoints: %s".formatted( + connectorEndpoints); + watch.start(msg); + log.info(msg); + + List> futures = Collections.emptyList(); + try { + futures = connectorEndpoints.stream() + .map(connectorEndpoint -> supplyAsync( + () -> getEndpointReferenceForAsset(connectorEndpoint))) + .toList(); + return futures; + } finally { + log.info("Created {} futures", futures.size()); + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); } - throw new RestClientException( - "EndpointDataReference was not found. Requested connectorEndpoints: " + String.join(", ", - connectorEndpoints)); + } + + private EndpointDataReference getEndpointReferenceForAsset(final String connector) { + + final var watch = new StopWatch(); + final String msg = "Trying to retrieve EndpointDataReference for connector '%s'".formatted(connector); + watch.start(msg); + log.info(msg); + + try { + return edcSubmodelFacade.getEndpointReferenceForAsset(connector, DT_REGISTRY_ASSET_TYPE, + DT_REGISTRY_ASSET_VALUE); + } catch (EdcRetrieverException e) { + log.warn("Exception occurred when retrieving EndpointDataReference from connector '{}'", connector, e); + throw new CompletionException(e.getMessage(), e); + } finally { + watch.stop(); + log.info(TOOK_MS, watch.getLastTaskName(), watch.getLastTaskTimeMillis()); + } + } } diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/ConnectorEndpointsService.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/ConnectorEndpointsService.java index 511156f104..590eb66a5b 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/ConnectorEndpointsService.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/ConnectorEndpointsService.java @@ -46,26 +46,27 @@ public class ConnectorEndpointsService { @Cacheable(CONNECTOR_ENDPOINT_SERVICE_CACHE_NAME) public List fetchConnectorEndpoints(final String bpn) { + if (StringUtils.isBlank(bpn)) { log.warn("BPN was null, cannot search for any connector endpoints. Returning empty list."); return List.of(); } log.info("Requesting connector endpoints for BPN {}", bpn); - final DiscoveryFinderRequest onlyBpn = new DiscoveryFinderRequest(List.of("bpn")); - final List discoveryEndpoints = discoveryFinderClient.findDiscoveryEndpoints(onlyBpn) - .endpoints(); - final List providedBpn = List.of(bpn); + + final var onlyBpn = new DiscoveryFinderRequest(List.of("bpn")); + final var discoveryEndpoints = discoveryFinderClient.findDiscoveryEndpoints(onlyBpn).endpoints(); final var endpoints = discoveryEndpoints.stream() .flatMap( discoveryEndpoint -> discoveryFinderClient.findConnectorEndpoints( - discoveryEndpoint.endpointAddress(), providedBpn) + discoveryEndpoint.endpointAddress(), List.of(bpn)) .stream() .filter(edcDiscoveryResult -> edcDiscoveryResult.bpn() .equals(bpn)) .map(EdcDiscoveryResult::connectorEndpoint)) .flatMap(List::stream) .toList(); + log.info("Discovered the following endpoints for BPN '{}': '{}'", bpn, String.join(", ", endpoints)); return endpoints; } diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/DiscoveryFinderClientImpl.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/DiscoveryFinderClientImpl.java index dd47d4a513..b2d375759b 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/DiscoveryFinderClientImpl.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/discovery/DiscoveryFinderClientImpl.java @@ -65,9 +65,11 @@ public void evictDiscoveryEndpointsCacheValues() { @Override @Retry(name = "registry") public List findConnectorEndpoints(final String endpointAddress, final List bpns) { - final EdcDiscoveryResult[] edcDiscoveryResults = restTemplate.postForObject(endpointAddress, bpns, - EdcDiscoveryResult[].class); - return edcDiscoveryResults == null ? List.of() : List.of(edcDiscoveryResults); + return toList(restTemplate.postForObject(endpointAddress, bpns, EdcDiscoveryResult[].class)); + } + + private static List toList(final T... arr) { + return arr == null ? List.of() : List.of(arr); } } diff --git a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/exceptions/RegistryServiceException.java b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/exceptions/RegistryServiceException.java index 1aebfdad34..3995b27a30 100644 --- a/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/exceptions/RegistryServiceException.java +++ b/irs-registry-client/src/main/java/org/eclipse/tractusx/irs/registryclient/exceptions/RegistryServiceException.java @@ -31,4 +31,8 @@ public class RegistryServiceException extends Exception { public RegistryServiceException(final String msg) { super(msg); } + + public RegistryServiceException(final String msg, final Throwable cause) { + super(msg, cause); + } } diff --git a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/DefaultConfigurationTest.java b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/DefaultConfigurationTest.java index 0372bb3ef5..4a3b67fd5f 100644 --- a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/DefaultConfigurationTest.java +++ b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/DefaultConfigurationTest.java @@ -26,17 +26,20 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.tractusx.irs.edc.client.EdcSubmodelClient; import org.eclipse.tractusx.irs.edc.client.EdcSubmodelFacade; import org.eclipse.tractusx.irs.edc.client.exceptions.EdcClientException; +import org.eclipse.tractusx.irs.registryclient.decentral.EdcRetrieverException; import org.junit.jupiter.api.Test; -import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; class DefaultConfigurationTest { @@ -74,12 +77,27 @@ void edcSubmodelFacade() { @Test void endpointDataForConnectorsService() throws EdcClientException { + + // ARRANGE final var mock = mock(EdcSubmodelFacade.class); + final var endpointAddress = "endpointaddress"; + final var endpointDataReference = EndpointDataReference.Builder.newInstance().endpoint(endpointAddress).build(); + when(mock.getEndpointReferenceForAsset(eq(endpointAddress), any(), any())).thenReturn(endpointDataReference); + // ACT final var endpointDataForConnectorsService = testee.endpointDataForConnectorsService(mock); - endpointDataForConnectorsService.findEndpointDataForConnectors(List.of("test")); - verify(mock).getEndpointReferenceForAsset(any(), any(), any()); + endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures(List.of(endpointAddress)) // + .forEach(future -> { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + // ASSERT + verify(mock).getEndpointReferenceForAsset(eq(endpointAddress), any(), any()); } @Test @@ -89,9 +107,10 @@ void endpointDataForConnectorsService_withException() throws EdcClientException final var endpointDataForConnectorsService = testee.endpointDataForConnectorsService(mock); final var dummyEndpoints = List.of("test"); - assertThatThrownBy( - () -> endpointDataForConnectorsService.findEndpointDataForConnectors(dummyEndpoints)).isInstanceOf( - RestClientException.class); - + endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures(dummyEndpoints).forEach(future -> { + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(EdcRetrieverException.class); + }); } } \ No newline at end of file diff --git a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceTest.java b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceTest.java index 5df9a62db4..5d182a3f38 100644 --- a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceTest.java +++ b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceTest.java @@ -23,192 +23,251 @@ ********************************************************************************/ package org.eclipse.tractusx.irs.registryclient.decentral; +import static java.util.Collections.emptyList; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.nio.charset.StandardCharsets; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Base64; -import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.function.Function; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; -import org.assertj.core.api.Assertions; +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.tractusx.irs.common.util.concurrent.ResultFinder; import org.eclipse.tractusx.irs.component.assetadministrationshell.AssetAdministrationShellDescriptor; import org.eclipse.tractusx.irs.component.assetadministrationshell.IdentifierKeyValuePair; import org.eclipse.tractusx.irs.component.assetadministrationshell.SubmodelDescriptor; -import org.eclipse.tractusx.irs.data.StringMapper; -import org.eclipse.tractusx.irs.edc.client.model.EDRAuthCode; import org.eclipse.tractusx.irs.registryclient.DigitalTwinRegistryKey; import org.eclipse.tractusx.irs.registryclient.discovery.ConnectorEndpointsService; import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceException; +import org.eclipse.tractusx.irs.registryclient.exceptions.ShellNotFoundException; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mockito; +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class DecentralDigitalTwinRegistryServiceTest { - private final ConnectorEndpointsService connectorEndpointsService = Mockito.mock(ConnectorEndpointsService.class); - private final EndpointDataForConnectorsService endpointDataForConnectorsService = Mockito.mock( + private final ConnectorEndpointsService connectorEndpointsService = mock(ConnectorEndpointsService.class); + private final EndpointDataForConnectorsService endpointDataForConnectorsService = mock( EndpointDataForConnectorsService.class); - private final DecentralDigitalTwinRegistryClient decentralDigitalTwinRegistryClient = Mockito.mock( + private final DecentralDigitalTwinRegistryClient decentralDigitalTwinRegistryClient = mock( DecentralDigitalTwinRegistryClient.class); - private final DecentralDigitalTwinRegistryService decentralDigitalTwinRegistryService = new DecentralDigitalTwinRegistryService( + private final DecentralDigitalTwinRegistryService sut = new DecentralDigitalTwinRegistryService( connectorEndpointsService, endpointDataForConnectorsService, decentralDigitalTwinRegistryClient); - private static String createAuthCode(final Function expirationModifier) { - final var serializedEdrAuthCode = StringMapper.mapToString( - EDRAuthCode.builder().exp(expirationModifier.apply(Instant.now()).getEpochSecond()).build()); - final var bytes = serializedEdrAuthCode.getBytes(StandardCharsets.UTF_8); - return Base64.getUrlEncoder().encodeToString(bytes); - } - public static AssetAdministrationShellDescriptor shellDescriptor( final List submodelDescriptors) { + + final var specificAssetIds = List.of( + IdentifierKeyValuePair.builder().name("ManufacturerId").value("BPNL00000003AYRE").build()); + return AssetAdministrationShellDescriptor.builder() - .specificAssetIds(List.of(IdentifierKeyValuePair.builder() - .name("ManufacturerId") - .value("BPNL00000003AYRE") - .build())) + .specificAssetIds(specificAssetIds) .submodelDescriptors(submodelDescriptors) .build(); } - @Test - void shouldReturnExpectedShell() throws RegistryServiceException { - // given - final DigitalTwinRegistryKey digitalTwinRegistryKey = new DigitalTwinRegistryKey( - "urn:uuid:4132cd2b-cbe7-4881-a6b4-39fdc31cca2b", "bpn"); - final AssetAdministrationShellDescriptor expectedShell = shellDescriptor(Collections.emptyList()); - EndpointDataReference endpointDataReference = EndpointDataReference.Builder.newInstance() - .endpoint("url.to.host") - .build(); - final LookupShellsResponse lookupShellsResponse = LookupShellsResponse.builder() - .result(Collections.emptyList()) - .build(); - when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(List.of("address")); - when(endpointDataForConnectorsService.findEndpointDataForConnectors(ArgumentMatchers.anyList())).thenReturn( - endpointDataReference); - when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), - ArgumentMatchers.anyList())).thenReturn(lookupShellsResponse); - when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( - expectedShell); - - // when - final Collection actualShell = decentralDigitalTwinRegistryService.fetchShells( - List.of(digitalTwinRegistryKey)); - - // then - Assertions.assertThat(actualShell).containsExactly(expectedShell); + @Nested + @DisplayName("fetchShells") + class FetchShellsTests { + + @Test + void should_return_expected_shell() throws RegistryServiceException { + // given + final var digitalTwinRegistryKey = new DigitalTwinRegistryKey( + "urn:uuid:4132cd2b-cbe7-4881-a6b4-39fdc31cca2b", "bpn"); + final var expectedShell = shellDescriptor(emptyList()); + final var endpointDataReference = endpointDataReference("url.to.host"); + final var lookupShellsResponse = LookupShellsResponse.builder().result(emptyList()).build(); + + when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(List.of("address")); + + final var endpointDataRefFutures = List.of(completedFuture(endpointDataReference)); + when(endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures(anyList())).thenReturn( + endpointDataRefFutures); + + when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), + anyList())).thenReturn(lookupShellsResponse); + when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( + expectedShell); + + // when + final var actualShell = sut.fetchShells(List.of(digitalTwinRegistryKey)); + + // then + assertThat(actualShell).containsExactly(expectedShell); + } + + @Test + void when_InterruptedException_occurs() throws ExecutionException, InterruptedException { + + // given + simulateResultFinderInterrupted(); + + final var lookupShellsResponse = LookupShellsResponse.builder().result(emptyList()).build(); + + final List connectorEndpoints = List.of("address1", "address2"); + when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(connectorEndpoints); + + final var dataRefFutures = List.of( // + completedFuture(endpointDataReference("url.to.host1")), // + completedFuture(endpointDataReference("url.to.host2"))); + when(endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures( + connectorEndpoints)).thenReturn(dataRefFutures); + + when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), + anyList())).thenReturn(lookupShellsResponse); + when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( + shellDescriptor(emptyList())); + + // when + final ThrowingCallable call = () -> sut.fetchShells( + List.of(new DigitalTwinRegistryKey("dummyShellId", "dummyBpn"))); + + // then + assertThatThrownBy(call).isInstanceOf(ShellNotFoundException.class) + .hasMessage("Unable to find any of the requested shells") + .satisfies(e -> assertThat( + ((ShellNotFoundException) e).getCalledEndpoints()).containsExactlyInAnyOrder( + "address1", "address2")); + } + + @Test + void when_ExecutionException_occurs() { + + // given + simulateGetFastestResultFailedFuture(); + + final var lookupShellsResponse = LookupShellsResponse.builder().result(emptyList()).build(); + + final List connectorEndpoints = List.of("address"); + when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(connectorEndpoints); + + final var dataRefFutures = List.of(completedFuture(endpointDataReference("url.to.host"))); + when(endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures( + connectorEndpoints)).thenReturn(dataRefFutures); + + when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), + anyList())).thenReturn(lookupShellsResponse); + when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( + shellDescriptor(emptyList())); + + // when + final var bpn = "dummyBpn"; + final ThrowingCallable call = () -> sut.fetchShells( + List.of(new DigitalTwinRegistryKey("dummyShellId", bpn))); + + // then + assertThatThrownBy(call).isInstanceOf(ShellNotFoundException.class) + .hasMessageContaining("Unable to find any of the requested shells"); + + } + + @Test + void should_throw_ShellNotFoundException_if_no_digital_twin_registry_keys_given() { + assertThatThrownBy(() -> sut.fetchShells(emptyList())).isInstanceOf(ShellNotFoundException.class); + } + + } + + private void simulateGetFastestResultFailedFuture() { + final ResultFinder resultFinderMock = mock(ResultFinder.class); + when(resultFinderMock.getFastestResult(any())).thenReturn( + CompletableFuture.failedFuture(new IllegalStateException("some illegal state"))); + sut.setResultFinder(resultFinderMock); + } + + private void simulateResultFinderInterrupted() throws InterruptedException, ExecutionException { + final ResultFinder resultFinderMock = mock(ResultFinder.class); + final CompletableFuture completableFutureMock = mock(CompletableFuture.class); + when(completableFutureMock.get()).thenThrow(new InterruptedException("interrupted")); + when(resultFinderMock.getFastestResult(any())).thenReturn(completableFutureMock); + sut.setResultFinder(resultFinderMock); } - @Test - void shouldRenewEndpointDataReferenceForMultipleAssets() throws RegistryServiceException { - // given - final DigitalTwinRegistryKey digitalTwinRegistryKey = new DigitalTwinRegistryKey( - "urn:uuid:4132cd2b-cbe7-4881-a6b4-39fdc31cca2b", "bpn"); - final AssetAdministrationShellDescriptor expectedShell = shellDescriptor(Collections.emptyList()); - final var authCode = "test." + createAuthCode(exp -> exp.minus(1, ChronoUnit.DAYS)); - EndpointDataReference endpointDataReference = EndpointDataReference.Builder.newInstance() - .endpoint("url.to.host") - .authKey("test") - .authCode(authCode) - .build(); - EndpointDataReference renewedReference = EndpointDataReference.Builder.newInstance() - .endpoint("url.to.host") - .build(); - final LookupShellsResponse lookupShellsResponse = LookupShellsResponse.builder() - .result(Collections.emptyList()) - .build(); - when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(List.of("address")); - when(endpointDataForConnectorsService.findEndpointDataForConnectors(ArgumentMatchers.anyList())).thenReturn( - endpointDataReference, renewedReference); - when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), - ArgumentMatchers.anyList())).thenReturn(lookupShellsResponse); - when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( - expectedShell); - - // when - final Collection actualShell = decentralDigitalTwinRegistryService.fetchShells( - List.of(digitalTwinRegistryKey, digitalTwinRegistryKey)); - - // then - Assertions.assertThat(actualShell).containsExactly(expectedShell, expectedShell); - - verify(endpointDataForConnectorsService, times(2)).findEndpointDataForConnectors(anyList()); + private static EndpointDataReference endpointDataReference(final String url) { + return endpointDataReferenceBuilder().endpoint(url).build(); } - @Test - void shouldNotRenewEndpointDataReferenceForMultipleAssets() throws RegistryServiceException { - // given - final DigitalTwinRegistryKey digitalTwinRegistryKey = new DigitalTwinRegistryKey( - "urn:uuid:4132cd2b-cbe7-4881-a6b4-39fdc31cca2b", "bpn"); - final AssetAdministrationShellDescriptor expectedShell = shellDescriptor(Collections.emptyList()); - final var authCode = "test." + createAuthCode(exp -> exp.plus(1, ChronoUnit.DAYS)); - EndpointDataReference endpointDataReference = EndpointDataReference.Builder.newInstance() - .endpoint("url.to.host") - .authKey("test") - .authCode(authCode) - .build(); - final LookupShellsResponse lookupShellsResponse = LookupShellsResponse.builder() - .result(Collections.emptyList()) - .build(); - when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(List.of("address")); - when(endpointDataForConnectorsService.findEndpointDataForConnectors(ArgumentMatchers.anyList())).thenReturn( - endpointDataReference); - when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), - ArgumentMatchers.anyList())).thenReturn(lookupShellsResponse); - when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( - expectedShell); - - // when - final Collection actualShell = decentralDigitalTwinRegistryService.fetchShells( - List.of(digitalTwinRegistryKey, digitalTwinRegistryKey, digitalTwinRegistryKey)); - - // then - Assertions.assertThat(actualShell).containsExactly(expectedShell, expectedShell, expectedShell); - - verify(endpointDataForConnectorsService, times(1)).findEndpointDataForConnectors(anyList()); + @Nested + @DisplayName("lookupGlobalAssetIds") + class LookupGlobalAssetIdsTests { + + @Test + void should_return_the_expected_globalAssetId() throws RegistryServiceException { + // given + final var digitalTwinRegistryKey = new DigitalTwinRegistryKey( + "urn:uuid:4132cd2b-cbe7-4881-a6b4-39fdc31cca2b", "bpn"); + + final var expectedGlobalAssetId = "urn:uuid:4132cd2b-cbe7-4881-a6b4-aaaaaaaaaaaa"; + final var expectedShell = shellDescriptor(emptyList()).toBuilder() + .globalAssetId(expectedGlobalAssetId) + .build(); + final var dataRefFutures = List.of(completedFuture(endpointDataReference("url.to.host"))); + final var lookupShellsResponse = LookupShellsResponse.builder() + .result(List.of(digitalTwinRegistryKey.shellId())) + .build(); + when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(List.of("address")); + when(endpointDataForConnectorsService.createFindEndpointDataForConnectorsFutures(anyList())).thenReturn( + dataRefFutures); + when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), + anyList())).thenReturn(lookupShellsResponse); + when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( + expectedShell); + + // when + final var assetAdministrationShellDescriptors = sut.lookupShellsByBPN(digitalTwinRegistryKey.bpn()); + + String actualGlobalAssetId = assetAdministrationShellDescriptors.stream() + .findFirst() + .map(AssetAdministrationShellDescriptor::getGlobalAssetId) + .get();// then + assertThat(actualGlobalAssetId).isEqualTo(expectedGlobalAssetId); + } + + @Test + void when_InterruptedException_occurs() throws ExecutionException, InterruptedException { + // given + simulateResultFinderInterrupted(); + + // when + final ThrowingCallable call = () -> sut.lookupShellsByBPN("dummyBpn"); + + // then + assertThatThrownBy(call).isInstanceOf(RegistryServiceException.class) + .hasMessageContaining( + "InterruptedException occurred while looking up shell ids for bpn") + .hasMessageContaining("dummyBpn"); + } + + @Test + void when_ExecutionException_occurs() { + // given + simulateGetFastestResultFailedFuture(); + + // when + final var bpn = "dummyBpn"; + final ThrowingCallable call = () -> sut.lookupShellsByBPN(bpn); + + // then + assertThatThrownBy(call).isInstanceOf(RegistryServiceException.class) + .hasMessageContaining("Exception occurred while looking up shell ids for bpn") + .hasMessageContaining("'" + bpn + "'"); + } } - @Test - void shouldReturnExpectedGlobalAssetId() throws RegistryServiceException { - // given - final DigitalTwinRegistryKey digitalTwinRegistryKey = new DigitalTwinRegistryKey( - "urn:uuid:4132cd2b-cbe7-4881-a6b4-39fdc31cca2b", "bpn"); - - final String expectedGlobalAssetId = "urn:uuid:4132cd2b-cbe7-4881-a6b4-aaaaaaaaaaaa"; - final var expectedShell = shellDescriptor(Collections.emptyList()).toBuilder() - .globalAssetId(expectedGlobalAssetId) - .build(); - final var endpointDataReference = EndpointDataReference.Builder.newInstance().endpoint("url.to.host").build(); - final LookupShellsResponse lookupShellsResponse = LookupShellsResponse.builder() - .result(List.of( - digitalTwinRegistryKey.shellId())) - .build(); - when(connectorEndpointsService.fetchConnectorEndpoints(any())).thenReturn(List.of("address")); - when(endpointDataForConnectorsService.findEndpointDataForConnectors(ArgumentMatchers.anyList())).thenReturn( - endpointDataReference); - when(decentralDigitalTwinRegistryClient.getAllAssetAdministrationShellIdsByAssetLink(any(), - ArgumentMatchers.anyList())).thenReturn(lookupShellsResponse); - when(decentralDigitalTwinRegistryClient.getAssetAdministrationShellDescriptor(any(), any())).thenReturn( - expectedShell); - - // when - final Collection assetAdministrationShellDescriptors = decentralDigitalTwinRegistryService.lookupShellsByBPN( - digitalTwinRegistryKey.bpn()); - - String actualGlobalAssetId = assetAdministrationShellDescriptors.stream().findFirst().map(AssetAdministrationShellDescriptor::getGlobalAssetId).get(); - // then - Assertions.assertThat(actualGlobalAssetId).isEqualTo(expectedGlobalAssetId); + private static EndpointDataReference.Builder endpointDataReferenceBuilder() { + return EndpointDataReference.Builder.newInstance(); } } diff --git a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceWiremockTest.java b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceWiremockTest.java index 7c6bee5b80..5347da5449 100644 --- a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceWiremockTest.java +++ b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/DecentralDigitalTwinRegistryServiceWiremockTest.java @@ -63,9 +63,9 @@ import org.eclipse.tractusx.irs.registryclient.discovery.ConnectorEndpointsService; import org.eclipse.tractusx.irs.registryclient.discovery.DiscoveryFinderClientImpl; import org.eclipse.tractusx.irs.registryclient.exceptions.RegistryServiceException; +import org.eclipse.tractusx.irs.registryclient.exceptions.ShellNotFoundException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.RestTemplate; @WireMockTest @@ -116,7 +116,7 @@ void shouldDiscoverEDCAndRequestRegistry() throws RegistryServiceException { } @Test - void shouldThrowHttpClientExceptionInCaseOfDiscoveryError() { + void shouldThrowInCaseOfDiscoveryError() { // Arrange givenThat(postDiscoveryFinder404()); final List testId = List.of(new DigitalTwinRegistryKey("testId", TEST_BPN)); @@ -124,27 +124,26 @@ void shouldThrowHttpClientExceptionInCaseOfDiscoveryError() { // Act & Assert // TODO (#405) fix implementation to not throw HttpClientErrorException$NotFound assertThatThrownBy(() -> decentralDigitalTwinRegistryService.fetchShells(testId)).isInstanceOf( - HttpClientErrorException.class); + ShellNotFoundException.class); verify(exactly(1), postRequestedFor(urlPathEqualTo(DISCOVERY_FINDER_PATH))); } @Test - void shouldThrowHttpClientExceptionInCaseOfEdcDiscoveryError() { + void shouldThrowInCaseOfEdcDiscoveryError() { // Arrange givenThat(postDiscoveryFinder200()); givenThat(postEdcDiscovery404()); final List testId = List.of(new DigitalTwinRegistryKey("testId", TEST_BPN)); // Act & Assert - // TODO (#405) fix implementation to not throw HttpClientErrorException$NotFound assertThatThrownBy(() -> decentralDigitalTwinRegistryService.fetchShells(testId)).isInstanceOf( - HttpClientErrorException.class); + ShellNotFoundException.class); verify(exactly(1), postRequestedFor(urlPathEqualTo(DISCOVERY_FINDER_PATH))); verify(exactly(1), postRequestedFor(urlPathEqualTo(EDC_DISCOVERY_PATH))); } @Test - void shouldThrowHttpClientExceptionInCaseOfLookupShellsError() { + void shouldThrowInCaseOfLookupShellsError() { // Arrange givenThat(postDiscoveryFinder200()); givenThat(postEdcDiscovery200()); @@ -152,16 +151,15 @@ void shouldThrowHttpClientExceptionInCaseOfLookupShellsError() { final List testId = List.of(new DigitalTwinRegistryKey("testId", TEST_BPN)); // Act & Assert - // TODO (#405) fix implementation to not throw HttpClientErrorException$NotFound assertThatThrownBy(() -> decentralDigitalTwinRegistryService.fetchShells(testId)).isInstanceOf( - HttpClientErrorException.class); + ShellNotFoundException.class); verify(exactly(1), postRequestedFor(urlPathEqualTo(DISCOVERY_FINDER_PATH))); verify(exactly(1), postRequestedFor(urlPathEqualTo(EDC_DISCOVERY_PATH))); verify(exactly(1), getRequestedFor(urlPathEqualTo(LOOKUP_SHELLS_PATH))); } @Test - void shouldThrowHttpClientExceptionInCaseOfShellDescriptorsError() { + void shouldThrowInCaseOfShellDescriptorsError() { // Arrange givenThat(postDiscoveryFinder200()); givenThat(postEdcDiscovery200()); @@ -170,9 +168,8 @@ void shouldThrowHttpClientExceptionInCaseOfShellDescriptorsError() { final List testId = List.of(new DigitalTwinRegistryKey("testId", TEST_BPN)); // Act & Assert - // TODO (#405) fix implementation to not throw HttpClientErrorException$NotFound assertThatThrownBy(() -> decentralDigitalTwinRegistryService.fetchShells(testId)).isInstanceOf( - HttpClientErrorException.class); + ShellNotFoundException.class); verify(exactly(1), postRequestedFor(urlPathEqualTo(DISCOVERY_FINDER_PATH))); verify(exactly(1), postRequestedFor(urlPathEqualTo(EDC_DISCOVERY_PATH))); verify(exactly(1), getRequestedFor(urlPathEqualTo(LOOKUP_SHELLS_PATH))); @@ -189,9 +186,8 @@ void shouldThrowExceptionOnEmptyShells() { final List testId = List.of(new DigitalTwinRegistryKey("testId", TEST_BPN)); // Act & Assert - // TODO (#405) fix implementation to not throw HttpClientErrorException$NotFound assertThatThrownBy(() -> decentralDigitalTwinRegistryService.fetchShells(testId)).isInstanceOf( - HttpClientErrorException.class); + ShellNotFoundException.class); verify(exactly(1), postRequestedFor(urlPathEqualTo(DISCOVERY_FINDER_PATH))); verify(exactly(1), postRequestedFor(urlPathEqualTo(EDC_DISCOVERY_PATH))); verify(exactly(1), getRequestedFor(urlPathEqualTo(LOOKUP_SHELLS_PATH))); diff --git a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsServiceTest.java b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsServiceTest.java index 288aaa67f0..085b015d8b 100644 --- a/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsServiceTest.java +++ b/irs-registry-client/src/test/java/org/eclipse/tractusx/irs/registryclient/decentral/EndpointDataForConnectorsServiceTest.java @@ -24,19 +24,21 @@ package org.eclipse.tractusx.irs.registryclient.decentral; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.tractusx.irs.edc.client.exceptions.EdcClientException; import org.junit.jupiter.api.Test; -import org.springframework.web.client.RestClientException; class EndpointDataForConnectorsServiceTest { @@ -46,58 +48,100 @@ class EndpointDataForConnectorsServiceTest { private static final String connectionOneAddress = "connectionOneAddress"; private static final String connectionTwoAddress = "connectionTwoAddress"; + private static final EndpointDataReference CONNECTION_ONE_DATA_REF = // + EndpointDataReference.Builder.newInstance().endpoint(connectionOneAddress).build(); + + private static final EndpointDataReference CONNECTION_TWO_DATA_REF = // + EndpointDataReference.Builder.newInstance().endpoint(connectionTwoAddress).build(); + private final EdcEndpointReferenceRetriever edcSubmodelFacade = mock(EdcEndpointReferenceRetriever.class); - private final EndpointDataForConnectorsService endpointDataForConnectorsService = new EndpointDataForConnectorsService( - edcSubmodelFacade); + private final EndpointDataForConnectorsService sut = new EndpointDataForConnectorsService(edcSubmodelFacade); @Test void shouldReturnExpectedEndpointDataReference() throws EdcRetrieverException { - // given + + // GIVEN when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionOneAddress, DT_REGISTRY_ASSET_TYPE, - DT_REGISTRY_ASSET_VALUE)).thenReturn( - EndpointDataReference.Builder.newInstance().endpoint(connectionOneAddress).build()); + DT_REGISTRY_ASSET_VALUE)).thenReturn(CONNECTION_ONE_DATA_REF); - // when - final EndpointDataReference endpointDataReference = endpointDataForConnectorsService.findEndpointDataForConnectors( + // WHEN + final List> endpointDataReferences = sut.createFindEndpointDataForConnectorsFutures( Collections.singletonList(connectionOneAddress)); - // then - assertThat(endpointDataReference).isNotNull(); - assertThat(endpointDataReference.getEndpoint()).isEqualTo(connectionOneAddress); + // THEN + assertThat(endpointDataReferences).isNotEmpty() + .extracting(CompletableFuture::get) + .isNotEmpty() + .extracting(EndpointDataReference::getEndpoint) + .contains(connectionOneAddress); } @Test void shouldReturnExpectedEndpointDataReferenceFromSecondConnectionEndpoint() throws EdcRetrieverException { - // given + + // GIVEN + + // a first endpoint failing (1) when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionOneAddress, DT_REGISTRY_ASSET_TYPE, DT_REGISTRY_ASSET_VALUE)).thenThrow( new EdcRetrieverException(new EdcClientException("EdcClientException"))); - when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionTwoAddress, DT_REGISTRY_ASSET_TYPE, - DT_REGISTRY_ASSET_VALUE)).thenReturn( - EndpointDataReference.Builder.newInstance().endpoint(connectionTwoAddress).build()); - // when - final EndpointDataReference endpointDataReference = endpointDataForConnectorsService.findEndpointDataForConnectors( - List.of(connectionOneAddress, connectionTwoAddress)); + // and a second endpoint returning successfully (2) + when(edcSubmodelFacade.getEndpointReferenceForAsset(connectionTwoAddress, DT_REGISTRY_ASSET_TYPE, + DT_REGISTRY_ASSET_VALUE)).thenReturn(CONNECTION_TWO_DATA_REF); + + // WHEN + final List> dataRefFutures = // + sut.createFindEndpointDataForConnectorsFutures(List.of(connectionOneAddress, // (1) + connectionTwoAddress // (2) + )); + + // THEN + final List dataReferences = // + dataRefFutures.stream() + .map(EndpointDataForConnectorsServiceTest::executeFutureMappingErrorsToNull) + .filter(Objects::nonNull) + .toList(); + + assertThat(dataReferences).isNotEmpty() // + .extracting(EndpointDataReference::getEndpoint) // + .contains(connectionTwoAddress); + } - // then - assertThat(endpointDataReference).isNotNull(); - assertThat(endpointDataReference.getEndpoint()).isEqualTo(connectionTwoAddress); + private static EndpointDataReference executeFutureMappingErrorsToNull( + final CompletableFuture future) { + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + // ignore + return null; + } } @Test void shouldThrowExceptionWhenConnectorEndpointsNotReachable() throws EdcRetrieverException { - // given + + // GIVEN when(edcSubmodelFacade.getEndpointReferenceForAsset(anyString(), eq(DT_REGISTRY_ASSET_TYPE), eq(DT_REGISTRY_ASSET_VALUE))).thenThrow( new EdcRetrieverException(new EdcClientException("EdcClientException"))); - final List connectorEndpoints = List.of(connectionOneAddress, connectionTwoAddress); - // when + then - assertThatThrownBy( - () -> endpointDataForConnectorsService.findEndpointDataForConnectors(connectorEndpoints)).isInstanceOf( - RestClientException.class).hasMessageContainingAll(connectionOneAddress, connectionTwoAddress); + // WHEN + final var exceptions = new ArrayList<>(); + + // THEN + final List connectorEndpoints = List.of(connectionOneAddress, connectionTwoAddress); + sut.createFindEndpointDataForConnectorsFutures(connectorEndpoints) // + .forEach(future -> { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + exceptions.add(e); + } + }); + + assertThat(exceptions).hasSize(connectorEndpoints.size()); } }