Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

typeorm: typeorm-store rework #293

Open
wants to merge 11 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion typeorm/typeorm-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
},
"dependencies": {
"@subsquid/typeorm-config": "^4.1.1",
"@subsquid/util-internal": "^3.2.0"
"@subsquid/util-internal": "^3.2.0",
"@subsquid/logger": "^1.3.3"
},
"peerDependencies": {
"typeorm": "^0.3.17",
Expand Down
198 changes: 137 additions & 61 deletions typeorm/typeorm-store/src/database.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,88 @@
import {createOrmConfig} from '@subsquid/typeorm-config'
import {assertNotNull, last, maybeLast} from '@subsquid/util-internal'
import {assertNotNull, def, last, maybeLast} from '@subsquid/util-internal'
import assert from 'assert'
import {DataSource, EntityManager} from 'typeorm'
import {ChangeTracker, rollbackBlock} from './hot'
import {ChangeWriter, rollbackBlock} from './utils/changeWriter'
import {DatabaseState, FinalTxInfo, HashAndHeight, HotTxInfo} from './interfaces'
import {Store} from './store'
import {CacheMode, FlushMode, ResetMode, Store} from './store'
import {createLogger} from '@subsquid/logger'
import {StateManager} from './utils/stateManager'
import {sortMetadatasInCommitOrder} from './utils/commitOrder'
import {IsolationLevel} from './utils/tx'


export type IsolationLevel = 'SERIALIZABLE' | 'READ COMMITTED' | 'REPEATABLE READ'
export {IsolationLevel}


export interface TypeormDatabaseOptions {

/**
* Support for storing the data on unfinalized / hot
* blocks and the related rollbacks.
* See {@link https://docs.subsquid.io/sdk/resources/basics/unfinalized-blocks/}
*
* @defaultValue true
*/
supportHotBlocks?: boolean

/**
* PostgreSQL ransaction isolation level
* See {@link https://www.postgresql.org/docs/current/transaction-iso.html}
*
* @defaultValue 'SERIALIZABLE'
*/
isolationLevel?: IsolationLevel

/**
* When the queries should be sent to the database?
*
* @defaultValue FlushMode.AUTO
*/
flushMode?: FlushMode

/**
* When the cache should be dropped?
*
* @defaultValue ResetMode.BATCH
*/
resetMode?: ResetMode

/**
* Which database reads should be cached?
*
* @defaultValue CacheMode.ALL
*/
cacheMode?: CacheMode

/**
* Name of the database schema that the processor
* will use to track its state (height and hash of
* the highest indexed block). Set this if you run
* more than one processor against the same DB.
*
* @defaultValue 'squid_processor'
*/
stateSchema?: string

/**
* Directory with model definitions (at lib/model)
* and migrations (at db/migrations).
*
* @defaultValue process.cwd()
*/
projectDir?: string
}


const STATE_MANAGERS: WeakMap<DataSource, StateManager> = new WeakMap()


export class TypeormDatabase {
private statusSchema: string
private isolationLevel: IsolationLevel
private flushMode: FlushMode
private resetMode: ResetMode
private cacheMode: CacheMode
private con?: DataSource
private projectDir: string

Expand All @@ -29,6 +91,9 @@ export class TypeormDatabase {
constructor(options?: TypeormDatabaseOptions) {
this.statusSchema = options?.stateSchema || 'squid_processor'
this.isolationLevel = options?.isolationLevel || 'SERIALIZABLE'
this.resetMode = options?.resetMode || ResetMode.BATCH
this.flushMode = options?.flushMode || FlushMode.AUTO
this.cacheMode = options?.cacheMode || CacheMode.ALL
this.supportsHotBlocks = options?.supportHotBlocks !== false
this.projectDir = options?.projectDir || process.cwd()
}
Expand All @@ -43,47 +108,45 @@ export class TypeormDatabase {

try {
return await this.con.transaction('SERIALIZABLE', em => this.initTransaction(em))
} catch(e: any) {
} catch (e: any) {
await this.con.destroy().catch(() => {}) // ignore error
this.con = undefined
throw e
}
}

async disconnect(): Promise<void> {
await this.con?.destroy().finally(() => this.con = undefined)
await this.con?.destroy().finally(() => (this.con = undefined))
}

private async initTransaction(em: EntityManager): Promise<DatabaseState> {
let schema = this.escapedSchema()

await em.query(
`CREATE SCHEMA IF NOT EXISTS ${schema}`
)
await em.query(`CREATE SCHEMA IF NOT EXISTS ${schema}`)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.status (` +
`id int4 primary key, ` +
`height int4 not null, ` +
`hash text DEFAULT '0x', ` +
`nonce int4 DEFAULT 0`+
`)`
`id int4 primary key, ` +
`height int4 not null, ` +
`hash text DEFAULT '0x', ` +
`nonce int4 DEFAULT 0` +
`)`
)
await em.query( // for databases created by prev version of typeorm store
await em.query(
// for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS hash text DEFAULT '0x'`
)
await em.query( // for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`
// for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
)
await em.query(`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.hot_change_log (` +
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
`index int4 not null, ` +
`change jsonb not null, ` +
`PRIMARY KEY (block_height, index)` +
`)`
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
`index int4 not null, ` +
`change jsonb not null, ` +
`PRIMARY KEY (block_height, index)` +
`)`
)

let status: (HashAndHeight & {nonce: number})[] = await em.query(
Expand All @@ -94,9 +157,7 @@ export class TypeormDatabase {
status.push({height: -1, hash: '0x', nonce: 0})
}

let top: HashAndHeight[] = await em.query(
`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`
)
let top: HashAndHeight[] = await em.query(`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`)

return assertStateInvariants({...status[0], top})
}
Expand All @@ -110,9 +171,7 @@ export class TypeormDatabase {

assert(status.length == 1)

let top: HashAndHeight[] = await em.query(
`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`
)
let top: HashAndHeight[] = await em.query(`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`)

return assertStateInvariants({...status[0], top})
}
Expand Down Expand Up @@ -146,15 +205,21 @@ export class TypeormDatabase {
})
}

transactHot2(info: HotTxInfo, cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>): Promise<void> {
transactHot2(
info: HotTxInfo,
cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>
): Promise<void> {
return this.submit(async em => {
let state = await this.getState(em)
let chain = [state, ...state.top]

assertChainContinuity(info.baseHead, info.newBlocks)
assert(info.finalizedHead.height <= (maybeLast(info.newBlocks) ?? info.baseHead).height)

assert(chain.find(b => b.hash === info.baseHead.hash), RACE_MSG)
assert(
chain.find(b => b.hash === info.baseHead.hash),
RACE_MSG
)
if (info.newBlocks.length == 0) {
assert(last(chain).hash === info.baseHead.hash, RACE_MSG)
}
Expand All @@ -179,7 +244,7 @@ export class TypeormDatabase {
await this.performUpdates(
store => cb(store, i, i + 1),
em,
new ChangeTracker(em, this.statusSchema, b.height)
new ChangeWriter(em, this.statusSchema, b.height)
)
}
}
Expand All @@ -195,17 +260,14 @@ export class TypeormDatabase {
}

private deleteHotBlocks(em: EntityManager, finalizedHeight: number): Promise<void> {
return em.query(
`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`,
[finalizedHeight]
)
return em.query(`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`, [finalizedHeight])
}

private insertHotBlock(em: EntityManager, block: HashAndHeight): Promise<void> {
return em.query(
`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`,
[block.height, block.hash]
)
return em.query(`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`, [
block.height,
block.hash,
])
}

private async updateStatus(em: EntityManager, nonce: number, next: HashAndHeight): Promise<void> {
Expand All @@ -220,32 +282,30 @@ export class TypeormDatabase {

// Will never happen if isolation level is SERIALIZABLE or REPEATABLE_READ,
// but occasionally people use multiprocessor setups and READ_COMMITTED.
assert.strictEqual(
rowsChanged,
1,
RACE_MSG
)
assert.strictEqual(rowsChanged, 1, RACE_MSG)
}

private async performUpdates(
cb: (store: Store) => Promise<void>,
em: EntityManager,
changeTracker?: ChangeTracker
changeWriter?: ChangeWriter
): Promise<void> {
let running = true

let store = new Store(
() => {
assert(running, `too late to perform db updates, make sure you haven't forgot to await on db query`)
return em
},
changeTracker
)
let store = new Store({
em,
state: this.getStateManager(),
logger: this.getLogger(),
changes: changeWriter,
cacheMode: this.cacheMode,
flushMode: this.flushMode,
resetMode: this.resetMode,
})

try {
await cb(store)
await store.flush()
if (this.resetMode === ResetMode.BATCH) store.reset()
} finally {
running = false
store['isClosed'] = true
}
}

Expand All @@ -256,7 +316,7 @@ export class TypeormDatabase {
let con = this.con
assert(con != null, 'not connected')
return await con.transaction(this.isolationLevel, tx)
} catch(e: any) {
} catch (e: any) {
if (e.code == '40001' && retries) {
retries -= 1
} else {
Expand All @@ -270,11 +330,28 @@ export class TypeormDatabase {
let con = assertNotNull(this.con)
return con.driver.escape(this.statusSchema)
}
}

@def
private getLogger() {
return createLogger('sqd:typeorm-db')
}

const RACE_MSG = 'status table was updated by foreign process, make sure no other processor is running'
private getStateManager() {
let con = assertNotNull(this.con)
let stateManager = STATE_MANAGERS.get(con)
if (stateManager != null) return stateManager

stateManager = new StateManager({
commitOrder: sortMetadatasInCommitOrder(con),
logger: this.getLogger(),
})
STATE_MANAGERS.set(con, stateManager)

return stateManager
}
}

const RACE_MSG = 'status table was updated by foreign process, make sure no other processor is running'

function assertStateInvariants(state: DatabaseState): DatabaseState {
let height = state.height
Expand All @@ -287,7 +364,6 @@ function assertStateInvariants(state: DatabaseState): DatabaseState {
return state
}


function assertChainContinuity(base: HashAndHeight, chain: HashAndHeight[]) {
let prev = base
for (let b of chain) {
Expand Down
4 changes: 2 additions & 2 deletions typeorm/typeorm-store/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './database'
export {EntityClass, FindManyOptions, FindOneOptions, Store} from './store'
export * from './store'
export * from './decorators'
export * from './transformers'
export * from './transformers'
Loading
Loading