Skip to content

Commit

Permalink
WIP Add iteratorAt to EnableEventSource annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
estervilaseca8097 committed May 27, 2019
1 parent 884f8be commit ad92f9f
Show file tree
Hide file tree
Showing 20 changed files with 140 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ public static String beanNameForJournal(final String channelName) {
return LOWER_HYPHEN.to(LOWER_CAMEL, channelName) + "Journal";
}

public static String beanNameForIteratorAt(final String iteratorAt) {
return LOWER_HYPHEN.to(LOWER_CAMEL, iteratorAt) + "IteratorAt";
}

}

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.annotation;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.eventsource.EventSource;
import de.otto.synapse.eventsource.EventSourceBuilder;
Expand Down Expand Up @@ -80,4 +81,9 @@
*/
String messageLogReceiverEndpoint() default "";

/**
*
*/
StartFrom iteratorAt() default StartFrom.HORIZON;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
import java.util.Objects;

import static com.google.common.base.Strings.emptyToNull;
import static de.otto.synapse.annotation.BeanNameHelper.beanNameForEventSource;
import static de.otto.synapse.annotation.BeanNameHelper.beanNameForMessageLogReceiverEndpoint;
import static de.otto.synapse.annotation.BeanNameHelper.*;
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.beans.factory.support.AbstractBeanDefinition.DEPENDENCY_CHECK_ALL;
Expand Down Expand Up @@ -70,16 +69,22 @@ private void registerMultipleEventSources(final BeanDefinitionRegistry registry,
final String eventSourceBeanName = Objects.toString(
emptyToNull(annotationAttributes.getString("name")),
beanNameForEventSource(channelName));

final String messageLogBeanName = Objects.toString(
emptyToNull(annotationAttributes.getString("messageLogReceiverEndpoint")),
beanNameForMessageLogReceiverEndpoint(channelName));

final String iteratorAt = Objects.toString(
emptyToNull(annotationAttributes.getEnum("iteratorAt").toString()),
beanNameForIteratorAt(channelName));

if (!registry.containsBeanDefinition(messageLogBeanName)) {
registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName);
registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName, iteratorAt);
} else {
throw new BeanCreationException(messageLogBeanName, format("MessageLogReceiverEndpoint %s is already registered.", messageLogBeanName));
}
if (!registry.containsBeanDefinition(eventSourceBeanName)) {
registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName);
registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName, iteratorAt);
} else {
throw new BeanCreationException(eventSourceBeanName, format("EventSource %s is already registered.", eventSourceBeanName));
}
Expand All @@ -101,13 +106,17 @@ private void registerSingleEventSource(final BeanDefinitionRegistry registry,
emptyToNull(eventSourceAttr.getFirst("messageLogReceiverEndpoint").toString()),
beanNameForMessageLogReceiverEndpoint(channelName));

final String iteratorAt = Objects.toString(
emptyToNull(eventSourceAttr.getFirst("iteratorAt").toString()),
beanNameForIteratorAt(channelName));

if (!registry.containsBeanDefinition(messageLogBeanName)) {
registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName);
registerMessageLogBeanDefinition(registry, messageLogBeanName, channelName, iteratorAt);
} else {
throw new BeanCreationException(messageLogBeanName, format("MessageLogReceiverEndpoint %s is already registered.", messageLogBeanName));
}
if (!registry.containsBeanDefinition(eventSourceBeanName)) {
registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName);
registerEventSourceBeanDefinition(registry, eventSourceBeanName, messageLogBeanName, channelName, iteratorAt);
} else {
throw new BeanCreationException(eventSourceBeanName, format("EventSource %s is already registered.", eventSourceBeanName));
}
Expand All @@ -116,13 +125,15 @@ private void registerSingleEventSource(final BeanDefinitionRegistry registry,

private void registerMessageLogBeanDefinition(final BeanDefinitionRegistry registry,
final String beanName,
final String channelName) {
final String channelName,
final String iteratorAt) {


registry.registerBeanDefinition(
beanName,
genericBeanDefinition(DelegateMessageLogReceiverEndpoint.class)
.addConstructorArgValue(channelName)
.addConstructorArgValue(iteratorAt)
.setDependencyCheck(DEPENDENCY_CHECK_ALL)
.getBeanDefinition()
);
Expand All @@ -133,11 +144,13 @@ private void registerMessageLogBeanDefinition(final BeanDefinitionRegistry regis
private void registerEventSourceBeanDefinition(final BeanDefinitionRegistry registry,
final String beanName,
final String messageLogBeanName,
final String channelName) {
final String channelName,
final String iteratorAt) {
registry.registerBeanDefinition(
beanName,
genericBeanDefinition(DelegateEventSource.class)
.addConstructorArgValue(messageLogBeanName)
.addConstructorArgValue(iteratorAt)
.setDependencyCheck(DEPENDENCY_CHECK_ALL)
.getBeanDefinition()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ public enum StartFrom {
/**
* Start consumption of messages at first message at the specified timestamp.
*/
TIMESTAMP
TIMESTAMP,
/**
* Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
*/
LATEST
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.endpoint;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.endpoint.sender.MessageSenderEndpoint;
Expand Down Expand Up @@ -41,6 +42,7 @@ public abstract class AbstractMessageEndpoint implements MessageEndpoint {
private final String channelName;
@Nonnull
private final MessageInterceptorRegistry interceptorRegistry;
private final StartFrom iteratorAt;

/**
* Constructor used to create a new AbstractMessageEndpoint.
Expand All @@ -50,9 +52,18 @@ public abstract class AbstractMessageEndpoint implements MessageEndpoint {
* endpoint.
*/
public AbstractMessageEndpoint(final @Nonnull String channelName,
final @Nonnull String iteratorAt,
final @Nonnull MessageInterceptorRegistry interceptorRegistry) {
this.channelName = requireNonNull(channelName, "ChannelName must not be null");
this.interceptorRegistry = requireNonNull(interceptorRegistry);
this.iteratorAt = StartFrom.valueOf(iteratorAt);
}

AbstractMessageEndpoint(final @Nonnull String channelName,
final @Nonnull MessageInterceptorRegistry interceptorRegistry) {
this.channelName = requireNonNull(channelName, "ChannelName must not be null");
this.interceptorRegistry = requireNonNull(interceptorRegistry);
this.iteratorAt = StartFrom.HORIZON;
}

/**
Expand Down Expand Up @@ -104,4 +115,8 @@ public final TextMessage intercept(final @Nonnull TextMessage message) {
return getInterceptorChain().intercept(message);
}

@Override
public StartFrom getIterator() {
return iteratorAt;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.endpoint;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.endpoint.sender.MessageSenderEndpoint;
Expand Down Expand Up @@ -83,4 +84,11 @@ public interface MessageEndpoint {
*/
@Nullable
TextMessage intercept(@Nonnull TextMessage message);

/**
* Returns the
*
* @return StartFrom
*/
StartFrom getIterator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
public abstract class AbstractMessageLogReceiverEndpoint extends AbstractMessageReceiverEndpoint implements MessageLogReceiverEndpoint {

public AbstractMessageLogReceiverEndpoint(final @Nonnull String channelName,
final @Nonnull String iteratorAt,
final @Nonnull MessageInterceptorRegistry interceptorRegistry,
final @Nullable ApplicationEventPublisher eventPublisher) {
super(channelName, interceptorRegistry, eventPublisher);
super(channelName, iteratorAt, interceptorRegistry, eventPublisher);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public class AbstractMessageReceiverEndpoint extends AbstractMessageEndpoint imp
private final ApplicationEventPublisher eventPublisher;

public AbstractMessageReceiverEndpoint(final @Nonnull String channelName,
final @Nonnull String iteratorAt,
final @Nonnull MessageInterceptorRegistry interceptorRegistry,
final @Nullable ApplicationEventPublisher eventPublisher) {
super(channelName, interceptorRegistry);
super(channelName, iteratorAt, interceptorRegistry);
messageDispatcher = new MessageDispatcher();
this.eventPublisher = eventPublisher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.EndpointType;
Expand All @@ -16,10 +17,13 @@
public class DelegateMessageLogReceiverEndpoint implements MessageLogReceiverEndpoint {

private final MessageLogReceiverEndpoint delegate;
private final StartFrom iteratorAt;

public DelegateMessageLogReceiverEndpoint(final @Nonnull String channelName,
final @Nonnull String iteratorAt,
final @Nonnull MessageLogReceiverEndpointFactory messageLogReceiverEndpointFactory) {
this.delegate = messageLogReceiverEndpointFactory.create(channelName);
this.iteratorAt = StartFrom.valueOf(iteratorAt);
}

@Nonnull
Expand Down Expand Up @@ -68,4 +72,9 @@ public EndpointType getEndpointType() {
public TextMessage intercept(final @Nonnull TextMessage message) {
return delegate.intercept(message);
}

@Override
public StartFrom getIterator() {
return iteratorAt;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.endpoint.receiver;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.EndpointType;
Expand Down Expand Up @@ -69,4 +70,9 @@ public EndpointType getEndpointType() {
public TextMessage intercept(final @Nonnull TextMessage message) {
return delegate.intercept(message);
}

@Override
public StartFrom getIterator() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.endpoint.sender;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.AbstractMessageEndpoint;
import de.otto.synapse.endpoint.EndpointType;
import de.otto.synapse.endpoint.MessageInterceptor;
Expand All @@ -22,7 +23,7 @@
* and {@link de.otto.synapse.endpoint.MessageInterceptor interception}.
*
* <p>
* <img src="http://www.enterpriseintegrationpatterns.com/img/MessageEndpointSolution.gif" alt="Message Endpoint">
* <img src="http://www.enterpriseintegrationpatterns.com/img/MessageEndpointSolution.gif" alt="Message Endpoint">
* </p>
*
* @see <a href="http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageEndpoint.html">EIP: Message Endpoint</a>
Expand All @@ -35,24 +36,32 @@ public abstract class AbstractMessageSenderEndpoint extends AbstractMessageEndpo
/**
* Constructor used to create a new MessageEndpoint.
*
* @param channelName the name of the underlying channel / stream / queue / message log.
* @param channelName the name of the underlying channel / stream / queue / message log.
* @param interceptorRegistry registry used to determine {@link MessageInterceptor message interceptors} for this
* endpoint.
* @param messageTranslator the MessageTranslator used to translate message payloads as expected by the
* {@link de.otto.synapse.consumer.MessageConsumer consumers}.
* @param messageTranslator the MessageTranslator used to translate message payloads as expected by the
* {@link de.otto.synapse.consumer.MessageConsumer consumers}.
*/
public AbstractMessageSenderEndpoint(final @Nonnull String channelName,
final @Nonnull String iteratorAt,
final @Nonnull MessageInterceptorRegistry interceptorRegistry,
final @Nonnull MessageTranslator<TextMessage> messageTranslator) {
super(channelName, interceptorRegistry);
super(channelName, iteratorAt, interceptorRegistry);
this.messageTranslator = messageTranslator;
}

AbstractMessageSenderEndpoint(final @Nonnull String channelName,
final @Nonnull MessageInterceptorRegistry interceptorRegistry,
final @Nonnull MessageTranslator<TextMessage> messageTranslator) {
super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry);
this.messageTranslator = messageTranslator;
}

/**
* Sends a {@link Message} to the message channel.
*
* @param message the message to send
* @param <T> type of the message's payload
* @param <T> type of the message's payload
*/
@Override
public final <T> CompletableFuture<Void> send(@Nonnull final Message<T> message) {
Expand All @@ -70,7 +79,7 @@ public final <T> CompletableFuture<Void> send(@Nonnull final Message<T> message)
* batches are supported by the infrastructure. If not, the messages are send one by one.
*
* @param batch a stream of messages that is sent in batched mode, if supported
* @param <T> the type of the message payload
* @param <T> the type of the message payload
*/
@Override
public final <T> CompletableFuture<Void> sendBatch(@Nonnull final Stream<Message<T>> batch) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.endpoint.sender;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.channel.selector.Selector;
import de.otto.synapse.endpoint.EndpointType;
import de.otto.synapse.endpoint.InterceptorChain;
Expand All @@ -16,7 +17,7 @@
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;

public class DelegateMessageSenderEndpoint implements MessageSenderEndpoint{
public class DelegateMessageSenderEndpoint implements MessageSenderEndpoint {

private static final Logger LOG = getLogger(DelegateMessageSenderEndpoint.class);
private final MessageSenderEndpoint delegate;
Expand Down Expand Up @@ -66,4 +67,9 @@ public <T> CompletableFuture<Void> send(@Nonnull Message<T> message) {
public <T> CompletableFuture<Void> sendBatch(@Nonnull Stream<Message<T>> batch) {
return delegate.sendBatch(batch);
}

@Override
public StartFrom getIterator() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.eventsource;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.consumer.MessageConsumer;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
Expand Down Expand Up @@ -66,4 +67,8 @@ public MessageLogReceiverEndpoint getMessageLogReceiverEndpoint() {
return messageLog;
}

@Override
public StartFrom getIterator() {
return messageLog.getIterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class DefaultEventSource extends AbstractEventSource {

private final MessageStore messageStore;


public DefaultEventSource(final @Nonnull MessageStore messageStore,
final @Nonnull MessageLogReceiverEndpoint messageLog) {
super(messageLog);
Expand Down
Loading

0 comments on commit ad92f9f

Please sign in to comment.