diff --git a/src/consumer/__tests__/seek.spec.js b/src/consumer/__tests__/seek.spec.js index 25c619f9d..46487a9fe 100644 --- a/src/consumer/__tests__/seek.spec.js +++ b/src/consumer/__tests__/seek.spec.js @@ -42,6 +42,16 @@ describe('Consumer', () => { }) describe('when seek offset', () => { + let admin + + beforeEach(() => { + admin = createAdmin({ logger: newLogger(), cluster }) + }) + + afterEach(async () => { + admin && (await admin.disconnect()) + }) + it('throws an error if the topic is invalid', () => { expect(() => consumer.seek({ topic: null })).toThrow( KafkaJSNonRetriableError, @@ -163,6 +173,13 @@ describe('Consumer', () => { message: expect.objectContaining({ offset: '0' }), }, ]) + + await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([ + expect.objectContaining({ + partition: 0, + offset: '1', + }), + ]) }) describe('When "autoCommit" is false', () => { @@ -225,6 +242,41 @@ describe('Consumer', () => { }, ]) }) + + it('recovers from offset out of range', async () => { + await consumer.connect() + await producer.connect() + + const key1 = secureRandom() + const message1 = { key: `key-${key1}`, value: `value-${key1}` } + + await producer.send({ acks: 1, topic: topicName, messages: [message1] }) + await consumer.subscribe({ topic: topicName, fromBeginning: true }) + + const messagesConsumed = [] + consumer.run({ + autoCommit: false, + eachMessage: async event => messagesConsumed.push(event), + }) + consumer.seek({ topic: topicName, partition: 0, offset: 100 }) + + await waitForConsumerToJoinGroup(consumer) + + await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([ + { + topic: topicName, + partition: 0, + message: expect.objectContaining({ offset: '0' }), + }, + ]) + + await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([ + expect.objectContaining({ + partition: 0, + offset: '-1', + }), + ]) + }) }) }) }) diff --git a/src/consumer/offsetManager/index.js b/src/consumer/offsetManager/index.js index 06c49c349..5a1b9f68b 100644 --- a/src/consumer/offsetManager/index.js +++ b/src/consumer/offsetManager/index.js @@ -143,6 +143,15 @@ module.exports = class OffsetManager { const defaultOffset = this.cluster.defaultOffset(this.topicConfigurations[topic]) const coordinator = await this.getCoordinator() + if (!this.autoCommit) { + this.resolveOffset({ + topic, + partition, + offset: defaultOffset, + }) + return + } + await coordinator.offsetCommit({ groupId, memberId,