Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Generate insert and select methods for context object under db #177

Merged
merged 5 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
redis/
*.log
/indexer/blocks/
node_modules/
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ when they match new blocks by placing messages on an SQS queue. Spawns historica
indexer_rules_engine, storage.
2. [Indexer Runner](.indexer-js-queue-handler)
Retrieves messages from the SQS queue, fetches the matching block and executes the IndexerFunction.
3. [Runner](.runner)
Retrieves messages from Redis Stream, fetching matching block and executes the IndexerFunction.
3. [IndexerFunction Editor UI](./frontend)
Serves the editor UI within the dashboard widget and mediates some communication with the GraphQL DB and block server.
4. [Hasura Authentication Service](./hasura-authentication-service)
Expand Down
7 changes: 4 additions & 3 deletions frontend/src/components/Editor/Editor.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,22 +261,23 @@ const Editor = ({

async function executeIndexerFunction(option = "latest", startingBlockHeight = null) {
setIsExecutingIndexerFunction(() => true)
const schemaName = indexerDetails.accountId.concat("_", indexerDetails.indexerName).replace(/[^a-zA-Z0-9]/g, '_');

switch (option) {
case "debugList":
await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, option)
await indexerRunner.executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName, option)
break
case "specific":
if (startingBlockHeight === null && Number(startingBlockHeight) === 0) {
console.log("Invalid Starting Block Height: starting block height is null or 0")
break
}

await indexerRunner.start(startingBlockHeight, indexingCode, option)
await indexerRunner.start(startingBlockHeight, indexingCode, schema, schemaName, option)
break
case "latest":
const latestHeight = await requestLatestBlockHeight()
if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, option)
if (latestHeight) await indexerRunner.start(latestHeight - 10, indexingCode, schema, schemaName, option)
}
setIsExecutingIndexerFunction(() => false)
}
Expand Down
104 changes: 82 additions & 22 deletions frontend/src/utils/indexerRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default class IndexerRunner {
this.shouldStop = false;
}

async start(startingHeight, indexingCode, option) {
async start(startingHeight, indexingCode, schema, schemaName, option) {
this.currentHeight = startingHeight;
this.shouldStop = false;
console.clear()
Expand All @@ -32,7 +32,7 @@ export default class IndexerRunner {
this.stop()
}
if (blockDetails) {
await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode);
await this.executeIndexerFunction(this.currentHeight, blockDetails, indexingCode, schema, schemaName);
this.currentHeight++;
await this.delay(1000);
}
Expand All @@ -50,20 +50,36 @@ export default class IndexerRunner {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async executeIndexerFunction(height, blockDetails, indexingCode) {
validateTableNames(tableNames) {
if (!(Array.isArray(tableNames) && tableNames.length > 0)) {
throw new Error("Schema does not have any tables. There should be at least one table.");
}
const correctTableNameFormat = /^[a-zA-Z_][a-zA-Z0-9_]*$/;

tableNames.forEach(name => {
if (!correctTableNameFormat.test(name)) {
throw new Error(`Table name ${name} is not formatted correctly. Table names must not start with a number and only contain alphanumerics or underscores.`);
}
});
}

async executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName) {
let innerCode = indexingCode.match(/getBlock\s*\([^)]*\)\s*{([\s\S]*)}/)[1];
let tableNames = Array.from(schema.matchAll(/CREATE TABLE\s+"(\w+)"/g), match => match[1]); // Get first capturing group of each match
darunrs marked this conversation as resolved.
Show resolved Hide resolved
this.validateTableNames(tableNames);

if (blockDetails) {
const block = Block.fromStreamerMessage(blockDetails);
block.actions()
block.receipts()
block.events()

console.log(block)
await this.runFunction(blockDetails, height, innerCode);
await this.runFunction(blockDetails, height, innerCode, schemaName, tableNames);
}
}

async executeIndexerFunctionOnHeights(heights, indexingCode) {
async executeIndexerFunctionOnHeights(heights, indexingCode, schema, schemaName) {
console.clear()
console.group('%c Welcome! Lets test your indexing logic on some Near Blocks!', 'color: white; background-color: navy; padding: 5px;');
if (heights.length === 0) {
Expand All @@ -80,14 +96,14 @@ export default class IndexerRunner {
console.log(error)
}
console.time('Indexing Execution Complete')
this.executeIndexerFunction(height, blockDetails, indexingCode)
this.executeIndexerFunction(height, blockDetails, indexingCode, schema, schemaName)
console.timeEnd('Indexing Execution Complete')
console.groupEnd()
}
console.groupEnd()
}

async runFunction(streamerMessage, blockHeight, indexerCode) {
async runFunction(streamerMessage, blockHeight, indexerCode, schemaName, tableNames) {
const innerCodeWithBlockHelper =
`
const block = Block.fromStreamerMessage(streamerMessage);
Expand Down Expand Up @@ -125,33 +141,77 @@ export default class IndexerRunner {
"",
() => {
let operationType, operationName
const match = query.match(/(query|mutation)\s+(\w+)\s*(\(.*?\))?\s*\{([\s\S]*)\}/);
if (match) {
operationType = match[1];
operationName = match[2];
}

console.group(`Executing GraphQL ${operationType}: (${operationName})`);
if (operationType === 'mutation') console.log('%c Mutations in debug mode do not alter the database', 'color: black; background-color: yellow; padding: 5px;');
console.group(`Data passed to ${operationType}`);
console.dir(mutationData);
console.groupEnd();
console.group(`Data returned by ${operationType}`);
console.log({})
console.groupEnd();
console.groupEnd();
const match = query.match(/(query|mutation)\s+(\w+)\s*(\(.*?\))?\s*\{([\s\S]*)\}/);
if (match) {
operationType = match[1];
operationName = match[2];
}
console.group(`Executing GraphQL ${operationType}: (${operationName})`);
if (operationType === 'mutation') console.log('%c Mutations in debug mode do not alter the database', 'color: black; background-color: yellow; padding: 5px;');
console.group(`Data passed to ${operationType}`);
console.dir(mutationData);
console.groupEnd();
console.group(`Data returned by ${operationType}`);
console.log({})
console.groupEnd();
console.groupEnd();
}
);
return {};
},
log: async (message) => {
this.handleLog(blockHeight, message);
},
db: this.buildDatabaseContext(blockHeight, schemaName, tableNames)
};

wrappedFunction(Block, streamerMessage, context);
}

buildDatabaseContext (blockHeight, schemaName, tables) {
darunrs marked this conversation as resolved.
Show resolved Hide resolved
try {
const result = tables.reduce((prev, tableName) => ({
...prev,
[`insert_${tableName}`]: async (objects) => await this.insert(blockHeight, schemaName, tableName, objects),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming: do we want camelCase for everything or it's fine to have underscores here?

Copy link
Collaborator Author

@darunrs darunrs Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was that since it's feasible for a developer to name their database something like test_9_table_4 or post_likes, I'd prefer to keep it as underscored simply to visually distinguish the modifier (insert/delete/select) while keeping the table names themselves consistent with how they're defined. It's not clear how we'd want to camelCase it otherwise. Would we want to remove underscores, uppercase the next letter after each one, and combine them (e.g. post_likes_and_comments -> PostLikesAndComments? Retain the underscores (e.g. insert_Post_likes_and_comments)? This is really more of a UX choice, but I'd say underscore and just the table name as is, is my personal vote on that.

[`select_${tableName}`]: async (object, limit = 0) => await this.select(blockHeight, schemaName, tableName, object, limit),
}), {});
return result;
} catch (error) {
console.error('Caught error when generating DB methods. Falling back to generic methods.', error);
return {
insert: async (tableName, objects) => {
this.insert(blockHeight, schemaName, tableName, objects);
},
select: async (tableName, object, limit = 0) => {
this.select(blockHeight, schemaName, tableName, object, limit);
}
};
}
}

insert(blockHeight, schemaName, tableName, objects) {
this.handleLog(
blockHeight,
"",
() => {
console.log('Inserting object %s into table %s on schema %s', JSON.stringify(objects), tableName, schemaName);
}
);
return {};
}

select(blockHeight, schemaName, tableName, object, limit) {
this.handleLog(
blockHeight,
"",
() => {
const roundedLimit = Math.round(limit);
console.log('Selecting objects with values %s from table %s on schema %s with %s limit', JSON.stringify(object), tableName, schemaName, limit === 0 ? 'no' : roundedLimit.toString());
}
);
return {};
}

// deprecated
replaceNewLines(code) {
return code.replace(/\\n/g, "\n").replace(/\\"/g, '"');
Expand Down
93 changes: 93 additions & 0 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import pgFormat from 'pg-format';
import DmlHandler from './dml-handler';

describe('DML Handler tests', () => {
const hasuraClient: any = {
getDbConnectionParameters: jest.fn().mockReturnValue({
database: 'test_near',
host: 'postgres',
password: 'test_pass',
port: 5432,
username: 'test_near'
})
};
let PgClient: any;
let query: any;

const ACCOUNT = 'test_near';
const SCHEMA = 'test_schema';
const TABLE_NAME = 'test_table';

beforeEach(() => {
query = jest.fn().mockReturnValue({ rows: [] });
PgClient = jest.fn().mockImplementation(() => {
return { query, format: pgFormat };
});
});

test('Test valid insert one with array', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
block_timestamp: 'UTC',
content: 'test_content',
receipt_id: 111,
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []]
]);
});

test('Test valid insert multiple rows with array', async () => {
const inputObj = [{
account_id: 'morgs_near',
block_height: 1,
receipt_id: 'abc',
},
{
account_id: 'morgs_near',
block_height: 2,
receipt_id: 'abc',
}];

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []]
]);
});

test('Test valid select on two fields', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2', Object.values(inputObj)]
]);
});

test('Test valid select on two fields with limit', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)]
]);
});
});
63 changes: 63 additions & 0 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { wrapError } from '../utility';
import PgClientModule from '../pg-client';
import HasuraClient from '../hasura-client/hasura-client';

export default class DmlHandler {
private pgClient!: PgClientModule;
private readonly initialized: Promise<void>;

constructor (
private readonly account: string,
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly PgClient = PgClientModule,
) {
this.initialized = this.initialize();
}

private async initialize (): Promise<void> {
const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account);
this.pgClient = new this.PgClient({
user: connectionParameters.username,
password: connectionParameters.password,
host: process.env.PGHOST,
port: Number(connectionParameters.port),
database: connectionParameters.database,
});
}

async insert (schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}

const keys = Object.keys(objects[0]);
darunrs marked this conversation as resolved.
Show resolved Hide resolved
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
const values = objects.map(obj => keys.map(key => obj[key]));
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *;`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (result.rows?.length === 0) {
console.log('No rows were inserted.');
}
return result.rows;
}

async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
let query = `SELECT * FROM ${schemaName}.${tableName} WHERE ${param}`;
if (limit !== null) {
query = query.concat(' LIMIT ', Math.round(limit).toString());
}

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (!(result.rows && result.rows.length > 0)) {
console.log('No rows were selected.');
}
return result.rows;
}
}
1 change: 1 addition & 0 deletions runner/src/dml-handler/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default } from './dml-handler';
Loading