Skip to content

Commit

Permalink
0.6.0 Wait State + Screencasting Fixes (#141)
Browse files Browse the repository at this point in the history
* new options:
- to support browsertrix-cloud, add a --waitOnDone option, which makes browsertrix crawler wait when finished
- when running with redis shared state, set the `<crawl id>:status` field to `running`, `failing`, `failed` or `done` to let job controller know crawl is finished.
- set redis state to `failing` in case of exception, set to `failed` in case of 3 or more failed exits within 60 seconds (todo: make customizable)
- when receiving a SIGUSR1, assume final shutdown and finalize files (eg. save WACZ) before exiting.
- also write WACZ if exiting due to size limit exceeded, but not due to other interruptions
- change sleep() to be in seconds

* misc fixes:
- crawlstate.finished() -> isFinished() - return if >0 pages and none left in queue
- don't fail crawl if isFinished() is true
- don't keep looping in pending wait for urls to finish if received abort request

* screencast improvements (fix related to webrecorder/browsertrix#233)
- more optimized screencasting, don't close and restart after every page.
- don't assume targets change after every page, they don't in window mode!
- only send 'close' message when target is actually closed

* bump to 0.6.0
  • Loading branch information
ikreymer authored Jun 17, 2022
1 parent e7eb6a6 commit cf90304
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 40 deletions.
100 changes: 70 additions & 30 deletions crawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class Crawler {
this.errorCount = 0;

this.exitCode = 0;

this.done = false;
this.sizeExceeded = false;
this.finalExit = false;
}

statusLog(...args) {
Expand Down Expand Up @@ -150,15 +154,21 @@ class Crawler {

let redis;

try {
redis = await initRedis(redisUrl);
} catch (e) {
throw new Error("Unable to connect to state store Redis: " + redisUrl);
while (true) {
try {
redis = await initRedis(redisUrl);
break;
} catch (e) {
//throw new Error("Unable to connect to state store Redis: " + redisUrl);
console.warn(`Waiting for redis at ${redisUrl}`);
await this.sleep(3);
}
}

this.statusLog(`Storing state via Redis ${redisUrl} @ key prefix "${this.params.crawlId}"`);
this.statusLog(`Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`);

this.crawlState = new RedisCrawlState(redis, this.params.crawlId, this.params.timeout);
this.crawlState = new RedisCrawlState(redis, this.params.crawlId, this.params.timeout * 2, os.hostname());
await this.crawlState.setStatus("running");

} else {
this.statusLog("Storing state in memory");
Expand Down Expand Up @@ -265,14 +275,26 @@ class Crawler {
await fsp.mkdir(this.params.cwd, {recursive: true});

this.bootstrap();
let status;

try {
await this.crawl();
process.exit(this.exitCode);
status = (this.exitCode === 0 ? "done" : "interrupted");
} catch(e) {
console.error("Crawl failed");
console.error(e);
process.exit(9);
this.exitCode = 9;
status = "failing";
if (await this.crawlState.incFailCount()) {
status = "failed";
}

} finally {
console.log(status);

await this.crawlState.setStatus(status);

process.exit(this.exitCode);
}
}

Expand All @@ -293,7 +315,7 @@ class Crawler {
async crawlPage({page, data}) {
try {
if (this.screencaster) {
await this.screencaster.newTarget(page.target());
await this.screencaster.screencastTarget(page.target());
}

if (this.emulateDevice) {
Expand Down Expand Up @@ -341,15 +363,6 @@ class Crawler {

} catch (e) {
console.warn(e);
} finally {

try {
if (this.screencaster) {
await this.screencaster.endTarget(page.target());
}
} catch (e) {
console.warn(e);
}
}
}

Expand Down Expand Up @@ -400,6 +413,7 @@ class Crawler {
if (size >= this.params.sizeLimit) {
console.log(`Size threshold reached ${size} >= ${this.params.sizeLimit}, stopping`);
interrupt = true;
this.sizeExceeded = true;
}
}

Expand Down Expand Up @@ -503,8 +517,26 @@ class Crawler {
await this.awaitProcess(child_process.spawn("wb-manager", ["reindex", this.params.collection], {stdio: "inherit", cwd: this.params.cwd}));
}

if (this.params.generateWACZ) {
if (this.params.generateWACZ && (this.exitCode === 0 || this.finalExit || this.sizeExceeded)) {
await this.generateWACZ();

if (this.sizeExceeded) {
console.log(`Clearing ${this.collDir} before exit`);
try {
fs.rmSync(this.collDir, { recursive: true, force: true });
} catch(e) {
console.warn(e);
}
}
}

if (this.exitCode === 0 && this.params.waitOnDone && this.params.redisStoreUrl && !this.finalExit) {
this.done = true;
this.statusLog("All done, waiting for signal...");
await this.crawlState.setStatus("done");

// wait forever until signal
await new Promise(() => {});
}
}

Expand All @@ -516,8 +548,15 @@ class Crawler {
// Get a list of the warcs inside
const warcFileList = await fsp.readdir(archiveDir);

// is finished (>0 pages and all pages written)
const isFinished = await this.crawlState.isFinished();

console.log(`Num WARC Files: ${warcFileList.length}`);
if (!warcFileList.length) {
// if finished, just return
if (isFinished) {
return;
}
throw new Error("No WARC Files, assuming crawl failed");
}

Expand All @@ -526,7 +565,6 @@ class Crawler {
const waczPath = path.join(this.collDir, waczFilename);

const createArgs = ["create", "--split-seeds", "-o", waczPath, "--pages", this.pagesFile];
const validateArgs = ["validate"];

if (process.env.WACZ_SIGN_URL) {
createArgs.push("--signing-url");
Expand All @@ -538,7 +576,6 @@ class Crawler {
}

createArgs.push("-f");
validateArgs.push("-f");

warcFileList.forEach((val, index) => createArgs.push(path.join(archiveDir, val))); // eslint-disable-line no-unused-vars

Expand All @@ -553,6 +590,9 @@ class Crawler {
this.debugLog(`WACZ successfully generated and saved to: ${waczPath}`);

// Verify WACZ
/*
const validateArgs = ["validate"];
validateArgs.push("-f");
validateArgs.push(waczPath);
const waczVerifyResult = await this.awaitProcess(child_process.spawn("wacz", validateArgs, {stdio: "inherit"}));
Expand All @@ -561,13 +601,12 @@ class Crawler {
console.log("validate", waczVerifyResult);
throw new Error("Unable to verify WACZ created successfully");
}

*/
if (this.storage) {
const finished = await this.crawlState.finished();
const filename = process.env.STORE_FILENAME || "@[email protected]";
const targetFilename = interpolateFilename(filename, this.crawlId);

await this.storage.uploadCollWACZ(waczPath, targetFilename, finished);
await this.storage.uploadCollWACZ(waczPath, targetFilename, isFinished);
}
}

Expand Down Expand Up @@ -719,7 +758,7 @@ class Crawler {
try {
while (await page.$("div.cf-browser-verification.cf-im-under-attack")) {
this.statusLog("Cloudflare Check Detected, waiting for reload...");
await this.sleep(5500);
await this.sleep(5.5);
}
} catch (e) {
//console.warn("Check CF failed, ignoring");
Expand Down Expand Up @@ -850,20 +889,21 @@ class Crawler {

const redis = await initRedis("redis://localhost/0");

while (true) {
// wait until pending, unless canceling
while (this.exitCode !== 1) {
const res = await redis.get(`pywb:${this.params.collection}:pending`);
if (res === "0" || !res) {
break;
}

this.debugLog(`Still waiting for ${res} pending requests to finish...`);

await this.sleep(1000);
await this.sleep(1);
}
}

sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
sleep(seconds) {
return new Promise(resolve => setTimeout(resolve, seconds * 1000));
}

async parseSitemap(url, seedId) {
Expand Down Expand Up @@ -983,7 +1023,7 @@ class Crawler {
if (!done) {
return;
}
if (await this.crawlState.finished()) {
if (await this.crawlState.isFinished()) {
return;
}
break;
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.5'

services:
crawler:
image: webrecorder/browsertrix-crawler:latest
image: ${REGISTRY}webrecorder/browsertrix-crawler:latest
build:
context: ./

Expand Down
13 changes: 10 additions & 3 deletions main.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async function handleTerminate() {
try {
if (!crawler.crawlState.drainMax) {
console.log("SIGNAL: gracefully finishing current pages...");
crawler.crawlState.setDrain();
crawler.crawlState.setDrain(crawler.finalExit);

} else if ((Date.now() - lastSigInt) > 200) {
console.log("SIGNAL: stopping crawl now...");
Expand All @@ -32,10 +32,16 @@ process.on("SIGINT", async () => {
await handleTerminate();
});

process.on("SIGUSR1", () => {
if (crawler) {
crawler.finalExit = true;
}
});

process.on("SIGTERM", async () => {
if (forceTerm) {
if (forceTerm || crawler.done) {
console.log("SIGTERM received, exit immediately");
process.exit(1);
process.exit(crawler.done ? 0 : 1);
}

console.log("SIGTERM received...");
Expand All @@ -45,6 +51,7 @@ process.on("SIGTERM", async () => {
process.on("SIGABRT", async () => {
console.log("SIGABRT received, will force immediate exit on SIGTERM");
forceTerm = true;
crawler.exitCode = 1;
});


Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "browsertrix-crawler",
"version": "0.6.0-beta.1",
"version": "0.6.0",
"main": "browsertrix-crawler",
"repository": "https://github.com/webrecorder/browsertrix-crawler",
"author": "Ilya Kreymer <[email protected]>, Webrecorder Software",
Expand Down
6 changes: 6 additions & 0 deletions util/argParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ class ArgParser {
type: "boolean",
default: false
},

"waitOnDone": {
describe: "if set, wait for interrupt signal when finished instead of exiting",
type: "boolean",
default: false
},
};
}

Expand Down
24 changes: 22 additions & 2 deletions util/screencaster.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,31 @@ class ScreenCaster
}
}

detectClose(target) {
const context = target.browserContext();

async newTarget(target) {
const cdp = await target.createCDPSession();
if (context.__destroy_added) {
return;
}

context.on("targetdestroyed", (target) => {
this.endTarget(target);
});

context.__destroy_added = true;
}

async screencastTarget(target) {
const id = target._targetId;

if (this.targets.has(id)) {
return;
}

this.detectClose(target);

const cdp = await target.createCDPSession();

this.targets.set(id, cdp);
this.urls.set(id, target.url());

Expand Down
Loading

0 comments on commit cf90304

Please sign in to comment.