diff --git a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/message/RemoteMessageDispatcherRegistryImpl.java b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/message/RemoteMessageDispatcherRegistryImpl.java index 905d516c2d5..342e75d213b 100644 --- a/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/message/RemoteMessageDispatcherRegistryImpl.java +++ b/core/common/connector-core/src/main/java/org/eclipse/edc/connector/core/message/RemoteMessageDispatcherRegistryImpl.java @@ -33,8 +33,8 @@ public class RemoteMessageDispatcherRegistryImpl implements RemoteMessageDispatc private final Map dispatchers = new HashMap<>(); @Override - public void register(RemoteMessageDispatcher dispatcher) { - dispatchers.put(dispatcher.protocol(), dispatcher); + public void register(String protocol, RemoteMessageDispatcher dispatcher) { + dispatchers.put(protocol, dispatcher); } @Override diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/contractnegotiation/ContractNegotiationEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/contractnegotiation/ContractNegotiationEventDispatchTest.java index 613dd7d8a02..9eb7d225165 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/contractnegotiation/ContractNegotiationEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/contractnegotiation/ContractNegotiationEventDispatchTest.java @@ -101,7 +101,7 @@ void shouldDispatchEventsOnProviderContractNegotiationStateChanges(EventRouter e ContractDefinitionStore contractDefinitionStore, PolicyDefinitionStore policyDefinitionStore, AssetIndex assetIndex) { - dispatcherRegistry.register(succeedingDispatcher()); + dispatcherRegistry.register("test", succeedingDispatcher()); when(identityService.verifyJwtToken(eq(tokenRepresentation), isA(VerificationContext.class))).thenReturn(Result.success(token)); eventRouter.register(ContractNegotiationEvent.class, eventSubscriber); @@ -143,7 +143,6 @@ private ContractRequestMessage createContractOfferRequest(Policy policy, String @NotNull private RemoteMessageDispatcher succeedingDispatcher() { var testDispatcher = mock(RemoteMessageDispatcher.class); - when(testDispatcher.protocol()).thenReturn("test"); when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); return testDispatcher; } diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java index b71625cd4dd..4ae955691e6 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/controlplane/services/transferprocess/TransferProcessEventDispatchTest.java @@ -84,8 +84,6 @@ public class TransferProcessEventDispatchTest { public static final Duration TIMEOUT = Duration.ofSeconds(30); - private final EventSubscriber eventSubscriber = mock(); - @RegisterExtension static final RuntimeExtension RUNTIME = new RuntimePerClassExtension() .setConfiguration(Map.of( @@ -100,6 +98,7 @@ public class TransferProcessEventDispatchTest { .registerServiceMock(ContractNegotiationStore.class, mock()) .registerServiceMock(ParticipantAgentService.class, mock()) .registerServiceMock(DataPlaneClientFactory.class, mock()); + private final EventSubscriber eventSubscriber = mock(); @Test void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService service, @@ -128,7 +127,7 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se when(agent.getIdentity()).thenReturn(providerId); - dispatcherRegistry.register(getTestDispatcher()); + dispatcherRegistry.register("test", getTestDispatcher()); when(policyArchive.findPolicyForContract(matches(transferRequest.getContractId()))).thenReturn(Policy.Builder.newInstance().target("assetId").build()); when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement); when(agentService.createFor(token)).thenReturn(agent); @@ -194,7 +193,7 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi .policy(Policy.Builder.newInstance().build()) .build(); when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement); - dispatcherRegistry.register(getTestDispatcher()); + dispatcherRegistry.register("test", getTestDispatcher()); eventRouter.register(TransferProcessEvent.class, eventSubscriber); var initiateResult = service.initiateTransfer(transferRequest); @@ -213,7 +212,7 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi @Test void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service, EventRouter eventRouter, RemoteMessageDispatcherRegistry dispatcherRegistry, ContractNegotiationStore negotiationStore, PolicyArchive policyArchive) { - dispatcherRegistry.register(getFailingDispatcher()); + dispatcherRegistry.register("test", getFailingDispatcher()); eventRouter.register(TransferProcessEvent.class, eventSubscriber); var transferRequest = createTransferRequest(); var agreement = ContractAgreement.Builder.newInstance() @@ -233,7 +232,6 @@ void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service, @NotNull private RemoteMessageDispatcher getTestDispatcher() { var testDispatcher = mock(RemoteMessageDispatcher.class); - when(testDispatcher.protocol()).thenReturn("test"); var ack = TransferProcessAck.Builder.newInstance().build(); when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack))); return testDispatcher; @@ -242,7 +240,6 @@ private RemoteMessageDispatcher getTestDispatcher() { @NotNull private RemoteMessageDispatcher getFailingDispatcher() { var testDispatcher = mock(RemoteMessageDispatcher.class); - when(testDispatcher.protocol()).thenReturn("test"); when(testDispatcher.dispatch(any(), any())).thenReturn(failedFuture(new EdcException("cannot send message"))); return testDispatcher; } diff --git a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java index 7af806a69c8..2841c032963 100644 --- a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java +++ b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/DspHttpCoreExtension.java @@ -52,6 +52,7 @@ import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry; +import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_SCOPE; import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD; @@ -127,7 +128,7 @@ public DspHttpRemoteMessageDispatcher dspHttpRemoteMessageDispatcher(ServiceExte registerNegotiationPolicyScopes(dispatcher); registerTransferProcessPolicyScopes(dispatcher); registerCatalogPolicyScopes(dispatcher); - dispatcherRegistry.register(dispatcher); + dispatcherRegistry.register(DATASPACE_PROTOCOL_HTTP, dispatcher); return dispatcher; } diff --git a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImpl.java b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImpl.java index 291dde471ab..3a141541c7a 100644 --- a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImpl.java +++ b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImpl.java @@ -24,7 +24,6 @@ import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.DspHttpRemoteMessageDispatcher; import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.DspHttpRequestFactory; import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.response.DspHttpResponseBodyExtractor; -import org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.iam.AudienceResolver; import org.eclipse.edc.spi.iam.IdentityService; @@ -78,11 +77,6 @@ public DspHttpRemoteMessageDispatcherImpl(EdcHttpClient httpClient, this.audienceResolver = audienceResolver; } - @Override - public String protocol() { - return HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; - } - @Override public CompletableFuture> dispatch(Class responseType, M message) { var handler = (MessageHandler) this.handlers.get(message.getClass()); diff --git a/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImplTest.java b/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImplTest.java index 627c35bdafc..268e424a2e9 100644 --- a/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImplTest.java +++ b/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/http/dispatcher/DspHttpRemoteMessageDispatcherImplTest.java @@ -50,7 +50,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; -import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.mockito.AdditionalMatchers.and; @@ -100,11 +99,6 @@ void setUp() { when(tokenDecorator.decorate(any())).thenAnswer(a -> a.getArgument(0)); } - @Test - void protocol_returnDsp() { - assertThat(dispatcher.protocol()).isEqualTo(DATASPACE_PROTOCOL_HTTP); - } - @Test void dispatch_noScope() { var authToken = "token"; diff --git a/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/CallbackEventDispatcherHttpExtension.java b/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/CallbackEventDispatcherHttpExtension.java index 77824cc524c..918e280ab3c 100644 --- a/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/CallbackEventDispatcherHttpExtension.java +++ b/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/CallbackEventDispatcherHttpExtension.java @@ -24,6 +24,8 @@ import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; +import static org.eclipse.edc.connector.controlplane.callback.dispatcher.http.GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP; + @Extension(value = CallbackEventDispatcherHttpExtension.NAME) public class CallbackEventDispatcherHttpExtension implements ServiceExtension { @@ -55,13 +57,13 @@ public void initialize(ServiceExtensionContext context) { var baseDispatcher = new GenericHttpRemoteDispatcherImpl(client); baseDispatcher.registerDelegate(new CallbackEventRemoteMessageDispatcher(typeManager.getMapper(), vault)); - registry.register(baseDispatcher); + registry.register(CALLBACK_EVENT_HTTP, baseDispatcher); } private String resolveScheme(String scheme) { if (scheme.equalsIgnoreCase("https") || scheme.equalsIgnoreCase("http")) { - return GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP; + return CALLBACK_EVENT_HTTP; } return null; } diff --git a/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherImpl.java b/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherImpl.java index fcd1c34dfdd..0695937b2b1 100644 --- a/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherImpl.java +++ b/extensions/control-plane/callback/callback-http-dispatcher/src/main/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherImpl.java @@ -39,16 +39,11 @@ protected GenericHttpRemoteDispatcherImpl(EdcHttpClient httpClient) { this.httpClient = httpClient; } - @Override - public String protocol() { - return CALLBACK_EVENT_HTTP; - } - @Override public CompletableFuture> dispatch(Class responseType, M message) { var delegate = (GenericHttpDispatcherDelegate) delegates.get(message.getClass()); if (delegate == null) { - throw new EdcException(format("No %s message dispatcher found for message type %s", protocol(), message.getClass())); + throw new EdcException(format("No %s message dispatcher found for message type %s", CALLBACK_EVENT_HTTP, message.getClass())); } var request = delegate.buildRequest(message); return httpClient.executeAsync(request, emptyList()) diff --git a/extensions/control-plane/callback/callback-http-dispatcher/src/test/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherWrapperExtensionTest.java b/extensions/control-plane/callback/callback-http-dispatcher/src/test/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherWrapperExtensionTest.java index 69b955f537c..1e9f58383b7 100644 --- a/extensions/control-plane/callback/callback-http-dispatcher/src/test/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherWrapperExtensionTest.java +++ b/extensions/control-plane/callback/callback-http-dispatcher/src/test/java/org/eclipse/edc/connector/controlplane/callback/dispatcher/http/GenericHttpRemoteDispatcherWrapperExtensionTest.java @@ -23,10 +23,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentMatcher; import static org.eclipse.edc.connector.controlplane.callback.dispatcher.http.GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP; -import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -50,10 +50,7 @@ void setUp(ServiceExtensionContext context, ObjectFactory factory) { void initialize_shouldRegisterBothDispatcher(ServiceExtensionContext context) { extension.initialize(context); - verify(registry).register(argThat(dispatcher(CALLBACK_EVENT_HTTP))); + verify(registry).register(eq(CALLBACK_EVENT_HTTP), isA(GenericHttpRemoteDispatcherImpl.class)); } - private ArgumentMatcher dispatcher(String scheme) { - return dispatcher -> dispatcher.protocol().equals(scheme); - } } diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcher.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcher.java index bffa50f42a4..2d36ae6b318 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcher.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcher.java @@ -24,11 +24,7 @@ */ public interface RemoteMessageDispatcher { - /** - * Return the protocol this dispatcher uses. - */ - String protocol(); - + /** * Binds and sends the message. * diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcherRegistry.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcherRegistry.java index f6db8d7c876..5f1218a4517 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcherRegistry.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/message/RemoteMessageDispatcherRegistry.java @@ -31,7 +31,7 @@ public interface RemoteMessageDispatcherRegistry { /** * Registers a dispatcher. */ - void register(RemoteMessageDispatcher dispatcher); + void register(String protocol, RemoteMessageDispatcher dispatcher); /** * Sends the message. diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolver.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolver.java index 86024453c05..f99523ceec0 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolver.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolver.java @@ -19,7 +19,7 @@ /** * The resolver translate the scheme part {@link CallbackAddress#getUri()} to an internal - * naming of {@link RemoteMessageDispatcher#protocol()} ()} + * naming of {@link RemoteMessageDispatcher} */ @FunctionalInterface public interface CallbackProtocolResolver { diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolverRegistry.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolverRegistry.java index f55b35cf213..338a9bf6c0f 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolverRegistry.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/controlplane/services/spi/callback/CallbackProtocolResolverRegistry.java @@ -20,7 +20,7 @@ /** * Registry for {@link CallbackProtocolResolver} resolvers. The registry resolves the scheme part {@link CallbackAddress#getUri()} to an internal - * naming of {@link RemoteMessageDispatcher#protocol()} + * naming of {@link RemoteMessageDispatcher} */ @ExtensionPoint public interface CallbackProtocolResolverRegistry {