From ad92f9f2ec96e35ddbf7ee075bcd962a8d945161 Mon Sep 17 00:00:00 2001 From: Ester Marti Date: Mon, 27 May 2019 11:58:19 +0200 Subject: [PATCH] WIP Add iteratorAt to EnableEventSource annotation --- .../synapse/annotation/BeanNameHelper.java | 4 +++ .../synapse/annotation/EnableEventSource.java | 6 ++++ .../annotation/EventSourceBeanRegistrar.java | 29 ++++++++++++++----- .../de/otto/synapse/channel/StartFrom.java | 6 +++- .../endpoint/AbstractMessageEndpoint.java | 15 ++++++++++ .../synapse/endpoint/MessageEndpoint.java | 8 +++++ .../AbstractMessageLogReceiverEndpoint.java | 3 +- .../AbstractMessageReceiverEndpoint.java | 3 +- .../DelegateMessageLogReceiverEndpoint.java | 9 ++++++ .../DelegateMessageQueueReceiverEndpoint.java | 6 ++++ .../sender/AbstractMessageSenderEndpoint.java | 23 ++++++++++----- .../sender/DelegateMessageSenderEndpoint.java | 8 ++++- .../eventsource/AbstractEventSource.java | 5 ++++ .../eventsource/DefaultEventSource.java | 1 + .../eventsource/DelegateEventSource.java | 9 ++++++ .../otto/synapse/eventsource/EventSource.java | 3 ++ .../EventSourceBeanRegistrarTest.java | 18 +++++++++++- .../synapse/endpoint/MessageEndpointTest.java | 1 + .../otto/synapse/channel/InMemoryChannel.java | 4 +-- .../sender/InMemoryMessageSender.java | 2 +- 20 files changed, 140 insertions(+), 23 deletions(-) diff --git a/synapse-core/src/main/java/de/otto/synapse/annotation/BeanNameHelper.java b/synapse-core/src/main/java/de/otto/synapse/annotation/BeanNameHelper.java index 883f2a31..def5ecd4 100644 --- a/synapse-core/src/main/java/de/otto/synapse/annotation/BeanNameHelper.java +++ b/synapse-core/src/main/java/de/otto/synapse/annotation/BeanNameHelper.java @@ -25,5 +25,9 @@ public static String beanNameForJournal(final String channelName) { return LOWER_HYPHEN.to(LOWER_CAMEL, channelName) + "Journal"; } + public static String beanNameForIteratorAt(final String iteratorAt) { + return LOWER_HYPHEN.to(LOWER_CAMEL, iteratorAt) + "IteratorAt"; + } + } diff --git a/synapse-core/src/main/java/de/otto/synapse/annotation/EnableEventSource.java b/synapse-core/src/main/java/de/otto/synapse/annotation/EnableEventSource.java index 4613074c..aa5f7cdb 100644 --- a/synapse-core/src/main/java/de/otto/synapse/annotation/EnableEventSource.java +++ b/synapse-core/src/main/java/de/otto/synapse/annotation/EnableEventSource.java @@ -1,5 +1,6 @@ package de.otto.synapse.annotation; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; import de.otto.synapse.eventsource.EventSource; import de.otto.synapse.eventsource.EventSourceBuilder; @@ -80,4 +81,9 @@ */ String messageLogReceiverEndpoint() default ""; + /** + * + */ + StartFrom iteratorAt() default StartFrom.HORIZON; + } diff --git a/synapse-core/src/main/java/de/otto/synapse/annotation/EventSourceBeanRegistrar.java b/synapse-core/src/main/java/de/otto/synapse/annotation/EventSourceBeanRegistrar.java index 523b51bc..3d73bea8 100644 --- a/synapse-core/src/main/java/de/otto/synapse/annotation/EventSourceBeanRegistrar.java +++ b/synapse-core/src/main/java/de/otto/synapse/annotation/EventSourceBeanRegistrar.java @@ -15,8 +15,7 @@ import java.util.Objects; import static com.google.common.base.Strings.emptyToNull; -import static de.otto.synapse.annotation.BeanNameHelper.beanNameForEventSource; -import static de.otto.synapse.annotation.BeanNameHelper.beanNameForMessageLogReceiverEndpoint; +import static de.otto.synapse.annotation.BeanNameHelper.*; import static java.lang.String.format; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.beans.factory.support.AbstractBeanDefinition.DEPENDENCY_CHECK_ALL; @@ -70,16 +69,22 @@ private void registerMultipleEventSources(final BeanDefinitionRegistry registry, final String eventSourceBeanName = Objects.toString( emptyToNull(annotationAttributes.getString("name")), beanNameForEventSource(channelName)); + final String messageLogBeanName = Objects.toString( emptyToNull(annotationAttributes.getString("messageLogReceiverEndpoint")), beanNameForMessageLogReceiverEndpoint(channelName)); + + final String iteratorAt = Objects.toString( + emptyToNull(annotationAttributes.getEnum("iteratorAt").toString()), + beanNameForIteratorAt(channelName)); + if (!registry.containsBeanDefinition(messageLogBeanName)) { - registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName); + registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName, iteratorAt); } else { throw new BeanCreationException(messageLogBeanName, format("MessageLogReceiverEndpoint %s is already registered.", messageLogBeanName)); } if (!registry.containsBeanDefinition(eventSourceBeanName)) { - registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName); + registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName, iteratorAt); } else { throw new BeanCreationException(eventSourceBeanName, format("EventSource %s is already registered.", eventSourceBeanName)); } @@ -101,13 +106,17 @@ private void registerSingleEventSource(final BeanDefinitionRegistry registry, emptyToNull(eventSourceAttr.getFirst("messageLogReceiverEndpoint").toString()), beanNameForMessageLogReceiverEndpoint(channelName)); + final String iteratorAt = Objects.toString( + emptyToNull(eventSourceAttr.getFirst("iteratorAt").toString()), + beanNameForIteratorAt(channelName)); + if (!registry.containsBeanDefinition(messageLogBeanName)) { - registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName); + registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName, iteratorAt); } else { throw new BeanCreationException(messageLogBeanName, format("MessageLogReceiverEndpoint %s is already registered.", messageLogBeanName)); } if (!registry.containsBeanDefinition(eventSourceBeanName)) { - registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName); + registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName, iteratorAt); } else { throw new BeanCreationException(eventSourceBeanName, format("EventSource %s is already registered.", eventSourceBeanName)); } @@ -116,13 +125,15 @@ private void registerSingleEventSource(final BeanDefinitionRegistry registry, private void registerMessageLogBeanDefinition(final BeanDefinitionRegistry registry, final String beanName, - final String channelName) { + final String channelName, + final String iteratorAt) { registry.registerBeanDefinition( beanName, genericBeanDefinition(DelegateMessageLogReceiverEndpoint.class) .addConstructorArgValue(channelName) + .addConstructorArgValue(iteratorAt) .setDependencyCheck(DEPENDENCY_CHECK_ALL) .getBeanDefinition() ); @@ -133,11 +144,13 @@ private void registerMessageLogBeanDefinition(final BeanDefinitionRegistry regis private void registerEventSourceBeanDefinition(final BeanDefinitionRegistry registry, final String beanName, final String messageLogBeanName, - final String channelName) { + final String channelName, + final String iteratorAt) { registry.registerBeanDefinition( beanName, genericBeanDefinition(DelegateEventSource.class) .addConstructorArgValue(messageLogBeanName) + .addConstructorArgValue(iteratorAt) .setDependencyCheck(DEPENDENCY_CHECK_ALL) .getBeanDefinition() ); diff --git a/synapse-core/src/main/java/de/otto/synapse/channel/StartFrom.java b/synapse-core/src/main/java/de/otto/synapse/channel/StartFrom.java index d98a2358..42e375c8 100644 --- a/synapse-core/src/main/java/de/otto/synapse/channel/StartFrom.java +++ b/synapse-core/src/main/java/de/otto/synapse/channel/StartFrom.java @@ -16,5 +16,9 @@ public enum StartFrom { /** * Start consumption of messages at first message at the specified timestamp. */ - TIMESTAMP + TIMESTAMP, + /** + * Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard. + */ + LATEST } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/AbstractMessageEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/AbstractMessageEndpoint.java index f5951239..55460f33 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/AbstractMessageEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/AbstractMessageEndpoint.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint; import de.otto.synapse.endpoint.sender.MessageSenderEndpoint; @@ -41,6 +42,7 @@ public abstract class AbstractMessageEndpoint implements MessageEndpoint { private final String channelName; @Nonnull private final MessageInterceptorRegistry interceptorRegistry; + private final StartFrom iteratorAt; /** * Constructor used to create a new AbstractMessageEndpoint. @@ -50,9 +52,18 @@ public abstract class AbstractMessageEndpoint implements MessageEndpoint { * endpoint. */ public AbstractMessageEndpoint(final @Nonnull String channelName, + final @Nonnull String iteratorAt, final @Nonnull MessageInterceptorRegistry interceptorRegistry) { this.channelName = requireNonNull(channelName, "ChannelName must not be null"); this.interceptorRegistry = requireNonNull(interceptorRegistry); + this.iteratorAt = StartFrom.valueOf(iteratorAt); + } + + AbstractMessageEndpoint(final @Nonnull String channelName, + final @Nonnull MessageInterceptorRegistry interceptorRegistry) { + this.channelName = requireNonNull(channelName, "ChannelName must not be null"); + this.interceptorRegistry = requireNonNull(interceptorRegistry); + this.iteratorAt = StartFrom.HORIZON; } /** @@ -104,4 +115,8 @@ public final TextMessage intercept(final @Nonnull TextMessage message) { return getInterceptorChain().intercept(message); } + @Override + public StartFrom getIterator() { + return iteratorAt; + } } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/MessageEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/MessageEndpoint.java index 35d03a2b..07903e11 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/MessageEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/MessageEndpoint.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint; import de.otto.synapse.endpoint.sender.MessageSenderEndpoint; @@ -83,4 +84,11 @@ public interface MessageEndpoint { */ @Nullable TextMessage intercept(@Nonnull TextMessage message); + + /** + * Returns the + * + * @return StartFrom + */ + StartFrom getIterator(); } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageLogReceiverEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageLogReceiverEndpoint.java index 2e9f2c50..594c9ad2 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageLogReceiverEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageLogReceiverEndpoint.java @@ -18,9 +18,10 @@ public abstract class AbstractMessageLogReceiverEndpoint extends AbstractMessageReceiverEndpoint implements MessageLogReceiverEndpoint { public AbstractMessageLogReceiverEndpoint(final @Nonnull String channelName, + final @Nonnull String iteratorAt, final @Nonnull MessageInterceptorRegistry interceptorRegistry, final @Nullable ApplicationEventPublisher eventPublisher) { - super(channelName, interceptorRegistry, eventPublisher); + super(channelName, iteratorAt, interceptorRegistry, eventPublisher); } } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageReceiverEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageReceiverEndpoint.java index 00d2cd94..b6aa21c8 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageReceiverEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/AbstractMessageReceiverEndpoint.java @@ -28,9 +28,10 @@ public class AbstractMessageReceiverEndpoint extends AbstractMessageEndpoint imp private final ApplicationEventPublisher eventPublisher; public AbstractMessageReceiverEndpoint(final @Nonnull String channelName, + final @Nonnull String iteratorAt, final @Nonnull MessageInterceptorRegistry interceptorRegistry, final @Nullable ApplicationEventPublisher eventPublisher) { - super(channelName, interceptorRegistry); + super(channelName, iteratorAt, interceptorRegistry); messageDispatcher = new MessageDispatcher(); this.eventPublisher = eventPublisher; } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java index 88f1d80c..a9d96647 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java @@ -2,6 +2,7 @@ import de.otto.synapse.channel.ChannelPosition; import de.otto.synapse.channel.ShardResponse; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.consumer.MessageConsumer; import de.otto.synapse.consumer.MessageDispatcher; import de.otto.synapse.endpoint.EndpointType; @@ -16,10 +17,13 @@ public class DelegateMessageLogReceiverEndpoint implements MessageLogReceiverEndpoint { private final MessageLogReceiverEndpoint delegate; + private final StartFrom iteratorAt; public DelegateMessageLogReceiverEndpoint(final @Nonnull String channelName, + final @Nonnull String iteratorAt, final @Nonnull MessageLogReceiverEndpointFactory messageLogReceiverEndpointFactory) { this.delegate = messageLogReceiverEndpointFactory.create(channelName); + this.iteratorAt = StartFrom.valueOf(iteratorAt); } @Nonnull @@ -68,4 +72,9 @@ public EndpointType getEndpointType() { public TextMessage intercept(final @Nonnull TextMessage message) { return delegate.intercept(message); } + + @Override + public StartFrom getIterator() { + return iteratorAt; + } } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageQueueReceiverEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageQueueReceiverEndpoint.java index 45a6d4bf..712d6662 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageQueueReceiverEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageQueueReceiverEndpoint.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint.receiver; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.consumer.MessageConsumer; import de.otto.synapse.consumer.MessageDispatcher; import de.otto.synapse.endpoint.EndpointType; @@ -69,4 +70,9 @@ public EndpointType getEndpointType() { public TextMessage intercept(final @Nonnull TextMessage message) { return delegate.intercept(message); } + + @Override + public StartFrom getIterator() { + return null; + } } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java index 992e527e..b466402a 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint.sender; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.endpoint.AbstractMessageEndpoint; import de.otto.synapse.endpoint.EndpointType; import de.otto.synapse.endpoint.MessageInterceptor; @@ -22,7 +23,7 @@ * and {@link de.otto.synapse.endpoint.MessageInterceptor interception}. * *

- * Message Endpoint + * Message Endpoint *

* * @see EIP: Message Endpoint @@ -35,16 +36,24 @@ public abstract class AbstractMessageSenderEndpoint extends AbstractMessageEndpo /** * Constructor used to create a new MessageEndpoint. * - * @param channelName the name of the underlying channel / stream / queue / message log. + * @param channelName the name of the underlying channel / stream / queue / message log. * @param interceptorRegistry registry used to determine {@link MessageInterceptor message interceptors} for this * endpoint. - * @param messageTranslator the MessageTranslator used to translate message payloads as expected by the - * {@link de.otto.synapse.consumer.MessageConsumer consumers}. + * @param messageTranslator the MessageTranslator used to translate message payloads as expected by the + * {@link de.otto.synapse.consumer.MessageConsumer consumers}. */ public AbstractMessageSenderEndpoint(final @Nonnull String channelName, + final @Nonnull String iteratorAt, final @Nonnull MessageInterceptorRegistry interceptorRegistry, final @Nonnull MessageTranslator messageTranslator) { - super(channelName, interceptorRegistry); + super(channelName, iteratorAt, interceptorRegistry); + this.messageTranslator = messageTranslator; + } + + AbstractMessageSenderEndpoint(final @Nonnull String channelName, + final @Nonnull MessageInterceptorRegistry interceptorRegistry, + final @Nonnull MessageTranslator messageTranslator) { + super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry); this.messageTranslator = messageTranslator; } @@ -52,7 +61,7 @@ public AbstractMessageSenderEndpoint(final @Nonnull String channelName, * Sends a {@link Message} to the message channel. * * @param message the message to send - * @param type of the message's payload + * @param type of the message's payload */ @Override public final CompletableFuture send(@Nonnull final Message message) { @@ -70,7 +79,7 @@ public final CompletableFuture send(@Nonnull final Message message) * batches are supported by the infrastructure. If not, the messages are send one by one. * * @param batch a stream of messages that is sent in batched mode, if supported - * @param the type of the message payload + * @param the type of the message payload */ @Override public final CompletableFuture sendBatch(@Nonnull final Stream> batch) { diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/DelegateMessageSenderEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/DelegateMessageSenderEndpoint.java index 0b7cdd1f..070f818f 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/DelegateMessageSenderEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/DelegateMessageSenderEndpoint.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint.sender; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.channel.selector.Selector; import de.otto.synapse.endpoint.EndpointType; import de.otto.synapse.endpoint.InterceptorChain; @@ -16,7 +17,7 @@ import static java.lang.String.format; import static org.slf4j.LoggerFactory.getLogger; -public class DelegateMessageSenderEndpoint implements MessageSenderEndpoint{ +public class DelegateMessageSenderEndpoint implements MessageSenderEndpoint { private static final Logger LOG = getLogger(DelegateMessageSenderEndpoint.class); private final MessageSenderEndpoint delegate; @@ -66,4 +67,9 @@ public CompletableFuture send(@Nonnull Message message) { public CompletableFuture sendBatch(@Nonnull Stream> batch) { return delegate.sendBatch(batch); } + + @Override + public StartFrom getIterator() { + return null; + } } diff --git a/synapse-core/src/main/java/de/otto/synapse/eventsource/AbstractEventSource.java b/synapse-core/src/main/java/de/otto/synapse/eventsource/AbstractEventSource.java index 1578b14c..5c9a6344 100644 --- a/synapse-core/src/main/java/de/otto/synapse/eventsource/AbstractEventSource.java +++ b/synapse-core/src/main/java/de/otto/synapse/eventsource/AbstractEventSource.java @@ -1,5 +1,6 @@ package de.otto.synapse.eventsource; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.consumer.MessageConsumer; import de.otto.synapse.consumer.MessageDispatcher; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; @@ -66,4 +67,8 @@ public MessageLogReceiverEndpoint getMessageLogReceiverEndpoint() { return messageLog; } + @Override + public StartFrom getIterator() { + return messageLog.getIterator(); + } } diff --git a/synapse-core/src/main/java/de/otto/synapse/eventsource/DefaultEventSource.java b/synapse-core/src/main/java/de/otto/synapse/eventsource/DefaultEventSource.java index de853451..81f027f2 100644 --- a/synapse-core/src/main/java/de/otto/synapse/eventsource/DefaultEventSource.java +++ b/synapse-core/src/main/java/de/otto/synapse/eventsource/DefaultEventSource.java @@ -25,6 +25,7 @@ public class DefaultEventSource extends AbstractEventSource { private final MessageStore messageStore; + public DefaultEventSource(final @Nonnull MessageStore messageStore, final @Nonnull MessageLogReceiverEndpoint messageLog) { super(messageLog); diff --git a/synapse-core/src/main/java/de/otto/synapse/eventsource/DelegateEventSource.java b/synapse-core/src/main/java/de/otto/synapse/eventsource/DelegateEventSource.java index feb45286..01328c60 100644 --- a/synapse-core/src/main/java/de/otto/synapse/eventsource/DelegateEventSource.java +++ b/synapse-core/src/main/java/de/otto/synapse/eventsource/DelegateEventSource.java @@ -2,6 +2,7 @@ import de.otto.synapse.channel.ChannelPosition; import de.otto.synapse.channel.ShardResponse; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.consumer.MessageConsumer; import de.otto.synapse.consumer.MessageDispatcher; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; @@ -15,12 +16,15 @@ public class DelegateEventSource implements EventSource { private final EventSource delegate; + private final StartFrom iteratorAt; public DelegateEventSource(final String messageLogBeanName, + final String iteratorAt, final EventSourceBuilder eventSourceBuilder, final ApplicationContext applicationContext) { final MessageLogReceiverEndpoint messageLogReceiverEndpoint = applicationContext.getBean(messageLogBeanName, MessageLogReceiverEndpoint.class); this.delegate = eventSourceBuilder.buildEventSource(messageLogReceiverEndpoint); + this.iteratorAt = StartFrom.valueOf(iteratorAt); } public EventSource getDelegate() { @@ -117,6 +121,11 @@ public boolean isStopping() { return delegate.isStopping(); } + @Override + public StartFrom getIterator() { + return iteratorAt; + } + @Override public String toString() { return "DelegateEventSource{" + diff --git a/synapse-core/src/main/java/de/otto/synapse/eventsource/EventSource.java b/synapse-core/src/main/java/de/otto/synapse/eventsource/EventSource.java index 240961d1..43da4fd4 100644 --- a/synapse-core/src/main/java/de/otto/synapse/eventsource/EventSource.java +++ b/synapse-core/src/main/java/de/otto/synapse/eventsource/EventSource.java @@ -2,6 +2,7 @@ import de.otto.synapse.channel.ChannelPosition; import de.otto.synapse.channel.ShardResponse; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.consumer.MessageConsumer; import de.otto.synapse.consumer.MessageDispatcher; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; @@ -88,4 +89,6 @@ default CompletableFuture consume() { void stop(); boolean isStopping(); + + StartFrom getIterator(); } diff --git a/synapse-core/src/test/java/de/otto/synapse/annotation/EventSourceBeanRegistrarTest.java b/synapse-core/src/test/java/de/otto/synapse/annotation/EventSourceBeanRegistrarTest.java index 9a6db783..84d90df5 100644 --- a/synapse-core/src/test/java/de/otto/synapse/annotation/EventSourceBeanRegistrarTest.java +++ b/synapse-core/src/test/java/de/otto/synapse/annotation/EventSourceBeanRegistrarTest.java @@ -1,7 +1,7 @@ package de.otto.synapse.annotation; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.configuration.InMemoryMessageLogTestConfiguration; -import de.otto.synapse.configuration.SynapseAutoConfiguration; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; import de.otto.synapse.eventsource.DefaultEventSource; import de.otto.synapse.eventsource.DelegateEventSource; @@ -48,6 +48,10 @@ static class MultiEventSourceTestConfigWithDifferentNames { static class RepeatableMultiEventSourceTestConfig { } + @EnableEventSource(name = "testEventSource", channelName = "test-stream", iteratorAt = StartFrom.LATEST) + static class IteratorAtEventSourceTestConfig { + } + @Test(expected = BeanCreationException.class) public void shouldFailToRegisterMultipleEventSourcesForSameStreamNameWithSameName() { context.register(MultiEventSourceTestConfigWithSameNames.class); @@ -124,4 +128,16 @@ public void shouldRegisterMessageLogReceiverEndpointWithSpecifiedName() { assertThat(receiverEndpoint.getChannelName()).isEqualTo("test-stream"); } + @Test + public void shouldRegisterEventSourceWithIteratorAt() { + context.register(IteratorAtEventSourceTestConfig.class); + context.register(InMemoryMessageLogTestConfiguration.class); + context.refresh(); + + assertThat(context.containsBean("testEventSource")).isTrue(); + final EventSource eventSource = context.getBean("testEventSource", DelegateEventSource.class).getDelegate(); + assertThat(eventSource.getIterator()).isEqualTo(StartFrom.LATEST); + } + + } diff --git a/synapse-core/src/test/java/de/otto/synapse/endpoint/MessageEndpointTest.java b/synapse-core/src/test/java/de/otto/synapse/endpoint/MessageEndpointTest.java index 5c5d31f7..f827216f 100644 --- a/synapse-core/src/test/java/de/otto/synapse/endpoint/MessageEndpointTest.java +++ b/synapse-core/src/test/java/de/otto/synapse/endpoint/MessageEndpointTest.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.message.TextMessage; import org.junit.Test; diff --git a/synapse-testsupport/src/main/java/de/otto/synapse/channel/InMemoryChannel.java b/synapse-testsupport/src/main/java/de/otto/synapse/channel/InMemoryChannel.java index 47930b62..1c650f23 100644 --- a/synapse-testsupport/src/main/java/de/otto/synapse/channel/InMemoryChannel.java +++ b/synapse-testsupport/src/main/java/de/otto/synapse/channel/InMemoryChannel.java @@ -43,14 +43,14 @@ public class InMemoryChannel extends AbstractMessageLogReceiverEndpoint implemen public InMemoryChannel(final String channelName, final MessageInterceptorRegistry interceptorRegistry) { - super(channelName, interceptorRegistry, null); + super(channelName, HORIZON.toString(), interceptorRegistry, null); this.eventQueue = synchronizedList(new ArrayList<>()); } public InMemoryChannel(final String channelName, final MessageInterceptorRegistry interceptorRegistry, final ApplicationEventPublisher eventPublisher) { - super(channelName, interceptorRegistry, eventPublisher); + super(channelName, HORIZON.toString(), interceptorRegistry, eventPublisher); this.eventQueue = synchronizedList(new ArrayList<>()); } diff --git a/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/sender/InMemoryMessageSender.java b/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/sender/InMemoryMessageSender.java index 82946325..2fe10a5c 100644 --- a/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/sender/InMemoryMessageSender.java +++ b/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/sender/InMemoryMessageSender.java @@ -17,7 +17,7 @@ public class InMemoryMessageSender extends AbstractMessageSenderEndpoint { public InMemoryMessageSender(final MessageInterceptorRegistry interceptorRegistry, final MessageTranslator messageTranslator, final InMemoryChannel channel) { - super(channel.getChannelName(), interceptorRegistry, messageTranslator); + super(channel.getChannelName(), channel.getIterator().toString(), interceptorRegistry, messageTranslator); this.channel = channel; }