Skip to content

Commit

Permalink
wip - add iteratorAt in kinesis message log receiver
Browse files Browse the repository at this point in the history
Co-Authored-By: "Aleix Morgadas" <[email protected]>
  • Loading branch information
estervilaseca8097 and "Aleix Morgadas" committed May 27, 2019
1 parent ad92f9f commit 15b847d
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import de.otto.synapse.channel.ChannelDurationBehind;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardResponse;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.consumer.MessageDispatcher;
import de.otto.synapse.endpoint.InterceptorChain;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
Expand Down Expand Up @@ -94,32 +95,67 @@ public void accept(final ShardResponse response) {


public KinesisMessageLogReceiverEndpoint(final String channelName,
final String iteratorAt,
final MessageInterceptorRegistry interceptorRegistry,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final ApplicationEventPublisher eventPublisher) {
this(channelName, interceptorRegistry, kinesisClient, executorService, eventPublisher, Clock.systemDefaultZone());
this(channelName, iteratorAt, interceptorRegistry, kinesisClient, executorService, eventPublisher, Clock.systemDefaultZone());
}

public KinesisMessageLogReceiverEndpoint(final String channelName,
final MessageInterceptorRegistry interceptorRegistry,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final ApplicationEventPublisher eventPublisher) {
this(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, kinesisClient, executorService, eventPublisher, Clock.systemDefaultZone());
}

public KinesisMessageLogReceiverEndpoint(final String channelName,
final String iteratorAt,
final MessageInterceptorRegistry interceptorRegistry,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final ApplicationEventPublisher eventPublisher,
final Clock clock) {
super(channelName, interceptorRegistry, eventPublisher);
super(channelName, iteratorAt, interceptorRegistry, eventPublisher);
this.eventPublisher = eventPublisher;
this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock);
this.interceptorRegistry = interceptorRegistry;
}

public KinesisMessageLogReceiverEndpoint(final String channelName,
final MessageInterceptorRegistry interceptorRegistry,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final ApplicationEventPublisher eventPublisher,
final Clock clock) {
super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, eventPublisher);
this.eventPublisher = eventPublisher;
this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock);
this.interceptorRegistry = interceptorRegistry;
}

public KinesisMessageLogReceiverEndpoint(final String channelName,
final String iteratorAt,
final MessageInterceptorRegistry interceptorRegistry,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final ApplicationEventPublisher eventPublisher,
final Clock clock, final int waitingTimeOnEmptyRecords) {
super(channelName, iteratorAt, interceptorRegistry, eventPublisher);
this.eventPublisher = eventPublisher;
this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock, waitingTimeOnEmptyRecords);
this.interceptorRegistry = interceptorRegistry;
}

public KinesisMessageLogReceiverEndpoint(final String channelName,
final MessageInterceptorRegistry interceptorRegistry,
final KinesisAsyncClient kinesisClient,
final ExecutorService executorService,
final ApplicationEventPublisher eventPublisher,
final Clock clock, final int waitingTimeOnEmptyRecords) {
super(channelName, interceptorRegistry, eventPublisher);
super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, eventPublisher);
this.eventPublisher = eventPublisher;
this.kinesisMessageLogReader = new KinesisMessageLogReader(channelName, kinesisClient, executorService, clock, waitingTimeOnEmptyRecords);
this.interceptorRegistry = interceptorRegistry;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.otto.synapse.endpoint.receiver.kinesis;

import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpointFactory;
Expand Down Expand Up @@ -41,7 +42,12 @@ public KinesisMessageLogReceiverEndpointFactory(final MessageInterceptorRegistry

@Override
public MessageLogReceiverEndpoint create(@Nonnull String channelName) {
return new KinesisMessageLogReceiverEndpoint(channelName, interceptorRegistry, kinesisClient, executorService, eventPublisher, clock);
return new KinesisMessageLogReceiverEndpoint(channelName, StartFrom.HORIZON.toString(), interceptorRegistry, kinesisClient, executorService, eventPublisher, clock);
}

@Override
public MessageLogReceiverEndpoint create(@Nonnull String channelName, @Nonnull StartFrom iteratorAt) {
return new KinesisMessageLogReceiverEndpoint(channelName, iteratorAt.toString(), interceptorRegistry, kinesisClient, executorService, eventPublisher, clock);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class DelegateMessageLogReceiverEndpoint implements MessageLogReceiverEnd
public DelegateMessageLogReceiverEndpoint(final @Nonnull String channelName,
final @Nonnull String iteratorAt,
final @Nonnull MessageLogReceiverEndpointFactory messageLogReceiverEndpointFactory) {
this.delegate = messageLogReceiverEndpointFactory.create(channelName);
this.delegate = messageLogReceiverEndpointFactory.create(channelName, StartFrom.valueOf(iteratorAt));
this.iteratorAt = StartFrom.valueOf(iteratorAt);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package de.otto.synapse.endpoint.receiver;

import de.otto.synapse.channel.StartFrom;

import javax.annotation.Nonnull;

/*
* A factory used to create {@link MessageLogReceiverEndpoint} instances.
*
*/
@FunctionalInterface
public interface MessageLogReceiverEndpointFactory {

/**
Expand All @@ -17,4 +18,13 @@ public interface MessageLogReceiverEndpointFactory {
*/
MessageLogReceiverEndpoint create(@Nonnull String channelName);

/**
* Creates and returns a {@link MessageLogReceiverEndpoint} for a messaging channel.
*
* @param channelName the name of the channel of the created {@code MessageLogReceiverEndpoint}
* @param iteratorAt position for ShardIterator
* @return MessageLogReceiverEndpoint
*/
MessageLogReceiverEndpoint create(@Nonnull String channelName, @Nonnull StartFrom iteratorAt);

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public AbstractMessageSenderEndpoint(final @Nonnull String channelName,
this.messageTranslator = messageTranslator;
}

AbstractMessageSenderEndpoint(final @Nonnull String channelName,
public AbstractMessageSenderEndpoint(final @Nonnull String channelName,
final @Nonnull MessageInterceptorRegistry interceptorRegistry,
final @Nonnull MessageTranslator<TextMessage> messageTranslator) {
super(channelName, StartFrom.HORIZON.toString(), interceptorRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.otto.synapse.endpoint.receiver;

import de.otto.synapse.channel.InMemoryChannels;
import de.otto.synapse.channel.StartFrom;
import de.otto.synapse.eventsource.EventSource;

import javax.annotation.Nonnull;
Expand All @@ -20,7 +21,12 @@ public InMemoryMessageLogReceiverEndpointFactory(final InMemoryChannels inMemory
}

@Override
public MessageLogReceiverEndpoint create(final @Nonnull String channelName) {
public MessageLogReceiverEndpoint create(@Nonnull String channelName) {
return inMemoryChannels.getChannel(channelName);
}

@Override
public MessageLogReceiverEndpoint create(final @Nonnull String channelName, final @Nonnull StartFrom iteratorAt) {
return inMemoryChannels.getChannel(channelName);
}
}

0 comments on commit 15b847d

Please sign in to comment.