diff --git a/src/datalayer/syncService.js b/src/datalayer/syncService.js index 9cb5a267..a0312943 100644 --- a/src/datalayer/syncService.js +++ b/src/datalayer/syncService.js @@ -1,7 +1,7 @@ import _ from 'lodash'; -import { decodeHex, decodeDataLayerResponse } from '../utils/datalayer-utils'; -import { Organization, Staging, ModelKeys, Simulator } from '../models'; +import { decodeDataLayerResponse } from '../utils/datalayer-utils'; +import { Simulator } from '../models'; import { getConfig } from '../utils/config-loader'; import { logger } from '../config/logger.cjs'; @@ -11,171 +11,6 @@ import * as simulator from './simulator'; const { USE_SIMULATOR } = getConfig().APP; const POLLING_INTERVAL = 5000; -const frames = ['-', '\\', '|', '/']; - -const startDataLayerUpdatePolling = async () => { - logger.info('Start Datalayer Update Polling'); - const updateStoreInfo = await dataLayerWasUpdated(); - if (updateStoreInfo.length) { - await Promise.all( - updateStoreInfo.map(async (store) => { - logger.info( - `Updates found syncing storeId: ${store.storeId} ${ - frames[Math.floor(Math.random() * 3)] - }`, - ); - await syncDataLayerStoreToClimateWarehouse( - store.storeId, - store.rootHash, - ); - - console.log('UPDATE STORE', store.storeId, store.rootHash); - await Organization.update( - { registryHash: store.rootHash }, - { where: { registryId: store.storeId } }, - ); - }), - ); - } -}; - -const syncDataLayerStoreToClimateWarehouse = async (storeId, rootHash) => { - let storeData; - - if (USE_SIMULATOR) { - storeData = await simulator.getStoreData(storeId, rootHash); - } else { - storeData = await dataLayer.getStoreData(storeId, rootHash); - } - - if (!_.get(storeData, 'keys_values', []).length) { - return; - } - - const organizationToTruncate = await Organization.findOne({ - attributes: ['orgUid'], - where: { registryId: storeId }, - raw: true, - }); - - try { - if (_.get(organizationToTruncate, 'orgUid')) { - const truncateOrganizationPromises = Object.keys(ModelKeys).map((key) => - ModelKeys[key].destroy({ - where: { orgUid: organizationToTruncate.orgUid }, - }), - ); - - await Promise.all(truncateOrganizationPromises); - - await Promise.all( - storeData.keys_values.map(async (kv) => { - const key = decodeHex(kv.key.replace(`${storeId}_`, '')); - const modelKey = key.split('|')[0]; - let value; - - try { - value = JSON.parse(decodeHex(kv.value)); - } catch (err) { - console.trace(err); - logger.error(`Cant parse json value: ${decodeHex(kv.value)}`); - } - - if (ModelKeys[modelKey]) { - await ModelKeys[modelKey].upsert(value); - - const stagingUuid = - modelKey === 'unit' - ? value.warehouseUnitId - : modelKey === 'project' - ? value.warehouseProjectId - : undefined; - - if (stagingUuid) { - await Staging.destroy({ - where: { uuid: stagingUuid }, - }); - } - } - }), - ); - - // clean up any staging records than involved delete commands, - // since we cant track that they came in through the uuid, - // we can infer this because diff.original is null instead of empty object. - await Staging.cleanUpCommitedAndInvalidRecords(); - } - } catch (error) { - console.trace('ERROR DURING SYNC TRANSACTION', error); - } -}; - -const dataLayerWasUpdated = async () => { - const organizations = await Organization.findAll({ - attributes: ['registryId', 'registryHash'], - where: { subscribed: true }, - raw: true, - }); - - // exit early if there are no subscribed organizations - if (!organizations.length) { - return []; - } - - const subscribedOrgIds = organizations.map((org) => org.registryId); - - if (!subscribedOrgIds.length) { - return []; - } - - let rootResponse; - if (USE_SIMULATOR) { - rootResponse = await simulator.getRoots(subscribedOrgIds); - } else { - rootResponse = await dataLayer.getRoots(subscribedOrgIds); - } - - if (!rootResponse.success) { - return []; - } - - const updatedStores = rootResponse.root_hashes.filter((rootHash) => { - const org = organizations.find( - (org) => org.registryId == rootHash.id.replace('0x', ''), - ); - - if (org) { - // When a transfer is made, the climate warehouse is locked from making updates - // while waiting for the transfer to either be completed or rejected. - // This means that we know the transfer completed when the root hash changed - // and we can remove it from the pending staging table. - if (org.isHome == 1 && org.registryHash != rootHash.hash) { - Staging.destroy({ where: { isTransfer: true } }); - } - - // store has been updated if its confirmed and the hash has changed - return rootHash.confirmed && org.registryHash != rootHash.hash; - } - - return false; - }); - - if (!updatedStores.length) { - return []; - } - - const updateStoreInfo = await Promise.all( - updatedStores.map(async (rootHash) => { - const storeId = rootHash.id.replace('0x', ''); - return { - storeId, - rootHash: rootHash.hash, - }; - }), - ); - - return updateStoreInfo; -}; const unsubscribeFromDataLayerStore = async (storeId) => { if (!USE_SIMULATOR) { @@ -399,9 +234,6 @@ export const waitForAllTransactionsToConfirm = async () => { }; export default { - startDataLayerUpdatePolling, - syncDataLayerStoreToClimateWarehouse, - dataLayerWasUpdated, subscribeToStoreOnDataLayer, getSubscribedStoreData, getRootHistory, diff --git a/src/tasks/index.js b/src/tasks/index.js index 29e1a530..7c72f303 100644 --- a/src/tasks/index.js +++ b/src/tasks/index.js @@ -2,7 +2,7 @@ import { ToadScheduler } from 'toad-scheduler'; import syncDefaultOrganizations from './sync-default-organizations'; import syncPickLists from './sync-picklists'; -import syncAudit from './sync-audit-table'; +import syncRegistries from './sync-registries'; import syncOrganizationMeta from './sync-organization-meta'; import syncGovernanceBody from './sync-governance-body'; @@ -21,7 +21,7 @@ const start = () => { syncGovernanceBody, syncDefaultOrganizations, syncPickLists, - syncAudit, + syncRegistries, syncOrganizationMeta, ]; defaultJobs.forEach((defaultJob) => { diff --git a/src/tasks/sync-datalayer.js b/src/tasks/sync-datalayer.js deleted file mode 100644 index 4e8090a5..00000000 --- a/src/tasks/sync-datalayer.js +++ /dev/null @@ -1,57 +0,0 @@ -import { SimpleIntervalJob, Task } from 'toad-scheduler'; -import datalayer from '../datalayer'; -import dotenv from 'dotenv'; -import cliSpinner from 'cli-spinner'; -import { - assertDataLayerAvailable, - assertWalletIsSynced, -} from '../utils/data-assertions'; -import { logger } from '../config/logger.cjs'; - -const Spinner = cliSpinner.Spinner; -dotenv.config(); -import { getConfig } from '../utils/config-loader'; -const CONFIG = getConfig().APP; - -const spinner = new Spinner('Waiting for Updates %s'); -spinner.setSpinnerString('|/-\\'); -spinner.setSpinnerDelay(500); - -let taskIsRunning = false; - -const task = new Task('sync-datalayer', async () => { - try { - if (!taskIsRunning) { - taskIsRunning = true; - logger.info('Syncing datalayer data'); - await assertDataLayerAvailable(); - await assertWalletIsSynced(); - - spinner.stop(); - spinner.start(); - datalayer.startDataLayerUpdatePolling(); - } - } catch (error) { - logger.error( - `Retrying in ${ - CONFIG?.TASKS?.DATAMODEL_SYNC_TASK_INTERVAL || 60 - } seconds`, - error, - ); - } finally { - taskIsRunning = false; - } -}); - -let seconds = 5; -if (process.env.NODE_ENV !== 'test') { - seconds = CONFIG?.TASKS?.DATAMODEL_SYNC_TASK_INTERVAL || 60; -} - -const job = new SimpleIntervalJob( - { seconds, runImmediately: true }, - task, - 'sync-datalayer', -); - -export default job; diff --git a/src/tasks/sync-audit-table.js b/src/tasks/sync-registries.js similarity index 99% rename from src/tasks/sync-audit-table.js rename to src/tasks/sync-registries.js index 21b5a45b..bcdd3356 100644 --- a/src/tasks/sync-audit-table.js +++ b/src/tasks/sync-registries.js @@ -20,7 +20,7 @@ dotenv.config(); const mutex = new Mutex(); const CONFIG = getConfig().APP; -const task = new Task('sync-audit', async () => { +const task = new Task('sync-registries', async () => { if (!mutex.isLocked()) { const releaseMutex = await mutex.acquire(); try { @@ -99,7 +99,7 @@ const job = new SimpleIntervalJob( runImmediately: true, }, task, - { id: 'sync-audit', preventOverrun: true }, + { id: 'sync-registries', preventOverrun: true }, ); const processJob = async () => {