From b735808285dab4eb98d36fe348d57f22fb2c61ad Mon Sep 17 00:00:00 2001 From: William Wills Date: Tue, 3 Dec 2024 12:35:26 -0500 Subject: [PATCH] fix: organization meta sync task hanging --- src/datalayer/syncService.js | 4 +- .../organizations/organizations.model.js | 113 +++++++++--------- src/tasks/sync-default-organizations.js | 2 +- src/tasks/sync-organization-meta.js | 2 +- src/tasks/sync-registries.js | 2 +- 5 files changed, 60 insertions(+), 63 deletions(-) diff --git a/src/datalayer/syncService.js b/src/datalayer/syncService.js index 6ab03425..5de948f5 100644 --- a/src/datalayer/syncService.js +++ b/src/datalayer/syncService.js @@ -109,8 +109,8 @@ const getRootDiff = (storeId, root1, root2) => { * @param {number} retry - Number of retry attempts. */ const getStoreData = async (storeId, callback, onFail, rootHash, retry = 0) => { - const MAX_RETRIES = 50; - const RETRY_DELAY = 120000; + const MAX_RETRIES = 6; + const RETRY_DELAY = 10000; try { logger.info(`Getting store data, retry: ${retry}`); diff --git a/src/models/organizations/organizations.model.js b/src/models/organizations/organizations.model.js index b77d42b0..92df326e 100644 --- a/src/models/organizations/organizations.model.js +++ b/src/models/organizations/organizations.model.js @@ -343,69 +343,66 @@ class Organization extends Model { try { const allSubscribedOrganizations = await Organization.findAll({ where: { subscribed: true }, + raw: true, }); - await Promise.all( - allSubscribedOrganizations.map(async (organization) => { - const processData = (data, keyFilter) => - data - .filter(({ key }) => keyFilter(key)) - .reduce( - (update, { key, value }) => ({ ...update, [key]: value }), - {}, - ); - - const onFail = (message) => { - logger.info(`Unable to sync metadata from ${organization.orgUid}`); - logger.error(`ORGANIZATION DATA SYNC ERROR: ${message}`); - Organization.update( - { orgHash: '0' }, - { where: { orgUid: organization.orgUid } }, + for (const organization of allSubscribedOrganizations) { + const processData = (data, keyFilter) => + data + .filter(({ key }) => keyFilter(key)) + .reduce( + (update, { key, value }) => ({ ...update, [key]: value }), + {}, ); - }; - - const onResult = async (updateHash, data) => { - try { - const updateData = processData( - data, - (key) => !key.includes('meta_'), - ); - const metadata = processData(data, (key) => - key.includes('meta_'), - ); - - await Organization.update( - { - ..._.omit(updateData, ['registryId']), - prefix: updateData.prefix || '0', - metadata: JSON.stringify(metadata), - }, - { where: { orgUid: organization.orgUid } }, - ); - - logger.debug( - `Updating orgUid ${organization.orgUid} with hash ${updateHash}`, - ); - await Organization.update( - { orgHash: updateHash }, - { where: { orgUid: organization.orgUid } }, - ); - } catch (error) { - logger.info(error.message); - onFail(error.message); - } - }; - - datalayer.getStoreIfUpdated( - organization.orgUid, - organization.orgHash, - onResult, - onFail, + + const onFail = async (message) => { + logger.info(`Unable to sync metadata from ${organization.orgUid}`); + logger.error(`ORGANIZATION DATA SYNC ERROR: ${message}`); + await Organization.update( + { orgHash: '0' }, + { where: { orgUid: organization.orgUid } }, ); - }), - ); + }; + + const onResult = async (updateHash, data) => { + try { + const updateData = processData( + data, + (key) => !key.includes('meta_'), + ); + const metadata = processData(data, (key) => key.includes('meta_')); + + await Organization.update( + { + ..._.omit(updateData, ['registryId']), + prefix: updateData.prefix || '0', + metadata: JSON.stringify(metadata), + }, + { where: { orgUid: organization.orgUid } }, + ); + + logger.debug( + `Updating orgUid ${organization.orgUid} with hash ${updateHash}`, + ); + await Organization.update( + { orgHash: updateHash }, + { where: { orgUid: organization.orgUid } }, + ); + } catch (error) { + logger.info(error.message); + onFail(error.message); + } + }; + + await datalayer.getStoreIfUpdated( + organization.orgUid, + organization.orgHash, + onResult, + onFail, + ); + } } catch (error) { - logger.info(error.message); + logger.error(error.message); } } diff --git a/src/tasks/sync-default-organizations.js b/src/tasks/sync-default-organizations.js index d9074635..15aa9f0d 100644 --- a/src/tasks/sync-default-organizations.js +++ b/src/tasks/sync-default-organizations.js @@ -16,7 +16,7 @@ const task = new Task('sync-default-organizations', async () => { await assertDataLayerAvailable(); await assertWalletIsSynced(); if (!CONFIG.USE_SIMULATOR) { - Organization.subscribeToDefaultOrganizations(); + await Organization.subscribeToDefaultOrganizations(); } } catch (error) { logger.error( diff --git a/src/tasks/sync-organization-meta.js b/src/tasks/sync-organization-meta.js index 5b45efcb..7cd34929 100644 --- a/src/tasks/sync-organization-meta.js +++ b/src/tasks/sync-organization-meta.js @@ -17,7 +17,7 @@ const task = new Task('sync-organization-meta', async () => { await assertDataLayerAvailable(); await assertWalletIsSynced(); if (!CONFIG.USE_SIMULATOR) { - Organization.syncOrganizationMeta(); + await Organization.syncOrganizationMeta(); } } catch (error) { logger.error( diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index b5d9db9c..ab34cf14 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -102,7 +102,7 @@ const processJob = async () => { }); // verify that the latest organization root hash is up to date with the audit records. attempt correction. - if (mostRecentOrgAuditRecord.rootHash !== organization.registryHash) { + if (mostRecentOrgAuditRecord?.rootHash !== organization?.registryHash) { logger.warn( `latest root hash in org table for organization ${organization.name} (orgUid ${organization.orgUid}) does not match the audit records. attempting to correct`, );