Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lru ttl cache implementation #5997

Merged
merged 11 commits into from
Oct 16, 2024
Merged
4 changes: 2 additions & 2 deletions packages/api-derive/src/util/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface CacheValue<T> {
x: number;
}

const CHACHE_EXPIRY = 7 * (24 * 60) * (60 * 1000);
const CACHE_EXPIRY = 7 * (24 * 60) * (60 * 1000);

let deriveCache: DeriveCache;

Expand Down Expand Up @@ -43,7 +43,7 @@ function clearCache (cache: DeriveCache): void {
const all: string[] = [];

cache.forEach((key: string, { x }: CacheValue<any>): void => {
((now - x) > CHACHE_EXPIRY) && all.push(key);
((now - x) > CACHE_EXPIRY) && all.push(key);
});

// don't do delete inside loop, just in case
Expand Down
24 changes: 13 additions & 11 deletions packages/rpc-core/src/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { RpcCoreStats, RpcInterfaceMethod } from './types/index.js';

import { Observable, publishReplay, refCount } from 'rxjs';

import { DEFAULT_CAPACITY, LRUCache } from '@polkadot/rpc-provider';
import { rpcDefinitions } from '@polkadot/types';
import { hexToU8a, isFunction, isNull, isUndefined, lazyMethod, logger, memoize, objectSpread, u8aConcat, u8aToU8a } from '@polkadot/util';

Expand Down Expand Up @@ -93,9 +94,8 @@ export class RpcCore {
readonly #instanceId: string;
readonly #isPedantic: boolean;
readonly #registryDefault: Registry;
readonly #storageCache = new Map<string, Codec>();
readonly #storageCache: LRUCache;
#storageCacheHits = 0;
#storageCacheSize = 0;

#getBlockRegistry?: (blockHash: Uint8Array) => Promise<{ registry: Registry }>;
#getBlockHash?: (blockNumber: AnyNumber) => Promise<Uint8Array>;
Expand Down Expand Up @@ -123,7 +123,7 @@ export class RpcCore {

// these are the base keys (i.e. part of jsonrpc)
this.sections.push(...sectionNames);

this.#storageCache = new LRUCache(DEFAULT_CAPACITY);
// decorate all interfaces, defined and user on this instance
this.addUserInterfaces(userRpc);
}
Expand All @@ -145,7 +145,9 @@ export class RpcCore {
/**
* @description Manually disconnect from the attached provider
*/
public disconnect (): Promise<void> {
public async disconnect (): Promise<void> {
await this.#storageCache.clearInterval();

return this.provider.disconnect();
}

Expand All @@ -160,7 +162,7 @@ export class RpcCore {
...stats,
core: {
cacheHits: this.#storageCacheHits,
cacheSize: this.#storageCacheSize
cacheSize: this.#storageCache.length
}
}
: undefined;
Expand Down Expand Up @@ -469,7 +471,7 @@ export class RpcCore {
// - if a single result value, don't fill - it is not an update hole
// - fallback to an empty option in all cases
if (isNotFound && withCache) {
const cached = this.#storageCache.get(hexKey);
const cached = this.#storageCache.get(hexKey) as Codec | undefined;

if (cached) {
this.#storageCacheHits++;
Expand All @@ -487,15 +489,15 @@ export class RpcCore {
: u8aToU8a(value);
const codec = this._newType(registry, blockHash, key, input, isEmpty, entryIndex);

// store the retrieved result - the only issue with this cache is that there is no
// clearing of it, so very long running processes (not just a couple of hours, longer)
// will increase memory beyond what is allowed.
this.#storageCache.set(hexKey, codec);
this.#storageCacheSize++;
TarikGul marked this conversation as resolved.
Show resolved Hide resolved
this._setToCache(hexKey, codec);

return codec;
}

private _setToCache (key: string, value: Codec): void {
this.#storageCache.set(key, value);
}

private _newType (registry: Registry, blockHash: Uint8Array | string | null | undefined, key: StorageKey, input: string | Uint8Array | null, isEmpty: boolean, entryIndex = -1): Codec {
// single return value (via state.getStorage), decode the value based on the
// outputType that we have specified. Fallback to Raw on nothing
Expand Down
1 change: 1 addition & 0 deletions packages/rpc-provider/src/bundle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

export { HttpProvider } from './http/index.js';
export { DEFAULT_CAPACITY, LRUCache } from './lru.js';
export { packageInfo } from './packageInfo.js';
export { ScProvider } from './substrate-connect/index.js';
export { WsProvider } from './ws/index.js';
15 changes: 12 additions & 3 deletions packages/rpc-provider/src/http/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { fetch } from '@polkadot/x-fetch';

import { RpcCoder } from '../coder/index.js';
import defaults from '../defaults.js';
import { LRUCache } from '../lru.js';
import { DEFAULT_CAPACITY, LRUCache } from '../lru.js';

const ERROR_SUBSCRIBE = 'HTTP Provider does not have subscriptions, use WebSockets instead';

Expand All @@ -35,7 +35,8 @@ const l = logger('api-http');
* @see [[WsProvider]]
*/
export class HttpProvider implements ProviderInterface {
readonly #callCache = new LRUCache();
readonly #callCache: LRUCache;
readonly #cacheCapacity: number;
readonly #coder: RpcCoder;
readonly #endpoint: string;
readonly #headers: Record<string, string>;
Expand All @@ -44,14 +45,17 @@ export class HttpProvider implements ProviderInterface {
/**
* @param {string} endpoint The endpoint url starting with http://
*/
constructor (endpoint: string = defaults.HTTP_URL, headers: Record<string, string> = {}) {
constructor (endpoint: string = defaults.HTTP_URL, headers: Record<string, string> = {}, cacheCapacity?: number) {
if (!/^(https|http):\/\//.test(endpoint)) {
throw new Error(`Endpoint should start with 'http://' or 'https://', received '${endpoint}'`);
}

this.#coder = new RpcCoder();
this.#endpoint = endpoint;
this.#headers = headers;
this.#callCache = new LRUCache(cacheCapacity === 0 ? 0 : cacheCapacity || DEFAULT_CAPACITY);
this.#cacheCapacity = cacheCapacity === 0 ? 0 : cacheCapacity || DEFAULT_CAPACITY;

this.#stats = {
active: { requests: 0, subscriptions: 0 },
total: { bytesRecv: 0, bytesSent: 0, cached: 0, errors: 0, requests: 0, subscriptions: 0, timeout: 0 }
Expand Down Expand Up @@ -125,6 +129,11 @@ export class HttpProvider implements ProviderInterface {
this.#stats.total.requests++;

const [, body] = this.#coder.encodeJson(method, params);

if (this.#cacheCapacity === 0) {
return this.#send(body);
}

const cacheKey = isCacheable ? `${method}::${stringify(params)}` : '';
let resultPromise: Promise<T> | null = isCacheable
? this.#callCache.get(cacheKey)
Expand Down
54 changes: 31 additions & 23 deletions packages/rpc-provider/src/lru.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,60 @@
import { LRUCache } from './lru.js';

describe('LRUCache', (): void => {
let lru: LRUCache | undefined;

beforeEach((): void => {
lru = new LRUCache(4);
});
afterEach(async () => {
await lru?.clearInterval();
lru = undefined;
});

it('allows getting of items below capacity', (): void => {
const keys = ['1', '2', '3', '4'];
const lru = new LRUCache(4);

keys.forEach((k) => lru.set(k, `${k}${k}${k}`));
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));
const lruKeys = lru?.keys();

expect(lru.keys().join(', ')).toEqual(keys.reverse().join(', '));
expect(lru.length === lru.lengthData && lru.length === lru.lengthRefs).toBe(true);
expect(lruKeys?.join(', ')).toBe(keys.reverse().join(', '));
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);

keys.forEach((k) => expect(lru.get(k)).toEqual(`${k}${k}${k}`));
keys.forEach((k) => expect(lru?.get(k)).toEqual(`${k}${k}${k}`));
});

it('drops items when at capacity', (): void => {
const keys = ['1', '2', '3', '4', '5', '6'];
const lru = new LRUCache(4);

keys.forEach((k) => lru.set(k, `${k}${k}${k}`));
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));

expect(lru.keys().join(', ')).toEqual(keys.slice(2).reverse().join(', '));
expect(lru.length === lru.lengthData && lru.length === lru.lengthRefs).toBe(true);
expect(lru?.keys().join(', ')).toEqual(keys.slice(2).reverse().join(', '));
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);

keys.slice(2).forEach((k) => expect(lru.get(k)).toEqual(`${k}${k}${k}`));
keys.slice(2).forEach((k) => expect(lru?.get(k)).toEqual(`${k}${k}${k}`));
});

it('adjusts the order as they are used', (): void => {
const keys = ['1', '2', '3', '4', '5'];
const lru = new LRUCache(4);

keys.forEach((k) => lru.set(k, `${k}${k}${k}`));
keys.forEach((k) => lru?.set(k, `${k}${k}${k}`));

expect(lru.entries()).toEqual([['5', '555'], ['4', '444'], ['3', '333'], ['2', '222']]);
expect(lru.length === lru.lengthData && lru.length === lru.lengthRefs).toBe(true);
expect(lru?.entries()).toEqual([['5', '555'], ['4', '444'], ['3', '333'], ['2', '222']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);

lru.get('3');
lru?.get('3');

expect(lru.entries()).toEqual([['3', '333'], ['5', '555'], ['4', '444'], ['2', '222']]);
expect(lru.length === lru.lengthData && lru.length === lru.lengthRefs).toBe(true);
expect(lru?.entries()).toEqual([['3', '333'], ['5', '555'], ['4', '444'], ['2', '222']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);

lru.set('4', '4433');
lru?.set('4', '4433');

expect(lru.entries()).toEqual([['4', '4433'], ['3', '333'], ['5', '555'], ['2', '222']]);
expect(lru.length === lru.lengthData && lru.length === lru.lengthRefs).toBe(true);
expect(lru?.entries()).toEqual([['4', '4433'], ['3', '333'], ['5', '555'], ['2', '222']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);

lru.set('6', '666');
lru?.set('6', '666');

expect(lru.entries()).toEqual([['6', '666'], ['4', '4433'], ['3', '333'], ['5', '555']]);
expect(lru.length === lru.lengthData && lru.length === lru.lengthRefs).toBe(true);
expect(lru?.entries()).toEqual([['6', '666'], ['4', '4433'], ['3', '333'], ['5', '555']]);
expect(lru?.length === lru?.lengthData && lru?.length === lru?.lengthRefs).toBe(true);
});
});
71 changes: 69 additions & 2 deletions packages/rpc-provider/src/lru.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,31 @@
// Assuming all 1.5MB responses, we apply a default allowing for 96MB
// cache space (depending on the historic queries this would vary, metadata
// for Kusama/Polkadot/Substrate falls between 600-750K, 2x for estimate)
export const DEFAULT_CAPACITY = 128;

export const DEFAULT_CAPACITY = 64;

/**
 * A node in the doubly-linked list backing the LRU cache.
 *
 * Tracks the creation time and the most recent access time of its entry.
 * A freshly constructed node is self-linked: `next` and `prev` both point
 * back at the node itself until it is spliced into the list.
 */
class LRUNode {
  readonly key: string;
  #lastAccess: number;
  readonly createdAt: number;

  public next: LRUNode;
  public prev: LRUNode;

  constructor (key: string) {
    const now = Date.now();

    this.key = key;
    this.#lastAccess = now;
    this.createdAt = now;

    // self-link until inserted into the list
    this.next = this;
    this.prev = this;
  }

  /** Marks the node as just-used by stamping the current time. */
  public refresh (): void {
    this.#lastAccess = Date.now();
  }

  /** Timestamp (ms since epoch) of the most recent access. */
  public get lastAccess (): number {
    return this.#lastAccess;
  }
}

// https://en.wikipedia.org/wiki/Cache_replacement_policies#LRU
Expand All @@ -29,9 +42,28 @@ export class LRUCache {
#head: LRUNode;
#tail: LRUNode;

constructor (capacity = DEFAULT_CAPACITY) {
readonly #ttl: number;
readonly #ttlInterval: number;
#ttlTimerId: ReturnType<typeof setInterval> | null = null;

constructor (capacity = DEFAULT_CAPACITY, ttl = 30000, ttlInterval = 15000) {
this.capacity = capacity;
this.#ttl = ttl;
this.#ttlInterval = ttlInterval;
this.#head = this.#tail = new LRUNode('<empty>');

// make sure the interval is not longer than the ttl
if (this.#ttlInterval > this.#ttl) {
this.#ttlInterval = this.#ttl;
}
}

/** Time-to-live (ms) for cache entries; a value of 0 disables ttl eviction (no cleanup timer is started). */
get ttl (): number {
return this.#ttl;
}

/** Interval (ms) between ttl cleanup sweeps; clamped by the constructor to never exceed `ttl`. */
get ttlInterval (): number {
return this.#ttlInterval;
}

get length (): number {
Expand Down Expand Up @@ -116,13 +148,40 @@ export class LRUCache {
}
}

if (this.#ttl > 0 && !this.#ttlTimerId) {
this.#ttlTimerId = setInterval(() => {
this.#ttlClean();
}, this.#ttlInterval);
}

this.#data.set(key, value);
}

#ttlClean () {
// Find last node to keep
const expires = Date.now() - this.#ttl;

// traverse map to find the lastAccessed
while (this.#tail.lastAccess && this.#tail.lastAccess < expires && this.#length > 0) {
if (this.#ttlTimerId && this.#length === 0) {
clearInterval(this.#ttlTimerId);
this.#ttlTimerId = null;
this.#head = this.#tail = new LRUNode('<empty>');
} else {
this.#refs.delete(this.#tail.key);
this.#data.delete(this.#tail.key);
this.#length -= 1;
this.#tail = this.#tail.prev;
this.#tail.next = this.#head;
}
}
}

#toHead (key: string): void {
const ref = this.#refs.get(key);

if (ref && ref !== this.#head) {
ref.refresh();
ref.prev.next = ref.next;
ref.next.prev = ref.prev;
ref.next = this.#head;
Expand All @@ -131,4 +190,12 @@ export class LRUCache {
this.#head = ref;
}
}

// eslint-disable-next-line @typescript-eslint/require-await
public async clearInterval (): Promise<void> {
if (this.#ttlTimerId) {
clearInterval(this.#ttlTimerId);
this.#ttlTimerId = null;
}
}
}
7 changes: 7 additions & 0 deletions packages/rpc-provider/src/ws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export class WsProvider implements ProviderInterface {
readonly #isReadyPromise: Promise<WsProvider>;
readonly #stats: ProviderStats;
readonly #waitingForId: Record<string, JsonRpcResponse<unknown>> = {};
readonly #cacheCapacity: number;

#autoConnectMs: number;
#endpointIndex: number;
Expand Down Expand Up @@ -123,6 +124,7 @@ export class WsProvider implements ProviderInterface {
}
});
this.#callCache = new LRUCache(cacheCapacity || DEFAULT_CAPACITY);
this.#cacheCapacity = cacheCapacity || DEFAULT_CAPACITY;
this.#eventemitter = new EventEmitter();
this.#autoConnectMs = autoConnectMs || 0;
this.#coder = new RpcCoder();
Expand Down Expand Up @@ -312,6 +314,11 @@ export class WsProvider implements ProviderInterface {
this.#stats.total.requests++;

const [id, body] = this.#coder.encodeJson(method, params);

if (this.#cacheCapacity === 0) {
return this.#send(id, body, method, params, subscription);
}

const cacheKey = isCacheable ? `${method}::${stringify(params)}` : '';
let resultPromise: Promise<T> | null = isCacheable
? this.#callCache.get(cacheKey)
Expand Down