diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/hasura-client/hasura-client.ts index 3bf4de72a..e8ed2a7b9 100644 --- a/runner/src/hasura-client/hasura-client.ts +++ b/runner/src/hasura-client/hasura-client.ts @@ -10,6 +10,14 @@ interface SqlOptions { source?: string } +interface DatabaseConnectionParameters { + password: string + database: string + username: string + host: string + port: number +} + type MetadataRequestArgs = Record; type MetadataRequests = Record; @@ -99,7 +107,7 @@ export default class HasuraClient { return metadata; } - async getDbConnectionParameters (account: string): Promise { + async getDbConnectionParameters (account: string): Promise { const metadata = await this.exportMetadata(); const source = metadata.sources.find((source: { name: any, configuration: any }) => source.name === account); if (source === undefined) { diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts index efa8cf1c4..2b1139b8b 100644 --- a/runner/src/provisioner/provisioner.test.ts +++ b/runner/src/provisioner/provisioner.test.ts @@ -3,8 +3,11 @@ import pgFormat from 'pg-format'; import Provisioner from './provisioner'; describe('Provisioner', () => { - let pgClient: any; + let adminPgClient: any; + let cronPgClient: any; let hasuraClient: any; + let provisioner: Provisioner; + let userPgClientQuery: any; const tableNames = ['blocks']; const accountId = 'morgs.near'; @@ -13,7 +16,6 @@ describe('Provisioner', () => { const sanitizedFunctionName = 'test_function'; const databaseSchema = 'CREATE TABLE blocks (height numeric)'; const error = new Error('some error'); - const defaultDatabase = 'default'; const schemaName = `${sanitizedAccountId}_${sanitizedFunctionName}`; const password = 'password'; @@ -39,20 +41,33 @@ describe('Provisioner', () => { doesSourceExist: jest.fn().mockReturnValueOnce(false), doesSchemaExist: jest.fn().mockReturnValueOnce(false), untrackTables: jest.fn().mockReturnValueOnce(null), + grantCronAccess: jest.fn().mockResolvedValueOnce(null), + scheduleLogPartitionJobs: jest.fn().mockResolvedValueOnce(null), + getDbConnectionParameters: jest.fn().mockReturnValueOnce({}), }; - pgClient = { + adminPgClient = { query: jest.fn().mockReturnValue(null), - format: pgFormat, }; + + cronPgClient = { + query: jest.fn().mockReturnValue(null), + }; + + userPgClientQuery = jest.fn().mockReturnValue(null); + const PgClient = jest.fn().mockImplementation(() => { + return { + query: userPgClientQuery, + }; + }); + + provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, crypto, pgFormat, PgClient as any); }); describe('isUserApiProvisioned', () => { it('returns false if datasource doesnt exists', async () => { hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.fetchUserApiProvisioningStatus(accountId, functionName)).resolves.toBe(false); expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(false); }); @@ -61,8 +76,6 @@ describe('Provisioner', () => { hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false); hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(false); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.fetchUserApiProvisioningStatus(accountId, functionName)).resolves.toBe(false); expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(false); }); @@ -71,8 +84,6 @@ describe('Provisioner', () => { hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true); hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(true); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.fetchUserApiProvisioningStatus(accountId, functionName)).resolves.toBe(true); expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(true); }); @@ -80,16 +91,22 @@ describe('Provisioner', () => { describe('provisionUserApi', () => { it('provisions an API for the user', async () => { - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await provisioner.provisionUserApi(accountId, functionName, databaseSchema); - expect(pgClient.query.mock.calls).toEqual([ + expect(adminPgClient.query.mock.calls).toEqual([ ['CREATE DATABASE morgs_near'], ['CREATE USER morgs_near WITH PASSWORD \'password\''], ['GRANT ALL PRIVILEGES ON DATABASE morgs_near TO morgs_near'], ['REVOKE CONNECT ON DATABASE morgs_near FROM PUBLIC'], ]); + expect(cronPgClient.query.mock.calls).toEqual([ + ['GRANT USAGE ON SCHEMA cron TO morgs_near'], + ['GRANT EXECUTE ON FUNCTION cron.schedule_in_database TO morgs_near;'], + ]); + expect(userPgClientQuery.mock.calls).toEqual([ + ["SELECT cron.schedule_in_database('morgs_near_test_function_logs_create_partition', '0 1 * * *', $$SELECT fn_create_partition('morgs_near_test_function.__logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"], + ["SELECT cron.schedule_in_database('morgs_near_test_function_logs_delete_partition', '0 2 * * *', $$SELECT fn_delete_partition('morgs_near_test_function.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"] + ]); expect(hasuraClient.addDatasource).toBeCalledWith(sanitizedAccountId, password, sanitizedAccountId); expect(hasuraClient.createSchema).toBeCalledWith(sanitizedAccountId, schemaName); expect(hasuraClient.runMigrations).toBeCalledWith(sanitizedAccountId, schemaName, databaseSchema); @@ -110,25 +127,12 @@ describe('Provisioner', () => { expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(true); }); - it('untracks tables from the previous schema if they exists', async () => { - hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(true); - - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - - await provisioner.provisionUserApi(accountId, functionName, databaseSchema); - - expect(hasuraClient.getTableNames).toBeCalledWith(schemaName, defaultDatabase); - expect(hasuraClient.untrackTables).toBeCalledWith(defaultDatabase, schemaName, tableNames); - }); - it('skips provisioning the datasource if it already exists', async () => { hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await provisioner.provisionUserApi(accountId, functionName, databaseSchema); - expect(pgClient.query).not.toBeCalled(); + expect(adminPgClient.query).not.toBeCalled(); expect(hasuraClient.addDatasource).not.toBeCalled(); expect(hasuraClient.createSchema).toBeCalledWith(sanitizedAccountId, schemaName); @@ -150,17 +154,13 @@ describe('Provisioner', () => { }); it('formats user input before executing the query', async () => { - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await provisioner.createUserDb('morgs_near', 'pass; DROP TABLE users;--', 'databaseName UNION SELECT * FROM users --'); - expect(pgClient.query.mock.calls).toMatchSnapshot(); + expect(adminPgClient.query.mock.calls).toMatchSnapshot(); }); it('throws an error when it fails to create a postgres db', async () => { - pgClient.query = jest.fn().mockRejectedValue(error); - - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); + adminPgClient.query = jest.fn().mockRejectedValue(error); await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to create user db: some error'); }); @@ -168,49 +168,49 @@ describe('Provisioner', () => { it('throws an error when it fails to add the db to hasura', async () => { hasuraClient.addDatasource = jest.fn().mockRejectedValue(error); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to add datasource: some error'); }); it('throws an error when it fails to run migrations', async () => { hasuraClient.runMigrations = jest.fn().mockRejectedValue(error); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to run migrations: some error'); }); it('throws an error when it fails to fetch table names', async () => { hasuraClient.getTableNames = jest.fn().mockRejectedValue(error); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to fetch table names: some error'); }); it('throws an error when it fails to track tables', async () => { hasuraClient.trackTables = jest.fn().mockRejectedValue(error); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to track tables: some error'); }); it('throws an error when it fails to track foreign key relationships', async () => { hasuraClient.trackForeignKeyRelationships = jest.fn().mockRejectedValue(error); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to track foreign key relationships: some error'); }); it('throws an error when it fails to add permissions to tables', async () => { hasuraClient.addPermissionsToTables = jest.fn().mockRejectedValue(error); - const provisioner = new Provisioner(hasuraClient, pgClient, crypto); - await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to add permissions to tables: some error'); }); + + it('throws when grant cron access fails', async () => { + cronPgClient.query = jest.fn().mockRejectedValue(error); + + await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to grant cron access: some error'); + }); + + it('throws when scheduling cron jobs fails', async () => { + userPgClientQuery = jest.fn().mockRejectedValueOnce(error); + + await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to schedule log partition jobs: some error'); + }); }); }); diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index e703e36f8..8a9fa2ff2 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -1,12 +1,15 @@ +import { type Tracer, trace } from '@opentelemetry/api'; +import pgFormatLib from 'pg-format'; + import { wrapError } from '../utility'; import cryptoModule from 'crypto'; import HasuraClient from '../hasura-client'; -import PgClient from '../pg-client'; -import { type Tracer, trace } from '@opentelemetry/api'; +import PgClientClass from '../pg-client'; const DEFAULT_PASSWORD_LENGTH = 16; +const CRON_DATABASE = 'cron'; -const sharedPgClient = new PgClient({ +const adminDefaultPgClientGlobal = new PgClientClass({ user: process.env.PGUSER, password: process.env.PGPASSWORD, database: process.env.PGDATABASE, @@ -14,6 +17,14 @@ const sharedPgClient = new PgClient({ port: Number(process.env.PGPORT), }); +const adminCronPgClientGlobal = new PgClientClass({ + user: process.env.PGUSER, + password: process.env.PGPASSWORD, + database: CRON_DATABASE, + host: process.env.PGHOST, + port: Number(process.env.PGPORT), +}); + export interface DatabaseConnectionParameters { host: string port: number @@ -28,13 +39,12 @@ export default class Provisioner { constructor ( private readonly hasuraClient: HasuraClient = new HasuraClient(), - private readonly pgClient: PgClient = sharedPgClient, + private readonly adminDefaultPgClient: PgClientClass = adminDefaultPgClientGlobal, + private readonly adminCronPgClient: PgClientClass = adminCronPgClientGlobal, private readonly crypto: typeof cryptoModule = cryptoModule, - ) { - this.hasuraClient = hasuraClient; - this.pgClient = pgClient; - this.crypto = crypto; - } + private readonly pgFormat: typeof pgFormatLib = pgFormatLib, + private readonly PgClient: typeof PgClientClass = PgClientClass + ) {} generatePassword (length: number = DEFAULT_PASSWORD_LENGTH): string { return this.crypto @@ -57,16 +67,68 @@ export default class Provisioner { } async createDatabase (name: string): Promise { - await this.pgClient.query(this.pgClient.format('CREATE DATABASE %I', name)); + await this.adminDefaultPgClient.query(this.pgFormat('CREATE DATABASE %I', name)); } async createUser (name: string, password: string): Promise { - await this.pgClient.query(this.pgClient.format('CREATE USER %I WITH PASSWORD %L', name, password)); + await this.adminDefaultPgClient.query(this.pgFormat('CREATE USER %I WITH PASSWORD %L', name, password)); } async restrictUserToDatabase (databaseName: string, userName: string): Promise { - await this.pgClient.query(this.pgClient.format('GRANT ALL PRIVILEGES ON DATABASE %I TO %I', databaseName, userName)); - await this.pgClient.query(this.pgClient.format('REVOKE CONNECT ON DATABASE %I FROM PUBLIC', databaseName)); + await this.adminDefaultPgClient.query(this.pgFormat('GRANT ALL PRIVILEGES ON DATABASE %I TO %I', databaseName, userName)); + await this.adminDefaultPgClient.query(this.pgFormat('REVOKE CONNECT ON DATABASE %I FROM PUBLIC', databaseName)); + } + + async grantCronAccess (userName: string): Promise { + await wrapError( + async () => { + await this.adminCronPgClient.query(this.pgFormat('GRANT USAGE ON SCHEMA cron TO %I', userName)); + await this.adminCronPgClient.query(this.pgFormat('GRANT EXECUTE ON FUNCTION cron.schedule_in_database TO %I;', userName)); + }, + 'Failed to grant cron access' + ); + } + + async scheduleLogPartitionJobs (userName: string, databaseName: string, schemaName: string): Promise { + await wrapError( + async () => { + const userDbConnectionParameters = await this.hasuraClient.getDbConnectionParameters(userName); + const userCronPgClient = new this.PgClient({ + user: userDbConnectionParameters.username, + password: userDbConnectionParameters.password, + database: CRON_DATABASE, + host: userDbConnectionParameters.host, + port: userDbConnectionParameters.port, + }); + + await userCronPgClient.query( + this.pgFormat( + "SELECT cron.schedule_in_database('%1$I_logs_create_partition', '0 1 * * *', $$SELECT fn_create_partition('%1$I.__logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);", + schemaName, + databaseName + ) + ); + await userCronPgClient.query( + this.pgFormat( + "SELECT cron.schedule_in_database('%1$I_logs_delete_partition', '0 2 * * *', $$SELECT fn_delete_partition('%1$I.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);", + schemaName, + databaseName + ) + ); + }, + 'Failed to schedule log partition jobs' + ); + } + + async setupPartitionedLogsTable (userName: string, databaseName: string, schemaName: string): Promise { + await wrapError( + async () => { + // TODO: Create logs table + await this.grantCronAccess(userName); + await this.scheduleLogPartitionJobs(userName, databaseName, schemaName); + }, + 'Failed to setup partitioned logs table' + ); } async createUserDb (userName: string, password: string, databaseName: string): Promise { @@ -161,15 +223,11 @@ export default class Provisioner { await this.addDatasource(userName, password, databaseName); } - // Untrack tables from old schema to prevent conflicts with new DB - if (await this.hasuraClient.doesSchemaExist(HasuraClient.DEFAULT_DATABASE, schemaName)) { - const tableNames = await this.getTableNames(schemaName, HasuraClient.DEFAULT_DATABASE); - await this.hasuraClient.untrackTables(HasuraClient.DEFAULT_DATABASE, schemaName, tableNames); - } - await this.createSchema(databaseName, schemaName); await this.runMigrations(databaseName, schemaName, databaseSchema); + await this.setupPartitionedLogsTable(userName, databaseName, schemaName); + const tableNames = await this.getTableNames(schemaName, databaseName); await this.trackTables(schemaName, tableNames, databaseName); @@ -186,7 +244,7 @@ export default class Provisioner { } } - async getDatabaseConnectionParameters (accountId: string): Promise { - return await this.hasuraClient.getDbConnectionParameters(accountId); + async getDatabaseConnectionParameters (userName: string): Promise { + return await this.hasuraClient.getDbConnectionParameters(userName); } }