Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent produce() sequence number fix #1050

Merged
merged 11 commits on
Feb 9, 2022
221 changes: 221 additions & 0 deletions src/producer/__tests__/idempotentProduceMessages.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
jest.setTimeout(20000)

const PromiseAllSettled = require('../../utils/promiseAllSettled')

const {
secureRandom,
newLogger,
createCluster,
createTopic,
waitForMessages,
waitFor,
} = require('testHelpers')
const { KafkaJSError, KafkaJSProtocolError } = require('../../errors')

const createProducer = require('../index')
const createConsumer = require('../../consumer/index')
const { describe } = require('jest-circus')

// Return a copy of `a` with duplicate values removed (first-seen order kept).
const arrayUnique = a => Array.from(new Set(a))

describe('Producer > Idempotent producer', () => {
let producer, consumer, topicName, cluster, messages

beforeAll(async () => {
  // Four messages whose value is their ordinal index, so ordering
  // assertions can simply compare value to position. Keys carry a random
  // component to avoid collisions between runs.
  messages = Array.from({ length: 4 }, (_, i) => ({
    key: `key-${secureRandom()}`,
    value: `${i}`,
  }))
})

beforeEach(async () => {
// Fresh topic, cluster, producer and consumer per test so no
// sequence-number or group state leaks between cases.
topicName = `test-topic-${secureRandom()}`
cluster = createCluster()
// `idempotent: true` is the behavior under test: the producer assigns
// per-partition sequence numbers that the broker checks.
producer = createProducer({
cluster,
logger: newLogger(),
idempotent: true,
})
consumer = createConsumer({
cluster,
groupId: `consumer-group-id-${secureRandom()}`,
maxWaitTimeInMs: 0,
logger: newLogger(),
})
// A single partition forces every message through one sequence stream.
await createTopic({ topic: topicName, partitions: 1 })
await Promise.all([producer.connect(), consumer.connect()])
await consumer.subscribe({ topic: topicName, fromBeginning: true })
})

// Tear down whichever clients the test created. The previous version
// `await`ed each disconnect inside the array literal, which serialized the
// calls and defeated Promise.all; passing the pending promises lets the
// disconnects run concurrently.
afterEach(async () => {
  await Promise.all([producer && producer.disconnect(), consumer && consumer.disconnect()])
})

it('sequential produce() calls > all messages are written to the partition once, in order', async () => {
  const messagesConsumed = []

  // Await each send() so the idempotent sequence numbers are assigned
  // strictly one at a time.
  for (const message of messages) {
    await producer.send({ acks: -1, topic: topicName, messages: [message] })
  }

  await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })
  await waitForMessages(messagesConsumed, { number: messages.length })

  // Values were set to their ordinal index, so consumed order must match it.
  messagesConsumed.forEach(({ message: { value } }, i) => expect(value.toString()).toEqual(`${i}`))
})

it('sequential produce() calls > where produce() throws a retriable error, all messages are written to the partition once, in order', async () => {
// Make the first produce() on every broker throw before anything is
// written, so the producer's retry path is exercised; the retry must not
// skip or repeat a sequence number.
for (const nodeId of [0, 1, 2]) {
const broker = await cluster.findBroker({ nodeId })

const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce(() => {
throw new KafkaJSError('retriable error')
})
}

const messagesConsumed = []

for (const m of messages) {
await producer.send({ acks: -1, topic: topicName, messages: [m] })
}

await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

await waitForMessages(messagesConsumed, { number: messages.length })

// Values were set to their ordinal index; order and uniqueness both hold.
messagesConsumed.forEach(({ message: { value } }, i) =>
expect(value.toString()).toEqual(`${i}`)
)
})

it('sequential produce() calls > where produce() throws a retriable error after the message is written to the log, all messages are written to the partition once, in order', async () => {
for (const nodeId of [0, 1, 2]) {
const broker = await cluster.findBroker({ nodeId })
const originalCall = broker.produce.bind(broker)
const brokerProduce = jest.spyOn(broker, 'produce')
// The first two produce() calls are stubbed with no implementation
// (the mock resolves without contacting the broker); the third performs
// the real write and THEN throws, so the retry hits a log that already
// contains the message — idempotence must deduplicate it.
brokerProduce.mockImplementationOnce()
brokerProduce.mockImplementationOnce()
brokerProduce.mockImplementationOnce(async (...args) => {
await originalCall(...args)
throw new KafkaJSError('retriable error')
})
}

const messagesConsumed = []

for (const m of messages) {
await producer.send({ acks: -1, topic: topicName, messages: [m] })
}

await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

await waitForMessages(messagesConsumed, { number: messages.length })

// Exactly one copy of each message, in send order.
messagesConsumed.forEach(({ message: { value } }, i) =>
expect(value.toString()).toEqual(`${i}`)
)
})

// NOTE(review): these "concurrent produce() calls > …" cases could be grouped
// under a describe('concurrent produce() calls', …) block instead of
// repeating the prefix in each test title.
it('concurrent produce() calls > all messages are written to the partition once', async () => {
  const messagesConsumed = []

  // Fire all sends at once; with idempotence enabled the racing requests
  // must still each be accepted exactly once.
  await Promise.all(
    messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
  )

  await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })
  await waitForMessages(messagesConsumed, { number: messages.length })

  // Deduplicate by value: every message arrived and none was written twice.
  expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
    messages.length
  )
})

it('concurrent produce() calls > where produce() throws a retriable error on the first call, all subsequent calls throw UNKNOWN_PRODUCER_ID', async () => {
  // Make the first produce() on every broker fail, but only after all the
  // concurrent sends have reached the broker, so the racing requests observe
  // the reset producer state.
  for (const nodeId of [0, 1, 2]) {
    const broker = await cluster.findBroker({ nodeId })

    const brokerProduce = jest.spyOn(broker, 'produce')
    brokerProduce.mockImplementationOnce(async () => {
      await waitFor(() => brokerProduce.mock.calls.length >= messages.length) // for all the other concurrent calls to have completed
      throw new KafkaJSError('retriable error')
    })
  }

  // PromiseAllSettled never rejects, so no .catch() is needed here; the old
  // `.catch(e => e)` would have silently replaced the settlements array with
  // an error object and produced a confusing failure below.
  const settlements = await PromiseAllSettled(
    messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
  )

  settlements
    .filter(({ status }) => status === 'rejected')
    .forEach(({ reason }) => {
      expect(reason).toBeInstanceOf(KafkaJSProtocolError)
      expect(reason.type).toBe('UNKNOWN_PRODUCER_ID')
    })

  // Only the very first send succeeds; every later one is rejected above.
  expect(settlements.filter(({ status }) => status === 'fulfilled')).toHaveLength(1)
})

it('concurrent produce() calls > where produce() throws a retriable error on 2nd call, all subsequent calls throw OUT_OF_ORDER_SEQUENCE_NUMBER', async () => {
  // First produce() on each broker is a no-op stub; the second waits for all
  // concurrent sends to be in flight and then fails, so later requests carry
  // sequence numbers the broker no longer expects.
  for (const nodeId of [0, 1, 2]) {
    const broker = await cluster.findBroker({ nodeId })

    const brokerProduce = jest.spyOn(broker, 'produce')
    brokerProduce.mockImplementationOnce()
    brokerProduce.mockImplementationOnce(async () => {
      await waitFor(() => brokerProduce.mock.calls.length >= messages.length) // for all the other concurrent calls to have completed
      throw new KafkaJSError('retriable error')
    })
  }

  // PromiseAllSettled never rejects, so no .catch() is needed here; the old
  // `.catch(e => e)` would have silently replaced the settlements array with
  // an error object and produced a confusing failure below.
  const settlements = await PromiseAllSettled(
    messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
  )

  settlements
    .filter(({ status }) => status === 'rejected')
    .forEach(({ reason }) => {
      expect(reason).toBeInstanceOf(KafkaJSProtocolError)
      expect(reason.type).toBe('OUT_OF_ORDER_SEQUENCE_NUMBER')
    })

  // The first two sends succeed; the rest are rejected above.
  expect(settlements.filter(({ status }) => status === 'fulfilled')).toHaveLength(2)
})

it('concurrent produce() calls > where produce() throws a retriable error after the message is written to the log, all messages are written to the partition once', async () => {
// The first produce() on every broker performs the real write and THEN
// throws, so the retry re-sends a message the log already contains;
// idempotence must deduplicate it even under concurrency.
for (const nodeId of [0, 1, 2]) {
const broker = await cluster.findBroker({ nodeId })
const originalCall = broker.produce.bind(broker)
const brokerProduce = jest.spyOn(broker, 'produce')
brokerProduce.mockImplementationOnce(async (...args) => {
await originalCall(...args)
throw new KafkaJSError('retriable error')
})
}

const messagesConsumed = []

await Promise.all(
messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
)

await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

await waitForMessages(messagesConsumed, { number: messages.length })

// Deduplicate by value: every message arrived and none was written twice.
expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
messages.length
)
})
})
1 change: 0 additions & 1 deletion src/producer/createTopicData.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ module.exports = topicDataForBroker => {
topic,
partitions: partitions.map(partition => ({
partition,
firstSequence: sequencePerPartition[partition],
messages: messagesPerPartition[partition],
})),
})
Expand Down
18 changes: 5 additions & 13 deletions src/producer/createTopicData.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const createTopicData = require('./createTopicData')

describe('Producer > createTopicData', () => {
let topic, partitions, messagesPerPartition, sequencePerPartition
let topic, partitions, messagesPerPartition

beforeEach(() => {
topic = 'test-topic'
Expand All @@ -12,25 +12,17 @@ describe('Producer > createTopicData', () => {
2: [{ key: '2' }],
3: [{ key: '3' }, { key: '4' }],
}

sequencePerPartition = {
1: 0,
2: 5,
3: 10,
}
})

test('format data by topic and partition', () => {
const result = createTopicData([
{ topic, partitions, messagesPerPartition, sequencePerPartition },
])
const result = createTopicData([{ topic, partitions, messagesPerPartition }])
expect(result).toEqual([
{
topic,
partitions: [
{ partition: 1, firstSequence: 0, messages: [{ key: '1' }] },
{ partition: 2, firstSequence: 5, messages: [{ key: '2' }] },
{ partition: 3, firstSequence: 10, messages: [{ key: '3' }, { key: '4' }] },
{ partition: 1, messages: [{ key: '1' }] },
{ partition: 2, messages: [{ key: '2' }] },
{ partition: 3, messages: [{ key: '3' }, { key: '4' }] },
],
},
])
Expand Down
50 changes: 27 additions & 23 deletions src/producer/sendMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,12 @@ module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {
})

const partitions = keys(messagesPerPartition)
const sequencePerPartition = partitions.reduce((result, partition) => {
result[partition] = eosManager.getSequence(topic, partition)
return result
}, {})

const partitionsPerLeader = cluster.findLeaderForPartitions(topic, partitions)
const leaders = keys(partitionsPerLeader)

topicMetadata.set(topic, {
partitionsPerLeader,
messagesPerPartition,
sequencePerPartition,
})

for (const nodeId of leaders) {
Expand All @@ -78,7 +72,6 @@ module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {
.map(([topic, { partitionsPerLeader, messagesPerPartition, sequencePerPartition }]) => ({
topic,
partitions: partitionsPerLeader[broker.nodeId],
sequencePerPartition,
messagesPerPartition,
}))

Expand All @@ -89,27 +82,38 @@ module.exports = ({ logger, cluster, partitioner, eosManager, retrier }) => {
await eosManager.addPartitionsToTransaction(topicData)
}

const response = await broker.produce({
transactionalId: eosManager.isTransactional()
? eosManager.getTransactionalId()
: undefined,
producerId: eosManager.getProducerId(),
producerEpoch: eosManager.getProducerEpoch(),
acks,
timeout,
compression,
topicData,
topicData.forEach(({ topic, partitions }) => {
partitions.forEach(entry => {
entry['firstSequence'] = eosManager.getSequence(topic, entry.partition)
eosManager.updateSequence(topic, entry.partition, entry.messages.length)
})
})

let response
try {
response = await broker.produce({
transactionalId: eosManager.isTransactional()
? eosManager.getTransactionalId()
: undefined,
producerId: eosManager.getProducerId(),
producerEpoch: eosManager.getProducerEpoch(),
acks,
timeout,
compression,
topicData,
})
} catch (e) {
topicData.forEach(({ topic, partitions }) => {
partitions.forEach(entry => {
eosManager.updateSequence(topic, entry.partition, -entry.messages.length)
})
})
throw e
}

const expectResponse = acks !== 0
const formattedResponse = expectResponse ? responseSerializer(response) : []

formattedResponse.forEach(({ topicName, partition }) => {
const increment = topicMetadata.get(topicName).messagesPerPartition[partition].length

eosManager.updateSequence(topicName, partition, increment)
})

responsePerBroker.set(broker, formattedResponse)
} catch (e) {
responsePerBroker.delete(broker)
Expand Down
18 changes: 18 additions & 0 deletions src/utils/promiseAllSettled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
 * Ponyfill of Promise.allSettled: resolves once every input promise has
 * settled, never rejects, and reports each outcome as a tagged object.
 *
 * NOTE(review): Promise.allSettled is built into Node since 12.9, so bumping
 * the minimum supported Node version would make this helper unnecessary.
 *
 * @param {Promise<T>[]} promises - may also contain plain values, which are
 *   treated as already-fulfilled promises
 * @returns {Promise<Array<{ status: 'fulfilled', value: T } | { status: 'rejected', reason: Error }>>}
 * @template T
 */
function allSettled(promises) {
  // Map every input to a promise that cannot reject, then Promise.all the
  // result — since nothing rejects, all outcomes are always collected.
  const wrappedPromises = promises.map(p =>
    Promise.resolve(p).then(
      val => ({ status: 'fulfilled', value: val }),
      err => ({ status: 'rejected', reason: err })
    )
  )
  return Promise.all(wrappedPromises)
}

module.exports = allSettled