diff --git a/src/tasks/sync-audit-table.js b/src/tasks/sync-audit-table.js index b725968f..beae8776 100644 --- a/src/tasks/sync-audit-table.js +++ b/src/tasks/sync-audit-table.js @@ -116,6 +116,42 @@ const processJob = async () => { } }; +/** + * Optimizes and sorts an array of key-value differences. + * NOTE: The only reason this function works is because we treat INSERTS as UPSERTS + * If that ever changes, this function will need to be removed. + * + * @param {Array} kvDiff - An array of objects with { key, type } structure. + * @returns {Array} - An optimized and sorted array. + */ +function optimizeAndSortKvDiff(kvDiff) { + const deleteKeys = new Set(); + const insertKeys = new Set(); + + // Populate the Sets for quicker lookup + for (const diff of kvDiff) { + if (diff.type === 'DELETE') { + deleteKeys.add(diff.key); + } else if (diff.type === 'INSERT') { + insertKeys.add(diff.key); + } + } + + // Remove DELETE keys that also exist in INSERT keys + for (const insertKey of insertKeys) { + deleteKeys.delete(insertKey); + } + + // Filter and sort the array based on the optimized DELETE keys + const filteredArray = kvDiff.filter((diff) => { + return diff.type !== 'DELETE' || deleteKeys.has(diff.key); + }); + + return filteredArray.sort((a, b) => { + return a.type === b.type ? 0 : a.type === 'DELETE' ? -1 : 1; + }); +} + async function createTransaction(callback, afterCommitCallbacks) { let result = null; @@ -184,20 +220,26 @@ const syncOrganizationAudit = async (organization) => { order: [['onchainConfirmationTimeStamp', 'DESC']], raw: true, }); + + // There was an oversight in the audit model where we named it onChainConfirmationTimeStamp but + // the RPC result calls in timestamp. This is a temporary fix to ensure that we can still sync + lastRootSaved.timestamp = Number( + lastRootSaved.onchainConfirmationTimeStamp, + ); } - let rootHash = _.get(rootHistory, '[0].root_hash'); + let generation = _.get(rootHistory, '[0]'); if (!lastRootSaved) { logger.info(`Syncing new registry ${organization.name}`); await Audit.create({ orgUid: organization.orgUid, registryId: organization.registryId, - rootHash, + rootHash: generation.root_hash, type: 'CREATE REGISTRY', change: null, table: null, - onchainConfirmationTimeStamp: _.get(rootHistory, '[0].timestamp'), + onchainConfirmationTimeStamp: generation.timestamp, }); // Destroy existing records for this singleton @@ -216,15 +258,22 @@ const syncOrganizationAudit = async (organization) => { return; } else { - rootHash = lastRootSaved.rootHash; + generation = lastRootSaved; } - let isSynced = rootHistory[rootHistory.length - 1].root_hash === rootHash; + let isSynced = + rootHistory[rootHistory.length - 1].root_hash === generation.root_hash; const historyIndex = rootHistory.findIndex( - (root) => root.root_hash === rootHash, + (root) => root.timestamp === generation.timestamp, ); + if (historyIndex === -1) { + logger.error( + `Could not find root history for ${organization.name} with timestamp ${generation.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`, + ); + } + const syncRemaining = rootHistory.length - historyIndex - 1; await Organization.update( @@ -293,14 +342,13 @@ const syncOrganizationAudit = async (organization) => { diff.type === 'INSERT', ); - // Process any deletes in the kv diff first to ensure correct processing order - kvDiff.sort((a, b) => { - const typeOrder = { DELETE: 0, INSERT: 1 }; - return typeOrder[a.type] - typeOrder[b.type]; - }); + // This optimizedKvDiff will remove all the DELETES that have corresponding INSERTS + // This is because we treat INSERTS as UPSERTS and we can save time and reduce DB thrashing + // by not processing the DELETE for that record. + const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff); const updateTransaction = async (transaction, mirrorTransaction) => { - for (const diff of kvDiff) { + for (const diff of optimizedKvDiff) { const key = decodeHex(diff.key); const modelKey = key.split('|')[0]; @@ -314,12 +362,16 @@ const syncOrganizationAudit = async (organization) => { change: decodeHex(diff.value), onchainConfirmationTimeStamp: root2.timestamp, comment: _.get( - JSON.parse(decodeHex(_.get(comment, '[0].value', '7b7d'))), + JSON.parse( + decodeHex(_.get(comment, '[0].value', encodeHex('{}'))), + ), 'comment', '', ), author: _.get( - JSON.parse(decodeHex(_.get(author, '[0].value', '7b7d'))), + JSON.parse( + decodeHex(_.get(author, '[0].value', encodeHex('{}'))), + ), 'author', '', ), @@ -331,7 +383,7 @@ const syncOrganizationAudit = async (organization) => { record[ModelKeys[modelKey].primaryKeyAttributes[0]]; if (diff.type === 'INSERT') { - logger.info(`INSERTING: ${modelKey} - ${primaryKeyValue}`); + logger.info(`UPSERTING: ${modelKey} - ${primaryKeyValue}`); await ModelKeys[modelKey].upsert(record, { transaction, mirrorTransaction,