diff --git a/packages/backend-core/src/index.ts b/packages/backend-core/src/index.ts index ffffd8240a2..c7cf9f56cc3 100644 --- a/packages/backend-core/src/index.ts +++ b/packages/backend-core/src/index.ts @@ -30,6 +30,7 @@ export * as timers from "./timers" export { default as env } from "./environment" export * as blacklist from "./blacklist" export * as docUpdates from "./docUpdates" +export * from "./utils/Duration" export { SearchParams } from "./db" // Add context to tenancy for backwards compatibility // only do this for external usages to prevent internal diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index af2ec6dbaa7..a8add7ecb66 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -36,7 +36,7 @@ class InMemoryQueue { * @param opts This is not used by the in memory queue as there is no real use * case when in memory, but is the same API as Bull */ - constructor(name: string, opts = null) { + constructor(name: string, opts?: any) { this._name = name this._opts = opts this._messages = [] diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 06581477096..0657437a3bd 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -2,11 +2,17 @@ import env from "../environment" import { getRedisOptions } from "../redis/utils" import { JobQueue } from "./constants" import InMemoryQueue from "./inMemoryQueue" -import BullQueue from "bull" +import BullQueue, { QueueOptions } from "bull" import { addListeners, StalledFn } from "./listeners" +import { Duration } from "../utils" import * as timers from "../timers" -const CLEANUP_PERIOD_MS = 60 * 1000 +// the queue lock is held for 5 minutes +const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs() +// queue lock is refreshed every 30 seconds +const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs() +// cleanup the queue every 60 seconds +const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs() let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let cleanupInterval: NodeJS.Timeout @@ -20,8 +26,15 @@ export function createQueue( jobQueue: JobQueue, opts: { removeStalledCb?: StalledFn } = {} ): BullQueue.Queue { - const { opts: redisOpts, redisProtocolUrl } = getRedisOptions() - const queueConfig: any = redisProtocolUrl || { redis: redisOpts } + const redisOpts = getRedisOptions() + const queueConfig: QueueOptions = { + redis: redisOpts, + settings: { + maxStalledCount: 0, + lockDuration: QUEUE_LOCK_MS, + lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS, + }, + } let queue: any if (!env.isTest()) { queue = new BullQueue(jobQueue, queueConfig) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index d1e2d8989ec..6f1b573718a 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -16,6 +16,7 @@ import { getRedisOptions, SEPARATOR, SelectableDatabase, + getRedisConnectionDetails, } from "./utils" import * as timers from "../timers" @@ -91,12 +92,11 @@ function init(selectDb = DEFAULT_SELECT_DB) { if (client) { client.disconnect() } - const { redisProtocolUrl, opts, host, port } = getRedisOptions() + const { host, port } = getRedisConnectionDetails() + const opts = getRedisOptions() if (CLUSTERED) { client = new RedisCore.Cluster([{ host, port }], opts) - } else if (redisProtocolUrl) { - client = new RedisCore(redisProtocolUrl) } else { client = new RedisCore(opts) } diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts index 34b7275a2b6..5187fe13f83 100644 --- a/packages/backend-core/src/redis/utils.ts +++ b/packages/backend-core/src/redis/utils.ts @@ -1,4 +1,5 @@ import env from "../environment" +import * as Redis from "ioredis" const SLOT_REFRESH_MS = 2000 const CONNECT_TIMEOUT_MS = 10000 @@ -42,7 +43,7 @@ export enum Databases { export enum SelectableDatabase { DEFAULT = 0, SOCKET_IO = 1, - UNUSED_1 = 2, + RATE_LIMITING = 2, UNUSED_2 = 3, UNUSED_3 = 4, UNUSED_4 = 5, @@ -58,7 +59,7 @@ export enum SelectableDatabase { UNUSED_14 = 15, } -export function getRedisOptions() { +export function getRedisConnectionDetails() { let password = env.REDIS_PASSWORD let url: string[] | string = env.REDIS_URL.split("//") // get rid of the protocol @@ -74,28 +75,34 @@ export function getRedisOptions() { } const [host, port] = url.split(":") - let redisProtocolUrl - - // fully qualified redis URL - if (/rediss?:\/\//.test(env.REDIS_URL)) { - redisProtocolUrl = env.REDIS_URL + return { + host, + password, + port: parseInt(port), } +} - const opts: any = { +export function getRedisOptions() { + const { host, password, port } = getRedisConnectionDetails() + let redisOpts: Redis.RedisOptions = { connectTimeout: CONNECT_TIMEOUT_MS, + port: port, + host, + password, } + let opts: Redis.ClusterOptions | Redis.RedisOptions = redisOpts if (env.REDIS_CLUSTERED) { - opts.redisOptions = {} - opts.redisOptions.tls = {} - opts.redisOptions.password = password - opts.slotsRefreshTimeout = SLOT_REFRESH_MS - opts.dnsLookup = (address: string, callback: any) => callback(null, address) - } else { - opts.host = host - opts.port = port - opts.password = password + opts = { + connectTimeout: CONNECT_TIMEOUT_MS, + redisOptions: { + ...redisOpts, + tls: {}, + }, + slotsRefreshTimeout: SLOT_REFRESH_MS, + dnsLookup: (address: string, callback: any) => callback(null, address), + } as Redis.ClusterOptions } - return { opts, host, port: parseInt(port), redisProtocolUrl } + return opts } export function addDbPrefix(db: string, key: string) { diff --git a/packages/backend-core/src/utils/Duration.ts b/packages/backend-core/src/utils/Duration.ts new file mode 100644 index 00000000000..f376c2f7c70 --- /dev/null +++ b/packages/backend-core/src/utils/Duration.ts @@ -0,0 +1,49 @@ +export enum DurationType { + MILLISECONDS = "milliseconds", + SECONDS = "seconds", + MINUTES = "minutes", + HOURS = "hours", + DAYS = "days", +} + +const conversion: Record = { + milliseconds: 1, + seconds: 1000, + minutes: 60 * 1000, + hours: 60 * 60 * 1000, + days: 24 * 60 * 60 * 1000, +} + +export class Duration { + static convert(from: DurationType, to: DurationType, duration: number) { + const milliseconds = duration * conversion[from] + return milliseconds / conversion[to] + } + + static from(from: DurationType, duration: number) { + return { + to: (to: DurationType) => { + return Duration.convert(from, to, duration) + }, + toMs: () => { + return Duration.convert(from, DurationType.MILLISECONDS, duration) + }, + } + } + + static fromSeconds(duration: number) { + return Duration.from(DurationType.SECONDS, duration) + } + + static fromMinutes(duration: number) { + return Duration.from(DurationType.MINUTES, duration) + } + + static fromHours(duration: number) { + return Duration.from(DurationType.HOURS, duration) + } + + static fromDays(duration: number) { + return Duration.from(DurationType.DAYS, duration) + } +} diff --git a/packages/backend-core/src/utils/index.ts b/packages/backend-core/src/utils/index.ts index 318a7f13baf..ac17227459f 100644 --- a/packages/backend-core/src/utils/index.ts +++ b/packages/backend-core/src/utils/index.ts @@ -1,3 +1,4 @@ export * from "./hashing" export * from "./utils" export * from "./stringUtils" +export * from "./Duration" diff --git a/packages/backend-core/src/utils/tests/Duration.spec.ts b/packages/backend-core/src/utils/tests/Duration.spec.ts new file mode 100644 index 00000000000..46b996f7884 --- /dev/null +++ b/packages/backend-core/src/utils/tests/Duration.spec.ts @@ -0,0 +1,19 @@ +import { Duration, DurationType } from "../Duration" + +describe("duration", () => { + it("should convert minutes to milliseconds", () => { + expect(Duration.fromMinutes(5).toMs()).toBe(300000) + }) + + it("should convert seconds to milliseconds", () => { + expect(Duration.fromSeconds(30).toMs()).toBe(30000) + }) + + it("should convert days to milliseconds", () => { + expect(Duration.fromDays(1).toMs()).toBe(86400000) + }) + + it("should convert minutes to days", () => { + expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1) + }) +}) diff --git a/packages/server/src/api/routes/public/index.ts b/packages/server/src/api/routes/public/index.ts index 4cc1eff8a42..b37ed931fc9 100644 --- a/packages/server/src/api/routes/public/index.ts +++ b/packages/server/src/api/routes/public/index.ts @@ -15,6 +15,16 @@ import env from "../../../environment" const Router = require("@koa/router") const { RateLimit, Stores } = require("koa2-ratelimit") import { middleware, redis } from "@budibase/backend-core" +import { SelectableDatabase } from "@budibase/backend-core/src/redis/utils" + +interface KoaRateLimitOptions { + socket: { + host: string + port: number + } + password?: string + database?: number +} const PREFIX = "/api/public/v1" // allow a lot more requests when in test @@ -29,32 +39,21 @@ function getApiLimitPerSecond(): number { let rateLimitStore: any = null if (!env.isTest()) { - const REDIS_OPTS = redis.utils.getRedisOptions() - let options - if (REDIS_OPTS.redisProtocolUrl) { - // fully qualified redis URL - options = { - url: REDIS_OPTS.redisProtocolUrl, - } - } else { - options = { - socket: { - host: REDIS_OPTS.host, - port: REDIS_OPTS.port, - }, - } + const { password, host, port } = redis.utils.getRedisConnectionDetails() + let options: KoaRateLimitOptions = { + socket: { + host: host, + port: port, + }, + } - if (REDIS_OPTS.opts?.password || REDIS_OPTS.opts.redisOptions?.password) { - // @ts-ignore - options.password = - REDIS_OPTS.opts.password || REDIS_OPTS.opts.redisOptions.password - } + if (password) { + options.password = password + } - if (!env.REDIS_CLUSTERED) { - // @ts-ignore - // Can't set direct redis db in clustered env - options.database = 1 - } + if (!env.REDIS_CLUSTERED) { + // Can't set direct redis db in clustered env + options.database = SelectableDatabase.RATE_LIMITING } rateLimitStore = new Stores.Redis(options) RateLimit.defaultOptions({ diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 3905a21c73d..4b1d11ecf7e 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -31,10 +31,6 @@ import destroyable from "server-destroy" import { initPro } from "./initPro" import { handleScimBody } from "./middleware/handleScimBody" -// configure events to use the pro audit log write -// can't integrate directly into backend-core due to cyclic issues -events.processors.init(proSdk.auditLogs.write) - if (coreEnv.ENABLE_SSO_MAINTENANCE_MODE) { console.warn( "Warning: ENABLE_SSO_MAINTENANCE_MODE is set. It is recommended this flag is disabled if maintenance is not in progress" @@ -93,6 +89,9 @@ export default server.listen(parseInt(env.PORT || "4002"), async () => { console.log(`Worker running on ${JSON.stringify(server.address())}`) await initPro() await redis.init() + // configure events to use the pro audit log write + // can't integrate directly into backend-core due to cyclic issues + await events.processors.init(proSdk.auditLogs.write) }) process.on("uncaughtException", err => {