Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat | PCF exchange code refactor and bpdm service api call changes. #184

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Correction in semanticId changes in pcf.
- EDC code changes refactor.
- PCF exchange code changes refactor.
- New changes for BPDM service api access/call using EDC.

### Fixed
- Fix for PCF data sovereignty test.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.springframework.data.jpa.repository.JpaRepository;

public interface PcfRequestRepository extends JpaRepository<PcfRequestEntity, String> {


List<PcfRequestEntity> findByProductId(String productId);

Optional<PcfRequestEntity> findByRequestIdAndProductIdAndBpnNumber(String requestId, String productId,
String bpnNumber);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void pushPCFDataForApproveRequest(List<JsonObject> originalObjectList, Po
// push api call
Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status,
calculatedPCFValue, request.getProductId(), request.getBpnNumber(),
request.getRequestId(), request.getMessage());
request.getRequestId(), request.getMessage(), false);

new Thread(runnable).start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum sta
return savePcfStatus(requestId, status, null);
}

@SneakyThrows
public List<PcfRequestEntity> findByProductId(String productId) {
return pcfRequestRepository.findByProductId(productId);
}

@SneakyThrows
public PcfRequestEntity savePcfStatus(String requestId, PCFRequestStatusEnum status, String remark) {

Expand Down Expand Up @@ -150,5 +155,4 @@ public PagingResponse getPcfData(List<PCFRequestStatusEnum> status, PCFTypeEnum
return PagingResponse.builder().items(requestList).pageSize(result.getSize()).page(result.getNumber())
.totalItems(result.getTotalElements()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.UUID;

import org.apache.commons.lang3.StringUtils;
import org.eclipse.tractusx.sde.common.configuration.properties.PCFAssetStaticPropertyHolder;
import org.eclipse.tractusx.sde.common.entities.PolicyModel;
import org.eclipse.tractusx.sde.common.exception.NoDataFoundException;
import org.eclipse.tractusx.sde.common.exception.ServiceException;
Expand All @@ -35,6 +36,7 @@
import org.eclipse.tractusx.sde.edc.model.request.ConsumerRequest;
import org.eclipse.tractusx.sde.edc.model.response.QueryDataOfferModel;
import org.eclipse.tractusx.sde.edc.util.EDCAssetUrlCacheService;
import org.eclipse.tractusx.sde.pcfexchange.entity.PcfRequestEntity;
import org.eclipse.tractusx.sde.pcfexchange.entity.PcfResponseEntity;
import org.eclipse.tractusx.sde.pcfexchange.enums.PCFRequestStatusEnum;
import org.eclipse.tractusx.sde.pcfexchange.enums.PCFTypeEnum;
Expand All @@ -60,11 +62,13 @@ public class PcfExchangeServiceImpl implements IPCFExchangeService {
private final PCFRepositoryService pcfRepositoryService;
private final PcfReqsponseRepository pcfReqsponseRepository;
private final EDCAssetUrlCacheService edcAssetUrlCacheService;

@Qualifier("DatabaseUsecaseHandler")
private final DatabaseUsecaseStep databaseUsecaseStep;

private final ProxyRequestInterface proxyRequestInterface;

private final PCFAssetStaticPropertyHolder pcfAssetStaticPropertyHolder;

@SneakyThrows
@Override
Expand Down Expand Up @@ -99,7 +103,7 @@ public String requestForPcfDataExistingOffer(String productId, ConsumerRequest c

@Override
public Object requestForPcfNotExistDataOffer(PcfRequestModel pcfRequestModel) {

StringBuilder sb = new StringBuilder();
String requestId = UUID.randomUUID().toString();
List<QueryDataOfferModel> pcfExchangeUrlOffers = null;
Expand All @@ -109,8 +113,7 @@ public Object requestForPcfNotExistDataOffer(PcfRequestModel pcfRequestModel) {
PCFRequestStatusEnum.SENDING_REQUEST, "");

// 1 fetch EDC connectors and DTR Assets from EDC connectors
pcfExchangeUrlOffers = edcAssetUrlCacheService
.getPCFExchangeUrlFromTwin(pcfRequestModel.getBpnNumber());
pcfExchangeUrlOffers = edcAssetUrlCacheService.getPCFExchangeUrlFromTwin(pcfRequestModel.getBpnNumber());

// 2 request for PCF value for non existing sub model and send notification to
// call provider for data
Expand Down Expand Up @@ -142,16 +145,17 @@ public String actionOnPcfRequestAndSendNotificationToConsumer(PcfRequestModel pc
String remark = "";
try {

JsonObject calculatedPCFValue = databaseUsecaseStep.readCreatedTwinsBySpecifyColomn(
"urn:bamm:io.catenax.pcf", pcfRequestModel.getProductId()).get("json").getAsJsonObject();

JsonObject calculatedPCFValue = databaseUsecaseStep
.readCreatedTwinsBySpecifyColomn(pcfAssetStaticPropertyHolder.getSematicIdPart(), pcfRequestModel.getProductId())
.get("json").getAsJsonObject();

PCFRequestStatusEnum status = pcfRepositoryService.identifyRunningStatus(pcfRequestModel.getRequestId(),
pcfRequestModel.getStatus());

// push api call
Runnable runnable = () -> proxyRequestInterface.sendNotificationToConsumer(status, calculatedPCFValue,
pcfRequestModel.getProductId(), pcfRequestModel.getBpnNumber(), pcfRequestModel.getRequestId(),
pcfRequestModel.getMessage());
pcfRequestModel.getMessage(), true);

new Thread(runnable).start();

Expand All @@ -178,7 +182,7 @@ public PcfRequestModel savePcfRequestData(String requestId, String productId, St
PCFRequestStatusEnum status = PCFRequestStatusEnum.REQUESTED;
String remark = "";
try {
databaseUsecaseStep.readCreatedTwinsBySpecifyColomn("urn:bamm:io.catenax.pcf", productId);
databaseUsecaseStep.readCreatedTwinsBySpecifyColomn(pcfAssetStaticPropertyHolder.getSematicIdPart(), productId);
} catch (NoDataFoundException e) {
String msg = "The PCF calculated value does not exist in system, please upload PCF value for '" + productId
+ "' in systems using Manual/Recurring Upload";
Expand All @@ -197,28 +201,46 @@ public void recievedPCFData(String productId, String bpnNumber, String requestId

PCFRequestStatusEnum status = null;

PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(requestId)
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

if(StringUtils.isBlank(requestId))
throw new ServiceException("RequestId not recieved from provider to marked PCF exchange request");

if (pcfData != null && !pcfData.isEmpty() && !pcfData.asText().equals("{}")) {
status = PCFRequestStatusEnum.RECEIVED;
} else if (StringUtils.isNotBlank(requestId)) {
status = PCFRequestStatusEnum.REJECTED;
} else
status = PCFRequestStatusEnum.FAILED;

pcfRepositoryService.savePcfStatus(requestId, status);

if (StringUtils.isNotBlank(requestId)) {

PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(requestId)
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

pcfRepositoryService.savePcfStatus(requestId, status);
}
else {
List<PcfRequestEntity> findByProductId = pcfRepositoryService.findByProductId(productId);

for (PcfRequestEntity fentity : findByProductId) {

PcfResponseEntity entity = PcfResponseEntity.builder()
.pcfData(pcfData)
.requestId(fentity.getRequestId())
.message(message)
.responseId(UUID.randomUUID().toString())
.lastUpdatedTime(Instant.now().getEpochSecond())
.build();

pcfReqsponseRepository.save(entity);

pcfRepositoryService.savePcfStatus(fentity.getRequestId(), status);
}

}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public class ProxyRequestInterface {
public void requestToProviderForPCFValue(String productId, StringBuilder reponseMap, String requestId, String message,
QueryDataOfferModel dataset, boolean isRequestToNonexistingTwin) {
try {
String connectorOfferUrl = dataset.getConnectorOfferUrl();
String pcfProductPath = "";
if (connectorOfferUrl.contains("@")) {
String[] split = connectorOfferUrl.split("@");
if (split.length > 1) {
dataset.setConnectorOfferUrl(split[0]);
pcfProductPath = split[1];
}
}

EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(dataset.getConnectorId(),
dataset);

Expand All @@ -73,7 +83,7 @@ public void requestToProviderForPCFValue(String productId, StringBuilder reponse
pcfpushEnpoint = new URI(
edrToken.getEndpoint() + SLASH_DELIMETER + PRODUCT_IDS + SLASH_DELIMETER + productId);
else
pcfpushEnpoint = new URI(edrToken.getEndpoint());
pcfpushEnpoint = new URI(edrToken.getEndpoint() + pcfProductPath);

Map<String, String> header = new HashMap<>();
header.put("authorization", edrToken.getAuthorization());
Expand All @@ -100,7 +110,7 @@ public void requestToProviderForPCFValue(String productId, StringBuilder reponse

@SneakyThrows
public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject calculatedPCFValue,
String productId, String bpnNumber, String requestId, String message) {
String productId, String bpnNumber, String requestId, String message, boolean isNeedToSendRequestIdtoConsumer) {

// 1 fetch EDC connectors and DTR Assets from EDC connectors
List<QueryDataOfferModel> pcfExchangeUrlOffers = edcAssetUrlCacheService.getPCFExchangeUrlFromTwin(bpnNumber);
Expand All @@ -114,9 +124,9 @@ public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject c
pcfExchangeUrlOffers.parallelStream().forEach(dtOffer -> {

if (PCFRequestStatusEnum.SENDING_REJECT_NOTIFICATION.equals(status)) {
sendNotification(null, productId, bpnNumber, requestId, dtOffer, status, message);
sendNotification(null, productId, bpnNumber, requestId, dtOffer, status, message, isNeedToSendRequestIdtoConsumer);
} else {
sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status, message);
sendNotification(calculatedPCFValue, productId, bpnNumber, requestId, dtOffer, status, message, isNeedToSendRequestIdtoConsumer);
}

});
Expand All @@ -125,7 +135,7 @@ public void sendNotificationToConsumer(PCFRequestStatusEnum status, JsonObject c

@SneakyThrows
private void sendNotification(JsonObject calculatedPCFValue, String productId, String bpnNumber, String requestId,
QueryDataOfferModel dtOffer, PCFRequestStatusEnum status, String message) {
QueryDataOfferModel dtOffer, PCFRequestStatusEnum status, String message, boolean isNeedToSendRequestIdtoConsumer) {
String sendNotificationStatus = "";
try {
EDRCachedByIdResponse edrToken = edcAssetUrlCacheService.verifyAndGetToken(bpnNumber, dtOffer);
Expand All @@ -139,7 +149,11 @@ private void sendNotification(JsonObject calculatedPCFValue, String productId, S
header.put("authorization", edrToken.getAuthorization());
header.put("Edc-Bpn", bpnNumber);

pcfExchangeProxy.uploadPcfSubmodel(pcfpushEnpoint, header, requestId, message,
String sendRequestId = requestId;
if (!isNeedToSendRequestIdtoConsumer)
sendRequestId = "";

pcfExchangeProxy.uploadPcfSubmodel(pcfpushEnpoint, header, sendRequestId, message,
jsonObjectMapper.gsonObjectToJsonNode(calculatedPCFValue));

sendNotificationStatus = "SUCCESS";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/********************************************************************************
* Copyright (c) 2024 T-Systems International GmbH
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.sde.common.configuration.properties;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class PCFAssetStaticPropertyHolder {

@Value(value = "${digital-twin.pcf.sematicid:}")
public String sematicId;

@Value(value = "${digital-twin.pcf.sematicid:}")
public String sematicIdPart;

private String pcfExchangeAssetId;

public String getSematicId() {
if (StringUtils.isAllBlank(sematicId))
sematicId = "urn:samm:io.catenax.pcf:6.0.0#Pcf";
return sematicId;
}

public void setSematicId(String sematicId) {
this.sematicId = sematicId;
}

public String getSematicIdPart() {
if (StringUtils.isAllBlank(sematicIdPart))
sematicIdPart = "urn:samm:io.catenax.pcf";
return sematicIdPart;
}

public void setSematicIdPart(String sematicIdPart) {
this.sematicIdPart = sematicIdPart;
}

public String getPcfExchangeAssetId() {
return pcfExchangeAssetId;
}

public void setPcfExchangeAssetId(String pcfExchangeAssetId) {
this.pcfExchangeAssetId = pcfExchangeAssetId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ public String getDataPlaneUrlOfSubmodule() {
JsonElement jsonElement = this.submodelSchema.get("submodelDataPlaneUrl");
return jsonElement == null || jsonElement.isJsonNull() ? "" : jsonElement.getAsString();
}

public String getSematicIdReferenceOfSubmodule() {
JsonElement jsonElement = this.submodelSchema.get("sematicIdReference");
return jsonElement == null || jsonElement.isJsonNull() ? "" : jsonElement.getAsString();
}

public String getInterfaceNameOfSubmodule() {
JsonElement jsonElement = this.submodelSchema.get("interfaceName");
return jsonElement == null || jsonElement.isJsonNull() ? "" : jsonElement.getAsString();
}

public JsonObject getAddOnOfModel() {
return this.submodelSchema.get("addOn").getAsJsonObject();
Expand Down Expand Up @@ -111,7 +121,11 @@ public boolean checkAppendURNUUIDWithIdentifier() {
return jsonElement == null || jsonElement.isJsonNull() || jsonElement.getAsBoolean();
}


public boolean usePCFAssetIdAsDTSubprotocolBodyId() {
JsonElement jsonElement = this.getAddOnOfModel().get("usePCFAssetIdAsDTSubprotocolBodyId");
return !(jsonElement == null || jsonElement.isJsonNull()) || (jsonElement !=null && jsonElement.getAsBoolean());
}

public JsonObject checkIsRelationSubmodel() {
JsonElement jsonElement = this.getAddOnOfModel().get("isRelationSubmodel");
return jsonElement == null || jsonElement.isJsonNull() ? null : jsonElement.getAsJsonObject();
Expand Down
Loading
Loading