Skip to content

Commit

Permalink
Merge pull request #12280 from Budibase/fix/update-bull-queue-parameters
Browse files Browse the repository at this point in the history
Restore bull parameter changes + reduce Redis config complexity
  • Loading branch information
shogunpurple authored Nov 6, 2023
2 parents 922b903 + 5dd6c40 commit 880e9a9
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 54 deletions.
1 change: 1 addition & 0 deletions packages/backend-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export * as timers from "./timers"
export { default as env } from "./environment"
export * as blacklist from "./blacklist"
export * as docUpdates from "./docUpdates"
export * from "./utils/Duration"
export { SearchParams } from "./db"
// Add context to tenancy for backwards compatibility
// only do this for external usages to prevent internal
Expand Down
2 changes: 1 addition & 1 deletion packages/backend-core/src/queue/inMemoryQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class InMemoryQueue {
* @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull
*/
constructor(name: string, opts = null) {
constructor(name: string, opts?: any) {
this._name = name
this._opts = opts
this._messages = []
Expand Down
21 changes: 17 additions & 4 deletions packages/backend-core/src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ import env from "../environment"
import { getRedisOptions } from "../redis/utils"
import { JobQueue } from "./constants"
import InMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull"
import BullQueue, { QueueOptions } from "bull"
import { addListeners, StalledFn } from "./listeners"
import { Duration } from "../utils"
import * as timers from "../timers"

const CLEANUP_PERIOD_MS = 60 * 1000
// the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// queue lock is refreshed every 30 seconds
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
// cleanup the queue every 60 seconds
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
let cleanupInterval: NodeJS.Timeout

Expand All @@ -20,8 +26,15 @@ export function createQueue<T>(
jobQueue: JobQueue,
opts: { removeStalledCb?: StalledFn } = {}
): BullQueue.Queue<T> {
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions()
const queueConfig: any = redisProtocolUrl || { redis: redisOpts }
const redisOpts = getRedisOptions()
const queueConfig: QueueOptions = {
redis: redisOpts,
settings: {
maxStalledCount: 0,
lockDuration: QUEUE_LOCK_MS,
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
},
}
let queue: any
if (!env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig)
Expand Down
6 changes: 3 additions & 3 deletions packages/backend-core/src/redis/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
getRedisOptions,
SEPARATOR,
SelectableDatabase,
getRedisConnectionDetails,
} from "./utils"
import * as timers from "../timers"

Expand Down Expand Up @@ -91,12 +92,11 @@ function init(selectDb = DEFAULT_SELECT_DB) {
if (client) {
client.disconnect()
}
const { redisProtocolUrl, opts, host, port } = getRedisOptions()
const { host, port } = getRedisConnectionDetails()
const opts = getRedisOptions()

if (CLUSTERED) {
client = new RedisCore.Cluster([{ host, port }], opts)
} else if (redisProtocolUrl) {
client = new RedisCore(redisProtocolUrl)
} else {
client = new RedisCore(opts)
}
Expand Down
43 changes: 25 additions & 18 deletions packages/backend-core/src/redis/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import env from "../environment"
import * as Redis from "ioredis"

const SLOT_REFRESH_MS = 2000
const CONNECT_TIMEOUT_MS = 10000
Expand Down Expand Up @@ -42,7 +43,7 @@ export enum Databases {
export enum SelectableDatabase {
DEFAULT = 0,
SOCKET_IO = 1,
UNUSED_1 = 2,
RATE_LIMITING = 2,
UNUSED_2 = 3,
UNUSED_3 = 4,
UNUSED_4 = 5,
Expand All @@ -58,7 +59,7 @@ export enum SelectableDatabase {
UNUSED_14 = 15,
}

export function getRedisOptions() {
export function getRedisConnectionDetails() {
let password = env.REDIS_PASSWORD
let url: string[] | string = env.REDIS_URL.split("//")
// get rid of the protocol
Expand All @@ -74,28 +75,34 @@ export function getRedisOptions() {
}
const [host, port] = url.split(":")

let redisProtocolUrl

// fully qualified redis URL
if (/rediss?:\/\//.test(env.REDIS_URL)) {
redisProtocolUrl = env.REDIS_URL
return {
host,
password,
port: parseInt(port),
}
}

const opts: any = {
export function getRedisOptions() {
const { host, password, port } = getRedisConnectionDetails()
let redisOpts: Redis.RedisOptions = {
connectTimeout: CONNECT_TIMEOUT_MS,
port: port,
host,
password,
}
let opts: Redis.ClusterOptions | Redis.RedisOptions = redisOpts
if (env.REDIS_CLUSTERED) {
opts.redisOptions = {}
opts.redisOptions.tls = {}
opts.redisOptions.password = password
opts.slotsRefreshTimeout = SLOT_REFRESH_MS
opts.dnsLookup = (address: string, callback: any) => callback(null, address)
} else {
opts.host = host
opts.port = port
opts.password = password
opts = {
connectTimeout: CONNECT_TIMEOUT_MS,
redisOptions: {
...redisOpts,
tls: {},
},
slotsRefreshTimeout: SLOT_REFRESH_MS,
dnsLookup: (address: string, callback: any) => callback(null, address),
} as Redis.ClusterOptions
}
return { opts, host, port: parseInt(port), redisProtocolUrl }
return opts
}

export function addDbPrefix(db: string, key: string) {
Expand Down
49 changes: 49 additions & 0 deletions packages/backend-core/src/utils/Duration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export enum DurationType {
MILLISECONDS = "milliseconds",
SECONDS = "seconds",
MINUTES = "minutes",
HOURS = "hours",
DAYS = "days",
}

const conversion: Record<DurationType, number> = {
milliseconds: 1,
seconds: 1000,
minutes: 60 * 1000,
hours: 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000,
}

export class Duration {
static convert(from: DurationType, to: DurationType, duration: number) {
const milliseconds = duration * conversion[from]
return milliseconds / conversion[to]
}

static from(from: DurationType, duration: number) {
return {
to: (to: DurationType) => {
return Duration.convert(from, to, duration)
},
toMs: () => {
return Duration.convert(from, DurationType.MILLISECONDS, duration)
},
}
}

static fromSeconds(duration: number) {
return Duration.from(DurationType.SECONDS, duration)
}

static fromMinutes(duration: number) {
return Duration.from(DurationType.MINUTES, duration)
}

static fromHours(duration: number) {
return Duration.from(DurationType.HOURS, duration)
}

static fromDays(duration: number) {
return Duration.from(DurationType.DAYS, duration)
}
}
1 change: 1 addition & 0 deletions packages/backend-core/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./hashing"
export * from "./utils"
export * from "./stringUtils"
export * from "./Duration"
19 changes: 19 additions & 0 deletions packages/backend-core/src/utils/tests/Duration.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Duration, DurationType } from "../Duration"

describe("duration", () => {
it("should convert minutes to milliseconds", () => {
expect(Duration.fromMinutes(5).toMs()).toBe(300000)
})

it("should convert seconds to milliseconds", () => {
expect(Duration.fromSeconds(30).toMs()).toBe(30000)
})

it("should convert days to milliseconds", () => {
expect(Duration.fromDays(1).toMs()).toBe(86400000)
})

it("should convert minutes to days", () => {
expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1)
})
})
47 changes: 23 additions & 24 deletions packages/server/src/api/routes/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ import env from "../../../environment"
const Router = require("@koa/router")
const { RateLimit, Stores } = require("koa2-ratelimit")
import { middleware, redis } from "@budibase/backend-core"
import { SelectableDatabase } from "@budibase/backend-core/src/redis/utils"

interface KoaRateLimitOptions {
socket: {
host: string
port: number
}
password?: string
database?: number
}

const PREFIX = "/api/public/v1"
// allow a lot more requests when in test
Expand All @@ -29,32 +39,21 @@ function getApiLimitPerSecond(): number {

let rateLimitStore: any = null
if (!env.isTest()) {
const REDIS_OPTS = redis.utils.getRedisOptions()
let options
if (REDIS_OPTS.redisProtocolUrl) {
// fully qualified redis URL
options = {
url: REDIS_OPTS.redisProtocolUrl,
}
} else {
options = {
socket: {
host: REDIS_OPTS.host,
port: REDIS_OPTS.port,
},
}
const { password, host, port } = redis.utils.getRedisConnectionDetails()
let options: KoaRateLimitOptions = {
socket: {
host: host,
port: port,
},
}

if (REDIS_OPTS.opts?.password || REDIS_OPTS.opts.redisOptions?.password) {
// @ts-ignore
options.password =
REDIS_OPTS.opts.password || REDIS_OPTS.opts.redisOptions.password
}
if (password) {
options.password = password
}

if (!env.REDIS_CLUSTERED) {
// @ts-ignore
// Can't set direct redis db in clustered env
options.database = 1
}
if (!env.REDIS_CLUSTERED) {
// Can't set direct redis db in clustered env
options.database = SelectableDatabase.RATE_LIMITING
}
rateLimitStore = new Stores.Redis(options)
RateLimit.defaultOptions({
Expand Down
7 changes: 3 additions & 4 deletions packages/worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ import destroyable from "server-destroy"
import { initPro } from "./initPro"
import { handleScimBody } from "./middleware/handleScimBody"

// configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues
events.processors.init(proSdk.auditLogs.write)

if (coreEnv.ENABLE_SSO_MAINTENANCE_MODE) {
console.warn(
"Warning: ENABLE_SSO_MAINTENANCE_MODE is set. It is recommended this flag is disabled if maintenance is not in progress"
Expand Down Expand Up @@ -93,6 +89,9 @@ export default server.listen(parseInt(env.PORT || "4002"), async () => {
console.log(`Worker running on ${JSON.stringify(server.address())}`)
await initPro()
await redis.init()
// configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues
await events.processors.init(proSdk.auditLogs.write)
})

process.on("uncaughtException", err => {
Expand Down

0 comments on commit 880e9a9

Please sign in to comment.