From 10aae59485ece18d859945ab1206856f2d22b7ff Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 22 Aug 2023 09:07:05 -0700 Subject: [PATCH] feat: Generate insert and select methods for context object under db (#177) Co-authored-by: Darun Seethammagari --- .gitignore | 1 + README.md | 2 + frontend/src/components/Editor/Editor.js | 7 +- frontend/src/utils/indexerRunner.js | 104 +++++++++--- runner/src/dml-handler/dml-handler.test.ts | 93 +++++++++++ runner/src/dml-handler/dml-handler.ts | 63 ++++++++ runner/src/dml-handler/index.ts | 1 + .../src/hasura-client/hasura-client.test.ts | 76 +++++++++ runner/src/hasura-client/hasura-client.ts | 9 ++ runner/src/indexer/indexer.test.ts | 153 ++++++++++++++++-- runner/src/indexer/indexer.ts | 56 ++++++- runner/src/provisioner/provisioner.ts | 31 ++-- runner/src/utility.ts | 13 ++ 13 files changed, 544 insertions(+), 65 deletions(-) create mode 100644 runner/src/dml-handler/dml-handler.test.ts create mode 100644 runner/src/dml-handler/dml-handler.ts create mode 100644 runner/src/dml-handler/index.ts create mode 100644 runner/src/utility.ts diff --git a/.gitignore b/.gitignore index 6440537ad..6a2af26a0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ redis/ *.log /indexer/blocks/ +node_modules/ diff --git a/README.md b/README.md index bb82abb01..7112dd31d 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,8 @@ when they match new blocks by placing messages on an SQS queue. Spawns historica indexer_rules_engine, storage. 2. [Indexer Runner](.indexer-js-queue-handler) Retrieves messages from the SQS queue, fetches the matching block and executes the IndexerFunction. +3. [Runner](.runner) + Retrieves messages from Redis Stream, fetching matching block and executes the IndexerFunction. 3. [IndexerFunction Editor UI](./frontend) Serves the editor UI within the dashboard widget and mediates some communication with the GraphQL DB and block server. 4. [Hasura Authentication Service](./hasura-authentication-service) diff --git a/frontend/src/components/Editor/Editor.js b/frontend/src/components/Editor/Editor.js index 4d217e6b6..9745661f8 100644 --- a/frontend/src/components/Editor/Editor.js +++ b/frontend/src/components/Editor/Editor.js @@ -261,10 +261,11 @@ const Editor = ({ async function executeIndexerFunction(option = "latest", startingBlockHeight = null) { setIsExecutingIndexerFunction(() => true) + const schemaName = indexerDetails.accountId.concat("_", indexerDetails.indexerName).replace(/[^a-zA-Z0-9]/g, '_'); switch (option) { case "debugList": - await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, option) + await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName, option) break case "specific": if (startingBlockHeight === null && Number(startingBlockHeight) === 0) { @@ -272,11 +273,11 @@ const Editor = ({ break } - await indexerRunner.start(startingBlockHeight, indexingCode, option) + await indexerRunner.start(startingBlockHeight, indexingCode, schema, schemaName, option) break case "latest": const latestHeight = await requestLatestBlockHeight() - if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, option) + if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, schema, schemaName, option) } setIsExecutingIndexerFunction(() => false) } diff --git a/frontend/src/utils/indexerRunner.js b/frontend/src/utils/indexerRunner.js index 123226bbb..6df114c29 100644 --- a/frontend/src/utils/indexerRunner.js +++ b/frontend/src/utils/indexerRunner.js @@ -10,7 +10,7 @@ export default class IndexerRunner { this.shouldStop = false; } - async start(startingHeight, indexingCode, option) { + async start(startingHeight, indexingCode, schema, schemaName, option) { this.currentHeight = startingHeight; this.shouldStop = false; console.clear() @@ -32,7 +32,7 @@ export default class IndexerRunner { this.stop() } if (blockDetails) { - await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode); + await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode, schema, schemaName); this.currentHeight++; await this.delay(1000); } @@ -50,8 +50,24 @@ export default class IndexerRunner { return new Promise((resolve) => setTimeout(resolve, ms)); } - async executeIndexerFunction(height, blockDetails, indexingCode) { + validateTableNames(tableNames) { + if (!(Array.isArray(tableNames) && tableNames.length > 0)) { + throw new Error("Schema does not have any tables. There should be at least one table."); + } + const correctTableNameFormat = /^[a-zA-Z_][a-zA-Z0-9_]*$/; + + tableNames.forEach(name => { + if (!correctTableNameFormat.test(name)) { + throw new Error(`Table name ${name} is not formatted correctly. Table names must not start with a number and only contain alphanumerics or underscores.`); + } + }); + } + + async executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName) { let innerCode = indexingCode.match(/getBlock\s*\([^)]*\)\s*{([\s\S]*)}/)[1]; + let tableNames = Array.from(schema.matchAll(/CREATE TABLE\s+"(\w+)"/g), match => match[1]); // Get first capturing group of each match + this.validateTableNames(tableNames); + if (blockDetails) { const block = Block.fromStreamerMessage(blockDetails); block.actions() @@ -59,11 +75,11 @@ export default class IndexerRunner { block.events() console.log(block) - await this.runFunction(blockDetails, height, innerCode); + await this.runFunction(blockDetails, height, innerCode, schemaName, tableNames); } } - async executeIndexerFunctionOnHeights(heights, indexingCode) { + async executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName) { console.clear() console.group('%c Welcome! Lets test your indexing logic on some Near Blocks!', 'color: white; background-color: navy; padding: 5px;'); if (heights.length === 0) { @@ -80,14 +96,14 @@ export default class IndexerRunner { console.log(error) } console.time('Indexing Execution Complete') - this.executeIndexerFunction(height, blockDetails, indexingCode) + this.executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName) console.timeEnd('Indexing Execution Complete') console.groupEnd() } console.groupEnd() } - async runFunction(streamerMessage, blockHeight, indexerCode) { + async runFunction(streamerMessage, blockHeight, indexerCode, schemaName, tableNames) { const innerCodeWithBlockHelper = ` const block = Block.fromStreamerMessage(streamerMessage); @@ -125,21 +141,20 @@ export default class IndexerRunner { "", () => { let operationType, operationName - const match = query.match(/(query|mutation)\s+(\w+)\s*(\(.*?\))?\s*\{([\s\S]*)\}/); - if (match) { - operationType = match[1]; - operationName = match[2]; - } - - console.group(`Executing GraphQL ${operationType}: (${operationName})`); - if (operationType === 'mutation') console.log('%c Mutations in debug mode do not alter the database', 'color: black; background-color: yellow; padding: 5px;'); - console.group(`Data passed to ${operationType}`); - console.dir(mutationData); - console.groupEnd(); - console.group(`Data returned by ${operationType}`); - console.log({}) - console.groupEnd(); - console.groupEnd(); + const match = query.match(/(query|mutation)\s+(\w+)\s*(\(.*?\))?\s*\{([\s\S]*)\}/); + if (match) { + operationType = match[1]; + operationName = match[2]; + } + console.group(`Executing GraphQL ${operationType}: (${operationName})`); + if (operationType === 'mutation') console.log('%c Mutations in debug mode do not alter the database', 'color: black; background-color: yellow; padding: 5px;'); + console.group(`Data passed to ${operationType}`); + console.dir(mutationData); + console.groupEnd(); + console.group(`Data returned by ${operationType}`); + console.log({}) + console.groupEnd(); + console.groupEnd(); } ); return {}; @@ -147,11 +162,56 @@ export default class IndexerRunner { log: async (message) => { this.handleLog(blockHeight, message); }, + db: this.buildDatabaseContext(blockHeight, schemaName, tableNames) }; wrappedFunction(Block, streamerMessage, context); } + buildDatabaseContext (blockHeight, schemaName, tables) { + try { + const result = tables.reduce((prev, tableName) => ({ + ...prev, + [`insert_${tableName}`]: async (objects) => await this.insert(blockHeight, schemaName, tableName, objects), + [`select_${tableName}`]: async (object, limit = 0) => await this.select(blockHeight, schemaName, tableName, object, limit), + }), {}); + return result; + } catch (error) { + console.error('Caught error when generating DB methods. Falling back to generic methods.', error); + return { + insert: async (tableName, objects) => { + this.insert(blockHeight, schemaName, tableName, objects); + }, + select: async (tableName, object, limit = 0) => { + this.select(blockHeight, schemaName, tableName, object, limit); + } + }; + } + } + + insert(blockHeight, schemaName, tableName, objects) { + this.handleLog( + blockHeight, + "", + () => { + console.log('Inserting object %s into table %s on schema %s', JSON.stringify(objects), tableName, schemaName); + } + ); + return {}; + } + + select(blockHeight, schemaName, tableName, object, limit) { + this.handleLog( + blockHeight, + "", + () => { + const roundedLimit = Math.round(limit); + console.log('Selecting objects with values %s from table %s on schema %s with %s limit', JSON.stringify(object), tableName, schemaName, limit === 0 ? 'no' : roundedLimit.toString()); + } + ); + return {}; + } + // deprecated replaceNewLines(code) { return code.replace(/\\n/g, "\n").replace(/\\"/g, '"'); diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts new file mode 100644 index 000000000..e0c92c057 --- /dev/null +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -0,0 +1,93 @@ +import pgFormat from 'pg-format'; +import DmlHandler from './dml-handler'; + +describe('DML Handler tests', () => { + const hasuraClient: any = { + getDbConnectionParameters: jest.fn().mockReturnValue({ + database: 'test_near', + host: 'postgres', + password: 'test_pass', + port: 5432, + username: 'test_near' + }) + }; + let PgClient: any; + let query: any; + + const ACCOUNT = 'test_near'; + const SCHEMA = 'test_schema'; + const TABLE_NAME = 'test_table'; + + beforeEach(() => { + query = jest.fn().mockReturnValue({ rows: [] }); + PgClient = jest.fn().mockImplementation(() => { + return { query, format: pgFormat }; + }); + }); + + test('Test valid insert one with array', async () => { + const inputObj = { + account_id: 'test_acc_near', + block_height: 999, + block_timestamp: 'UTC', + content: 'test_content', + receipt_id: 111, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + expect(query.mock.calls).toEqual([ + ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []] + ]); + }); + + test('Test valid insert multiple rows with array', async () => { + const inputObj = [{ + account_id: 'morgs_near', + block_height: 1, + receipt_id: 'abc', + }, + { + account_id: 'morgs_near', + block_height: 2, + receipt_id: 'abc', + }]; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + expect(query.mock.calls).toEqual([ + ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []] + ]); + }); + + test('Test valid select on two fields', async () => { + const inputObj = { + account_id: 'test_acc_near', + block_height: 999, + }; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj); + expect(query.mock.calls).toEqual([ + ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2', Object.values(inputObj)] + ]); + }); + + test('Test valid select on two fields with limit', async () => { + const inputObj = { + account_id: 'test_acc_near', + block_height: 999, + }; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1); + expect(query.mock.calls).toEqual([ + ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)] + ]); + }); +}); diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts new file mode 100644 index 000000000..45dbdcb73 --- /dev/null +++ b/runner/src/dml-handler/dml-handler.ts @@ -0,0 +1,63 @@ +import { wrapError } from '../utility'; +import PgClientModule from '../pg-client'; +import HasuraClient from '../hasura-client/hasura-client'; + +export default class DmlHandler { + private pgClient!: PgClientModule; + private readonly initialized: Promise; + + constructor ( + private readonly account: string, + private readonly hasuraClient: HasuraClient = new HasuraClient(), + private readonly PgClient = PgClientModule, + ) { + this.initialized = this.initialize(); + } + + private async initialize (): Promise { + const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account); + this.pgClient = new this.PgClient({ + user: connectionParameters.username, + password: connectionParameters.password, + host: process.env.PGHOST, + port: Number(connectionParameters.port), + database: connectionParameters.database, + }); + } + + async insert (schemaName: string, tableName: string, objects: any[]): Promise { + await this.initialized; // Ensure constructor completed before proceeding + if (!objects?.length) { + return []; + } + + const keys = Object.keys(objects[0]); + // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order + const values = objects.map(obj => keys.map(key => obj[key])); + const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *;`; + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (result.rows?.length === 0) { + console.log('No rows were inserted.'); + } + return result.rows; + } + + async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise { + await this.initialized; // Ensure constructor completed before proceeding + + const keys = Object.keys(object); + const values = Object.values(object); + const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); + let query = `SELECT * FROM ${schemaName}.${tableName} WHERE ${param}`; + if (limit !== null) { + query = query.concat(' LIMIT ', Math.round(limit).toString()); + } + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (!(result.rows && result.rows.length > 0)) { + console.log('No rows were selected.'); + } + return result.rows; + } +} diff --git a/runner/src/dml-handler/index.ts b/runner/src/dml-handler/index.ts new file mode 100644 index 000000000..8beb5a70c --- /dev/null +++ b/runner/src/dml-handler/index.ts @@ -0,0 +1 @@ +export { default } from './dml-handler'; diff --git a/runner/src/hasura-client/hasura-client.test.ts b/runner/src/hasura-client/hasura-client.test.ts index 56a19cef2..d4b621886 100644 --- a/runner/src/hasura-client/hasura-client.test.ts +++ b/runner/src/hasura-client/hasura-client.test.ts @@ -244,4 +244,80 @@ describe('HasuraClient', () => { expect(mockFetch).toBeCalledTimes(1); // to fetch the foreign keys }); + + it('returns connection parameters for valid and invalid users', async () => { + const testUsers = { + testA_near: 'passA', + testB_near: 'passB', + testC_near: 'passC' + }; + const TEST_METADATA = generateMetadata(testUsers); + const mockFetch = jest + .fn() + .mockResolvedValue({ + status: 200, + text: () => JSON.stringify({ metadata: TEST_METADATA }) + }); + const client = new HasuraClient({ fetch: mockFetch as unknown as typeof fetch }); + const result = await client.getDbConnectionParameters('testB_near'); + expect(result).toEqual(generateConnectionParameter('testB_near', 'passB')); + await expect(client.getDbConnectionParameters('fake_near')).rejects.toThrow('Could not find connection parameters for user fake_near on respective database.'); + }); }); + +function generateMetadata (testUsers: any): any { + const sources = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + isolation_level: 'read-committed', + pool_settings: { + connection_lifetime: 600, + idle_timeout: 180, + max_connections: 50, + retries: 1 + }, + use_prepared_statements: true + } + } + }); + + Object.keys(testUsers).forEach((user) => { + sources.push(generateSource(user, testUsers[user])); + }); + + return { + version: 3, + sources + }; +} + +function generateSource (user: string, password: string): any { + return { + name: user, + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { connection_parameters: generateConnectionParameter(user, password) }, + isolation_level: 'read-committed', + use_prepared_statements: false + } + } + }; +} + +function generateConnectionParameter (user: string, password: string): any { + return { + database: user, + host: 'postgres', + password, + port: 5432, + username: user + }; +} diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/hasura-client/hasura-client.ts index 4edf6f1bb..0162c4b41 100644 --- a/runner/src/hasura-client/hasura-client.ts +++ b/runner/src/hasura-client/hasura-client.ts @@ -99,6 +99,15 @@ export default class HasuraClient { return metadata; } + async getDbConnectionParameters (account: string): Promise { + const metadata = await this.exportMetadata(); + const source = metadata.sources.find((source: { name: any, configuration: any }) => source.name === account); + if (source === undefined) { + throw new Error(`Could not find connection parameters for user ${account} on respective database.`); + } + return source.configuration.connection_info.database_url.connection_parameters; + } + async doesSourceExist (source: string): Promise { const metadata = await this.exportMetadata(); return metadata.sources.filter(({ name }: { name: string }) => name === source).length > 0; diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index f83165d7d..2ace38cc2 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -10,8 +10,61 @@ describe('Indexer unit tests', () => { const HASURA_ENDPOINT = 'mock-hasura-endpoint'; const HASURA_ADMIN_SECRET = 'mock-hasura-secret'; - - beforeAll(() => { + const HASURA_ROLE = 'morgs_near'; + const INVALID_HASURA_ROLE = 'other_near'; + + const INDEXER_NAME = 'morgs.near/test_fn'; + + const SIMPLE_SCHEMA = `CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + "last_comment_timestamp" DECIMAL(20, 0), + CONSTRAINT "posts_pkey" PRIMARY KEY ("id") + );`; + + const SOCIAL_SCHEMA = ` + CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + "last_comment_timestamp" DECIMAL(20, 0), + CONSTRAINT "posts_pkey" PRIMARY KEY ("id") + ); + + CREATE TABLE + "comments" ( + "id" SERIAL NOT NULL, + "post_id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + CONSTRAINT "comments_pkey" PRIMARY KEY ("id") + ); + + CREATE TABLE + "post_likes" ( + "post_id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0), + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + CONSTRAINT "post_likes_pkey" PRIMARY KEY ("post_id", "account_id") + );'`; + + beforeEach(() => { process.env = { ...oldEnv, HASURA_ENDPOINT, @@ -52,7 +105,8 @@ describe('Indexer unit tests', () => { code: ` const foo = 3; block.result = context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); - ` + `, + schema: SIMPLE_SCHEMA }; await indexer.runFunctions(blockHeight, functions, false); @@ -190,7 +244,7 @@ describe('Indexer unit tests', () => { }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('test', 'morgs.near/test', 1, 'morgs_near'); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const query = ` query { @@ -242,7 +296,7 @@ describe('Indexer unit tests', () => { const mockFetch = jest.fn(); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('test', 'morgs.near/test', 1, 'role'); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); await context.fetchFromSocialApi('/index', { method: 'POST', @@ -271,7 +325,7 @@ describe('Indexer unit tests', () => { }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('test', 'morgs.near/test', 1, 'role'); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); await expect(async () => await context.graphql('query { hello }')).rejects.toThrow('boom'); }); @@ -286,7 +340,7 @@ describe('Indexer unit tests', () => { }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('test', 'morgs.near/test', 1, 'morgs_near'); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const query = 'query($name: String) { hello(name: $name) }'; const variables = { name: 'morgan' }; @@ -310,6 +364,73 @@ describe('Indexer unit tests', () => { ]); }); + test('indexer builds context and inserts an objects into existing table', async () => { + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }); + + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const objToInsert = [{ + account_id: 'morgs_near', + block_height: 1, + receipt_id: 'abc', + content: 'test', + block_timestamp: 800, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }, + { + account_id: 'morgs_near', + block_height: 2, + receipt_id: 'abc', + content: 'test', + block_timestamp: 801, + accounts_liked: JSON.stringify(['cwpuzzles.near']) + }]; + + const result = await context.db.insert_posts(objToInsert); + expect(result.length).toEqual(2); + }); + + test('indexer builds context and selects objects from existing table', async () => { + const selectFn = jest.fn(); + selectFn.mockImplementation((...lim) => { + // Expects limit to be last parameter + return lim[lim.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; + }); + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { select: selectFn }; + }); + + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const objToSelect = { + account_id: 'morgs_near', + receipt_id: 'abc', + }; + const result = await context.db.select_posts(objToSelect); + expect(result.length).toEqual(2); + const resultLimit = await context.db.select_posts(objToSelect, 1); + expect(resultLimit.length).toEqual(1); + }); + + test('indexer builds context and verifies all methods generated', async () => { + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { + insert: jest.fn().mockReturnValue(true), + select: jest.fn().mockReturnValue(true) + }; + }); + + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + // These calls would fail on a real database, but we are merely checking to ensure they exist + expect(Object.keys(context.db)).toStrictEqual(['insert_posts', 'select_posts', 'insert_comments', 'select_comments', 'insert_post_likes', 'select_post_likes']); + }); + test('Indexer.runFunctions() allows imperative execution of GraphQL operations', async () => { const postId = 1; const commentId = 2; @@ -418,7 +539,8 @@ describe('Indexer unit tests', () => { \`); return (\`Created comment \${id} on post \${post.id}\`) - ` + `, + schema: SIMPLE_SCHEMA }; await indexer.runFunctions(blockHeight, functions, false); @@ -475,7 +597,8 @@ describe('Indexer unit tests', () => { functions['buildnear.testnet/test'] = { code: ` throw new Error('boom'); - ` + `, + schema: SIMPLE_SCHEMA }; await expect(indexer.runFunctions(blockHeight, functions, false)).rejects.toThrow(new Error('boom')); @@ -524,7 +647,7 @@ describe('Indexer unit tests', () => { account_id: 'morgs.near', function_name: 'test', code: '', - schema: 'schema', + schema: SIMPLE_SCHEMA, } }; await indexer.runFunctions(1, functions, false, { provision: true }); @@ -534,7 +657,7 @@ describe('Indexer unit tests', () => { expect(provisioner.provisionUserApi).toHaveBeenCalledWith( 'morgs.near', 'test', - 'schema' + SIMPLE_SCHEMA ); }); @@ -578,7 +701,7 @@ describe('Indexer unit tests', () => { const functions: Record = { 'morgs.near/test': { code: '', - schema: 'schema', + schema: SIMPLE_SCHEMA, } }; await indexer.runFunctions(1, functions, false, { provision: true }); @@ -628,7 +751,7 @@ describe('Indexer unit tests', () => { code: ` context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `, - schema: 'schema', + schema: SIMPLE_SCHEMA, } }; await indexer.runFunctions(blockHeight, functions, false, { provision: true }); @@ -698,7 +821,7 @@ describe('Indexer unit tests', () => { }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); // @ts-expect-error legacy test - const context = indexer.buildContext('test', 'morgs.near/test', 1, null); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); const mutation = ` mutation { @@ -733,7 +856,7 @@ describe('Indexer unit tests', () => { }); const role = 'morgs_near'; const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('test', 'morgs.near/test', 1, role); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` mutation { diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 6dc9326f2..4cae6a0ca 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -4,11 +4,13 @@ import AWS from 'aws-sdk'; import { Block } from '@near-lake/primitives'; import Provisioner from '../provisioner'; +import DmlHandler from '../dml-handler/dml-handler'; interface Dependencies { fetch: typeof fetch s3: AWS.S3 provisioner: Provisioner + DmlHandler: typeof DmlHandler }; interface Context { @@ -16,6 +18,7 @@ interface Context { set: (key: string, value: any) => Promise log: (...log: any[]) => Promise fetchFromSocialApi: (path: string, options?: any) => Promise + db: Record any> } interface IndexerFunction { @@ -41,6 +44,7 @@ export default class Indexer { fetch, s3: new AWS.S3(), provisioner: new Provisioner(), + DmlHandler, ...deps, }; } @@ -68,7 +72,6 @@ export default class Indexer { simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage)); const hasuraRoleName = functionName.split('/')[0].replace(/[.-]/g, '_'); - const functionNameWithoutAccount = functionName.split('/')[1].replace(/[.-]/g, '_'); if (options.provision && !indexerFunction.provisioned) { try { @@ -90,7 +93,7 @@ export default class Indexer { await this.setStatus(functionName, blockHeight, 'RUNNING'); const vm = new VM({ timeout: 3000, allowAsync: true }); - const context = this.buildContext(functionName, functionNameWithoutAccount, blockHeight, hasuraRoleName); + const context = this.buildContext(indexerFunction.schema, functionName, blockHeight, hasuraRoleName); vm.freeze(blockWithHelpers, 'block'); vm.freeze(context, 'context'); @@ -183,7 +186,33 @@ export default class Indexer { ].reduce((acc, val) => val(acc), indexerFunction); } - buildContext (functionName: string, functionNameWithoutAccount: string, blockHeight: number, hasuraRoleName: string): Context { + validateTableNames (tableNames: string[]): void { + if (!(tableNames.length > 0)) { + throw new Error('Schema does not have any tables. There should be at least one table.'); + } + const correctTableNameFormat = /^[a-zA-Z_][a-zA-Z0-9_]*$/; + + tableNames.forEach((name: string) => { + if (!correctTableNameFormat.test(name)) { + throw new Error(`Table name ${name} is not formatted correctly. Table names must not start with a number and only contain alphanumerics or underscores.`); + } + }); + } + + getTableNames (schema: string): string[] { + const tableNameMatcher = /CREATE TABLE\s+"(\w+)"/g; + const tableNames = Array.from(schema.matchAll(tableNameMatcher), match => match[1]); // Get first capturing group of each match + this.validateTableNames(tableNames); + console.log('Retrieved the following table names from schema: ', tableNames); + return tableNames; + } + + buildContext (schema: string, functionName: string, blockHeight: number, hasuraRoleName: string): Context { + const tables = this.getTableNames(schema); + const account = functionName.split('/')[0].replace(/[.-]/g, '_'); + const functionNameWithoutAccount = functionName.split('/')[1].replace(/[.-]/g, '_'); + const schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_'); + return { graphql: async (operation, variables) => { console.log(`${functionName}: Running context graphql`, operation); @@ -207,10 +236,29 @@ export default class Indexer { }, fetchFromSocialApi: async (path, options) => { return await this.deps.fetch(`https://api.near.social${path}`, options); - } + }, + db: this.buildDatabaseContext(account, schemaName, tables, blockHeight) }; } + buildDatabaseContext (account: string, schemaName: string, tables: string[], blockHeight: number): Record any> { + let dmlHandler: DmlHandler | null = null; + const result = tables.reduce((prev, tableName) => ({ + ...prev, + [`insert_${tableName}`]: async (objects: any) => { + await this.writeLog(`context.db.insert_${tableName}`, blockHeight, `Calling context.db.insert_${tableName}.`, `Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.insert(schemaName, tableName, Array.isArray(objects) ? objects : [objects]); + }, + [`select_${tableName}`]: async (object: any, limit = null) => { + await this.writeLog(`context.db.select_${tableName}`, blockHeight, `Calling context.db.select_${tableName}.`, `Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with limit ${limit === null ? 'no' : limit}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.select(schemaName, tableName, object, limit); + }, + }), {}); + return result; + } + async setStatus (functionName: string, blockHeight: number, status: string): Promise { return await this.runGraphQLQuery( ` diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index 334b4dbf0..3634cef91 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -1,4 +1,4 @@ -import VError from 'verror'; +import { wrapError } from '../utility'; import cryptoModule from 'crypto'; import HasuraClient from '../hasura-client'; import PgClient from '../pg-client'; @@ -47,7 +47,7 @@ export default class Provisioner { } async createUserDb (userName: string, password: string, databaseName: string): Promise { - await this.wrapError( + await wrapError( async () => { await this.createDatabase(databaseName); await this.createUser(userName, password); @@ -74,35 +74,24 @@ export default class Provisioner { return schemaExists; } - async wrapError(fn: () => Promise, errorMessage: string): Promise { - try { - return await fn(); - } catch (error) { - if (error instanceof Error) { - throw new VError(error, errorMessage); - } - throw new VError(errorMessage); - } - } - async createSchema (databaseName: string, schemaName: string): Promise { - return await this.wrapError(async () => await this.hasuraClient.createSchema(databaseName, schemaName), 'Failed to create schema'); + return await wrapError(async () => await this.hasuraClient.createSchema(databaseName, schemaName), 'Failed to create schema'); } async runMigrations (databaseName: string, schemaName: string, migration: any): Promise { - return await this.wrapError(async () => await this.hasuraClient.runMigrations(databaseName, schemaName, migration), 'Failed to run migrations'); + return await wrapError(async () => await this.hasuraClient.runMigrations(databaseName, schemaName, migration), 'Failed to run migrations'); } async getTableNames (schemaName: string, databaseName: string): Promise { - return await this.wrapError(async () => await this.hasuraClient.getTableNames(schemaName, databaseName), 'Failed to fetch table names'); + return await wrapError(async () => await this.hasuraClient.getTableNames(schemaName, databaseName), 'Failed to fetch table names'); } async trackTables (schemaName: string, tableNames: string[], databaseName: string): Promise { - return await this.wrapError(async () => await this.hasuraClient.trackTables(schemaName, tableNames, databaseName), 'Failed to track tables'); + return await wrapError(async () => await this.hasuraClient.trackTables(schemaName, tableNames, databaseName), 'Failed to track tables'); } async addPermissionsToTables (schemaName: string, databaseName: string, tableNames: string[], roleName: string, permissions: string[]): Promise { - return await this.wrapError(async () => await this.hasuraClient.addPermissionsToTables( + return await wrapError(async () => await this.hasuraClient.addPermissionsToTables( schemaName, databaseName, tableNames, @@ -112,11 +101,11 @@ export default class Provisioner { } async trackForeignKeyRelationships (schemaName: string, databaseName: string): Promise { - return await this.wrapError(async () => await this.hasuraClient.trackForeignKeyRelationships(schemaName, databaseName), 'Failed to track foreign key relationships'); + return await wrapError(async () => await this.hasuraClient.trackForeignKeyRelationships(schemaName, databaseName), 'Failed to track foreign key relationships'); } async addDatasource (userName: string, password: string, databaseName: string): Promise { - return await this.wrapError(async () => await this.hasuraClient.addDatasource(userName, password, databaseName), 'Failed to add datasource'); + return await wrapError(async () => await this.hasuraClient.addDatasource(userName, password, databaseName), 'Failed to add datasource'); } replaceSpecialChars (str: string): string { @@ -131,7 +120,7 @@ export default class Provisioner { const userName = sanitizedAccountId; const schemaName = `${sanitizedAccountId}_${sanitizedFunctionName}`; - await this.wrapError( + await wrapError( async () => { if (!await this.hasuraClient.doesSourceExist(databaseName)) { const password = this.generatePassword(); diff --git a/runner/src/utility.ts b/runner/src/utility.ts new file mode 100644 index 000000000..33262f408 --- /dev/null +++ b/runner/src/utility.ts @@ -0,0 +1,13 @@ +import VError from 'verror'; + +export async function wrapError (fn: () => Promise, errorMessage: string): Promise { + try { + return await fn(); + } catch (error) { + console.log(error); + if (error instanceof Error) { + throw new VError(error, errorMessage); + } + throw new VError(errorMessage); + } +}