Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:(DOS-1442)Termination of pull transfer process from consumer side not success in real but stated as "TERMINATED" #3

Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ private ServiceResult<TransferProcess> completedAction(TransferCompletionMessage

@NotNull
private ServiceResult<TransferProcess> suspendedAction(TransferSuspensionMessage message, TransferProcess transferProcess) {
if (transferProcess.getType() == PROVIDER) {
var suspension = dataFlowManager.suspend(transferProcess);
if (suspension.failed()) {
return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail()));
}
}
if (transferProcess.canBeTerminated()) {
if (transferProcess.getType() == PROVIDER) {
var suspension = dataFlowManager.suspend(transferProcess);
if (suspension.failed()) {
return ServiceResult.conflict("Cannot suspend transfer process %s: %s".formatted(transferProcess.getId(), suspension.getFailureDetail()));
}
}
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
var reason = message.getReason().stream().map(Object::toString).collect(joining(", "));
transferProcess.transitionSuspended(reason);
transferProcess.protocolMessageReceived(message.getId());
Expand All @@ -233,11 +233,21 @@ private ServiceResult<TransferProcess> suspendedAction(TransferSuspensionMessage
@NotNull
private ServiceResult<TransferProcess> terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) {
if (transferProcess.canBeTerminated()) {
if (transferProcess.getType() == PROVIDER) {
var termination = dataFlowManager.terminate(transferProcess);
if (termination.failed()) {
return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail()));
}
}
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved
observable.invokeForEach(l -> l.preTerminated(transferProcess));
transferProcess.transitionTerminated();
transferProcess.protocolMessageReceived(message.getId());
update(transferProcess);
observable.invokeForEach(l -> l.terminated(transferProcess));
if (transferProcess.getType() == PROVIDER) {
transferProcess.transitionDeprovisioning();
update(transferProcess);
}
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be terminated"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,99 +310,6 @@ void notifyCompleted_shouldReturnBadRequest_whenCounterPartyUnauthorized() {

}

@Test
void notifyTerminated_shouldTransitionToTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcess(STARTED, "transferProcessId");

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store).save(argThat(t -> t.getState() == TERMINATED.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void notifyTerminated_shouldReturnConflict_whenTransferProcessCannotBeTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString());
var agreement = contractAgreement();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
// state didn't change
verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code()));
verifyNoInteractions(listener);
}

@Test
void notifyTerminated_shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var agreement = contractAgreement();
var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString());
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error"));

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);

verify(store, times(1)).save(any());

}

@Test
void findById_shouldReturnTransferProcess_whenValidCounterParty() {
var participantAgent = participantAgent();
Expand Down Expand Up @@ -881,6 +788,135 @@ void shouldReturnBadRequest_whenCounterPartyUnauthorized() {
}
}

@Nested
class NotifyTerminated {

@Test
void consumer_shouldTransitionToTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcess(STARTED, "transferProcessId");

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store).save(argThat(t -> t.getState() == TERMINATED.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void provider_shouldTerminateDataFlowAndTransitionToDeprovisioning() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcessBuilder().state(STARTED.code()).type(PROVIDER).build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());
var result = service.notifyTerminated(message, tokenRepresentation);


assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store, atLeastOnce()).save(argThat(t -> t.getState() == DEPROVISIONING.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
verify(dataFlowManager).terminate(any());
}

@Test
void shouldReturnConflict_whenTransferProcessCannotBeTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString());
var agreement = contractAgreement();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
// state didn't change
verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code()));
verifyNoInteractions(listener);
}

@Test
void shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var agreement = contractAgreement();
var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString());
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error"));

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);

verify(store, times(1)).save(any());
}
}

@Nested
class IdempotencyProcessStateReplication {

Expand All @@ -897,12 +933,13 @@ <M extends ProcessRemoteMessage> void notify_shouldStoreReceivedMessageId(Method
when(validationService.validateAgreement(any(ParticipantAgent.class), any())).thenAnswer(i -> Result.success(i.getArgument(1)));
when(validationService.validateRequest(any(ParticipantAgent.class), isA(ContractAgreement.class))).thenReturn(Result.success());
when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());

var result = methodCall.call(service, message, tokenRepresentation());

assertThat(result).isSucceeded();
var captor = ArgumentCaptor.forClass(TransferProcess.class);
verify(store).save(captor.capture());
verify(store, atLeastOnce()).save(captor.capture());
var storedTransferProcess = captor.getValue();
assertThat(storedTransferProcess.getProtocolMessages().isAlreadyReceived(message.getId())).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
* Essentially a wrapper around the management API enabling to test interactions with other participants, eg. catalog, transfer...
*/
public class Participant {

protected String id;
protected String name;
protected Endpoint managementEndpoint;
Expand Down Expand Up @@ -458,11 +457,11 @@ public String getTransferProcessState(String id) {
*
* @param id transfer process id.
*/
public void suspendTransfer(String id, String reason) {
public void suspendTransfer(String id) {
var requestBodyBuilder = createObjectBuilder()
.add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE))
.add(TYPE, "SuspendTransfer")
.add("reason", reason);
.add("reason", "any reason");
AndrYurk marked this conversation as resolved.
Show resolved Hide resolved

managementEndpoint.baseRequest()
.contentType(JSON)
Expand Down
Loading