Skip to content

Commit

Permalink
Merge pull request #626 from tulios/merge-topic-messages-by-topic
Browse files Browse the repository at this point in the history
Merge TopicMessages by topic in sendBatch
  • Loading branch information
Nevon authored Jan 9, 2020
2 parents 1904481 + 05fe99c commit 0b70749
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
89 changes: 88 additions & 1 deletion src/producer/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ jest.mock('../retry', () => {
return spy
})

const uuid = require('uuid/v4')
const InstrumentationEventEmitter = require('../instrumentation/emitter')
const createProducer = require('./index')
const createConsumer = require('../consumer')
const {
secureRandom,
connectionOpts,
Expand All @@ -34,20 +36,22 @@ const {
newLogger,
testIfKafka_0_11,
createTopic,
waitForMessages,
} = require('testHelpers')
const createRetrier = require('../retry')

const { KafkaJSNonRetriableError } = require('../errors')

describe('Producer', () => {
let topicName, producer
let topicName, producer, consumer

beforeEach(() => {
topicName = `test-topic-${secureRandom()}`
})

afterEach(async () => {
producer && (await producer.disconnect())
consumer && (await consumer.disconnect())
})

test('throws an error if the topic is invalid', async () => {
Expand Down Expand Up @@ -524,6 +528,89 @@ describe('Producer', () => {
await expect(producer.sendBatch({ acks, topicMessages: [] })).toResolve()
})

test('sendBatch should consolidate topicMessages by topic', async () => {
const cluster = createCluster(
Object.assign(connectionOpts(), {
createPartitioner: createModPartitioner,
})
)

await createTopic({ topic: topicName, partitions: 1 })

const messagesConsumed = []
consumer = createConsumer({
groupId: `test-consumer-${uuid()}`,
cluster: createCluster(),
logger: newLogger(),
})
await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true })
await consumer.run({
eachMessage: async event => {
messagesConsumed.push(event)
},
})

producer = createProducer({ cluster, logger: newLogger(), idempotent })
await producer.connect()

const topicMessages = [
{
topic: topicName,
messages: [{ key: 'key-1', value: 'value-1' }, { key: 'key-2', value: 'value-2' }],
},
{
topic: topicName,
messages: [{ key: 'key-3', value: 'value-3' }],
},
]

const result = await producer.sendBatch({
acks,
topicMessages,
})
expect(result).toEqual([
{
topicName,
errorCode: 0,
offset: '0',
partition: 0,
timestamp: '-1',
},
])

await waitForMessages(messagesConsumed, { number: 3 })
await expect(waitForMessages(messagesConsumed, { number: 3 })).resolves.toEqual([
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from('key-1'),
value: Buffer.from('value-1'),
offset: '0',
}),
}),
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from('key-2'),
value: Buffer.from('value-2'),
offset: '1',
}),
}),
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({
key: Buffer.from('key-3'),
value: Buffer.from('value-3'),
offset: '2',
}),
}),
])
})

testIfKafka_0_11('produce messages for Kafka 0.11', async () => {
const cluster = createCluster(
Object.assign(connectionOpts(), {
Expand Down
14 changes: 13 additions & 1 deletion src/producer/messageProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,25 @@ module.exports = ({ logger, cluster, partitioner, eosManager, idempotent, retrie
}
}

const mergedTopicMessages = topicMessages.reduce((merged, { topic, messages }) => {
const index = merged.findIndex(({ topic: mergedTopic }) => topic === mergedTopic)

if (index === -1) {
merged.push({ topic, messages })
} else {
merged[index].messages = [...merged[index].messages, ...messages]
}

return merged
}, [])

return retrier(async (bail, retryCount, retryTime) => {
try {
return await sendMessages({
acks,
timeout,
compression,
topicMessages,
topicMessages: mergedTopicMessages,
})
} catch (error) {
if (!cluster.isConnected()) {
Expand Down

0 comments on commit 0b70749

Please sign in to comment.