From 8f80fad587d871c87e445c131a83c09c45ac1914 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 16 Aug 2023 13:27:38 -0700 Subject: [PATCH] fix: Address PR comments, use user account instead of admin --- frontend/src/components/Editor/Editor.js | 7 +- frontend/src/utils/indexerRunner.js | 31 +++++-- runner/src/dml-handler/dml-handler.test.ts | 33 +++----- runner/src/dml-handler/dml-handler.ts | 28 +++---- runner/src/indexer/indexer.test.ts | 95 ++++++++++++++++------ runner/src/indexer/indexer.ts | 58 +++++++------ runner/src/pg-client.ts | 41 ---------- runner/src/pg-client/index.ts | 1 + runner/src/pg-client/pg-client.test.ts | 89 ++++++++++++++++++++ runner/src/pg-client/pg-client.ts | 83 +++++++++++++++++++ 10 files changed, 327 insertions(+), 139 deletions(-) delete mode 100644 runner/src/pg-client.ts create mode 100644 runner/src/pg-client/index.ts create mode 100644 runner/src/pg-client/pg-client.test.ts create mode 100644 runner/src/pg-client/pg-client.ts diff --git a/frontend/src/components/Editor/Editor.js b/frontend/src/components/Editor/Editor.js index cade39ec6..9745661f8 100644 --- a/frontend/src/components/Editor/Editor.js +++ b/frontend/src/components/Editor/Editor.js @@ -261,10 +261,11 @@ const Editor = ({ async function executeIndexerFunction(option = "latest", startingBlockHeight = null) { setIsExecutingIndexerFunction(() => true) + const schemaName = indexerDetails.accountId.concat("_", indexerDetails.indexerName).replace(/[^a-zA-Z0-9]/g, '_'); switch (option) { case "debugList": - await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, schema, option) + await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName, option) break case "specific": if (startingBlockHeight === null && Number(startingBlockHeight) === 0) { @@ -272,11 +273,11 @@ const Editor = ({ break } - await indexerRunner.start(startingBlockHeight, indexingCode, schema, option) + await indexerRunner.start(startingBlockHeight, indexingCode, schema, schemaName, option) break case "latest": const latestHeight = await requestLatestBlockHeight() - if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, schema, option) + if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, schema, schemaName, option) } setIsExecutingIndexerFunction(() => false) } diff --git a/frontend/src/utils/indexerRunner.js b/frontend/src/utils/indexerRunner.js index feb4a2a32..6df114c29 100644 --- a/frontend/src/utils/indexerRunner.js +++ b/frontend/src/utils/indexerRunner.js @@ -10,7 +10,7 @@ export default class IndexerRunner { this.shouldStop = false; } - async start(startingHeight, indexingCode, schema, option) { + async start(startingHeight, indexingCode, schema, schemaName, option) { this.currentHeight = startingHeight; this.shouldStop = false; console.clear() @@ -32,7 +32,7 @@ export default class IndexerRunner { this.stop() } if (blockDetails) { - await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode, schema); + await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode, schema, schemaName); this.currentHeight++; await this.delay(1000); } @@ -50,10 +50,23 @@ export default class IndexerRunner { return new Promise((resolve) => setTimeout(resolve, ms)); } - async executeIndexerFunction(height, blockDetails, indexingCode, schema) { + validateTableNames(tableNames) { + if (!(Array.isArray(tableNames) && tableNames.length > 0)) { + throw new Error("Schema does not have any tables. There should be at least one table."); + } + const correctTableNameFormat = /^[a-zA-Z_][a-zA-Z0-9_]*$/; + + tableNames.forEach(name => { + if (!correctTableNameFormat.test(name)) { + throw new Error(`Table name ${name} is not formatted correctly. Table names must not start with a number and only contain alphanumerics or underscores.`); + } + }); + } + + async executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName) { let innerCode = indexingCode.match(/getBlock\s*\([^)]*\)\s*{([\s\S]*)}/)[1]; let tableNames = Array.from(schema.matchAll(/CREATE TABLE\s+"(\w+)"/g), match => match[1]); // Get first capturing group of each match - console.log('Retrieved the following table names from schema: ', tableNames); + this.validateTableNames(tableNames); if (blockDetails) { const block = Block.fromStreamerMessage(blockDetails); @@ -62,11 +75,11 @@ export default class IndexerRunner { block.events() console.log(block) - await this.runFunction(blockDetails, height, innerCode, tableNames); + await this.runFunction(blockDetails, height, innerCode, schemaName, tableNames); } } - async executeIndexerFunctionOnHeights(heights, indexingCode, schema) { + async executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName) { console.clear() console.group('%c Welcome! Lets test your indexing logic on some Near Blocks!', 'color: white; background-color: navy; padding: 5px;'); if (heights.length === 0) { @@ -83,14 +96,14 @@ export default class IndexerRunner { console.log(error) } console.time('Indexing Execution Complete') - this.executeIndexerFunction(height, blockDetails, indexingCode, schema) + this.executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName) console.timeEnd('Indexing Execution Complete') console.groupEnd() } console.groupEnd() } - async runFunction(streamerMessage, blockHeight, indexerCode, tableNames) { + async runFunction(streamerMessage, blockHeight, indexerCode, schemaName, tableNames) { const innerCodeWithBlockHelper = ` const block = Block.fromStreamerMessage(streamerMessage); @@ -149,7 +162,7 @@ export default class IndexerRunner { log: async (message) => { this.handleLog(blockHeight, message); }, - db: this.buildDatabaseContext(blockHeight, 'debug', tableNames) + db: this.buildDatabaseContext(blockHeight, schemaName, tableNames) }; wrappedFunction(Block, streamerMessage, context); diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 56bed561b..38d637d07 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -4,15 +4,19 @@ import DmlHandler from './dml-handler'; describe('DML Handler tests', () => { let pgClient: any; + const ACCOUNT = 'test_near'; const SCHEMA = 'test_schema'; const TABLE_NAME = 'test_table'; - test('Test valid insert one with array', async () => { + beforeEach(() => { pgClient = { + setUser: jest.fn(), query: jest.fn().mockReturnValue({ rows: [] }), format: pgFormat, }; + }); + test('Test valid insert one with array', async () => { const inputObj = { account_id: 'test_acc_near', block_height: 999, @@ -24,18 +28,13 @@ describe('DML Handler tests', () => { const dmlHandler = new DmlHandler(pgClient); - await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + await dmlHandler.insert(ACCOUNT, SCHEMA, TABLE_NAME, [inputObj]); expect(pgClient.query.mock.calls).toEqual([ ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []] ]); }); test('Test valid insert multiple rows with array', async () => { - pgClient = { - query: jest.fn().mockReturnValue({ rows: [] }), - format: pgFormat, - }; - const inputObj = [{ account_id: 'morgs_near', block_height: 1, @@ -49,18 +48,13 @@ describe('DML Handler tests', () => { const dmlHandler = new DmlHandler(pgClient); - await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + await dmlHandler.insert(ACCOUNT, SCHEMA, TABLE_NAME, [inputObj]); expect(pgClient.query.mock.calls).toEqual([ ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []] ]); }); test('Test valid select on two fields', async () => { - pgClient = { - query: jest.fn().mockReturnValue({ rows: [] }), - format: pgFormat, - }; - const inputObj = { account_id: 'test_acc_near', block_height: 999, @@ -68,18 +62,13 @@ describe('DML Handler tests', () => { const dmlHandler = new DmlHandler(pgClient); - await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 0); + await dmlHandler.select(ACCOUNT, SCHEMA, TABLE_NAME, inputObj, 0); expect(pgClient.query.mock.calls).toEqual([ - ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2;', Object.values(inputObj)] + ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2', Object.values(inputObj)] ]); }); test('Test valid select on two fields with limit', async () => { - pgClient = { - query: jest.fn().mockReturnValue({ rows: [] }), - format: pgFormat, - }; - const inputObj = { account_id: 'test_acc_near', block_height: 999, @@ -87,9 +76,9 @@ describe('DML Handler tests', () => { const dmlHandler = new DmlHandler(pgClient); - await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1); + await dmlHandler.select(ACCOUNT, SCHEMA, TABLE_NAME, inputObj, 1); expect(pgClient.query.mock.calls).toEqual([ - ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1;', Object.values(inputObj)] + ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)] ]); }); }); diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index 6b8f6e1a4..d8b1c1fa0 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -16,39 +16,37 @@ export default class DmlHandler { this.pgClient = pgClient; } - async insert (schemaName: string, tableName: string, objects: any[]): Promise { - console.log('Inserting object %s into table %s on schema %s', JSON.stringify(objects), tableName, schemaName); + async insert (account: string, schemaName: string, tableName: string, objects: any[]): Promise { if (!objects?.length) { return []; } + await this.pgClient.setUser(account); // Set Postgres user to account's user + const keys = Object.keys(objects[0]); // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order const values = objects.map(obj => keys.map(key => obj[key])); - const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *;`; - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute ${query} on schema ${schemaName}.${tableName}`); - if (!(result.rows && result.rows.length > 0)) { + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (result.rows?.length === 0) { console.log('No rows were inserted.'); } return result.rows; } - async select (schemaName: string, tableName: string, object: any, limit: number): Promise { + async select (account: string, schemaName: string, tableName: string, object: any, limit: number): Promise { + await this.pgClient.setUser(account); // Set Postgres user to account's user + const roundedLimit = Math.round(limit); - console.log('Selecting objects with values %s from table %s on schema %s with %s limit', JSON.stringify(object), tableName, schemaName, limit === 0 ? 'no' : roundedLimit.toString()); - console.log(object); const keys = Object.keys(object); - console.log(keys); const values = Object.values(object); const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); - let query = `SELECT * FROM ${schemaName}.${tableName} WHERE ${param}`; - if (roundedLimit <= 0) { - query = query.concat(';'); - } else { - query = query.concat(' LIMIT ', roundedLimit.toString(), ';'); + if (roundedLimit > 0) { + query = query.concat(' LIMIT ', roundedLimit.toString()); } - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute ${query} on ${schemaName}.${tableName}`); + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); if (!(result.rows && result.rows.length > 0)) { console.log('No rows were selected.'); } diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index f4d65b11b..4ed5e5c61 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -13,10 +13,22 @@ describe('Indexer unit tests', () => { const HASURA_ROLE = 'morgs_near'; const INVALID_HASURA_ROLE = 'other_near'; - const FUNC_NAME = 'morgs.near/test_fn'; - const FUNC_NAME_WITHOUT_ACCOUNT = 'test_fn'; + const INDEXER_NAME = 'morgs.near/test_fn'; - const SOCIAL_SCHEMA_TABLES = `CREATE TABLE + const SIMPLE_SCHEMA = `CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + "last_comment_timestamp" DECIMAL(20, 0), + CONSTRAINT "posts_pkey" PRIMARY KEY ("id") + );`; + + const SOCIAL_SCHEMA = `CREATE TABLE "posts" ( "id" SERIAL NOT NULL, "account_id" VARCHAR NOT NULL, @@ -51,7 +63,7 @@ CREATE TABLE CONSTRAINT "post_likes_pkey" PRIMARY KEY ("post_id", "account_id") );'`; - beforeAll(() => { + beforeEach(() => { process.env = { ...oldEnv, HASURA_ENDPOINT, @@ -93,7 +105,7 @@ CREATE TABLE const foo = 3; block.result = context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `, - schema: '' + schema: SIMPLE_SCHEMA }; await indexer.runFunctions(blockHeight, functions, false); @@ -231,7 +243,7 @@ CREATE TABLE }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('', FUNC_NAME, FUNC_NAME_WITHOUT_ACCOUNT, 1, HASURA_ROLE); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const query = ` query { @@ -283,7 +295,7 @@ CREATE TABLE const mockFetch = jest.fn(); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('', FUNC_NAME, FUNC_NAME_WITHOUT_ACCOUNT, 1, HASURA_ROLE); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); await context.fetchFromSocialApi('/index', { method: 'POST', @@ -312,7 +324,7 @@ CREATE TABLE }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('', FUNC_NAME, FUNC_NAME_WITHOUT_ACCOUNT, 1, INVALID_HASURA_ROLE); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); await expect(async () => await context.graphql('query { hello }')).rejects.toThrow('boom'); }); @@ -327,7 +339,7 @@ CREATE TABLE }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('', FUNC_NAME, FUNC_NAME_WITHOUT_ACCOUNT, 1, HASURA_ROLE); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const query = 'query($name: String) { hello(name: $name) }'; const variables = { name: 'morgan' }; @@ -352,12 +364,17 @@ CREATE TABLE }); test('indexer builds context and inserts an objects into existing table', async () => { + // Set HASURA_ENDPOINT and HASURA_ADMIN_SECRET values in process.env before running with actual mockDmlHandler + // process.env.HASURA_ENDPOINT = ''; + // process.env.HASURA_ADMIN_SECRET = ''; + const mockDmlHandler: any = { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; const indexer = new Indexer('mainnet', { dmlHandler: mockDmlHandler }); - const context = indexer.buildContext(SOCIAL_SCHEMA_TABLES, 'morgs.near/social_feed1', 'social_feed1', 1, 'postgres'); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + const objToInsert = [{ account_id: 'morgs_near', block_height: 1, @@ -374,16 +391,15 @@ CREATE TABLE block_timestamp: 801, accounts_liked: JSON.stringify(['cwpuzzles.near']) }]; + const result = await context.db.insert_posts(objToInsert); expect(result.length).toEqual(2); }); test('indexer builds context and selects objects from existing table', async () => { - const objToSelect = { - account_id: 'morgs_near', - receipt_id: 'abc', - }; - + // Set HASURA_ENDPOINT and HASURA_ADMIN_SECRET values in process.env before running with actual mockDmlHandler + // process.env.HASURA_ENDPOINT = ''; + // process.env.HASURA_ADMIN_SECRET = ''; const selectFn = jest.fn(); selectFn.mockImplementation((...lim) => { // Expects limit to be last parameter @@ -394,13 +410,44 @@ CREATE TABLE }; const indexer = new Indexer('mainnet', { dmlHandler: mockDmlHandler }); - const context = indexer.buildContext(SOCIAL_SCHEMA_TABLES, 'morgs.near/social_feed1', 'social_feed1', 1, 'postgres'); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const objToSelect = { + account_id: 'morgs_near', + receipt_id: 'abc', + }; const result = await context.db.select_posts(objToSelect); expect(result.length).toEqual(2); const resultLimit = await context.db.select_posts(objToSelect, 1); expect(resultLimit.length).toEqual(1); }); + test('indexer builds context and verifies all methods generated', async () => { + const mockDmlHandler: any = { + insert: jest.fn().mockReturnValue(true), + select: jest.fn().mockReturnValue(true) + }; + + const indexer = new Indexer('mainnet', { dmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + const objToPassIn = {}; + + // These calls would fail on a real database, but we are merely checking to ensure they exist + let result = await context.db.insert_posts([objToPassIn]); + expect(result).toEqual(true); + result = await context.db.insert_comments(objToPassIn); // Verifying both array and single object input is supported for insert + expect(result).toEqual(true); + result = await context.db.insert_post_likes([objToPassIn]); + expect(result).toEqual(true); + + result = await context.db.select_posts(objToPassIn); + expect(result).toEqual(true); + result = await context.db.select_comments(objToPassIn); + expect(result).toEqual(true); + result = await context.db.select_post_likes(objToPassIn); + expect(result).toEqual(true); + }); + test('Indexer.runFunctions() allows imperative execution of GraphQL operations', async () => { const postId = 1; const commentId = 2; @@ -510,7 +557,7 @@ CREATE TABLE return (\`Created comment \${id} on post \${post.id}\`) `, - schema: '' + schema: SIMPLE_SCHEMA }; await indexer.runFunctions(blockHeight, functions, false); @@ -568,7 +615,7 @@ CREATE TABLE code: ` throw new Error('boom'); `, - schema: '' + schema: SIMPLE_SCHEMA }; await expect(indexer.runFunctions(blockHeight, functions, false)).rejects.toThrow(new Error('boom')); @@ -617,7 +664,7 @@ CREATE TABLE account_id: 'morgs.near', function_name: 'test', code: '', - schema: 'schema', + schema: SIMPLE_SCHEMA, } }; await indexer.runFunctions(1, functions, false, { provision: true }); @@ -627,7 +674,7 @@ CREATE TABLE expect(provisioner.provisionUserApi).toHaveBeenCalledWith( 'morgs.near', 'test', - 'schema' + SIMPLE_SCHEMA ); }); @@ -671,7 +718,7 @@ CREATE TABLE const functions: Record = { 'morgs.near/test': { code: '', - schema: 'schema', + schema: SIMPLE_SCHEMA, } }; await indexer.runFunctions(1, functions, false, { provision: true }); @@ -721,7 +768,7 @@ CREATE TABLE code: ` context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `, - schema: 'schema', + schema: SIMPLE_SCHEMA, } }; await indexer.runFunctions(blockHeight, functions, false, { provision: true }); @@ -791,7 +838,7 @@ CREATE TABLE }); const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); // @ts-expect-error legacy test - const context = indexer.buildContext('test', 'morgs.near/test', 1, null); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); const mutation = ` mutation { @@ -826,7 +873,7 @@ CREATE TABLE }); const role = 'morgs_near'; const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch }); - const context = indexer.buildContext('', FUNC_NAME, FUNC_NAME_WITHOUT_ACCOUNT, 1, HASURA_ROLE); + const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` mutation { diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 0ff3c476e..9fba078ae 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -18,7 +18,7 @@ interface Context { set: (key: string, value: any) => Promise log: (...log: any[]) => Promise fetchFromSocialApi: (path: string, options?: any) => Promise - db: any + db: Record any> } interface IndexerFunction { @@ -72,7 +72,6 @@ export default class Indexer { simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage)); const hasuraRoleName = functionName.split('/')[0].replace(/[.-]/g, '_'); - const functionNameWithoutAccount = functionName.split('/')[1].replace(/[.-]/g, '_'); if (options.provision && !indexerFunction.provisioned) { try { @@ -94,7 +93,7 @@ export default class Indexer { await this.setStatus(functionName, blockHeight, 'RUNNING'); const vm = new VM({ timeout: 3000, allowAsync: true }); - const context = this.buildContext(indexerFunction.schema, functionName, functionNameWithoutAccount, blockHeight, hasuraRoleName); + const context = this.buildContext(indexerFunction.schema, functionName, blockHeight, hasuraRoleName); vm.freeze(blockWithHelpers, 'block'); vm.freeze(context, 'context'); @@ -187,15 +186,31 @@ export default class Indexer { ].reduce((acc, val) => val(acc), indexerFunction); } + validateTableNames (tableNames: string[]): void { + if (!(tableNames.length > 0)) { + throw new Error('Schema does not have any tables. There should be at least one table.'); + } + const correctTableNameFormat = /^[a-zA-Z_][a-zA-Z0-9_]*$/; + + tableNames.forEach((name: string) => { + if (!correctTableNameFormat.test(name)) { + throw new Error(`Table name ${name} is not formatted correctly. Table names must not start with a number and only contain alphanumerics or underscores.`); + } + }); + } + getTableNames (schema: string): string[] { const tableNameMatcher = /CREATE TABLE\s+"(\w+)"/g; const tableNames = Array.from(schema.matchAll(tableNameMatcher), match => match[1]); // Get first capturing group of each match + this.validateTableNames(tableNames); console.log('Retrieved the following table names from schema: ', tableNames); return tableNames; } - buildContext (schema: string, functionName: string, functionNameWithoutAccount: string, blockHeight: number, hasuraRoleName: string): Context { + buildContext (schema: string, functionName: string, blockHeight: number, hasuraRoleName: string): Context { const tables = this.getTableNames(schema); + const account = functionName.split('/')[0].replace(/[.-]/g, '_'); + const functionNameWithoutAccount = functionName.split('/')[1].replace(/[.-]/g, '_'); const schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_'); return { @@ -222,30 +237,23 @@ export default class Indexer { fetchFromSocialApi: async (path, options) => { return await this.deps.fetch(`https://api.near.social${path}`, options); }, - db: this.buildDatabaseContext(schemaName, tables) + db: this.buildDatabaseContext(account, schemaName, tables, blockHeight) }; } - buildDatabaseContext (schemaName: string, tables: string[]): any { - try { - const result = tables.reduce((prev, tableName) => ({ - ...prev, - [`insert_${tableName}`]: async (objects: any[]) => await this.deps.dmlHandler.insert(schemaName, tableName, objects), - [`select_${tableName}`]: async (object: any, limit = 0) => await this.deps.dmlHandler.select(schemaName, tableName, object, limit), - }), {}); - console.log(result); - return result; - } catch (error) { - console.error('Caught error when generating DB methods. Falling back to generic methods.', error); - return { - insert: async (tableName: string, objects: any[]) => { - return await this.deps.dmlHandler.insert(schemaName, tableName, objects); - }, - select: async (tableName: string, object: any, limit = 0) => { - return await this.deps.dmlHandler.select(schemaName, tableName, object, limit); - } - }; - } + buildDatabaseContext (account: string, schemaName: string, tables: string[], blockHeight: number): Record any> { + const result = tables.reduce((prev, tableName) => ({ + ...prev, + [`insert_${tableName}`]: async (objects: any) => { + await this.writeLog(`context.db.insert_${tableName}`, blockHeight, `Calling context.db.insert_${tableName}.`, `Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`); + return await this.deps.dmlHandler.insert(account, schemaName, tableName, Array.isArray(objects) ? objects : [objects]); + }, + [`select_${tableName}`]: async (object: any, limit = 0) => { + await this.writeLog(`context.db.select_${tableName}`, blockHeight, `Calling context.db.select_${tableName}.`, `Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with limit ${limit === 0 ? 'no' : limit}`); + return await this.deps.dmlHandler.select(account, schemaName, tableName, object, limit); + }, + }), {}); + return result; } async setStatus (functionName: string, blockHeight: number, status: string): Promise { diff --git a/runner/src/pg-client.ts b/runner/src/pg-client.ts deleted file mode 100644 index ebca73a49..000000000 --- a/runner/src/pg-client.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { Pool, type PoolConfig, type QueryResult, type QueryResultRow } from 'pg'; -import pgFormatModule from 'pg-format'; - -interface ConnectionParams { - user: string - password: string - host: string - port: number | string - database: string -} - -export default class PgClient { - private readonly pgPool: Pool; - public format: typeof pgFormatModule; - - constructor ( - connectionParams: ConnectionParams, - poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 }, - PgPool: typeof Pool = Pool, - pgFormat: typeof pgFormatModule = pgFormatModule - ) { - this.pgPool = new PgPool({ - user: connectionParams.user, - password: connectionParams.password, - host: connectionParams.host, - port: Number(connectionParams.port), - database: connectionParams.database, - ...poolConfig, - }); - this.format = pgFormat; - } - - async query(query: string, params: any[] = []): Promise> { - const client = await this.pgPool.connect(); - try { - return await (client.query(query, params)); - } finally { - client.release(); - } - } -} diff --git a/runner/src/pg-client/index.ts b/runner/src/pg-client/index.ts new file mode 100644 index 000000000..6d8451c58 --- /dev/null +++ b/runner/src/pg-client/index.ts @@ -0,0 +1 @@ +export { default } from './pg-client'; diff --git a/runner/src/pg-client/pg-client.test.ts b/runner/src/pg-client/pg-client.test.ts new file mode 100644 index 000000000..b6897d73c --- /dev/null +++ b/runner/src/pg-client/pg-client.test.ts @@ -0,0 +1,89 @@ +import PgClient from './pg-client'; + +describe('Postgres Client Tests', () => { + let hasuraClient: any; + const testUsers = { + testA_near: 'passA', + testB_near: 'passB', + testC_near: 'passC' + }; + const TEST_METADATA = generateMetadata(testUsers); + const testPgClient = { + user: 'user', + password: 'password', + database: 'database', + host: 'host', + port: 'port', + }; + + beforeEach(() => { + hasuraClient = { + exportMetadata: jest.fn().mockReturnValue(TEST_METADATA) + }; + }); + + test('Test set user', async () => { + const pgClient = new PgClient(testPgClient, hasuraClient); + await pgClient.setUser('testA_near'); + await expect(pgClient.setUser('fake_near')).rejects.toThrow('Could not find password for user fake_near when trying to set user account to process database actions.'); + }); +}); + +function generateMetadata (testUsers: any): any { + const sources = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + isolation_level: 'read-committed', + pool_settings: { + connection_lifetime: 600, + idle_timeout: 180, + max_connections: 50, + retries: 1 + }, + use_prepared_statements: true + } + } + }); + + Object.keys(testUsers).forEach((user) => { + sources.push(generateSource(user, testUsers[user])); + }); + + console.log(sources); + + return { + version: 3, + sources + }; +} + +function generateSource (user: string, password: string): any { + return { + name: user, + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { connection_parameters: generateConnectionParameter(user, password) }, + isolation_level: 'read-committed', + use_prepared_statements: false + } + } + }; +} + +function generateConnectionParameter (user: string, password: string): any { + return { + database: user, + host: 'postgres', + password, + port: 5432, + username: user + }; +} diff --git a/runner/src/pg-client/pg-client.ts b/runner/src/pg-client/pg-client.ts new file mode 100644 index 000000000..88ee0f9cb --- /dev/null +++ b/runner/src/pg-client/pg-client.ts @@ -0,0 +1,83 @@ +import { Pool, type PoolConfig, type QueryResult, type QueryResultRow } from 'pg'; +import pgFormatModule from 'pg-format'; +import HasuraClient from '../hasura-client'; + +interface ConnectionParams { + user: string + password: string + host: string + port: number | string + database: string +} + +export default class PgClient { + private readonly connectionParams: ConnectionParams; + private readonly hasuraClient: HasuraClient; + private readonly poolConfig: PoolConfig; + private pgPool: Pool; + public format: typeof pgFormatModule; + private userPasswords: Record; + + constructor ( + connectionParams: ConnectionParams, + hasuraClient: HasuraClient = new HasuraClient(), + poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 }, + PgPool: typeof Pool = Pool, + pgFormat: typeof pgFormatModule = pgFormatModule + ) { + this.connectionParams = connectionParams; + this.hasuraClient = hasuraClient; + this.poolConfig = poolConfig; + this.pgPool = new PgPool({ + user: connectionParams.user, + password: connectionParams.password, + host: connectionParams.host, + port: Number(connectionParams.port), + database: connectionParams.database, + ...poolConfig, + }); + this.format = pgFormat; + this.userPasswords = {}; + } + + private async collectPasswords (): Promise { + const metadata = await this.hasuraClient.exportMetadata(); + console.log(metadata.sources[2].configuration.connection_info.database_url.connection_parameters); + this.userPasswords = metadata.sources.reduce((prev: any, source: { name: any, configuration: any }) => ({ + ...prev, + [source.name]: source.name === 'default' ? 'N/A' : source.configuration.connection_info.database_url.connection_parameters.password + }), {}); + } + + async setUser (user: string): Promise { + if (Object.keys(this.userPasswords).length === 0) { + console.log('Collecting passwords for each user.'); + await this.collectPasswords(); + } + + const newUser = user === 'admin' ? this.connectionParams.user : user; + const newPassword = user === 'admin' ? this.connectionParams.password : this.userPasswords[user]; + + if (newPassword === undefined) { + throw new Error(`Could not find password for user ${user} when trying to set user account to process database actions.`); + } + + this.pgPool = new Pool({ + user: newUser, + password: newPassword, + host: this.connectionParams.host, + port: Number(this.connectionParams.port), + database: this.connectionParams.database, + ...this.poolConfig, + }); + } + + async query(query: string, params: any[] = []): Promise> { + const client = await this.pgPool.connect(); + try { + return await (client.query(query, params)); + } finally { + client.release(); + } + } +}