Skip to content

Commit

Permalink
@uppy/companion: switch from node-redis to ioredis (#4623)
Browse files Browse the repository at this point in the history
* Switch to ioredis

* run yarn

* semverify ioredis

* fix linting

* try to fix build

* Revert "try to fix build"

This reverts commit 2dd7ecd.

* remove semver

probably from borked merge

* fix bug breaking tests

and add some types

* fix probable typo

* doc

---------

Co-authored-by: Mikael Finstad <[email protected]>
  • Loading branch information
dschmidt and mifi authored May 13, 2024
1 parent 0469ea9 commit 4890281
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 124 deletions.
7 changes: 4 additions & 3 deletions docs/companion.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,12 @@ in the future, we plan and changing the default to `companion:` and possibly
remove this option. This is a standalone-only option. See also
`COMPANION_REDIS_PUBSUB_SCOPE`.

#### `redisOptions`
#### `redisOptions` `COMPANION_REDIS_OPTIONS`

An object of
[options supported by redis client](https://www.npmjs.com/package/redis#options-object-properties).
This option can be used in place of `redisUrl`.
[options supported by the `ioredis` client](https://github.com/redis/ioredis).
See also
[`RedisOptions`](https://github.com/redis/ioredis/blob/af832752040e616daf51621681bcb40cab965a9b/lib/redis/RedisOptions.ts#L8).

#### `redisPubSubScope` `COMPANION_REDIS_PUBSUB_SCOPE`

Expand Down
2 changes: 1 addition & 1 deletion e2e/mock-server.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const requestListener = (req, res) => {
export default function startMockServer (host, port) {
const server = http.createServer(requestListener)
server.listen(port, host, () => {
console.log(`Server is running on http://${host}:${port}`)
console.log(`Mock server is running on http://${host}:${port}`)
})
}

Expand Down
2 changes: 1 addition & 1 deletion packages/@uppy/companion/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"got": "^13.0.0",
"grant": "5.4.22",
"helmet": "^4.6.0",
"ioredis": "^5.3.2",
"ipaddr.js": "^2.0.1",
"jsonwebtoken": "9.0.2",
"lodash": "^4.17.21",
Expand All @@ -60,7 +61,6 @@
"ms": "2.1.3",
"node-schedule": "2.1.1",
"prom-client": "14.0.1",
"redis": "4.6.13",
"serialize-error": "^2.1.0",
"serialize-javascript": "^6.0.0",
"tus-js-client": "^3.1.3",
Expand Down
7 changes: 4 additions & 3 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class StreamableBlob {
}

class Uploader {
/** @type {import('ioredis').Redis} */
storage

/**
* Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
* For tus uploads, the deferredLength option is enabled, because file size value can be unreliable
Expand Down Expand Up @@ -446,9 +449,7 @@ class Uploader {
// https://github.com/transloadit/uppy/issues/3748
const keyExpirySec = 60 * 60 * 24
const redisKey = `${Uploader.STORAGE_PREFIX}:${this.token}`
this.storage.set(redisKey, jsonStringify(state), {
EX: keyExpirySec,
})
this.storage.set(redisKey, jsonStringify(state), 'EX', keyExpirySec)
}

throttledEmitProgress = throttle((dataToEmit) => {
Expand Down
38 changes: 29 additions & 9 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ const logger = require('../logger')
* This module simulates the builtin events.EventEmitter but with the use of redis.
* This is useful for when companion is running on multiple instances and events need
* to be distributed across.
*
* @param {import('ioredis').Redis} redisClient
* @param {string} redisPubSubScope
* @returns
*/
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redisClient.duplicate()
publisher.on('error', err => logger.error('publisher redis error', err))
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
/** @type {import('ioredis').Redis} */
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err))
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

Expand Down Expand Up @@ -55,20 +60,32 @@ module.exports = (redisClient, redisPubSubScope) => {
handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
subscriber.off('pmessage', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

/**
*
* @param {string} eventName
* @param {*} handler
* @param {*} _once
*/
function addListener (eventName, handler, _once = false) {
function actualHandler (message) {
function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
}

if (_once) removeListener(eventName, handler)
let args
try {
args = JSON.parse(message)
} catch (ex) {
return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
return
}
return handler(...args)
handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
Expand All @@ -78,7 +95,10 @@ module.exports = (redisClient, redisPubSubScope) => {
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
runWhenConnected(() => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

/**
Expand Down Expand Up @@ -134,7 +154,7 @@ module.exports = (redisClient, redisPubSubScope) => {

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

Expand Down
37 changes: 14 additions & 23 deletions packages/@uppy/companion/src/server/redis.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,34 @@
const redis = require('redis')
const Redis = require('ioredis').default

const logger = require('./logger')

/** @type {import('ioredis').Redis} */
let redisClient

/**
* A Singleton module that provides a single redis client through out
* the lifetime of the server
*
* @param {{ redisUrl?: string, redisOptions?: Record<string, any> }} [companionOptions] options
* @param {string} [redisUrl] ioredis url
* @param {Record<string, any>} [redisOptions] ioredis client options
*/
function createClient (companionOptions) {
function createClient (redisUrl, redisOptions) {
if (!redisClient) {
const { redisUrl, redisOptions } = companionOptions
redisClient = redis.createClient({
...redisOptions,
...(redisUrl && { url: redisUrl }),
})

redisClient.on('error', err => logger.error('redis error', err))

;(async () => {
try {
// fire and forget.
// any requests made on the client before connection is established will be auto-queued by node-redis
await redisClient.connect()
} catch (err) {
logger.error(err.message, 'redis.error')
}
})()
if (redisUrl) {
redisClient = new Redis(redisUrl, redisOptions)
} else {
redisClient = new Redis(redisOptions)
}
redisClient.on('error', err => logger.error('redis error', err.toString()))
}

return redisClient
}

module.exports.client = (companionOptions) => {
if (!companionOptions?.redisUrl && !companionOptions?.redisOptions) {
module.exports.client = ({ redisUrl, redisOptions } = { redisUrl: undefined, redisOptions: undefined }) => {
if (!redisUrl && !redisOptions) {
return redisClient
}

return createClient(companionOptions)
return createClient(redisUrl, redisOptions)
}
4 changes: 2 additions & 2 deletions packages/@uppy/companion/src/standalone/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ const getConfigFromEnv = () => {
periodicPingCount: process.env.COMPANION_PERIODIC_PING_COUNT
? parseInt(process.env.COMPANION_PERIODIC_PING_COUNT, 10) : undefined,
filePath: process.env.COMPANION_DATADIR,
redisUrl: process.env.COMPANION_REDIS_URL,
redisPubSubScope: process.env.COMPANION_REDIS_PUBSUB_SCOPE,
// redisOptions refers to https://www.npmjs.com/package/redis#options-object-properties
redisUrl: process.env.COMPANION_REDIS_URL,
// redisOptions refers to https://redis.github.io/ioredis/index.html#RedisOptions
redisOptions: (() => {
try {
if (!process.env.COMPANION_REDIS_OPTIONS) {
Expand Down
Loading

0 comments on commit 4890281

Please sign in to comment.