Skip to content

Commit

Permalink
feat: Add pgBouncer to QueryApi (#615)
Browse files Browse the repository at this point in the history
QueryApi has experienced issues with Postgres connections since the
introduction of * indexers due to how QueryApi creates these connections
through Application level connection pools. Since we can't use one pool
for all workers, I've introduced PgBouncer as a Middleware to serve as
an additional connection pooler in front of the DB.
  • Loading branch information
darunrs authored Mar 25, 2024
1 parent 1bccff1 commit daf289d
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 39 deletions.
32 changes: 28 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ services:
RUNNER_URL: http://runner:7001
REGISTRY_CONTRACT_ID: dev-queryapi.dataplatform.near
RUST_LOG: info
RPC_URL: https://archival-rpc.mainnet.near.org

runner:
build:
Expand All @@ -45,9 +46,9 @@ services:
HASURA_ENDPOINT: http://hasura-graphql:8080
HASURA_ADMIN_SECRET: myadminsecretkey
REDIS_CONNECTION_STRING: redis://redis
PGHOST: postgres
PGHOST_HASURA: postgres
PGPORT: 5432
PGHOST: pgbouncer
PGHOST_HASURA: pgbouncer
PGPORT: 6432
PGUSER: postgres
PGPASSWORD: postgrespassword
PGDATABASE: postgres
Expand All @@ -61,6 +62,7 @@ services:
ZIPKIN_ENDPOINT: http://zipkin:9411/api/v2/spans
GCP_PROJECT_ID:
TRACING_SAMPLE_RATE: 0.1
MAX_PG_POOL_SIZE: 10
ports:
- "7001:7001"

Expand All @@ -76,15 +78,37 @@ services:
- "6379:6379"

postgres:
image: postgres:12
image: postgres:14
restart: always
volumes:
- ./init-scripts/postgres:/docker-entrypoint-initdb.d
- postgres:/var/lib/postgresql/data
environment:
POSTGRES_PASSWORD: postgrespassword
ports:
- "5432:5432"

pgbouncer:
image: darunrs/pgbouncer:auth_dbname # TODO: Replace with edoburu:pgbouncer image once it supports auth_dbname
environment:
LISTEN_PORT: 6432
DB_HOST: postgres
DB_USER: pgbouncer
DB_PASSWORD: pgbouncer
ADMIN_USERS: postgres
DB_NAME: "*"
AUTH_TYPE: scram-sha-256
AUTH_FILE: /etc/pgbouncer/userlist.txt
AUTH_USER: pgbouncer
AUTH_QUERY: SELECT uname, phash FROM public.user_lookup($1::text)
AUTH_DBNAME: postgres
MAX_CLIENT_CONN: 4000 # Max Connections to PgBouncer
DEFAULT_POOL_SIZE: 10 # Standard connections open per user/db combo
ports:
- "6432:6432"
depends_on:
- postgres

hasura-auth:
build:
context: ./hasura-authentication-service
Expand Down
12 changes: 12 additions & 0 deletions init-scripts/postgres/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE ROLE pgbouncer LOGIN;
ALTER ROLE pgbouncer WITH PASSWORD 'pgbouncer';
CREATE OR REPLACE FUNCTION public.user_lookup(in i_username text, out uname text, out phash text)
RETURNS record AS $$
BEGIN
SELECT usename, passwd FROM pg_catalog.pg_shadow
WHERE usename = i_username INTO uname, phash;
RETURN;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public;
GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO pgbouncer;
61 changes: 33 additions & 28 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,16 +455,17 @@ CREATE TABLE
});

test('indexer builds context and inserts an objects into existing table', async () => {
const mockDmlHandlerInstance: any = { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
Expand Down Expand Up @@ -493,15 +494,15 @@ CREATE TABLE
query: jest.fn().mockReturnValue({ rows: [] }),
format: jest.fn().mockReturnValue('mock')
} as unknown as PgClient;
const dmlHandlerInstance = DmlHandler.create(genericDbCredentials, mockPgClient);
const dmlHandlerInstance: any = DmlHandler.create(genericDbCredentials, mockPgClient);
const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert');
const mockDmlHandler: any = {
create: jest.fn().mockReturnValue(dmlHandlerInstance)
};
const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, dmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
const promises = [];

Expand Down Expand Up @@ -531,16 +532,17 @@ CREATE TABLE
// Expects limit to be last parameter
return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }];
});
const mockDmlHandlerInstance: any = { select: selectFn };
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { select: selectFn };
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToSelect = {
Expand All @@ -554,23 +556,24 @@ CREATE TABLE
});

test('indexer builds context and updates multiple objects from existing table', async () => {
const mockDmlHandlerInstance: any = {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
return [{ colA: 'valA' }, { colA: 'valA' }];
}
return [{}];
})
};
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
return [{ colA: 'valA' }, { colA: 'valA' }];
}
return [{}];
})
};
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const whereObj = {
Expand All @@ -586,25 +589,26 @@ CREATE TABLE
});

test('indexer builds context and upserts on existing table', async () => {
const mockDmlHandlerInstance: any = {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
Expand All @@ -631,16 +635,17 @@ CREATE TABLE
});

test('indexer builds context and deletes objects from existing table', async () => {
const mockDmlHandlerInstance: any = { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
return mockDmlHandlerInstance;
})
};

const indexer = new Indexer(defaultIndexerBehavior, {
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
}, genericDbCredentials);
}, genericDbCredentials, mockDmlHandlerInstance);
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const deleteFilter = {
Expand Down
12 changes: 7 additions & 5 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import Provisioner from '../provisioner';
import DmlHandler from '../dml-handler/dml-handler';
import { type IndexerBehavior, LogLevel, Status } from '../stream-handler/stream-handler';
import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
import assert from 'assert';
import { trace, type Span } from '@opentelemetry/api';

interface Dependencies {
Expand Down Expand Up @@ -41,13 +40,15 @@ export default class Indexer {

private readonly indexer_behavior: IndexerBehavior;
private readonly deps: Dependencies;
// TODO: After provisioning migrated out of Runner, fetch credentials before Indexer initialization

private database_connection_parameters: DatabaseConnectionParameters | undefined;
private dml_handler: DmlHandler | undefined;

constructor (
indexerBehavior: IndexerBehavior,
deps?: Partial<Dependencies>,
databaseConnectionParameters = undefined,
dmlHandler = undefined,
) {
this.DEFAULT_HASURA_ROLE = 'append';
this.indexer_behavior = indexerBehavior;
Expand All @@ -59,6 +60,7 @@ export default class Indexer {
...deps,
};
this.database_connection_parameters = databaseConnectionParameters;
this.dml_handler = dmlHandler;
}

async runFunctions (
Expand All @@ -73,7 +75,7 @@ export default class Indexer {

const simultaneousPromises: Array<Promise<any>> = [];
const allMutations: string[] = [];

for (const functionName in functions) {
try {
const indexerFunction = functions[functionName];
Expand Down Expand Up @@ -106,6 +108,7 @@ export default class Indexer {
try {
this.database_connection_parameters = this.database_connection_parameters ??
await this.deps.provisioner.getDatabaseConnectionParameters(hasuraRoleName);
this.dml_handler = this.dml_handler ?? this.deps.DmlHandler.create(this.database_connection_parameters as DatabaseConnectionParameters);
} catch (e) {
const error = e as Error;
simultaneousPromises.push(this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Failed to get database connection parameters', error.message));
Expand Down Expand Up @@ -265,8 +268,7 @@ export default class Indexer {
try {
const tables = this.getTableNames(schema);
const sanitizedTableNames = new Set<string>();
assert(this.database_connection_parameters !== undefined, 'Database connection parameters are not set');
const dmlHandler: DmlHandler = this.deps.DmlHandler.create(this.database_connection_parameters);
const dmlHandler = this.dml_handler as DmlHandler;

// Generate and collect methods for each table name
const result = tables.reduce((prev, tableName) => {
Expand Down
4 changes: 2 additions & 2 deletions runner/src/pg-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ export default class PgClient {

constructor (
connectionParams: ConnectionParams,
poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 },
poolConfig: PoolConfig = { max: Number(process.env.MAX_PG_POOL_SIZE ?? 10), idleTimeoutMillis: 3000 },
PgPool: typeof Pool = Pool,
pgFormat: typeof pgFormatModule = pgFormatModule
) {
this.pgPool = new PgPool({
user: connectionParams.user,
password: connectionParams.password,
host: connectionParams.host,
port: Number(connectionParams.port),
port: Number(process.env.PGPORT),
database: connectionParams.database,
...poolConfig,
});
Expand Down

0 comments on commit daf289d

Please sign in to comment.