Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hash-stream-node): support file stream in readableStreamHasher #3338

Merged
merged 8 commits into from
Feb 21, 2022
1 change: 1 addition & 0 deletions packages/hash-stream-node/src/fileStreamHasher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Readable } from "stream";

import { HashCalculator } from "./HashCalculator";

// ToDo: deprecate in favor of readableStreamHasher
export const fileStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, fileStream: Readable) =>
new Promise((resolve, reject) => {
if (!isReadStream(fileStream)) {
Expand Down
46 changes: 46 additions & 0 deletions packages/hash-stream-node/src/fsCreateReadStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as fs from "fs";

import { fsCreateReadStream } from "./fsCreateReadStream";

jest.setTimeout(30000);

describe(fsCreateReadStream.name, () => {
const mockFileContents = fs.readFileSync(__filename, "utf8");

it("uses file descriptor if defined", (done) => {
fs.promises.open(__filename, "r").then((fd) => {
if ((fd as any).createReadStream) {
const readStream = (fd as any).createReadStream();
const readStreamCopy = fsCreateReadStream(readStream);

const chunks: Array<Buffer> = [];
readStreamCopy.on("data", (chunk) => {
chunks.push(chunk);
});
readStreamCopy.on("end", () => {
const outputFileContents = Buffer.concat(chunks).toString();
expect(outputFileContents).toEqual(mockFileContents);
done();
});
} else {
console.log(`Skipping createReadStream test as it's not available.`);
done();
}
});
});

it("uses start and end if file descriptor is not defined", (done) => {
const readStream = fs.createReadStream(__filename);
const readStreamCopy = fsCreateReadStream(readStream);

const chunks: Array<Buffer> = [];
readStreamCopy.on("data", (chunk) => {
chunks.push(chunk);
});
readStreamCopy.on("end", () => {
const outputFileContents = Buffer.concat(chunks).toString();
expect(outputFileContents).toEqual(mockFileContents);
done();
});
});
});
8 changes: 8 additions & 0 deletions packages/hash-stream-node/src/fsCreateReadStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { createReadStream, ReadStream } from "fs";

export const fsCreateReadStream = (readStream: ReadStream) =>
createReadStream(readStream.path, {
fd: (readStream as any).fd,
start: (readStream as any).start,
end: (readStream as any).end,
});
34 changes: 34 additions & 0 deletions packages/hash-stream-node/src/isFileStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { createReadStream } from "fs";
import { Readable } from "stream";

import { isFileStream } from "./isFileStream";

describe(isFileStream.name, () => {
describe("returns true if readablestream is fs.ReadStream", () => {
it("with string path", () => {
const readStream = createReadStream(__filename);
expect(isFileStream(readStream)).toStrictEqual(true);
});

it("with buffer path", () => {
const readStream = createReadStream(Buffer.from(__filename, "utf-8"));
expect(isFileStream(readStream)).toStrictEqual(true);
});

it("with filehandle", async () => {
const { promises } = await import("fs");
const fd = await promises.open(__filename, "r");
// @ts-expect-error createReadStream is added in v16.11.0
if (fd.createReadStream) {
// @ts-expect-error createReadStream is added in v16.11.0
const readStream = fd.createReadStream();
expect(isFileStream(readStream)).toStrictEqual(true);
}
});
});

it("returns false if readablestream is not an fs.ReadStream", () => {
const readableStream = new Readable({ read: (size) => {} });
expect(isFileStream(readableStream)).toStrictEqual(false);
});
});
7 changes: 7 additions & 0 deletions packages/hash-stream-node/src/isFileStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ReadStream } from "fs";
import { Readable } from "stream";

export const isFileStream = (stream: Readable): stream is ReadStream =>
typeof (stream as ReadStream).path === "string" ||
Buffer.isBuffer((stream as ReadStream).path) ||
typeof (stream as any).fd === "number";
24 changes: 23 additions & 1 deletion packages/hash-stream-node/src/readableStreamHasher.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { Hash } from "@aws-sdk/types";
import { Readable, Writable, WritableOptions } from "stream";
import { createReadStream } from "fs";
import { Readable, Writable } from "stream";

import { fsCreateReadStream } from "./fsCreateReadStream";
import { HashCalculator } from "./HashCalculator";
import { isFileStream } from "./isFileStream";
import { readableStreamHasher } from "./readableStreamHasher";

jest.mock("./fsCreateReadStream");
jest.mock("./HashCalculator");
jest.mock("./isFileStream");
jest.mock("fs");

describe(readableStreamHasher.name, () => {
const mockDigest = jest.fn();
Expand Down Expand Up @@ -38,13 +44,29 @@ describe(readableStreamHasher.name, () => {
(HashCalculator as unknown as jest.Mock).mockImplementation(
(hash) => new MockHashCalculator(hash, mockHashCalculatorWrite, mockHashCalculatorEnd)
);
(isFileStream as unknown as jest.Mock).mockReturnValue(false);
mockDigest.mockResolvedValue(mockHash);
});

afterEach(() => {
jest.clearAllMocks();
});

it("creates a copy in case of fileStream", () => {
(fsCreateReadStream as jest.Mock).mockReturnValue(
new Readable({
read: (size) => {},
})
);
(isFileStream as unknown as jest.Mock).mockReturnValue(true);

const fsReadStream = createReadStream(__filename);
readableStreamHasher(mockHashCtor, fsReadStream);

expect(isFileStream).toHaveBeenCalledWith(fsReadStream);
expect(fsCreateReadStream).toHaveBeenCalledWith(fsReadStream);
});

it("computes hash for a readable stream", async () => {
const readableStream = new Readable({
read: (size) => {},
Expand Down
9 changes: 7 additions & 2 deletions packages/hash-stream-node/src/readableStreamHasher.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import { HashConstructor, StreamHasher } from "@aws-sdk/types";
import { Readable } from "stream";

import { fsCreateReadStream } from "./fsCreateReadStream";
import { HashCalculator } from "./HashCalculator";
import { isFileStream } from "./isFileStream";

export const readableStreamHasher: StreamHasher<Readable> = (hashCtor: HashConstructor, readableStream: Readable) => {
// ToDo: throw if readableStream is already flowing and it's copy can't be created.
const streamToPipe = isFileStream(readableStream) ? fsCreateReadStream(readableStream) : readableStream;

const hash = new hashCtor();
const hashCalculator = new HashCalculator(hash);
readableStream.pipe(hashCalculator);
streamToPipe.pipe(hashCalculator);

return new Promise((resolve, reject) => {
readableStream.on("error", (err: Error) => {
streamToPipe.on("error", (err: Error) => {
// if the source errors, the destination stream needs to manually end
hashCalculator.end();
reject(err);
Expand Down