Skip to content

Commit

Permalink
feat: sqlite db locking mitigation via organizations.model.js and aud…
Browse files Browse the repository at this point in the history
…it.model.js accessing mutex
  • Loading branch information
wwills2 committed Nov 18, 2024
1 parent d679516 commit b489859
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/models/audit/audit.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { AuditMirror } from './audit.model.mirror';
import ModelTypes from './audit.modeltypes.cjs';
import findDuplicateIssuancesSql from './sql/find-duplicate-issuances.sql.js';
import { Organization } from '../organizations/index.js';
import { waitForSyncRegistries } from '../../utils/model-utils.js';

class Audit extends Model {
static async create(values, options) {
Expand Down Expand Up @@ -107,4 +108,8 @@ Audit.init(ModelTypes, {
updatedAt: true,
});

Audit.addHook('beforeFind', async () => {
await waitForSyncRegistries();
});

export { Audit };
5 changes: 5 additions & 0 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { getConfig } from '../../utils/config-loader';
const { USE_SIMULATOR, AUTO_SUBSCRIBE_FILESTORE } = getConfig().APP;

import ModelTypes from './organizations.modeltypes.cjs';
import { waitForSyncRegistries } from '../../utils/model-utils.js';

class Organization extends Model {
static async getHomeOrg(includeAddress = true) {
Expand Down Expand Up @@ -469,4 +470,8 @@ Organization.init(ModelTypes, {
timestamps: true,
});

Organization.addHook('beforeFind', async () => {
await waitForSyncRegistries();
});

export { Organization };
21 changes: 14 additions & 7 deletions src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import _ from 'lodash';

import { SimpleIntervalJob, Task } from 'toad-scheduler';
import { Mutex } from 'async-mutex';
import { Organization, Audit, ModelKeys, Staging, Meta } from '../models';
import datalayer from '../datalayer';
import {
Expand All @@ -22,15 +21,18 @@ import {
migrateToNewSync,
generateGenerationIndex,
} from '../utils/sync-migration-utils';
import {
processingUpdateTransactionMutex,
syncRegistriesTaskMutex,
} from '../utils/model-utils.js';

dotenv.config();
const mutex = new Mutex();
const CONFIG = getConfig().APP;

const task = new Task('sync-registries', async () => {
if (!mutex.isLocked()) {
logger.debug('running sync registries task');
const releaseMutex = await mutex.acquire();
logger.debug('sync registries task invoked');
if (!syncRegistriesTaskMutex.isLocked()) {
const releaseSyncTaskMutex = await syncRegistriesTaskMutex.acquire();
try {
const hasMigratedToNewSyncMethod = await Meta.findOne({
where: { metaKey: 'migratedToNewSync' },
Expand Down Expand Up @@ -61,7 +63,7 @@ const task = new Task('sync-registries', async () => {
);
}
} finally {
releaseMutex();
releaseSyncTaskMutex();
}
}
});
Expand Down Expand Up @@ -97,8 +99,11 @@ async function createTransaction(callback, afterCommitCallbacks) {
let transaction;
let mirrorTransaction;

logger.info('Starting sequelize transaction and acquiring transaction mutex');
const releaseTransactionMutex =
await processingUpdateTransactionMutex.acquire();

try {
logger.info('Starting sequelize transaction');
// Start a transaction
transaction = await sequelize.transaction();

Expand Down Expand Up @@ -130,6 +135,8 @@ async function createTransaction(callback, afterCommitCallbacks) {
console.error(error);
await transaction.rollback();
}
} finally {
releaseTransactionMutex();
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/utils/model-utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
import { columnsToInclude } from './helpers.js';
import Sequelize from 'sequelize';

import { Mutex } from 'async-mutex';

export async function waitForSyncRegistries() {
if (processingUpdateTransactionMutex.isLocked()) {
// when the mutex is acquired, the current sync transaction has completed
const releaseMutex = await processingUpdateTransactionMutex.acquire();
await releaseMutex();
}
}

/**
* mutex which must be acquired to run the sync-registries task job.
* this mutex exists to prevent multiple registry sync tasks from running at the same time and overloading the chia
* RPC's or causing a SQLite locking error due to multiple task instances trying to commit large update transactions
* @type {Mutex}
*/
export const syncRegistriesTaskMutex = new Mutex();

/**
* mutex which must be acquired when writing registry update information until the transaction has been committed
* audit model update transactions are large and lock the DB for long periods.
* @type {Mutex}
*/
export const processingUpdateTransactionMutex = new Mutex();

export function formatModelAssociationName(model) {
if (model == null || model.model == null) return '';

Expand Down

0 comments on commit b489859

Please sign in to comment.