Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Rename logs and metadata tables #677

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions runner/src/indexer-meta/indexer-meta.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('IndexerMeta', () => {
const infoEntry = LogEntry.systemInfo('Info message');
await indexerMeta.writeLogs([infoEntry]);

const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

Expand All @@ -54,7 +54,7 @@ describe('IndexerMeta', () => {
const errorEntry = LogEntry.systemError('Error message', 12345);
await indexerMeta.writeLogs([errorEntry]);

const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

Expand All @@ -77,7 +77,7 @@ describe('IndexerMeta', () => {

await indexerMeta.writeLogs(logEntries);

const expectedQuery = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES`;
const expectedQuery = `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_logs (block_height, date, timestamp, type, level, message) VALUES`;
expect(query.mock.calls[0][0]).toContain(expectedQuery);
});

Expand Down Expand Up @@ -116,15 +116,15 @@ describe('IndexerMeta', () => {
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.setStatus(IndexerStatus.RUNNING);
expect(query).toBeCalledWith(
`INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
`INSERT INTO ${infoIndexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});

it('writes last processed block height for indexer', async () => {
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.updateBlockHeight(123);
expect(query).toBeCalledWith(
`INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
`INSERT INTO ${infoIndexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});
});
Expand Down
6 changes: 3 additions & 3 deletions runner/src/indexer-meta/indexer-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export enum IndexerStatus {
STOPPED = 'STOPPED',
}

export const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
export const METADATA_TABLE_UPSERT = 'INSERT INTO %I.sys_metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
export enum MetadataFields {
STATUS = 'STATUS',
LAST_PROCESSED_BLOCK_HEIGHT = 'LAST_PROCESSED_BLOCK_HEIGHT'
Expand All @@ -24,7 +24,7 @@ export default class IndexerMeta {

private readonly pgClient: PgClient;
private readonly indexerConfig: IndexerConfig;
private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.__logs (block_height, date, timestamp, type, level, message) VALUES %L';
private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.sys_logs (block_height, date, timestamp, type, level, message) VALUES %L';

constructor (
indexerConfig: IndexerConfig,
Expand Down Expand Up @@ -62,7 +62,7 @@ export default class IndexerMeta {

const query = format(this.logInsertQueryTemplate, this.indexerConfig.schemaName(), values);
await this.pgClient.query(query);
}, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.indexerConfig.schemaName()}.__logs table`)
}, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.indexerConfig.schemaName()}.sys_logs table`)
.finally(() => {
writeLogSpan.end();
});
Expand Down
20 changes: 10 additions & 10 deletions runner/src/provisioner/provisioner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('Provisioner', () => {
const functionName = 'test-function';
const databaseSchema = 'CREATE TABLE blocks (height numeric)';
indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO);
const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`;
const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`;
const logsDDL = expect.any(String);
const metadataDDL = expect.any(String);
const error = new Error('some error');
Expand Down Expand Up @@ -111,8 +111,8 @@ describe('Provisioner', () => {
]);
expect(userPgClientQuery.mock.calls).toEqual([
[setProvisioningStatusQuery],
["SELECT cron.schedule_in_database('morgs_near_test_function_logs_create_partition', '0 1 * * *', $$SELECT morgs_near_test_function.fn_create_partition('morgs_near_test_function.__logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"],
["SELECT cron.schedule_in_database('morgs_near_test_function_logs_delete_partition', '0 2 * * *', $$SELECT morgs_near_test_function.fn_delete_partition('morgs_near_test_function.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"]
["SELECT cron.schedule_in_database('morgs_near_test_function_sys_logs_create_partition', '0 1 * * *', $$SELECT morgs_near_test_function.fn_create_partition('morgs_near_test_function.sys_logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"],
["SELECT cron.schedule_in_database('morgs_near_test_function_sys_logs_delete_partition', '0 2 * * *', $$SELECT morgs_near_test_function.fn_delete_partition('morgs_near_test_function.sys_logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"]
]);
expect(hasuraClient.addDatasource).toBeCalledWith(indexerConfig.userName(), password, indexerConfig.databaseName());
expect(hasuraClient.createSchema).toBeCalledWith(indexerConfig.userName(), indexerConfig.schemaName());
Expand Down Expand Up @@ -243,23 +243,23 @@ describe('Provisioner', () => {
});

it('provisions logs and metadata tables once', async () => {
hasuraClient.getTableNames = jest.fn().mockReturnValueOnce(['blocks']).mockReturnValue(['blocks', '__logs', '__metadata']);
hasuraClient.getTableNames = jest.fn().mockReturnValueOnce(['blocks']).mockReturnValue(['blocks', 'sys_logs', 'sys_metadata']);
await provisioner.provisionLogsAndMetadataIfNeeded(indexerConfig);
expect(hasuraClient.executeSqlOnSchema).toBeCalledTimes(2);
expect(cronPgClient.query).toBeCalledTimes(2);
expect(userPgClientQuery).toBeCalledTimes(3); // Set provisioning status, schedule today and tomorrow partitions
});

it('ensuring consistent state tracks logs and metadata table once, adds permissions twice', async () => {
hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', '__logs', '__metadata']);
hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', 'sys_logs', 'sys_metadata']);
hasuraClient.getTrackedTablePermissions = jest.fn()
.mockReturnValueOnce([
generateTableConfig('morgs_near_test_function', 'blocks', 'morgs_near', ['select', 'insert', 'update', 'delete']),
])
.mockReturnValueOnce([
generateTableConfig('morgs_near_test_function', 'blocks', 'morgs_near', ['select', 'insert', 'update', 'delete']),
generateTableConfig('morgs_near_test_function', '__logs', 'morgs_near', []),
generateTableConfig('morgs_near_test_function', '__metadata', 'morgs_near', []),
generateTableConfig('morgs_near_test_function', 'sys_logs', 'morgs_near', []),
generateTableConfig('morgs_near_test_function', 'sys_metadata', 'morgs_near', []),
]);
await provisioner.ensureConsistentHasuraState(indexerConfig);
await provisioner.ensureConsistentHasuraState(indexerConfig);
Expand All @@ -270,11 +270,11 @@ describe('Provisioner', () => {
});

it('ensuring consistent state caches result', async () => {
hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', '__logs', '__metadata']);
hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', 'sys_logs', 'sys_metadata']);
hasuraClient.getTrackedTablePermissions = jest.fn().mockReturnValue([
generateTableConfig('morgs_near_test_function', 'blocks', 'morgs_near', ['select', 'insert', 'update', 'delete']),
generateTableConfig('morgs_near_test_function', '__logs', 'morgs_near', ['select', 'insert', 'update', 'delete']),
generateTableConfig('morgs_near_test_function', '__metadata', 'morgs_near', ['select', 'insert', 'update', 'delete']),
generateTableConfig('morgs_near_test_function', 'sys_logs', 'morgs_near', ['select', 'insert', 'update', 'delete']),
generateTableConfig('morgs_near_test_function', 'sys_metadata', 'morgs_near', ['select', 'insert', 'update', 'delete']),
]);
await provisioner.ensureConsistentHasuraState(indexerConfig);
await provisioner.ensureConsistentHasuraState(indexerConfig);
Expand Down
20 changes: 16 additions & 4 deletions runner/src/provisioner/provisioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ export default class Provisioner {
const userCronPgClient = new this.PgClient(userDbConnectionParameters);
await userCronPgClient.query(
this.pgFormat(
"SELECT cron.schedule_in_database('%1$I_logs_create_partition', '0 1 * * *', $$SELECT %1$I.fn_create_partition('%1$I.__logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);",
"SELECT cron.schedule_in_database('%1$I_sys_logs_create_partition', '0 1 * * *', $$SELECT %1$I.fn_create_partition('%1$I.sys_logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);",
schemaName,
databaseName
)
);
await userCronPgClient.query(
this.pgFormat(
"SELECT cron.schedule_in_database('%1$I_logs_delete_partition', '0 2 * * *', $$SELECT %1$I.fn_delete_partition('%1$I.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);",
"SELECT cron.schedule_in_database('%1$I_sys_logs_delete_partition', '0 2 * * *', $$SELECT %1$I.fn_delete_partition('%1$I.sys_logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);",
schemaName,
databaseName
)
Expand Down Expand Up @@ -255,12 +255,24 @@ export default class Provisioner {
if (this.#hasLogsMetadataBeenProvisioned[indexerConfig.accountId]?.[indexerConfig.functionName]) {
return;
}
const logsTable = '__logs';
const metadataTable = '__metadata';
const oldLogsTable = '__logs';
const oldMetadataTable = '__metadata';
const logsTable = 'sys_logs';
const metadataTable = 'sys_metadata';

await wrapError(
async () => {
const tableNames = await this.getTableNames(indexerConfig.schemaName(), indexerConfig.databaseName());
const tablesToDelete: string[] = tableNames.filter((tableName: string) => tableName === oldLogsTable || tableName === oldMetadataTable);
if (tablesToDelete.length > 0) {
await this.hasuraClient.untrackTables(indexerConfig.databaseName(), indexerConfig.schemaName(), tablesToDelete, true);
}
if (tableNames.includes(oldLogsTable)) {
await this.hasuraClient.executeSqlOnSchema(indexerConfig.databaseName(), indexerConfig.schemaName(), `DROP TABLE IF EXISTS ${oldLogsTable} CASCADE;`);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this delete partitions too?

}
if (tableNames.includes(oldMetadataTable)) {
await this.hasuraClient.executeSqlOnSchema(indexerConfig.databaseName(), indexerConfig.schemaName(), `DROP TABLE IF EXISTS ${oldMetadataTable};`);
}

if (!tableNames.includes(logsTable)) {
await this.setupPartitionedLogsTable(indexerConfig.userName(), indexerConfig.databaseName(), indexerConfig.schemaName());
Expand Down
16 changes: 8 additions & 8 deletions runner/src/provisioner/schemas/logs-table.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export const logsTableDDL = (schemaName: string): string => `
CREATE TABLE __logs (
CREATE TABLE sys_logs (
id BIGSERIAL NOT NULL,
block_height NUMERIC(20),
date DATE NOT NULL,
Expand All @@ -10,11 +10,11 @@ CREATE TABLE __logs (
PRIMARY KEY (date, id)
) PARTITION BY RANGE (date);

CREATE INDEX __logs_timestamp_idx ON __logs USING btree (timestamp);
CREATE INDEX __logs_type_idx ON __logs USING btree (type);
CREATE INDEX __logs_level_idx ON __logs USING btree (level);
CREATE INDEX __logs_block_height_idx ON __logs USING btree (block_height);
CREATE INDEX __logs_search_vector_idx ON __logs USING GIN (to_tsvector('english', message));
CREATE INDEX sys_logs_timestamp_idx ON sys_logs USING btree (timestamp);
CREATE INDEX sys_logs_type_idx ON sys_logs USING btree (type);
CREATE INDEX sys_logs_level_idx ON sys_logs USING btree (level);
CREATE INDEX sys_logs_block_height_idx ON sys_logs USING btree (block_height);
CREATE INDEX sys_logs_search_vector_idx ON sys_logs USING GIN (to_tsvector('english', message));


CREATE OR REPLACE FUNCTION fn_create_partition(_tbl text, _date date, _interval_start text, _interval_end text)
Expand All @@ -34,8 +34,8 @@ EXECUTE 'CREATE TABLE IF NOT EXISTS ' || _tbl || '_p' || _partition_name || ' PA
END
$func$;

SELECT fn_create_partition('${schemaName}.__logs', CURRENT_DATE, '0 day', '1 day');
SELECT fn_create_partition('${schemaName}.__logs', CURRENT_DATE, '1 day', '2 day');
SELECT fn_create_partition('${schemaName}.sys_logs', CURRENT_DATE, '0 day', '1 day');
SELECT fn_create_partition('${schemaName}.sys_logs', CURRENT_DATE, '1 day', '2 day');

CREATE OR REPLACE FUNCTION fn_delete_partition(_tbl text, _date date, _interval_start text, _interval_end text)
RETURNS void
Expand Down
2 changes: 1 addition & 1 deletion runner/src/provisioner/schemas/metadata-table.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export const metadataTableDDL = (): string => `
CREATE TABLE IF NOT EXISTS __metadata (
CREATE TABLE IF NOT EXISTS sys_metadata (
attribute TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (attribute)
Expand Down
8 changes: 4 additions & 4 deletions runner/tests/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,12 @@ async function indexerOldLogsQuery (indexerSchemaName: string, graphqlClient: Gr
async function indexerLogsQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
const graphqlResult: any = await graphqlClient.request(gql`
query {
${indexerSchemaName}___logs {
${indexerSchemaName}_sys_logs {
message
}
}
`);
return graphqlResult[`${indexerSchemaName}___logs`];
return graphqlResult[`${indexerSchemaName}_sys_logs`];
}

async function indexerStatusQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
Expand All @@ -295,11 +295,11 @@ async function indexerBlockHeightQuery (indexerSchemaName: string, graphqlClient
async function indexerMetadataQuery (indexerSchemaName: string, attribute: string, graphqlClient: GraphQLClient): Promise<any> {
const graphqlResult: any = await graphqlClient.request(gql`
query {
${indexerSchemaName}___metadata(where: {attribute: {_eq: "${attribute}"}}) {
${indexerSchemaName}_sys_metadata(where: {attribute: {_eq: "${attribute}"}}) {
attribute
value
}
}
`);
return graphqlResult[`${indexerSchemaName}___metadata`][0];
return graphqlResult[`${indexerSchemaName}_sys_metadata`][0];
}
Loading