-
Notifications
You must be signed in to change notification settings - Fork 7.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(core): Add cache service (#6729)
* add cache service * PR adjustments * switch to maxSize for memory cache
- Loading branch information
1 parent
e1e6d4a
commit c0d2bac
Showing
5 changed files
with
397 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
import { Service } from 'typedi'; | ||
import config from '@/config'; | ||
import { caching } from 'cache-manager'; | ||
import type { MemoryCache } from 'cache-manager'; | ||
import type { RedisCache } from 'cache-manager-ioredis-yet'; | ||
import type { RedisOptions } from 'ioredis'; | ||
import { getRedisClusterNodes } from '../GenericHelpers'; | ||
import { LoggerProxy, jsonStringify } from 'n8n-workflow'; | ||
|
||
@Service() | ||
export class CacheService { | ||
private cache: RedisCache | MemoryCache | undefined; | ||
|
||
async init() { | ||
if (!config.getEnv('cache.enabled')) { | ||
throw new Error('Cache is disabled'); | ||
} | ||
|
||
const backend = config.getEnv('cache.backend'); | ||
|
||
if ( | ||
backend === 'redis' || | ||
(backend === 'auto' && config.getEnv('executions.mode') === 'queue') | ||
) { | ||
// eslint-disable-next-line @typescript-eslint/naming-convention | ||
const { redisInsStore } = await import('cache-manager-ioredis-yet'); | ||
|
||
// #region TEMPORARY Redis Client Code | ||
/* | ||
* TODO: remove once redis service is ready | ||
* this code is just temporary | ||
*/ | ||
// eslint-disable-next-line @typescript-eslint/naming-convention | ||
const { default: Redis } = await import('ioredis'); | ||
let lastTimer = 0; | ||
let cumulativeTimeout = 0; | ||
const { host, port, username, password, db }: RedisOptions = | ||
config.getEnv('queue.bull.redis'); | ||
const clusterNodes = getRedisClusterNodes(); | ||
const redisConnectionTimeoutLimit = config.getEnv('queue.bull.redis.timeoutThreshold'); | ||
const usesRedisCluster = clusterNodes.length > 0; | ||
LoggerProxy.debug( | ||
usesRedisCluster | ||
? `(Cache Service) Initialising Redis cluster connection with nodes: ${clusterNodes | ||
.map((e) => `${e.host}:${e.port}`) | ||
.join(',')}` | ||
: `(Cache Service) Initialising Redis client connection with host: ${ | ||
host ?? 'localhost' | ||
} and port: ${port ?? '6379'}`, | ||
); | ||
const sharedRedisOptions: RedisOptions = { | ||
username, | ||
password, | ||
db, | ||
enableReadyCheck: false, | ||
maxRetriesPerRequest: null, | ||
}; | ||
const redisClient = usesRedisCluster | ||
? new Redis.Cluster( | ||
clusterNodes.map((node) => ({ host: node.host, port: node.port })), | ||
{ | ||
redisOptions: sharedRedisOptions, | ||
}, | ||
) | ||
: new Redis({ | ||
host, | ||
port, | ||
...sharedRedisOptions, | ||
retryStrategy: (): number | null => { | ||
const now = Date.now(); | ||
if (now - lastTimer > 30000) { | ||
// Means we had no timeout at all or last timeout was temporary and we recovered | ||
lastTimer = now; | ||
cumulativeTimeout = 0; | ||
} else { | ||
cumulativeTimeout += now - lastTimer; | ||
lastTimer = now; | ||
if (cumulativeTimeout > redisConnectionTimeoutLimit) { | ||
LoggerProxy.error( | ||
`Unable to connect to Redis after ${redisConnectionTimeoutLimit}. Exiting process.`, | ||
); | ||
process.exit(1); | ||
} | ||
} | ||
return 500; | ||
}, | ||
}); | ||
// #endregion TEMPORARY Redis Client Code | ||
const redisStore = redisInsStore(redisClient, { | ||
ttl: config.getEnv('cache.redis.ttl'), | ||
}); | ||
this.cache = await caching(redisStore); | ||
} else { | ||
// using TextEncoder to get the byte length of the string even if it contains unicode characters | ||
const textEncoder = new TextEncoder(); | ||
this.cache = await caching('memory', { | ||
ttl: config.getEnv('cache.memory.ttl'), | ||
maxSize: config.getEnv('cache.memory.maxSize'), | ||
sizeCalculation: (item) => { | ||
return textEncoder.encode(jsonStringify(item, { replaceCircularRefs: true })).length; | ||
}, | ||
}); | ||
} | ||
} | ||
|
||
async destroy() { | ||
if (this.cache) { | ||
await this.reset(); | ||
this.cache = undefined; | ||
} | ||
} | ||
|
||
async getCache(): Promise<RedisCache | MemoryCache | undefined> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache; | ||
} | ||
|
||
async get<T>(key: string): Promise<T> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.get(key) as T; | ||
} | ||
|
||
async set<T>(key: string, value: T, ttl?: number): Promise<void> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.set(key, value, ttl); | ||
} | ||
|
||
async delete(key: string): Promise<void> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.del(key); | ||
} | ||
|
||
async reset(): Promise<void> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.reset(); | ||
} | ||
|
||
async keys(): Promise<string[]> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.keys() ?? []; | ||
} | ||
|
||
async setMany<T>(values: Array<[string, T]>, ttl?: number): Promise<void> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.mset(values, ttl); | ||
} | ||
|
||
async getMany<T>(keys: string[]): Promise<Array<[string, T]>> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.mget(...keys) as Promise<Array<[string, T]>>; | ||
} | ||
|
||
async deleteMany(keys: string[]): Promise<void> { | ||
if (!this.cache) { | ||
await this.init(); | ||
} | ||
return this.cache?.store.mdel(...keys); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import Container from 'typedi'; | ||
import { CacheService } from '@/services/cache.service'; | ||
import type { MemoryCache } from 'cache-manager'; | ||
// import type { RedisCache } from 'cache-manager-ioredis-yet'; | ||
import config from '@/config'; | ||
|
||
const cacheService = Container.get(CacheService); | ||
|
||
function setDefaultConfig() { | ||
config.set('executions.mode', 'regular'); | ||
config.set('cache.backend', 'auto'); | ||
config.set('cache.memory.maxSize', 1 * 1024 * 1024); | ||
} | ||
|
||
describe('cacheService', () => { | ||
beforeEach(async () => { | ||
setDefaultConfig(); | ||
await Container.get(CacheService).destroy(); | ||
}); | ||
|
||
test('should create a memory cache by default', async () => { | ||
await cacheService.init(); | ||
await expect(cacheService.getCache()).resolves.toBeDefined(); | ||
const candidate = (await cacheService.getCache()) as MemoryCache; | ||
// type guard to check that a MemoryCache is returned and not a RedisCache (which does not have a size property) | ||
expect(candidate.store.size).toBeDefined(); | ||
}); | ||
|
||
test('should cache and retrieve a value', async () => { | ||
await cacheService.init(); | ||
await expect(cacheService.getCache()).resolves.toBeDefined(); | ||
await cacheService.set<string>('testString', 'test'); | ||
await cacheService.set<number>('testNumber', 123); | ||
|
||
await expect(cacheService.get<string>('testString')).resolves.toBe('test'); | ||
expect(typeof (await cacheService.get<string>('testString'))).toBe('string'); | ||
await expect(cacheService.get<number>('testNumber')).resolves.toBe(123); | ||
expect(typeof (await cacheService.get<number>('testNumber'))).toBe('number'); | ||
}); | ||
|
||
test('should honour ttl values', async () => { | ||
// set default TTL to 10ms | ||
config.set('cache.memory.ttl', 10); | ||
|
||
await cacheService.set<string>('testString', 'test'); | ||
await cacheService.set<number>('testNumber', 123, 1000); | ||
|
||
const store = (await cacheService.getCache())?.store; | ||
|
||
expect(store).toBeDefined(); | ||
|
||
await expect(store!.ttl('testString')).resolves.toBeLessThanOrEqual(100); | ||
await expect(store!.ttl('testNumber')).resolves.toBeLessThanOrEqual(1000); | ||
|
||
await expect(cacheService.get<string>('testString')).resolves.toBe('test'); | ||
await expect(cacheService.get<number>('testNumber')).resolves.toBe(123); | ||
|
||
await new Promise((resolve) => setTimeout(resolve, 20)); | ||
|
||
await expect(cacheService.get<string>('testString')).resolves.toBeUndefined(); | ||
await expect(cacheService.get<number>('testNumber')).resolves.toBe(123); | ||
}); | ||
|
||
test('should set and remove values', async () => { | ||
await cacheService.set<string>('testString', 'test'); | ||
await expect(cacheService.get<string>('testString')).resolves.toBe('test'); | ||
await cacheService.delete('testString'); | ||
await expect(cacheService.get<string>('testString')).resolves.toBeUndefined(); | ||
}); | ||
|
||
test('should calculate maxSize', async () => { | ||
config.set('cache.memory.maxSize', 16); | ||
await cacheService.destroy(); | ||
|
||
// 16 bytes because stringify wraps the string in quotes, so 2 bytes for the quotes | ||
await cacheService.set<string>('testString', 'withoutUnicode'); | ||
await expect(cacheService.get<string>('testString')).resolves.toBe('withoutUnicode'); | ||
|
||
await cacheService.destroy(); | ||
|
||
// should not fit! | ||
await cacheService.set<string>('testString', 'withUnicodeԱԲԳ'); | ||
await expect(cacheService.get<string>('testString')).resolves.toBeUndefined(); | ||
}); | ||
|
||
test('should set and get complex objects', async () => { | ||
interface TestObject { | ||
test: string; | ||
test2: number; | ||
test3?: TestObject & { test4: TestObject }; | ||
} | ||
|
||
const testObject: TestObject = { | ||
test: 'test', | ||
test2: 123, | ||
test3: { | ||
test: 'test3', | ||
test2: 123, | ||
test4: { | ||
test: 'test4', | ||
test2: 123, | ||
}, | ||
}, | ||
}; | ||
|
||
await cacheService.set<TestObject>('testObject', testObject); | ||
await expect(cacheService.get<TestObject>('testObject')).resolves.toMatchObject(testObject); | ||
}); | ||
|
||
test('should set and get multiple values', async () => { | ||
config.set('executions.mode', 'regular'); | ||
config.set('cache.backend', 'auto'); | ||
|
||
await cacheService.setMany<string>([ | ||
['testString', 'test'], | ||
['testString2', 'test2'], | ||
]); | ||
await cacheService.setMany<number>([ | ||
['testNumber', 123], | ||
['testNumber2', 456], | ||
]); | ||
await expect( | ||
cacheService.getMany<string>(['testString', 'testString2']), | ||
).resolves.toStrictEqual(['test', 'test2']); | ||
await expect( | ||
cacheService.getMany<number>(['testNumber', 'testNumber2']), | ||
).resolves.toStrictEqual([123, 456]); | ||
}); | ||
// This test is skipped because it requires the Redis service | ||
// test('should create a redis cache if asked', async () => { | ||
// config.set('cache.backend', 'redis'); | ||
// await cacheService.init(); | ||
// expect(cacheService.getCacheInstance()).toBeDefined(); | ||
// const candidate = cacheService.getCacheInstance() as RedisCache; | ||
// expect(candidate.store.client).toBeDefined(); | ||
// }); | ||
}); |
Oops, something went wrong.