From b02ca94b96ae07d7f3ee4f90b23e7b6052a547bb Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 30 Aug 2023 17:15:22 -0700 Subject: [PATCH] feat: Support delete and upsert --- runner/src/dml-handler/dml-handler.test.ts | 43 +++++++++++- runner/src/dml-handler/dml-handler.ts | 36 +++++++++- runner/src/indexer/indexer.test.ts | 79 ++++++++++++++++++---- runner/src/indexer/indexer.ts | 15 ++++ 4 files changed, 155 insertions(+), 18 deletions(-) diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 86a756d10..02d895795 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -39,7 +39,7 @@ describe('DML Handler tests', () => { await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); expect(query.mock.calls).toEqual([ - ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []] + ['INSERT INTO test_schema.test_table (account_id, block_height, block_timestamp, content, receipt_id, accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []] ]); }); @@ -57,9 +57,9 @@ describe('DML Handler tests', () => { const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); - await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ - ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *', []] + ['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') RETURNING *', []] ]); }); @@ -109,4 +109,41 @@ describe('DML Handler tests', () => { ['UPDATE test_schema.test_table SET content=$1, receipt_id=$2 WHERE account_id=$3 AND block_height=$4 RETURNING *', [...Object.values(updateObj), ...Object.values(whereObj)]] ]); }); + + test('Test valid update on two fields', async () => { + const inputObj = [{ + account_id: 'morgs_near', + block_height: 1, + receipt_id: 'abc' + }, + { + account_id: 'morgs_near', + block_height: 2, + receipt_id: 'abc' + }]; + + const conflictCol = ['account_id', 'block_height']; + const updateCol = ['receipt_id']; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol); + expect(query.mock.calls).toEqual([ + ['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') ON CONFLICT (account_id, block_height) DO UPDATE SET receipt_id = excluded.receipt_id RETURNING *', []] + ]); + }); + + test('Test valid delete on two fields', async () => { + const inputObj = { + account_id: 'test_acc_near', + block_height: 999, + }; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj); + expect(query.mock.calls).toEqual([ + ['DELETE FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 RETURNING *', Object.values(inputObj)] + ]); + }); }); diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index 98022b24a..391fbc0fa 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -34,7 +34,7 @@ export default class DmlHandler { const keys = Object.keys(objects[0]); // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order const values = objects.map(obj => keys.map(key => obj[key])); - const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *`; + const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L RETURNING *`; const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); if (result.rows?.length === 0) { @@ -78,4 +78,38 @@ export default class DmlHandler { } return result.rows; } + + async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise { + await this.initialized; // Ensure constructor completed before proceeding + if (!objects?.length) { + return []; + } + + const keys = Object.keys(objects[0]); + // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order + const values = objects.map(obj => keys.map(key => obj[key])); + const updatePlaceholders = updateColumns.map(col => `${col} = excluded.${col}`).join(', '); + const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updatePlaceholders} RETURNING *`; + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (result.rows?.length === 0) { + console.log('No rows were inserted or updated.'); + } + return result.rows; + } + + async delete (schemaName: string, tableName: string, object: any): Promise { + await this.initialized; // Ensure constructor completed before proceeding + + const keys = Object.keys(object); + const values = Object.values(object); + const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); + const query = `DELETE FROM ${schemaName}.${tableName} WHERE ${param} RETURNING *`; + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (!(result.rows && result.rows.length > 0)) { + console.log('No rows were deleted.'); + } + return result.rows; + } } diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index f3578743e..110401352 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -534,28 +534,79 @@ CREATE TABLE expect(result.length).toEqual(2); }); - test('indexer builds context and verifies all methods generated', async () => { + test('indexer builds context and upserts on existing table', async () => { const mockDmlHandler: any = jest.fn().mockImplementation(() => { return { - insert: jest.fn().mockReturnValue(true), - select: jest.fn().mockReturnValue(true) + upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => { + if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) { + return [{ colA: 'valA' }, { colA: 'valA' }]; + } else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) { + return [{ colA: 'valA' }]; + } + return [{}]; + }) }; }); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const objToInsert = [{ + account_id: 'morgs_near', + block_height: 1, + receipt_id: 'abc', + content: 'test', + block_timestamp: 800, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }, + { + account_id: 'morgs_near', + block_height: 2, + receipt_id: 'abc', + content: 'test', + block_timestamp: 801, + accounts_liked: JSON.stringify(['cwpuzzles.near']) + }]; + + let result = await context.db.upsert_posts(objToInsert, ['account_id', 'block_height'], ['content', 'block_timestamp']); + expect(result.length).toEqual(2); + result = await context.db.upsert_posts(objToInsert[0], ['account_id', 'block_height'], ['content', 'block_timestamp']); + expect(result.length).toEqual(1); + }); + + test('indexer builds context and deletes objects from existing table', async () => { + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }); + + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const deleteFilter = { + account_id: 'morgs_near', + receipt_id: 'abc', + }; + const result = await context.db.delete_posts(deleteFilter); + expect(result.length).toEqual(2); + }); + + test('indexer builds context and verifies all methods generated', async () => { + const mockDmlHandler: any = jest.fn(); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); - expect(Object.keys(context.db)).toStrictEqual( - ['insert_creator_quest', 'select_creator_quest', 'update_creator_quest', - 'insert_composer_quest', 'select_composer_quest', 'update_composer_quest', - 'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest', - 'insert_posts', 'select_posts', 'update_posts', - 'insert_comments', 'select_comments', 'update_comments', - 'insert_post_likes', 'select_post_likes', 'update_post_likes', - 'insert_My_Table1', 'select_My_Table1', 'update_My_Table1', - 'insert_Another_Table', 'select_Another_Table', 'update_Another_Table', - 'insert_Third_Table', 'select_Third_Table', 'update_Third_Table', - 'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table']); + expect(Object.keys(context.db)).toStrictEqual([ + 'insert_creator_quest', 'select_creator_quest', 'update_creator_quest', 'upsert_creator_quest', 'delete_creator_quest', + 'insert_composer_quest', 'select_composer_quest', 'update_composer_quest', 'upsert_composer_quest', 'delete_composer_quest', + 'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest', 'upsert_contractor___quest', 'delete_contractor___quest', + 'insert_posts', 'select_posts', 'update_posts', 'upsert_posts', 'delete_posts', + 'insert_comments', 'select_comments', 'update_comments', 'upsert_comments', 'delete_comments', + 'insert_post_likes', 'select_post_likes', 'update_post_likes', 'upsert_post_likes', 'delete_post_likes', + 'insert_My_Table1', 'select_My_Table1', 'update_My_Table1', 'upsert_My_Table1', 'delete_My_Table1', + 'insert_Another_Table', 'select_Another_Table', 'update_Another_Table', 'upsert_Another_Table', 'delete_Another_Table', + 'insert_Third_Table', 'select_Third_Table', 'update_Third_Table', 'upsert_Third_Table', 'delete_Third_Table', + 'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table', 'upsert_yet_another_table', 'delete_yet_another_table']); }); test('indexer builds context and returns empty array if failed to generate db methods', async () => { diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index a2985fcf4..6e7c75962 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -258,6 +258,7 @@ export default class Indexer { try { const tables = this.getTableNames(schema); let dmlHandler: DmlHandler | null = null; + // TODO: Refactor object to be context.db.[table_name].[insert, select, update, upsert, delete] const result = tables.reduce((prev, tableName) => { const sanitizedTableName = this.sanitizeTableName(tableName); const funcForTable = { @@ -281,6 +282,20 @@ export default class Indexer { `Updating object that matches ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); return await dmlHandler.update(schemaName, tableName, whereObj, updateObj); + }, + [`upsert_${sanitizedTableName}`]: async (objects: any, conflictColumns: string[], updateColumns: string[]) => { + await this.writeLog(`context.db.upsert_${sanitizedTableName}`, blockHeight, + `Calling context.db.upsert_${sanitizedTableName}.`, + `Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. On conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objects) ? objects : [objects], conflictColumns, updateColumns); + }, + [`delete_${sanitizedTableName}`]: async (object: any) => { + await this.writeLog(`context.db.delete_${sanitizedTableName}`, blockHeight, + `Calling context.db.delete_${sanitizedTableName}.`, + `Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.delete(schemaName, tableName, object); } };