Skip to content

Commit

Permalink
feat(runner): Save client storage and slog
Browse files Browse the repository at this point in the history
Backup storage in background
Force storage save on error
  • Loading branch information
mhofman committed Jan 17, 2022
1 parent 41471db commit 778b4e8
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 57 deletions.
2 changes: 1 addition & 1 deletion runner/lib/helpers/async.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export declare function warnOnRejection(

export declare function aggregateTryFinally<T>(
trier: () => Promise<T>,
finalizer: () => Promise<void>,
finalizer: (error?: unknown) => Promise<void>,
): Promise<T>;

export declare function tryTimeout<T>(
Expand Down
2 changes: 1 addition & 1 deletion runner/lib/helpers/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const aggregateTryFinally = async (trier, finalizer) =>
trier().then(
async (result) => finalizer().then(() => result),
async (tryError) =>
finalizer()
finalizer(tryError)
.then(
() => tryError,
(finalizeError) => makeAggregateError([tryError, finalizeError]),
Expand Down
151 changes: 119 additions & 32 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ const coerceBooleanOption = (
*/
const makeInterrupterKit = ({ console }) => {
const signal = makePromiseKit();
/** @type {Error | null} */
/** @type {NodeJS.ErrnoException | null} */
let rejection = null;
const onInterrupt = () => {
if (rejection) {
console.warn('Interruption already in progress');
} else {
rejection = new Error('Interrupted');
rejection.code = 'ERR_SCRIPT_EXECUTION_INTERRUPTED';
signal.reject(rejection);
}
};
Expand Down Expand Up @@ -225,6 +226,32 @@ const main = async (progName, rawArgs, powers) => {
errPrefix: prefix && `${chalk.bold.red(prefix)}: `,
});

/**
* @param {string} source
* @param {string} tmpSuffix
* @param {string} destination
*/
const backgroundCompressFolder = async (source, tmpSuffix, destination) => {
const tmp = `${source}${tmpSuffix}`;
const cleanup = async () => {
await childProcessDone(spawn('rm', ['-rf', tmp]));
};

try {
await childProcessDone(
spawn('cp', ['-a', '--reflink=auto', source, tmp]),
);
} catch (err) {
await aggregateTryFinally(cleanup, () => Promise.reject(err));
}

return {
done: aggregateTryFinally(async () => {
await childProcessDone(spawn('tar', ['-cSJf', destination, tmp]));
}, cleanup),
};
};

const { console: topConsole } = makeConsole();

const outputDir = String(argv.outputDir || `results/run-${Date.now()}`);
Expand Down Expand Up @@ -257,6 +284,8 @@ const main = async (progName, rawArgs, powers) => {
Number(argv.monitorInterval || defaultMonitorIntervalMinutes) * 60 * 1000;

let currentStage = -1;
/** @type {Promise<void>[]} */
const pendingBackups = [];
const timeSource = makeTimeSource({ performance });
const cpuTimeOffset = await getCPUTimeOffset();
const cpuTimeSource = timeSource.shift(0 - cpuTimeOffset);
Expand Down Expand Up @@ -330,6 +359,8 @@ const main = async (progName, rawArgs, powers) => {
} = config;
/** @type {string | void} */
let chainStorageLocation;
/** @type {string | void} */
let clientStorageLocation;
currentStageTimeSource = timeSource.shift();

const { out, err } = makeConsole(`stage-${currentStage}`);
Expand Down Expand Up @@ -496,6 +527,21 @@ const main = async (progName, rawArgs, powers) => {
logPerfEvent('client-stopped');
});

clientStorageLocation = runClientResult.storageLocation;

const slogOutput = zlib.createGzip({
level: zlib.constants.Z_BEST_COMPRESSION,
});
const slogOutputWriteStream = fsStream.createWriteStream(
joinPath(outputDir, `client-stage-${currentStage}.slog.gz`),
);
await fsStreamReady(slogOutputWriteStream);
const slogOutputPipeResult = pipeline(
runClientResult.slogLines,
slogOutput,
slogOutputWriteStream,
);

await aggregateTryFinally(
async () => {
await tryTimeout(10 * 60 * 1000, () =>
Expand All @@ -516,14 +562,20 @@ const main = async (progName, rawArgs, powers) => {

await nextStep(done);
},
async () => {
if (!clientExited) {
stageConsole.log('Stopping client');
async () =>
aggregateTryFinally(
async () => {
if (!clientExited) {
stageConsole.log('Stopping client');

runClientResult.stop();
await done;
}
},
runClientResult.stop();
await done;
}
},
async () => {
await slogOutputPipeResult;
},
),
);
};

Expand Down Expand Up @@ -681,21 +733,39 @@ const main = async (progName, rawArgs, powers) => {
await sequential(...tasks)((stop) => stop);
stats.recordEnd(timeSource.getTime());
},
async () =>
async (...stageError) =>
aggregateTryFinally(
async () => {
if (saveStorage && chainStorageLocation != null) {
stageConsole.log('Saving chain storage');
await childProcessDone(
spawn('tar', [
'-cSJf',
joinPath(
outputDir,
`chain-storage-stage-${currentStage}.tar.xz`,
),
chainStorageLocation,
]),
);
const suffix = `-stage-${currentStage}`;
if (
!saveStorage &&
(stageError.length === 0 ||
/** @type {NodeJS.ErrnoException} */ (stageError[0]).code ===
'ERR_SCRIPT_EXECUTION_INTERRUPTED')
) {
return;
}
const backupResults = await Promise.all(
Object.entries({
chain: chainStorageLocation,
client: clientStorageLocation,
}).map(([type, location]) => {
if (location != null) {
stageConsole.log(`Saving ${type} storage`);
return backgroundCompressFolder(
location,
suffix,
joinPath(outputDir, `${type}-storage${suffix}.tar.xz`),
);
}
return undefined;
}),
);

for (const result of backupResults) {
if (result) {
pendingBackups.push(result.done);
}
}
},
async () => {
Expand Down Expand Up @@ -918,18 +988,35 @@ const main = async (progName, rawArgs, powers) => {
outputStream.end();

const { console } = makeConsole('summary');
console.log('Live blocks stats:', {
...(runStats.liveBlocksSummary || {
blockCount: 0,
}),
});
console.log('Cycles stats:', {
...(runStats.cyclesSummary || {
cycleCount: 0,
}),
});

await finished(outputStream);
await aggregateTryFinally(
async () => {
const backupsDone = Promise.all(pendingBackups).then(() => true);
if (
!(await Promise.race([
backupsDone,
Promise.resolve().then(() => false),
]))
) {
console.log('Waiting for storage backups to finish');
}
await backupsDone;
},
async () => {
console.log('Live blocks stats:', {
...(runStats.liveBlocksSummary || {
blockCount: 0,
}),
});
console.log('Cycles stats:', {
...(runStats.cyclesSummary || {
cycleCount: 0,
}),
});

await finished(outputStream);
},
);
},
);
};
Expand Down
2 changes: 1 addition & 1 deletion runner/lib/monitor/chain-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { PromiseAllOrErrors, warnOnRejection } from '../helpers/async.js';
const vatIdentifierRE = /^(v\d+):(.*)$/;

/**
* @param {Pick<import("../tasks/types.js").RunChainInfo, 'storageLocation' | 'processInfo'>} chainInfo
* @param {Pick<import("../tasks/types.js").RunKernelInfo, 'storageLocation' | 'processInfo'>} kernelInfo
* @param {Object} param1
* @param {Console} param1.console
* @param {import('../stats/types.js').LogPerfEvent} param1.logPerfEvent
Expand Down
2 changes: 1 addition & 1 deletion runner/lib/monitor/slog-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ const activeEventRE = filterSlogEvent([
]);

/**
* @param {Pick<import("../tasks/types.js").RunChainInfo, 'slogLines'>} chainInfo
* @param {Pick<import("../tasks/types.js").RunKernelInfo, 'slogLines'>} chainInfo
* @param {Object} param1
* @param {StageStats} param1.stats
* @param {{blockDone(block: BlockStats): void}} [param1.notifier]
Expand Down
11 changes: 10 additions & 1 deletion runner/lib/tasks/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ const protocolModules = {

/** @typedef {http.RequestOptions & {body?: Buffer}} HTTPRequestOptions */

/**
* @template T
* @param {AsyncIterable<T>} iterable
* @returns {AsyncIterable<T>}
*/
export const cleanAsyncIterable = (iterable) => ({
[Symbol.asyncIterator]: () => iterable[Symbol.asyncIterator](),
});

/**
* @param {string | URL} urlOrString
* @param {HTTPRequestOptions} [options]
Expand All @@ -34,7 +43,7 @@ export const httpRequest = (urlOrString, options = {}) => {
// Ugly cast hack to make res look like what the consumer cares about
const res = /** @type {http.IncomingMessage} */ (harden(
/** @type {unknown} */ ({
[Symbol.asyncIterator]: () => stream[Symbol.asyncIterator](),
...cleanAsyncIterable(stream),
statusCode: 200,
}),
));
Expand Down
34 changes: 28 additions & 6 deletions runner/lib/tasks/local-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
getChildMatchingArgv,
wrapArgvMatcherIgnoreEnvShebang,
getConsoleAndStdio,
cleanAsyncIterable,
} from './helpers.js';
import { makeGetEnvInfo } from './shared-env-info.js';
import { makeLoadgenTask } from './shared-loadgen.js';
Expand All @@ -27,6 +28,7 @@ const pipeline = promisify(pipelineCallback);
const stateDir = '_agstate/agoric-servers';
const keysDir = '_agstate/keys';
const profileName = 'local-chain';
const CLIENT_PORT = 8000;
const CHAIN_PORT = 26657;
const CHAIN_ID = 'agoric';
const GAS_ADJUSTMENT = '1.2';
Expand Down Expand Up @@ -206,9 +208,7 @@ export const makeTasks = ({
stop,
done,
ready,
slogLines: {
[Symbol.asyncIterator]: () => slogLines[Symbol.asyncIterator](),
},
slogLines: cleanAsyncIterable(slogLines),
storageLocation,
processInfo,
});
Expand All @@ -232,7 +232,7 @@ export const makeTasks = ({

console.log('Starting client');

const portNum = 8000;
const portNum = CLIENT_PORT;

const agServer = `${stateDir}/${profileName}-${portNum}`;

Expand Down Expand Up @@ -360,9 +360,18 @@ export const makeTasks = ({
),
);

const slogFifo = await makeFIFO('client.slog');
const slogReady = fsStreamReady(slogFifo);
const slogLines = new BufferLineTransform();
const slogPipeResult = pipeline(slogFifo, slogLines);

const clientEnv = Object.create(process.env);
clientEnv.SOLO_SLOGFILE = slogFifo.path;

const soloCp = printerSpawn(sdkBinaries.agSolo, ['start'], {
stdio: ['ignore', 'pipe', stdio[2]],
cwd: agServer,
env: clientEnv,
detached: true,
});

Expand Down Expand Up @@ -391,7 +400,13 @@ export const makeTasks = ({
},
);

const done = PromiseAllOrErrors([outputParsed, clientDone]).then(() => {});
const done = PromiseAllOrErrors([
slogPipeResult,
outputParsed,
clientDone,
]).then(() => {});

const ready = PromiseAllOrErrors([walletReady, slogReady]).then(() => {});

return tryTimeout(
timeout * 1000,
Expand All @@ -401,6 +416,10 @@ export const makeTasks = ({

console.log('Client running');

const processInfo = await getProcessInfo(
/** @type {number} */ (soloCp.pid),
);

const stop = () => {
ignoreKill.signal = 'SIGTERM';
soloCp.kill(ignoreKill.signal);
Expand All @@ -409,7 +428,10 @@ export const makeTasks = ({
return harden({
stop,
done,
ready: walletReady,
ready,
slogLines: cleanAsyncIterable(slogLines),
storageLocation: agServer,
processInfo,
});
},
async () => {
Expand Down
10 changes: 6 additions & 4 deletions runner/lib/tasks/shared-loadgen.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import {
import LineStreamTransform from '../helpers/line-stream-transform.js';
import { PromiseAllOrErrors, tryTimeout } from '../helpers/async.js';
import { whenStreamSteps } from '../helpers/stream.js';
import { httpRequest, getConsoleAndStdio } from './helpers.js';
import {
httpRequest,
getConsoleAndStdio,
cleanAsyncIterable,
} from './helpers.js';

const pipeline = promisify(pipelineCallback);

Expand Down Expand Up @@ -153,9 +157,7 @@ export const makeLoadgenTask = ({ spawn }) => {
done,
ready,
updateConfig,
taskEvents: {
[Symbol.asyncIterator]: () => taskEvents[Symbol.asyncIterator](),
},
taskEvents: cleanAsyncIterable(taskEvents),
});
},
async () => {
Expand Down
Loading

0 comments on commit 778b4e8

Please sign in to comment.