Skip to content

Commit

Permalink
Merge pull request #1187 from Chia-Network/org-table-auto-resubscription
Browse files Browse the repository at this point in the history
resubscribe to org stores
  • Loading branch information
wwills2 authored Oct 3, 2024
2 parents 0d428de + aef3a42 commit d8fcf7b
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 15 deletions.
67 changes: 54 additions & 13 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -522,20 +522,27 @@ const createDataLayerStore = async () => {
}
};

const subscribeToStoreOnDataLayer = async (storeId) => {
const subscribeToStoreOnDataLayer = async (
storeId,
restoreHomeOrgOverride = false,
) => {
if (!storeId) {
logger.info(`No storeId found to subscribe to: ${storeId}`);
return false;
}

const homeOrg = await Organization.getHomeOrg();

if (homeOrg && [(homeOrg.orgUid, homeOrg.registryId)].includes(storeId)) {
if (
!restoreHomeOrgOverride &&
homeOrg &&
[(homeOrg.orgUid, homeOrg.registryId)].includes(storeId)
) {
logger.info(`Cant subscribe to self: ${storeId}`);
return { success: true };
}

const subscriptions = await getSubscriptions();
const { storeIds: subscriptions } = await getSubscriptions();

if (subscriptions.includes(storeId)) {
logger.info(`Already subscribed to: ${storeId}`);
Expand Down Expand Up @@ -578,14 +585,47 @@ const subscribeToStoreOnDataLayer = async (storeId) => {
};

const getSubscriptions = async () => {
if (CONFIG.USE_SIMULATOR) {
return [];
}
try {
if (CONFIG.USE_SIMULATOR) {
return { success: true, storeIds: [] };
}

const url = `${CONFIG.DATALAYER_URL}/subscriptions`;
const { cert, key, timeout } = getBaseOptions();
const url = `${CONFIG.DATALAYER_URL}/subscriptions`;
const { cert, key, timeout } = getBaseOptions();

logger.debug(`invoking ${url} to retrieve subscriptions`);
const response = await superagent
.post(url)
.key(key)
.cert(cert)
.timeout(timeout)
.send({});

const data = response.body;
logger.debug(`data returned from ${url}: ${data.store_ids}`);

if (data.success) {
return { success: true, storeIds: data.store_ids };
}

logger.error(`Failed to retrieve subscriptions from datalayer`);
return { success: false, storeIds: [] };
} catch (error) {
logger.error(error);
return { success: false, storeIds: [] };
}
};

const getOwnedStores = async () => {
try {
if (CONFIG.USE_SIMULATOR) {
return { success: true, storeIds: [] };
}

const url = `${CONFIG.DATALAYER_URL}/get_owned_stores`;
const { cert, key, timeout } = getBaseOptions();

logger.debug(`invoking ${url} to retrieve owned stores`);
const response = await superagent
.post(url)
.key(key)
Expand All @@ -594,17 +634,17 @@ const getSubscriptions = async () => {
.send({});

const data = response.body;
logger.debug(`data returned from ${url}: ${data.store_ids}`);

if (data.success) {
// console.log('Your Subscriptions:', data.store_ids);
return data.store_ids;
return { success: true, storeIds: data.store_ids };
}

logger.error(`FAILED GETTING SUBSCRIPTIONS ON DATALAYER`);
return [];
logger.error(`Failed to retrieve owned stores from datalayer`);
return { success: false, storeIds: [] };
} catch (error) {
logger.error(error);
return [];
return { success: false, storeIds: [] };
}
};

Expand Down Expand Up @@ -762,6 +802,7 @@ export {
pushChangeListToDataLayer,
createDataLayerStore,
getSubscriptions,
getOwnedStores,
cancelOffer,
verifyOffer,
takeOffer,
Expand Down
2 changes: 1 addition & 1 deletion src/datalayer/syncService.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const subscribeToStoreOnDataLayer = async (storeId) => {
};

const getSubscribedStoreData = async (storeId) => {
const subscriptions = await dataLayer.getSubscriptions(storeId);
const { storeIds: subscriptions } = await dataLayer.getSubscriptions(storeId);
const alreadySubscribed = subscriptions.includes(storeId);

if (!alreadySubscribed) {
Expand Down
2 changes: 1 addition & 1 deletion src/models/file-store/file-store.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class FileStore extends Model {
);
}

datalayer.subscribeToStoreOnDataLayer(organization.fileStoreId);
await datalayer.subscribeToStoreOnDataLayer(organization.fileStoreId);
Organization.update({ fileStoreSubscribed: true });
}

Expand Down
121 changes: 121 additions & 0 deletions src/tasks/check-organization-subscriptions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { SimpleIntervalJob, Task } from 'toad-scheduler';
import { getConfig } from '../utils/config-loader.js';
import { Meta, Organization } from '../models/index.js';
import {
getOwnedStores,
getSubscriptions,
subscribeToStoreOnDataLayer,
} from '../datalayer/persistance.js';
import { logger } from '../config/logger.js';

const CONFIG = getConfig();

const task = new Task('check-oranization-subscriptions', async () => {
const hasMigratedToNewSyncMethod = await Meta.findOne({
where: { metaKey: 'migratedToNewSync' },
});

const hasMigratedToGenerationIndexSync = await Meta.findOne({
where: { metaKey: 'migratedToIndexBasedSync' },
});

if (!hasMigratedToGenerationIndexSync || !hasMigratedToNewSyncMethod) {
logger.debug(
'skipping check organization subscriptions task: waiting for migration tasks to complete',
);
return;
}

logger.debug('running check organization subscriptions task');

try {
const organizations = await Organization.findAll();
const subscribedStores = await getSubscriptions();
const ownedStores = await getOwnedStores();

if (!subscribedStores?.success) {
throw new Error('failed to get subscriptions from datalayer');
}

for (const organization of organizations) {
const { orgUid, registryId, isHome, name } = organization;
logger.debug(
`validating that datalayer is subscribed org store ${orgUid} and registry store ${registryId} belonging to ${name}`,
);

if (isHome) {
const homeOrgStoreOwned = ownedStores.storeIds.includes(orgUid);
const homeRegistryStoreOwned =
ownedStores.storeIds.includes(registryId);

if (!homeOrgStoreOwned) {
throw new Error(
`your wallet does not own your home organization store ${orgUid}. this is serious issue that CADT cannot resolve`,
);
}

if (!homeRegistryStoreOwned) {
throw new Error(
`your wallet does not own your home registry store ${registryId}. this is serious issue that CADT cannot resolve`,
);
}
}

const subscribedToOrgStore = subscribedStores.storeIds.includes(orgUid);
const subscribedToRegistryStore =
subscribedStores.storeIds.includes(registryId);

if (!subscribedToOrgStore) {
logger.info(
`datalayer is not subscribed to orgUid store ${orgUid}, subscribing ...`,
);

const result = await subscribeToStoreOnDataLayer(orgUid, true);
if (result) {
logger.info(`subscribed to store ${orgUid}`);
} else {
logger.error(`failed to subscribe to store ${orgUid}`);
}

// wait 5 secs to give RPC a break
await new Promise((resolve) => setTimeout(resolve, 5000));
}

if (!subscribedToRegistryStore) {
logger.info(
`datalayer is not subscribed to registryId store ${registryId}, subscribing ...`,
);

const result = await subscribeToStoreOnDataLayer(registryId, true);
if (result) {
logger.info(`subscribed to store ${registryId}`);
} else {
logger.error(`failed to subscribe to store ${registryId}`);
}

// wait 5 secs to give RPC a break
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
} catch (error) {
logger.error(
`check-organization-subscriptions task encountered an error: ${error.message}`,
);
}
});

/**
* checks that datalayer is subscribed to all organization stores contained in the organization table, and resubscribes
* to any that are missing. This does not resubscribe based on governance data, only the data in the organization table.
* @type {SimpleIntervalJob}
*/
const job = new SimpleIntervalJob(
{
seconds: CONFIG?.TASKS?.CHECK_ORG_TABLE_SUBSCRIPTIONS_TASK_INTERVAL || 1800,
runImmediately: true,
},
task,
{ id: 'check-oranization-subscriptions', preventOverrun: true },
);

export default job;
2 changes: 2 additions & 0 deletions src/tasks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import syncOrganizationMeta from './sync-organization-meta';
import syncGovernanceBody from './sync-governance-body';
import mirrorCheck from './mirror-check';
import resetAuditTable from './reset-audit-table';
import checkOrganizationSubscriptions from './check-organization-subscriptions.js';

const scheduler = new ToadScheduler();

Expand All @@ -27,6 +28,7 @@ const start = () => {
syncOrganizationMeta,
mirrorCheck,
resetAuditTable,
checkOrganizationSubscriptions,
];
defaultJobs.forEach((defaultJob) => {
jobRegistry[defaultJob.id] = defaultJob;
Expand Down
1 change: 1 addition & 0 deletions src/utils/defaultConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const defaultConfig = {
ORGANIZATION_META_SYNC_TASK_INTERVAL: 300,
PICKLIST_SYNC_TASK_INTERVAL: 60,
MIRROR_CHECK_TASK_INTERVAL: 86460,
CHECK_ORG_TABLE_SUBSCRIPTIONS_TASK_INTERVAL: 1800,
},
},
GOVERNANCE: {
Expand Down

0 comments on commit d8fcf7b

Please sign in to comment.