fix: resolve check, qXEdge -> qEdge, formatting
tokebe committed Oct 31, 2022
1 parent 31a22b0 commit bf8413d
Showing 8 changed files with 506 additions and 506 deletions.
52 changes: 28 additions & 24 deletions src/batch_edge_query.js
@@ -20,17 +20,17 @@ module.exports = class BatchEdgeQueryHandler {
}

/**
- * @param {Array} qXEdges - an array of TRAPI Query Edges;
+ * @param {Array} qEdges - an array of TRAPI Query Edges;
*/
- setEdges(qXEdges) {
- this.qXEdges = qXEdges;
+ setEdges(qEdges) {
+ this.qEdges = qEdges;
}

/**
*
*/
getEdges() {
- return this.qXEdges;
+ return this.qEdges;
}

/**
@@ -64,23 +64,26 @@ module.exports = class BatchEdgeQueryHandler {
* Remove curies which resolve to the same thing, keeping the first.
* @private
*/
- async _rmEquivalentDuplicates(qXEdges) {
- Object.values(qXEdges).forEach((qXEdge) => {
+ async _rmEquivalentDuplicates(qEdges) {
+ Object.values(qEdges).forEach((qEdge) => {
const nodes = {
- subject: qXEdge.subject,
- object: qXEdge.object,
+ subject: qEdge.subject,
+ object: qEdge.object,
};
const strippedCuries = [];
Object.entries(nodes).forEach(([nodeType, node]) => {
const reducedCuries = [];
const nodeStrippedCuries = [];
- if (!node.curie) { return; }
+ if (!node.curie) {
+ return;
+ }
node.curie.forEach((curie) => {
// if the curie is already present, or an equivalent is, remove it
if (!reducedCuries.includes(curie)) {
- const equivalentAlreadyIncluded = qXEdge.getInputNode().getEquivalentIDs()[curie][0].curies.some(
- (equivalentCurie) => reducedCuries.includes(equivalentCurie),
- );
+ const equivalentAlreadyIncluded = qEdge
+ .getInputNode()
+ .getEquivalentIDs()
+ [curie][0].curies.some((equivalentCurie) => reducedCuries.includes(equivalentCurie));
if (!equivalentAlreadyIncluded) {
reducedCuries.push(curie);
} else {
@@ -99,34 +102,34 @@ module.exports = class BatchEdgeQueryHandler {
}
});
strippedCuries.forEach((curie) => {
- qXEdge.getInputNode().removeEquivalentID(curie);
+ qEdge.getInputNode().removeEquivalentID(curie);
});
});
}

- async query(qXEdges, unavailableAPIs = {}) {
+ async query(qEdges, unavailableAPIs = {}) {
debug('Node Update Start');
//it's now a single edge but convert to arr to simplify refactoring
- qXEdges = Array.isArray(qXEdges) ? qXEdges : [qXEdges];
- const nodeUpdate = new NodesUpdateHandler(qXEdges);
+ qEdges = Array.isArray(qEdges) ? qEdges : [qEdges];
+ const nodeUpdate = new NodesUpdateHandler(qEdges);
//difference is there is no previous edge info anymore
- await nodeUpdate.setEquivalentIDs(qXEdges);
- await this._rmEquivalentDuplicates(qXEdges);
+ await nodeUpdate.setEquivalentIDs(qEdges);
+ await this._rmEquivalentDuplicates(qEdges);
debug('Node Update Success');
const cacheHandler = new CacheHandler(this.caching, this.metaKG, this.recordConfig);
- const { cachedRecords, nonCachedQXEdges } = await cacheHandler.categorizeEdges(qXEdges);
+ const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
this.logs = [...this.logs, ...cacheHandler.logs];
let queryRecords;

- if (nonCachedQXEdges.length === 0) {
+ if (nonCachedQEdges.length === 0) {
queryRecords = [];
if (parentPort) {
parentPort.postMessage({ cacheDone: true });
}
} else {
- debug('Start to convert qXEdges into APIEdges....');
- const edgeConverter = new QEdge2APIEdgeHandler(nonCachedQXEdges, this.metaKG);
- const APIEdges = await edgeConverter.convert(nonCachedQXEdges);
+ debug('Start to convert qEdges into APIEdges....');
+ const edgeConverter = new QEdge2APIEdgeHandler(nonCachedQEdges, this.metaKG);
+ const APIEdges = await edgeConverter.convert(nonCachedQEdges);
debug(`qEdges are successfully converted into ${APIEdges.length} APIEdges....`);
this.logs = [...this.logs, ...edgeConverter.logs];
if (APIEdges.length === 0 && cachedRecords.length === 0) {
@@ -141,7 +144,8 @@ module.exports = class BatchEdgeQueryHandler {
debug(`Total number of records is (${queryRecords.length})`);
if (!isMainThread) {
cacheHandler.cacheEdges(queryRecords);
- } else { // await caching if async so end of job doesn't cut it off
+ } else {
+ // await caching if async so end of job doesn't cut it off
await cacheHandler.cacheEdges(queryRecords);
}
}
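For orientation, here is a minimal usage sketch of the renamed interface (not part of the commit): the constructor argument and the qEdge value are assumptions, since only setEdges, getEdges, and query appear in this diff.

// Hypothetical driver, not from the commit; the constructor signature is assumed.
const BatchEdgeQueryHandler = require('./batch_edge_query');

async function executeEdge(metaKG, qEdge) {
  const handler = new BatchEdgeQueryHandler(metaKG); // assumption: handler takes a metaKG
  handler.setEdges([qEdge]); // query() also accepts a bare edge and wraps it in an array itself
  const records = await handler.query(handler.getEdges());
  return records; // records resolved from cache and/or live API subqueries
}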
56 changes: 28 additions & 28 deletions src/cache_handler.js
@@ -98,20 +98,20 @@ module.exports = class {
);
}

- async categorizeEdges(qXEdges) {
+ async categorizeEdges(qEdges) {
if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS) {
return {
cachedRecords: [],
- nonCachedQXEdges: qXEdges,
+ nonCachedQEdges: qEdges,
};
}
- let nonCachedQXEdges = [];
+ let nonCachedQEdges = [];
let cachedRecords = [];
debug('Begin edge cache lookup...');
- await async.eachSeries(qXEdges, async (qXEdge) => {
- const qXEdgeMetaKGHash = this._hashEdgeByMetaKG(qXEdge.getHashedEdgeRepresentation());
+ await async.eachSeries(qEdges, async (qEdge) => {
+ const qEdgeMetaKGHash = this._hashEdgeByMetaKG(qEdge.getHashedEdgeRepresentation());
const unpackedRecords = await new Promise(async (resolve) => {
- const redisID = 'bte:edgeCache:' + qXEdgeMetaKGHash;
+ const redisID = 'bte:edgeCache:' + qEdgeMetaKGHash;
await redisClient.client.usingLock([`redisLock:${redisID}`], 600000, async (signal) => {
try {
const compressedRecordPack = await redisClient.client.hgetallTimeout(redisID);
@@ -129,7 +129,7 @@ module.exports = class {
recordStream
.pipe(this.createDecodeStream())
.on('data', (obj) => recordPack.push(obj))
- .on('end', () => resolve(Record.unpackRecords(recordPack, qXEdge, this.recordConfig)));
+ .on('end', () => resolve(Record.unpackRecords(recordPack, qEdge, this.recordConfig)));
} else {
resolve(null);
}
@@ -145,47 +145,47 @@ module.exports = class {
new LogEntry(
'DEBUG',
null,
- `BTE finds cached records for ${qXEdge.getID()}`,
+ `BTE finds cached records for ${qEdge.getID()}`,
{
type: 'cacheHit',
- qEdgeID: qXEdge.getID(),
+ qEdgeID: qEdge.getID(),
}
).getLog()
);
cachedRecords = [...cachedRecords, ...unpackedRecords];
} else {
- nonCachedQXEdges.push(qXEdge);
+ nonCachedQEdges.push(qEdge);
}
debug(`Found (${cachedRecords.length}) cached records.`);
});

- return { cachedRecords, nonCachedQXEdges };
+ return { cachedRecords, nonCachedQEdges };
}

- _hashEdgeByMetaKG(qXEdgeHash) {
+ _hashEdgeByMetaKG(qEdgeHash) {
if (!this.metaKG) {
- return qXEdgeHash;
+ return qEdgeHash;
}
const len = String(this.metaKG.ops.length);
const allIDs = Array.from(new Set(this.metaKG.ops.map((op) => op.association.smartapi.id))).join('');
- return helper._generateHash(qXEdgeHash + len + allIDs);
+ return helper._generateHash(qEdgeHash + len + allIDs);
}

- _groupQueryRecordsByQXEdgeHash(queryRecords) {
+ _groupQueryRecordsByQEdgeHash(queryRecords) {
let groupedRecords = {};
queryRecords.map((record) => {
try {
- const qXEdgeMetaKGHash = this._hashEdgeByMetaKG(record.qXEdge.getHashedEdgeRepresentation());
- if (!(qXEdgeMetaKGHash in groupedRecords)) {
- groupedRecords[qXEdgeMetaKGHash] = [];
+ const qEdgeMetaKGHash = this._hashEdgeByMetaKG(record.qEdge.getHashedEdgeRepresentation());
+ if (!(qEdgeMetaKGHash in groupedRecords)) {
+ groupedRecords[qEdgeMetaKGHash] = [];
}
- groupedRecords[qXEdgeMetaKGHash].push(record);
+ groupedRecords[qEdgeMetaKGHash].push(record);
} catch (e) {
debug('skipping malformed record');
}
});
- Object.entries(groupedRecords).forEach(([qXEdgeMetaKGHash, records]) => {
- groupedRecords[qXEdgeMetaKGHash] = Record.packRecords(records);
+ Object.entries(groupedRecords).forEach(([qEdgeMetaKGHash, records]) => {
+ groupedRecords[qEdgeMetaKGHash] = Record.packRecords(records);
});
return groupedRecords;
}
@@ -210,11 +210,11 @@ module.exports = class {
}
debug('Start to cache query records.');
try {
- const groupedRecords = this._groupQueryRecordsByQXEdgeHash(queryRecords);
- const qXEdgeHashes = Array.from(Object.keys(groupedRecords));
- debug(`Number of hashed edges: ${qXEdgeHashes.length}`);
+ const groupedRecords = this._groupQueryRecordsByQEdgeHash(queryRecords);
+ const qEdgeHashes = Array.from(Object.keys(groupedRecords));
+ debug(`Number of hashed edges: ${qEdgeHashes.length}`);
const failedHashes = [];
- await async.eachSeries(qXEdgeHashes, async (hash) => {
+ await async.eachSeries(qEdgeHashes, async (hash) => {
// lock to prevent caching to/reading from actively caching edge
const redisID = 'bte:edgeCache:' + hash;
if (parentPort) {
@@ -236,7 +236,7 @@ module.exports = class {
try {
await redisClient.client.delTimeout(redisID);
} catch (e) {
- debug(`Unable to remove partial cache ${redisID} from redis during cache failure due to error ${error}. This may result in failed or improper cache retrieval of this qXEdge.`)
+ debug(`Unable to remove partial cache ${redisID} from redis during cache failure due to error ${error}. This may result in failed or improper cache retrieval of this qEdge.`)
}
}
})
@@ -247,7 +247,7 @@ module.exports = class {
await redisClient.client.expireTimeout(redisID, process.env.REDIS_KEY_EXPIRE_TIME || 1800);
} catch (error) {
failedHashes.push(hash);
- debug(`Failed to cache qXEdge ${hash} records due to error ${error}. This does not stop other edges from caching nor terminate the query.`)
+ debug(`Failed to cache qEdge ${hash} records due to error ${error}. This does not stop other edges from caching nor terminate the query.`)
} finally {
if (parentPort) {
parentPort.postMessage({ completeCacheKey: redisID });
@@ -261,7 +261,7 @@ module.exports = class {
if (successCount) {
debug(`Successfully cached (${successCount}) query records.`);
} else {
- debug(`qXEdge caching failed.`);
+ debug(`qEdge caching failed.`);
}
} catch (error) {
debug(`Caching failed due to ${error}. This does not terminate the query.`);
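For orientation, a hedged sketch of the cached/non-cached split that categorizeEdges performs (not part of the commit); the constructor arguments mirror the call site in batch_edge_query.js above, while the qEdges input is assumed.

// Hypothetical caller, not from the commit.
const CacheHandler = require('./cache_handler');

async function splitByCache(qEdges, caching, metaKG, recordConfig) {
  const cacheHandler = new CacheHandler(caching, metaKG, recordConfig);
  // cachedRecords are unpacked straight from redis;
  // nonCachedQEdges still need conversion to APIEdges and live subqueries.
  const { cachedRecords, nonCachedQEdges } = await cacheHandler.categorizeEdges(qEdges);
  return { cachedRecords, nonCachedQEdges };
}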
[Diffs for the remaining 6 changed files were not loaded.]
