diff --git a/packages/telemetry/src/flight-recorder.js b/packages/telemetry/src/flight-recorder.js new file mode 100644 index 00000000000..b1b31a32386 --- /dev/null +++ b/packages/telemetry/src/flight-recorder.js @@ -0,0 +1,205 @@ +// @ts-check +/* global BigUint64Array */ +/// + +// https://github.com/Agoric/agoric-sdk/issues/3742#issuecomment-1028451575 +// I'd mmap() a 100MB file, reserve a few bytes for offsets, then use the rest +// as a circular buffer to hold length-prefixed records. The agd process would +// keep writing new events into the RAM window and updating the start/end +// pointers, with some sequencing to make sure the record gets written before +// the pointer is updated. Then, no mattter how abruptly the process is +// terminated, as long as the host computer itself is still running, the on-disk +// file would contain the most recent state, and anybody who reads the file will +// get the most recent state. The host kernel (linux) is under no obligation to +// flush it to disk any particular time, but knows when reads happen, so there's +// no coherency problem, and the speed is unaffected by disk write speeds. + +import BufferFromFile from 'bufferfromfile'; +import fs from 'fs'; +import path from 'path'; + +const { details: X } = assert; + +export const DEFAULT_CIRCULAR_BUFFER_SIZE = 100 * 1024 * 1024; +export const DEFAULT_CIRCULAR_BUFFER_FILE = 'flight-recorder.bin'; +export const SLOG_MAGIC = 0x21474f4c532d4741n; // 'AG-SLOG!' + +const I_MAGIC = 0; +const I_ARENA_SIZE = 1; +const I_CIRC_START = 2; +const I_CIRC_END = 3; +const HEADER_LENGTH = 4; + +export const makeMemoryMappedCircularBuffer = ({ + circularBufferSize = DEFAULT_CIRCULAR_BUFFER_SIZE, + stateDir = '/tmp', + circularBufferFile, +}) => { + const bufferFile = + circularBufferFile || `${stateDir}/${DEFAULT_CIRCULAR_BUFFER_FILE}`; + // console.log({ circularBufferFile, bufferFile }); + + // If the file doesn't exist, or is not large enough, create it. + let stbuf; + try { + stbuf = fs.statSync(bufferFile); + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } + const arenaSize = BigInt( + circularBufferSize - HEADER_LENGTH * BigUint64Array.BYTES_PER_ELEMENT, + ); + if (!stbuf || stbuf.size < BigUint64Array.BYTES_PER_ELEMENT * 3) { + // Write the header. + const header = new Array(HEADER_LENGTH).fill(0n); + header[I_MAGIC] = SLOG_MAGIC; + header[I_ARENA_SIZE] = arenaSize; + fs.mkdirSync(path.dirname(bufferFile), { recursive: true }); + fs.writeFileSync(bufferFile, BigUint64Array.from(header)); + } + if (!stbuf || stbuf.size < circularBufferSize) { + fs.truncateSync(bufferFile, circularBufferSize); + } + + /** @type {Uint8Array} */ + const fileBuf = BufferFromFile(bufferFile).Uint8Array(); + const header = new BigUint64Array(fileBuf.buffer, 0, HEADER_LENGTH); + + assert.equal( + SLOG_MAGIC, + header[I_MAGIC], + X`${bufferFile} is not a slog buffer; wanted magic ${SLOG_MAGIC}, got ${header[I_MAGIC]}`, + ); + assert.equal( + arenaSize, + header[I_ARENA_SIZE], + X`${bufferFile} arena size mismatch; wanted ${arenaSize}, got ${header[I_ARENA_SIZE]}`, + ); + const arena = new Uint8Array( + fileBuf.buffer, + header.byteLength, + Number(arenaSize), + ); + + /** + * @param {Uint8Array} data + * @param {number} [offset] + */ + const readCircBuf = (data, offset = 0) => { + assert( + offset + data.byteLength <= arenaSize, + X`Reading past end of circular buffer`, + ); + + // Read the data to the end of the arena. + let firstReadLength = data.byteLength; + const circStart = Number(header[I_CIRC_START]); + const readStart = (circStart + offset) % Number(arenaSize); + if (readStart > header[I_CIRC_END]) { + // The data is wrapped around the end of the arena, like BBB---AAA + firstReadLength = Math.min( + firstReadLength, + Number(arenaSize) - readStart, + ); + } + data.set(arena.subarray(readStart, readStart + firstReadLength)); + if (firstReadLength < data.byteLength) { + data.set( + arena.subarray(0, data.byteLength - firstReadLength), + firstReadLength, + ); + } + return data; + }; + + /** @param {Uint8Array} data */ + const writeCircBuf = data => { + if (BigUint64Array.BYTES_PER_ELEMENT + data.byteLength > arena.byteLength) { + // The data is too big to fit in the arena, so skip it. + const tooBigRecord = JSON.stringify({ + type: 'slog-record-too-big', + size: data.byteLength, + }); + data = new TextEncoder().encode(tooBigRecord); + } + + const record = new Uint8Array( + BigUint64Array.BYTES_PER_ELEMENT + data.byteLength, + ); + const lengthPrefix = new BigUint64Array(record.buffer, 0, 1); + lengthPrefix[0] = BigInt(data.byteLength); + record.set(data, BigUint64Array.BYTES_PER_ELEMENT); + + // Check if we need to wrap around. + /** @type {bigint} */ + let capacity; + if (header[I_CIRC_START] <= header[I_CIRC_END]) { + // ---AAAABBBB---- + capacity = + header[I_ARENA_SIZE] - header[I_CIRC_END] + header[I_CIRC_START]; + } else { + // BBB---AAAA + capacity = header[I_CIRC_START] - header[I_CIRC_END]; + } + + let overlap = BigInt(record.byteLength) - capacity; + while (overlap > 0n) { + // Advance the start pointer. + const startRecordLength = new BigUint64Array(1); + readCircBuf(new Uint8Array(startRecordLength.buffer)); + + const totalRecordLength = + BigInt(startRecordLength.byteLength) + // size of the length field + startRecordLength[0]; // size of the record + + header[I_CIRC_START] = + (header[I_CIRC_START] + totalRecordLength) % header[I_ARENA_SIZE]; + overlap -= totalRecordLength; + } + + // Append the record. + let firstWriteLength = record.byteLength; + if (header[I_CIRC_START] < header[I_CIRC_END]) { + // May need to wrap, it's ---AAAABBBB--- + firstWriteLength = Math.min( + firstWriteLength, + Number(header[I_ARENA_SIZE] - header[I_CIRC_END]), + ); + } + + const circEnd = Number(header[I_CIRC_END]); + arena.set(record.subarray(0, firstWriteLength), circEnd); + if (firstWriteLength < record.byteLength) { + // Write to the beginning of the arena. + arena.set(record.subarray(firstWriteLength, record.byteLength), 0); + } + header[I_CIRC_END] = + (header[I_CIRC_END] + BigInt(record.byteLength)) % header[I_ARENA_SIZE]; + }; + + const writeJSON = obj => { + const text = JSON.stringify(obj, (key, value) => { + if (typeof value === 'bigint') { + return Number(value); + } + if (key === 'endoZipBase64') { + // Abridge the source bundle, since it's pretty huge. + return `[${value.length} characters...]`; + } + return value; + }); + // Prepend a newline so that the file can be more easily manipulated. + const data = new TextEncoder().encode(`\n${text}`); + // console.log('have obj', obj); + writeCircBuf(data); + }; + + return { readCircBuf, writeCircBuf, writeJSON }; +}; + +export const makeSlogSender = opts => { + const { writeJSON } = makeMemoryMappedCircularBuffer(opts); + return writeJSON; +}; diff --git a/packages/telemetry/test/prepare-test-env-ava.js b/packages/telemetry/test/prepare-test-env-ava.js index fdb00169daf..84dc9ee3599 100644 --- a/packages/telemetry/test/prepare-test-env-ava.js +++ b/packages/telemetry/test/prepare-test-env-ava.js @@ -1,6 +1,5 @@ -/* eslint-disable import/no-extraneous-dependencies */ -import '@endo/init/pre-bundle-source.js'; -import '@endo/lockdown/commit-debug.js'; +// @ts-check +import '@endo/init'; import { wrapTest } from '@endo/ses-ava'; import rawTest from 'ava'; diff --git a/packages/telemetry/test/test-flight-recorder.js b/packages/telemetry/test/test-flight-recorder.js new file mode 100644 index 00000000000..23db361fea2 --- /dev/null +++ b/packages/telemetry/test/test-flight-recorder.js @@ -0,0 +1,32 @@ +// @ts-check +/* global BigUint64Array */ +import tmp from 'tmp'; +import { test } from './prepare-test-env-ava.js'; + +import { makeMemoryMappedCircularBuffer } from '../src/flight-recorder.js'; + +test('flight-recorder sanity', t => { + const { name: tmpFile } = tmp.fileSync(); + console.log(tmpFile); + const { writeJSON: slogSender, readCircBuf } = makeMemoryMappedCircularBuffer( + { + circularBufferSize: 512, + circularBufferFile: tmpFile, + }, + ); + slogSender({ type: 'start' }); + + const len0 = new BigUint64Array(readCircBuf(new Uint8Array(8)).buffer); + const buf0 = readCircBuf(new Uint8Array(Number(len0[0])), 8); + const buf0Str = new TextDecoder().decode(buf0); + t.is(buf0Str, `\n{"type":"start"}`); + + for (let i = 0; i < 500; i += 1) { + slogSender({ type: 'iteration', iteration: i }); + } + + const len1 = new BigUint64Array(readCircBuf(new Uint8Array(8)).buffer); + const buf1 = readCircBuf(new Uint8Array(Number(len1[0])), 8); + const buf1Str = new TextDecoder().decode(buf1); + t.is(buf1Str, `\n{"type":"iteration","iteration":490}`); +});