Skip to content

Commit

Permalink
feat: Generate insert and select methods for context object under db (#…
Browse files Browse the repository at this point in the history
…177)

Co-authored-by: Darun Seethammagari <[email protected]>
  • Loading branch information
darunrs and Darun Seethammagari authored Aug 22, 2023
1 parent 639064d commit 10aae59
Show file tree
Hide file tree
Showing 13 changed files with 544 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
redis/
*.log
/indexer/blocks/
node_modules/
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ when they match new blocks by placing messages on an SQS queue. Spawns historica
indexer_rules_engine, storage.
2. [Indexer Runner](.indexer-js-queue-handler)
Retrieves messages from the SQS queue, fetches the matching block and executes the IndexerFunction.
3. [Runner](.runner)
Retrieves messages from Redis Stream, fetching matching block and executes the IndexerFunction.
3. [IndexerFunction Editor UI](./frontend)
Serves the editor UI within the dashboard widget and mediates some communication with the GraphQL DB and block server.
4. [Hasura Authentication Service](./hasura-authentication-service)
Expand Down
7 changes: 4 additions & 3 deletions frontend/src/components/Editor/Editor.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,22 +261,23 @@ const Editor = ({

async function executeIndexerFunction(option = "latest", startingBlockHeight = null) {
setIsExecutingIndexerFunction(() => true)
const schemaName = indexerDetails.accountId.concat("_", indexerDetails.indexerName).replace(/[^a-zA-Z0-9]/g, '_');

switch (option) {
case "debugList":
await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, option)
await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName, option)
break
case "specific":
if (startingBlockHeight === null && Number(startingBlockHeight) === 0) {
console.log("Invalid Starting Block Height: starting block height is null or 0")
break
}

await indexerRunner.start(startingBlockHeight, indexingCode, option)
await indexerRunner.start(startingBlockHeight, indexingCode, schema, schemaName, option)
break
case "latest":
const latestHeight = await requestLatestBlockHeight()
if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, option)
if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, schema, schemaName, option)
}
setIsExecutingIndexerFunction(() => false)
}
Expand Down
104 changes: 82 additions & 22 deletions frontend/src/utils/indexerRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default class IndexerRunner {
this.shouldStop = false;
}

async start(startingHeight, indexingCode, option) {
async start(startingHeight, indexingCode, schema, schemaName, option) {
this.currentHeight = startingHeight;
this.shouldStop = false;
console.clear()
Expand All @@ -32,7 +32,7 @@ export default class IndexerRunner {
this.stop()
}
if (blockDetails) {
await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode);
await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode, schema, schemaName);
this.currentHeight++;
await this.delay(1000);
}
Expand All @@ -50,20 +50,36 @@ export default class IndexerRunner {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async executeIndexerFunction(height, blockDetails, indexingCode) {
validateTableNames(tableNames) {
if (!(Array.isArray(tableNames) && tableNames.length > 0)) {
throw new Error("Schema does not have any tables. There should be at least one table.");
}
const correctTableNameFormat = /^[a-zA-Z_][a-zA-Z0-9_]*$/;

tableNames.forEach(name => {
if (!correctTableNameFormat.test(name)) {
throw new Error(`Table name ${name} is not formatted correctly. Table names must not start with a number and only contain alphanumerics or underscores.`);
}
});
}

async executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName) {
let innerCode = indexingCode.match(/getBlock\s*\([^)]*\)\s*{([\s\S]*)}/)[1];
let tableNames = Array.from(schema.matchAll(/CREATE TABLE\s+"(\w+)"/g), match => match[1]); // Get first capturing group of each match
this.validateTableNames(tableNames);

if (blockDetails) {
const block = Block.fromStreamerMessage(blockDetails);
block.actions()
block.receipts()
block.events()

console.log(block)
await this.runFunction(blockDetails, height, innerCode);
await this.runFunction(blockDetails, height, innerCode, schemaName, tableNames);
}
}

async executeIndexerFunctionOnHeights(heights, indexingCode) {
async executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName) {
console.clear()
console.group('%c Welcome! Lets test your indexing logic on some Near Blocks!', 'color: white; background-color: navy; padding: 5px;');
if (heights.length === 0) {
Expand All @@ -80,14 +96,14 @@ export default class IndexerRunner {
console.log(error)
}
console.time('Indexing Execution Complete')
this.executeIndexerFunction(height, blockDetails, indexingCode)
this.executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName)
console.timeEnd('Indexing Execution Complete')
console.groupEnd()
}
console.groupEnd()
}

async runFunction(streamerMessage, blockHeight, indexerCode) {
async runFunction(streamerMessage, blockHeight, indexerCode, schemaName, tableNames) {
const innerCodeWithBlockHelper =
`
const block = Block.fromStreamerMessage(streamerMessage);
Expand Down Expand Up @@ -125,33 +141,77 @@ export default class IndexerRunner {
"",
() => {
let operationType, operationName
const match = query.match(/(query|mutation)\s+(\w+)\s*(\(.*?\))?\s*\{([\s\S]*)\}/);
if (match) {
operationType = match[1];
operationName = match[2];
}

console.group(`Executing GraphQL ${operationType}: (${operationName})`);
if (operationType === 'mutation') console.log('%c Mutations in debug mode do not alter the database', 'color: black; background-color: yellow; padding: 5px;');
console.group(`Data passed to ${operationType}`);
console.dir(mutationData);
console.groupEnd();
console.group(`Data returned by ${operationType}`);
console.log({})
console.groupEnd();
console.groupEnd();
const match = query.match(/(query|mutation)\s+(\w+)\s*(\(.*?\))?\s*\{([\s\S]*)\}/);
if (match) {
operationType = match[1];
operationName = match[2];
}
console.group(`Executing GraphQL ${operationType}: (${operationName})`);
if (operationType === 'mutation') console.log('%c Mutations in debug mode do not alter the database', 'color: black; background-color: yellow; padding: 5px;');
console.group(`Data passed to ${operationType}`);
console.dir(mutationData);
console.groupEnd();
console.group(`Data returned by ${operationType}`);
console.log({})
console.groupEnd();
console.groupEnd();
}
);
return {};
},
log: async (message) => {
this.handleLog(blockHeight, message);
},
db: this.buildDatabaseContext(blockHeight, schemaName, tableNames)
};

wrappedFunction(Block, streamerMessage, context);
}

buildDatabaseContext (blockHeight, schemaName, tables) {
try {
const result = tables.reduce((prev, tableName) => ({
...prev,
[`insert_${tableName}`]: async (objects) => await this.insert(blockHeight, schemaName, tableName, objects),
[`select_${tableName}`]: async (object, limit = 0) => await this.select(blockHeight, schemaName, tableName, object, limit),
}), {});
return result;
} catch (error) {
console.error('Caught error when generating DB methods. Falling back to generic methods.', error);
return {
insert: async (tableName, objects) => {
this.insert(blockHeight, schemaName, tableName, objects);
},
select: async (tableName, object, limit = 0) => {
this.select(blockHeight, schemaName, tableName, object, limit);
}
};
}
}

insert(blockHeight, schemaName, tableName, objects) {
this.handleLog(
blockHeight,
"",
() => {
console.log('Inserting object %s into table %s on schema %s', JSON.stringify(objects), tableName, schemaName);
}
);
return {};
}

select(blockHeight, schemaName, tableName, object, limit) {
this.handleLog(
blockHeight,
"",
() => {
const roundedLimit = Math.round(limit);
console.log('Selecting objects with values %s from table %s on schema %s with %s limit', JSON.stringify(object), tableName, schemaName, limit === 0 ? 'no' : roundedLimit.toString());
}
);
return {};
}

// deprecated
replaceNewLines(code) {
return code.replace(/\\n/g, "\n").replace(/\\"/g, '"');
Expand Down
93 changes: 93 additions & 0 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import pgFormat from 'pg-format';
import DmlHandler from './dml-handler';

describe('DML Handler tests', () => {
const hasuraClient: any = {
getDbConnectionParameters: jest.fn().mockReturnValue({
database: 'test_near',
host: 'postgres',
password: 'test_pass',
port: 5432,
username: 'test_near'
})
};
let PgClient: any;
let query: any;

const ACCOUNT = 'test_near';
const SCHEMA = 'test_schema';
const TABLE_NAME = 'test_table';

beforeEach(() => {
query = jest.fn().mockReturnValue({ rows: [] });
PgClient = jest.fn().mockImplementation(() => {
return { query, format: pgFormat };
});
});

test('Test valid insert one with array', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
block_timestamp: 'UTC',
content: 'test_content',
receipt_id: 111,
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []]
]);
});

test('Test valid insert multiple rows with array', async () => {
const inputObj = [{
account_id: 'morgs_near',
block_height: 1,
receipt_id: 'abc',
},
{
account_id: 'morgs_near',
block_height: 2,
receipt_id: 'abc',
}];

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []]
]);
});

test('Test valid select on two fields', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2', Object.values(inputObj)]
]);
});

test('Test valid select on two fields with limit', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)]
]);
});
});
63 changes: 63 additions & 0 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { wrapError } from '../utility';
import PgClientModule from '../pg-client';
import HasuraClient from '../hasura-client/hasura-client';

export default class DmlHandler {
private pgClient!: PgClientModule;
private readonly initialized: Promise<void>;

constructor (
private readonly account: string,
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly PgClient = PgClientModule,
) {
this.initialized = this.initialize();
}

private async initialize (): Promise<void> {
const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account);
this.pgClient = new this.PgClient({
user: connectionParameters.username,
password: connectionParameters.password,
host: process.env.PGHOST,
port: Number(connectionParameters.port),
database: connectionParameters.database,
});
}

async insert (schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}

const keys = Object.keys(objects[0]);
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
const values = objects.map(obj => keys.map(key => obj[key]));
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *;`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (result.rows?.length === 0) {
console.log('No rows were inserted.');
}
return result.rows;
}

async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
let query = `SELECT * FROM ${schemaName}.${tableName} WHERE ${param}`;
if (limit !== null) {
query = query.concat(' LIMIT ', Math.round(limit).toString());
}

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (!(result.rows && result.rows.length > 0)) {
console.log('No rows were selected.');
}
return result.rows;
}
}
1 change: 1 addition & 0 deletions runner/src/dml-handler/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default } from './dml-handler';
Loading

0 comments on commit 10aae59

Please sign in to comment.