Skip to content

Commit

Permalink
DPLT-1002 Publish current block height of indexer functions to CloudW…
Browse files Browse the repository at this point in the history
…atch (#88)
  • Loading branch information
morgsmccauley authored and gabehamilton committed Jun 26, 2023
1 parent 311cd92 commit 0e162fb
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 32 deletions.
30 changes: 30 additions & 0 deletions indexer-js-queue-handler/__snapshots__/metrics.test.js.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`Metrics writes the block height for an indexer function 1`] = `
[
{
"MetricData": [
{
"Dimensions": [
{
"Name": "ACCOUNT_ID",
"Value": "morgs.near",
},
{
"Name": "FUNCTION_NAME",
"Value": "test",
},
{
"Name": "STAGE",
"Value": "dev",
},
],
"MetricName": "INDEXER_FUNCTION_LATEST_BLOCK_HEIGHT",
"Unit": "None",
"Value": 2,
},
],
"Namespace": "test",
},
]
`;
2 changes: 1 addition & 1 deletion indexer-js-queue-handler/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import AWS from "aws-sdk";
AWSXRay.captureAWS(AWS);

export const consumer = async (event) => {
const indexer = new Indexer('mainnet', 'eu-central-1');
const indexer = new Indexer('mainnet');

for (const record of event.Records) {
const jsonBody = JSON.parse(record.body);
Expand Down
20 changes: 10 additions & 10 deletions indexer-js-queue-handler/indexer.integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const mockAwsXray = {
describe('Indexer integration tests', () => {

test('Indexer.runFunctions() should execute an imperative style test function against a given block using key-value storage', async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: fetch, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: fetch, awsXray: mockAwsXray });
const functions = {};
functions['buildnear.testnet/itest1'] = {provisioned: false, code: 'context.set("BlockHeight", block.header().height);', schema: 'create table indexer_storage (function_name text, key_name text, value text, primary key (function_name, key_name));'};
const block_height = 85376002;
Expand All @@ -40,7 +40,7 @@ describe('Indexer integration tests', () => {
}, 30000);

test('Indexer.runFunctions() should execute a test function against a given block using key-value storage', async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });
const functions = {};
functions['buildnear.testnet/itest1'] = {code: 'context.set("BlockHeight", block.header().height);'};
const block_height = 85376546;
Expand All @@ -51,7 +51,7 @@ describe('Indexer integration tests', () => {
}, 30000);

test('Indexer.runFunctions() should execute a test function against a given block using a full mutation to write to key-value storage', async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });
const functions = {};
functions['buildnear.testnet/itest1'] = {code: 'context.graphql(`mutation { insert_buildnear_testnet_itest1_indexer_storage_one(object: {function_name: "buildnear.testnet/itest3", key_name: "BlockHeight", value: "${block.header().height}"} on_conflict: {constraint: indexer_storage_pkey, update_columns: value}) {key_name}}`);'};
const block_height = 85376546;
Expand All @@ -65,7 +65,7 @@ describe('Indexer integration tests', () => {
* due to known Hasura issues with unique indexes vs unique constraints */
test('Indexer.runFunctions() should execute a near social function against a given block', async () => {

const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });
const functions = {};
functions['buildnear.testnet/test'] = {code:

Expand Down Expand Up @@ -128,7 +128,7 @@ describe('Indexer integration tests', () => {
* due to known Hasura issues with unique indexes vs unique constraints */
// needs update to have schema
test.skip('Indexer.runFunctions() should execute an imperative style near social function against a given block', async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });
const functions = {};

functions['buildnear.testnet/itest5'] = {code:`
Expand Down Expand Up @@ -176,22 +176,22 @@ describe('Indexer integration tests', () => {
});

test("writeLog() should write a log to the database", async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });
const id = await indexer.writeLog("buildnear.testnet/itest", 85376002, "test message");
expect(id).toBeDefined();
expect(id.length).toBe(36);
});

test("writeFunctionState should write a function state to the database", async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });
const result = await indexer.writeFunctionState("buildnear.testnet/itest8", 85376002);
expect(result).toBeDefined();
expect(result.insert_indexer_state.returning[0].current_block_height).toBe(85376002);
});

// Errors are now exposed to the lambda hander. This test will be relevant again if this changes.
test.skip ("function that throws an error should catch the error", async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });

const functions = {};
functions['buildnear.testnet/test'] = {code:`
Expand All @@ -205,7 +205,7 @@ describe('Indexer integration tests', () => {

// Errors are now exposed to the lambda hander. This test will be relevant again if this changes.
test.skip("rejected graphql promise is awaited and caught", async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });

const functions = {};
functions['buildnear.testnet/itest3'] = {code:
Expand All @@ -219,7 +219,7 @@ describe('Indexer integration tests', () => {
// Unreturned promise rejection seems to be uncatchable even with process.on('unhandledRejection'
// However, the next function is run (in this test but not on Lambda).
test.skip("function that rejects a promise should catch the error", async () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray });

const functions = {};
functions['buildnear.testnet/fails'] = {code:`
Expand Down
10 changes: 7 additions & 3 deletions indexer-js-queue-handler/indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,27 @@ import fetch from 'node-fetch';
import { VM } from 'vm2';
import AWS from 'aws-sdk';
import { Block } from '@near-lake/primitives'

import Provisioner from './provisioner.js'
import AWSXRay from "aws-xray-sdk";
import traceFetch from "./trace-fetch.js";
import Metrics from './metrics.js'

export default class Indexer {

DEFAULT_HASURA_ROLE;

constructor(
network,
aws_region,
deps
) {
this.DEFAULT_HASURA_ROLE = 'append';
this.network = network;
this.aws_region = aws_region;
this.aws_region = process.env.REGION;
this.deps = {
fetch: traceFetch(fetch),
s3: new AWS.S3({ region: aws_region }),
s3: new AWS.S3({ region: process.env.REGION }),
metrics: new Metrics('QueryAPI'),
provisioner: new Provisioner(),
awsXray: AWSXRay,
...deps,
Expand All @@ -43,6 +45,8 @@ export default class Indexer {
functionSubsegment.addAnnotation('indexer_function', function_name);
simultaneousPromises.push(this.writeLog(function_name, block_height, 'Running function', function_name, ', lag in ms is: ', lag));

simultaneousPromises.push(this.deps.metrics.putBlockHeight(indexerFunction.account_id, indexerFunction.function_name, block_height));

const hasuraRoleName = function_name.split('/')[0].replace(/[.-]/g, '_');
const functionNameWithoutAccount = function_name.split('/')[1].replace(/[.-]/g, '_');

Expand Down
81 changes: 63 additions & 18 deletions indexer-js-queue-handler/indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ const mockAwsXray = {
}),
};

const mockMetrics = {
putBlockHeight: () => {},
};

describe('Indexer unit tests', () => {
const oldEnv = process.env;

Expand Down Expand Up @@ -62,7 +66,7 @@ describe('Indexer unit tests', () => {
})
})),
};
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {};
functions['buildnear.testnet/test'] = {code:`
Expand All @@ -82,7 +86,7 @@ describe('Indexer unit tests', () => {
errors: null,
}),
}));
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, awsXray: mockAwsXray, metrics: mockMetrics });

const functionName = 'buildnear.testnet/test';
const mutations = {mutations: [`mutation { _0: set(functionName: "${functionName}", key: "foo2", data: "indexer test") }`], variables: {}, keysValues: {}};
Expand All @@ -109,7 +113,7 @@ describe('Indexer unit tests', () => {
errors: null,
}),
}));
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, awsXray: mockAwsXray, metrics: mockMetrics });

const functionName = 'buildnear.testnet/test';
const mutations = {mutations: [
Expand Down Expand Up @@ -147,7 +151,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
})
})),
};
const indexer = new Indexer('mainnet', 'us-west-2', { s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const blockHeight = '84333960';
const block = await indexer.fetchBlockPromise(blockHeight);
Expand All @@ -170,7 +174,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
})
})),
};
const indexer = new Indexer('mainnet', 'us-west-2', { s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const blockHeight = 82699904;
const shard = 0;
Expand Down Expand Up @@ -210,7 +214,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
const mockS3 = {
getObject,
};
const indexer = new Indexer('mainnet', 'us-west-2', { s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const shard = 0;
const streamerMessage = await indexer.fetchStreamerMessage(blockHeight);
Expand All @@ -232,7 +236,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
});

test('Indexer.transformIndexerFunction() applies the necessary transformations', () => {
const indexer = new Indexer('mainnet', 'us-west-2', { awsXray: mockAwsXray })
const indexer = new Indexer('mainnet', { awsXray: mockAwsXray, metrics: mockMetrics })

const transformedFunction = indexer.transformIndexerFunction(`console.log('hello')`);

Expand Down Expand Up @@ -264,7 +268,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
}
})
});
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, awsXray: mockAwsXray, metrics: mockMetrics });

const context = indexer.buildImperativeContextForFunction();

Expand Down Expand Up @@ -315,7 +319,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
errors: ['boom']
})
});
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, awsXray: mockAwsXray, metrics: mockMetrics });

const context = indexer.buildImperativeContextForFunction();

Expand All @@ -330,7 +334,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
data: 'mock',
}),
});
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, awsXray: mockAwsXray, metrics: mockMetrics });

const context = indexer.buildImperativeContextForFunction();

Expand Down Expand Up @@ -429,7 +433,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
}),
}),
};
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {};
functions['buildnear.testnet/test'] = {code:`
Expand Down Expand Up @@ -509,7 +513,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
})
})),
};
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {};
functions['buildnear.testnet/test'] = {code:`
Expand Down Expand Up @@ -557,7 +561,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
doesEndpointExist: jest.fn().mockReturnValue(false),
createAuthenticatedEndpoint: jest.fn(),
}
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {
'morgs.near/test': {
Expand Down Expand Up @@ -612,7 +616,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
doesEndpointExist: jest.fn().mockReturnValue(true),
createAuthenticatedEndpoint: jest.fn(),
}
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {
'morgs.near/test': {
Expand Down Expand Up @@ -662,7 +666,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
doesEndpointExist: jest.fn().mockReturnValue(true),
createAuthenticatedEndpoint: jest.fn(),
}
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {
'morgs.near/test': {
Expand Down Expand Up @@ -716,7 +720,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
doesEndpointExist: jest.fn().mockReturnValue(false),
createAuthenticatedEndpoint: jest.fn().mockRejectedValue(error),
}
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, provisioner, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {
'morgs.near/test': {
Expand All @@ -731,6 +735,48 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
expect(mockFetch.mock.calls).toMatchSnapshot();
});

test('Indexer.runFunctions() publishes the current block height', async () => {
const mockFetch = jest.fn(() => ({
status: 200,
json: async () => ({
errors: null,
}),
}));
const block_height = 456;
const mockS3 = {
getObject: jest.fn(() => ({
promise: () => Promise.resolve({
Body: {
toString: () => JSON.stringify({
chunks: [],
header: {
height: block_height
}
})
}
})
})),
};
const metrics = {
putBlockHeight: jest.fn().mockReturnValueOnce({ promise: jest.fn() }),
};
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray, metrics });

const functions = {};
functions['buildnear.testnet/test'] = {
code:`
const foo = 3;
block.result = context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\$\{block.blockHeight\}")}\`);
mutationsReturnValue['hack'] = function() {return 'bad'}
`,
account_id: 'buildnear.testnet',
function_name: 'test'
};
await indexer.runFunctions(block_height, functions);

expect(metrics.putBlockHeight).toHaveBeenCalledWith('buildnear.testnet', 'test', block_height);
});

// The unhandled promise causes problems with test reporting.
// Note unhandled promise rejections fail to proceed to the next function on AWS Lambda
test.skip('Indexer.runFunctions() continues despite promise rejection, unable to log rejection', async () => {
Expand All @@ -755,7 +801,7 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
})
})),
};
const indexer = new Indexer('mainnet', 'us-west-2', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray });
const indexer = new Indexer('mainnet', { fetch: mockFetch, s3: mockS3, awsXray: mockAwsXray, metrics: mockMetrics });

const functions = {};
functions['buildnear.testnet/fails'] = {code:`
Expand Down Expand Up @@ -802,5 +848,4 @@ mutation _1 { set(functionName: "buildnear.testnet/test", key: "foo2", data: "in
}
]);
});

});
Loading

0 comments on commit 0e162fb

Please sign in to comment.