-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
40af1f4
commit 4927e88
Showing
6 changed files
with
327 additions
and
239 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,149 +1,184 @@ | ||
import _ from 'lodash'; | ||
|
||
import * as dataLayer from './persistance'; | ||
import wallet from './wallet'; | ||
import * as simulator from './simulator'; | ||
import { encodeHex, decodeHex } from '../utils/datalayer-utils'; | ||
import { getConfig } from '../utils/config-loader'; | ||
import { logger } from '../config/logger.cjs'; | ||
|
||
logger.info('climate-warehouse:datalayer:writeService'); | ||
|
||
const { USE_SIMULATOR } = getConfig().APP; | ||
|
||
const createDataLayerStore = async () => { | ||
let storeId; | ||
if (USE_SIMULATOR) { | ||
storeId = await simulator.createDataLayerStore(); | ||
} else { | ||
storeId = await dataLayer.createDataLayerStore(); | ||
|
||
logger.info( | ||
`Created storeId: ${storeId}, waiting for this to be confirmed on the blockchain.`, | ||
); | ||
await waitForStoreToBeConfirmed(storeId); | ||
} | ||
|
||
return storeId; | ||
}; | ||
|
||
const waitForStoreToBeConfirmed = async (storeId, retry = 0) => { | ||
if (retry > 120) { | ||
throw new Error( | ||
`Creating storeId: ${storeId} timed out. Its possible the transaction is stuck.`, | ||
); | ||
} | ||
|
||
const storeExistAndIsConfirmed = await dataLayer.getRoot(storeId); | ||
|
||
if (!storeExistAndIsConfirmed) { | ||
logger.info(`Still waiting for ${storeId} to confirm`); | ||
await new Promise((resolve) => { | ||
setTimeout(() => { | ||
resolve(); | ||
}, 30000); | ||
}); | ||
return waitForStoreToBeConfirmed(storeId, retry + 1); | ||
} | ||
logger.info(`StoreId: ${storeId} has been confirmed. Congrats!`); | ||
}; | ||
|
||
const syncDataLayer = async (storeId, data, failedCallback) => { | ||
logger.info(`Syncing ${storeId}`); | ||
const changeList = Object.keys(data).map((key) => { | ||
return { | ||
action: 'insert', | ||
key: encodeHex(key), | ||
value: encodeHex(data[key]), | ||
}; | ||
}); | ||
|
||
await pushChangesWhenStoreIsAvailable(storeId, changeList, failedCallback); | ||
}; | ||
|
||
const retry = (storeId, changeList, failedCallback, retryAttempts) => { | ||
logger.info(`Retrying pushing to store ${storeId}: ${retryAttempts}`); | ||
if (retryAttempts >= 60) { | ||
logger.info( | ||
'Could not push changelist to datalayer after retrying 10 times', | ||
); | ||
failedCallback(); | ||
return; | ||
} | ||
|
||
setTimeout(async () => { | ||
await pushChangesWhenStoreIsAvailable( | ||
storeId, | ||
changeList, | ||
failedCallback, | ||
retryAttempts + 1, | ||
); | ||
}, 30000); | ||
}; | ||
|
||
const pushChangesWhenStoreIsAvailable = async ( | ||
storeId, | ||
changeList, | ||
failedCallback = _.noop, | ||
retryAttempts = 0, | ||
) => { | ||
if (USE_SIMULATOR) { | ||
return simulator.pushChangeListToDataLayer(storeId, changeList); | ||
} else { | ||
const hasUnconfirmedTransactions = | ||
await wallet.hasUnconfirmedTransactions(); | ||
|
||
const storeExistAndIsConfirmed = await dataLayer.getRoot(storeId); | ||
|
||
if (!hasUnconfirmedTransactions && storeExistAndIsConfirmed) { | ||
logger.info( | ||
`pushing to datalayer ${storeId} ${JSON.stringify( | ||
changeList.map((change) => { | ||
return { | ||
action: change.action, | ||
key: decodeHex(change.key), | ||
value: /{([^*]*)}/.test(decodeHex(change.value)) | ||
? JSON.parse(decodeHex(change.value)) | ||
: decodeHex(change.value), | ||
}; | ||
}), | ||
null, | ||
2, | ||
)}`, | ||
); | ||
|
||
const success = await dataLayer.pushChangeListToDataLayer( | ||
storeId, | ||
changeList, | ||
); | ||
|
||
if (!success) { | ||
logger.info( | ||
`RPC failed when pushing to store ${storeId}, attempting retry.`, | ||
); | ||
retry(storeId, changeList, failedCallback, retryAttempts); | ||
} | ||
} else { | ||
retry(storeId, changeList, failedCallback, retryAttempts); | ||
} | ||
} | ||
}; | ||
|
||
const pushDataLayerChangeList = (storeId, changeList, failedCallback) => { | ||
pushChangesWhenStoreIsAvailable(storeId, changeList, failedCallback); | ||
}; | ||
|
||
const dataLayerAvailable = async () => { | ||
if (USE_SIMULATOR) { | ||
return simulator.dataLayerAvailable(); | ||
} else { | ||
return dataLayer.dataLayerAvailable(); | ||
} | ||
}; | ||
|
||
export default { | ||
dataLayerAvailable, | ||
pushDataLayerChangeList, | ||
syncDataLayer, | ||
createDataLayerStore, | ||
}; | ||
import _ from 'lodash'; | ||
|
||
import * as dataLayer from './persistance'; | ||
import wallet from './wallet'; | ||
import * as simulator from './simulator'; | ||
import { encodeHex, decodeHex } from '../utils/datalayer-utils'; | ||
import { getConfig } from '../utils/config-loader'; | ||
import { logger } from '../config/logger.cjs'; | ||
import { Organization } from '../models'; | ||
|
||
logger.info('climate-warehouse:datalayer:writeService'); | ||
|
||
const { USE_SIMULATOR } = getConfig().APP; | ||
|
||
const createDataLayerStore = async () => { | ||
let storeId; | ||
if (USE_SIMULATOR) { | ||
storeId = await simulator.createDataLayerStore(); | ||
} else { | ||
storeId = await dataLayer.createDataLayerStore(); | ||
|
||
logger.info( | ||
`Created storeId: ${storeId}, waiting for this to be confirmed on the blockchain.`, | ||
); | ||
await waitForStoreToBeConfirmed(storeId); | ||
} | ||
|
||
return storeId; | ||
}; | ||
|
||
const waitForStoreToBeConfirmed = async (storeId, retry = 0) => { | ||
if (retry > 120) { | ||
throw new Error( | ||
`Creating storeId: ${storeId} timed out. Its possible the transaction is stuck.`, | ||
); | ||
} | ||
|
||
const storeExistAndIsConfirmed = await dataLayer.getRoot(storeId); | ||
|
||
if (!storeExistAndIsConfirmed) { | ||
logger.info(`Still waiting for ${storeId} to confirm`); | ||
await new Promise((resolve) => { | ||
setTimeout(() => { | ||
resolve(); | ||
}, 30000); | ||
}); | ||
return waitForStoreToBeConfirmed(storeId, retry + 1); | ||
} | ||
logger.info(`StoreId: ${storeId} has been confirmed. Congrats!`); | ||
}; | ||
|
||
const syncDataLayer = async (storeId, data, failedCallback) => { | ||
logger.info(`Syncing ${storeId}`); | ||
const changeList = Object.keys(data).map((key) => { | ||
return { | ||
action: 'insert', | ||
key: encodeHex(key), | ||
value: encodeHex(data[key]), | ||
}; | ||
}); | ||
|
||
await pushChangesWhenStoreIsAvailable(storeId, changeList, failedCallback); | ||
}; | ||
|
||
const upsertDataLayer = async (storeId, data) => { | ||
logger.info(`Syncing ${storeId}`); | ||
const homeOrg = await Organization.getHomeOrg(); | ||
let changeList = Object.keys(data).map((key) => { | ||
const change = []; | ||
|
||
if (homeOrg[key]) { | ||
change.push({ | ||
action: 'delete', | ||
key: encodeHex(key), | ||
}); | ||
} | ||
|
||
change.push({ | ||
action: 'insert', | ||
key: encodeHex(key), | ||
value: encodeHex(data[key]), | ||
}); | ||
return change; | ||
}); | ||
|
||
const finalChangeList = _.uniqBy( | ||
_.sortBy(_.flatten(_.values(changeList)), 'action'), | ||
(v) => [v.action, v.key].join(), | ||
); | ||
|
||
console.log('!!!!!', finalChangeList); | ||
|
||
await pushChangesWhenStoreIsAvailable(storeId, finalChangeList); | ||
}; | ||
|
||
const retry = (storeId, changeList, failedCallback, retryAttempts) => { | ||
logger.info(`Retrying pushing to store ${storeId}: ${retryAttempts}`); | ||
if (retryAttempts >= 60) { | ||
logger.info( | ||
'Could not push changelist to datalayer after retrying 10 times', | ||
); | ||
failedCallback(); | ||
return; | ||
} | ||
|
||
setTimeout(async () => { | ||
await pushChangesWhenStoreIsAvailable( | ||
storeId, | ||
changeList, | ||
failedCallback, | ||
retryAttempts + 1, | ||
); | ||
}, 30000); | ||
}; | ||
|
||
const pushChangesWhenStoreIsAvailable = async ( | ||
storeId, | ||
changeList, | ||
failedCallback = _.noop, | ||
retryAttempts = 0, | ||
) => { | ||
if (USE_SIMULATOR) { | ||
return simulator.pushChangeListToDataLayer(storeId, changeList); | ||
} else { | ||
const hasUnconfirmedTransactions = | ||
await wallet.hasUnconfirmedTransactions(); | ||
|
||
const storeExistAndIsConfirmed = await dataLayer.getRoot(storeId); | ||
|
||
if (!hasUnconfirmedTransactions && storeExistAndIsConfirmed) { | ||
logger.info( | ||
`pushing to datalayer ${storeId} ${JSON.stringify( | ||
changeList.map((change) => { | ||
return { | ||
action: change.action, | ||
key: decodeHex(change.key), | ||
...(change.value && { | ||
value: /{([^*]*)}/.test(decodeHex(change.value)) | ||
? JSON.parse(decodeHex(change.value)) | ||
: decodeHex(change.value), | ||
}), | ||
}; | ||
}), | ||
null, | ||
2, | ||
)}`, | ||
); | ||
|
||
const success = await dataLayer.pushChangeListToDataLayer( | ||
storeId, | ||
changeList, | ||
); | ||
|
||
if (!success) { | ||
logger.info( | ||
`RPC failed when pushing to store ${storeId}, attempting retry.`, | ||
); | ||
retry(storeId, changeList, failedCallback, retryAttempts); | ||
} | ||
} else { | ||
retry(storeId, changeList, failedCallback, retryAttempts); | ||
} | ||
} | ||
}; | ||
|
||
const pushDataLayerChangeList = (storeId, changeList, failedCallback) => { | ||
pushChangesWhenStoreIsAvailable(storeId, changeList, failedCallback); | ||
}; | ||
|
||
const dataLayerAvailable = async () => { | ||
if (USE_SIMULATOR) { | ||
return simulator.dataLayerAvailable(); | ||
} else { | ||
return dataLayer.dataLayerAvailable(); | ||
} | ||
}; | ||
|
||
export default { | ||
dataLayerAvailable, | ||
pushDataLayerChangeList, | ||
syncDataLayer, | ||
createDataLayerStore, | ||
upsertDataLayer, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.