Skip to content

Commit

Permalink
Refactor the BulkFileUploader in preparation of the coming changes
Browse files Browse the repository at this point in the history
issue #67
  • Loading branch information
sedwards2009 committed Jan 15, 2018
1 parent 330bd18 commit 11c4d5a
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,4 @@ src/render_process/UploadProgressBar.js
src/render_process/PtyIpcBridge.js
src/pty/Pty.js
src/utils/DoLater.js
src/utils/ByteCountingStreamTransform.js
5 changes: 1 addition & 4 deletions src/render_process/Terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1938,10 +1938,7 @@ export class EtTerminal extends ThemeableElementBase implements Commandable, Acc
return;
}

const uploader = new BulkFileUploader(bulkFileHandle); //, this._pty);
uploader.onPtyData(text => {
this.send(text);
});
const uploader = new BulkFileUploader(bulkFileHandle, this._pty);

// Filter
const containerDiv = DomUtils.getShadowId(this, ID_CONTAINER);
Expand Down
116 changes: 75 additions & 41 deletions src/render_process/bulk_file_handling/BulkFileUploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
*
* This source code is licensed under the MIT license which is detailed in the LICENSE.txt file.
*/
import getUri = require('get-uri'); // Top level on this import is a callable, have to use other syntax.
import * as http from 'http';
import {Event} from 'extraterm-extension-api';
import {Transform, Readable} from 'stream';

import getUri = require('get-uri'); // Top level on this import is a callable, have to use other syntax.
import {BulkFileHandle} from './BulkFileHandle';
import {Event} from 'extraterm-extension-api';
import {Metadata} from '../../main_process/bulk_file_handling/BulkFileStorage';
import {ByteCountingStreamTransform} from '../../utils/ByteCountingStreamTransform';
import {EventEmitter} from '../../utils/EventEmitter';
import {Logger, getLogger} from '../../logging/Logger';
import log from '../../logging/LogDecorator';
import {Pty} from '../../pty/Pty';


const BYTES_PER_LINE = 3*1024;
const BYTES_PER_LINE = 90;

/**
* Uploads files to a remote process over shell and remote's stdin.
Expand All @@ -31,50 +36,97 @@ export class BulkFileUploader {

private _log: Logger;
private _buffer: Buffer = Buffer.alloc(0);
private _onPtyDataEventEmitter = new EventEmitter<string>();
private _onUploadedChangeEmitter = new EventEmitter<number>();
private _onFinishedEmitter = new EventEmitter<void>();
private _uploadedCount = 0;

constructor(private _bulkFileHandle: BulkFileHandle) {
constructor(private _bulkFileHandle: BulkFileHandle, private _pty: Pty) {
this._log = getLogger("BulkFileUploader", this);
this.onPtyData = this._onPtyDataEventEmitter.event;
this.onUploadedChange = this._onUploadedChangeEmitter.event;
this.onFinished = this._onFinishedEmitter.event;
}

onPtyData: Event<string>;
onUploadedChange: Event<number>;
onFinished: Event<void>;

upload(): void {
this._sendEncodedLine("metadata");
const jsonString = JSON.stringify(this._bulkFileHandle.getMetadata());
this._sendEncodedDataToPty(Buffer.from(jsonString, "utf8"));

this._sendEncodedLine("body");

const url = this._bulkFileHandle.getUrl();
if (url.startsWith("data:")) {
getUri(url, (err, stream) => {
stream.on('data', this._responseOnData.bind(this));
stream.on('end', this._responseOnEnd.bind(this));
stream.on('error', this._reponseOnError.bind(this));
const lastStage = this._configurePipeline(stream);
lastStage.on('error', this._reponseOnError.bind(this));
});
} else {
const req = http.request(<any> url, (res) => {
res.on('data', this._responseOnData.bind(this));
res.on('end', this._responseOnEnd.bind(this));
this._configurePipeline(res);
});
req.on('error', this._reponseOnError.bind(this));

req.end();
}
}


private _configurePipeline(sourceStream: NodeJS.ReadableStream): NodeJS.ReadableStream {
const countingTransform = new ByteCountingStreamTransform();
sourceStream.pipe(countingTransform);
countingTransform.onCountUpdate((count: number) => {
this._onUploadedChangeEmitter.fire(count);
});

const encodeTransform = new UploadEncodeDataTransform(this._bulkFileHandle.getMetadata());
countingTransform.pipe(encodeTransform);

encodeTransform.on('data', this._responseOnData.bind(this));
encodeTransform.on('end', this._responseOnEnd.bind(this));
return encodeTransform;
}

private _responseOnData(chunk: Buffer): void {
this._pty.write(chunk.toString("utf8"));
}

private _responseOnEnd(): void {
this._onFinishedEmitter.fire(undefined);
}

private _reponseOnError(e): void {
this._log.warn(`Problem with request: ${e.message}`);
}
}


class UploadEncodeDataTransform extends Transform {
private _log: Logger;

private _doneIntro = false;
private _buffer: Buffer = Buffer.alloc(0);

constructor(private _metadata: Metadata) {
super();
this._log = getLogger("UploadEncodeDataTransform", this);
}

protected _transform(chunk: Buffer, encoding: string, callback: Function): void {
this._appendChunkToBuffer(chunk);
if ( ! this._doneIntro) {
this._doneIntro = true;
this._sendHeader();
}
this._sendBuffer();

callback();
}

protected _flush(callback: Function): void {
this._sendEncodedDataToPty(this._buffer);
callback();
}

private _sendHeader(): void {
this._sendEncodedLine("metadata");
const jsonString = JSON.stringify(this._metadata);
this._sendEncodedDataToPty(Buffer.from(jsonString, "utf8"));
this._sendEncodedLine("body");
}

private _appendChunkToBuffer(chunk: Buffer): void {
Expand All @@ -93,7 +145,6 @@ export class BulkFileUploader {
for (let i = 0; i < lines; i++) {
const lineBuffer = this._buffer.slice(i*BYTES_PER_LINE, (i+1) * BYTES_PER_LINE);
this._sendEncodedLine(lineBuffer.toString("base64"));
this._uploadedCount += lineBuffer.length;
}

const remainder = this._buffer.length % BYTES_PER_LINE;
Expand All @@ -104,25 +155,12 @@ export class BulkFileUploader {
} else {
this._buffer = Buffer.alloc(0);
}

this._onUploadedChangeEmitter.fire(this._uploadedCount);
}

private _responseOnEnd(): void {
this._uploadedCount += this._buffer.length;
this._sendEncodedDataToPty(this._buffer);

this._onUploadedChangeEmitter.fire(this._uploadedCount);
this._onFinishedEmitter.fire(undefined);
}

private _reponseOnError(e): void {
this._log.warn(`Problem with request: ${e.message}`);
}

private _sendEncodedLine(line: string): void {
const fullLine = "#" + line + "\n";
this._sendDataToPtyEvent(fullLine);
private _sendEncodedLine(data: string): void {
const fullLine = "#" + data + "\n";
const buf = Buffer.from(fullLine, "utf8");
this.push(buf);
}

private _sendEncodedDataToPty(buffer: Buffer): void {
Expand All @@ -132,8 +170,4 @@ export class BulkFileUploader {
}
this._sendEncodedLine(""); // Terminator
}

private _sendDataToPtyEvent(text: string): void {
this._onPtyDataEventEmitter.fire(text);
}
}
52 changes: 52 additions & 0 deletions src/utils/ByteCountingStreamTransform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2018 Simon Edwards <[email protected]>
*
* This source code is licensed under the MIT license which is detailed in the LICENSE.txt file.
*/
import {Event} from 'extraterm-extension-api';
import {Transform} from 'stream';

import {DebouncedDoLater} from './DoLater';
import {EventEmitter} from './EventEmitter';


const DEBOUNCE_DELAY_MILLIS = 100;


/**
* Stream transform which emits events signaling the amount of data which
* has passed through.
*/
export class ByteCountingStreamTransform extends Transform {

private _counter = 0;
private _doLater: DebouncedDoLater = null;
private _onCountUpdateEventEmitter = new EventEmitter<number>();

onCountUpdate: Event<number>;

constructor(options?) {
super(options);
this.onCountUpdate = this._onCountUpdateEventEmitter.event;

this._doLater = new DebouncedDoLater(() => {
this._onCountUpdateEventEmitter.fire(this.getCount());
}, DEBOUNCE_DELAY_MILLIS);
}

protected _transform(chunk: any, encoding: string, callback: Function): void {
this._counter += chunk.length;
this.push(chunk);
this._doLater.trigger();
callback();
}

protected _flush(callback: Function): void {
this._doLater.doNow();
callback();
}

getCount(): number {
return this._counter;
}
}
5 changes: 5 additions & 0 deletions src/utils/DoLater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ export class DebouncedDoLater {
this._laterDisposable = null;
}
}

doNow(): void {
this.cancel();
this._callback();
}
}
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
"./src/typings/extra_lib.d.ts",
"./src/typings/ptyw.js/ptyw.js.d.ts",
"./src/typings/requirejs.d.ts",
"./src/utils/ByteCountingStreamTransform.ts",
"./src/utils/DoLater.ts",
"./src/utils/EventEmitter.ts",
"./src/utils/OwnerTrackingList.ts",
Expand Down

0 comments on commit 11c4d5a

Please sign in to comment.