Skip to content

Commit

Permalink
feat: throw error when calling toStream twice
Browse files Browse the repository at this point in the history
  • Loading branch information
freaz committed Dec 7, 2022
1 parent 4718a07 commit 62ae7e4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
23 changes: 9 additions & 14 deletions src/node/filesystem/binary.node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,17 @@ describe('Node Binary', () => {
expect(Buffer.concat(chunks).toString('utf8')).toBe('0123456789');
});

it('streams data to all returned streams', async () => {
const streamOne = reader.toStream();
const streamTwo = reader.toStream();

const chunksOne = [];
for await (const chunk of streamOne) {
chunksOne.push(chunk);
}
it('throws error when calling toStream() twice', () => {
reader.toStream();

const chunksTwo = [];
for await (const chunk of streamTwo) {
chunksTwo.push(chunk);
let error: unknown;
try {
reader.toStream();
} catch (err: unknown) {
error = err;
}

expect(Buffer.concat(chunksOne).toString('utf8')).toBe('0123456789');
expect(chunksOne).toEqual(chunksTwo);

expect(error).toBeInstanceOf(UnexpectedError);
});
});
});
Expand Down
23 changes: 17 additions & 6 deletions src/node/filesystem/binary.node.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */

import { createReadStream } from 'fs';
import type { FileHandle } from 'fs/promises';
import { open } from 'fs/promises';
Expand All @@ -19,7 +21,7 @@ import { UnexpectedError } from '../../lib';
import { handleNodeError } from './filesystem.node';

export class StreamReader {
private stream: NodeJS.ReadableStream;
private stream: NodeJS.ReadableStream | undefined;
private buffer: Buffer;
private ended = false;

Expand All @@ -39,13 +41,13 @@ export class StreamReader {
}

private hook() {
this.stream.on('data', this.dataCallback);
this.stream.on('end', this.endCallback);
this.stream!.on('data', this.dataCallback);
this.stream!.on('end', this.endCallback);
}

private unhook() {
this.stream.off('data', this.dataCallback);
this.stream.off('end', this.endCallback);
this.stream!.off('data', this.dataCallback);
this.stream!.off('end', this.endCallback);
}

private onData(chunk: Buffer) {
Expand All @@ -60,6 +62,10 @@ export class StreamReader {

// assumption: this function is never called twice without awaiting its promise in between
private async waitForData(): Promise<void> {
if (this.stream === undefined) {
throw new UnexpectedError('Stream ejected', { reason: 'Stream moved by calling toStream()' });
}

this.stream.resume();

return new Promise((resolve, reject) => {
Expand All @@ -75,7 +81,7 @@ export class StreamReader {
this.pendingReadResolve();
}

this.stream.pause();
this.stream!.pause();
this.pendingReadResolve = undefined;
}

Expand All @@ -101,6 +107,10 @@ export class StreamReader {
}

public toStream(): Readable {
if (this.stream === undefined) {
throw new UnexpectedError('Stream ejected', { reason: 'Stream moved by calling toStream()' });
}

this.unhook();

const buffer = this.buffer;
Expand All @@ -113,6 +123,7 @@ export class StreamReader {
}

this.stream.pipe(pass);
this.stream = undefined;

return pass;
}
Expand Down

0 comments on commit 62ae7e4

Please sign in to comment.