Skip to content

Commit

Permalink
feat: sync fix by looking at timestamps instead of hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Oct 28, 2023
1 parent b5f79c1 commit f64f650
Showing 1 changed file with 67 additions and 15 deletions.
82 changes: 67 additions & 15 deletions src/tasks/sync-audit-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,42 @@ const processJob = async () => {
}
};

/**
* Optimizes and sorts an array of key-value differences.
* NOTE: The only reason this function works is because we treat INSERTS as UPSERTS
* If that ever changes, this function will need to be removed.
*
* @param {Array} kvDiff - An array of objects with { key, type } structure.
* @returns {Array} - An optimized and sorted array.
*/
function optimizeAndSortKvDiff(kvDiff) {
const deleteKeys = new Set();
const insertKeys = new Set();

// Populate the Sets for quicker lookup
for (const diff of kvDiff) {
if (diff.type === 'DELETE') {
deleteKeys.add(diff.key);
} else if (diff.type === 'INSERT') {
insertKeys.add(diff.key);
}
}

// Remove DELETE keys that also exist in INSERT keys
for (const insertKey of insertKeys) {
deleteKeys.delete(insertKey);
}

// Filter and sort the array based on the optimized DELETE keys
const filteredArray = kvDiff.filter((diff) => {
return diff.type !== 'DELETE' || deleteKeys.has(diff.key);
});

return filteredArray.sort((a, b) => {
return a.type === b.type ? 0 : a.type === 'DELETE' ? -1 : 1;
});
}

async function createTransaction(callback, afterCommitCallbacks) {
let result = null;

Expand Down Expand Up @@ -184,20 +220,26 @@ const syncOrganizationAudit = async (organization) => {
order: [['onchainConfirmationTimeStamp', 'DESC']],
raw: true,
});

// There was an oversight in the audit model where we named it onChainConfirmationTimeStamp but
// the RPC result calls in timestamp. This is a temporary fix to ensure that we can still sync
lastRootSaved.timestamp = Number(
lastRootSaved.onchainConfirmationTimeStamp,
);
}

let rootHash = _.get(rootHistory, '[0].root_hash');
let generation = _.get(rootHistory, '[0]');

if (!lastRootSaved) {
logger.info(`Syncing new registry ${organization.name}`);
await Audit.create({
orgUid: organization.orgUid,
registryId: organization.registryId,
rootHash,
rootHash: generation.root_hash,
type: 'CREATE REGISTRY',
change: null,
table: null,
onchainConfirmationTimeStamp: _.get(rootHistory, '[0].timestamp'),
onchainConfirmationTimeStamp: generation.timestamp,
});

// Destroy existing records for this singleton
Expand All @@ -216,15 +258,22 @@ const syncOrganizationAudit = async (organization) => {

return;
} else {
rootHash = lastRootSaved.rootHash;
generation = lastRootSaved;
}

let isSynced = rootHistory[rootHistory.length - 1].root_hash === rootHash;
let isSynced =
rootHistory[rootHistory.length - 1].root_hash === generation.root_hash;

const historyIndex = rootHistory.findIndex(
(root) => root.root_hash === rootHash,
(root) => root.timestamp === generation.timestamp,
);

if (historyIndex === -1) {
logger.error(
`Could not find root history for ${organization.name} with timestamp ${generation.timestamp}, something is wrong and the sync for this organization will be paused until this is resolved.`,
);
}

const syncRemaining = rootHistory.length - historyIndex - 1;

await Organization.update(
Expand Down Expand Up @@ -293,14 +342,13 @@ const syncOrganizationAudit = async (organization) => {
diff.type === 'INSERT',
);

// Process any deletes in the kv diff first to ensure correct processing order
kvDiff.sort((a, b) => {
const typeOrder = { DELETE: 0, INSERT: 1 };
return typeOrder[a.type] - typeOrder[b.type];
});
// This optimizedKvDiff will remove all the DELETES that have corresponding INSERTS
// This is because we treat INSERTS as UPSERTS and we can save time and reduce DB thrashing
// by not processing the DELETE for that record.
const optimizedKvDiff = optimizeAndSortKvDiff(kvDiff);

const updateTransaction = async (transaction, mirrorTransaction) => {
for (const diff of kvDiff) {
for (const diff of optimizedKvDiff) {
const key = decodeHex(diff.key);
const modelKey = key.split('|')[0];

Expand All @@ -314,12 +362,16 @@ const syncOrganizationAudit = async (organization) => {
change: decodeHex(diff.value),
onchainConfirmationTimeStamp: root2.timestamp,
comment: _.get(
JSON.parse(decodeHex(_.get(comment, '[0].value', '7b7d'))),
JSON.parse(
decodeHex(_.get(comment, '[0].value', encodeHex('{}'))),
),
'comment',
'',
),
author: _.get(
JSON.parse(decodeHex(_.get(author, '[0].value', '7b7d'))),
JSON.parse(
decodeHex(_.get(author, '[0].value', encodeHex('{}'))),
),
'author',
'',
),
Expand All @@ -331,7 +383,7 @@ const syncOrganizationAudit = async (organization) => {
record[ModelKeys[modelKey].primaryKeyAttributes[0]];

if (diff.type === 'INSERT') {
logger.info(`INSERTING: ${modelKey} - ${primaryKeyValue}`);
logger.info(`UPSERTING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].upsert(record, {
transaction,
mirrorTransaction,
Expand Down

0 comments on commit f64f650

Please sign in to comment.