Skip to content

Commit

Permalink
[WIP] updating example and introducing redis-info parser
Browse files Browse the repository at this point in the history
  • Loading branch information
erikengervall committed Jan 21, 2020
1 parent ac2733d commit 0505950
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 60 deletions.
25 changes: 15 additions & 10 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const { setQueues, router } = require('./dist/index')
const { Queue3: QueueMQ } = require('bullmq/dist/classes/compat')
const { Queue: QueueMQ, Worker } = require('bullmq')
const Queue3 = require('bull')
const app = require('express')()

Expand All @@ -16,31 +16,36 @@ const createQueue3 = name => new Queue3(name, { redis: redisOptions })
const createQueueMQ = name => new QueueMQ(name, { connection: redisOptions })

const run = () => {
const example3 = createQueue3('ExampleBull')
const exampleMQ = createQueueMQ('ExampleBullMQ')
const exampleBullName = 'ExampleBull'
const exampleBull = createQueue3(exampleBullName)
const exampleBullMqName = 'ExampleBullMQ'
const exampleBullMq = createQueueMQ(exampleBullMqName)

setQueues([example3, exampleMQ])
setQueues([exampleBullMq])

example3.process(async job => {
exampleBull.process(async job => {
for (let i = 0; i <= 100; i++) {
await sleep(Math.random())
job.progress(i)
if (Math.random() * 200 < 1) throw new Error(`Random error ${i}`)
}
})

exampleMQ.process(async job => {
new Worker(exampleBullMqName, async job => {
for (let i = 0; i <= 100; i++) {
await sleep(Math.random())
await job.updateProgress(i)
job.progress(i)
if (Math.random() * 200 < 1) throw new Error(`Random error ${i}`)
}
})

app.use('/add', (req, res) => {
example3.add({ title: req.query.title })
exampleMQ.add('Add', { title: req.query.title })
res.json({ ok: true })
exampleBull.add({ title: req.query.title })
exampleBullMq.add('Add', { title: req.query.title })

res.json({
ok: true,
})
})

app.use('/ui', router)
Expand Down
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
"prepare": "yarn build",
"build": "yarn build:clean && yarn build:ui && yarn build:routes",
"build:ui": "NODE_ENV=production webpack",
"build:ui:watch": "NODE_ENV=production webpack --watch",
"build:routes": "yarn tsc",
"build:watch": "NODE_ENV=production webpack --watch",
"build:clean": "rm -rf ./static ./dist",
"lint": "prettier --check \"./**/*.js\" && eslint --ext .js .",
"start:docker": "docker-compose up",
"start": "node example.js",
"start:watch": "nodemon example.js",
"start:example": "yarn build && yarn start"
},
"dependencies": {
Expand All @@ -37,7 +38,8 @@
"pretty-bytes": "5.3.0",
"react": "16.12.0",
"react-dom": "16.12.0",
"react-highlight": "0.12.0"
"react-highlight": "0.12.0",
"redis-info": "^3.0.7"
},
"devDependencies": {
"@types/bull": "^3.12.0",
Expand All @@ -57,6 +59,7 @@
"eslint-plugin-prettier": "^3.1.2",
"eslint-plugin-react": "^7.18.0",
"jest": "^24.9.0",
"nodemon": "^2.0.2",
"prettier": "^1.19.1",
"prettier-eslint": "^9.0.1",
"react-dev-utils": "^8.0.0",
Expand Down
1 change: 0 additions & 1 deletion src/@types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Queue as QueueMq } from 'bullmq'

export interface BullBoardQueue {
queue: Queue | QueueMq
version: number | string
}

export interface BullBoardQueues {
Expand Down
1 change: 1 addition & 0 deletions src/declarations.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
declare module 'redis-info'
10 changes: 0 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,12 @@ router.put('/queues/:queueName/retry', wrapAsync(retryAll))
router.put('/queues/:queueName/:id/retry', wrapAsync(retryJob))
router.put('/queues/:queueName/clean/:queueStatus', wrapAsync(cleanAll))

// TODO: Do we even need this if we can compare instanceof at anytime using TS?
const getQueueVersion = (queue: Queue | QueueMq) => {
if (queue instanceof QueueMq) {
return 4
}

return 3
}

export const setQueues = (bullQueues: Queue[] | QueueMq[]) => {
bullQueues.forEach((queue: Queue | QueueMq) => {
const name = queue instanceof QueueMq ? queue.toKey('TODO:') : queue.name

bullBoardQueues[name] = {
queue,
version: getQueueVersion(queue),
}
})
}
Expand Down
50 changes: 23 additions & 27 deletions src/routes/queues.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { parse as parseRedisInfo } from 'redis-info'
import { RequestHandler, Request } from 'express'
import { Job } from 'bull'
import { Job as JobMq } from 'bullmq'
import {
Job as JobMq,
// Queue as QueueMq
} from 'bullmq'

import { BullBoardQueues, BullBoardQueue } from '../@types'

Expand All @@ -21,29 +25,25 @@ const metrics = [
'blocked_clients',
]

const getStats = async ({ queue }: BullBoardQueue) => {
const getStats = async ({ queue }: BullBoardQueue): Promise<ValidMetrics> => {
const redisClient = await queue.client
await redisClient.info()

// TODO:
// eslint-disable-next-line @typescript-eslint/ban-ts-ignore
// @ts-ignore
const { serverInfo } = redisClient

const validMetrics: ValidMetrics = metrics.reduce((accumulator, value) => {
if (value in serverInfo) {
// TODO:
// eslint-disable-next-line @typescript-eslint/ban-ts-ignore
// @ts-ignore
accumulator[value] = serverInfo[value]
}
const redisInfoRaw = await redisClient.info()
const redisInfo: { [key: string]: any } = parseRedisInfo(redisInfoRaw)

return accumulator
}, {})
const validMetrics: ValidMetrics = metrics.reduce(
(acc: { [key: string]: any }, metric) => {
if (redisInfo[metric]) {
acc[metric] = redisInfo[metric]
}

return acc
},
{},
)

// eslint-disable-next-line @typescript-eslint/camelcase
validMetrics.total_system_memory =
serverInfo.total_system_memory || serverInfo.maxmemory
redisInfo.total_system_memory || redisInfo.maxmemory

return validMetrics
}
Expand Down Expand Up @@ -87,25 +87,21 @@ const getDataForQueues = async (
}

const counts = await Promise.all(
pairs.map(async ([name, { queue, version }]) => {
pairs.map(async ([name, { queue }]) => {
const counts = await queue.getJobCounts(...statuses)

let jobs: (Job | JobMq)[] = [] // eslint-disable-line prettier/prettier
if (name) {
const status = query[name] === 'latest' ? statuses : query[name]
jobs = await queue.getJobs(status, 0, 10)
}
const status = query[name] === 'latest' ? statuses : query[name]
const jobs: (Job | JobMq)[] = await queue.getJobs(status, 0, 10) // eslint-disable-line prettier/prettier

return {
name,
counts,
jobs: jobs.map(formatJob),
version,
}
}),
)

const stats = getStats(pairs[0][1])
const stats = await getStats(pairs[0][1])

return {
stats,
Expand Down
Loading

0 comments on commit 0505950

Please sign in to comment.