Skip to content

Commit

Permalink
Merge pull request IBM#1006 from SamiHiltunen/fetch-based-on-last-delta
Browse files Browse the repository at this point in the history
partition consumer offset to last offset delta
  • Loading branch information
eapache authored Dec 23, 2017
2 parents 1fcddd9 + d9ef2be commit b1433c2
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 3 deletions.
9 changes: 8 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
var messages []*ConsumerMessage
var incomplete bool
prelude := true
originalOffset := child.offset

for _, rec := range batch.Records {
offset := batch.FirstOffset + rec.OffsetDelta
Expand All @@ -547,9 +548,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
}
}

if incomplete || len(messages) == 0 {
if incomplete {
return nil, ErrIncompleteResponse
}

child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
if child.offset <= originalOffset {
return nil, ErrConsumerOffsetNotAdvanced
}

return messages, nil
}

Expand Down
18 changes: 16 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,12 @@ func TestConsumerExtraOffsets(t *testing.T) {
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
var offsetResponseVersion int16
cfg := NewConfig()
cfg.Consumer.Return.Errors = true
if fetchResponse1.Version >= 4 {
cfg.Version = V0_11_0_0
offsetResponseVersion = 1
Expand Down Expand Up @@ -426,8 +428,19 @@ func TestConsumerExtraOffsets(t *testing.T) {

// Then: messages with offsets 1 and 2 are not returned even though they
// are present in the response.
assertMessageOffset(t, <-consumer.Messages(), 3)
assertMessageOffset(t, <-consumer.Messages(), 4)
select {
case msg := <-consumer.Messages():
assertMessageOffset(t, msg, 3)
case err := <-consumer.Errors():
t.Fatal(err)
}

select {
case msg := <-consumer.Messages():
assertMessageOffset(t, msg, 4)
case err := <-consumer.Errors():
t.Fatal(err)
}

safeClose(t, consumer)
safeClose(t, master)
Expand Down Expand Up @@ -490,6 +503,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
var offsetResponseVersion int16
Expand Down
4 changes: 4 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process
// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
// a RecordBatch.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down
10 changes: 10 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
batch.addRecord(rec)
}

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
batch := frb.Records.recordBatch
if batch == nil {
batch = &RecordBatch{Version: 2}
frb.Records = newDefaultRecords(batch)
}
batch.LastOffsetDelta = offset
}

func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
frb := r.getOrCreateBlock(topic, partition)
frb.LastStableOffset = offset
Expand Down

0 comments on commit b1433c2

Please sign in to comment.