feat: Add pgBouncer to QueryApi #615

Merged
10 commits merged on Mar 25, 2024
32 changes: 28 additions & 4 deletions docker-compose.yml
@@ -33,6 +33,7 @@ services:
RUNNER_URL: http://runner:7001
REGISTRY_CONTRACT_ID: dev-queryapi.dataplatform.near
RUST_LOG: info
RPC_URL: https://archival-rpc.mainnet.near.org

runner:
build:
@@ -45,9 +46,9 @@ services:
HASURA_ENDPOINT: http://hasura-graphql:8080
HASURA_ADMIN_SECRET: myadminsecretkey
REDIS_CONNECTION_STRING: redis://redis
PGHOST: postgres
PGHOST_HASURA: postgres
PGPORT: 5432
PGHOST: pgbouncer
PGHOST_HASURA: pgbouncer
PGPORT: 6432
PGUSER: postgres
PGPASSWORD: postgrespassword
PGDATABASE: postgres
@@ -61,6 +62,7 @@
ZIPKIN_ENDPOINT: http://zipkin:9411/api/v2/spans
GCP_PROJECT_ID:
TRACING_SAMPLE_RATE: 0.1
MAX_PG_POOL_SIZE: 10
ports:
- "7001:7001"

@@ -76,15 +78,37 @@ services:
- "6379:6379"

postgres:
image: postgres:12
image: postgres:14
restart: always
volumes:
- ./init-scripts/postgres:/docker-entrypoint-initdb.d
- postgres:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: postgrespassword
ports:
- "5432:5432"

pgbouncer:
image: darunrs/pgbouncer:auth_dbname # TODO: Replace with edoburu/pgbouncer image once it supports auth_dbname
environment:
LISTEN_PORT: 6432
DB_HOST: postgres
DB_USER: pgbouncer
DB_PASSWORD: pgbouncer
ADMIN_USERS: postgres
DB_NAME: "*"
AUTH_TYPE: scram-sha-256
AUTH_FILE: /etc/pgbouncer/userlist.txt
AUTH_USER: pgbouncer
AUTH_QUERY: SELECT uname, phash FROM public.user_lookup($1::text)
AUTH_DBNAME: postgres
MAX_CLIENT_CONN: 4000 # Max Connections to PgBouncer
DEFAULT_POOL_SIZE: 10 # Standard connections open per user/db combo
ports:
- "6432:6432"
depends_on:
- postgres

hasura-auth:
build:
context: ./hasura-authentication-service
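With this compose change, Runner (and any local tooling) reaches Postgres through PgBouncer on port 6432 instead of connecting to postgres:5432 directly. Below is a minimal connectivity check, assuming the `pg` package and the credentials above, with port 6432 published to localhost:

```ts
// Connectivity check through PgBouncer (sketch; assumes the `pg` package and the
// docker-compose credentials above, with port 6432 published to localhost).
import { Pool } from 'pg';

const pool = new Pool({
  host: 'localhost',   // 'pgbouncer' when connecting from another compose service
  port: 6432,          // PgBouncer listen port, not Postgres's 5432
  user: 'postgres',
  password: 'postgrespassword',
  database: 'postgres',
});

async function main (): Promise<void> {
  // PgBouncer authenticates the client (userlist.txt / auth_query) and proxies to postgres:5432.
  const { rows } = await pool.query('SELECT current_user, current_database()');
  console.log(rows[0]);
  await pool.end();
}

main().catch(console.error);
```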
12 changes: 12 additions & 0 deletions init-scripts/postgres/init.sql
@@ -0,0 +1,12 @@
CREATE ROLE pgbouncer LOGIN;
ALTER ROLE pgbouncer WITH PASSWORD 'pgbouncer';
@darunrs (Collaborator, Author) commented on Mar 25, 2024:
The password should match DB_PASSWORD in the Docker container.
Note that this SQL also needs to be run on the Postgres instance itself. It's re-defined in this NEAR Ops PR: https://github.com/near/near-ops/pull/1652

CREATE OR REPLACE FUNCTION public.user_lookup(in i_username text, out uname text, out phash text)
RETURNS record AS $$
BEGIN
SELECT usename, passwd FROM pg_catalog.pg_shadow
WHERE usename = i_username INTO uname, phash;
RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public;
GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO pgbouncer;
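As the comment above notes, this role and lookup function are only auto-applied locally via docker-entrypoint-initdb.d; any externally managed Postgres instance needs the same SQL before PgBouncer's auth_query can work against it. A rough sketch of applying it programmatically, assuming a superuser connection string and the init-scripts/postgres/init.sql path from this PR:

```ts
// Sketch: apply init.sql to an existing Postgres instance so PgBouncer's auth_query
// works there as well. Assumes a superuser connection string and that the script
// lives at init-scripts/postgres/init.sql as in this PR.
import { readFileSync } from 'fs';
import { Client } from 'pg';

async function applyPgBouncerAuth (connectionString: string): Promise<void> {
  const sql = readFileSync('init-scripts/postgres/init.sql', 'utf8');
  const client = new Client({ connectionString });
  await client.connect();
  try {
    // Multi-statement simple query: creates the pgbouncer role and public.user_lookup().
    await client.query(sql);
  } finally {
    await client.end();
  }
}
```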
61 changes: 33 additions & 28 deletions runner/src/indexer/indexer.test.ts
@@ -455,16 +455,17 @@ CREATE TABLE
});

test('indexer builds context and inserts an objects into existing table', async () => {
const mockDmlHandlerInstance: any = { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
@@ -493,15 +494,15 @@ CREATE TABLE
query: jest.fn().mockReturnValue({ rows: [] }),
format: jest.fn().mockReturnValue('mock')
} as unknown as PgClient;
const dmlHandlerInstance = DmlHandler.create(genericDbCredentials, mockPgClient);
const dmlHandlerInstance: any = DmlHandler.create(genericDbCredentials, mockPgClient);
const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert');
const mockDmlHandler: any = {
create: jest.fn().mockReturnValue(dmlHandlerInstance)
};
const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, dmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
const promises = [];

@@ -531,16 +532,17 @@ CREATE TABLE
// Expects limit to be last parameter
return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }];
});
const mockDmlHandlerInstance: any = { select: selectFn };
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { select: selectFn };
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToSelect = {
@@ -554,23 +556,24 @@ CREATE TABLE
});

test('indexer builds context and updates multiple objects from existing table', async () => {
const mockDmlHandlerInstance: any = {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
return [{ colA: 'valA' }, { colA: 'valA' }];
}
return [{}];
})
};
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
return [{ colA: 'valA' }, { colA: 'valA' }];
}
return [{}];
})
};
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const whereObj = {
@@ -586,25 +589,26 @@ CREATE TABLE
});

test('indexer builds context and upserts on existing table', async () => {
const mockDmlHandlerInstance: any = {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
@@ -631,16 +635,17 @@ CREATE TABLE
});

test('indexer builds context and deletes objects from existing table', async () => {
const mockDmlHandlerInstance: any = { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const deleteFilter = {
12 changes: 7 additions & 5 deletions runner/src/indexer/indexer.ts
@@ -7,7 +7,6 @@ import Provisioner from '../provisioner';
import DmlHandler from '../dml-handler/dml-handler';
import { type IndexerBehavior, LogLevel, Status } from '../stream-handler/stream-handler';
import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
import assert from 'assert';
import { trace, type Span } from '@opentelemetry/api';

interface Dependencies {
@@ -41,13 +40,15 @@ export default class Indexer {

private readonly indexer_behavior: IndexerBehavior;
private readonly deps: Dependencies;
// TODO: After provisioning is migrated out of Runner, fetch credentials before Indexer initialization

private database_connection_parameters: DatabaseConnectionParameters | undefined;
private dml_handler: DmlHandler | undefined;

constructor (
indexerBehavior: IndexerBehavior,
deps?: Partial<Dependencies>,
databaseConnectionParameters = undefined,
dmlHandler = undefined,
) {
this.DEFAULT_HASURA_ROLE = 'append';
this.indexer_behavior = indexerBehavior;
Expand All @@ -59,6 +60,7 @@ export default class Indexer {
...deps,
};
this.database_connection_parameters = databaseConnectionParameters;
this.dml_handler = dmlHandler;
}

async runFunctions (
@@ -73,7 +75,7 @@

const simultaneousPromises: Array<Promise<any>> = [];
const allMutations: string[] = [];

for (const functionName in functions) {
try {
const indexerFunction = functions[functionName];
@@ -106,6 +108,7 @@
try {
this.database_connection_parameters = this.database_connection_parameters ??
await this.deps.provisioner.getDatabaseConnectionParameters(hasuraRoleName);
this.dml_handler = this.dml_handler ?? this.deps.DmlHandler.create(this.database_connection_parameters as DatabaseConnectionParameters);
} catch (e) {
const error = e as Error;
simultaneousPromises.push(this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Failed to get database connection parameters', error.message));
@@ -265,8 +268,7 @@
try {
const tables = this.getTableNames(schema);
const sanitizedTableNames = new Set<string>();
assert(this.database_connection_parameters !== undefined, 'Database connection parameters are not set');
const dmlHandler: DmlHandler = this.deps.DmlHandler.create(this.database_connection_parameters);
const dmlHandler = this.dml_handler as DmlHandler;

// Generate and collect methods for each table name
const result = tables.reduce((prev, tableName) => {
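The constructor now accepts optional databaseConnectionParameters and dmlHandler arguments: production code still lets buildContext lazily resolve credentials and call DmlHandler.create, while tests can inject a prebuilt handler. An illustrative sketch of the two call sites, reusing names from the test file above rather than exact test code:

```ts
// Illustrative only; names mirror the diffs above rather than exact runtime code.

// Production path: credentials and the DmlHandler are resolved lazily inside buildContext.
const indexer = new Indexer(defaultIndexerBehavior);

// Test path: pre-resolved credentials and a mock handler are injected up front,
// so no provisioning call or real database connection is needed.
const mockDmlHandlerInstance: any = {
  insert: jest.fn().mockReturnValue([{ colA: 'valA' }]),
};
const testIndexer = new Indexer(
  defaultIndexerBehavior,
  { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: { create: jest.fn() } as any },
  genericDbCredentials,
  mockDmlHandlerInstance,
);
```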
4 changes: 2 additions & 2 deletions runner/src/pg-client.ts
@@ -15,15 +15,15 @@ export default class PgClient {

constructor (
connectionParams: ConnectionParams,
poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 },
poolConfig: PoolConfig = { max: Number(process.env.MAX_PG_POOL_SIZE ?? 10), idleTimeoutMillis: 3000 },
PgPool: typeof Pool = Pool,
pgFormat: typeof pgFormatModule = pgFormatModule
) {
this.pgPool = new PgPool({
user: connectionParams.user,
password: connectionParams.password,
host: connectionParams.host,
port: Number(connectionParams.port),
port: Number(process.env.PGPORT),
database: connectionParams.database,
...poolConfig,
});
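The pool size is now driven by the MAX_PG_POOL_SIZE variable added to the runner service in docker-compose.yml, and the connection port by PGPORT (6432 in compose, i.e. PgBouncer). A small sketch of how those settings resolve under this change; the variable names come from the diffs above:

```ts
// Sketch of how the pool settings resolve under this change; not part of the PR itself.
const maxPoolSize = Number(process.env.MAX_PG_POOL_SIZE ?? 10); // 10 per runner in docker-compose.yml
const port = Number(process.env.PGPORT);                        // 6432 in compose, i.e. PgBouncer

// Each runner keeps at most `maxPoolSize` client connections to PgBouncer, while
// PgBouncer (MAX_CLIENT_CONN: 4000, DEFAULT_POOL_SIZE: 10) caps server connections
// to Postgres at 10 per user/database pair.
console.log({ maxPoolSize, port });
```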