diff --git a/src/models/audit/audit.model.js b/src/models/audit/audit.model.js index e10057df..31a6610c 100644 --- a/src/models/audit/audit.model.js +++ b/src/models/audit/audit.model.js @@ -7,6 +7,7 @@ import { AuditMirror } from './audit.model.mirror'; import ModelTypes from './audit.modeltypes.cjs'; import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js'; import { Organization } from '../organizations/index.js'; +import { waitForSyncRegistries } from '../../utils/model-utils.js'; class Audit extends Model { static async create(values, options) { @@ -107,4 +108,8 @@ Audit.init(ModelTypes, { updatedAt: true, }); +Audit.addHook('beforeFind', async () => { + await waitForSyncRegistries(); +}); + export { Audit }; diff --git a/src/models/organizations/organizations.model.js b/src/models/organizations/organizations.model.js index 2e62da8f..5c4cc850 100644 --- a/src/models/organizations/organizations.model.js +++ b/src/models/organizations/organizations.model.js @@ -19,6 +19,7 @@ import { getConfig } from '../../utils/config-loader'; const { USE_SIMULATOR, AUTO_SUBSCRIBE_FILESTORE } = getConfig().APP; import ModelTypes from './organizations.modeltypes.cjs'; +import { waitForSyncRegistries } from '../../utils/model-utils.js'; class Organization extends Model { static async getHomeOrg(includeAddress = true) { @@ -469,4 +470,8 @@ Organization.init(ModelTypes, { timestamps: true, }); +Organization.addHook('beforeFind', async () => { + await waitForSyncRegistries(); +}); + export { Organization }; diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index 095bd440..2e164e72 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -1,7 +1,6 @@ import _ from 'lodash'; import { SimpleIntervalJob, Task } from 'toad-scheduler'; -import { Mutex } from 'async-mutex'; import { Organization, Audit, ModelKeys, Staging, Meta } from '../models'; import datalayer from '../datalayer'; import { @@ -22,15 +21,18 @@ import { migrateToNewSync, generateGenerationIndex, } from '../utils/sync-migration-utils'; +import { + processingUpdateTransactionMutex, + syncRegistriesTaskMutex, +} from '../utils/model-utils.js'; dotenv.config(); -const mutex = new Mutex(); const CONFIG = getConfig().APP; const task = new Task('sync-registries', async () => { - if (!mutex.isLocked()) { - logger.debug('running sync registries task'); - const releaseMutex = await mutex.acquire(); + logger.debug('sync registries task invoked'); + if (!syncRegistriesTaskMutex.isLocked()) { + const releaseSyncTaskMutex = await syncRegistriesTaskMutex.acquire(); try { const hasMigratedToNewSyncMethod = await Meta.findOne({ where: { metaKey: 'migratedToNewSync' }, @@ -61,7 +63,7 @@ const task = new Task('sync-registries', async () => { ); } } finally { - releaseMutex(); + releaseSyncTaskMutex(); } } }); @@ -97,8 +99,11 @@ async function createTransaction(callback, afterCommitCallbacks) { let transaction; let mirrorTransaction; + logger.info('Starting sequelize transaction and acquiring transaction mutex'); + const releaseTransactionMutex = + await processingUpdateTransactionMutex.acquire(); + try { - logger.info('Starting sequelize transaction'); // Start a transaction transaction = await sequelize.transaction(); @@ -130,6 +135,8 @@ async function createTransaction(callback, afterCommitCallbacks) { console.error(error); await transaction.rollback(); } + } finally { + releaseTransactionMutex(); } } diff --git a/src/utils/model-utils.js b/src/utils/model-utils.js index f0fbca82..02decbd0 100644 --- a/src/utils/model-utils.js +++ b/src/utils/model-utils.js @@ -1,6 +1,31 @@ import { columnsToInclude } from './helpers.js'; import Sequelize from 'sequelize'; +import { Mutex } from 'async-mutex'; + +export async function waitForSyncRegistries() { + if (processingUpdateTransactionMutex.isLocked()) { + // when the mutex is acquired, the current sync transaction has completed + const releaseMutex = await processingUpdateTransactionMutex.acquire(); + await releaseMutex(); + } +} + +/** + * mutex which must be acquired to run the sync-registries task job. + * this mutex exists to prevent multiple registry sync tasks from running at the same time and overloading the chia + * RPC's or causing a SQLite locking error due to multiple task instances trying to commit large update transactions + * @type {Mutex} + */ +export const syncRegistriesTaskMutex = new Mutex(); + +/** + * mutex which must be acquired when writing registry update information until the transaction has been committed + * audit model update transactions are large and lock the DB for long periods. + * @type {Mutex} + */ +export const processingUpdateTransactionMutex = new Mutex(); + export function formatModelAssociationName(model) { if (model == null || model.model == null) return '';