diff --git a/runner/src/indexer-config/indexer-config.ts b/runner/src/indexer-config/indexer-config.ts index f975f484..04b4b136 100644 --- a/runner/src/indexer-config/indexer-config.ts +++ b/runner/src/indexer-config/indexer-config.ts @@ -17,7 +17,8 @@ export class ProvisioningConfig extends BaseConfig { constructor ( public readonly accountId: string, public readonly functionName: string, - public readonly schema: string + public readonly schema: string, + public readonly logLevel: LogLevel = LogLevel.INFO ) { super(accountId, functionName); } @@ -101,7 +102,7 @@ export default class IndexerConfig extends ProvisioningConfig { public readonly schema: string, public readonly logLevel: LogLevel ) { - super(accountId, functionName, schema); + super(accountId, functionName, schema, logLevel); const hash = crypto.createHash('sha256'); hash.update(`${accountId}/${functionName}`); this.executorId = hash.digest('hex'); diff --git a/runner/src/indexer-meta/indexer-meta.ts b/runner/src/indexer-meta/indexer-meta.ts index f0da10b9..d09b4139 100644 --- a/runner/src/indexer-meta/indexer-meta.ts +++ b/runner/src/indexer-meta/indexer-meta.ts @@ -4,7 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client'; import { trace } from '@opentelemetry/api'; import type LogEntry from './log-entry'; import { LogLevel } from './log-entry'; -import type IndexerConfig from '../indexer-config'; +import { type ProvisioningConfig } from '../indexer-config/indexer-config'; export enum IndexerStatus { PROVISIONING = 'PROVISIONING', @@ -29,11 +29,11 @@ export default class IndexerMeta implements IndexerMetaInterface { tracer = trace.getTracer('queryapi-runner-indexer-logger'); private readonly pgClient: PgClient; - private readonly indexerConfig: IndexerConfig; + private readonly indexerConfig: ProvisioningConfig; private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.sys_logs (block_height, date, timestamp, type, level, message) VALUES %L'; constructor ( - indexerConfig: IndexerConfig, + indexerConfig: ProvisioningConfig, databaseConnectionParameters: PostgresConnectionParams, pgClientInstance: PgClient | undefined = undefined, ) { diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts index 8ef12371..0b58c486 100644 --- a/runner/src/provisioner/provisioner.test.ts +++ b/runner/src/provisioner/provisioner.test.ts @@ -72,7 +72,13 @@ describe('Provisioner', () => { }; }); - provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, undefined, crypto, pgFormat, PgClient as any, testingRetryConfig); + const IndexerMeta = jest.fn().mockImplementation(() => { + return { + writeLogs: jest.fn() + }; + }); + + provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, undefined, crypto, pgFormat, PgClient as any, testingRetryConfig, IndexerMeta); indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO); }); diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index f9d4e511..237bdb1e 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -10,7 +10,8 @@ import { logsTableDDL } from './schemas/logs-table'; import { metadataTableDDL } from './schemas/metadata-table'; import PgClientClass, { type PostgresConnectionParams } from '../pg-client'; import { type ProvisioningConfig } from '../indexer-config/indexer-config'; -import { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus } from '../indexer-meta'; +import IndexerMetaClass, { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus, LogEntry } from '../indexer-meta'; +import logger from '../logger'; const DEFAULT_PASSWORD_LENGTH = 16; @@ -60,6 +61,9 @@ const defaultRetryConfig: RetryConfig = { export default class Provisioner { tracer: Tracer = trace.getTracer('queryapi-runner-provisioner'); + private readonly SYSTEM_TABLES = ['sys_logs', 'sys_metadata']; + private readonly logger: typeof logger; + constructor ( private readonly hasuraClient: HasuraClient = new HasuraClient(), private readonly adminDefaultPgClient: PgClientClass = adminDefaultPgClientGlobal, @@ -69,7 +73,10 @@ export default class Provisioner { private readonly pgFormat: typeof pgFormatLib = pgFormatLib, private readonly PgClient: typeof PgClientClass = PgClientClass, private readonly retryConfig: RetryConfig = defaultRetryConfig, - ) {} + private readonly IndexerMeta: typeof IndexerMetaClass = IndexerMetaClass + ) { + this.logger = logger.child({ service: 'Provisioner' }); + } generatePassword (length: number = DEFAULT_PASSWORD_LENGTH): string { return this.crypto @@ -322,42 +329,82 @@ export default class Provisioner { }, 'Failed to deprovision'); } - async provisionUserApi (indexerConfig: ProvisioningConfig): Promise { // replace any with actual type + async provisionUserApi (indexerConfig: ProvisioningConfig): Promise { + const logger = this.logger.child({ accountId: indexerConfig.accountId, functionName: indexerConfig.functionName }); + + await wrapSpan(async () => { + await wrapError(async () => { + try { + await this.provisionSystemResources(indexerConfig); + } catch (error) { + logger.error('Failed to provision system resources', error); + throw error; + } + + try { + await this.provisionUserResources(indexerConfig); + } catch (err) { + const error = err as Error; + + try { + await this.writeFailureToUserLogs(indexerConfig, error); + } catch (error) { + logger.error('Failed to log provisioning failure', error); + } + + logger.warn('Failed to provision user resources', error); + throw error; + } + }, 'Failed to provision endpoint'); + }, this.tracer, 'provision indexer resources'); + } + + async writeFailureToUserLogs (indexerConfig: ProvisioningConfig, error: Error): Promise { + const indexerMeta = new this.IndexerMeta(indexerConfig, await this.getPostgresConnectionParameters(indexerConfig.userName())); + await indexerMeta.writeLogs([LogEntry.systemError(error.message)]); + } + + async provisionSystemResources (indexerConfig: ProvisioningConfig): Promise { const userName = indexerConfig.userName(); const databaseName = indexerConfig.databaseName(); const schemaName = indexerConfig.schemaName(); - await wrapSpan(async () => { - await wrapError( - async () => { - if (!await this.hasuraClient.doesSourceExist(databaseName)) { - const password = this.generatePassword(); - await this.createUserDb(userName, password, databaseName); - await this.addDatasource(userName, password, databaseName); - } + if (!await this.hasuraClient.doesSourceExist(databaseName)) { + const password = this.generatePassword(); + await this.createUserDb(userName, password, databaseName); + await this.addDatasource(userName, password, databaseName); + } - await this.createSchema(databaseName, schemaName); + await this.createSchema(databaseName, schemaName); - await this.createMetadataTable(databaseName, schemaName); - await this.setProvisioningStatus(userName, schemaName); - await this.setupPartitionedLogsTable(userName, databaseName, schemaName); - await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + await this.createMetadataTable(databaseName, schemaName); + await this.setProvisioningStatus(userName, schemaName); + await this.setupPartitionedLogsTable(userName, databaseName, schemaName); - const updatedTableNames = await this.getTableNames(schemaName, databaseName); + await this.trackTables(schemaName, this.SYSTEM_TABLES, databaseName); - await this.trackTables(schemaName, updatedTableNames, databaseName); + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, this.SYSTEM_TABLES, ['select', 'insert', 'update', 'delete']); + }); + } - await this.exponentialRetry(async () => { - await this.trackForeignKeyRelationships(schemaName, databaseName); - }); + async provisionUserResources (indexerConfig: ProvisioningConfig): Promise { + const databaseName = indexerConfig.databaseName(); + const schemaName = indexerConfig.schemaName(); - await this.exponentialRetry(async () => { - await this.addPermissionsToTables(indexerConfig, updatedTableNames, ['select', 'insert', 'update', 'delete']); - }); - }, - 'Failed to provision endpoint' - ); - }, this.tracer, 'provision indexer resources'); + await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + + const userTableNames = (await this.getTableNames(schemaName, databaseName)).filter((tableName) => !this.SYSTEM_TABLES.includes(tableName)); + + await this.trackTables(schemaName, userTableNames, databaseName); + + await this.exponentialRetry(async () => { + await this.trackForeignKeyRelationships(schemaName, databaseName); + }); + + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']); + }); } async exponentialRetry (fn: () => Promise): Promise { diff --git a/runner/src/server/services/data-layer/data-layer-service.ts b/runner/src/server/services/data-layer/data-layer-service.ts index d24befa0..61a1c94e 100644 --- a/runner/src/server/services/data-layer/data-layer-service.ts +++ b/runner/src/server/services/data-layer/data-layer-service.ts @@ -111,16 +111,12 @@ export function createDataLayerService ( .then(() => { logger.info('Successfully provisioned Data Layer'); }) - .catch((err) => { - logger.error('Failed to provision Data Layer', err); - throw err; - }) ); callback(null, { taskId }); }) .catch((err) => { - logger.error('Failed to check if Data Layer is provisioned', err); + logger.warn('Failed to check if Data Layer is provisioned', err); const internal = new StatusBuilder() .withCode(status.INTERNAL) @@ -148,7 +144,7 @@ export function createDataLayerService ( logger.info('Successfully deprovisioned Data Layer'); }) .catch((err) => { - logger.error('Failed to deprovision Data Layer', err); + logger.warn('Failed to deprovision Data Layer', err); throw err; }) ); diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 6299beac..dd29d392 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -81,7 +81,7 @@ export function getRunnerService ( executors.set(indexerConfig.executorId, streamHandler); callback(null, { executorId: indexerConfig.executorId }); streamHandler.start().catch((error: Error) => { - logger.error('Failed to start executor', error); + logger.warn('Failed to start executor', error); }); } catch (e) { const error = e as Error; diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index b6742840..9b9ea9ee 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -74,7 +74,7 @@ export default class StreamHandler { this.executorContext.executionState = ExecutionState.RUNNING; } catch (error: any) { const errorContent = error instanceof Error ? error.toString() : JSON.stringify(error); - this.logger.error('Terminating thread', error); + this.logger.warn('Terminating thread', error); this.executorContext.executionState = ExecutionState.STALLED; throw new Error(`Failed to start Indexer: ${errorContent}`); } @@ -92,7 +92,7 @@ export default class StreamHandler { } private handleError (error: Error): void { - this.logger.error('Terminating thread', error); + this.logger.warn('Terminating thread', error); this.executorContext.executionState = ExecutionState.STALLED; if (this.indexerMeta) { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 3ca3e5c9..38a6483b 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -189,7 +189,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise const error = err as Error; if (previousError !== error.message) { previousError = error.message; - workerContext.logger.error(`Failed on block ${currBlockHeight}`, err); + workerContext.logger.warn(`Failed on block ${currBlockHeight}`, err); } const sleepSpan = tracer.startSpan('Sleep for 10 seconds after failing', {}, context.active()); await sleep(10000); diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index 1fd71d9c..cca35999 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -270,6 +270,23 @@ describe('Indexer integration', () => { await expect(pgClient.query('SELECT * FROM cron.job WHERE jobname like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(0); await expect(hasuraClient.doesSourceExist(testConfig1.databaseName())).resolves.toBe(false); }); + + it('Writes provisioning errors to user logs table', async () => { + const testConfig = new IndexerConfig( + 'test:stream', + 'user-failures.near', // must be unique to prevent conflicts with other tests + 'test', + 0, + '', + 'broken schema', + LogLevel.INFO + ); + + await expect(provisioner.provisionUserApi(testConfig)).rejects.toThrow(); + + const logs: any = await indexerLogsQuery(testConfig.schemaName(), graphqlClient); + expect(logs[0].message).toContain('Failed to run user script'); + }); }); async function prepareIndexer (indexerConfig: IndexerConfig, provisioner: Provisioner, hasuraContainer: StartedHasuraGraphQLContainer): Promise {