Skip to content

Commit

Permalink
feat: implement organization subscribe/unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Feb 2, 2022
1 parent 8e10937 commit 3a22188
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 35 deletions.
19 changes: 19 additions & 0 deletions src/controllers/organization.controller.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Organization } from '../models/organizations';

import { assertHomeOrgExists } from '../utils/data-assertions';

export const findAll = async (req, res) => {
return res.json(await Organization.getOrgsMap());
};
Expand All @@ -23,3 +25,20 @@ export const create = async (req, res) => {

// eslint-disable-next-line
export const importOrg = async (req, res) => {};

export const subscribeToOrganization = async (req, res) => {
try {
await assertHomeOrgExists();

await Organization.subscribeToOrganization(req.body.orgUid);

return res.json({
message: 'Subscribed to organization',
});
} catch (error) {
res.status(400).json({
message: 'Error subscribing to organization',
error: error.message,
});
}
};
20 changes: 15 additions & 5 deletions src/fullnode/simulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,25 @@ export const getRoots = async (storeIds) => {
}

return Promise.resolve({
hash: storeIds.map((storeId) => {
root_hashes: storeIds.map((storeId) => {
if (myOrganization.registryId === storeId) {
return createHash('md5')
.update(JSON.stringify(simulatorTable))
.digest('hex');
return {
hash: createHash('md5')
.update(JSON.stringify(simulatorTable))
.digest('hex'),
id: storeId,
};
}

return 0;
// no hash for non existent org tables
return {
hash: 0,
id: storeId,
};
}),
success: true,
});
};

// eslint-disable-next-line
export const subscribeToStore = async (storeId) => {};
70 changes: 44 additions & 26 deletions src/fullnode/syncService.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import _ from 'lodash';
import logUpdate from 'log-update';

import {
Expand All @@ -24,23 +23,21 @@ const frames = ['-', '\\', '|', '/'];

console.log('Start Datalayer Update Polling');
export const startDataLayerUpdatePolling = async () => {
const tablesToUpdate = await dataLayerWasUpdated();
await Promise.all(
_.keys(tablesToUpdate).map(async (storeId) => {
if (tablesToUpdate[storeId]) {
const storeIdsToUpdate = await dataLayerWasUpdated();
if (storeIdsToUpdate.length) {
await Promise.all(
storeIdsToUpdate.map(async (storeId) => {
logUpdate(
`Updates found syncing storeId: ${storeId} ${
frames[Math.floor(Math.random() * 3)]
}`,
);
await syncDataLayerStoreToClimateWarehouse(storeId);
} else {
logUpdate(
`No updates found yet ${frames[Math.floor(Math.random() * 3)]}`,
);
}
}),
);
}),
);
} else {
logUpdate(`Polling For Updates ${frames[Math.floor(Math.random() * 3)]}`);
}

// after all the updates are complete, check again in a bit
setTimeout(() => startDataLayerUpdatePolling(), POLLING_INTERVAL);
Expand Down Expand Up @@ -153,32 +150,53 @@ export const syncDataLayerStoreToClimateWarehouse = async (storeId) => {
export const dataLayerWasUpdated = async () => {
const organizations = await Organization.findAll({
attributes: ['registryId', 'registryHash'],
where: { subscribed: true },
raw: true,
});

let hashMap = {};
// exit early if there are no subscribed organizations
if (!organizations.length) {
return [];
}

organizations.forEach((org) => {
hashMap[org.registryId] = org.registryHash;
});
const subscribedOrgIds = organizations.map((org) => org.registryId);

let newHashes;
if (!subscribedOrgIds.length) {
return [];
}

let rootResponse;
if (process.env.USE_SIMULATOR === 'true') {
newHashes = await simulator.getRoots(_.keys(hashMap));
rootResponse = await simulator.getRoots(subscribedOrgIds);
} else {
newHashes = await dataLayer.getRoots(_.keys(hashMap));
rootResponse = await dataLayer.getRoots(subscribedOrgIds);
}

if (!rootResponse.success) {
return [];
}

const updatedStores = rootResponse.root_hashes.filter((rootHash) => {
return subscribedOrgIds.includes(rootHash.id);
});

if (!updatedStores.length) {
return [];
}

const tablesWereUpdatedMap = {};
await Promise.all(
_.keys(hashMap).map(async (key, index) => {
const updatedStoreIds = await Promise.all(
updatedStores.map(async (rootHash) => {
const storeId = rootHash.id.replace('0x', '');

// update the organization with the new hash
await Organization.update(
{ registryHash: newHashes.hash[index] },
{ where: { registryId: key } },
{ registryHash: rootHash.hash },
{ where: { registryId: storeId } },
);
tablesWereUpdatedMap[key] = hashMap[key] !== newHashes.hash[index];

return storeId;
}),
);

return tablesWereUpdatedMap;
return updatedStoreIds;
};
8 changes: 8 additions & 0 deletions src/fullnode/writeService.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,11 @@ export const pushDataLayerChangeList = async (storeId, changeList) => {
return dataLayer.pushChangeListToDataLayer(storeId, changeList);
}
};

export const subscribeToStore = async (storeId) => {
if (process.env.USE_SIMULATOR === 'true') {
return simulator.subscribeToStore(storeId);
} else {
return dataLayer.subscribeToStore(storeId);
}
};
13 changes: 9 additions & 4 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,18 @@ class Organization extends Model {
};

// eslint-disable-next-line
static subscribeToOrganization = (orgUid) => {
throw new Error('Not implemented yet');
static subscribeToOrganization = async (orgUid) => {
const exists = await Organization.findOne({ where: { orgUid } });
if (exists) {
await Organization.update({ subscribed: true }, { orgUid });
} else {
Organization.importHomeOrganization(orgUid);
}
};

// eslint-disable-next-line
static unsubscribeToOrganization = (orgUid) => {
throw new Error('Not implemented yet');
static unsubscribeToOrganization = async (orgUid) => {
await Organization.update({ subscribed: false }, { orgUid });
};
}

Expand Down

0 comments on commit 3a22188

Please sign in to comment.