Skip to content

Commit

Permalink
feat: Support delete and upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Sep 6, 2023
1 parent bd6521b commit b02ca94
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 18 deletions.
43 changes: 40 additions & 3 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('DML Handler tests', () => {

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []]
['INSERT INTO test_schema.test_table (account_id, block_height, block_timestamp, content, receipt_id, accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []]
]);
});

Expand All @@ -57,9 +57,9 @@ describe('DML Handler tests', () => {

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *', []]
['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') RETURNING *', []]
]);
});

Expand Down Expand Up @@ -109,4 +109,41 @@ describe('DML Handler tests', () => {
['UPDATE test_schema.test_table SET content=$1, receipt_id=$2 WHERE account_id=$3 AND block_height=$4 RETURNING *', [...Object.values(updateObj), ...Object.values(whereObj)]]
]);
});

test('Test valid update on two fields', async () => {
const inputObj = [{
account_id: 'morgs_near',
block_height: 1,
receipt_id: 'abc'
},
{
account_id: 'morgs_near',
block_height: 2,
receipt_id: 'abc'
}];

const conflictCol = ['account_id', 'block_height'];
const updateCol = ['receipt_id'];

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') ON CONFLICT (account_id, block_height) DO UPDATE SET receipt_id = excluded.receipt_id RETURNING *', []]
]);
});

test('Test valid delete on two fields', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['DELETE FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 RETURNING *', Object.values(inputObj)]
]);
});
});
36 changes: 35 additions & 1 deletion runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export default class DmlHandler {
const keys = Object.keys(objects[0]);
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
const values = objects.map(obj => keys.map(key => obj[key]));
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *`;
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (result.rows?.length === 0) {
Expand Down Expand Up @@ -78,4 +78,38 @@ export default class DmlHandler {
}
return result.rows;
}

async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}

const keys = Object.keys(objects[0]);
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
const values = objects.map(obj => keys.map(key => obj[key]));
const updatePlaceholders = updateColumns.map(col => `${col} = excluded.${col}`).join(', ');
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updatePlaceholders} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (result.rows?.length === 0) {
console.log('No rows were inserted or updated.');
}
return result.rows;
}

async delete (schemaName: string, tableName: string, object: any): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
const query = `DELETE FROM ${schemaName}.${tableName} WHERE ${param} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (!(result.rows && result.rows.length > 0)) {
console.log('No rows were deleted.');
}
return result.rows;
}
}
79 changes: 65 additions & 14 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,28 +534,79 @@ CREATE TABLE
expect(result.length).toEqual(2);
});

test('indexer builds context and verifies all methods generated', async () => {
test('indexer builds context and upserts on existing table', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return {
insert: jest.fn().mockReturnValue(true),
select: jest.fn().mockReturnValue(true)
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
});

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
account_id: 'morgs_near',
block_height: 1,
receipt_id: 'abc',
content: 'test',
block_timestamp: 800,
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
},
{
account_id: 'morgs_near',
block_height: 2,
receipt_id: 'abc',
content: 'test',
block_timestamp: 801,
accounts_liked: JSON.stringify(['cwpuzzles.near'])
}];

let result = await context.db.upsert_posts(objToInsert, ['account_id', 'block_height'], ['content', 'block_timestamp']);
expect(result.length).toEqual(2);
result = await context.db.upsert_posts(objToInsert[0], ['account_id', 'block_height'], ['content', 'block_timestamp']);
expect(result.length).toEqual(1);
});

test('indexer builds context and deletes objects from existing table', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
});

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const deleteFilter = {
account_id: 'morgs_near',
receipt_id: 'abc',
};
const result = await context.db.delete_posts(deleteFilter);
expect(result.length).toEqual(2);
});

test('indexer builds context and verifies all methods generated', async () => {
const mockDmlHandler: any = jest.fn();

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

expect(Object.keys(context.db)).toStrictEqual(
['insert_creator_quest', 'select_creator_quest', 'update_creator_quest',
'insert_composer_quest', 'select_composer_quest', 'update_composer_quest',
'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest',
'insert_posts', 'select_posts', 'update_posts',
'insert_comments', 'select_comments', 'update_comments',
'insert_post_likes', 'select_post_likes', 'update_post_likes',
'insert_My_Table1', 'select_My_Table1', 'update_My_Table1',
'insert_Another_Table', 'select_Another_Table', 'update_Another_Table',
'insert_Third_Table', 'select_Third_Table', 'update_Third_Table',
'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table']);
expect(Object.keys(context.db)).toStrictEqual([
'insert_creator_quest', 'select_creator_quest', 'update_creator_quest', 'upsert_creator_quest', 'delete_creator_quest',
'insert_composer_quest', 'select_composer_quest', 'update_composer_quest', 'upsert_composer_quest', 'delete_composer_quest',
'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest', 'upsert_contractor___quest', 'delete_contractor___quest',
'insert_posts', 'select_posts', 'update_posts', 'upsert_posts', 'delete_posts',
'insert_comments', 'select_comments', 'update_comments', 'upsert_comments', 'delete_comments',
'insert_post_likes', 'select_post_likes', 'update_post_likes', 'upsert_post_likes', 'delete_post_likes',
'insert_My_Table1', 'select_My_Table1', 'update_My_Table1', 'upsert_My_Table1', 'delete_My_Table1',
'insert_Another_Table', 'select_Another_Table', 'update_Another_Table', 'upsert_Another_Table', 'delete_Another_Table',
'insert_Third_Table', 'select_Third_Table', 'update_Third_Table', 'upsert_Third_Table', 'delete_Third_Table',
'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table', 'upsert_yet_another_table', 'delete_yet_another_table']);
});

test('indexer builds context and returns empty array if failed to generate db methods', async () => {
Expand Down
15 changes: 15 additions & 0 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ export default class Indexer {
try {
const tables = this.getTableNames(schema);
let dmlHandler: DmlHandler | null = null;
// TODO: Refactor object to be context.db.[table_name].[insert, select, update, upsert, delete]
const result = tables.reduce((prev, tableName) => {
const sanitizedTableName = this.sanitizeTableName(tableName);
const funcForTable = {
Expand All @@ -281,6 +282,20 @@ export default class Indexer {
`Updating object that matches ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
return await dmlHandler.update(schemaName, tableName, whereObj, updateObj);
},
[`upsert_${sanitizedTableName}`]: async (objects: any, conflictColumns: string[], updateColumns: string[]) => {
await this.writeLog(`context.db.upsert_${sanitizedTableName}`, blockHeight,
`Calling context.db.upsert_${sanitizedTableName}.`,
`Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. On conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objects) ? objects : [objects], conflictColumns, updateColumns);
},
[`delete_${sanitizedTableName}`]: async (object: any) => {
await this.writeLog(`context.db.delete_${sanitizedTableName}`, blockHeight,
`Calling context.db.delete_${sanitizedTableName}.`,
`Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
return await dmlHandler.delete(schemaName, tableName, object);
}
};

Expand Down

0 comments on commit b02ca94

Please sign in to comment.