Allow specifying the ShardIterator where the consumer should start reading from. #52

Open
ghost opened this issue May 16, 2019 · 1 comment

ghost commented May 16, 2019

Main affected components: synapse-kinesis and synapse-core.

Abstract

Depending on the use case, it can be useful to start reading at the end of the stream instead of reading the entire previous history first.

Proposition

EnableEventSource approach

Add a new parameter to EnableEventSource specifying the ShardIterator strategy.

I would suggest starting with two strategies:

  • DEFAULT: Current behaviour.
  • LATEST: ShardIterator starts at the latest message.

Implications:

  • All consumers will start at the same position.

@EnableEventSource(name = "kinesis1", channelName = "kinesis-arn", iteratorAt = IteratorPosition.LATEST)
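
To make the proposal more concrete, here is a minimal sketch of how such a parameter could be declared and read. Everything in it is an assumption for illustration: `IteratorPosition` and `iteratorAt` are the placeholder names from this proposal, the `@EnableEventSource` below is a simplified stand-in and not Synapse's real annotation, and DEFAULT is assumed to map to the Kinesis `TRIM_HORIZON` shard iterator type (the current behaviour may differ, e.g. when resuming from a snapshot).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical strategy enum; the names are placeholders, as in the proposal. */
enum IteratorPosition {
    /** Assumed here to mean "read from the oldest available record". */
    DEFAULT("TRIM_HORIZON"),
    /** Start reading just after the most recent record in the shard. */
    LATEST("LATEST");

    private final String kinesisIteratorType;

    IteratorPosition(String kinesisIteratorType) {
        this.kinesisIteratorType = kinesisIteratorType;
    }

    /** Name of the Kinesis ShardIteratorType this strategy would translate to. */
    String kinesisIteratorType() {
        return kinesisIteratorType;
    }
}

/** Simplified stand-in for Synapse's @EnableEventSource, NOT the real annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface EnableEventSource {
    String name();
    String channelName();
    /** Proposed new parameter; DEFAULT keeps the current behaviour. */
    IteratorPosition iteratorAt() default IteratorPosition.DEFAULT;
}

@EnableEventSource(name = "kinesis1", channelName = "kinesis-arn", iteratorAt = IteratorPosition.LATEST)
class ExampleConfiguration {
}

class IteratorPositionDemo {
    /** Reads the configured start position from an annotated configuration class. */
    static IteratorPosition positionOf(Class<?> configClass) {
        EnableEventSource annotation = configClass.getAnnotation(EnableEventSource.class);
        return annotation.iteratorAt();
    }

    public static void main(String[] args) {
        IteratorPosition position = positionOf(ExampleConfiguration.class);
        System.out.println(position + " -> " + position.kinesisIteratorType());
    }
}
```

Because the position is declared once per event source, every consumer attached to "kinesis1" would share it, which is the implication noted above.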

EventSourceConsumer approach

Another way could be adding the parameter to EventSourceConsumer.

Then, we could have something like:

@EventSourceConsumer(
        eventSource = "kinesis1",
        payloadType = BAR.class,
        keyPattern = ".*BAREvent",
        iteratorAt = IteratorPosition.LATEST
)
...
@EventSourceConsumer(
        eventSource = "kinesis1",
        payloadType = FOO.class,
        keyPattern = ".*FOOEvent",
        iteratorAt = IteratorPosition.DEFAULT
)
...

Although this option is more customisable, it might be more tedious to implement: it might need to open as many consumer connections to the same Kinesis stream as there are EventSourceConsumer definitions. With the previous option (EnableEventSource approach), only one connection per event source is needed.
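
The difference in connection count can be illustrated with a small, purely hypothetical model. `ConsumerSpec` and `ConnectionEstimate` are names invented for this sketch and are not part of Synapse; the per-consumer figure is the worst case described above, where no reader is shared.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical description of one annotated consumer method. */
final class ConsumerSpec {
    final String eventSource;
    final String iteratorAt;

    ConsumerSpec(String eventSource, String iteratorAt) {
        this.eventSource = eventSource;
        this.iteratorAt = iteratorAt;
    }
}

final class ConnectionEstimate {
    private ConnectionEstimate() {
    }

    /** EnableEventSource approach: one reader per distinct event source. */
    static int perEventSource(List<ConsumerSpec> consumers) {
        Set<String> sources = new HashSet<>();
        for (ConsumerSpec consumer : consumers) {
            sources.add(consumer.eventSource);
        }
        return sources.size();
    }

    /** EventSourceConsumer approach, worst case: one reader per consumer. */
    static int perConsumer(List<ConsumerSpec> consumers) {
        return consumers.size();
    }

    public static void main(String[] args) {
        List<ConsumerSpec> consumers = Arrays.asList(
                new ConsumerSpec("kinesis1", "LATEST"),
                new ConsumerSpec("kinesis1", "DEFAULT"));
        System.out.println(perEventSource(consumers)); // prints 1
        System.out.println(perConsumer(consumers));    // prints 2
    }
}
```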

⚠️ Notes and Questions:

  • Naming is not final.

I am looking for better names than the ones used in the examples. Should the naming follow Kinesis, or should we adopt Synapse-specific naming?

If Kafka support is added, I wonder whether Synapse should use names that are meaningful for both Kinesis and Kafka.

gsteinacker (Member) commented May 28, 2019

As multiple consumers can be registered to a single event-source, the starting position can/should not be added to the @EventSourceConsumer.

Adding the position to EnableEventSource would also be slightly confusing. I would prefer to:

  • Add an @EnableMessageLogReceiverEndpoint annotation similar to @EnableEventSource and @EnableMessageQueueReceiverEndpoint. The annotation should then have an optional property to select the start position (horizon, latest).
  • Add a @MessageLogConsumer annotation similar to the @MessageQueueConsumer or @EventSourceConsumer annotation.
  • Add StartFrom.LATEST as another value
  • Extend ShardPosition by ShardPosition.fromLatest()
  • Extend ChannelPosition to support ShardPosition.fromLatest() in addition to the current ShardPosition.fromHorizon()
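
As a rough sketch of the last three points, the classes below are simplified stand-ins written for this comment, not Synapse's actual `StartFrom`, `ShardPosition` or `ChannelPosition`. The factory name `ChannelPosition.of(...)` and the fallback to horizon for unknown shards are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified stand-in; LATEST is the proposed additional value. */
enum StartFrom {
    HORIZON,
    LATEST
}

/** Simplified stand-in for a per-shard start position. */
final class ShardPosition {
    private final String shardName;
    private final StartFrom startFrom;

    private ShardPosition(String shardName, StartFrom startFrom) {
        this.shardName = shardName;
        this.startFrom = startFrom;
    }

    /** Existing behaviour: start at the oldest available message. */
    static ShardPosition fromHorizon(String shardName) {
        return new ShardPosition(shardName, StartFrom.HORIZON);
    }

    /** Proposed addition: skip the history and start at the end of the shard. */
    static ShardPosition fromLatest(String shardName) {
        return new ShardPosition(shardName, StartFrom.LATEST);
    }

    String shardName() {
        return shardName;
    }

    StartFrom startFrom() {
        return startFrom;
    }
}

/** Simplified stand-in that collects the positions of all shards of a channel. */
final class ChannelPosition {
    private final Map<String, ShardPosition> shards = new LinkedHashMap<>();

    private ChannelPosition() {
    }

    /** Hypothetical factory name, chosen for this sketch. */
    static ChannelPosition of(ShardPosition... positions) {
        ChannelPosition channelPosition = new ChannelPosition();
        for (ShardPosition position : positions) {
            channelPosition.shards.put(position.shardName(), position);
        }
        return channelPosition;
    }

    /** Assumption: shards without an explicit position are read from horizon. */
    ShardPosition shard(String shardName) {
        ShardPosition known = shards.get(shardName);
        return known != null ? known : ShardPosition.fromHorizon(shardName);
    }
}

class StartPositionDemo {
    public static void main(String[] args) {
        ChannelPosition position = ChannelPosition.of(ShardPosition.fromLatest("shard-0"));
        System.out.println(position.shard("shard-0").startFrom()); // prints LATEST
        System.out.println(position.shard("shard-1").startFrom()); // prints HORIZON
    }
}
```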
