From 15b847d2287845fcdc8081091bb4df472e2157cc Mon Sep 17 00:00:00 2001 From: Ester Marti Date: Mon, 27 May 2019 16:21:50 +0200 Subject: [PATCH] wip - add iteratorAt in kinesis message log receiver Co-Authored-By: "Aleix Morgadas" --- .../KinesisMessageLogReceiverEndpoint.java | 42 +++++++++++++++++-- ...esisMessageLogReceiverEndpointFactory.java | 8 +++- .../DelegateMessageLogReceiverEndpoint.java | 2 +- .../MessageLogReceiverEndpointFactory.java | 12 +++++- .../sender/AbstractMessageSenderEndpoint.java | 2 +- ...moryMessageLogReceiverEndpointFactory.java | 8 +++- 6 files changed, 66 insertions(+), 8 deletions(-) diff --git a/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpoint.java b/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpoint.java index 5e608e4d..6f5fec2f 100644 --- a/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpoint.java +++ b/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpoint.java @@ -5,6 +5,7 @@ import de.otto.synapse.channel.ChannelDurationBehind; import de.otto.synapse.channel.ChannelPosition; import de.otto.synapse.channel.ShardResponse; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.consumer.MessageDispatcher; import de.otto.synapse.endpoint.InterceptorChain; import de.otto.synapse.endpoint.MessageInterceptorRegistry; @@ -94,32 +95,67 @@ public void accept(final ShardResponse response) { public KinesisMessageLogReceiverEndpoint(final String channelName, + final String iteratorAt, final MessageInterceptorRegistry interceptorRegistry, final KinesisAsyncClient kinesisClient, final ExecutorService executorService, final ApplicationEventPublisher eventPublisher) { - this(channelName, interceptorRegistry, kinesisClient, executorService, eventPublisher, Clock.systemDefaultZone()); + this(channelName, iteratorAt, interceptorRegistry, kinesisClient, executorService, eventPublisher, Clock.systemDefaultZone()); } public KinesisMessageLogReceiverEndpoint(final String channelName, + final MessageInterceptorRegistry interceptorRegistry, + final KinesisAsyncClient kinesisClient, + final ExecutorService executorService, + final ApplicationEventPublisher eventPublisher) { + this(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, kinesisClient, executorService, eventPublisher, Clock.systemDefaultZone()); + } + + public KinesisMessageLogReceiverEndpoint(final String channelName, + final String iteratorAt, final MessageInterceptorRegistry interceptorRegistry, final KinesisAsyncClient kinesisClient, final ExecutorService executorService, final ApplicationEventPublisher eventPublisher, final Clock clock) { - super(channelName, interceptorRegistry, eventPublisher); + super(channelName, iteratorAt, interceptorRegistry, eventPublisher); this.eventPublisher = eventPublisher; this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock); this.interceptorRegistry = interceptorRegistry; } + public KinesisMessageLogReceiverEndpoint(final String channelName, + final MessageInterceptorRegistry interceptorRegistry, + final KinesisAsyncClient kinesisClient, + final ExecutorService executorService, + final ApplicationEventPublisher eventPublisher, + final Clock clock) { + super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, eventPublisher); + this.eventPublisher = eventPublisher; + this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock); + this.interceptorRegistry = interceptorRegistry; + } + + public KinesisMessageLogReceiverEndpoint(final String channelName, + final String iteratorAt, + final MessageInterceptorRegistry interceptorRegistry, + final KinesisAsyncClient kinesisClient, + final ExecutorService executorService, + final ApplicationEventPublisher eventPublisher, + final Clock clock, final int waitingTimeOnEmptyRecords) { + super(channelName, iteratorAt, interceptorRegistry, eventPublisher); + this.eventPublisher = eventPublisher; + this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock, waitingTimeOnEmptyRecords); + this.interceptorRegistry = interceptorRegistry; + } + public KinesisMessageLogReceiverEndpoint(final String channelName, final MessageInterceptorRegistry interceptorRegistry, final KinesisAsyncClient kinesisClient, final ExecutorService executorService, final ApplicationEventPublisher eventPublisher, final Clock clock, final int waitingTimeOnEmptyRecords) { - super(channelName, interceptorRegistry, eventPublisher); + super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, eventPublisher); this.eventPublisher = eventPublisher; this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock, waitingTimeOnEmptyRecords); this.interceptorRegistry = interceptorRegistry; diff --git a/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpointFactory.java b/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpointFactory.java index 15d6ac92..d943213e 100644 --- a/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpointFactory.java +++ b/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisMessageLogReceiverEndpointFactory.java @@ -1,5 +1,6 @@ package de.otto.synapse.endpoint.receiver.kinesis; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.endpoint.MessageInterceptorRegistry; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint; import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpointFactory; @@ -41,7 +42,12 @@ public KinesisMessageLogReceiverEndpointFactory(final MessageInterceptorRegistry @Override public MessageLogReceiverEndpoint create(@Nonnull String channelName) { - return new KinesisMessageLogReceiverEndpoint(channelName, interceptorRegistry, kinesisClient, executorService, eventPublisher, clock); + return new KinesisMessageLogReceiverEndpoint(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, kinesisClient, executorService, eventPublisher, clock); + } + + @Override + public MessageLogReceiverEndpoint create(@Nonnull String channelName, @Nonnull StartFrom iteratorAt) { + return new KinesisMessageLogReceiverEndpoint(channelName, iteratorAt.toString(), interceptorRegistry, kinesisClient, executorService, eventPublisher, clock); } } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java index a9d96647..c7c3b425 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/DelegateMessageLogReceiverEndpoint.java @@ -22,7 +22,7 @@ public class DelegateMessageLogReceiverEndpoint implements MessageLogReceiverEnd public DelegateMessageLogReceiverEndpoint(final @Nonnull String channelName, final @Nonnull String iteratorAt, final @Nonnull MessageLogReceiverEndpointFactory messageLogReceiverEndpointFactory) { - this.delegate = messageLogReceiverEndpointFactory.create(channelName); + this.delegate = messageLogReceiverEndpointFactory.create(channelName, StartFrom.valueOf(iteratorAt)); this.iteratorAt = StartFrom.valueOf(iteratorAt); } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/MessageLogReceiverEndpointFactory.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/MessageLogReceiverEndpointFactory.java index 2d3f0949..115e960b 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/MessageLogReceiverEndpointFactory.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/receiver/MessageLogReceiverEndpointFactory.java @@ -1,12 +1,13 @@ package de.otto.synapse.endpoint.receiver; +import de.otto.synapse.channel.StartFrom; + import javax.annotation.Nonnull; /* * A factory used to create {@link MessageLogReceiverEndpoint} instances. * */ -@FunctionalInterface public interface MessageLogReceiverEndpointFactory { /** @@ -17,4 +18,13 @@ public interface MessageLogReceiverEndpointFactory { */ MessageLogReceiverEndpoint create(@Nonnull String channelName); + /** + * Creates and returns a {@link MessageLogReceiverEndpoint} for a messaging channel. + * + * @param channelName the name of the channel of the created {@code MessageLogReceiverEndpoint} + * @param iteratorAt position for ShardIterator + * @return MessageLogReceiverEndpoint + */ + MessageLogReceiverEndpoint create(@Nonnull String channelName, @Nonnull StartFrom iteratorAt); + } diff --git a/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java b/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java index b466402a..578d6068 100644 --- a/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java +++ b/synapse-core/src/main/java/de/otto/synapse/endpoint/sender/AbstractMessageSenderEndpoint.java @@ -50,7 +50,7 @@ public AbstractMessageSenderEndpoint(final @Nonnull String channelName, this.messageTranslator = messageTranslator; } - AbstractMessageSenderEndpoint(final @Nonnull String channelName, + public AbstractMessageSenderEndpoint(final @Nonnull String channelName, final @Nonnull MessageInterceptorRegistry interceptorRegistry, final @Nonnull MessageTranslator messageTranslator) { super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry); diff --git a/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/receiver/InMemoryMessageLogReceiverEndpointFactory.java b/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/receiver/InMemoryMessageLogReceiverEndpointFactory.java index a6c1a5b8..5f5c4ae4 100644 --- a/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/receiver/InMemoryMessageLogReceiverEndpointFactory.java +++ b/synapse-testsupport/src/main/java/de/otto/synapse/endpoint/receiver/InMemoryMessageLogReceiverEndpointFactory.java @@ -1,6 +1,7 @@ package de.otto.synapse.endpoint.receiver; import de.otto.synapse.channel.InMemoryChannels; +import de.otto.synapse.channel.StartFrom; import de.otto.synapse.eventsource.EventSource; import javax.annotation.Nonnull; @@ -20,7 +21,12 @@ public InMemoryMessageLogReceiverEndpointFactory(final InMemoryChannels inMemory } @Override - public MessageLogReceiverEndpoint create(final @Nonnull String channelName) { + public MessageLogReceiverEndpoint create(@Nonnull String channelName) { + return inMemoryChannels.getChannel(channelName); + } + + @Override + public MessageLogReceiverEndpoint create(final @Nonnull String channelName, final @Nonnull StartFrom iteratorAt) { return inMemoryChannels.getChannel(channelName); } }