Skip to content

Commit

Permalink
Merge pull request #1142 from chosh31/revert-1136-offset-manager-set-…
Browse files Browse the repository at this point in the history
…default-offset-check-auto-commit

Revert "Offset manager setDefaultOffset check if autoCommit is enabled"
  • Loading branch information
Nevon authored Nov 22, 2021
2 parents ca0454d + d395716 commit 812e055
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 61 deletions.
52 changes: 0 additions & 52 deletions src/consumer/__tests__/seek.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ describe('Consumer', () => {
})

describe('when seek offset', () => {
let admin

beforeEach(() => {
admin = createAdmin({ logger: newLogger(), cluster })
})

afterEach(async () => {
admin && (await admin.disconnect())
})

it('throws an error if the topic is invalid', () => {
expect(() => consumer.seek({ topic: null })).toThrow(
KafkaJSNonRetriableError,
Expand Down Expand Up @@ -173,13 +163,6 @@ describe('Consumer', () => {
message: expect.objectContaining({ offset: '0' }),
},
])

await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([
expect.objectContaining({
partition: 0,
offset: '1',
}),
])
})

describe('When "autoCommit" is false', () => {
Expand Down Expand Up @@ -242,41 +225,6 @@ describe('Consumer', () => {
},
])
})

it('recovers from offset out of range', async () => {
await consumer.connect()
await producer.connect()

const key1 = secureRandom()
const message1 = { key: `key-${key1}`, value: `value-${key1}` }

await producer.send({ acks: 1, topic: topicName, messages: [message1] })
await consumer.subscribe({ topic: topicName, fromBeginning: true })

const messagesConsumed = []
consumer.run({
autoCommit: false,
eachMessage: async event => messagesConsumed.push(event),
})
consumer.seek({ topic: topicName, partition: 0, offset: 100 })

await waitForConsumerToJoinGroup(consumer)

await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([
{
topic: topicName,
partition: 0,
message: expect.objectContaining({ offset: '0' }),
},
])

await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([
expect.objectContaining({
partition: 0,
offset: '-1',
}),
])
})
})
})
})
9 changes: 0 additions & 9 deletions src/consumer/offsetManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,6 @@ module.exports = class OffsetManager {
const defaultOffset = this.cluster.defaultOffset(this.topicConfigurations[topic])
const coordinator = await this.getCoordinator()

if (!this.autoCommit) {
this.resolveOffset({
topic,
partition,
offset: defaultOffset,
})
return
}

await coordinator.offsetCommit({
groupId,
memberId,
Expand Down

0 comments on commit 812e055

Please sign in to comment.