feat: Schedule log partition jobs during provisioning (#625)
This PR expands provisioning to also schedule the cron jobs that add and
delete log partitions. It assumes:
1. The `cron` database exists and has `pg_cron` enabled
(near/near-ops#1665)
2. The `__logs` table exists and has the partition functions defined
(#608)

The high-level steps of this flow are:
1. Use an admin connection to the `cron` database to grant the required
access to the user
2. Use a user connection to the `cron` database to schedule the jobs

Cron jobs run as the user who scheduled them, so the user _must_ schedule the
jobs themselves, as they are the only ones with access to their own schema. If
the admin scheduled the jobs, they would run as the admin and fail, because the
admin does not have the required access.
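
For illustration, here is a minimal sketch of the two-step flow. It uses the plain `pg` `Client` and `pg-format` directly rather than the project's `PgClient` wrapper, and the admin/user connection details are placeholders; the job names, schedules, and partition windows mirror the diff below.

```typescript
import { Client } from 'pg';
import pgFormat from 'pg-format';

// Sketch only: assumes the `cron` database already has pg_cron enabled and
// that fn_create_partition / fn_delete_partition exist (assumptions 1 and 2 above).
async function scheduleLogPartitionJobs (
  userName: string,      // e.g. 'morgs_near'
  databaseName: string,  // e.g. 'morgs_near'
  schemaName: string     // e.g. 'morgs_near_test_function'
): Promise<void> {
  // Step 1: an admin connection to the `cron` database grants the user access.
  const adminCron = new Client({ database: 'cron', user: 'admin', password: 'admin-password' });
  await adminCron.connect();
  await adminCron.query(pgFormat('GRANT USAGE ON SCHEMA cron TO %I', userName));
  await adminCron.query(pgFormat('GRANT EXECUTE ON FUNCTION cron.schedule_in_database TO %I', userName));
  await adminCron.end();

  // Step 2: the user's own connection to `cron` schedules the jobs, so the
  // jobs run as the user and can reach the user's schema.
  const userCron = new Client({ database: 'cron', user: userName, password: 'user-password' });
  await userCron.connect();
  await userCron.query(pgFormat(
    "SELECT cron.schedule_in_database('%1$I_logs_create_partition', '0 1 * * *', $$SELECT fn_create_partition('%1$I.__logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);",
    schemaName,
    databaseName
  ));
  await userCron.query(pgFormat(
    "SELECT cron.schedule_in_database('%1$I_logs_delete_partition', '0 2 * * *', $$SELECT fn_delete_partition('%1$I.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);",
    schemaName,
    databaseName
  ));
  await userCron.end();
}
```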

Merging this before 2. is implemented is fine; the jobs will simply fail until
then, and should start to succeed once it is in place.
morgsmccauley authored Mar 29, 2024
1 parent 41ccc6a commit 738ad09
Showing 3 changed files with 133 additions and 67 deletions.
10 changes: 9 additions & 1 deletion runner/src/hasura-client/hasura-client.ts
@@ -10,6 +10,14 @@ interface SqlOptions {
source?: string
}

interface DatabaseConnectionParameters {
password: string
database: string
username: string
host: string
port: number
}

type MetadataRequestArgs = Record<string, any>;

type MetadataRequests = Record<string, any>;
@@ -99,7 +107,7 @@ export default class HasuraClient {
return metadata;
}

async getDbConnectionParameters (account: string): Promise<any> {
async getDbConnectionParameters (account: string): Promise<DatabaseConnectionParameters> {
const metadata = await this.exportMetadata();
const source = metadata.sources.find((source: { name: any, configuration: any }) => source.name === account);
if (source === undefined) {
90 changes: 45 additions & 45 deletions runner/src/provisioner/provisioner.test.ts
@@ -3,8 +3,11 @@ import pgFormat from 'pg-format';
import Provisioner from './provisioner';

describe('Provisioner', () => {
let pgClient: any;
let adminPgClient: any;
let cronPgClient: any;
let hasuraClient: any;
let provisioner: Provisioner;
let userPgClientQuery: any;

const tableNames = ['blocks'];
const accountId = 'morgs.near';
@@ -13,7 +16,6 @@ describe('Provisioner', () => {
const sanitizedFunctionName = 'test_function';
const databaseSchema = 'CREATE TABLE blocks (height numeric)';
const error = new Error('some error');
const defaultDatabase = 'default';
const schemaName = `${sanitizedAccountId}_${sanitizedFunctionName}`;

const password = 'password';
@@ -39,20 +41,33 @@ describe('Provisioner', () => {
doesSourceExist: jest.fn().mockReturnValueOnce(false),
doesSchemaExist: jest.fn().mockReturnValueOnce(false),
untrackTables: jest.fn().mockReturnValueOnce(null),
grantCronAccess: jest.fn().mockResolvedValueOnce(null),
scheduleLogPartitionJobs: jest.fn().mockResolvedValueOnce(null),
getDbConnectionParameters: jest.fn().mockReturnValueOnce({}),
};

pgClient = {
adminPgClient = {
query: jest.fn().mockReturnValue(null),
format: pgFormat,
};

cronPgClient = {
query: jest.fn().mockReturnValue(null),
};

userPgClientQuery = jest.fn().mockReturnValue(null);
const PgClient = jest.fn().mockImplementation(() => {
return {
query: userPgClientQuery,
};
});

provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, crypto, pgFormat, PgClient as any);
});

describe('isUserApiProvisioned', () => {
it('returns false if datasource doesnt exists', async () => {
hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.fetchUserApiProvisioningStatus(accountId, functionName)).resolves.toBe(false);
expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(false);
});
@@ -61,8 +76,6 @@ describe('Provisioner', () => {
hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(false);
hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(false);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.fetchUserApiProvisioningStatus(accountId, functionName)).resolves.toBe(false);
expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(false);
});
@@ -71,25 +84,29 @@ describe('Provisioner', () => {
hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true);
hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(true);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.fetchUserApiProvisioningStatus(accountId, functionName)).resolves.toBe(true);
expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(true);
});
});

describe('provisionUserApi', () => {
it('provisions an API for the user', async () => {
const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await provisioner.provisionUserApi(accountId, functionName, databaseSchema);

expect(pgClient.query.mock.calls).toEqual([
expect(adminPgClient.query.mock.calls).toEqual([
['CREATE DATABASE morgs_near'],
['CREATE USER morgs_near WITH PASSWORD \'password\''],
['GRANT ALL PRIVILEGES ON DATABASE morgs_near TO morgs_near'],
['REVOKE CONNECT ON DATABASE morgs_near FROM PUBLIC'],
]);
expect(cronPgClient.query.mock.calls).toEqual([
['GRANT USAGE ON SCHEMA cron TO morgs_near'],
['GRANT EXECUTE ON FUNCTION cron.schedule_in_database TO morgs_near;'],
]);
expect(userPgClientQuery.mock.calls).toEqual([
["SELECT cron.schedule_in_database('morgs_near_test_function_logs_create_partition', '0 1 * * *', $$SELECT fn_create_partition('morgs_near_test_function.__logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"],
["SELECT cron.schedule_in_database('morgs_near_test_function_logs_delete_partition', '0 2 * * *', $$SELECT fn_delete_partition('morgs_near_test_function.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"]
]);
expect(hasuraClient.addDatasource).toBeCalledWith(sanitizedAccountId, password, sanitizedAccountId);
expect(hasuraClient.createSchema).toBeCalledWith(sanitizedAccountId, schemaName);
expect(hasuraClient.runMigrations).toBeCalledWith(sanitizedAccountId, schemaName, databaseSchema);
@@ -110,25 +127,12 @@ describe('Provisioner', () => {
expect(provisioner.isUserApiProvisioned(accountId, functionName)).toBe(true);
});

it('untracks tables from the previous schema if they exists', async () => {
hasuraClient.doesSchemaExist = jest.fn().mockReturnValueOnce(true);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await provisioner.provisionUserApi(accountId, functionName, databaseSchema);

expect(hasuraClient.getTableNames).toBeCalledWith(schemaName, defaultDatabase);
expect(hasuraClient.untrackTables).toBeCalledWith(defaultDatabase, schemaName, tableNames);
});

it('skips provisioning the datasource if it already exists', async () => {
hasuraClient.doesSourceExist = jest.fn().mockReturnValueOnce(true);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await provisioner.provisionUserApi(accountId, functionName, databaseSchema);

expect(pgClient.query).not.toBeCalled();
expect(adminPgClient.query).not.toBeCalled();
expect(hasuraClient.addDatasource).not.toBeCalled();

expect(hasuraClient.createSchema).toBeCalledWith(sanitizedAccountId, schemaName);
@@ -150,67 +154,63 @@ describe('Provisioner', () => {
});

it('formats user input before executing the query', async () => {
const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await provisioner.createUserDb('morgs_near', 'pass; DROP TABLE users;--', 'databaseName UNION SELECT * FROM users --');

expect(pgClient.query.mock.calls).toMatchSnapshot();
expect(adminPgClient.query.mock.calls).toMatchSnapshot();
});

it('throws an error when it fails to create a postgres db', async () => {
pgClient.query = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);
adminPgClient.query = jest.fn().mockRejectedValue(error);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to create user db: some error');
});

it('throws an error when it fails to add the db to hasura', async () => {
hasuraClient.addDatasource = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to add datasource: some error');
});

it('throws an error when it fails to run migrations', async () => {
hasuraClient.runMigrations = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to run migrations: some error');
});

it('throws an error when it fails to fetch table names', async () => {
hasuraClient.getTableNames = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to fetch table names: some error');
});

it('throws an error when it fails to track tables', async () => {
hasuraClient.trackTables = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to track tables: some error');
});

it('throws an error when it fails to track foreign key relationships', async () => {
hasuraClient.trackForeignKeyRelationships = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to track foreign key relationships: some error');
});

it('throws an error when it fails to add permissions to tables', async () => {
hasuraClient.addPermissionsToTables = jest.fn().mockRejectedValue(error);

const provisioner = new Provisioner(hasuraClient, pgClient, crypto);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to add permissions to tables: some error');
});

it('throws when grant cron access fails', async () => {
cronPgClient.query = jest.fn().mockRejectedValue(error);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to grant cron access: some error');
});

it('throws when scheduling cron jobs fails', async () => {
userPgClientQuery = jest.fn().mockRejectedValueOnce(error);

await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to schedule log partition jobs: some error');
});
});
});
100 changes: 79 additions & 21 deletions runner/src/provisioner/provisioner.ts
@@ -1,19 +1,30 @@
import { type Tracer, trace } from '@opentelemetry/api';
import pgFormatLib from 'pg-format';

import { wrapError } from '../utility';
import cryptoModule from 'crypto';
import HasuraClient from '../hasura-client';
import PgClient from '../pg-client';
import { type Tracer, trace } from '@opentelemetry/api';
import PgClientClass from '../pg-client';

const DEFAULT_PASSWORD_LENGTH = 16;
const CRON_DATABASE = 'cron';

const sharedPgClient = new PgClient({
const adminDefaultPgClientGlobal = new PgClientClass({
user: process.env.PGUSER,
password: process.env.PGPASSWORD,
database: process.env.PGDATABASE,
host: process.env.PGHOST,
port: Number(process.env.PGPORT),
});

const adminCronPgClientGlobal = new PgClientClass({
user: process.env.PGUSER,
password: process.env.PGPASSWORD,
database: CRON_DATABASE,
host: process.env.PGHOST,
port: Number(process.env.PGPORT),
});

export interface DatabaseConnectionParameters {
host: string
port: number
@@ -28,13 +39,12 @@ export default class Provisioner {

constructor (
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly pgClient: PgClient = sharedPgClient,
private readonly adminDefaultPgClient: PgClientClass = adminDefaultPgClientGlobal,
private readonly adminCronPgClient: PgClientClass = adminCronPgClientGlobal,
private readonly crypto: typeof cryptoModule = cryptoModule,
) {
this.hasuraClient = hasuraClient;
this.pgClient = pgClient;
this.crypto = crypto;
}
private readonly pgFormat: typeof pgFormatLib = pgFormatLib,
private readonly PgClient: typeof PgClientClass = PgClientClass
) {}

generatePassword (length: number = DEFAULT_PASSWORD_LENGTH): string {
return this.crypto
@@ -57,16 +67,68 @@ export default class Provisioner {
}

async createDatabase (name: string): Promise<void> {
await this.pgClient.query(this.pgClient.format('CREATE DATABASE %I', name));
await this.adminDefaultPgClient.query(this.pgFormat('CREATE DATABASE %I', name));
}

async createUser (name: string, password: string): Promise<void> {
await this.pgClient.query(this.pgClient.format('CREATE USER %I WITH PASSWORD %L', name, password));
await this.adminDefaultPgClient.query(this.pgFormat('CREATE USER %I WITH PASSWORD %L', name, password));
}

async restrictUserToDatabase (databaseName: string, userName: string): Promise<void> {
await this.pgClient.query(this.pgClient.format('GRANT ALL PRIVILEGES ON DATABASE %I TO %I', databaseName, userName));
await this.pgClient.query(this.pgClient.format('REVOKE CONNECT ON DATABASE %I FROM PUBLIC', databaseName));
await this.adminDefaultPgClient.query(this.pgFormat('GRANT ALL PRIVILEGES ON DATABASE %I TO %I', databaseName, userName));
await this.adminDefaultPgClient.query(this.pgFormat('REVOKE CONNECT ON DATABASE %I FROM PUBLIC', databaseName));
}

async grantCronAccess (userName: string): Promise<void> {
await wrapError(
async () => {
await this.adminCronPgClient.query(this.pgFormat('GRANT USAGE ON SCHEMA cron TO %I', userName));
await this.adminCronPgClient.query(this.pgFormat('GRANT EXECUTE ON FUNCTION cron.schedule_in_database TO %I;', userName));
},
'Failed to grant cron access'
);
}

async scheduleLogPartitionJobs (userName: string, databaseName: string, schemaName: string): Promise<void> {
await wrapError(
async () => {
const userDbConnectionParameters = await this.hasuraClient.getDbConnectionParameters(userName);
const userCronPgClient = new this.PgClient({
user: userDbConnectionParameters.username,
password: userDbConnectionParameters.password,
database: CRON_DATABASE,
host: userDbConnectionParameters.host,
port: userDbConnectionParameters.port,
});

await userCronPgClient.query(
this.pgFormat(
"SELECT cron.schedule_in_database('%1$I_logs_create_partition', '0 1 * * *', $$SELECT fn_create_partition('%1$I.__logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);",
schemaName,
databaseName
)
);
await userCronPgClient.query(
this.pgFormat(
"SELECT cron.schedule_in_database('%1$I_logs_delete_partition', '0 2 * * *', $$SELECT fn_delete_partition('%1$I.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);",
schemaName,
databaseName
)
);
},
'Failed to schedule log partition jobs'
);
}

async setupPartitionedLogsTable (userName: string, databaseName: string, schemaName: string): Promise<void> {
await wrapError(
async () => {
// TODO: Create logs table
await this.grantCronAccess(userName);
await this.scheduleLogPartitionJobs(userName, databaseName, schemaName);
},
'Failed to setup partitioned logs table'
);
}

async createUserDb (userName: string, password: string, databaseName: string): Promise<void> {
@@ -161,15 +223,11 @@ export default class Provisioner {
await this.addDatasource(userName, password, databaseName);
}

// Untrack tables from old schema to prevent conflicts with new DB
if (await this.hasuraClient.doesSchemaExist(HasuraClient.DEFAULT_DATABASE, schemaName)) {
const tableNames = await this.getTableNames(schemaName, HasuraClient.DEFAULT_DATABASE);
await this.hasuraClient.untrackTables(HasuraClient.DEFAULT_DATABASE, schemaName, tableNames);
}

await this.createSchema(databaseName, schemaName);
await this.runMigrations(databaseName, schemaName, databaseSchema);

await this.setupPartitionedLogsTable(userName, databaseName, schemaName);

const tableNames = await this.getTableNames(schemaName, databaseName);
await this.trackTables(schemaName, tableNames, databaseName);

@@ -186,7 +244,7 @@ export default class Provisioner {
}
}

async getDatabaseConnectionParameters (accountId: string): Promise<any> {
return await this.hasuraClient.getDbConnectionParameters(accountId);
async getDatabaseConnectionParameters (userName: string): Promise<any> {
return await this.hasuraClient.getDbConnectionParameters(userName);
}
}
