diff --git a/cmd/dump.js b/cmd/dump.js index bc5d34a15..aa5099254 100644 --- a/cmd/dump.js +++ b/cmd/dump.js @@ -2,7 +2,7 @@ const os = require('bare-os') const { isAbsolute, resolve } = require('bare-path') const { outputter } = require('./iface') -const { ERR_INPUT } = require('../lib/errors') +const { ERR_INVALID_INPUT } = require('../lib/errors') const output = outputter('stage', { dumping: ({ key, dir }) => `\nšŸ Dumping ${key} into ${dir}`, complete: '\nDumping complete!\n', @@ -11,8 +11,8 @@ const output = outputter('stage', { module.exports = (ipc) => async function dump (cmd) { const { checkout, json } = cmd.flags - const { key, dir = os.cwd() } = cmd.args - if (!dir) throw new ERR_INPUT('Output dir must be specified.') - if (!key) throw new ERR_INPUT('The pear link must be specified.') - await output(json, ipc.dump({ id: Bare.pid, key, dir: isAbsolute(dir) ? dir : resolve(os.cwd(), dir), checkout })) + const { link, dir = os.cwd() } = cmd.args + if (!dir) throw ERR_INVALID_INPUT('Output dir must be specified.') + if (!link) throw ERR_INVALID_INPUT('The pear link must be specified.') + await output(json, ipc.dump({ id: Bare.pid, link, dir: isAbsolute(dir) ? dir : resolve(os.cwd(), dir), checkout })) } diff --git a/cmd/info.js b/cmd/info.js index 7cf81e20a..22807ebbe 100644 --- a/cmd/info.js +++ b/cmd/info.js @@ -13,10 +13,9 @@ const keys = ({ content, discovery, project }) => ` content ${content} ` -const info = ({ channel, release, name, live }) => ` +const info = ({ channel, release, name }) => ` info value ----------- ----------------- - live ${live} name ${name} channel ${channel} release ${release} @@ -29,7 +28,7 @@ const changelog = ({ changelog, full }) => ` ` const output = outputter('info', { - retrieving: ({ z32 }) => `šŸ”‘ :-\n pear://${z32}\n...`, + retrieving: ({ z32 }) => `---:\n pear://${z32}\n...`, keys, info, changelog, @@ -38,15 +37,15 @@ const output = outputter('info', { module.exports = (ipc) => async function info (cmd) { const { json, changelog, fullChangelog: full, metadata, key: showKey, keys } = cmd.flags - const isKey = parseLink(cmd.args.link).key !== null + const isKey = cmd.args.link && parseLink(cmd.args.link).key !== null const channel = isKey ? null : cmd.args.link - const key = isKey ? cmd.args.link : null - if (key && isKey === false) throw new ERR_INVALID_INPUT('Key "' + key + '" is not valid') + const link = isKey ? cmd.args.link : null + if (link && isKey === false) throw ERR_INVALID_INPUT('Link "' + link + '" is not a valid key') let dir = cmd.args.dir || os.cwd() if (isAbsolute(dir) === false) dir = dir ? resolve(os.cwd(), dir) : os.cwd() await output(json, ipc.info({ - key, + link, channel, dir, showKey, diff --git a/cmd/release.js b/cmd/release.js index fc80abbce..bdd00095f 100644 --- a/cmd/release.js +++ b/cmd/release.js @@ -6,9 +6,9 @@ const { ERR_INVALID_INPUT } = require('../lib/errors') const parseLink = require('../run/parse-link') const output = outputter('release', { - releasing: ({ name, channel }) => `\n${ansi.pear} Releasing ${name} [ ${channel} ]\n`, + releasing: ({ name, channel, link }) => `\n${ansi.pear} Releasing ${name} [ ${channel || link} ]\n`, 'updating-to': ({ releaseLength, currentLength }) => `Current length is ${currentLength}\nSetting release to ${releaseLength}\n`, - released: ({ name, channel, length }) => `The ${name} app (${channel} channel) was successfully released.\nLatest length: ${length}\n`, + released: ({ name, channel, link, length }) => `The ${name} app (${channel || link} channel) was successfully released.\nLatest length: ${length}\n`, final: { output: 'print', message: 'Release complete\n', success: true } }) @@ -16,13 +16,13 @@ module.exports = (ipc) => async function release (cmd) { const { checkout, name, json } = cmd.flags const isKey = parseLink(cmd.args.channel).key !== null const channel = isKey ? null : cmd.args.channel - const key = isKey ? cmd.args.channel : null - if (!channel && !key) throw new ERR_INVALID_INPUT('A key or the channel name must be specified.') + const link = isKey ? cmd.args.channel : null + if (!channel && !link) throw ERR_INVALID_INPUT('A valid pear link or the channel name must be specified.') let dir = cmd.args.dir || os.cwd() if (isAbsolute(dir) === false) dir = resolve(os.cwd(), dir) if (checkout !== undefined && Number.isInteger(+checkout) === false) { - throw new ERR_INVALID_INPUT('--checkout flag must supply an integer if set') + throw ERR_INVALID_INPUT('--checkout flag must supply an integer if set') } const id = Bare.pid - await output(json, ipc.release({ id, name, channel, key, checkout, dir })) + await output(json, ipc.release({ id, name, channel, link, checkout, dir })) } diff --git a/cmd/shift.js b/cmd/shift.js index de9290081..51a4ec1fa 100644 --- a/cmd/shift.js +++ b/cmd/shift.js @@ -17,12 +17,12 @@ module.exports = (ipc) => async function shift (cmd) { const src = cmd.args.source const dst = cmd.args.destination - if (!src || parseLink(src).key === null) { - throw new ERR_INVALID_INPUT('A source application key must be specified.') + if (parseLink(src).key === null) { + throw ERR_INVALID_INPUT('A valid source application link must be specified.') } - if (!dst || parseLink(dst).key === null) { - throw new ERR_INVALID_INPUT('A destination application key must be specified.') + if (parseLink(dst).key === null) { + throw ERR_INVALID_INPUT('A valid destination application link must be specified.') } await output(json, ipc.shift({ src, dst, force })) diff --git a/cmd/stage.js b/cmd/stage.js index 8829114f9..eb78de73c 100644 --- a/cmd/stage.js +++ b/cmd/stage.js @@ -3,7 +3,7 @@ const os = require('bare-os') const { isAbsolute, resolve } = require('bare-path') const { outputter, ansi } = require('./iface') const parseLink = require('../run/parse-link') -const { ERR_INPUT } = require('../lib/errors') +const { ERR_INVALID_INPUT } = require('../lib/errors') let blocks = 0 let total = 0 @@ -29,7 +29,7 @@ module.exports = (ipc) => async function stage (cmd) { const isKey = cmd.args.channel && parseLink(cmd.args.channel).key !== null const channel = isKey ? null : cmd.args.channel const key = isKey ? cmd.args.channel : null - if (!channel && !key) throw new ERR_INPUT('A key or the channel name must be specified.') + if (!channel && !key) throw ERR_INVALID_INPUT('A key or the channel name must be specified.') let { dir = os.cwd() } = cmd.args if (isAbsolute(dir) === false) dir = dir ? resolve(os.cwd(), dir) : os.cwd() const id = Bare.pid diff --git a/lib/sidecar.js b/lib/sidecar.js index df88a0139..1e2ce35d2 100644 --- a/lib/sidecar.js +++ b/lib/sidecar.js @@ -4,7 +4,7 @@ const fs = require('bare-fs') const path = require('bare-path') const { spawn } = require('bare-subprocess') const { once } = require('bare-events') -const { pipelinePromise: pipeline } = require('streamx') +const streamx = require('streamx') const ReadyResource = require('ready-resource') const ScriptLinker = require('script-linker') const LocalDrive = require('localdrive') @@ -423,7 +423,7 @@ class Engine extends ReadyResource { if (report.err?.code === 'ERR_PERMISSION_REQUIRED') return reports.permissionRequired(report) if (report.err?.code === 'ERR_INVALID_LENGTH') return reports.minver(report) if (report.err?.code === 'ERR_CONNECTION') return reports.connection() - if (report.err) console.error('REPORT', report.err) // send generic errors to the text error log as well + if (report.err) console.trace('REPORT', report.err) // send generic errors to the text error log as well const args = [report.err?.message, report.err?.stack, report.info || report.err] if (report.err?.code === 'ERR_OPEN') return reports.dev(...args) if (report.err?.code === 'ERR_CRASH') return reports.crash(...args) @@ -513,10 +513,11 @@ class Engine extends ReadyResource { message (msg) { return this.engine.bus.pub({ topic: 'messages', id: this.id, data: msg }) } - async * messages (ptn) { - for await (const { data } of this.engine.bus.sub({ topic: 'messages', id: this.id, ...(ptn ? { data: ptn } : {}) })) { - yield data - } + messages (ptn) { + const subscriber = this.engine.bus.sub({ topic: 'messages', id: this.id, ...(ptn ? { data: ptn } : {}) }) + const stream = new DataStream() + streamx.pipeline(subscriber, stream) + return stream } teardown () { @@ -631,7 +632,7 @@ class Engine extends ReadyResource { app.warmup({ protocol, batch }) } const stream = await bundle.streamFrom(link.filename) - await pipeline(stream, res) + await streamx.pipelinePromise(stream, res) } } @@ -675,7 +676,6 @@ class Engine extends ReadyResource { if (id === 'Platform') return await this.lookup(this, 'holepunch', type, req, res) const [clientId, startId] = id.split('@') - const ipcClient = this.sidecar.client(clientId) if (ipcClient === null) throw ERR_HTTP_BAD_REQUEST('Bad Client ID') @@ -721,378 +721,54 @@ class Engine extends ReadyResource { this.host = `http://127.0.0.1:${this.port}` } - async * seed ({ name, channel, link, verbose, seeders, dir, clientArgv } = {}, client) { - const session = new Session(client) - try { - const ctx = new Context({ - id: `seeder-${randomBytes(16).toString('hex')}`, - flags: { channel, link }, - dir, - clientArgv - }) - client.userData = new this.App({ ctx, session }) - - yield { tag: 'seeding', data: { key: link, name, channel } } - await this.ready() - - const corestore = this.#getCorestore(name, channel) - const key = link ? hypercoreid.decode(link) : null - if (key !== null && await Bundle.provisioned(corestore, key) === false) { - throw ERR_PLATFORM_ERROR('Pear Platform: Nothing to seed') - } - - const log = (msg) => this.bus.pub({ topic: 'seed', id: client.id, msg }) - const notices = this.bus.sub({ topic: 'seed', id: client.id }) - const bundle = new Bundle({ corestore, key, channel, log }) - await session.add(bundle) - - if (verbose) { - yield { tag: 'meta-key', data: bundle.drive.key.toString('hex') } - yield { tag: 'meta-discovery-key', data: bundle.drive.discoveryKey.toString('hex') } - yield { tag: 'content-key', data: bundle.drive.contentKey.toString('hex') } - } - - yield { tag: 'key', data: hypercoreid.encode(bundle.drive.key) } - - await bundle.join(this.swarm, { seeders, server: true }) - - for await (const { msg } of notices) yield msg - } catch (err) { - yield { tag: 'error', data: { ...err, stack: err.stack, message: err.message, code: err.code } } - yield null - await session.close() - } - // no need for teardown, seed is tied to the lifecycle of the client - } - - async * release ({ name, channel, checkout, key, dir }, client) { - const channelName = channel || key - if (key) key = hypercoreid.decode(key) - // If channel is specified, then the bundle's drive is - // obtained using corestore namespaces. - // Otherwise, the key is used directly - - const session = new Session(client) - - try { - const ctx = new Context({ - id: `releaser-${randomBytes(16).toString('hex')}`, - argv: [`--checkout=${checkout}`, `--channel=${channel}`], - dir - }) - - await this.ready() - - name = name || ctx.name - - yield { tag: 'releasing', data: { name, channel: channelName } } - - const corestore = this.#getCorestore(name || ctx.name, channel, { writable: true }) - - const bundle = new Bundle({ corestore, channel, key }) - await session.add(bundle) - - if (await bundle.db.get('manifest') === null) { - yield { tag: 'final', data: { reason: `The "${name}" app has not been staged on "${channel}" channel.`, success: false } } - return - } - - const currentLength = bundle.db.feed.length - const releaseLength = checkout || currentLength + 1 - - yield { tag: 'updating-to', data: { currentLength, releaseLength } } - - await bundle.db.put('release', releaseLength) - - yield { tag: 'released', data: { name, channel, length: bundle.db.feed.length } } - - yield { tag: 'final', data: { success: true } } - } finally { - yield null - await session.close() - } - } - - async * stage ({ channel, key, dir, dryRun, name, truncate, bare = false, clientArgv, ignore = '.git,.github,.DS_Store' }, client) { - const session = new Session(client) - - let success = true - try { - const ctx = new Context({ - id: `stager-${randomBytes(16).toString('hex')}`, - flags: { channel, stage: true }, - dir, - clientArgv - }) - await this.ready() - if (key) key = hypercoreid.decode(key) - - const corestore = this.#getCorestore(name || ctx.name, channel, { writable: true }) - const bundle = new Bundle({ - key, - corestore, - channel, - truncate, - stage: true, - failure (err) { console.error(err) } - }) - await session.add(bundle) - client.userData = new this.App({ ctx, bundle }) - - const currentVersion = bundle.version - await ctx.initialize({ bundle, dryRun }) - const z32 = hypercoreid.encode(bundle.drive.key) - await this.trust({ z32 }, client) - const type = ctx.manifest.pear?.type || 'desktop' - const terminalBare = type === 'terminal' - if (terminalBare) bare = true - if (ctx.manifest.pear?.stage?.ignore) ignore = ctx.manifest.pear.stage?.ignore - else ignore = (Array.isArray(ignore) ? ignore : ignore.split(',')) - - ignore = ignore.map((file) => unixPathResolve('/', file)) - const release = (await bundle.db.get('release'))?.value || 0 - const pearkey = 'pear://' + z32 - - yield { tag: 'staging', data: { name: ctx.name, channel: bundle.channel, key: pearkey, current: currentVersion, release } } - - if (dryRun) yield { tag: 'dry' } - - const root = unixPathResolve(ctx.dir) - const main = unixPathResolve('/', ctx.main) - const src = new LocalDrive(root, { followLinks: bare === false, metadata: new Map() }) - const dst = bundle.drive - const opts = { filter: (key) => ignore.some((path) => key.startsWith(path)) === false, dryRun, batch: true } - - const builtins = terminalBare ? gunk.bareBuiltins : gunk.builtins - const linker = new ScriptLinker(src, { builtins }) - const entrypoints = [main, ...(ctx.manifest.pear?.stage?.entrypoints || [])].map((entry) => unixPathResolve('/', entry)) - const mods = await linker.warmup(entrypoints) - for await (const [filename, mod] of mods) src.metadata.put(filename, mod.cache()) - const mirror = new Mirror(src, dst, opts) - for await (const diff of mirror) { - if (diff.op === 'add') { - yield { tag: 'byte-diff', data: { type: 1, sizes: [diff.bytesAdded], message: diff.key } } - } else if (diff.op === 'change') { - yield { tag: 'byte-diff', data: { type: 0, sizes: [-diff.bytesRemoved, diff.bytesAdded], message: diff.key } } - } else if (diff.op === 'remove') { - yield { tag: 'byte-diff', data: { type: -1, sizes: [-diff.bytesRemoved], message: diff.key } } - } - } - yield { - tag: 'summary', - data: { - files: mirror.count.files, - add: mirror.count.add, - remove: mirror.count.remove, - change: mirror.count.change - } - } - - if (dryRun || bare) { - const reason = dryRun ? 'dry-run' : 'bare' - yield { tag: 'skipping', data: { reason, success: true } } - } else if (mirror.count.add || mirror.count.remove || mirror.count.change) { - for await (const { blocks, total } of this.#trace(bundle, client)) { - yield { tag: 'warming', data: { blocks, total } } - } - yield { tag: 'warming', data: { success: true } } - } else { - yield { tag: 'skipping', data: { reason: 'no changes', success: true } } - } - - yield { tag: 'complete', data: { dryRun } } - - if (dryRun) return - - yield { tag: 'addendum', data: { version: bundle.version, release, channel, key: pearkey } } - } catch ({ stack, code, message }) { - success = false - yield { tag: 'error', data: { stack, code, message, success } } - } finally { - yield { tag: 'final', data: { success } } - yield null - await session.close() - } - } - - async * dump ({ key, dir, checkout }, client) { - const session = new Session(client) - try { - await this.ready() - if (key) key = hypercoreid.decode(key) - checkout = Number(checkout) - const corestore = this.#getCorestore(null, null) - const bundle = new Bundle({ corestore, key, checkout }) - - await session.add(bundle) - - if (this.swarm) bundle.join(this.swarm) - - const pearkey = 'pear://' + hypercoreid.encode(bundle.drive.key) - - yield { - tag: 'dumping', - data: { key: pearkey, dir } - } - - try { - await bundle.calibrate() - } catch (err) { - await session.close() - throw err - } - - const out = unixPathResolve(dir) - const dst = new LocalDrive(out) - const src = bundle.drive - - const mirror = new Mirror(src, dst) - - for await (const diff of mirror) { - if (diff.op === 'add') { - yield { tag: 'byte-diff', data: { type: 1, sizes: [diff.bytesAdded], message: diff.key } } - } else if (diff.op === 'change') { - yield { tag: 'byte-diff', data: { type: 0, sizes: [-diff.bytesRemoved, diff.bytesAdded], message: diff.key } } - } else if (diff.op === 'remove') { - yield { tag: 'byte-diff', data: { type: -1, sizes: [-diff.bytesRemoved], message: diff.key } } - } - } - } catch ({ stack, code, message }) { - yield { tag: 'error', data: { stack, code, message, success: false } } - } finally { - yield { tag: 'final', data: { success: true } } - yield null - await session.close() - } - } - - async * #trace (bundle, client) { - await bundle.ready() - const tracer = bundle.startTracing() - const sp = spawn( - DESKTOP_RUNTIME, - [BOOT, `--trace=${client.id}`, '--swap', SWAP, 'pear://' + hypercoreid.encode(bundle.drive.key)] - ) + seed (params, client) { return new Seed(params, client, this) } - const onclose = () => sp.kill() - client.on('close', onclose) - - const closed = once(sp, 'exit') - client.off('close', onclose) + release (params, client) { return new Release(params, client, this) } - const total = bundle.drive.core.length + (bundle.drive.blobs?.core.length || 0) - for await (const { blocks } of tracer) yield { total, blocks } - - const [status] = await closed - - if (status) { - const err = ERR_TRACER_FAILED('Tracer Failed!') - err.exitCode = status - throw err - } - - await bundle.finalizeTracing() - } - - async * info ({ key, channel, dir, showKey, metadata, changelog, full } = {}, client) { - const session = new Session(client) - let bundle = null - const anyFlag = [changelog, full, metadata, showKey].some(flag => flag === true) - const display = (flag) => anyFlag ? !!flag : flag !== false - try { - if (key) { - const parsed = parseLink(key) - key = parsed.key.buffer - const hex = parsed.key.hex - const z32 = parsed.key.z32 - const corestore = this.#getCorestore(null, null) - bundle = new Bundle({ corestore, key }) - await bundle.ready() - if (display(showKey)) yield { tag: 'retrieving', data: { hex, z32 } } - } else if (channel) { - const ctx = new Context({ argv: [`--channel=${channel}`], dir }) - const corestore = this.#getCorestore(ctx.name, channel) - bundle = new Bundle({ corestore, channel }) - await bundle.ready() - const hex = bundle.drive.key.toString('hex') - const z32 = hypercoreid.encode(bundle.drive.key) - if (display(showKey)) yield { tag: 'retrieving', data: { hex, z32 } } - } else if (this.drive.key) { - const hex = this.drive.key.toString('hex') - const z32 = hypercoreid.encode(this.drive.key) - if (display(showKey)) yield { tag: 'retrieving', data: { hex, z32 } } - } - await this.ready() - if (bundle) { - await session.add(bundle) - await bundle.join(this.swarm) - } - const drive = bundle?.drive || this.drive - - if (drive.key && drive.contentKey && drive.discoveryKey) { - if (display(metadata)) { - yield { - tag: 'keys', - data: { - project: drive.key.toString('hex'), - content: drive.contentKey.toString('hex'), - discovery: drive.discoveryKey.toString('hex') - } - } - } + stage (params, client) { return new Stage(params, client, this) } - const channel = (await drive.db.get('channel'))?.value - const release = (await drive.db.get('release'))?.value - const manifest = (await drive.db.get('manifest'))?.value - const name = manifest?.pear?.name || manifest?.holepunch?.name || manifest.name - if (display(metadata)) yield { tag: 'info', data: { channel, release, name, live: bundle?.live || false } } - } + dump (params, client) { return new Dump(params, client, this) } - const contents = await drive.get('/CHANGELOG.md') + info (params, client) { return new Info(params, client, this) } - const type = full ? 'full' : 'latest' - const showChangelog = display(changelog) || full ? type : false - const blank = '[ No Changelog ]' - const parsed = showChangelog === 'latest' - ? (await clog.parse(contents).at(0)?.[1]) || blank - : showChangelog === 'full' - ? (await clog.parse(contents).map(entry => entry[1]).join('\n\n')) || blank - : blank + shift (params, client) { return new Shift(params, client, this) } - if (showChangelog) yield { tag: 'changelog', data: { changelog: parsed, full } } - } catch (err) { - yield { tag: 'error', data: { ...err, stack: err.stack, message: err.message, code: err.code } } - } finally { - yield null - if (bundle) await bundle.close() - await session.close() - } + gc ({ pid, resource }, client) { + const gc = new GarbageCollector(client, this) + if (resource === 'release') gc.release({ resource }) + else if (resource === 'sidecar') gc.sidecar({ pid, resource }) + else throw ERR_INVALID_GC_RESOURCE('Invalid resource to gc: ' + resource) + return gc } - async * warmup (params, client) { + warmup (params, client) { if (!client.userData) return return client.userData.warmup(params) } - async * warming (params, client) { + warming (params, client) { if (!client.userData) return - for await (const { data } of client.userData.warming) yield data + const stream = new DataStream() + streamx.pipeline(client.userData.warming, stream) + return stream } async versions (params, client) { return { platform: this.version, app: client.userData?.ctx?.version } } - async * reports (params, client) { + reports (params, client) { if (!client.userData) return - for await (const { data: report } of client.userData.reporter) yield report + const stream = new DataStream() + streamx.pipeline(client.userData.reporter, stream) } createReport (err, client) { if (!client.userData) { - console.error('REPORT', err) + console.trace('REPORT', err) return } + console.log('??', err) return client.userData.report({ err: { message: err.message, stack: err.stack, code: err.code, clientCreated: true } }) } @@ -1112,9 +788,9 @@ class Engine extends ReadyResource { return client.userData.message(params) } - async * messages (pattern, client) { + messages (pattern, client) { if (!client.userData) return - yield * client.userData.messages(pattern) + return client.userData.messages(pattern) } async trust ({ z32 } = {}, client) { @@ -1158,59 +834,6 @@ class Engine extends ReadyResource { closeClients () { return this.sidecar.closeClients() } - gc ({ pid, resource }, client) { - const gc = new GarbageCollector(client, this) - if (resource === 'release') gc.release({ resource }) - else if (resource === 'sidecar') gc.sidecar({ pid, resource }) - else throw ERR_INVALID_GC_RESOURCE('Invalid resource to gc: ' + resource) - return gc - } - - async * shift (params, client) { - const session = new Session(client) - try { - yield { tag: 'moving', data: { src: params.src, dst: params.dst } } - const { from, to } = await this.#moveStorage(params) - yield { tag: 'complete', data: { from, to, src: params.src, dst: params.dst } } - } catch (err) { - yield { tag: 'error', data: { ...err, stack: err.stack, message: err.message, code: err.code } } - yield null - await session.close() - } - } - - async #moveStorage (params) { - if (!params.src) throw ERR_INVALID_INPUT('src must be specified') - if (!params.dst) throw ERR_INVALID_INPUT('dst must be specified') - const src = parseLink(params.src) - const dst = parseLink(params.dst) - if (src.key === null) throw ERR_INVALID_INPUT('Invalid source app key') - if (dst.key === null) throw ERR_INVALID_INPUT('Invalid destination app key') - const byDkey = path.join(PLATFORM_DIR, 'app-storage', 'by-dkey') - const from = path.join(byDkey, discoveryKey(src.key.buffer).toString('hex')) - const to = path.join(byDkey, discoveryKey(dst.key.buffer).toString('hex')) - const exists = (path) => fs.promises.stat(path).then(() => true, () => false) - let gc = null - try { - if (await exists(from) === false) { - throw ERR_INVALID_INPUT('No app storage for found for ' + params.src) - } - if (await exists(to)) { - if (params.force) { - gc = path.join(GC, randomBytes(8).toString('hex')) - await fs.promises.rename(to, gc) - } else { - throw ERR_INVALID_INPUT('App storage for ' + params.dst + ' already exists. Use --force to overwrite') - } - } - - await fs.promises.rename(from, to) - return { from, to } - } finally { - if (gc) await fs.promises.rm(gc, { recursive: true }) - } - } - restart (params, client) { return this.sidecar.restart(params, client) } wakeup (params) { return this.sidecar.wakeup(params) } @@ -1306,7 +929,7 @@ class Engine extends ReadyResource { : null const appBundle = new Bundle({ - corestore: this.#getCorestore(ctx.manifest?.name, ctx.channel), + corestore: this._getCorestore(ctx.manifest?.name, ctx.channel), appling: ctx.appling, channel: ctx.channel, checkout: ctx.checkout, @@ -1374,7 +997,7 @@ class Engine extends ReadyResource { if (this.replicator !== null) this.replicator.join(this.swarm, { server: false, client: true }).catch(safetyCatch) } - #getCorestore (name, channel, opts) { + _getCorestore (name, channel, opts) { if (!name || !channel) return this.corestore.session({ writable: false, ...opts }) return this.corestore.namespace(`${name}~${channel}`, { writable: false, ...opts }) } @@ -1412,4 +1035,406 @@ class Engine extends ReadyResource { } } +class DataStream extends streamx.Transform { + constructor () { + super({ + objectMode: true, + transform ({ data }, cb) { + this.push(data) + cb() + } + }) + } +} + +class OpStream extends streamx.Readable { + constructor (op, params, client, engine) { + super({ + read (cb) { + const error = (err) => { + const { stack, code, message } = err + this.push({ tag: 'error', data: { stack, code, message, success: false } }) + } + const close = () => { + this.push({ tag: 'final', data: { success: true } }) + this.push(null) + cb(null) + return this.session.close() + } + op(params).catch(error).finally(close) + } + }) + this.client = client + this.engine = engine + this.session = new Session(client) + } +} + +class Release extends OpStream { + constructor (...args) { + super((...args) => this.#op(...args), ...args) + } + + async #op ({ name, channel, checkout, link, dir }) { + const key = link ? hypercoreid.decode(link) : null + + const { session } = this + + const ctx = new Context({ + id: `releaser-${randomBytes(16).toString('hex')}`, + flags: { checkout, channel, link }, + cwd: dir + }) + + await this.engine.ready() + + name = name || ctx.name + + this.push({ tag: 'releasing', data: { name, channel, link } }) + + const corestore = this.engine._getCorestore(name || ctx.name, channel, { writable: true }) + + const bundle = new Bundle({ corestore, channel, key }) + await session.add(bundle) + + if (await bundle.db.get('manifest') === null) { + this.push({ tag: 'final', data: { reason: `The "${name}" app has not been staged on ${channel ? '"' + channel + '" channel' : link}.`, success: false } }) + return + } + + const currentLength = bundle.db.feed.length + const releaseLength = checkout || currentLength + 1 + + this.push({ tag: 'updating-to', data: { currentLength, releaseLength } }) + + await bundle.db.put('release', releaseLength) + + this.push({ tag: 'released', data: { name, channel, link, length: bundle.db.feed.length } }) + } +} + +class Stage extends OpStream { + static async * trace (bundle, client) { + await bundle.ready() + const tracer = bundle.startTracing() + const sp = spawn( + DESKTOP_RUNTIME, + [BOOT, `--trace=${client.id}`, '--swap', SWAP, 'pear://' + hypercoreid.encode(bundle.drive.key)] + ) + + const onclose = () => sp.kill() + client.on('close', onclose) + + const closed = once(sp, 'exit') + client.off('close', onclose) + + const total = bundle.drive.core.length + (bundle.drive.blobs?.core.length || 0) + for await (const { blocks } of tracer) yield { total, blocks } + + const [status] = await closed + + if (status) { + const err = ERR_TRACER_FAILED('Tracer Failed!') + err.exitCode = status + throw err + } + + await bundle.finalizeTracing() + } + + constructor (...args) { super((...args) => this.#op(...args), ...args) } + + async #op ({ channel, key, dir, dryRun, name, truncate, bare = false, clientArgv, ignore = '.git,.github,.DS_Store' }) { + const { client, session, engine } = this + const ctx = new Context({ + id: `stager-${randomBytes(16).toString('hex')}`, + flags: { channel, stage: true }, + dir, + clientArgv + }) + await engine.ready() + if (key) key = hypercoreid.decode(key) + + const corestore = engine._getCorestore(name || ctx.name, channel, { writable: true }) + const bundle = new Bundle({ + key, + corestore, + channel, + truncate, + stage: true, + failure (err) { console.error(err) } + }) + await session.add(bundle) + client.userData = new engine.App({ ctx, bundle }) + + const currentVersion = bundle.version + await ctx.initialize({ bundle, dryRun }) + const z32 = hypercoreid.encode(bundle.drive.key) + await engine.trust({ z32 }, client) + const type = ctx.manifest.pear?.type || 'desktop' + const terminalBare = type === 'terminal' + if (terminalBare) bare = true + if (ctx.manifest.pear?.stage?.ignore) ignore = ctx.manifest.pear.stage?.ignore + else ignore = (Array.isArray(ignore) ? ignore : ignore.split(',')) + + ignore = ignore.map((file) => unixPathResolve('/', file)) + const release = (await bundle.db.get('release'))?.value || 0 + const pearkey = 'pear://' + z32 + + this.push({ tag: 'staging', data: { name: ctx.name, channel: bundle.channel, key: pearkey, current: currentVersion, release } }) + + if (dryRun) this.push({ tag: 'dry' }) + + const root = unixPathResolve(ctx.dir) + const main = unixPathResolve('/', ctx.main) + const src = new LocalDrive(root, { followLinks: bare === false, metadata: new Map() }) + const dst = bundle.drive + const opts = { filter: (key) => ignore.some((path) => key.startsWith(path)) === false, dryRun, batch: true } + const builtins = terminalBare ? gunk.bareBuiltins : gunk.builtins + const linker = new ScriptLinker(src, { builtins }) + const entrypoints = [main, ...(ctx.manifest.pear?.stage?.entrypoints || [])].map((entry) => unixPathResolve('/', entry)) + const mods = await linker.warmup(entrypoints) + for await (const [filename, mod] of mods) src.metadata.put(filename, mod.cache()) + const mirror = new Mirror(src, dst, opts) + for await (const diff of mirror) { + if (diff.op === 'add') { + this.push({ tag: 'byte-diff', data: { type: 1, sizes: [diff.bytesAdded], message: diff.key } }) + } else if (diff.op === 'change') { + this.push({ tag: 'byte-diff', data: { type: 0, sizes: [-diff.bytesRemoved, diff.bytesAdded], message: diff.key } }) + } else if (diff.op === 'remove') { + this.push({ tag: 'byte-diff', data: { type: -1, sizes: [-diff.bytesRemoved], message: diff.key } }) + } + } + this.push({ + tag: 'summary', + data: { + files: mirror.count.files, + add: mirror.count.add, + remove: mirror.count.remove, + change: mirror.count.change + } + }) + + if (dryRun || bare) { + const reason = dryRun ? 'dry-run' : 'bare' + this.push({ tag: 'skipping', data: { reason, success: true } }) + } else if (mirror.count.add || mirror.count.remove || mirror.count.change) { + for await (const { blocks, total } of this.constructor.trace(bundle, client)) { + this.push({ tag: 'warming', data: { blocks, total } }) + } + this.push({ tag: 'warming', data: { success: true } }) + } else { + this.push({ tag: 'skipping', data: { reason: 'no changes', success: true } }) + } + + this.push({ tag: 'complete', data: { dryRun } }) + + if (dryRun) return + + this.push({ tag: 'addendum', data: { version: bundle.version, release, channel, key: pearkey } }) + } +} + +class Seed extends OpStream { + constructor (...args) { super((...args) => this.#op(...args), ...args) } + + async #op ({ name, channel, link, verbose, seeders, dir, clientArgv } = {}) { + const { client, session } = this + const ctx = new Context({ + id: `seeder-${randomBytes(16).toString('hex')}`, + flags: { channel, link }, + dir, + clientArgv + }) + client.userData = new this.engine.App({ ctx, session }) + + this.push({ tag: 'seeding', data: { key: link, name, channel } }) + await this.engine.ready() + + const corestore = this.engine._getCorestore(name, channel) + const key = link ? hypercoreid.decode(link) : null + if (key !== null && await Bundle.provisioned(corestore, key) === false) { + throw ERR_PLATFORM_ERROR('Pear Platform: Nothing to seed') + } + + const log = (msg) => this.engine.bus.pub({ topic: 'seed', id: client.id, msg }) + const notices = this.engine.bus.sub({ topic: 'seed', id: client.id }) + const bundle = new Bundle({ corestore, key, channel, log }) + await session.add(bundle) + + if (verbose) { + this.push({ tag: 'meta-key', data: bundle.drive.key.toString('hex') }) + this.push({ tag: 'meta-discovery-key', data: bundle.drive.discoveryKey.toString('hex') }) + this.push({ tag: 'content-key', data: bundle.drive.contentKey.toString('hex') }) + } + + this.push({ tag: 'key', data: hypercoreid.encode(bundle.drive.key) }) + + await bundle.join(this.engine.swarm, { seeders, server: true }) + + for await (const { msg } of notices) this.push(msg) + // no need for teardown, seed is tied to the lifecycle of the client + } +} + +class Dump extends OpStream { + constructor (...args) { super((...args) => this.#op(...args), ...args) } + + async #op ({ link, dir, checkout }) { + const { session, engine } = this + await engine.ready() + const key = link ? hypercoreid.decode(link) : null + checkout = Number(checkout) + const corestore = engine._getCorestore(null, null) + const bundle = new Bundle({ corestore, key, checkout }) + + await session.add(bundle) + + if (engine.swarm) bundle.join(engine.swarm) + + const pearkey = 'pear://' + hypercoreid.encode(bundle.drive.key) + + this.push({ tag: 'dumping', data: { key: pearkey, dir } }) + + try { + await bundle.calibrate() + } catch (err) { + await session.close() + throw err + } + + const out = unixPathResolve(dir) + const dst = new LocalDrive(out) + const src = bundle.drive + + const mirror = new Mirror(src, dst) + + for await (const diff of mirror) { + if (diff.op === 'add') { + this.push({ tag: 'byte-diff', data: { type: 1, sizes: [diff.bytesAdded], message: diff.key } }) + } else if (diff.op === 'change') { + this.push({ tag: 'byte-diff', data: { type: 0, sizes: [-diff.bytesRemoved, diff.bytesAdded], message: diff.key } }) + } else if (diff.op === 'remove') { + this.push({ tag: 'byte-diff', data: { type: -1, sizes: [-diff.bytesRemoved], message: diff.key } }) + } + } + } +} + +class Info extends OpStream { + constructor (...args) { + super((...args) => this.#op(...args), ...args) + } + + async #op ({ link, channel, dir, showKey, metadata, changelog, full } = {}) { + const { session } = this + let bundle = null + const anyFlag = [changelog, full, metadata, showKey].some(flag => flag === true) + const isEnabled = (flag) => anyFlag ? !!flag : flag !== false + if (link) { + const parsed = parseLink(link) + const key = parsed.key.buffer + const hex = parsed.key.hex + const z32 = parsed.key.z32 + const corestore = this.engine._getCorestore(null, null) + bundle = new Bundle({ corestore, key }) + await bundle.ready() + if (isEnabled(showKey)) this.push({ tag: 'retrieving', data: { hex, z32 } }) + } else if (channel) { + const ctx = new Context({ flags: { channel, link }, dir }) + const corestore = this.engine._getCorestore(ctx.name, channel) + bundle = new Bundle({ corestore, channel }) + await bundle.ready() + const hex = bundle.drive.key.toString('hex') + const z32 = hypercoreid.encode(bundle.drive.key) + if (isEnabled(showKey)) this.push({ tag: 'retrieving', data: { hex, z32 } }) + } else if (this.engine.drive.key) { + const hex = this.engine.drive.key.toString('hex') + const z32 = hypercoreid.encode(this.engine.drive.key) + if (isEnabled(showKey)) this.push({ tag: 'retrieving', data: { hex, z32 } }) + } + await this.engine.ready() + if (bundle) { + await session.add(bundle) + await bundle.join(this.engine.swarm) + } + const drive = bundle?.drive || this.engine.drive + + if (drive.key && drive.contentKey && drive.discoveryKey) { + if (isEnabled(metadata)) { + this.push({ + tag: 'keys', + data: { + project: drive.key.toString('hex'), + content: drive.contentKey.toString('hex'), + discovery: drive.discoveryKey.toString('hex') + } + }) + } + + const channel = (await drive.db.get('channel'))?.value + const release = (await drive.db.get('release'))?.value + const manifest = (await drive.db.get('manifest'))?.value + const name = manifest?.pear?.name || manifest?.holepunch?.name || manifest.name + if (isEnabled(metadata)) this.push({ tag: 'info', data: { channel, release, name } }) + } + + const contents = await drive.get('/CHANGELOG.md') + + const type = full ? 'full' : 'latest' + const showChangelog = isEnabled(changelog) || full ? type : false + const blank = '[ No Changelog ]' + const parsed = showChangelog === 'latest' + ? (clog.parse(contents).at(0)?.[1]) || blank + : showChangelog === 'full' + ? (clog.parse(contents).map(entry => entry[1]).join('\n\n')) || blank + : blank + + if (showChangelog) this.push({ tag: 'changelog', data: { changelog: parsed, full } }) + } +} + +class Shift extends OpStream { + constructor (...args) { + super((...args) => this.#op(...args), ...args) + } + + async #op ({ src, dst, force }) { + let from = null + let to = null + this.push({ tag: 'moving', data: { src, dst } }) + + if (!src) throw ERR_INVALID_INPUT('src must be specified') + if (!dst) throw ERR_INVALID_INPUT('dst must be specified') + const srcKey = parseLink(src)?.key + const dstKey = parseLink(dst)?.key + if (!srcKey) throw ERR_INVALID_INPUT('Invalid source app key') + if (!dstKey) throw ERR_INVALID_INPUT('Invalid destination app key') + const byDkey = path.join(PLATFORM_DIR, 'app-storage', 'by-dkey') + from = path.join(byDkey, discoveryKey(srcKey.buffer).toString('hex')) + to = path.join(byDkey, discoveryKey(dstKey.buffer).toString('hex')) + const exists = (path) => fs.promises.stat(path).then(() => true, () => false) + let gc = null + try { + if (await exists(from) === false) { + throw ERR_INVALID_INPUT('No app storage for found for ' + src) + } + if (await exists(to)) { + if (force) { + gc = path.join(GC, randomBytes(8).toString('hex')) + await fs.promises.rename(to, gc) + } else { + throw ERR_INVALID_INPUT('App storage for ' + dst + ' already exists. Use --force to overwrite') + } + } + + await fs.promises.rename(from, to) + } finally { + if (gc) await fs.promises.rm(gc, { recursive: true }) + } + + this.push({ tag: 'complete', data: { from, to, src, dst } }) + } +} + module.exports = Sidecar diff --git a/run/index.js b/run/index.js index a64c0c608..f03891473 100644 --- a/run/index.js +++ b/run/index.js @@ -27,7 +27,7 @@ module.exports = async function run ({ ipc, args, link, storage, detached, flags key = parseLink(link).key if (key !== null && link.startsWith('pear://') === false) { - throw new ERR_INVALID_INPUT('Key must start with pear://') + throw ERR_INVALID_INPUT('Key must start with pear://') } const cwd = os.cwd() @@ -43,7 +43,7 @@ module.exports = async function run ({ ipc, args, link, storage, detached, flags try { JSON.parse(fs.readFileSync(path.join(dir, 'package.json'))) } catch (err) { - throw new ERR_INVALID_INPUT(`A valid package.json file must exist at: "${dir}"`, { showUsage: false }) + throw ERR_INVALID_INPUT(`A valid package.json file must exist at: "${dir}"`, { showUsage: false }) } } @@ -88,9 +88,7 @@ module.exports = async function run ({ ipc, args, link, storage, detached, flags ipc.close().catch(console.error) return stream } - const { startId, host, id, type = 'desktop', bundle, bail } = await ipc.start({ flags, env: ENV, dir, link, args: appArgs }) - if (bail && args.indexOf('--detach') === -1) { const err = ERR_PERMISSION_REQUIRED('Permission required to run key') err.key = key