Skip to content

Commit

Permalink
fix: add exclusive r/w lock on smartapi files for consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
NeuralFlux committed Oct 31, 2024
1 parent 1ea4ba5 commit 1e3f83d
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 28 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
"node-cron": "^2.0.3",
"npm": "^9.9.0",
"piscina": "^3.2.0",
"proper-lockfile": "^4.1.2",
"ps-node": "^0.1.6",
"snake-case": "^3.0.4",
"stream-chunker": "^1.2.8",
Expand Down
5 changes: 3 additions & 2 deletions src/controllers/cron/update_local_smartapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { SmartApiOverrides } from "../../types";
import apiList from "../../config/api_list";
import MetaKG, { SmartAPISpec } from "@biothings-explorer/smartapi-kg";
import { redisClient } from "@biothings-explorer/utils";
import { writeFileWithLock } from "../../utils/common";

const userAgent = `BTE/${process.env.NODE_ENV === "production" ? "prod" : "dev"} Node/${process.version} ${
process.platform
Expand Down Expand Up @@ -325,9 +326,9 @@ async function updateSmartAPISpecs() {
delete obj._score;
});

await fs.writeFile(localFilePath, JSON.stringify({ hits: hits }));
await writeFileWithLock(localFilePath, JSON.stringify({ hits: hits }));
const predicatesInfo = await getOpsFromPredicatesEndpoints(res.data.hits);
await fs.writeFile(predicatesFilePath, JSON.stringify(predicatesInfo));
await writeFileWithLock(predicatesFilePath, JSON.stringify(predicatesInfo));

// Create a new metakg
const metakg = new MetaKG();
Expand Down
64 changes: 47 additions & 17 deletions src/controllers/meta_knowledge_graph.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import meta_kg, { KGQualifiersObject } from "@biothings-explorer/smartapi-kg";
import { snakeCase } from "snake-case";
import lockfile from "proper-lockfile";
import path from "path";
import PredicatesLoadingError from "../utils/errors/predicates_error";
const debug = require("debug")("bte:biothings-explorer-trapi:metakg");
import apiList from "../config/api_list";
import { supportedLookups } from "@biothings-explorer/query_graph_handler";
import MetaKG from "@biothings-explorer/smartapi-kg";

interface PredicateInfo {
predicate: string;
Expand All @@ -31,25 +33,50 @@ export default class MetaKnowledgeGraphHandler {
const smartapi_specs = path.resolve(__dirname, "../../data/smartapi_specs.json");
const predicates = path.resolve(__dirname, "../../data/predicates.json");
const kg = new meta_kg(smartapi_specs, predicates);

try {
if (smartAPIID !== undefined) {
debug(`Constructing with SmartAPI ID ${smartAPIID}`);
kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID });
} else if (teamName !== undefined) {
debug(`Constructing with team ${teamName}`);
kg.constructMetaKGSync(false, { apiList, teamName: teamName });
} else {
debug(`Constructing with default`);
kg.constructMetaKGSync(true, { apiList });
}
if (kg.ops.length === 0) {
debug(`Found 0 operations`);
throw new PredicatesLoadingError("Not Found - 0 operations");
// obtain exclusive lock to avoid cron job updating the file
// NOTE: we trade off some read parallelism for consistency here
const release = await lockfile.lock(smartapi_specs, {
retries: {
retries: 10,
factor: 2,
minTimeout: 100,
maxTimeout: 1000,
},
stale: 5000,
});

try {
if (smartAPIID !== undefined) {
debug(`Constructing with SmartAPI ID ${smartAPIID}`);
kg.constructMetaKGSync(false, { apiList, smartAPIID: smartAPIID });
} else if (teamName !== undefined) {
debug(`Constructing with team ${teamName}`);
kg.constructMetaKGSync(false, { apiList, teamName: teamName });
} else {
debug(`Constructing with default`);
kg.constructMetaKGSync(true, { apiList });
}
if (kg.ops.length === 0) {
debug(`Found 0 operations`);
throw new PredicatesLoadingError("Not Found - 0 operations");
}
return kg;
} catch (error) {
debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`);
throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`);
} finally {
await release();
}
return kg;
} catch (error) {
debug(`ERROR getting graph with ID:${smartAPIID} team:${teamName} because ${error}`);
throw new PredicatesLoadingError(`Failed to Load MetaKG: ${error}`);
if (error instanceof PredicatesLoadingError) {
throw error;
}
else {
debug(`ERROR locking file because ${error}.`);
throw new PredicatesLoadingError(`Failed to Lock File: ${error}`);
}
}
}

Expand Down Expand Up @@ -86,10 +113,13 @@ export default class MetaKnowledgeGraphHandler {
}

async getKG(
metakg: MetaKG = undefined,
smartAPIID: string = this.smartAPIID,
teamName: string = this.teamName,
): Promise<{ nodes: {}; edges: any[] }> {
const kg = await this._loadMetaKG(smartAPIID, teamName);
// read metakg from files if not globally defined
const kg = metakg ?? await this._loadMetaKG(smartAPIID, teamName);

let knowledge_graph = {
nodes: {},
edges: [],
Expand Down
17 changes: 8 additions & 9 deletions src/routes/v1/meta_knowledge_graph_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import * as utils from "../../utils/common";
import { runTask, taskResponse, taskError } from "../../controllers/threading/threadHandler";
import { Express, NextFunction, Request, Response, RequestHandler } from "express";

import MetaKnowledgeGraph from "@biothings-explorer/smartapi-kg";

class MetaKG {
setRoutes(app: Express) {
app
Expand All @@ -23,15 +25,12 @@ class MetaKG {

async task(taskInfo: TaskInfo) {
try {
let kg = undefined;

// read metakg from files if not globally defined
if(!taskInfo.data.options.metakg) {
const metaKGHandler = new handler(undefined);
kg = await metaKGHandler.getKG();
} else {
kg = taskInfo.data.options.metakg;
}
const metaKGHandler = new handler(undefined);
let metakg = undefined;
// initialize MetaKG only if ops are provided because handler logic is built upon that
if (taskInfo.data.options.metakg_ops !== undefined)
metakg = new MetaKnowledgeGraph(undefined, undefined, taskInfo.data.options.metakg_ops);
const kg = await metaKGHandler.getKG(metakg);
// response.logs = utils.filterForLogLevel(response.logs, options.logLevel);
return taskResponse(kg);
} catch (error) {
Expand Down
25 changes: 25 additions & 0 deletions src/utils/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import WorkflowError from "./errors/workflow_error";
import { URL } from "url";
import yaml2json from "js-yaml";
import fs from "fs/promises";
import * as lockfile from 'proper-lockfile';
import path from "path";
import { TrapiLog, TrapiSchema, TrapiWorkflow } from "@biothings-explorer/types";
import { NextFunction, Request, Response } from "express";
Expand Down Expand Up @@ -64,3 +65,27 @@ export function filterForLogLevel(logs: TrapiLog[], logLevel: string) {
export function methodNotAllowed(_req: Request, res: Response, _next: NextFunction) {
res.status(405).send();
}

/**
 * Writes `data` to `filePath` while holding an exclusive `proper-lockfile`
 * lock on that path, so concurrent readers/writers that take the same lock
 * never observe a partially written file.
 *
 * This is a best-effort operation: failures to acquire the lock, to write the
 * file, or to release the lock are logged and NOT re-thrown, so callers are
 * never interrupted by this function.
 *
 * NOTE(review): proper-lockfile appears to resolve the real path of the target
 * by default, which requires the file to already exist; if so, the very first
 * write of a missing file would fail to lock and be skipped — confirm.
 *
 * @param filePath - Path of the file to lock and overwrite.
 * @param data - Full string contents to write to the file.
 * @returns A promise that resolves once the attempt (and lock release) is done.
 */
export async function writeFileWithLock(filePath: string, data: string): Promise<void> {
  let release: (() => Promise<void>) | undefined;

  try {
    release = await lockfile.lock(filePath, {
      retries: {
        retries: 10, // number of retry attempts
        factor: 2, // exponential backoff factor
        minTimeout: 100, // initial retry delay in milliseconds
        maxTimeout: 1000, // maximum retry delay in milliseconds
      },
      stale: 5000, // lock expiration in milliseconds to prevent deadlocks
    });

    await fs.writeFile(filePath, data);
  } catch (error: unknown) {
    // Keep the non-throwing contract, but never hide the failure: a skipped
    // write means downstream consumers keep reading stale data.
    console.error(`Failed to write file with lock (${filePath}):`, error);
  } finally {
    if (release) {
      try {
        await release();
      } catch (error: unknown) {
        // A release failure must not mask the outcome of the write or break
        // the best-effort contract by throwing out of `finally`.
        console.error(`Failed to release lock (${filePath}):`, error);
      }
    }
  }
}

0 comments on commit 1e3f83d

Please sign in to comment.