From d2ac1a614557300adbe345ffad16a13befe35e28 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Thu, 7 Dec 2023 12:55:05 -0500 Subject: [PATCH] fix: index based sync --- ...0231207142225-AddGenerationIndexToAudit.js | 22 ++ src/database/migrations/index.js | 5 + src/models/audit/audit.modeltypes.cjs | 3 + src/tasks/sync-registries.js | 271 +++++++----------- src/utils/datalayer-utils.js | 36 +++ src/utils/sync-migration-utils.js | 110 +++++++ 6 files changed, 276 insertions(+), 171 deletions(-) create mode 100644 src/database/migrations/20231207142225-AddGenerationIndexToAudit.js create mode 100644 src/utils/sync-migration-utils.js diff --git a/src/database/migrations/20231207142225-AddGenerationIndexToAudit.js b/src/database/migrations/20231207142225-AddGenerationIndexToAudit.js new file mode 100644 index 00000000..8011fc5d --- /dev/null +++ b/src/database/migrations/20231207142225-AddGenerationIndexToAudit.js @@ -0,0 +1,22 @@ +'use strict'; + +export default { + async up(queryInterface, Sequelize) { + await Promise.all( + ['audit'].map((table) => { + queryInterface.addColumn(table, 'generation', { + type: Sequelize.INTEGER, + allowNull: true, + }); + }), + ); + }, + + async down(queryInterface) { + await Promise.all( + ['audit'].map((table) => { + queryInterface.removeColumn(table, 'generation'); + }), + ); + }, +}; diff --git a/src/database/migrations/index.js b/src/database/migrations/index.js index 1ff83fb2..ab0495b1 100644 --- a/src/database/migrations/index.js +++ b/src/database/migrations/index.js @@ -32,6 +32,7 @@ import AddIsTransferColumn from './20220825124702-add-isTransfer-column'; import AddOrgMetadata from './20220831023546-add-org-metadata'; import OrgSyncStatus from './20231020201652-OrgSyncStatus'; import OrgSyncRemaining from './20231020214357-OrgSyncRemainingCount'; +import AddGenerationIndexToAudit from './20231207142225-AddGenerationIndexToAudit'; export const migrations = [ { @@ -174,4 +175,8 @@ export const migrations = [ migration: OrgSyncRemaining, name: '20231020214357-OrgSyncRemainingCount', }, + { + migration: AddGenerationIndexToAudit, + name: '20231207142225-AddGenerationIndexToAudit', + }, ]; diff --git a/src/models/audit/audit.modeltypes.cjs b/src/models/audit/audit.modeltypes.cjs index fbd0518d..5afa682b 100644 --- a/src/models/audit/audit.modeltypes.cjs +++ b/src/models/audit/audit.modeltypes.cjs @@ -55,4 +55,7 @@ module.exports = { type: Sequelize.DATE, defaultValue: Sequelize.NOW, }, + generation: { + type: Sequelize.INTEGER, + } }; diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index bcdd3356..9381313f 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -1,11 +1,14 @@ import _ from 'lodash'; -import { Sequelize } from 'sequelize'; import { SimpleIntervalJob, Task } from 'toad-scheduler'; import { Mutex } from 'async-mutex'; import { Organization, Audit, ModelKeys, Staging, Meta } from '../models'; import datalayer from '../datalayer'; -import { decodeHex, encodeHex } from '../utils/datalayer-utils'; +import { + decodeHex, + encodeHex, + optimizeAndSortKvDiff, +} from '../utils/datalayer-utils'; import dotenv from 'dotenv'; import { logger } from '../config/logger.cjs'; import { sequelize, sequelizeMirror } from '../database'; @@ -15,6 +18,10 @@ import { assertWalletIsSynced, } from '../utils/data-assertions'; import { mirrorDBEnabled } from '../database'; +import { + migrateToNewSync, + generateGenerationIndex, +} from '../utils/sync-migration-utils'; dotenv.config(); const mutex = new Mutex(); @@ -28,59 +35,24 @@ const task = new Task('sync-registries', async () => { where: { metaKey: 'migratedToNewSync' }, }); - if (hasMigratedToNewSyncMethod || CONFIG.USE_SIMULATOR) { - await processJob(); - } else { - logger.info( - 'Initiating migration to the new synchronization method. This will require a complete resynchronization of all data and may take some time.', - ); + const hasMigratedToGenerationIndexSync = await Meta.findOne({ + where: { metaKey: 'migratedToIndexBasedSync' }, + }); - for (const modelKey of Object.keys(ModelKeys)) { - logger.info(`Resetting ${modelKey}`); - await ModelKeys[modelKey].destroy({ - where: { - id: { - [Sequelize.Op.ne]: null, - }, - }, - truncate: true, - }); + if (hasMigratedToNewSyncMethod || CONFIG.USE_SIMULATOR) { + if (hasMigratedToGenerationIndexSync) { + await processJob(); + } else { + await generateGenerationIndex(); } - - logger.info(`Resetting Audit Table`); - await Audit.destroy({ - where: { - id: { - [Sequelize.Op.ne]: null, - }, - }, - truncate: true, - }); - - await Meta.upsert({ - metaKey: 'migratedToNewSync', - metaValue: 'true', - }); - - await Organization.update( - { - synced: false, - sync_remaining: 0, - }, - { - where: { - id: { - [Sequelize.Op.ne]: null, - }, - }, - }, - ); - - logger.info(`Migration Complete`); + } else { + await migrateToNewSync(); } } catch (error) { logger.error(`Error during datasync: ${error.message}`); + console.trace(error); + // Log additional information if present in the error object if (error.response && error.response.body) { logger.error( @@ -116,42 +88,6 @@ const processJob = async () => { } }; -/** - * Optimizes and sorts an array of key-value differences. - * NOTE: The only reason this function works is because we treat INSERTS as UPSERTS - * If that ever changes, this function will need to be removed. - * - * @param {Array} kvDiff - An array of objects with { key, type } structure. - * @returns {Array} - An optimized and sorted array. - */ -function optimizeAndSortKvDiff(kvDiff) { - const deleteKeys = new Set(); - const insertKeys = new Set(); - - // Populate the Sets for quicker lookup - for (const diff of kvDiff) { - if (diff.type === 'DELETE') { - deleteKeys.add(diff.key); - } else if (diff.type === 'INSERT') { - insertKeys.add(diff.key); - } - } - - // Remove DELETE keys that also exist in INSERT keys - for (const insertKey of insertKeys) { - deleteKeys.delete(insertKey); - } - - // Filter and sort the array based on the optimized DELETE keys - const filteredArray = kvDiff.filter((diff) => { - return diff.type !== 'DELETE' || deleteKeys.has(diff.key); - }); - - return filteredArray.sort((a, b) => { - return a.type === b.type ? 0 : a.type === 'DELETE' ? -1 : 1; - }); -} - async function createTransaction(callback, afterCommitCallbacks) { let result = null; @@ -217,7 +153,7 @@ const syncOrganizationAudit = async (organization) => { } else { lastRootSaved = await Audit.findOne({ where: { registryId: organization.registryId }, - order: [['onchainConfirmationTimeStamp', 'DESC']], + order: [['generation', 'DESC']], raw: true, }); @@ -235,14 +171,16 @@ const syncOrganizationAudit = async (organization) => { if (!lastRootSaved) { logger.info(`Syncing new registry ${organization.name}`); + await Audit.create({ orgUid: organization.orgUid, registryId: organization.registryId, rootHash: generation.root_hash, type: 'CREATE REGISTRY', + generation: 0, change: null, table: null, - onchainConfirmationTimeStamp: generation.timestamp, + onchainConfirmationTimeStamp: generation.timestamp.toString(), }); // Destroy existing records for this singleton @@ -264,20 +202,16 @@ const syncOrganizationAudit = async (organization) => { generation = lastRootSaved; } - let isSynced = - rootHistory[rootHistory.length - 1].root_hash === generation.root_hash; - - const historyIndex = rootHistory.findIndex( - (root) => root.timestamp === generation.timestamp, - ); + const historyIndex = generation.generation + 1; - if (historyIndex === -1) { + if (historyIndex > rootHistory.length) { logger.error( `Could not find root history for ${organization.name} with timestamp ${generation.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`, ); } - const syncRemaining = rootHistory.length - historyIndex - 1; + const syncRemaining = rootHistory.length - generation.generation; + const isSynced = syncRemaining === 0; await Organization.update( { @@ -293,7 +227,7 @@ const syncOrganizationAudit = async (organization) => { // Organization not synced, sync it logger.info(' '); - logger.info(`Syncing Registry: ${_.get(organization, 'name')}`); + logger.info(`Syncing ${organization.name} generation ${historyIndex}`); logger.info( `${organization.name} is ${ syncRemaining + 1 @@ -353,90 +287,85 @@ const syncOrganizationAudit = async (organization) => { const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff); const updateTransaction = async (transaction, mirrorTransaction) => { + logger.info(`Syncing ${organization.name} generation ${historyIndex}`); for (const diff of optimizedKvDiff) { const key = decodeHex(diff.key); const modelKey = key.split('|')[0]; - if (!['comment', 'author'].includes(key)) { - const auditData = { - orgUid: organization.orgUid, - registryId: organization.registryId, - rootHash: root2.root_hash, - type: diff.type, - table: modelKey, - change: decodeHex(diff.value), - onchainConfirmationTimeStamp: root2.timestamp, - comment: _.get( - JSON.parse( - decodeHex(_.get(comment, '[0].value', encodeHex('{}'))), - ), - 'comment', - '', - ), - author: _.get( - JSON.parse( - decodeHex(_.get(author, '[0].value', encodeHex('{}'))), - ), - 'author', - '', - ), - }; - - if (modelKey) { - const record = JSON.parse(decodeHex(diff.value)); - const primaryKeyValue = - record[ModelKeys[modelKey].primaryKeyAttributes[0]]; - - if (diff.type === 'INSERT') { - logger.info(`UPSERTING: ${modelKey} - ${primaryKeyValue}`); - await ModelKeys[modelKey].upsert(record, { - transaction, - mirrorTransaction, - }); - } else if (diff.type === 'DELETE') { - logger.info(`DELETING: ${modelKey} - ${primaryKeyValue}`); - await ModelKeys[modelKey].destroy({ - where: { - [ModelKeys[modelKey].primaryKeyAttributes[0]]: - primaryKeyValue, - }, - transaction, - mirrorTransaction, - }); - } + const auditData = { + orgUid: organization.orgUid, + registryId: organization.registryId, + rootHash: root2.root_hash, + type: diff.type, + table: modelKey, + change: decodeHex(diff.value), + onchainConfirmationTimeStamp: root2.timestamp, + generation: historyIndex, + comment: _.get( + JSON.parse(decodeHex(_.get(comment, '[0].value', encodeHex('{}')))), + 'comment', + '', + ), + author: _.get( + JSON.parse(decodeHex(_.get(author, '[0].value', encodeHex('{}')))), + 'author', + '', + ), + }; + + if (modelKey) { + const record = JSON.parse(decodeHex(diff.value)); + const primaryKeyValue = + record[ModelKeys[modelKey].primaryKeyAttributes[0]]; + + if (diff.type === 'INSERT') { + logger.info(`UPSERTING: ${modelKey} - ${primaryKeyValue}`); + await ModelKeys[modelKey].upsert(record, { + transaction, + mirrorTransaction, + }); + } else if (diff.type === 'DELETE') { + logger.info(`DELETING: ${modelKey} - ${primaryKeyValue}`); + await ModelKeys[modelKey].destroy({ + where: { + [ModelKeys[modelKey].primaryKeyAttributes[0]]: primaryKeyValue, + }, + transaction, + mirrorTransaction, + }); + } - if (organization.orgUid === homeOrg?.orgUid) { - const stagingUuid = [ - 'unit', - 'project', - 'units', - 'projects', - ].includes(modelKey) - ? primaryKeyValue - : undefined; - - if (stagingUuid) { - afterCommitCallbacks.push(async () => { - logger.info(`DELETING STAGING: ${stagingUuid}`); - await Staging.destroy({ - where: { uuid: stagingUuid }, - }); + if (organization.orgUid === homeOrg?.orgUid) { + const stagingUuid = [ + 'unit', + 'project', + 'units', + 'projects', + ].includes(modelKey) + ? primaryKeyValue + : undefined; + + if (stagingUuid) { + afterCommitCallbacks.push(async () => { + logger.info(`DELETING STAGING: ${stagingUuid}`); + await Staging.destroy({ + where: { uuid: stagingUuid }, }); - } + }); } } - - // Create the Audit record - await Audit.create(auditData, { transaction, mirrorTransaction }); - await Organization.update( - { registryHash: root2.root_hash }, - { - where: { orgUid: organization.orgUid }, - transaction, - mirrorTransaction, - }, - ); } + + // Create the Audit record + await Audit.create(auditData, { transaction, mirrorTransaction }); + await Organization.update( + { registryHash: root2.root_hash }, + { + where: { orgUid: organization.orgUid }, + transaction, + mirrorTransaction, + }, + ); } }; diff --git a/src/utils/datalayer-utils.js b/src/utils/datalayer-utils.js index 6c53c090..9052cb00 100644 --- a/src/utils/datalayer-utils.js +++ b/src/utils/datalayer-utils.js @@ -71,3 +71,39 @@ export const deserializeMaker = (maker) => { return changes; }; + +/** + * Optimizes and sorts an array of key-value differences. + * NOTE: The only reason this function works is because we treat INSERTS as UPSERTS + * If that ever changes, this function will need to be removed. + * + * @param {Array} kvDiff - An array of objects with { key, type } structure. + * @returns {Array} - An optimized and sorted array. + */ +export const optimizeAndSortKvDiff = (kvDiff) => { + const deleteKeys = new Set(); + const insertKeys = new Set(); + + // Populate the Sets for quicker lookup + for (const diff of kvDiff) { + if (diff.type === 'DELETE') { + deleteKeys.add(diff.key); + } else if (diff.type === 'INSERT') { + insertKeys.add(diff.key); + } + } + + // Remove DELETE keys that also exist in INSERT keys + for (const insertKey of insertKeys) { + deleteKeys.delete(insertKey); + } + + // Filter and sort the array based on the optimized DELETE keys + const filteredArray = kvDiff.filter((diff) => { + return diff.type !== 'DELETE' || deleteKeys.has(diff.key); + }); + + return filteredArray.sort((a, b) => { + return a.type === b.type ? 0 : a.type === 'DELETE' ? -1 : 1; + }); +}; diff --git a/src/utils/sync-migration-utils.js b/src/utils/sync-migration-utils.js new file mode 100644 index 00000000..c5f08511 --- /dev/null +++ b/src/utils/sync-migration-utils.js @@ -0,0 +1,110 @@ +import { Sequelize } from 'sequelize'; +import { Organization, Audit, ModelKeys, Meta } from '../models'; +import { logger } from '../config/logger.cjs'; +import datalayer from '../datalayer'; +import { sequelize } from '../database'; + +export const migrateToNewSync = async () => { + logger.info( + 'Initiating migration to the new synchronization method. This will require a complete resynchronization of all data and may take some time.', + ); + + for (const modelKey of Object.keys(ModelKeys)) { + logger.info(`Resetting ${modelKey}`); + await ModelKeys[modelKey].destroy({ + where: { + id: { + [Sequelize.Op.ne]: null, + }, + }, + truncate: true, + }); + } + + logger.info(`Resetting Audit Table`); + await Audit.destroy({ + where: { + id: { + [Sequelize.Op.ne]: null, + }, + }, + truncate: true, + }); + + await Meta.upsert({ + metaKey: 'migratedToNewSync', + metaValue: 'true', + }); + + await Organization.update( + { + synced: false, + sync_remaining: 0, + }, + { + where: { + id: { + [Sequelize.Op.ne]: null, + }, + }, + }, + ); + + logger.info(`Migration Complete`); +}; + +export const generateGenerationIndex = async () => { + const organizations = await Organization.findAll({ + where: { subscribed: true }, + raw: true, + }); + + for (const organization of organizations) { + const rootHistory = await datalayer.getRootHistory(organization.registryId); + + for (let i = 0; i < rootHistory.length; i++) { + // Find the oldest timestamp with a null value + const oldestNullGenerations = await Audit.findAll({ + where: { + registryId: organization.registryId, + generation: null, + }, + group: 'onChainConfirmationTimeStamp', + order: [ + [ + sequelize.fn('MIN', sequelize.col('onChainConfirmationTimeStamp')), + 'ASC', + ], + ], + limit: 1, + raw: true, + }); + + const oldestNullGeneration = oldestNullGenerations[0]; + + if (!oldestNullGeneration) { + continue; + } + + logger.info(`Syncing ${organization.name} generation ${i}`); + + await Audit.update( + { + generation: i, + }, + { + where: { + registryId: organization.registryId, + onchainConfirmationTimeStamp: + oldestNullGeneration.onchainConfirmationTimeStamp, + }, + }, + ); + } + } + + await Meta.upsert({ + metaKey: 'migratedToIndexBasedSync', + metaValue: 'true', + }); +};