Skip to content

Commit

Permalink
feat: implement an async iterable request class
Browse files Browse the repository at this point in the history
  • Loading branch information
chdh committed Aug 1, 2024
1 parent 3856dc5 commit 411fb00
Show file tree
Hide file tree
Showing 4 changed files with 384 additions and 1 deletion.
254 changes: 254 additions & 0 deletions src/iterable-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// This module implements an iterable `Request` class.

import Request, { type RequestOptions } from './request';
import { type ColumnMetadata } from './token/colmetadata-token-parser';

export interface ColumnValue {
metadata: ColumnMetadata;
value: any;
}

type RowData = ColumnValue[] | Record<string, ColumnValue>; // type variant depending on config.options.useColumnNames
type ColumnMetadataDef = ColumnMetadata[] | Record<string, ColumnMetadata>; // type variant depending on config.options.useColumnNames

export interface IterableRequestOptions extends RequestOptions {
iteratorFifoSize: number;
}

/**
* The item object of the request iterator.
*/
export interface IterableRequestItem {

/**
* Row data.
*/
row: RowData;

/**
* Result set number, 1..n.
*/
resultSetNo: number;

/**
* Metadata of all columns.
*/
columnMetadata: ColumnMetadataDef;
}

type iteratorPromiseResolveFunction = (value: IteratorResult<IterableRequestItem>) => void;
type iteratorPromiseRejectFunction = (error: Error) => void;

// Internal class for the state controller logic of the iterator.
class IterableRequestController {

private request: Request;
private requestCompleted: boolean;
private requestPaused: boolean;
private error: Error | undefined;
private terminating: boolean;

private resultSetNo: number;
private columnMetadata: ColumnMetadataDef | undefined;
private fifo: IterableRequestItem[];
private fifoPauseLevel: number;
private fifoResumeLevel: number;

private promisePending: boolean;
private resolvePromise: iteratorPromiseResolveFunction | undefined;
private rejectPromise: iteratorPromiseRejectFunction | undefined;
private terminatorResolve: (() => void) | undefined;

// --- Constructor / Terminator ----------------------------------------------

constructor(request: Request, options?: IterableRequestOptions) {
this.request = request;
this.requestCompleted = false;
this.requestPaused = false;
this.terminating = false;

this.resultSetNo = 0;
this.fifo = [];
const fifoSize = options?.iteratorFifoSize ?? 1024;
this.fifoPauseLevel = fifoSize;
this.fifoResumeLevel = Math.floor(fifoSize / 2);

this.promisePending = false;

request.addListener('row', this.rowEventHandler);
request.addListener('columnMetadata', this.columnMetadataEventHandler);
}

public terminate(): Promise<void> {
this.terminating = true;
this.request.resume(); // (just to be sure)
if (this.requestCompleted || !this.request.connection) {
return Promise.resolve();

Check warning on line 86 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L86

Added line #L86 was not covered by tests
}
this.request.connection.cancel();
return new Promise<void>((resolve: () => void) => {
this.terminatorResolve = resolve;
});
}

// --- Promise logic ---------------------------------------------------------

private serveError(): boolean {
if (!this.error || !this.promisePending) {
return false;
}
this.rejectPromise!(this.error);
this.promisePending = false;
return true;

Check warning on line 102 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L100-L102

Added lines #L100 - L102 were not covered by tests
}

private serveRowItem(): boolean {
if (!this.fifo.length || !this.promisePending) {
return false;
}
const item = this.fifo.shift()!;
this.resolvePromise!({ value: item });
this.promisePending = false;
if (this.fifo.length <= this.fifoResumeLevel && this.requestPaused) {
this.request.resume();
this.requestPaused = false;
}
return true;
}

private serveRequestCompletion(): boolean {
if (!this.requestCompleted || !this.promisePending) {
return false;
}
this.resolvePromise!({ done: true, value: undefined });
this.promisePending = false;
return true;
}

private servePromise() {
if (this.serveError()) {
return;

Check warning on line 130 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L130

Added line #L130 was not covered by tests
}
if (this.serveRowItem()) {
return;
}
if (this.serveRequestCompletion()) {
return; // eslint-disable-line no-useless-return
}
}

// This promise executor is called synchronously from within Iterator.next().
public promiseExecutor = (resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction) => {
if (this.promisePending) {
throw new Error('Previous promise is still active.');

Check warning on line 143 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L143

Added line #L143 was not covered by tests
}
this.resolvePromise = resolve;
this.rejectPromise = reject;
this.promisePending = true;
this.servePromise();
};

// --- Event handlers --------------------------------------------------------

public completionCallback(error: Error | null | undefined) {
this.requestCompleted = true;
if (this.terminating) {
if (this.terminatorResolve) {
this.terminatorResolve();
}
return;
}
if (error && !this.error) {
this.error = error;

Check warning on line 162 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L162

Added line #L162 was not covered by tests
}
this.servePromise();
}

private columnMetadataEventHandler = (columnMetadata: ColumnMetadata[] | Record<string, ColumnMetadata>) => {
this.resultSetNo++;
this.columnMetadata = columnMetadata;
};

private rowEventHandler = (row: RowData) => {
if (this.requestCompleted || this.error || this.terminating) {
return;

Check warning on line 174 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L174

Added line #L174 was not covered by tests
}
if (this.resultSetNo === 0 || !this.columnMetadata) {
this.error = new Error('No columnMetadata event received before row event.');
this.servePromise();
return;

Check warning on line 179 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L177-L179

Added lines #L177 - L179 were not covered by tests
}
const item: IterableRequestItem = { row, resultSetNo: this.resultSetNo, columnMetadata: this.columnMetadata };
this.fifo.push(item);
if (this.fifo.length >= this.fifoPauseLevel && !this.requestPaused) {
this.request.pause();
this.requestPaused = true;
}
this.servePromise();
};

}

// Internal class for the iterator object which is passed to the API client.
class IterableRequestIterator implements AsyncIterator<IterableRequestItem> {

private controller: IterableRequestController;

constructor(controller: IterableRequestController) {
this.controller = controller;
}

public next(): Promise<IteratorResult<IterableRequestItem>> {
return new Promise<IteratorResult<IterableRequestItem>>(this.controller.promiseExecutor);
}

public async return(value?: any): Promise<any> {
await this.controller.terminate();
return Promise.resolve({ value, done: true }); // eslint-disable-line @typescript-eslint/return-await
}

public async throw(_exception?: any): Promise<any> {
await this.controller.terminate();
return Promise.resolve({ done: true }); // eslint-disable-line @typescript-eslint/return-await

Check warning on line 212 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L211-L212

Added lines #L211 - L212 were not covered by tests
}

}

/**
* An iterable `Request` class.
*
* This iterable version is a super class of the normal `Request` class.
*
* Usage:
* ```js
* const request = new IterableRequest("select 42, 'hello world'");
* connection.execSql(request);
* for await (const item of request) {
* console.log(item.row);
* }
* ```
*/
class IterableRequest extends Request implements AsyncIterable<IterableRequestItem> {

private iterator: IterableRequestIterator;

constructor(sqlTextOrProcedure: string | undefined, options?: IterableRequestOptions) {
super(sqlTextOrProcedure, completionCallback, options);
const controller = new IterableRequestController(this, options);
this.iterator = new IterableRequestIterator(controller);

function completionCallback(error: Error | null | undefined) {
if (controller) {
controller.completionCallback(error);
}
}
}

[Symbol.asyncIterator](): AsyncIterator<IterableRequestItem> {
return this.iterator;
}

}

export default IterableRequest;
module.exports = IterableRequest;
2 changes: 1 addition & 1 deletion src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export interface ParameterOptions {
scale?: number;
}

interface RequestOptions {
export interface RequestOptions {
statementColumnEncryptionSetting?: SQLServerStatementColumnEncryptionSetting;
}

Expand Down
2 changes: 2 additions & 0 deletions src/tedious.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import BulkLoad from './bulk-load';
import Connection, { type ConnectionAuthentication, type ConnectionConfiguration, type ConnectionOptions } from './connection';
import Request from './request';
import IterableRequest from './iterable-request';
import { name } from './library';

import { ConnectionError, RequestError } from './errors';
Expand All @@ -21,6 +22,7 @@ export {
BulkLoad,
Connection,
Request,
IterableRequest,
library,
ConnectionError,
RequestError,
Expand Down
Loading

0 comments on commit 411fb00

Please sign in to comment.