fix: clean up old sync code
MichaelTaylor3D committed Nov 28, 2023
1 parent cfc486c commit fd44165
Showing 4 changed files with 6 additions and 231 deletions.
172 changes: 2 additions & 170 deletions src/datalayer/syncService.js
@@ -1,7 +1,7 @@
import _ from 'lodash';

-import { decodeHex, decodeDataLayerResponse } from '../utils/datalayer-utils';
-import { Organization, Staging, ModelKeys, Simulator } from '../models';
+import { decodeDataLayerResponse } from '../utils/datalayer-utils';
+import { Simulator } from '../models';
import { getConfig } from '../utils/config-loader';
import { logger } from '../config/logger.cjs';

@@ -11,171 +11,6 @@ import * as simulator from './simulator';
const { USE_SIMULATOR } = getConfig().APP;

const POLLING_INTERVAL = 5000;
const frames = ['-', '\\', '|', '/'];

const startDataLayerUpdatePolling = async () => {
logger.info('Start Datalayer Update Polling');
const updateStoreInfo = await dataLayerWasUpdated();
if (updateStoreInfo.length) {
await Promise.all(
updateStoreInfo.map(async (store) => {
logger.info(
`Updates found syncing storeId: ${store.storeId} ${
frames[Math.floor(Math.random() * 3)]
}`,
);
await syncDataLayerStoreToClimateWarehouse(
store.storeId,
store.rootHash,
);

console.log('UPDATE STORE', store.storeId, store.rootHash);
await Organization.update(
{ registryHash: store.rootHash },
{ where: { registryId: store.storeId } },
);
}),
);
}
};

const syncDataLayerStoreToClimateWarehouse = async (storeId, rootHash) => {
let storeData;

if (USE_SIMULATOR) {
storeData = await simulator.getStoreData(storeId, rootHash);
} else {
storeData = await dataLayer.getStoreData(storeId, rootHash);
}

if (!_.get(storeData, 'keys_values', []).length) {
return;
}

const organizationToTruncate = await Organization.findOne({
attributes: ['orgUid'],
where: { registryId: storeId },
raw: true,
});

try {
if (_.get(organizationToTruncate, 'orgUid')) {
const truncateOrganizationPromises = Object.keys(ModelKeys).map((key) =>
ModelKeys[key].destroy({
where: { orgUid: organizationToTruncate.orgUid },
}),
);

await Promise.all(truncateOrganizationPromises);

await Promise.all(
storeData.keys_values.map(async (kv) => {
const key = decodeHex(kv.key.replace(`${storeId}_`, ''));
const modelKey = key.split('|')[0];
let value;

try {
value = JSON.parse(decodeHex(kv.value));
} catch (err) {
console.trace(err);
logger.error(`Cant parse json value: ${decodeHex(kv.value)}`);
}

if (ModelKeys[modelKey]) {
await ModelKeys[modelKey].upsert(value);

const stagingUuid =
modelKey === 'unit'
? value.warehouseUnitId
: modelKey === 'project'
? value.warehouseProjectId
: undefined;

if (stagingUuid) {
await Staging.destroy({
where: { uuid: stagingUuid },
});
}
}
}),
);

// clean up any staging records than involved delete commands,
// since we cant track that they came in through the uuid,
// we can infer this because diff.original is null instead of empty object.
await Staging.cleanUpCommitedAndInvalidRecords();
}
} catch (error) {
console.trace('ERROR DURING SYNC TRANSACTION', error);
}
};

const dataLayerWasUpdated = async () => {
const organizations = await Organization.findAll({
attributes: ['registryId', 'registryHash'],
where: { subscribed: true },
raw: true,
});

// exit early if there are no subscribed organizations
if (!organizations.length) {
return [];
}

const subscribedOrgIds = organizations.map((org) => org.registryId);

if (!subscribedOrgIds.length) {
return [];
}

let rootResponse;
if (USE_SIMULATOR) {
rootResponse = await simulator.getRoots(subscribedOrgIds);
} else {
rootResponse = await dataLayer.getRoots(subscribedOrgIds);
}

if (!rootResponse.success) {
return [];
}

const updatedStores = rootResponse.root_hashes.filter((rootHash) => {
const org = organizations.find(
(org) => org.registryId == rootHash.id.replace('0x', ''),
);

if (org) {
// When a transfer is made, the climate warehouse is locked from making updates
// while waiting for the transfer to either be completed or rejected.
// This means that we know the transfer completed when the root hash changed
// and we can remove it from the pending staging table.
if (org.isHome == 1 && org.registryHash != rootHash.hash) {
Staging.destroy({ where: { isTransfer: true } });
}

// store has been updated if its confirmed and the hash has changed
return rootHash.confirmed && org.registryHash != rootHash.hash;
}

return false;
});

if (!updatedStores.length) {
return [];
}

const updateStoreInfo = await Promise.all(
updatedStores.map(async (rootHash) => {
const storeId = rootHash.id.replace('0x', '');
return {
storeId,
rootHash: rootHash.hash,
};
}),
);

return updateStoreInfo;
};

const unsubscribeFromDataLayerStore = async (storeId) => {
if (!USE_SIMULATOR) {
@@ -399,9 +234,6 @@ export const waitForAllTransactionsToConfirm = async () => {
};

export default {
-startDataLayerUpdatePolling,
-syncDataLayerStoreToClimateWarehouse,
-dataLayerWasUpdated,
subscribeToStoreOnDataLayer,
getSubscribedStoreData,
getRootHistory,
4 changes: 2 additions & 2 deletions src/tasks/index.js
@@ -2,7 +2,7 @@ import { ToadScheduler } from 'toad-scheduler';

import syncDefaultOrganizations from './sync-default-organizations';
import syncPickLists from './sync-picklists';
-import syncAudit from './sync-audit-table';
+import syncRegistries from './sync-registries';
import syncOrganizationMeta from './sync-organization-meta';
import syncGovernanceBody from './sync-governance-body';

@@ -21,7 +21,7 @@ const start = () => {
syncGovernanceBody,
syncDefaultOrganizations,
syncPickLists,
-syncAudit,
+syncRegistries,
syncOrganizationMeta,
];
defaultJobs.forEach((defaultJob) => {
57 changes: 0 additions & 57 deletions src/tasks/sync-datalayer.js

This file was deleted.

4 changes: 2 additions & 2 deletions src/tasks/{sync-audit-table.js → sync-registries.js}
@@ -20,7 +20,7 @@ dotenv.config();
const mutex = new Mutex();
const CONFIG = getConfig().APP;

-const task = new Task('sync-audit', async () => {
+const task = new Task('sync-registries', async () => {
if (!mutex.isLocked()) {
const releaseMutex = await mutex.acquire();
try {
@@ -99,7 +99,7 @@ const job = new SimpleIntervalJob(
runImmediately: true,
},
task,
-{ id: 'sync-audit', preventOverrun: true },
+{ id: 'sync-registries', preventOverrun: true },
);

const processJob = async () => {
