Utilities to help direct the flow of Redis streams.
// Server A
import { Writer } from 'redis-currents'
const writer = new Writer('redis://localhost:6379', 'stream_name')
const write = async () => {
await writer.write({ data: 1 })
await writer.write({ data: 2 })
await writer.write({ data: 3 })
await writer.write({ data: 4 })
}
write()
// Server B, Process 1
import { Consumer } from 'redis-currents'
const consumer = new Consumer('redis://localhost:6379', 'stream_name', 'group_1', 'consumer_1')
const read = async () => {
for await (const [id, msg] of consumer) {
console.log(msg)
//=> { data: 1 }
//=> { data: 3 }
await consumer.ack(id)
}
}
read()
// Server B, Process 2
import { Consumer } from 'redis-currents'
const consumer = new Consumer('redis://localhost:6379', 'stream_name', 'group_1', 'consumer_2')
const read = async () => {
for await (const [id, msg] of consumer) {
console.log(msg)
//=> { data: 2 }
//=> { data: 4 }
await consumer.ack(id)
}
}
read()
// Server C
import { Consumer } from 'redis-currents'
const consumer = new Consumer('redis://localhost:6379', 'stream_name', 'group_2', 'consumer_1')
const read = async () => {
for await (const [id, msg] of consumer) {
console.log(msg)
//=> { data: 1 }
//=> { data: 2 }
//=> { data: 3 }
//=> { data: 4 }
await consumer.ack(id)
}
}
read()
- Redis 5, for access to streams.
- Node 10+, for access to async iterables.
yarn add redis-currents
Tests in redis-currents
by default will assume that you have an instance of redis running locally on port 6379
.
You can override this by providing TEST_REDIS_URI
as an environment variable.
# run all tests
yarn test
Examples in redis-currents
can be found at ./examples
, and run from the terminal.
yarn example:safe-exit
yarn example:groups
Docs generated with TypeDoc can be found at https://dvlsg.github.io/redis-currents/