Skip to content

Commit

Permalink
Workaround for missing offsets in Azure's OffsetFetchResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Nov 22, 2019
2 parents e63e271 + ed4bdee commit 355d120
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,23 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse,
return nil, err
}

// In new consumer group / partition pairs, Microsoft's Azure Event Hubs
// communicates the "no error / new offset" state by omitting the requested
// entries from the OffsetFetchResponse (in contrast to other implementations
// which indicate this by returning an explicit offset of -1). To handle this
// case, we check all entries in the request and add an offset to the response
// table for any that are missing.
for topic, partitions := range request.partitions {
if response.Blocks[topic] == nil {
response.Blocks[topic] = make(map[int32]*OffsetFetchResponseBlock)
}
for _, p := range partitions {
if response.Blocks[topic][p] == nil {
response.Blocks[topic][p] = &OffsetFetchResponseBlock{Offset: -1}
}
}
}

return response, nil
}

Expand Down

0 comments on commit 355d120

Please sign in to comment.