Skip to content

Commit

Permalink
Merge pull request #1136 from chosh31/offset-manager-set-default-offs…
Browse files Browse the repository at this point in the history
…et-check-auto-commit

Offset manager setDefaultOffset check if autoCommit is enabled
  • Loading branch information
Nevon authored Jun 29, 2021
2 parents 338d60d + 075931c commit 9cf3dfb
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/consumer/__tests__/seek.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ describe('Consumer', () => {
})

describe('when seek offset', () => {
let admin

beforeEach(() => {
admin = createAdmin({ logger: newLogger(), cluster })
})

afterEach(async () => {
admin && (await admin.disconnect())
})

it('throws an error if the topic is invalid', () => {
expect(() => consumer.seek({ topic: null })).toThrow(
KafkaJSNonRetriableError,
Expand Down Expand Up @@ -163,6 +173,13 @@ describe('Consumer', () => {
message: expect.objectContaining({ offset: '0' }),
},
])

await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([
expect.objectContaining({
partition: 0,
offset: '1',
}),
])
})

describe('When "autoCommit" is false', () => {
Expand Down Expand Up @@ -225,6 +242,41 @@ describe('Consumer', () => {
},
])
})

it('recovers from offset out of range', async () => {
await consumer.connect()
await producer.connect()

const key1 = secureRandom()
const message1 = { key: `key-${key1}`, value: `value-${key1}` }

await producer.send({ acks: 1, topic: topicName, messages: [message1] })
await consumer.subscribe({ topic: topicName, fromBeginning: true })

const messagesConsumed = []
consumer.run({
autoCommit: false,
eachMessage: async event => messagesConsumed.push(event),
})
consumer.seek({ topic: topicName, partition: 0, offset: 100 })

await waitForConsumerToJoinGroup(consumer)

await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([
{
topic: topicName,
partition: 0,
message: expect.objectContaining({ offset: '0' }),
},
])

await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([
expect.objectContaining({
partition: 0,
offset: '-1',
}),
])
})
})
})
})
9 changes: 9 additions & 0 deletions src/consumer/offsetManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ module.exports = class OffsetManager {
const defaultOffset = this.cluster.defaultOffset(this.topicConfigurations[topic])
const coordinator = await this.getCoordinator()

if (!this.autoCommit) {
this.resolveOffset({
topic,
partition,
offset: defaultOffset,
})
return
}

await coordinator.offsetCommit({
groupId,
memberId,
Expand Down

0 comments on commit 9cf3dfb

Please sign in to comment.