
Incomplete data retrieved. #5

Open
bamos opened this issue Dec 27, 2013 · 3 comments


bamos commented Dec 27, 2013

Hi Federico,

I've modified your example to add 10 items to Kinesis and then try to retrieve them in 10 separate attempts. This illustrates a bug I'm seeing in some of my other projects, where not all of the records are retrieved.

Do you know if this is expected behavior from Kinesis?
Am I doing something subtly wrong when retrieving the data?

In the output below, some attempts retrieve all 10 records, but others, such as the last one, read fewer than 10.

stream created
stream active
ACTIVE
true
data stored: shardId-000000000000, 49535162123327736836226361226397834013393393790669029377
data stored: shardId-000000000000, 49535162123327736836226361326022690278750787438945239041
data stored: shardId-000000000000, 49535162123327736836226361368991565008786413418608328705
data stored: shardId-000000000000, 49535162123327736836226362473531453190127716825632342017
data stored: shardId-000000000000, 49535162123327736836226362521162196476315335329660796929
data stored: shardId-000000000000, 49535162123327736836226362486934026452529592727012114433
data stored: shardId-000000000000, 49535162123327736836226362545355462718347236243547357185
data stored: shardId-000000000000, 49535162123327736836226362496631199651551572522497474561
data stored: shardId-000000000000, 49535162123327736836226362425675822156409032816724017153
data stored: shardId-000000000000, 49535162123327736836226363990199997461027351869823909889
Retrieving data, attempt 1
data retrieved
==Record chunk.
Read 0 records.

Retrieving data, attempt 2
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
Read 3 records.

Retrieving data, attempt 3
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 4
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
Read 5 records.

Retrieving data, attempt 5
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
Read 5 records.

Retrieving data, attempt 6
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
Read 5 records.

Retrieving data, attempt 7
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 8
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 9
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 10
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
Read 3 records.

stream deleted

@cloudify (Owner)

Hmm, this is weird; I haven't seen this issue yet. Have you tried using different horizons?
I'm not actually using Kinesis in production yet, so I haven't done any proper testing.
The whole shard-iterator mechanism for fetching messages isn't very clear in the docs, so I may have made a mistake in the fetch logic...
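For readers puzzling over the same shard-iterator mechanism, it can be modelled without AWS. In Kinesis, GetShardIterator returns a starting position (TRIM_HORIZON is one of the documented iterator types), and each GetRecords call returns zero or more records plus a next shard iterator; the GetRecords documentation notes that a call can return an empty list and that it may take several calls to reach the records. The sketch below is a hypothetical in-memory model, not the scalazon or AWS SDK API: `SimShard`, `Iter`, `getRecords`, and the scripted per-call batch sizes are all assumptions chosen to mimic the partial reads in the log above. It shows that a single call from the initial iterator returns only one batch, while following the iterator chain collects every record.

```scala
// Hypothetical in-memory model of one Kinesis shard; not a real AWS or scalazon API.
object ShardIteratorModel {
  // In this model an iterator is just a position in the shard.
  final case class Iter(position: Int)
  final case class Chunk(records: Vector[String], next: Iter)

  // batchSizes scripts how many records each successive getRecords call returns,
  // standing in for the service returning partial (or empty) batches.
  final class SimShard(records: Vector[String], batchSizes: Iterator[Int]) {
    def trimHorizon: Iter = Iter(0)
    def getRecords(it: Iter): Chunk = {
      val n = if (batchSizes.hasNext) batchSizes.next() else records.length
      val batch = records.slice(it.position, it.position + n)
      Chunk(batch, Iter(it.position + batch.length))
    }
  }

  // Follow the next-iterator chain until `expected` records arrive or maxCalls is hit.
  def readChained(shard: SimShard, expected: Int, maxCalls: Int): Vector[String] = {
    var it = shard.trimHorizon
    var out = Vector.empty[String]
    var calls = 0
    while (out.length < expected && calls < maxCalls) {
      val chunk = shard.getRecords(it)
      out ++= chunk.records
      it = chunk.next
      calls += 1
    }
    out
  }

  // Returns (records from one call, records from following the chain).
  def demo(): (Int, Int) = {
    val data = Vector.tabulate(10)(i => s"hello-$i")
    // One call from TRIM_HORIZON, as in the original example: only the first batch.
    val shard1 = new SimShard(data, Iterator(3))
    val single = shard1.getRecords(shard1.trimHorizon).records.length
    // Following the chain copes with an empty first batch and partial later ones.
    val chained = readChained(new SimShard(data, Iterator(0, 3, 5, 2)), data.length, 10).length
    (single, chained)
  }

  def main(args: Array[String]): Unit = {
    val (single, chained) = demo()
    println(s"single call: $single records, chained: $chained records")
  }
}
```

Running `main` prints `single call: 3 records, chained: 10 records`. The scripted sizes are arbitrary; the point is only that one call's result size says nothing about how many records the shard holds.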


bamos commented Jan 3, 2014

Hi Federico, I've found a solution to this. Instead of fetching records with only the first shard iterator, an application should keep following nextIterator and poll for new records. The following code is working well for me:

    // Assumes the Kinesis client imports, `stream`, and `implicitExecute`
    // from the original example are in scope.
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Initialize one shard iterator per shard.
    val initialShardIteratorsRequest = for {
      shards <- stream.get.shards.list
      initialShardIterators <- Future.sequence(shards.map {
        shard => implicitExecute(shard.iterator)
      })
    } yield initialShardIterators
    var shardIterators: List[ShardIterator] =
      Await.result(initialShardIteratorsRequest, 30.seconds).toList

    // Continuously poll for records, following each chunk's nextIterator.
    // The underlying GetRecords API returns no next iterator for a closed
    // shard; if one comes back null here, that shard is dropped.
    while (shardIterators.nonEmpty) {
      val recordChunksRequest = Future.sequence(shardIterators.map {
        iterator => implicitExecute(iterator.nextRecords)
      })
      val recordChunks = Await.result(recordChunksRequest, 30.seconds)
      for (recordChunk <- recordChunks; record <- recordChunk.records) {
        println("sequenceNumber: " + record.sequenceNumber)
        println("partitionKey: " + record.partitionKey)
      }
      shardIterators = recordChunks.map(_.nextIterator).filter(_ != null).toList
      // Pause between polls to stay well under the per-shard GetRecords rate limit.
      Thread.sleep(1000)
    }
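The polling loop above can be exercised without AWS by abstracting the one call it repeats. The sketch below is a minimal, hypothetical version: `Record`, `RecordChunk`, `pollRound`, `pollAll`, and the `fetch` function stand in for the library's record types and `iterator.nextRecords`, and a closed shard is modelled as returning `None` instead of a null next iterator. Each round fetches from all open shards in parallel with `Future.sequence`, as the original code does, and keeps only the iterators of shards that are still open. The fake shards in `demo` "close" once their records are served, which real open shards do not; that is purely so the example terminates.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PollingSketch {
  // Hypothetical stand-ins for the library types used in the comment above.
  final case class Record(sequenceNumber: String, partitionKey: String)
  final case class RecordChunk[I](records: List[Record], nextIterator: Option[I])

  // One polling round: fetch from every open shard in parallel, then keep
  // only the next iterators of shards that are still open.
  def pollRound[I](iterators: List[I], fetch: I => Future[RecordChunk[I]]): (List[Record], List[I]) = {
    val chunks = Await.result(Future.sequence(iterators.map(fetch)), 30.seconds)
    (chunks.flatMap(_.records), chunks.flatMap(_.nextIterator))
  }

  // Poll until every shard reports it is closed, or maxRounds is reached.
  def pollAll[I](initial: List[I], fetch: I => Future[RecordChunk[I]],
                 maxRounds: Int, pause: FiniteDuration): List[Record] = {
    var iterators = initial
    var seen = List.empty[Record]
    var rounds = 0
    while (iterators.nonEmpty && rounds < maxRounds) {
      val (records, next) = pollRound(iterators, fetch)
      seen = seen ++ records
      iterators = next
      rounds += 1
      if (iterators.nonEmpty) Thread.sleep(pause.toMillis)
    }
    seen
  }

  // A fake iterator is (shardId, position); each fetch returns at most 2 records.
  type It = (String, Int)

  def demo(): List[String] = {
    val shards = Map("shardId-0" -> List("1", "2", "3"), "shardId-1" -> List("4", "5"))
    def fetch(it: It): Future[RecordChunk[It]] = Future {
      val (id, pos) = it
      val data = shards(id)
      val batch = data.slice(pos, pos + 2).map(Record(_, "k1"))
      val nextPos = pos + batch.length
      RecordChunk(batch, if (nextPos < data.length) Some((id, nextPos)) else None)
    }
    val initial = shards.keys.toList.map(id => (id, 0))
    pollAll[It](initial, fetch, maxRounds = 10, pause = 10.millis).map(_.sequenceNumber).sorted
  }

  def main(args: Array[String]): Unit = println(demo().mkString(","))
}
```

Here `main` prints `1,2,3,4,5`: the first round returns two records from each shard and closes `shardId-1`, and the second round drains the last record from `shardId-0`.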


bamos commented Jan 3, 2014

CC @alexanderdean
