From 8ed2277b599c26ad3caeec9bce36da7d6224d040 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 9 Nov 2023 15:19:19 +1300 Subject: [PATCH] Revert "feat: Pre-Fetch Streamer Messages (#269)" This reverts commit 262b1838cd20738a3ac532687aa97a74ac278d0f. --- docker-compose.yml | 16 - prometheus.yml | 7 - runner/src/indexer/indexer.test.ts | 435 +++++++++++++++---- runner/src/indexer/indexer.ts | 98 ++++- runner/src/lake-client/index.ts | 1 - runner/src/lake-client/lake-client.test.ts | 170 -------- runner/src/lake-client/lake-client.ts | 90 ---- runner/src/metrics.ts | 42 +- runner/src/redis-client/index.ts | 2 +- runner/src/redis-client/redis-client.test.ts | 20 +- runner/src/redis-client/redis-client.ts | 13 +- runner/src/stream-handler/worker.ts | 139 ++---- 12 files changed, 488 insertions(+), 545 deletions(-) delete mode 100644 prometheus.yml delete mode 100644 runner/src/lake-client/index.ts delete mode 100644 runner/src/lake-client/lake-client.test.ts delete mode 100644 runner/src/lake-client/lake-client.ts diff --git a/docker-compose.yml b/docker-compose.yml index 8911aa97e..aa4dc242d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,23 +88,7 @@ services: HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup, http-log, webhook-log, websocket-log, query-log HASURA_GRAPHQL_ADMIN_SECRET: myadminsecretkey HASURA_GRAPHQL_AUTH_HOOK: http://hasura-auth:4000/auth - grafana: - image: grafana/grafana - volumes: - - grafana:/var/lib/grafana - ports: - - "3000:3000" - environment: - - GF_SECURITY_ADMIN_PASSWORD=secret - - prometheus: - image: prom/prometheus - volumes: - - ./prometheus.yml:/etc/prometheus/prometheus.yml - ports: - - "9090:9090" volumes: postgres: redis: - grafana: diff --git a/prometheus.yml b/prometheus.yml deleted file mode 100644 index cd0eab3f2..000000000 --- a/prometheus.yml +++ /dev/null @@ -1,7 +0,0 @@ -global: - scrape_interval: 1s - -scrape_configs: - - job_name: 'queryapi-runner' - static_configs: - - targets: ['host.docker.internal:9180'] \ No newline at end of file diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index b9313f631..38f9ce77c 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -1,5 +1,7 @@ -import { Block, type StreamerMessage } from '@near-lake/primitives'; +import { Block } from '@near-lake/primitives'; import type fetch from 'node-fetch'; +import { type S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; +import type RedisClient from '../redis-client'; import Indexer from './indexer'; import { VM } from 'vm2'; @@ -28,7 +30,7 @@ describe('Indexer unit tests', () => { );`; const SOCIAL_SCHEMA = ` - CREATE TABLE + CREATE TABLE "posts" ( "id" SERIAL NOT NULL, "account_id" VARCHAR NOT NULL, @@ -161,6 +163,10 @@ CREATE TABLE }), }); + const transparentRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + beforeEach(() => { process.env = { ...oldEnv, @@ -181,17 +187,24 @@ CREATE TABLE }), })); const blockHeight = 456; - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [], - header: { - height: blockHeight + const mockData = jest.fn().mockResolvedValue( + JSON.stringify( + { + block: { + chunks: [], + header: { + height: blockHeight + } + }, + shards: {} } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; + ) + ); + const mockRedis = { + getStreamerMessage: mockData + } as unknown as RedisClient; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -201,13 +214,188 @@ CREATE TABLE `, schema: SIMPLE_SCHEMA }; - await indexer.runFunctions(mockBlock, functions, false); + await indexer.runFunctions(blockHeight, functions, false); expect(mockFetch.mock.calls).toMatchSnapshot(); }); + test('Indexer.fetchBlock() should fetch a block from S3', async () => { + const author = 'dokiacapital.poolv1.near'; + const mockData = JSON.stringify({ + author + }); + const mockSend = jest.fn().mockResolvedValue({ + Body: { + transformToString: () => mockData + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); + + const blockHeight = 84333960; + const block = await indexer.fetchBlockPromise(blockHeight); + const params = { + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }; + + expect(mockS3.send).toHaveBeenCalledTimes(1); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); + expect(block.author).toEqual(author); + }); + + test('Indexer.fetchShard() should fetch a shard from S3', async () => { + const mockData = JSON.stringify({}); + const mockSend = jest.fn().mockResolvedValue({ + Body: { + transformToString: () => mockData + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); + + const blockHeight = 82699904; + const shard = 0; + const params = { + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json` + }; + await indexer.fetchShardPromise(blockHeight, shard); + + expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params))); + }); + + test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const getMessage = jest.fn() + .mockReturnValueOnce(JSON.stringify( + { + block: { + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }, + shards: {} + } + )); + const mockRedis = { + getStreamerMessage: getMessage + } as unknown as RedisClient; + const indexer = new Indexer('mainnet', { redisClient: mockRedis }); + + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); + + expect(getMessage).toHaveBeenCalledTimes(1); + expect(JSON.stringify(getMessage.mock.calls[0])).toEqual( + `[${blockHeight}]` + ); + const block = Block.fromStreamerMessage(streamerMessage); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 upon cache miss', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis }); + + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1); + + const block = Block.fromStreamerMessage(streamerMessage); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + + test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 and not cache and construct the streamer message if historical', async () => { + const blockHeight = 85233529; + const blockHash = 'xyz'; + const mockSend = jest.fn() + .mockReturnValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + hash: blockHash, + } + }) + } + }) + .mockReturnValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + } + }); + const mockS3 = { + send: mockSend, + } as unknown as S3Client; + const mockRedis = { + getStreamerMessage: jest.fn() + } as unknown as RedisClient; + const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis }); + + const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, true); + + expect(mockSend).toHaveBeenCalledTimes(5); + expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/block.json` + }))); + expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ + Bucket: 'near-lake-data-mainnet', + Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` + }))); + expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0); + + const block = Block.fromStreamerMessage(streamerMessage); + + expect(block.blockHeight).toEqual(blockHeight); + expect(block.blockHash).toEqual(blockHash); + }); + test('Indexer.transformIndexerFunction() applies the necessary transformations', () => { - const indexer = new Indexer(); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); const transformedFunction = indexer.transformIndexerFunction('console.log(\'hello\')'); @@ -239,7 +427,7 @@ CREATE TABLE } }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -291,7 +479,7 @@ CREATE TABLE test('Indexer.buildContext() can fetch from the near social api', async () => { const mockFetch = jest.fn(); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -320,7 +508,7 @@ CREATE TABLE errors: ['boom'] }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); @@ -335,7 +523,7 @@ CREATE TABLE data: 'mock', }), }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -362,7 +550,7 @@ CREATE TABLE }); test('GetTables works for a variety of input schemas', async () => { - const indexer = new Indexer(); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); const simpleSchemaTables = indexer.getTableNames(SIMPLE_SCHEMA); expect(simpleSchemaTables).toStrictEqual(['posts']); @@ -402,7 +590,7 @@ CREATE TABLE }); test('SanitizeTableName works properly on many test cases', async () => { - const indexer = new Indexer(); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); expect(indexer.sanitizeTableName('table_name')).toStrictEqual('TableName'); expect(indexer.sanitizeTableName('tablename')).toStrictEqual('Tablename'); // name is not capitalized @@ -417,7 +605,7 @@ CREATE TABLE }); test('indexer fails to build context.db due to collision on sanitized table names', async () => { - const indexer = new Indexer(); + const indexer = new Indexer('mainnet', { redisClient: transparentRedis }); const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE "test table" ( @@ -439,8 +627,9 @@ CREATE TABLE }) }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -478,8 +667,9 @@ CREATE TABLE }) }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -508,8 +698,9 @@ CREATE TABLE }) }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -542,8 +733,9 @@ CREATE TABLE }) }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -578,8 +770,9 @@ CREATE TABLE }) }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -597,8 +790,9 @@ CREATE TABLE create: jest.fn() }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); @@ -639,8 +833,9 @@ CREATE TABLE create: jest.fn() }; - const indexer = new Indexer({ + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, + redisClient: transparentRedis, DmlHandler: mockDmlHandler }); const context = indexer.buildContext('', 'morgs.near/social_feed1', 1, 'postgres'); @@ -702,16 +897,25 @@ CREATE TABLE }), }); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const mockS3 = { + send: jest.fn() + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, + }) + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, + }), + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -747,7 +951,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA }; - await indexer.runFunctions(mockBlock, functions, false); + await indexer.runFunctions(blockHeight, functions, false); expect(mockFetch.mock.calls).toMatchSnapshot(); }); @@ -781,16 +985,19 @@ CREATE TABLE }), })); const blockHeight = 456; - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight + const mockS3 = { + send: jest.fn().mockResolvedValue({ + Body: { + transformToString: () => JSON.stringify({ + chunks: [], + header: { + height: blockHeight + } + }) } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + }), + } as unknown as S3Client; + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -800,7 +1007,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA }; - await expect(indexer.runFunctions(mockBlock, functions, false)).rejects.toThrow(new Error('boom')); + await expect(indexer.runFunctions(blockHeight, functions, false)).rejects.toThrow(new Error('boom')); expect(mockFetch.mock.calls).toMatchSnapshot(); }); @@ -812,20 +1019,30 @@ CREATE TABLE errors: null, }), })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; + const mockS3 = { + send: jest + .fn() + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, + }) + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, + }), + } as unknown as S3Client; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions = { 'morgs.near/test': { @@ -835,7 +1052,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA, } }; - await indexer.runFunctions(mockBlock, functions, false, { provision: true }); + await indexer.runFunctions(1, functions, false, { provision: true }); expect(provisioner.isUserApiProvisioned).toHaveBeenCalledWith('morgs.near', 'test'); expect(provisioner.provisionUserApi).toHaveBeenCalledTimes(1); @@ -854,20 +1071,30 @@ CREATE TABLE errors: null, }), })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; + const mockS3 = { + send: jest + .fn() + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, + }) + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, + }), + } as unknown as S3Client; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -875,7 +1102,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA, } }; - await indexer.runFunctions(mockBlock, functions, false, { provision: true }); + await indexer.runFunctions(1, functions, false, { provision: true }); expect(provisioner.provisionUserApi).not.toHaveBeenCalled(); }); @@ -888,20 +1115,30 @@ CREATE TABLE errors: null, }), })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; + const mockS3 = { + send: jest + .fn() + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, + }) + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, + }), + } as unknown as S3Client; const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -911,7 +1148,7 @@ CREATE TABLE schema: SIMPLE_SCHEMA, } }; - await indexer.runFunctions(mockBlock, functions, false, { provision: true }); + await indexer.runFunctions(blockHeight, functions, false, { provision: true }); expect(provisioner.provisionUserApi).not.toHaveBeenCalled(); expect(mockFetch.mock.calls).toMatchSnapshot(); @@ -925,21 +1162,31 @@ CREATE TABLE errors: null, }), })); - const mockBlock = Block.fromStreamerMessage({ - block: { - chunks: [0], - header: { - height: blockHeight - } - }, - shards: {} - } as unknown as StreamerMessage) as unknown as Block; + const mockS3 = { + send: jest + .fn() + .mockResolvedValueOnce({ // block + Body: { + transformToString: () => JSON.stringify({ + chunks: [0], + header: { + height: blockHeight, + }, + }), + }, + }) + .mockResolvedValue({ // shard + Body: { + transformToString: () => JSON.stringify({}) + }, + }), + } as unknown as S3Client; const error = new Error('something went wrong with provisioning'); const provisioner: any = { isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn().mockRejectedValue(error), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, s3: mockS3, redisClient: transparentRedis, provisioner }); const functions: Record = { 'morgs.near/test': { @@ -950,7 +1197,7 @@ CREATE TABLE } }; - await expect(indexer.runFunctions(mockBlock, functions, false, { provision: true })).rejects.toThrow(error); + await expect(indexer.runFunctions(blockHeight, functions, false, { provision: true })).rejects.toThrow(error); expect(mockFetch.mock.calls).toMatchSnapshot(); }); @@ -962,7 +1209,7 @@ CREATE TABLE data: {} }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); // @ts-expect-error legacy test const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); @@ -998,7 +1245,7 @@ CREATE TABLE }) }); const role = 'morgs_near'; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: transparentRedis }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index fd69a3898..e866652b2 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -1,16 +1,21 @@ import fetch, { type Response } from 'node-fetch'; import { VM } from 'vm2'; -import { type Block } from '@near-lake/primitives'; +import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; +import { Block } from '@near-lake/primitives'; import { Parser } from 'node-sql-parser'; +import { METRICS } from '../metrics'; import Provisioner from '../provisioner'; import DmlHandler from '../dml-handler/dml-handler'; +import RedisClient from '../redis-client'; interface Dependencies { fetch: typeof fetch + s3: S3Client provisioner: Provisioner DmlHandler: typeof DmlHandler parser: Parser + redisClient: RedisClient }; interface Context { @@ -35,27 +40,31 @@ export default class Indexer { private readonly deps: Dependencies; constructor ( + private readonly network: string, deps?: Partial ) { this.DEFAULT_HASURA_ROLE = 'append'; + this.network = network; this.deps = { fetch, + s3: new S3Client(), provisioner: new Provisioner(), DmlHandler, parser: new Parser(), + redisClient: deps?.redisClient ?? new RedisClient(), ...deps, }; } async runFunctions ( - block: Block, + blockHeight: number, functions: Record, isHistorical: boolean, options: { provision?: boolean } = { provision: false } ): Promise { - const blockHeight = block.blockHeight; + const blockWithHelpers = Block.fromStreamerMessage(await this.fetchStreamerMessage(blockHeight, isHistorical)); - const lag = Date.now() - Math.floor(Number(block.header().timestampNanosec) / 1000000); + const lag = Date.now() - Math.floor(Number(blockWithHelpers.header().timestampNanosec) / 1000000); const simultaneousPromises: Array> = []; const allMutations: string[] = []; @@ -89,10 +98,11 @@ export default class Indexer { } await this.setStatus(functionName, blockHeight, 'RUNNING'); + const vm = new VM({ timeout: 3000, allowAsync: true }); const context = this.buildContext(indexerFunction.schema, functionName, blockHeight, hasuraRoleName); - vm.freeze(block, 'block'); + vm.freeze(blockWithHelpers, 'block'); vm.freeze(context, 'context'); vm.freeze(context, 'console'); // provide console.log via context.log @@ -108,6 +118,7 @@ export default class Indexer { await this.writeLog(functionName, blockHeight, 'Error running IndexerFunction', error.message); throw e; } + simultaneousPromises.push(this.writeFunctionState(functionName, blockHeight, isHistorical)); } catch (e) { console.error(`${functionName}: Failed to run function`, e); @@ -120,6 +131,62 @@ export default class Indexer { return allMutations; } + // pad with 0s to 12 digits + normalizeBlockHeight (blockHeight: number): string { + return blockHeight.toString().padStart(12, '0'); + } + + async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> { + if (!isHistorical) { + const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight); + if (cachedMessage) { + METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); + const parsedMessage = JSON.parse(cachedMessage); + return parsedMessage; + } else { + METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); + } + } + const blockPromise = this.fetchBlockPromise(blockHeight); + const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); + + const results = await Promise.all([blockPromise, ...shardsPromises]); + const block = results.shift(); + const shards = results; + return { + block, + shards, + }; + } + + async fetchShardsPromises (blockHeight: number, numberOfShards: number): Promise>> { + return ([...Array(numberOfShards).keys()].map(async (shardId) => + await this.fetchShardPromise(blockHeight, shardId) + )); + } + + async fetchShardPromise (blockHeight: number, shardId: number): Promise { + const params = { + Bucket: `near-lake-data-${this.network}`, + Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`, + }; + const response = await this.deps.s3.send(new GetObjectCommand(params)); + const shardData = await response.Body?.transformToString() ?? '{}'; + return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); + } + + async fetchBlockPromise (blockHeight: number): Promise { + const file = 'block.json'; + const folder = this.normalizeBlockHeight(blockHeight); + const params = { + Bucket: 'near-lake-data-' + this.network, + Key: `${folder}/${file}`, + }; + const response = await this.deps.s3.send(new GetObjectCommand(params)); + const blockData = await response.Body?.transformToString() ?? '{}'; + return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); + } + enableAwaitTransform (indexerFunction: string): string { return ` async function f(){ @@ -423,4 +490,25 @@ export default class Indexer { return data; } + + renameUnderscoreFieldsToCamelCase (value: Record): Record { + if (value !== null && typeof value === 'object' && !Array.isArray(value)) { + // It's a non-null, non-array object, create a replacement with the keys initially-capped + const newValue: any = {}; + for (const key in value) { + const newKey: string = key + .split('_') + .map((word, i) => { + if (i > 0) { + return word.charAt(0).toUpperCase() + word.slice(1); + } + return word; + }) + .join(''); + newValue[newKey] = value[key]; + } + return newValue; + } + return value; + } } diff --git a/runner/src/lake-client/index.ts b/runner/src/lake-client/index.ts deleted file mode 100644 index 41779a063..000000000 --- a/runner/src/lake-client/index.ts +++ /dev/null @@ -1 +0,0 @@ -export { default } from './lake-client'; diff --git a/runner/src/lake-client/lake-client.test.ts b/runner/src/lake-client/lake-client.test.ts deleted file mode 100644 index f04a18a28..000000000 --- a/runner/src/lake-client/lake-client.test.ts +++ /dev/null @@ -1,170 +0,0 @@ -import { GetObjectCommand, type S3Client } from '@aws-sdk/client-s3'; -import LakeClient from './lake-client'; -import type RedisClient from '../redis-client'; - -describe('LakeClient', () => { - const transparentRedis = { - getStreamerMessage: jest.fn() - } as unknown as RedisClient; - - test('Indexer.fetchBlock() should fetch the block and shards from S3 upon cache miss', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const mockSend = jest.fn() - .mockReturnValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }) - } - }) - .mockReturnValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const client = new LakeClient('mainnet', mockS3, transparentRedis); - - const block = await client.fetchBlock(blockHeight, true); - - expect(mockSend).toHaveBeenCalledTimes(5); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }))); - expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` - }))); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); - - test('fetchBlock should fetch the streamer message from cache, convert it to block, and return it', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const getMessage = jest.fn() - .mockReturnValueOnce(JSON.stringify( - { - block: { - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }, - shards: {} - } - )); - const mockRedis = { - getStreamerMessage: getMessage - } as unknown as RedisClient; - const mockS3 = {} as unknown as S3Client; - const client = new LakeClient('mainnet', mockS3, mockRedis); - - const block = await client.fetchBlock(blockHeight, false); - - expect(getMessage).toHaveBeenCalledTimes(1); - expect(JSON.stringify(getMessage.mock.calls[0])).toEqual( - `[${blockHeight}]` - ); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); - - test('fetchBlock should fetch the block and shards from S3 upon cache miss', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const mockSend = jest.fn() - .mockReturnValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }) - } - }) - .mockReturnValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const client = new LakeClient('mainnet', mockS3, transparentRedis); - - const block = await client.fetchBlock(blockHeight, false); - - expect(mockSend).toHaveBeenCalledTimes(5); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }))); - expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` - }))); - expect(transparentRedis.getStreamerMessage).toHaveBeenCalledTimes(1); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); - - test('fetchBlock should not hit cache and instead fetch the block and shards from S3 if historical', async () => { - const blockHeight = 85233529; - const blockHash = 'xyz'; - const mockSend = jest.fn() - .mockReturnValueOnce({ // block - Body: { - transformToString: () => JSON.stringify({ - chunks: [0], - header: { - height: blockHeight, - hash: blockHash, - } - }) - } - }) - .mockReturnValue({ // shard - Body: { - transformToString: () => JSON.stringify({}) - } - }); - const mockS3 = { - send: mockSend, - } as unknown as S3Client; - const mockRedis = { - getStreamerMessage: jest.fn() - } as unknown as RedisClient; - const client = new LakeClient('mainnet', mockS3, mockRedis); - - const block = await client.fetchBlock(blockHeight, true); - - expect(mockSend).toHaveBeenCalledTimes(5); - expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/block.json` - }))); - expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({ - Bucket: 'near-lake-data-mainnet', - Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json` - }))); - expect(mockRedis.getStreamerMessage).toHaveBeenCalledTimes(0); - - expect(block.blockHeight).toEqual(blockHeight); - expect(block.blockHash).toEqual(blockHash); - }); -}); diff --git a/runner/src/lake-client/lake-client.ts b/runner/src/lake-client/lake-client.ts deleted file mode 100644 index d06d5cef9..000000000 --- a/runner/src/lake-client/lake-client.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'; -import { Block } from '@near-lake/primitives'; -import { METRICS } from '../metrics'; -import RedisClient from '../redis-client'; - -export default class LakeClient { - constructor ( - private readonly network: string = 'mainnet', - private readonly s3Client: S3Client = new S3Client(), - private readonly redisClient: RedisClient = new RedisClient() - ) {} - - // pad with 0s to 12 digits - private normalizeBlockHeight (blockHeight: number): string { - return blockHeight.toString().padStart(12, '0'); - } - - private async fetchShardsPromises (blockHeight: number, numberOfShards: number): Promise>> { - return ([...Array(numberOfShards).keys()].map(async (shardId) => - await this.fetchShardPromise(blockHeight, shardId) - )); - } - - private async fetchShardPromise (blockHeight: number, shardId: number): Promise { - const params = { - Bucket: `near-lake-data-${this.network}`, - Key: `${this.normalizeBlockHeight(blockHeight)}/shard_${shardId}.json`, - }; - const response = await this.s3Client.send(new GetObjectCommand(params)); - const shardData = await response.Body?.transformToString() ?? '{}'; - return JSON.parse(shardData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); - } - - private async fetchBlockPromise (blockHeight: number): Promise { - const file = 'block.json'; - const folder = this.normalizeBlockHeight(blockHeight); - const params = { - Bucket: 'near-lake-data-' + this.network, - Key: `${folder}/${file}`, - }; - const response = await this.s3Client.send(new GetObjectCommand(params)); - const blockData = await response.Body?.transformToString() ?? '{}'; - return JSON.parse(blockData, (_key, value) => this.renameUnderscoreFieldsToCamelCase(value)); - } - - private renameUnderscoreFieldsToCamelCase (value: Record): Record { - if (value !== null && typeof value === 'object' && !Array.isArray(value)) { - // It's a non-null, non-array object, create a replacement with the keys initially-capped - const newValue: any = {}; - for (const key in value) { - const newKey: string = key - .split('_') - .map((word, i) => { - if (i > 0) { - return word.charAt(0).toUpperCase() + word.slice(1); - } - return word; - }) - .join(''); - newValue[newKey] = value[key]; - } - return newValue; - } - return value; - } - - async fetchBlock (blockHeight: number, isHistorical: boolean): Promise { - if (!isHistorical) { - const cachedMessage = await this.redisClient.getStreamerMessage(blockHeight); - if (cachedMessage) { - METRICS.CACHE_HIT.inc(); - const parsedMessage = JSON.parse(cachedMessage); - return Block.fromStreamerMessage(parsedMessage); - } else { - METRICS.CACHE_MISS.inc(); - } - } - - const blockPromise = this.fetchBlockPromise(blockHeight); - const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); - - const results = await Promise.all([blockPromise, ...shardsPromises]); - const block = results.shift(); - const shards = results; - return Block.fromStreamerMessage({ - block, - shards, - }); - } -} diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index e76d4d86d..c3ab74b81 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,47 +1,35 @@ import express from 'express'; import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; -const BLOCK_WAIT_DURATION = new Gauge({ - name: 'queryapi_runner_block_wait_duration_milliseconds', - help: 'Time an indexer function waited for a block before processing', - labelNames: ['indexer', 'type'], -}); - -const CACHE_HIT = new Counter({ - name: 'queryapi_runner_cache_hit', - help: 'The number of times cache was hit successfully' -}); - -const CACHE_MISS = new Counter({ - name: 'queryapi_runner_cache_miss', - help: 'The number of times cache was missed' -}); - const UNPROCESSED_STREAM_MESSAGES = new Gauge({ name: 'queryapi_runner_unprocessed_stream_messages', help: 'Number of Redis Stream messages not yet processed', labelNames: ['indexer', 'type'], }); -const LAST_PROCESSED_BLOCK_HEIGHT = new Gauge({ - name: 'queryapi_runner_last_processed_block_height', - help: 'Previous block height processed by an indexer', - labelNames: ['indexer', 'type'], -}); - const EXECUTION_DURATION = new Histogram({ name: 'queryapi_runner_execution_duration_milliseconds', help: 'Time taken to execute an indexer function', labelNames: ['indexer', 'type'], }); +const CACHE_HIT = new Counter({ + name: 'queryapi_runner_cache_hit', + help: 'The number of times cache was hit successfully', + labelNames: ['type', 'key'] +}); + +const CACHE_MISS = new Counter({ + name: 'queryapi_runner_cache_miss', + help: 'The number of times cache was missed', + labelNames: ['type', 'key'] +}); + export const METRICS = { - BLOCK_WAIT_DURATION, - CACHE_HIT, - CACHE_MISS, - UNPROCESSED_STREAM_MESSAGES, - LAST_PROCESSED_BLOCK_HEIGHT, EXECUTION_DURATION, + UNPROCESSED_STREAM_MESSAGES, + CACHE_HIT, + CACHE_MISS }; const aggregatorRegistry = new AggregatorRegistry(); diff --git a/runner/src/redis-client/index.ts b/runner/src/redis-client/index.ts index 938571c25..efa0f96e7 100644 --- a/runner/src/redis-client/index.ts +++ b/runner/src/redis-client/index.ts @@ -1 +1 @@ -export { default, type StreamType } from './redis-client'; +export { default } from './redis-client'; diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 1abfd262f..26030f249 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -10,7 +10,7 @@ describe('RedisClient', () => { const client = new RedisClient(mockClient); - const message = await client.getStreamMessages('streamKey'); + const message = await client.getNextStreamMessage('streamKey'); expect(mockClient.xRead).toHaveBeenCalledWith( { key: 'streamKey', id: '0' }, @@ -19,24 +19,6 @@ describe('RedisClient', () => { expect(message).toBeUndefined(); }); - it('returns count of messages after id with block', async () => { - const mockClient = { - on: jest.fn(), - connect: jest.fn().mockResolvedValue(null), - xRead: jest.fn().mockResolvedValue(null), - } as any; - - const client = new RedisClient(mockClient); - - const message = await client.getStreamMessages('streamKey', '123-0', 10); - - expect(mockClient.xRead).toHaveBeenCalledWith( - { key: 'streamKey', id: '123-0' }, - { COUNT: 10 } - ); - expect(message).toBeUndefined(); - }); - it('deletes the stream message', async () => { const mockClient = { on: jest.fn(), diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 3edbde25a..18e11b854 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -14,7 +14,7 @@ interface StreamStorage { schema: string } -export type StreamType = 'historical' | 'real-time'; +type StreamType = 'historical' | 'real-time'; export default class RedisClient { SMALLEST_STREAM_ID = '0'; @@ -44,14 +44,12 @@ export default class RedisClient { await this.client.disconnect(); } - async getStreamMessages ( + async getNextStreamMessage ( streamKey: string, - streamId = this.SMALLEST_STREAM_ID, - count = 1 ): Promise { const results = await this.client.xRead( - { key: streamKey, id: streamId }, - { COUNT: count } + { key: streamKey, id: this.SMALLEST_STREAM_ID }, + { COUNT: 1 } ); return results?.[0].messages as StreamMessage[]; @@ -66,9 +64,8 @@ export default class RedisClient { async getUnprocessedStreamMessages ( streamKey: string, - startId = this.SMALLEST_STREAM_ID, ): Promise { - const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID); + const results = await this.client.xRange(streamKey, this.SMALLEST_STREAM_ID, this.LARGEST_STREAM_ID); return results as StreamMessage[]; }; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index ae0d29c56..1cc1c531d 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -2,144 +2,69 @@ import { isMainThread, parentPort, workerData } from 'worker_threads'; import promClient from 'prom-client'; import Indexer from '../indexer'; -import RedisClient, { type StreamType } from '../redis-client'; +import RedisClient from '../redis-client'; import { METRICS } from '../metrics'; -import type { Block } from '@near-lake/primitives'; -import LakeClient from '../lake-client'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); } -interface QueueMessage { - block: Block - streamMessageId: string -} -type PrefetchQueue = Array>; - -interface WorkerContext { - redisClient: RedisClient - lakeClient: LakeClient - queue: PrefetchQueue - streamKey: string - streamType: StreamType -} + +const indexer = new Indexer('mainnet'); +const redisClient = new RedisClient(); const sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); }; void (async function main () { const { streamKey } = workerData; - const redisClient = new RedisClient(); - const workerContext: WorkerContext = { - redisClient, - lakeClient: new LakeClient(), - queue: [], - streamKey, - streamType: redisClient.getStreamType(streamKey), - }; console.log('Started processing stream: ', streamKey); - await handleStream(workerContext, streamKey); -})(); - -async function handleStream (workerContext: WorkerContext, streamKey: string): Promise { - void blockQueueProducer(workerContext, streamKey); - void blockQueueConsumer(workerContext, streamKey); -} - -function incrementId (id: string): string { - const [main, sequence] = id.split('-'); - return `${Number(main) + 1}-${sequence}`; -} - -async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { - const HISTORICAL_BATCH_SIZE = 100; - let streamMessageStartId = '0'; + let indexerName = ''; + const streamType = redisClient.getStreamType(streamKey); + const isHistorical = streamType === 'historical'; while (true) { - const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length; - if (preFetchCount <= 0) { - await sleep(100); - continue; - } - const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); - if (messages == null) { - await sleep(100); - continue; - } - console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`); - - for (const streamMessage of messages) { - const { id, message } = streamMessage; - workerContext.queue.push(generateQueueMessage(workerContext, Number(message.block_height), id)); - } + try { + const startTime = performance.now(); - streamMessageStartId = incrementId(messages[messages.length - 1].id); - } -} + const messages = await redisClient.getNextStreamMessage(streamKey); + const indexerConfig = await redisClient.getStreamStorage(streamKey); -async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise { - const indexer = new Indexer(); - const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); - const indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; - const functions = { - [indexerName]: { - account_id: indexerConfig.account_id, - function_name: indexerConfig.function_name, - code: indexerConfig.code, - schema: indexerConfig.schema, - provisioned: false, - }, - }; + indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; - while (true) { - let streamMessageId = ''; - try { - while (workerContext.queue.length === 0) { - await sleep(100); - } - const queueMessage = await workerContext.queue.at(0); - if (queueMessage === undefined) { + if (messages == null) { + await sleep(1000); continue; } - const startTime = performance.now(); - const blockStartTime = startTime; - const block = queueMessage.block; - streamMessageId = queueMessage.streamMessageId; - if (block === undefined || block.blockHeight == null) { - console.error('Block failed to process or does not have block height', block); - continue; - } - METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(performance.now() - blockStartTime); - await indexer.runFunctions(block, functions, false, { provision: true }); + const [{ id, message }] = messages; - await workerContext.redisClient.deleteStreamMessage(streamKey, streamMessageId); - await workerContext.queue.shift(); + const functions = { + [indexerName]: { + account_id: indexerConfig.account_id, + function_name: indexerConfig.function_name, + code: indexerConfig.code, + schema: indexerConfig.schema, + provisioned: false, + }, + }; + await indexer.runFunctions(Number(message.block_height), functions, isHistorical, { + provision: true, + }); - METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); + await redisClient.deleteStreamMessage(streamKey, id); - if (workerContext.streamType === 'historical') { - METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); - } + METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime); console.log(`Success: ${indexerName}`); } catch (err) { await sleep(10000); console.log(`Failed: ${indexerName}`, err); } finally { - const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0); + const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); } } -} - -async function generateQueueMessage (workerContext: WorkerContext, blockHeight: number, streamMessageId: string): Promise { - const block = await workerContext.lakeClient.fetchBlock(blockHeight, workerContext.streamType === 'historical'); - return { - block, - streamMessageId - }; -} +})();