Attempt (failing, hanging) at custom sync GET requests
mpizenberg committed Jan 7, 2022
1 parent bfc3e44 commit ab0bc27
Showing 6 changed files with 406 additions and 762 deletions.
28 changes: 17 additions & 11 deletions lib/DependencyProvider.js
@@ -3,7 +3,6 @@
 const fs = require('fs');
 const os = require('os');
 const path = require('path');
-const request = require('sync-request').default;
 
 // Cache of existing versions according to the package website.
 let onlineVersionsCache /*: Map<string, Array<string>> */ = new Map();
@@ -14,13 +13,14 @@ const listVersionsMemoCache /*: Map<string, Array<string>> */ = new Map();
 
 function fetchElmJsonOnline(
   pkg /*: string */,
-  version /*: string */
+  version /*: string */,
+  syncGetWorker
 ) /*: string */ {
   try {
     return fetchElmJsonOffline(pkg, version);
   } catch (_) {
     const remoteUrl = remoteElmJsonUrl(pkg, version);
-    const elmJson = request('GET', remoteUrl).getBody('utf8'); // need utf8 to convert from gunzipped buffer
+    const elmJson = syncGetWorker.get(remoteUrl);
     const cachePath = cacheElmJsonPath(pkg, version);
     const parentDir = path.dirname(cachePath);
     fs.mkdirSync(parentDir, { recursive: true });
@@ -45,7 +45,8 @@ function fetchElmJsonOffline(
   }
 }
 
-function updateOnlineVersionsCache() /*: void */ {
+function updateOnlineVersionsCache(syncGetWorker) /*: void */ {
+  console.error('syncGetWorker:', syncGetWorker);
   const pubgrubHome = path.join(elmHome(), 'pubgrub');
   fs.mkdirSync(pubgrubHome, { recursive: true });
   const cachePath = path.join(pubgrubHome, 'versions_cache.json');
@@ -55,24 +56,27 @@ function updateOnlineVersionsCache() /*: void */ {
       // Read from disk what is already cached, and complete with a request to the package server.
       const cache = JSON.parse(fs.readFileSync(cachePath, 'utf8'));
       onlineVersionsCache = parseOnlineVersions(cache);
-      updateCacheWithRequestSince(cachePath, remotePackagesUrl);
+      updateCacheWithRequestSince(cachePath, remotePackagesUrl, syncGetWorker);
     } catch (_) {
       // The cache file does not exist, let's download it all.
-      updateCacheFromScratch(cachePath, remotePackagesUrl);
+      updateCacheFromScratch(cachePath, remotePackagesUrl, syncGetWorker);
     }
   } else {
     // The cache is not empty, we just need to update it.
-    updateCacheWithRequestSince(cachePath, remotePackagesUrl);
+    updateCacheWithRequestSince(cachePath, remotePackagesUrl, syncGetWorker);
   }
 }
 
 // Reset the cache of existing versions from scratch
 // with a request to the package server.
 function updateCacheFromScratch(
   cachePath /*: string */,
-  remotePackagesUrl /*: string */
+  remotePackagesUrl /*: string */,
+  syncGetWorker
 ) /*: void */ {
-  const onlineVersionsJson = request('GET', remotePackagesUrl).getBody('utf8');
+  console.error('Inside scratch');
+  const onlineVersionsJson = syncGetWorker.get(remotePackagesUrl);
+  console.error('onlineVersionsJson:', onlineVersionsJson);
   fs.writeFileSync(cachePath, onlineVersionsJson);
   const onlineVersions = JSON.parse(onlineVersionsJson);
   onlineVersionsCache = parseOnlineVersions(onlineVersions);
@@ -81,7 +85,8 @@ function updateCacheFromScratch(
 // Update the cache with a request to the package server.
 function updateCacheWithRequestSince(
   cachePath /*: string */,
-  remotePackagesUrl /*: string */
+  remotePackagesUrl /*: string */,
+  syncGetWorker
 ) /*: void */ {
   // Count existing versions.
   let versionsCount = 0;
@@ -91,7 +96,8 @@
 
   // Complete cache with a remote call to the package server.
   const remoteUrl = remotePackagesUrl + '/since/' + (versionsCount - 1); // -1 to check if no package was deleted.
-  const newVersions = JSON.parse(request('GET', remoteUrl).getBody('utf8'));
+  console.error('Inside since/');
+  const newVersions = syncGetWorker.get(remoteUrl);
   if (newVersions.length === 0) {
     // Reload from scratch since it means at least one package was deleted from the registry.
     updateCacheFromScratch(cachePath, remotePackagesUrl);
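
Two details in this last hunk are worth flagging before even reaching the worker code: the old code wrapped the response in `JSON.parse`, while `syncGetWorker.get` returns the raw body string, so `newVersions.length === 0` now measures string length rather than array length; and the `updateCacheFromScratch(cachePath, remotePackagesUrl)` call in the unchanged context lines still lacks the `syncGetWorker` argument. A minimal sketch of what the `/since/` branch presumably needs (not part of this commit):

// Sketch, not part of the commit: parse the body and forward the worker.
const newVersions = JSON.parse(syncGetWorker.get(remoteUrl));
if (newVersions.length === 0) {
  // At least one package was deleted from the registry: rebuild from scratch.
  updateCacheFromScratch(cachePath, remotePackagesUrl, syncGetWorker);
}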
36 changes: 23 additions & 13 deletions lib/Solve.js
@@ -5,6 +5,7 @@ const fs = require('fs');
 const path = require('path');
 const ElmJson = require('./ElmJson');
 const Project = require('./Project');
+const SyncGet = require('./SyncGet.js');
 
 const DependencyProvider = require('./DependencyProvider.js');
 const wasm = require('elm-solve-deps-wasm');
@@ -72,19 +73,28 @@ async function getDependencies(
       DependencyProvider.listAvailableVersionsOffline
     );
   } catch (_) {
-    console.log('Offline solver failed, switching to online');
-    // Update the online cache of existing versions.
-    DependencyProvider.updateOnlineVersionsCache();
-    console.log('updateOnlineVersionsCache ok');
-    DependencyProvider.clearListVersionsMemoCacheBeforeSolve();
-    // Solve again, in online mode.
-    solution = wasm.solve_deps(
-      JSON.stringify(elmJson),
-      useTest,
-      extra,
-      DependencyProvider.fetchElmJsonOnline,
-      DependencyProvider.listAvailableVersionsOnline
-    );
+    const syncGetWorker = SyncGet.startWorker();
+    try {
+      console.error('Offline solver failed, switching to new online');
+      // Update the online cache of existing versions.
+      DependencyProvider.updateOnlineVersionsCache(syncGetWorker);
+      console.error('updateOnlineVersionsCache ok');
+      DependencyProvider.clearListVersionsMemoCacheBeforeSolve();
+      // Solve again, in online mode.
+      solution = wasm.solve_deps(
+        JSON.stringify(elmJson),
+        useTest,
+        extra,
+        (pkg, version) =>
+          DependencyProvider.fetchElmJsonOnline(pkg, version, syncGetWorker),
+        (pkg) =>
+          DependencyProvider.listAvailableVersionsOnline(pkg, syncGetWorker)
+      );
+      syncGetWorker.shutDown();
+    } catch (err) {
+      syncGetWorker.shutDown();
+      throw err;
+    }
   }
   return solution;
 }
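
`syncGetWorker.shutDown()` is duplicated across the success path and the rethrowing catch. Since the worker must be terminated on both paths (a live worker thread keeps the Node process alive), a `finally` block expresses the same cleanup without the duplication; a sketch of the equivalent structure:

// Sketch: same behavior as the committed code, with try/finally
// instead of catch-and-rethrow plus a duplicated shutDown call.
const syncGetWorker = SyncGet.startWorker();
try {
  DependencyProvider.updateOnlineVersionsCache(syncGetWorker);
  DependencyProvider.clearListVersionsMemoCacheBeforeSolve();
  solution = wasm.solve_deps(
    JSON.stringify(elmJson),
    useTest,
    extra,
    (pkg, version) =>
      DependencyProvider.fetchElmJsonOnline(pkg, version, syncGetWorker),
    (pkg) => DependencyProvider.listAvailableVersionsOnline(pkg, syncGetWorker)
  );
} finally {
  // Runs on success and on a thrown error alike, so the worker is never leaked.
  syncGetWorker.shutDown();
}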
34 changes: 34 additions & 0 deletions lib/SyncGet.js
@@ -0,0 +1,34 @@
+const {
+  Worker,
+  MessageChannel,
+  receiveMessageOnPort,
+} = require('worker_threads');
+
+// Start a worker thread and return a `syncGetWorker`
+// capable of making sync requests until shut down.
+function startWorker() {
+  const { port1: localPort, port2: workerPort } = new MessageChannel();
+  const sharedLock = new SharedArrayBuffer(4);
+  const sharedLockArray = new Int32Array(sharedLock);
+  const worker = new Worker('./SyncGetWorker.js', {
+    workerData: { sharedLock, requestPort: workerPort },
+    transferList: [workerPort],
+  });
+  function get(url) {
+    worker.postMessage(url);
+    // Atomics.wait(sharedLockArray, 0, 0); // blocks until notified at index 0.
+    const response = receiveMessageOnPort(localPort);
+    if (response.message.error) {
+      throw response.message.error;
+    } else {
+      return response.message;
+    }
+  }
+  function shutDown() {
+    localPort.close();
+    worker.terminate();
+  }
+  return { get, shutDown };
+}
+
+module.exports.startWorker = startWorker;
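
This file is where the "failing, hanging" of the commit title lives, and both failure modes are visible in `get`. As committed, with `Atomics.wait` commented out, `receiveMessageOnPort(localPort)` runs before the worker can have replied, returns `undefined`, and the `response.message` access throws. With the wait re-enabled there is a lost-wakeup race instead: the worker only calls `Atomics.notify` and never writes to the shared array, so if the notification fires before the main thread reaches `Atomics.wait(sharedLockArray, 0, 0)`, the value is still 0 and the wait blocks forever. A sketch of a handshake that avoids both modes, assuming the worker stores 1 into the lock after posting its reply and before notifying (see the matching sketch after SyncGetWorker.js below):

// Sketch, assuming the worker side runs
// Atomics.store(sharedLockArray, 0, 1) before Atomics.notify.
function get(url) {
  worker.postMessage(url);
  Atomics.wait(sharedLockArray, 0, 0); // returns at once if the lock is already 1
  Atomics.store(sharedLockArray, 0, 0); // reset the lock for the next request
  const response = receiveMessageOnPort(localPort);
  if (response.message.error) {
    throw response.message.error;
  } else {
    return response.message;
  }
}

Separately, `new Worker('./SyncGetWorker.js')` resolves the relative path against the process working directory, not against this file, so `path.join(__dirname, 'SyncGetWorker.js')` would be the safer spelling when the tool is launched from another directory.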
36 changes: 36 additions & 0 deletions lib/SyncGetWorker.js
@@ -0,0 +1,36 @@
+const { parentPort, workerData } = require('worker_threads');
+const https = require('https');
+
+const { sharedLock, requestPort } = workerData;
+const sharedLockArray = new Int32Array(sharedLock);
+
+parentPort.on('message', async (url) => {
+  console.log('worker message:', url);
+  try {
+    const response = await getBody(url);
+    requestPort.postMessage(response);
+  } catch (error) {
+    requestPort.postMessage({ error });
+  }
+  Atomics.notify(sharedLockArray, 0);
+});
+
+// Helpers ###########################################################
+
+async function getBody(url) {
+  return new Promise(function (resolve, reject) {
+    https
+      .get(url, function (res) {
+        let body = '';
+        res.on('data', function (chunk) {
+          body += chunk;
+        });
+        res.on('end', function () {
+          resolve(body);
+        });
+      })
+      .on('error', function (err) {
+        reject(err);
+      });
+  });
+}
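
This is the matching half of the lock problem: the handler notifies but never writes to `sharedLockArray`, so a waiter that arrives after the notification has nothing to observe. A sketch of the complementary fix, posting the reply before flipping the lock:

// Sketch: post the reply first, then flip the lock, then wake any waiter,
// so a caller that starts waiting late still finds the lock at 1.
parentPort.on('message', async (url) => {
  try {
    const response = await getBody(url);
    requestPort.postMessage(response);
  } catch (error) {
    requestPort.postMessage({ error });
  }
  Atomics.store(sharedLockArray, 0, 1); // make the wakeup observable
  Atomics.notify(sharedLockArray, 0);
});

One more difference from the removed sync-request call, whose comment mentioned a "gunzipped buffer": this plain https.get helper requests no compression and follows no redirects, which is simpler but worth keeping in mind if the package server's behavior ever depends on either.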
[2 more changed files not shown]
