diff --git a/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisShardIterator.java b/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisShardIterator.java index e8b0d5f2..cf0b8452 100644 --- a/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisShardIterator.java +++ b/synapse-aws-kinesis/src/main/java/de/otto/synapse/endpoint/receiver/kinesis/KinesisShardIterator.java @@ -126,6 +126,10 @@ private GetShardIteratorRequest buildIteratorShardRequest(final ShardPosition sh .shardIteratorType(AT_TIMESTAMP) .timestamp(shardPosition.timestamp()); break; + case LATEST: + shardRequestBuilder + .shardIteratorType(LATEST); + break; } return shardRequestBuilder.build(); } diff --git a/synapse-core/src/main/java/de/otto/synapse/channel/ShardPosition.java b/synapse-core/src/main/java/de/otto/synapse/channel/ShardPosition.java index 631465da..506afad8 100644 --- a/synapse-core/src/main/java/de/otto/synapse/channel/ShardPosition.java +++ b/synapse-core/src/main/java/de/otto/synapse/channel/ShardPosition.java @@ -38,6 +38,13 @@ private ShardPosition(final String shardName, final String position, final Insta this.startFrom = startFrom; } + private ShardPosition(final String shardName) { + this.shardName = requireNonNull(shardName); + this.position = ""; + this.timestamp = null; + this.startFrom = StartFrom.LATEST; + } + @Nonnull public static ShardPosition fromHorizon(final @Nonnull String shardName) { return new ShardPosition(shardName, ""); @@ -61,6 +68,11 @@ public static ShardPosition fromTimestamp(final @Nonnull String shardName, return new ShardPosition(shardName, timestamp); } + @Nonnull + public static ShardPosition fromLatest(final @Nonnull String shardName) { + return new ShardPosition(shardName); + } + @Nonnull @JsonProperty public String shardName() {