diff --git a/docs/Consuming.md b/docs/Consuming.md
index 558b25463..583a8e16e 100644
--- a/docs/Consuming.md
+++ b/docs/Consuming.md
@@ -42,7 +42,7 @@ The `eachMessage` handler provides a convenient and easy to use API, feeding you
 
 ```javascript
 await consumer.run({
-    eachMessage: async ({ topic, partition, message, heartbeat }) => {
+    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
         console.log({
             key: message.key.toString(),
             value: message.value.toString(),
@@ -53,10 +53,11 @@ await consumer.run({
 ```
 
 Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload.
+The `pause` function is a convenience for `consumer.pause([{ topic, partitions: [partition] }])`. It pauses the current topic-partition and returns a function that allows you to resume consuming later.
 
 ## eachBatch
 
-Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, and `isStale`. All resolved offsets will be automatically committed after the function is executed.
+Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, `isStale`, and `pause`. All resolved offsets will be automatically committed after the function is executed.
 
 > Note: Be aware that using `eachBatch` directly is considered a more advanced use case as compared to using `eachMessage`, since you will have to understand how session timeouts and heartbeats are connected.
 
@@ -71,6 +72,7 @@ await consumer.run({
         uncommittedOffsets,
         isRunning,
         isStale,
+        pause,
     }) => {
         for (let message of batch.messages) {
             console.log({
@@ -100,6 +102,7 @@ await consumer.run({
 * `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed.
 * `isRunning()` returns true if consumer is in running state, else it returns false.
 * `isStale()` returns whether the messages in the batch have been rendered stale through some other operation and should be discarded. For example, when calling [`consumer.seek`](#seek) the messages in the batch should be discarded, as they are not at the offset we seeked to.
+* `pause()` can be used to pause the consumer for the current topic-partition. All offsets resolved up to that point will be committed (subject to `eachBatchAutoResolve` and [autoCommit](#auto-commit)). To pause in the middle of the batch without resolving the current offset, throw an error after calling `pause()`, or disable `eachBatchAutoResolve`. The returned function can be used to resume processing of the topic-partition. See [Pause & Resume](#pause-resume) for more information about this feature.
 
 ### Example
 
@@ -250,7 +253,7 @@ kafka.consumer({
 
 ## Pause & Resume
 
-In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.
+In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle and subsequent messages within the current batch won't be passed to an `eachMessage` handler.
 
 Calling `pause` with a topic that the consumer is not subscribed to is a no-op; calling `resume` with a topic that is not paused is also a no-op.
 
@@ -304,6 +307,27 @@ consumer.run({
 })
 ```
 
+As a convenience, the `eachMessage` callback provides a `pause` function to pause the specific topic-partition of the message currently being processed.
+
+```javascript
+await consumer.connect()
+await consumer.subscribe({ topics: ['jobs'] })
+
+await consumer.run({ eachMessage: async ({ topic, message, pause }) => {
+    try {
+        await sendToDependency(message)
+    } catch (e) {
+        if (e instanceof TooManyRequestsError) {
+            const resumeThisPartition = pause()
+            // Other partitions that are paused will continue to be paused
+            setTimeout(resumeThisPartition, e.retryAfter * 1000)
+        }
+
+        throw e
+    }
+}})
+```
+
 It's possible to access the list of paused topic partitions using the `paused` method.
 
 ```javascript
diff --git a/src/consumer/__tests__/pause.spec.js b/src/consumer/__tests__/pause.spec.js
index a9525e5f1..773838c76 100644
--- a/src/consumer/__tests__/pause.spec.js
+++ b/src/consumer/__tests__/pause.spec.js
@@ -12,7 +12,11 @@ const {
 } = require('testHelpers')
 
 describe('Consumer', () => {
-  let groupId, producer, consumer, topics
+  /**
+   * @type {import('../../../types').Consumer}
+   */
+  let consumer
+  let groupId, producer, topics
 
   beforeEach(async () => {
     topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
@@ -64,6 +68,193 @@ describe('Consumer', () => {
     )
   })
 
+  it('pauses the appropriate topic/partition when pausing via the eachMessage callback', async () => {
+    await consumer.connect()
+    await producer.connect()
+    const messages = [0, 0, 1, 0].map(partition => {
+      const key = secureRandom()
+      return { key: `key-${key}`, value: `value-${key}`, partition }
+    })
+
+    for (const topic of topics) {
+      await producer.send({ acks: 1, topic, messages: messages.slice(0, 2) })
+      await consumer.subscribe({ topic, fromBeginning: true })
+    }
+
+    let shouldPause = true
+    const messagesConsumed = []
+    const resumeCallbacks = []
+    consumer.run({
+      eachMessage: async event => {
+        const { topic, message, pause } = event
+
+        const whichTopic = topics.indexOf(topic)
+        const whichMessage = messages.findIndex(m => String(m.key) === String(message.key))
+
+        if (shouldPause && whichTopic === 0 && whichMessage === 1) {
+          resumeCallbacks.push(pause())
+          throw new Error('bailing out')
+        }
+        messagesConsumed.push({
+          topic: whichTopic,
+          message: whichMessage,
+        })
+      },
+    })
+    await waitForConsumerToJoinGroup(consumer)
+    await waitForMessages(messagesConsumed, { number: 3 })
+    const [pausedTopic] = topics
+    expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])
+
+    for (const topic of topics) {
+      await producer.send({ acks: 1, topic, messages: messages.slice(2) })
+    }
+    await waitForMessages(messagesConsumed, { number: 6, delay: 10 })
+
+    expect(messagesConsumed).toHaveLength(6)
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 0 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 2 }) // partition 1
+
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 0 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 1 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 2 }) // partition 1
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 }) // partition 0
+
+    shouldPause = false
+    resumeCallbacks.forEach(resume => resume())
+
+    await waitForMessages(messagesConsumed, { number: 8 })
+
+    // these messages have to wait until the consumer has resumed
+    expect(messagesConsumed).toHaveLength(8)
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 3 }) // partition 0
+  })
+
+  it('avoids calling eachMessage again for paused topics/partitions when paused via consumer.pause', async () => {
+    await consumer.connect()
+    await producer.connect()
+    const messages = [0, 0, 1, 0].map(partition => {
+      const key = secureRandom()
+      return { key: `key-${key}`, value: `value-${key}`, partition }
+    })
+
+    for (const topic of topics) {
+      await producer.send({ acks: 1, topic, messages: messages.slice(0, 2) })
+      await consumer.subscribe({ topic, fromBeginning: true })
+    }
+
+    let shouldPause = true
+    const messagesConsumed = []
+    consumer.run({
+      eachMessage: async event => {
+        const { topic, message, partition } = event
+
+        const whichTopic = topics.indexOf(topic)
+        const whichMessage = messages.findIndex(m => String(m.key) === String(message.key))
+
+        messagesConsumed.push({
+          topic: whichTopic,
+          message: whichMessage,
+        })
+
+        // here, we pause after the first message (0) on the first topic (0)
+        if (shouldPause && whichTopic === 0 && whichMessage === 0) {
+          consumer.pause([{ topic, partitions: [partition] }])
+          // we don't throw an exception here to ensure the loop calling us breaks on its own and doesn't call us again
+        }
+      },
+    })
+    await waitForConsumerToJoinGroup(consumer)
+    await waitForMessages(messagesConsumed, { number: 3 })
+    const [pausedTopic] = topics
+    expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])
+
+    for (const topic of topics) {
+      await producer.send({ acks: 1, topic, messages: messages.slice(2) })
+    }
+    await waitForMessages(messagesConsumed, { number: 6, delay: 10 })
+
+    expect(messagesConsumed).toHaveLength(6)
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 0 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 2 }) // partition 1
+
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 0 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 1 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 2 }) // partition 1
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 }) // partition 0
+
+    shouldPause = false
+    consumer.resume(consumer.paused())
+
+    await waitForMessages(messagesConsumed, { number: 8 })
+
+    // these messages have to wait until the consumer has resumed
+    expect(messagesConsumed).toHaveLength(8)
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 }) // partition 0
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 3 }) // partition 0
+  })
+
+  it('pauses when pausing via the eachBatch callback', async () => {
+    await consumer.connect()
+    await producer.connect()
+    const originalMessages = [0, 0, 0, 1].map(partition => {
+      const key = secureRandom()
+      return { key: `key-${key}`, value: `value-${key}`, partition }
+    })
+
+    for (const topic of topics) {
+      await producer.send({ acks: 1, topic, messages: originalMessages })
+      await consumer.subscribe({ topic, fromBeginning: true })
+    }
+
+    let shouldPause = true
+    const messagesConsumed = []
+    const resumeCallbacks = []
+    consumer.run({
+      eachBatch: async event => {
+        const {
+          batch: { topic, messages },
+          pause,
+          resolveOffset,
+          commitOffsetsIfNecessary,
+        } = event
+        messages.every(message => {
+          const whichTopic = topics.indexOf(topic)
+          const whichMessage = originalMessages.findIndex(
+            m => String(m.key) === String(message.key)
+          )
+
+          if (shouldPause && whichTopic === 0 && whichMessage === 1) {
+            resumeCallbacks.push(pause())
+            return false
+          } else if (shouldPause && whichTopic === 1 && whichMessage === 3) {
+            resumeCallbacks.push(pause())
+            return false
+          }
+          messagesConsumed.push({
+            topic: whichTopic,
+            message: whichMessage,
+          })
+          resolveOffset(message.offset)
+          return true
+        })
+        await commitOffsetsIfNecessary()
+      },
+      eachBatchAutoResolve: false,
+    })
+    await waitForConsumerToJoinGroup(consumer)
+    await waitForMessages(messagesConsumed, { number: 5 })
+    expect(consumer.paused()).toContainEqual({ topic: topics[0], partitions: [0] })
+    expect(consumer.paused()).toContainEqual({ topic: topics[1], partitions: [1] })
+    shouldPause = false
+    resumeCallbacks.forEach(resume => resume())
+    await waitForMessages(messagesConsumed, { number: 8 })
+    expect(consumer.paused()).toEqual([])
+    expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 })
+    expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 })
+  })
+
   it('does not fetch messages for the paused topic', async () => {
     await consumer.connect()
     await producer.connect()
diff --git a/src/consumer/__tests__/runner.spec.js b/src/consumer/__tests__/runner.spec.js
index 5a3cf5b32..6ebf597d5 100644
--- a/src/consumer/__tests__/runner.spec.js
+++ b/src/consumer/__tests__/runner.spec.js
@@ -62,6 +62,7 @@ describe('Consumer > Runner', () => {
       heartbeat: jest.fn(),
       assigned: jest.fn(() => []),
       isLeader: jest.fn(() => true),
+      isPaused: jest.fn().mockReturnValue(false),
     }
 
     instrumentationEmitter = new InstrumentationEventEmitter()
diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js
index 160a5d87a..4b9067695 100644
--- a/src/consumer/consumerGroup.js
+++ b/src/consumer/consumerGroup.js
@@ -418,6 +418,15 @@ module.exports = class ConsumerGroup {
     return this.subscriptionState.paused()
   }
 
+  /**
+   * @param {string} topic
+   * @param {number} partition
+   * @returns {boolean} whether the specified topic-partition is paused or not
+   */
+  isPaused(topic, partition) {
+    return this.subscriptionState.isPaused(topic, partition)
+  }
+
   async commitOffsetsIfNecessary() {
     await this.offsetManager.commitOffsetsIfNecessary()
   }
diff --git a/src/consumer/runner.js b/src/consumer/runner.js
index 690cb539c..7504d9b98 100644
--- a/src/consumer/runner.js
+++ b/src/consumer/runner.js
@@ -218,6 +218,10 @@ module.exports = class Runner extends EventEmitter {
   async processEachMessage(batch) {
     const { topic, partition } = batch
+    const pause = () => {
+      this.consumerGroup.pause([{ topic, partitions: [partition] }])
+      return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
+    }
 
     for (const message of batch.messages) {
       if (!this.running || this.consumerGroup.hasSeekOffset({ topic, partition })) {
         break
@@ -229,6 +233,7 @@
           partition,
           message,
           heartbeat: () => this.heartbeat(),
+          pause,
         })
       } catch (e) {
         if (!isKafkaJSError(e)) {
@@ -249,6 +254,10 @@
       this.consumerGroup.resolveOffset({ topic, partition, offset: message.offset })
       await this.heartbeat()
       await this.autoCommitOffsetsIfNecessary()
+
+      if (this.consumerGroup.isPaused(topic, partition)) {
+        break
+      }
     }
   }
 
@@ -256,6 +265,11 @@
     const { topic, partition } = batch
     const lastFilteredMessage = batch.messages[batch.messages.length - 1]
 
+    const pause = () => {
+      this.consumerGroup.pause([{ topic, partitions: [partition] }])
+      return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
+    }
+
     try {
       await this.eachBatch({
         batch,
@@ -281,6 +295,10 @@
           this.consumerGroup.resolveOffset({ topic, partition, offset: offsetToResolve })
         },
         heartbeat: () => this.heartbeat(),
+        /**
+         * Pause consumption for the current topic-partition being processed
+         */
+        pause,
         /**
          * Commit offsets if provided. Otherwise commit most recent resolved offsets
          * if the autoCommit conditions are met.
diff --git a/types/index.d.ts b/types/index.d.ts
index 17c4fde90..ff168b24c 100644
--- a/types/index.d.ts
+++ b/types/index.d.ts
@@ -899,12 +899,14 @@ export interface EachMessagePayload {
   partition: number
   message: KafkaMessage
   heartbeat(): Promise<void>
+  pause(): () => void
 }
 
 export interface EachBatchPayload {
   batch: Batch
   resolveOffset(offset: string): void
   heartbeat(): Promise<void>
+  pause(): () => void
   commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
   uncommittedOffsets(): OffsetsByTopicPartition
   isRunning(): boolean
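
The docs changes above describe `pause()` in the `eachBatch` payload but only show an `eachMessage` example. Below is a minimal sketch of the batch-level flow, reusing the hypothetical `sendToDependency` and `TooManyRequestsError` stand-ins from the docs example. Per the documented semantics, combining `pause()` with `eachBatchAutoResolve: false` and an early return leaves the remaining offsets unresolved, so they are redelivered once the partition is resumed.

```javascript
await consumer.run({
  // keep unprocessed offsets unresolved when we bail out mid-batch
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, pause }) => {
    for (const message of batch.messages) {
      try {
        await sendToDependency(message) // hypothetical downstream call
      } catch (e) {
        if (e instanceof TooManyRequestsError) {
          // Pause this topic-partition and schedule the returned callback
          // to resume it once the dependency has had time to recover.
          const resumeThisPartition = pause()
          setTimeout(resumeThisPartition, e.retryAfter * 1000)
          // Return without resolving the current offset so the remaining
          // messages are redelivered after resuming.
          return
        }
        throw e
      }
      // Mark the message as processed and keep the session alive.
      resolveOffset(message.offset)
      await heartbeat()
    }
  },
})
```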
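If you would rather not hold on to the callbacks returned by `pause()`, the `paused()` method pairs with `resume()` for the same purpose, as the second test above does with `consumer.resume(consumer.paused())`. A sketch of that pattern on a timer; the 60-second interval is an arbitrary choice for illustration:

```javascript
// Periodically resume everything that is currently paused, e.g. once a
// downstream dependency is expected to have recovered.
setInterval(() => {
  const pausedTopicPartitions = consumer.paused()
  if (pausedTopicPartitions.length > 0) {
    // Passing the exact list returned by paused() is always safe:
    // resuming a topic that is not paused is a no-op.
    consumer.resume(pausedTopicPartitions)
  }
}, 60 * 1000)
```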