Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:(DOS-1442)Termination of pull transfer process from consumer side not success in real but stated as "TERMINATED" #3

Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,22 @@ private ServiceResult<TransferProcess> suspendedAction(TransferSuspensionMessage

@NotNull
private ServiceResult<TransferProcess> terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) {
if (transferProcess.getType() == PROVIDER) {
var termination = dataFlowManager.terminate(transferProcess);
if (termination.failed()) {
return ServiceResult.conflict("Cannot terminate transfer process %s: %s".formatted(transferProcess.getId(), termination.getFailureDetail()));
}
}
ndr-brt marked this conversation as resolved.
Show resolved Hide resolved
if (transferProcess.canBeTerminated()) {
observable.invokeForEach(l -> l.preTerminated(transferProcess));
transferProcess.transitionTerminated();
transferProcess.protocolMessageReceived(message.getId());
update(transferProcess);
observable.invokeForEach(l -> l.terminated(transferProcess));
if (transferProcess.getType() == PROVIDER) {
transferProcess.transitionDeprovisioning();
this.update(transferProcess);
ndr-brt marked this conversation as resolved.
Show resolved Hide resolved
}
return ServiceResult.success(transferProcess);
} else {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be terminated"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,99 +310,6 @@ void notifyCompleted_shouldReturnBadRequest_whenCounterPartyUnauthorized() {

}

@Test
void notifyTerminated_shouldTransitionToTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcess(STARTED, "transferProcessId");

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store).save(argThat(t -> t.getState() == TERMINATED.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void notifyTerminated_shouldReturnConflict_whenTransferProcessCannotBeTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString());
var agreement = contractAgreement();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
// state didn't change
verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code()));
verifyNoInteractions(listener);
}

@Test
void notifyTerminated_shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var agreement = contractAgreement();
var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString());
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error"));

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);

verify(store, times(1)).save(any());

}

@Test
void findById_shouldReturnTransferProcess_whenValidCounterParty() {
var participantAgent = participantAgent();
Expand Down Expand Up @@ -881,6 +788,135 @@ void shouldReturnBadRequest_whenCounterPartyUnauthorized() {
}
}

@Nested
class NotifyTerminated {

@Test
void consumer_shouldTransitionToTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcess(STARTED, "transferProcessId");

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store).save(argThat(t -> t.getState() == TERMINATED.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
}

@Test
void provider_shouldTerminateDataFlowAndTransitionToTerminated() {
ndr-brt marked this conversation as resolved.
Show resolved Hide resolved
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();
var agreement = contractAgreement();
var transferProcess = transferProcessBuilder().state(STARTED.code()).type(PROVIDER).build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());
var result = service.notifyTerminated(message, tokenRepresentation);


assertThat(result).isSucceeded();
verify(listener).preTerminated(any());
verify(store, atLeastOnce()).save(argThat(t -> t.getState() == DEPROVISIONING.code()));
verify(listener).terminated(any());
verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class));
verify(dataFlowManager).terminate(any());
}

@Test
void shouldReturnConflict_whenTransferProcessCannotBeTerminated() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var transferProcess = transferProcess(DEPROVISIONING, UUID.randomUUID().toString());
var agreement = contractAgreement();
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.success());

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
// state didn't change
verify(store, times(1)).save(argThat(tp -> tp.getState() == DEPROVISIONING.code()));
verifyNoInteractions(listener);
}

@Test
void shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var participantAgent = participantAgent();
var tokenRepresentation = tokenRepresentation();
var agreement = contractAgreement();
var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString());
var message = TransferTerminationMessage.Builder.newInstance()
.protocol("protocol")
.consumerPid("consumerPid")
.providerPid("providerPid")
.counterPartyAddress("http://any")
.processId("correlationId")
.code("TestCode")
.reason("TestReason")
.build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(store.findById("correlationId")).thenReturn(transferProcess);
when(store.findByIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess));
when(negotiationStore.findContractAgreement(any())).thenReturn(agreement);
when(validationService.validateRequest(participantAgent, agreement)).thenReturn(Result.failure("error"));

var result = service.notifyTerminated(message, tokenRepresentation);

assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(BAD_REQUEST);

verify(store, times(1)).save(any());
}
}

@Nested
class IdempotencyProcessStateReplication {

Expand All @@ -897,12 +933,13 @@ <M extends ProcessRemoteMessage> void notify_shouldStoreReceivedMessageId(Method
when(validationService.validateAgreement(any(ParticipantAgent.class), any())).thenAnswer(i -> Result.success(i.getArgument(1)));
when(validationService.validateRequest(any(ParticipantAgent.class), isA(ContractAgreement.class))).thenReturn(Result.success());
when(dataFlowManager.suspend(any())).thenReturn(StatusResult.success());
when(dataFlowManager.terminate(any())).thenReturn(StatusResult.success());

var result = methodCall.call(service, message, tokenRepresentation());

assertThat(result).isSucceeded();
var captor = ArgumentCaptor.forClass(TransferProcess.class);
verify(store).save(captor.capture());
verify(store, atLeastOnce()).save(captor.capture());
var storedTransferProcess = captor.getValue();
assertThat(storedTransferProcess.getProtocolMessages().isAlreadyReceived(message.getId())).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
*/
public class Participant {

public static final String CONSUMER = "consumer";
public static final String PROVIDER = "provider";
ndr-brt marked this conversation as resolved.
Show resolved Hide resolved
protected String id;
protected String name;
protected Endpoint managementEndpoint;
Expand Down Expand Up @@ -519,6 +521,11 @@ public void awaitTransferToBeInState(String transferProcessId, TransferProcessSt
await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name()));
}

@Override
public String toString() {
return name;
}

ndr-brt marked this conversation as resolved.
Show resolved Hide resolved
protected String getContractNegotiationField(String negotiationId, String fieldName) {
return managementEndpoint.baseRequest()
.contentType(JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.test.e2e;

import jakarta.json.JsonObject;
import org.eclipse.edc.connector.controlplane.test.system.utils.Participant;
import org.eclipse.edc.spi.security.Vault;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -29,11 +30,11 @@
public abstract class TransferEndToEndTestBase {

protected static final TransferEndToEndParticipant CONSUMER = TransferEndToEndParticipant.Builder.newInstance()
.name("consumer")
.name(Participant.CONSUMER)
.id("urn:connector:consumer")
.build();
protected static final TransferEndToEndParticipant PROVIDER = TransferEndToEndParticipant.Builder.newInstance()
.name("provider")
.name(Participant.PROVIDER)
.id("urn:connector:provider")
.build();

Expand Down
Loading