Skip to content

Commit

Permalink
refactor: migrate file streaming to portable bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCalzone committed Dec 10, 2024
1 parent 7a7b868 commit 8e89e47
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 11 deletions.
42 changes: 40 additions & 2 deletions packages/core/src/bindings/fs/node.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import {
fileHandleToReadableStream,
fileHandleToWritableStream,
} from "@zwave-js/shared";
import type {
FSStats,
FileHandle,
Expand Down Expand Up @@ -56,18 +60,52 @@ export const fs: FileSystem = {
mode = flags.read ? "w+" : "w";
}

return new NodeFileHandle(await fsp.open(path, mode));
return new NodeFileHandle(
await fsp.open(path, mode),
{
read: flags.read,
write: flags.write,
},
);
},
};

export class NodeFileHandle implements FileHandle {
public constructor(handle: fsp.FileHandle) {
public constructor(
handle: fsp.FileHandle,
flags: { read: boolean; write: boolean },
) {
this.open = true;
this.handle = handle;
this.flags = flags;
}

private open: boolean;
private handle: fsp.FileHandle;
private flags: { read: boolean; write: boolean };

private _readable?: ReadableStream<Uint8Array>;
private _writable?: WritableStream<Uint8Array>;

public get readable(): ReadableStream<Uint8Array> {
if (!this.flags.read) {
throw new Error("File is not readable");
}
if (!this._readable) {
this._readable = fileHandleToReadableStream(this);
}
return this._readable;
}

public get writable(): WritableStream<Uint8Array> {
if (!this.flags.write) {
throw new Error("File is not writable");
}
if (!this._writable) {
this._writable = fileHandleToWritableStream(this);
}
return this._writable;
}

async close(): Promise<void> {
if (!this.open) return;
Expand Down
6 changes: 5 additions & 1 deletion packages/shared/src/bindings.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Definitions for runtime-agnostic low level bindings like cryptography,
// file system access, etc.

import { type ReadableWritablePair } from "node:stream/web";

export interface CryptoPrimitives {
randomBytes(length: number): Uint8Array;
/** Encrypts a payload using AES-128-ECB */
Expand Down Expand Up @@ -61,7 +63,9 @@ export interface FSStats {
size: number;
}

export interface FileHandle {
export interface FileHandle
extends ReadableWritablePair<Uint8Array, Uint8Array>
{
close(): Promise<void>;
read(
position?: number | null,
Expand Down
35 changes: 35 additions & 0 deletions packages/shared/src/fs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { type ReadableWritablePair } from "node:stream/web";
import path from "pathe";
import { Bytes } from "./Bytes.js";
import {
type CopyFile,
type FileHandle,
type ManageDirectory,
type ReadFile,
type ReadFileSystemInfo,
Expand Down Expand Up @@ -90,3 +92,36 @@ export async function pathExists(
return false;
}
}

export function fileHandleToWritableStream(
handle: Omit<FileHandle, "readable" | "writable">,
): WritableStream<Uint8Array> {
return new WritableStream<Uint8Array>({
async write(chunk) {
while (chunk.length > 0) {
const { bytesWritten } = await handle.write(chunk);
chunk = chunk.subarray(bytesWritten);
}
},
});
}

export function fileHandleToReadableStream(
handle: Omit<FileHandle, "readable" | "writable">,
): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
async pull(controller) {
const { data } = await handle.read(null, 16 * 1024);
controller.enqueue(data);
},
});
}

export function fileHandleToStreams(
handle: Omit<FileHandle, "readable" | "writable">,
): ReadableWritablePair<Uint8Array, Uint8Array> {
return {
readable: fileHandleToReadableStream(handle),
writable: fileHandleToWritableStream(handle),
};
}
1 change: 1 addition & 0 deletions packages/zwave-js/src/lib/driver/Driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,7 @@ export class Driver extends TypedEventTarget<DriverEventCallbacks>
const symlinkedPorts: string[] = [];
const localPorts: string[] = [];
const remotePorts: string[] = [];
// FIXME: Move this into the serial bindings
if (local) {
// Put symlinks to the serial ports first if possible
if (os.platform() === "linux") {
Expand Down
18 changes: 10 additions & 8 deletions packages/zwave-js/src/lib/driver/UpdateConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import {
} from "@zwave-js/shared";
import {
type CopyFile,
type FileHandle,
type ManageDirectory,
type OpenFile,
type ReadFileSystemInfo,
type WriteFile,
} from "@zwave-js/shared/bindings";
Expand Down Expand Up @@ -70,7 +72,7 @@ export async function checkForConfigUpdates(
* This only works if an external configuation directory is used.
*/
export async function installConfigUpdate(
fs: ManageDirectory & ReadFileSystemInfo & WriteFile & CopyFile,
fs: ManageDirectory & ReadFileSystemInfo & WriteFile & CopyFile & OpenFile,
newVersion: string,
external: {
configDir: string;
Expand Down Expand Up @@ -118,17 +120,17 @@ export async function installConfigUpdate(

// Download the package tarball into the temporary directory
const tarFilename = path.join(tmpDir, "zjs-config-update.tgz");
let fileHandle: fsp.FileHandle | undefined;
let fileHandle: FileHandle | undefined;
try {
fileHandle = await fsp.open(tarFilename, "w");
const writable = new WritableStream({
async write(chunk) {
await fileHandle!.write(chunk);
},
fileHandle = await fs.open(tarFilename, {
read: false,
write: true,
create: true,
truncate: true,
});

const response = await ky.get(url);
await response.body?.pipeTo(writable);
await response.body?.pipeTo(fileHandle.writable);
} catch (e) {
throw new ZWaveError(
`Config update failed: Could not download tarball. Reason: ${
Expand Down

0 comments on commit 8e89e47

Please sign in to comment.