diff --git a/docker-compose.yml b/docker-compose.yml
index 9b77bc5b1..16e2a6cfc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -33,6 +33,7 @@ services:
       RUNNER_URL: http://runner:7001
       REGISTRY_CONTRACT_ID: dev-queryapi.dataplatform.near
       RUST_LOG: info
+      RPC_URL: https://archival-rpc.mainnet.near.org
 
   runner:
     build:
@@ -45,9 +46,9 @@ services:
       HASURA_ENDPOINT: http://hasura-graphql:8080
       HASURA_ADMIN_SECRET: myadminsecretkey
       REDIS_CONNECTION_STRING: redis://redis
-      PGHOST: postgres
-      PGHOST_HASURA: postgres
-      PGPORT: 5432
+      PGHOST: pgbouncer
+      PGHOST_HASURA: pgbouncer
+      PGPORT: 6432
       PGUSER: postgres
       PGPASSWORD: postgrespassword
       PGDATABASE: postgres
@@ -61,6 +62,7 @@ services:
       ZIPKIN_ENDPOINT: http://zipkin:9411/api/v2/spans
       GCP_PROJECT_ID:
       TRACING_SAMPLE_RATE: 0.1
+      MAX_PG_POOL_SIZE: 10
     ports:
       - "7001:7001"
 
@@ -76,15 +78,37 @@ services:
       - "6379:6379"
 
   postgres:
-    image: postgres:12
+    image: postgres:14
     restart: always
     volumes:
+      - ./init-scripts/postgres:/docker-entrypoint-initdb.d
       - postgres:/var/lib/postgresql/data
     environment:
       POSTGRES_PASSWORD: postgrespassword
     ports:
       - "5432:5432"
 
+  pgbouncer:
+    image: darunrs/pgbouncer:auth_dbname # TODO: Replace with edoburu:pgbouncer image once it supports auth_dbname
+    environment:
+      LISTEN_PORT: 6432
+      DB_HOST: postgres
+      DB_USER: pgbouncer
+      DB_PASSWORD: pgbouncer
+      ADMIN_USERS: postgres
+      DB_NAME: "*"
+      AUTH_TYPE: scram-sha-256
+      AUTH_FILE: /etc/pgbouncer/userlist.txt
+      AUTH_USER: pgbouncer
+      AUTH_QUERY: SELECT uname, phash FROM public.user_lookup($1::text)
+      AUTH_DBNAME: postgres
+      MAX_CLIENT_CONN: 4000 # Max Connections to PgBouncer
+      DEFAULT_POOL_SIZE: 10 # Standard connections open per user/db combo
+    ports:
+      - "6432:6432"
+    depends_on:
+      - postgres
+
   hasura-auth:
     build:
       context: ./hasura-authentication-service
diff --git a/init-scripts/postgres/init.sql b/init-scripts/postgres/init.sql
new file mode 100644
index 000000000..ffc461d02
--- /dev/null
+++ b/init-scripts/postgres/init.sql
@@ -0,0 +1,12 @@
+CREATE ROLE pgbouncer LOGIN;
+ALTER ROLE pgbouncer WITH PASSWORD 'pgbouncer';
+CREATE OR REPLACE FUNCTION public.user_lookup(in i_username text, out uname text, out phash text)
+RETURNS record AS $$
+BEGIN
+    SELECT usename, passwd FROM pg_catalog.pg_shadow
+    WHERE usename = i_username INTO uname, phash;
+    RETURN;
+END;
+$$ LANGUAGE plpgsql SECURITY DEFINER;
+REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public;
+GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO pgbouncer;
diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts
index 45413b988..ca1be93b0 100644
--- a/runner/src/indexer/indexer.test.ts
+++ b/runner/src/indexer/indexer.test.ts
@@ -455,16 +455,17 @@ CREATE TABLE
   });
 
   test('indexer builds context and inserts an objects into existing table', async () => {
+    const mockDmlHandlerInstance: any = { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
     const mockDmlHandler: any = {
       create: jest.fn().mockImplementation(() => {
-        return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
+        return mockDmlHandlerInstance;
       })
     };
 
     const indexer = new Indexer(defaultIndexerBehavior, {
       fetch: genericMockFetch as unknown as typeof fetch,
       DmlHandler: mockDmlHandler
-    }, genericDbCredentials);
+    }, genericDbCredentials, mockDmlHandlerInstance);
     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
 
     const objToInsert = [{
@@ -493,7 +494,7 @@ CREATE TABLE
       query: jest.fn().mockReturnValue({ rows: [] }),
       format: jest.fn().mockReturnValue('mock')
     } as unknown as PgClient;
-    const dmlHandlerInstance = DmlHandler.create(genericDbCredentials, mockPgClient);
+    const dmlHandlerInstance: any = DmlHandler.create(genericDbCredentials, mockPgClient);
     const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert');
     const mockDmlHandler: any = {
       create: jest.fn().mockReturnValue(dmlHandlerInstance)
@@ -501,7 +502,7 @@ CREATE TABLE
     const indexer = new Indexer(defaultIndexerBehavior, {
       fetch: genericMockFetch as unknown as typeof fetch,
       DmlHandler: mockDmlHandler
-    }, genericDbCredentials);
+    }, genericDbCredentials, dmlHandlerInstance);
     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
 
     const promises = [];
@@ -531,16 +532,17 @@ CREATE TABLE
       // Expects limit to be last parameter
       return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }];
     });
+    const mockDmlHandlerInstance: any = { select: selectFn };
     const mockDmlHandler: any = {
       create: jest.fn().mockImplementation(() => {
-        return { select: selectFn };
+        return mockDmlHandlerInstance;
       })
     };
 
     const indexer = new Indexer(defaultIndexerBehavior, {
       fetch: genericMockFetch as unknown as typeof fetch,
       DmlHandler: mockDmlHandler
-    }, genericDbCredentials);
+    }, genericDbCredentials, mockDmlHandlerInstance);
     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
 
     const objToSelect = {
@@ -554,23 +556,24 @@ CREATE TABLE
   });
 
   test('indexer builds context and updates multiple objects from existing table', async () => {
+    const mockDmlHandlerInstance: any = {
+      update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
+        if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
+          return [{ colA: 'valA' }, { colA: 'valA' }];
+        }
+        return [{}];
+      })
+    };
     const mockDmlHandler: any = {
       create: jest.fn().mockImplementation(() => {
-        return {
-          update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
-            if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
-              return [{ colA: 'valA' }, { colA: 'valA' }];
-            }
-            return [{}];
-          })
-        };
+        return mockDmlHandlerInstance;
       })
     };
 
     const indexer = new Indexer(defaultIndexerBehavior, {
       fetch: genericMockFetch as unknown as typeof fetch,
       DmlHandler: mockDmlHandler
-    }, genericDbCredentials);
+    }, genericDbCredentials, mockDmlHandlerInstance);
     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
 
     const whereObj = {
@@ -586,25 +589,26 @@ CREATE TABLE
   });
 
   test('indexer builds context and upserts on existing table', async () => {
+    const mockDmlHandlerInstance: any = {
+      upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
+        if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
+          return [{ colA: 'valA' }, { colA: 'valA' }];
+        } else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
+          return [{ colA: 'valA' }];
+        }
+        return [{}];
+      })
+    };
     const mockDmlHandler: any = {
       create: jest.fn().mockImplementation(() => {
-        return {
-          upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
-            if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
-              return [{ colA: 'valA' }, { colA: 'valA' }];
-            } else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
-              return [{ colA: 'valA' }];
-            }
-            return [{}];
-          })
-        };
+        return mockDmlHandlerInstance;
       })
     };
 
     const indexer = new Indexer(defaultIndexerBehavior, {
       fetch: genericMockFetch as unknown as typeof fetch,
       DmlHandler: mockDmlHandler
-    }, genericDbCredentials);
+    }, genericDbCredentials, mockDmlHandlerInstance);
     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
 
     const objToInsert = [{
@@ -631,16 +635,17 @@ CREATE TABLE
   });
 
   test('indexer builds context and deletes objects from existing table', async () => {
+    const mockDmlHandlerInstance: any = { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
     const mockDmlHandler: any = {
      create: jest.fn().mockImplementation(() => {
-        return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
+        return mockDmlHandlerInstance;
       })
     };
 
     const indexer = new Indexer(defaultIndexerBehavior, {
       fetch: genericMockFetch as unknown as typeof fetch,
       DmlHandler: mockDmlHandler
-    }, genericDbCredentials);
+    }, genericDbCredentials, mockDmlHandlerInstance);
     const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
 
     const deleteFilter = {
diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts
index a9fda9ab3..f455d739f 100644
--- a/runner/src/indexer/indexer.ts
+++ b/runner/src/indexer/indexer.ts
@@ -7,7 +7,6 @@ import Provisioner from '../provisioner';
 import DmlHandler from '../dml-handler/dml-handler';
 import { type IndexerBehavior, LogLevel, Status } from '../stream-handler/stream-handler';
 import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
-import assert from 'assert';
 import { trace, type Span } from '@opentelemetry/api';
 
 interface Dependencies {
@@ -41,13 +40,15 @@ export default class Indexer {
   private readonly indexer_behavior: IndexerBehavior;
   private readonly deps: Dependencies;
 
-  // TODO: After provisioning migrated out of Runner, fetch credentials before Indexer initialization
+  private database_connection_parameters: DatabaseConnectionParameters | undefined;
+  private dml_handler: DmlHandler | undefined;
 
   constructor (
     indexerBehavior: IndexerBehavior,
     deps?: Partial<Dependencies>,
     databaseConnectionParameters = undefined,
+    dmlHandler = undefined,
   ) {
     this.DEFAULT_HASURA_ROLE = 'append';
     this.indexer_behavior = indexerBehavior;
@@ -59,6 +60,7 @@ export default class Indexer {
       ...deps,
     };
     this.database_connection_parameters = databaseConnectionParameters;
+    this.dml_handler = dmlHandler;
   }
 
   async runFunctions (
@@ -73,7 +75,7 @@ export default class Indexer {
 
     const simultaneousPromises: Array<Promise<any>> = [];
     const allMutations: string[] = [];
-
+
     for (const functionName in functions) {
       try {
         const indexerFunction = functions[functionName];
@@ -106,6 +108,7 @@ export default class Indexer {
 
       try {
         this.database_connection_parameters = this.database_connection_parameters ?? await this.deps.provisioner.getDatabaseConnectionParameters(hasuraRoleName);
+        this.dml_handler = this.dml_handler ?? this.deps.DmlHandler.create(this.database_connection_parameters as DatabaseConnectionParameters);
       } catch (e) {
         const error = e as Error;
         simultaneousPromises.push(this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Failed to get database connection parameters', error.message));
@@ -265,8 +268,7 @@ export default class Indexer {
     try {
       const tables = this.getTableNames(schema);
       const sanitizedTableNames = new Set();
-      assert(this.database_connection_parameters !== undefined, 'Database connection parameters are not set');
-      const dmlHandler: DmlHandler = this.deps.DmlHandler.create(this.database_connection_parameters);
+      const dmlHandler = this.dml_handler as DmlHandler;
 
       // Generate and collect methods for each table name
       const result = tables.reduce((prev, tableName) => {
diff --git a/runner/src/pg-client.ts b/runner/src/pg-client.ts
index 9cee0ca69..acd3eb288 100644
--- a/runner/src/pg-client.ts
+++ b/runner/src/pg-client.ts
@@ -15,7 +15,7 @@ export default class PgClient {
 
   constructor (
     connectionParams: ConnectionParams,
-    poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 },
+    poolConfig: PoolConfig = { max: Number(process.env.MAX_PG_POOL_SIZE ?? 10), idleTimeoutMillis: 3000 },
     PgPool: typeof Pool = Pool,
     pgFormat: typeof pgFormatModule = pgFormatModule
   ) {
@@ -23,7 +23,7 @@
       user: connectionParams.user,
       password: connectionParams.password,
       host: connectionParams.host,
-      port: Number(connectionParams.port),
+      port: Number(process.env.PGPORT),
       database: connectionParams.database,
       ...poolConfig,
     });
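For reference, a minimal TypeScript sketch of the connection path these changes set up, assuming the `pg` package and the docker-compose defaults introduced above (PGHOST=pgbouncer, PGPORT=6432, MAX_PG_POOL_SIZE=10); the pool options mirror the new PgClient defaults, and the `checkConnection` helper is purely illustrative, not part of the patch.

import { Pool } from 'pg';

// Pool sized from MAX_PG_POOL_SIZE (default 10), matching the new PgClient default,
// and pointed at PgBouncer's listen port (6432) instead of Postgres directly.
const pool = new Pool({
  user: process.env.PGUSER ?? 'postgres',
  password: process.env.PGPASSWORD ?? 'postgrespassword',
  host: process.env.PGHOST ?? 'pgbouncer',
  port: Number(process.env.PGPORT ?? 6432),
  database: process.env.PGDATABASE ?? 'postgres',
  max: Number(process.env.MAX_PG_POOL_SIZE ?? 10),
  idleTimeoutMillis: 3000,
});

// Illustrative connectivity check: PgBouncer authenticates the connecting user via
// AUTH_QUERY against public.user_lookup (created by init.sql), then proxies the
// session to the postgres service.
export async function checkConnection (): Promise<void> {
  const { rows } = await pool.query('SELECT 1 AS ok');
  console.log('connected through pgbouncer:', rows[0].ok === 1);
}

With this in front of Postgres, PgBouncer accepts up to MAX_CLIENT_CONN client connections while keeping only DEFAULT_POOL_SIZE server connections open per user/database pair, so many runner workers can share a small number of real Postgres connections.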