Skip to content

Commit

Permalink
Merge pull request #1364 from brianphillips/pause-resume-helper
Browse files Browse the repository at this point in the history
Provide a `pause()` helper to eachMessage/eachBatch
  • Loading branch information
Nevon authored Jun 28, 2022
2 parents 51a4947 + 6b0be34 commit 196105c
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 4 deletions.
30 changes: 27 additions & 3 deletions docs/Consuming.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ The `eachMessage` handler provides a convenient and easy to use API, feeding you

```javascript
await consumer.run({
eachMessage: async ({ topic, partition, message, heartbeat }) => {
eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
Expand All @@ -53,10 +53,11 @@ await consumer.run({
```

Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload.
The `pause` function is a convenience for `consumer.pause({ topic, partitions: [partition] })`. It will pause the current topic-partition and returns a function that allows you to resume consuming later.

## <a name="each-batch"></a> eachBatch

Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, and `isStale`. All resolved offsets will be automatically committed after the function is executed.
Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, `isStale`, and `pause`. All resolved offsets will be automatically committed after the function is executed.

> Note: Be aware that using `eachBatch` directly is considered a more advanced use case as compared to using `eachMessage`, since you will have to understand how session timeouts and heartbeats are connected.
Expand All @@ -71,6 +72,7 @@ await consumer.run({
uncommittedOffsets,
isRunning,
isStale,
pause,
}) => {
for (let message of batch.messages) {
console.log({
Expand Down Expand Up @@ -100,6 +102,7 @@ await consumer.run({
* `uncommittedOffsets()` returns all offsets by topic-partition which have not yet been committed.
* `isRunning()` returns true if consumer is in running state, else it returns false.
* `isStale()` returns whether the messages in the batch have been rendered stale through some other operation and should be discarded. For example, when calling [`consumer.seek`](#seek) the messages in the batch should be discarded, as they are not at the offset we seeked to.
* `pause()` can be used to pause the consumer for the current topic-partition. All offsets resolved up to that point will be committed (subject to `eachBatchAutoResolve` and [autoCommit](#auto-commit)). Throw an error to pause in the middle of the batch without resolving the current offset. Alternatively, disable `eachBatchAutoResolve`. The returned function can be used to resume processing of the topic-partition. See [Pause & Resume](#pause-resume) for more information about this feature.

### Example

Expand Down Expand Up @@ -250,7 +253,7 @@ kafka.consumer({

## <a name="pause-resume"></a> Pause & Resume

In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.
In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. It also provides the `paused` method to get the list of all paused topics. Note that pausing a topic means that it won't be fetched in the next cycle and subsequent messages within the current batch won't be passed to an `eachMessage` handler.

Calling `pause` with a topic that the consumer is not subscribed to is a no-op, calling `resume` with a topic that is not paused is also a no-op.

Expand Down Expand Up @@ -304,6 +307,27 @@ consumer.run({
})
```

As a convenience, the `eachMessage` callback provides a `pause` function to pause the specific topic-partition of the message currently being processed.

```javascript
await consumer.connect()
await consumer.subscribe({ topics: ['jobs'] })

await consumer.run({ eachMessage: async ({ topic, message, pause }) => {
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
const resumeThisPartition = pause()
// Other partitions that are paused will continue to be paused
setTimeout(resumeThisPartition, e.retryAfter * 1000)
}

throw e
}
}})
```

It's possible to access the list of paused topic partitions using the `paused` method.

```javascript
Expand Down
193 changes: 192 additions & 1 deletion src/consumer/__tests__/pause.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ const {
} = require('testHelpers')

describe('Consumer', () => {
let groupId, producer, consumer, topics
/**
* @type {import('../../../types').Consumer}
*/
let consumer
let groupId, producer, topics

beforeEach(async () => {
topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]
Expand Down Expand Up @@ -64,6 +68,193 @@ describe('Consumer', () => {
)
})

it('pauses the appropriate topic/partition when pausing via the eachMessage callback', async () => {
await consumer.connect()
await producer.connect()
const messages = [0, 0, 1, 0].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(0, 2) })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
const resumeCallbacks = []
consumer.run({
eachMessage: async event => {
const { topic, message, pause } = event

const whichTopic = topics.indexOf(topic)
const whichMessage = messages.findIndex(m => String(m.key) === String(message.key))

if (shouldPause && whichTopic === 0 && whichMessage === 1) {
resumeCallbacks.push(pause())
throw new Error('bailing out')
}
messagesConsumed.push({
topic: whichTopic,
message: whichMessage,
})
},
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 3 })
const [pausedTopic] = topics
expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(2) })
}
await waitForMessages(messagesConsumed, { number: 6, delay: 10 })

expect(messagesConsumed).toHaveLength(6)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 2 }) // partition 1

expect(messagesConsumed).toContainEqual({ topic: 1, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 2 }) // partition 1
expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 }) // partition 0

shouldPause = false
resumeCallbacks.forEach(resume => resume())

await waitForMessages(messagesConsumed, { number: 8 })

// these messages have to wait until the consumer has resumed
expect(messagesConsumed).toHaveLength(8)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 3 }) // partition 0
})

it('avoids calling eachMessage again for paused topics/partitions when paused via consumer.pause', async () => {
await consumer.connect()
await producer.connect()
const messages = [0, 0, 1, 0].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(0, 2) })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
consumer.run({
eachMessage: async event => {
const { topic, message, partition } = event

const whichTopic = topics.indexOf(topic)
const whichMessage = messages.findIndex(m => String(m.key) === String(message.key))

messagesConsumed.push({
topic: whichTopic,
message: whichMessage,
})

// here, we pause after the first message (0) on the first topic (0)
if (shouldPause && whichTopic === 0 && whichMessage === 0) {
consumer.pause([{ topic, partitions: [partition] }])
// we don't throw an exception here to ensure the loop calling us breaks on its own and doesn't call us again
}
},
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 3 })
const [pausedTopic] = topics
expect(consumer.paused()).toEqual([{ topic: pausedTopic, partitions: [0] }])

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: messages.slice(2) })
}
await waitForMessages(messagesConsumed, { number: 6, delay: 10 })

expect(messagesConsumed).toHaveLength(6)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 2 }) // partition 1

expect(messagesConsumed).toContainEqual({ topic: 1, message: 0 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 1, message: 2 }) // partition 1
expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 }) // partition 0

shouldPause = false
consumer.resume(consumer.paused())

await waitForMessages(messagesConsumed, { number: 8 })

// these messages have to wait until the consumer has resumed
expect(messagesConsumed).toHaveLength(8)
expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 }) // partition 0
expect(messagesConsumed).toContainEqual({ topic: 0, message: 3 }) // partition 0
})

it('pauses when pausing via the eachBatch callback', async () => {
await consumer.connect()
await producer.connect()
const originalMessages = [0, 0, 0, 1].map(partition => {
const key = secureRandom()
return { key: `key-${key}`, value: `value-${key}`, partition }
})

for (const topic of topics) {
await producer.send({ acks: 1, topic, messages: originalMessages })
await consumer.subscribe({ topic, fromBeginning: true })
}

let shouldPause = true
const messagesConsumed = []
const resumeCallbacks = []
consumer.run({
eachBatch: async event => {
const {
batch: { topic, messages },
pause,
resolveOffset,
commitOffsetsIfNecessary,
} = event
messages.every(message => {
const whichTopic = topics.indexOf(topic)
const whichMessage = originalMessages.findIndex(
m => String(m.key) === String(message.key)
)

if (shouldPause && whichTopic === 0 && whichMessage === 1) {
resumeCallbacks.push(pause())
return false
} else if (shouldPause && whichTopic === 1 && whichMessage === 3) {
resumeCallbacks.push(pause())
return false
}
messagesConsumed.push({
topic: whichTopic,
message: whichMessage,
})
resolveOffset(message.offset)
return true
})
await commitOffsetsIfNecessary()
},
eachBatchAutoResolve: false,
})
await waitForConsumerToJoinGroup(consumer)
await waitForMessages(messagesConsumed, { number: 5 })
expect(consumer.paused()).toContainEqual({ topic: topics[0], partitions: [0] })
expect(consumer.paused()).toContainEqual({ topic: topics[1], partitions: [1] })
shouldPause = false
resumeCallbacks.forEach(resume => resume())
await waitForMessages(messagesConsumed, { number: 8 })
expect(consumer.paused()).toEqual([])
expect(messagesConsumed).toContainEqual({ topic: 0, message: 1 })
expect(messagesConsumed).toContainEqual({ topic: 1, message: 3 })
})

it('does not fetch messages for the paused topic', async () => {
await consumer.connect()
await producer.connect()
Expand Down
1 change: 1 addition & 0 deletions src/consumer/__tests__/runner.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ describe('Consumer > Runner', () => {
heartbeat: jest.fn(),
assigned: jest.fn(() => []),
isLeader: jest.fn(() => true),
isPaused: jest.fn().mockReturnValue(false),
}
instrumentationEmitter = new InstrumentationEventEmitter()

Expand Down
9 changes: 9 additions & 0 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,15 @@ module.exports = class ConsumerGroup {
return this.subscriptionState.paused()
}

/**
* @param {string} topic
* @param {string} partition
* @returns {boolean} whether the specified topic-partition are paused or not
*/
isPaused(topic, partition) {
return this.subscriptionState.isPaused(topic, partition)
}

async commitOffsetsIfNecessary() {
await this.offsetManager.commitOffsetsIfNecessary()
}
Expand Down
18 changes: 18 additions & 0 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ module.exports = class Runner extends EventEmitter {
async processEachMessage(batch) {
const { topic, partition } = batch

const pause = () => {
this.consumerGroup.pause([{ topic, partitions: [partition] }])
return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
}
for (const message of batch.messages) {
if (!this.running || this.consumerGroup.hasSeekOffset({ topic, partition })) {
break
Expand All @@ -229,6 +233,7 @@ module.exports = class Runner extends EventEmitter {
partition,
message,
heartbeat: () => this.heartbeat(),
pause,
})
} catch (e) {
if (!isKafkaJSError(e)) {
Expand All @@ -249,13 +254,22 @@ module.exports = class Runner extends EventEmitter {
this.consumerGroup.resolveOffset({ topic, partition, offset: message.offset })
await this.heartbeat()
await this.autoCommitOffsetsIfNecessary()

if (this.consumerGroup.isPaused(topic, partition)) {
break
}
}
}

async processEachBatch(batch) {
const { topic, partition } = batch
const lastFilteredMessage = batch.messages[batch.messages.length - 1]

const pause = () => {
this.consumerGroup.pause([{ topic, partitions: [partition] }])
return () => this.consumerGroup.resume([{ topic, partitions: [partition] }])
}

try {
await this.eachBatch({
batch,
Expand All @@ -281,6 +295,10 @@ module.exports = class Runner extends EventEmitter {
this.consumerGroup.resolveOffset({ topic, partition, offset: offsetToResolve })
},
heartbeat: () => this.heartbeat(),
/**
* Pause consumption for the current topic-partition being processed
*/
pause,
/**
* Commit offsets if provided. Otherwise commit most recent resolved offsets
* if the autoCommit conditions are met.
Expand Down
2 changes: 2 additions & 0 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,12 +899,14 @@ export interface EachMessagePayload {
partition: number
message: KafkaMessage
heartbeat(): Promise<void>
pause(): () => void
}

export interface EachBatchPayload {
batch: Batch
resolveOffset(offset: string): void
heartbeat(): Promise<void>
pause(): () => void
commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
uncommittedOffsets(): OffsetsByTopicPartition
isRunning(): boolean
Expand Down

0 comments on commit 196105c

Please sign in to comment.