Skip to content

Commit

Permalink
Merge pull request #184 from catenax-ng/cx_24.05_pcf_refactor_changes
Browse files Browse the repository at this point in the history
feat | PCF exchange code refactor and bpdm service api call changes.
  • Loading branch information
almadigabor authored May 24, 2024
2 parents 00b2e34 + 8ed79d0 commit 926df3b
Show file tree
Hide file tree
Showing 29 changed files with 638 additions and 145 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Correction in semanticId changes in pcf.
- EDC code changes refactor.
- PCF exchange code changes refactor.
- New changes for BPDM service api access/call using EDC.

### Fixed
- Fix for PCF data sovereignty test.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.springframework.data.jpa.repository.JpaRepository;

public interface PcfRequestRepository extends JpaRepository<PcfRequestEntity, String> {


List<PcfRequestEntity> findByProductId(String productId);

Optional<PcfRequestEntity> findByRequestIdAndProductIdAndBpnNumber(String requestId, String productId,
String bpnNumber);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void pushPCFDataForApproveRequest(List<JsonObject> originalObjectList, Po
// push api call
Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status,
calculatedPCFValue, request.getProductId(), request.getBpnNumber(),
request.getRequestId(), request.getMessage());
request.getRequestId(), request.getMessage(), false);

new Thread(runnable).start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum sta
return savePcfStatus(requestId, status, null);
}

@SneakyThrows
public List<PcfRequestEntity> findByProductId(String productId) {
return pcfRequestRepository.findByProductId(productId);
}

@SneakyThrows
public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum status, String remark) {

Expand Down Expand Up @@ -150,5 +155,4 @@ public PagingResponse getPcfData(List<PCFRequestStatusEnum> status, PCFTypeEnum
return PagingResponse.builder().items(requestList).pageSize(result.getSize()).page(result.getNumber())
.totalItems(result.getTotalElements()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.UUID;

import org.apache.commons.lang3.StringUtils;
import org.eclipse.tractusx.sde.common.configuration.properties.PCFAssetStaticPropertyHolder;
import org.eclipse.tractusx.sde.common.entities.PolicyModel;
import org.eclipse.tractusx.sde.common.exception.NoDataFoundException;
import org.eclipse.tractusx.sde.common.exception.ServiceException;
Expand All @@ -35,6 +36,7 @@
import org.eclipse.tractusx.sde.edc.model.request.ConsumerRequest;
import org.eclipse.tractusx.sde.edc.model.response.QueryDataOfferModel;
import org.eclipse.tractusx.sde.edc.util.EDCAssetUrlCacheService;
import org.eclipse.tractusx.sde.pcfexchange.entity.PcfRequestEntity;
import org.eclipse.tractusx.sde.pcfexchange.entity.PcfResponseEntity;
import org.eclipse.tractusx.sde.pcfexchange.enums.PCFRequestStatusEnum;
import org.eclipse.tractusx.sde.pcfexchange.enums.PCFTypeEnum;
Expand All @@ -60,11 +62,13 @@ public class PcfExchangeServiceImpl implements IPCFExchangeService {
private final PCFRepositoryService pcfRepositoryService;
private final PcfReqsponseRepository pcfReqsponseRepository;
private final EDCAssetUrlCacheService edcAssetUrlCacheService;

@Qualifier("DatabaseUsecaseHandler")
private final DatabaseUsecaseStep databaseUsecaseStep;

private final ProxyRequestInterface proxyRequestInterface;

private final PCFAssetStaticPropertyHolder pcfAssetStaticPropertyHolder;

@SneakyThrows
@Override
Expand Down Expand Up @@ -99,7 +103,7 @@ public String requestForPcfDataExistingOffer(String productId, ConsumerRequest c

@Override
public Object requestForPcfNotExistDataOffer(PcfRequestModel pcfRequestModel) {

StringBuilder sb = new StringBuilder();
String requestId = UUID.randomUUID().toString();
List<QueryDataOfferModel> pcfExchangeUrlOffers = null;
Expand All @@ -109,8 +113,7 @@ public Object requestForPcfNotExistDataOffer(PcfRequestModel pcfRequestModel) {
PCFRequestStatusEnum.SENDING_REQUEST, "");

// 1 fetch EDC connectors and DTR Assets from EDC connectors
pcfExchangeUrlOffers = edcAssetUrlCacheService
.getPCFExchangeUrlFromTwin(pcfRequestModel.getBpnNumber());
pcfExchangeUrlOffers = edcAssetUrlCacheService.getPCFExchangeUrlFromTwin(pcfRequestModel.getBpnNumber());

// 2 request for PCF value for non existing sub model and send notification to
// call provider for data
Expand Down Expand Up @@ -142,16 +145,17 @@ public String actionOnPcfRequestAndSendNotificationToConsumer(PcfRequestModel pc
String remark = "";
try {

JsonObject calculatedPCFValue = databaseUsecaseStep.readCreatedTwinsBySpecifyColomn(
"urn:bamm:io.catenax.pcf", pcfRequestModel.getProductId()).get("json").getAsJsonObject();

JsonObject calculatedPCFValue = databaseUsecaseStep
.readCreatedTwinsBySpecifyColomn(pcfAssetStaticPropertyHolder.getSematicIdPart(), pcfRequestModel.getProductId())
.get("json").getAsJsonObject();

PCFRequestStatusEnum status = pcfRepositoryService.identifyRunningStatus(pcfRequestModel.getRequestId(),
pcfRequestModel.getStatus());

// push api call
Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status, calculatedPCFValue,
pcfRequestModel.getProductId(), pcfRequestModel.getBpnNumber(), pcfRequestModel.getRequestId(),
pcfRequestModel.getMessage());
pcfRequestModel.getMessage(), true);

new Thread(runnable).start();

Expand All @@ -178,7 +182,7 @@ public PcfRequestModel savePcfRequestData(String requestId, String productId, St
PCFRequestStatusEnum status = PCFRequestStatusEnum.REQUESTED;
String remark = "";
try {
databaseUsecaseStep.readCreatedTwinsBySpecifyColomn("urn:bamm:io.catenax.pcf", productId);
databaseUsecaseStep.readCreatedTwinsBySpecifyColomn(pcfAssetStaticPropertyHolder.getSematicIdPart(), productId);
} catch (NoDataFoundException e) {
String msg = "The PCF calculated value does not exist in system, please upload PCF value for '" + productId
+ "' in systems using Manual/Recurring Upload";
Expand All @@ -197,28 +201,46 @@ public void recievedPCFData(String productId, String bpnNumber, String requestId

PCFRequestStatusEnum status = null;

PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(requestId)
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

if(StringUtils.isBlank(requestId))
throw new ServiceException("RequestId not recieved from provider to marked PCF exchange request");

if (pcfData != null && !pcfData.isEmpty() && !pcfData.asText().equals("{}")) {
status = PCFRequestStatusEnum.RECEIVED;
} else if (StringUtils.isNotBlank(requestId)) {
status = PCFRequestStatusEnum.REJECTED;
} else
status = PCFRequestStatusEnum.FAILED;

pcfRepositoryService.savePcfStatus(requestId, status);

if (StringUtils.isNotBlank(requestId)) {

PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(requestId)
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

pcfRepositoryService.savePcfStatus(requestId, status);
}
else {
List<PcfRequestEntity> findByProductId = pcfRepositoryService.findByProductId(productId);

for (PcfRequestEntity fentity : findByProductId) {

PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(fentity.getRequestId())
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

pcfRepositoryService.savePcfStatus(fentity.getRequestId(), status);
}

}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public class ProxyRequestInterface {
public void requestToProviderForPCFValue(String productId, StringBuilder reponseMap, String requestId, String message,
QueryDataOfferModel dataset, boolean isRequestToNonexistingTwin) {
try {
String connectorOfferUrl = dataset.getConnectorOfferUrl();
String pcfProductPath = "";
if (connectorOfferUrl.contains("@")) {
String[] split = connectorOfferUrl.split("@");
if (split.length > 1) {
dataset.setConnectorOfferUrl(split[0]);
pcfProductPath = split[1];
}
}

EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(dataset.getConnectorId(),
dataset);

Expand All @@ -73,7 +83,7 @@ public void requestToProviderForPCFValue(String productId, StringBuilder reponse
pcfpushEnpoint = new URI(
edrToken.getEndpoint() + SLASH_DELIMETER + PRODUCT_IDS + SLASH_DELIMETER + productId);
else
pcfpushEnpoint = new URI(edrToken.getEndpoint());
pcfpushEnpoint = new URI(edrToken.getEndpoint() + pcfProductPath);

Map<String, String> header = new HashMap<>();
header.put("authorization", edrToken.getAuthorization());
Expand All @@ -100,7 +110,7 @@ public void requestToProviderForPCFValue(String productId, StringBuilder reponse

@SneakyThrows
public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject calculatedPCFValue,
String productId, String bpnNumber, String requestId, String message) {
String productId, String bpnNumber, String requestId, String message, boolean isNeedToSendRequestIdtoConsumer) {

// 1 fetch EDC connectors and DTR Assets from EDC connectors
List<QueryDataOfferModel> pcfExchangeUrlOffers = edcAssetUrlCacheService.getPCFExchangeUrlFromTwin(bpnNumber);
Expand All @@ -114,9 +124,9 @@ public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject c
pcfExchangeUrlOffers.parallelStream().forEach(dtOffer -> {

if (PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION.equals(status)) {
sendNotification(null, productId, bpnNumber, requestId, dtOffer, status, message);
sendNotification(null, productId, bpnNumber, requestId, dtOffer, status, message, isNeedToSendRequestIdtoConsumer);
} else {
sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status, message);
sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status, message, isNeedToSendRequestIdtoConsumer);
}

});
Expand All @@ -125,7 +135,7 @@ public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject c

@SneakyThrows
private void sendNotification(JsonObject calculatedPCFValue, String productId, String bpnNumber, String requestId,
QueryDataOfferModel dtOffer, PCFRequestStatusEnum status, String message) {
QueryDataOfferModel dtOffer, PCFRequestStatusEnum status, String message, boolean isNeedToSendRequestIdtoConsumer) {
String sendNotificationStatus = "";
try {
EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(bpnNumber, dtOffer);
Expand All @@ -139,7 +149,11 @@ private void sendNotification(JsonObject calculatedPCFValue, String productId, S
header.put("authorization", edrToken.getAuthorization());
header.put("Edc-Bpn", bpnNumber);

pcfExchangeProxy.uploadPcfSubmodel(pcfpushEnpoint, header, requestId, message,
String sendRequestId = requestId;
if (!isNeedToSendRequestIdtoConsumer)
sendRequestId = "";

pcfExchangeProxy.uploadPcfSubmodel(pcfpushEnpoint, header, sendRequestId, message,
jsonObjectMapper.gsonObjectToJsonNode(calculatedPCFValue));

sendNotificationStatus = "SUCCESS";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/********************************************************************************
* Copyright (c) 2024 T-Systems International GmbH
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.sde.common.configuration.properties;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class PCFAssetStaticPropertyHolder {

@Value(value = "${digital-twin.pcf.sematicid:}")
public String sematicId;

@Value(value = "${digital-twin.pcf.sematicid:}")
public String sematicIdPart;

private String pcfExchangeAssetId;

public String getSematicId() {
if (StringUtils.isAllBlank(sematicId))
sematicId = "urn:samm:io.catenax.pcf:6.0.0#Pcf";
return sematicId;
}

public void setSematicId(String sematicId) {
this.sematicId = sematicId;
}

public String getSematicIdPart() {
if (StringUtils.isAllBlank(sematicIdPart))
sematicIdPart = "urn:samm:io.catenax.pcf";
return sematicIdPart;
}

public void setSematicIdPart(String sematicIdPart) {
this.sematicIdPart = sematicIdPart;
}

public String getPcfExchangeAssetId() {
return pcfExchangeAssetId;
}

public void setPcfExchangeAssetId(String pcfExchangeAssetId) {
this.pcfExchangeAssetId = pcfExchangeAssetId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ public String getDataPlaneUrlOfSubmodule() {
JsonElement jsonElement = this.submodelSchema.get("submodelDataPlaneUrl");
return jsonElement == null || jsonElement.isJsonNull() ? "" : jsonElement.getAsString();
}

public String getSematicIdReferenceOfSubmodule() {
JsonElement jsonElement = this.submodelSchema.get("sematicIdReference");
return jsonElement == null || jsonElement.isJsonNull() ? "" : jsonElement.getAsString();
}

public String getInterfaceNameOfSubmodule() {
JsonElement jsonElement = this.submodelSchema.get("interfaceName");
return jsonElement == null || jsonElement.isJsonNull() ? "" : jsonElement.getAsString();
}

public JsonObject getAddOnOfModel() {
return this.submodelSchema.get("addOn").getAsJsonObject();
Expand Down Expand Up @@ -111,7 +121,11 @@ public boolean checkAppendURNUUIDWithIdentifier() {
return jsonElement == null || jsonElement.isJsonNull() || jsonElement.getAsBoolean();
}


public boolean usePCFAssetIdAsDTSubprotocolBodyId() {
JsonElement jsonElement = this.getAddOnOfModel().get("usePCFAssetIdAsDTSubprotocolBodyId");
return !(jsonElement == null || jsonElement.isJsonNull()) || (jsonElement !=null && jsonElement.getAsBoolean());
}

public JsonObject checkIsRelationSubmodel() {
JsonElement jsonElement = this.getAddOnOfModel().get("isRelationSubmodel");
return jsonElement == null || jsonElement.isJsonNull() ? null : jsonElement.getAsJsonObject();
Expand Down
Loading

0 comments on commit 926df3b

Please sign in to comment.