Skip to content

Commit

Permalink
feat(irs-registry-client):[#256] added cache for send notification
Browse files Browse the repository at this point in the history
  • Loading branch information
ds-psosnowski committed Dec 8, 2023
1 parent d9d43af commit 2f3994f
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.eclipse.tractusx.irs.edc.client.model.TransferProcessRequest;
import org.eclipse.tractusx.irs.edc.client.model.TransferProcessResponse;
import org.eclipse.tractusx.irs.edc.client.policy.PolicyCheckerService;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.util.Masker;
import org.springframework.stereotype.Service;

Expand All @@ -68,30 +68,33 @@ public NegotiationResponse negotiate(final String providerConnectorUrl, final Ca
new EndpointDataReferenceStatus(null, EndpointDataReferenceStatus.TokenStatus.REQUIRED_NEW));
}

@SuppressWarnings("PMD.AvoidReassigningParameters")
public NegotiationResponse negotiate(final String providerConnectorUrl, final CatalogItem catalogItem,
EndpointDataReferenceStatus endpointDataReferenceStatus)
final EndpointDataReferenceStatus endpointDataReferenceStatus)
throws ContractNegotiationException, UsagePolicyException, TransferProcessException {

EndpointDataReferenceStatus resultEndpointDataReferenceStatus;

if (endpointDataReferenceStatus == null) {
log.info(
"Missing information about endpoint data reference from storage, setting token status to REQUIRED_NEW.");
endpointDataReferenceStatus = new EndpointDataReferenceStatus(null,
resultEndpointDataReferenceStatus = new EndpointDataReferenceStatus(null,
EndpointDataReferenceStatus.TokenStatus.REQUIRED_NEW);
} else {
resultEndpointDataReferenceStatus = endpointDataReferenceStatus;
}

NegotiationResponse negotiationResponse = null;
String contractAgreementId = null;
String contractAgreementId;

switch (endpointDataReferenceStatus.tokenStatus()) {
switch (resultEndpointDataReferenceStatus.tokenStatus()) {
case REQUIRED_NEW -> {
final CompletableFuture<NegotiationResponse> responseFuture = startNewNegotiation(providerConnectorUrl,
catalogItem);
negotiationResponse = Objects.requireNonNull(getNegotiationResponse(responseFuture));
contractAgreementId = negotiationResponse.getContractAgreementId();
}
case EXPIRED -> {
final String authKey = endpointDataReferenceStatus.endpointDataReference().getAuthKey();
final String authKey = resultEndpointDataReferenceStatus.endpointDataReference().getAuthKey();
if (authKey == null) {
throw new IllegalStateException("Missing information about AuthKey.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.eclipse.tractusx.irs.edc.client;

import static org.eclipse.tractusx.irs.edc.client.configuration.JsonLdConfiguration.NAMESPACE_EDC_ID;
import static org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus.TokenStatus;
import static org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceStatus.TokenStatus;

import java.net.URI;
import java.util.List;
Expand All @@ -43,12 +43,13 @@
import org.eclipse.tractusx.irs.data.StringMapper;
import org.eclipse.tractusx.irs.edc.client.exceptions.EdcClientException;
import org.eclipse.tractusx.irs.edc.client.model.CatalogItem;
import org.eclipse.tractusx.irs.edc.client.model.EDRAuthCode;
import org.eclipse.tractusx.irs.edc.client.model.NegotiationResponse;
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotification;
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotificationResponse;
import org.eclipse.tractusx.irs.edc.client.model.notification.NotificationContent;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.util.Masker;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
Expand All @@ -70,8 +71,7 @@ CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(String end
String filterValue) throws EdcClientException;

CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(String endpointAddress, String filterKey,
String filterValue, EndpointDataReferenceStatus cachedEndpointDataReference)
throws EdcClientException;
String filterValue, EndpointDataReferenceStatus cachedEndpointDataReference) throws EdcClientException;
}

/**
Expand Down Expand Up @@ -162,31 +162,36 @@ private NegotiationResponse fetchNegotiationResponseWithFilter(final String conn
return contractNegotiationService.negotiate(connectorEndpoint, catalogItem);
}

private CompletableFuture<EdcNotificationResponse> sendNotificationAsync(final String contractAgreementId,
final EdcNotification<NotificationContent> notification, final StopWatch stopWatch) {
private CompletableFuture<EdcNotificationResponse> sendNotificationAsync(final String assetId,
final EdcNotification<NotificationContent> notification, final StopWatch stopWatch,
final EndpointDataReference endpointDataReference) {

return pollingService.<EdcNotificationResponse>createJob()
.action(() -> sendSubmodelNotification(contractAgreementId, notification, stopWatch))
.action(() -> sendSubmodelNotification(assetId, notification, stopWatch,
endpointDataReference))
.timeToLive(config.getSubmodel().getRequestTtl())
.description("waiting for submodel notification to be sent")
.build()
.schedule();

}

private Optional<String> retrieveSubmodelData(final String submodelDataplaneUrl, final StopWatch stopWatch,
final EndpointDataReference endpointDataReference) {
log.info("Retrieving data from EDC data plane for dataReference with id {}", endpointDataReference.getId());
final String data = edcDataPlaneClient.getData(endpointDataReference, submodelDataplaneUrl);
stopWatchOnEdcTask(stopWatch);
if (endpointDataReference != null) {
log.info("Retrieving data from EDC data plane for dataReference with id {}", endpointDataReference.getId());
final String data = edcDataPlaneClient.getData(endpointDataReference, submodelDataplaneUrl);
stopWatchOnEdcTask(stopWatch);

return Optional.of(data);
return Optional.of(data);
}

return Optional.empty();
}

private Optional<EndpointDataReference> retrieveEndpointReference(final String contractAgreementId,
private Optional<EndpointDataReference> retrieveEndpointReference(final String storageId,
final StopWatch stopWatch) {
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReferenceByContractAgreementId(
contractAgreementId);
storageId);

if (dataReference.isPresent()) {
final EndpointDataReference ref = dataReference.get();
Expand All @@ -198,19 +203,17 @@ private Optional<EndpointDataReference> retrieveEndpointReference(final String c
return Optional.empty();
}

private Optional<EdcNotificationResponse> sendSubmodelNotification(final String contractAgreementId,
final EdcNotification<NotificationContent> notification, final StopWatch stopWatch) {
final Optional<EndpointDataReference> dataReference = retrieveEndpointDataReferenceByContractAgreementId(
contractAgreementId);
private Optional<EdcNotificationResponse> sendSubmodelNotification(final String assetId,
final EdcNotification<NotificationContent> notification, final StopWatch stopWatch,
final EndpointDataReference endpointDataReference) {

if (dataReference.isPresent()) {
final EndpointDataReference ref = dataReference.get();
log.info("Sending dataReference to EDC data plane for contractAgreementId '{}'",
Masker.mask(contractAgreementId));
final EdcNotificationResponse response = edcDataPlaneClient.sendData(ref, notification);
if (endpointDataReference != null) {
log.info("Sending dataReference to EDC data plane for assetId '{}'", assetId);
final EdcNotificationResponse response = edcDataPlaneClient.sendData(endpointDataReference, notification);
stopWatchOnEdcTask(stopWatch);
return Optional.of(response);
}

return Optional.empty();
}

Expand All @@ -234,18 +237,19 @@ public CompletableFuture<String> getSubmodelRawPayload(final String connectorEnd
});
}

@SuppressWarnings("PMD.ConfusingTernary")
private EndpointDataReference getEndpointDataReference(final String connectorEndpoint, final String assetId)
throws EdcClientException {
log.info("Retrieving endpoint data reference from cache for assed id: {}", assetId);
final EndpointDataReferenceStatus cachedEndpointDataReference = endpointDataReferenceCacheService.getEndpointDataReference(
assetId);
EndpointDataReference endpointDataReference;

if (cachedEndpointDataReference.tokenStatus() != TokenStatus.VALID) {
if (cachedEndpointDataReference.tokenStatus() == TokenStatus.VALID) {
log.info("Endpoint data reference found in cache with token status valid, reusing cache record.");
endpointDataReference = cachedEndpointDataReference.endpointDataReference();
} else {
endpointDataReference = getEndpointDataReferenceAndAddToStorage(connectorEndpoint, assetId,
cachedEndpointDataReference);
} else {
endpointDataReference = cachedEndpointDataReference.endpointDataReference();
}

return endpointDataReference;
Expand Down Expand Up @@ -274,12 +278,9 @@ public CompletableFuture<EdcNotificationResponse> sendNotification(final String
return execute(connectorEndpoint, () -> {
final StopWatch stopWatch = new StopWatch();
stopWatch.start("Send EDC notification task, endpoint " + connectorEndpoint);
final var negotiationEndpoint = appendSuffix(connectorEndpoint,
config.getControlplane().getProviderSuffix());
final NegotiationResponse negotiationResponse = fetchNegotiationResponseWithFilter(negotiationEndpoint,
assetId);
final EndpointDataReference endpointDataReference = getEndpointDataReference(connectorEndpoint, assetId);

return sendNotificationAsync(negotiationResponse.getContractAgreementId(), notification, stopWatch);
return sendNotificationAsync(assetId, notification, stopWatch, endpointDataReference);
});
}

Expand Down Expand Up @@ -307,8 +308,10 @@ public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(fin
final NegotiationResponse response = contractNegotiationService.negotiate(providerWithSuffix,
items.stream().findFirst().orElseThrow(), endpointDataReferenceStatus);

final String storageId = getStorageId(endpointDataReferenceStatus, response);

return pollingService.<EndpointDataReference>createJob()
.action(() -> retrieveEndpointReference(response.getContractAgreementId(), stopWatch))
.action(() -> retrieveEndpointReference(storageId, stopWatch))
.timeToLive(config.getSubmodel().getRequestTtl())
.description("waiting for Endpoint Reference retrieval")
.build()
Expand All @@ -317,6 +320,21 @@ public CompletableFuture<EndpointDataReference> getEndpointReferenceForAsset(fin
});
}

private static String getStorageId(final EndpointDataReferenceStatus endpointDataReferenceStatus,
final NegotiationResponse response) {
final String storageId;
if (response != null) {
storageId = response.getContractAgreementId();
} else {
final String authKey = endpointDataReferenceStatus.endpointDataReference().getAuthKey();
if (authKey == null) {
throw new IllegalStateException("Missing information about AuthKey.");
}
storageId = EDRAuthCode.fromAuthCodeToken(authKey).getCid();
}
return storageId;
}

private String appendSuffix(final String endpointAddress, final String providerSuffix) {
String addressWithSuffix;
if (endpointAddress.endsWith(providerSuffix)) {
Expand All @@ -329,10 +347,10 @@ private String appendSuffix(final String endpointAddress, final String providerS
return addressWithSuffix;
}

private Optional<EndpointDataReference> retrieveEndpointDataReferenceByContractAgreementId(
final String contractAgreementId) {
log.info("Retrieving dataReference from storage for contractAgreementId {}", Masker.mask(contractAgreementId));
return endpointDataReferenceStorage.remove(contractAgreementId);
private Optional<EndpointDataReference> retrieveEndpointDataReferenceByContractAgreementId(final String storageId) {
log.info("Retrieving dataReference from storage for storageId (assetId or contractAgreementId): {}",
Masker.mask(storageId));
return endpointDataReferenceStorage.get(storageId);
}

@SuppressWarnings({ "PMD.AvoidRethrowingException",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,8 @@ private void cleanup() {
});
}

public Optional<EndpointDataReference> remove(final String storageId) {
return Optional.ofNullable(storageMap.remove(storageId)).map(ExpiringContainer::getDataReference);
}

public Optional<EndpointDataReference> get(final String storageId) {
return Optional.ofNullable(
storageMap.get(storageId) != null ? storageMap.get(storageId).getDataReference() : null);
return Optional.ofNullable(storageMap.get(storageId)).map(ExpiringContainer::getDataReference);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.irs.edc.client.util;
package org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference;

import java.time.Instant;
import java.util.Optional;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class EndpointDataReferenceCacheService {
* @param assetId key for
* {@link org.eclipse.tractusx.irs.edc.client.EndpointDataReferenceStorage}
* @return {@link org.eclipse.edc.spi.types.domain.edr.EndpointDataReference}
* and {@link org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus.TokenStatus}
* and {@link EndpointDataReferenceStatus.TokenStatus}
* describing token status
*/
public EndpointDataReferenceStatus getEndpointDataReference(final String assetId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
package org.eclipse.tractusx.irs.edc.client.util;
package org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference;

import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.eclipse.tractusx.irs.edc.client.model.Response;
import org.eclipse.tractusx.irs.edc.client.model.TransferProcessResponse;
import org.eclipse.tractusx.irs.edc.client.policy.PolicyCheckerService;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceStatus;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void shouldStoreAgreementId() {
testee.receiveEdcCallback(ref);

// assert
final var result = storage.remove("testId");
final var result = storage.get("testId");
assertThat(result).isNotNull().contains(ref);
}

Expand All @@ -62,7 +62,7 @@ void shouldDoNothingWhenEDRTokenIsInvalid() {
testee.receiveEdcCallback(ref);

// assert
final var result = storage.remove("testId");
final var result = storage.get("testId");
assertThat(result).isNotNull().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
package org.eclipse.tractusx.irs.edc.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus.TokenStatus;
import static org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceStatus.TokenStatus;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -70,8 +70,8 @@
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotification;
import org.eclipse.tractusx.irs.edc.client.model.notification.EdcNotificationResponse;
import org.eclipse.tractusx.irs.edc.client.model.notification.NotificationContent;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceStatus;
import org.eclipse.tractusx.irs.testing.containers.LocalTestDataConfigurationAware;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -155,11 +155,12 @@ void shouldSendNotificationSuccessfully() throws Exception {
final EdcNotification<NotificationContent> notification = EdcNotification.builder().build();
when(catalogFacade.fetchCatalogByFilter(any(), any(), any())).thenReturn(
List.of(CatalogItem.builder().itemId("itemId").build()));
when(contractNegotiationService.negotiate(any(), any())).thenReturn(
when(contractNegotiationService.negotiate(any(), any(), any())).thenReturn(
NegotiationResponse.builder().contractAgreementId("agreementId").build());
final EndpointDataReference ref = mock(EndpointDataReference.class);
endpointDataReferenceStorage.put("agreementId", ref);
when(edcDataPlaneClient.sendData(ref, notification)).thenReturn(() -> true);
when(endpointDataReferenceCacheService.getEndpointDataReference(any())).thenReturn(new EndpointDataReferenceStatus(null, TokenStatus.REQUIRED_NEW));

// act
final var result = testee.sendNotification(CONNECTOR_ENDPOINT, "notify-request-asset", notification);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import org.eclipse.tractusx.irs.edc.client.policy.Permission;
import org.eclipse.tractusx.irs.edc.client.policy.PolicyCheckerService;
import org.eclipse.tractusx.irs.edc.client.policy.PolicyType;
import org.eclipse.tractusx.irs.edc.client.util.EndpointDataReferenceCacheService;
import org.eclipse.tractusx.irs.edc.client.cache.endpointdatareference.EndpointDataReferenceCacheService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Loading

0 comments on commit 2f3994f

Please sign in to comment.