Skip to content

Commit

Permalink
feat(bindings/nodejs): impl Readable Stream for rows (#531)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Dec 10, 2024
1 parent fa5e70c commit 750865d
Show file tree
Hide file tree
Showing 9 changed files with 1,319 additions and 277 deletions.
22 changes: 22 additions & 0 deletions bindings/nodejs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ while (row) {
console.log(row.data());
row = await rows.next();
}

// iter rows
const rows = await conn.queryIter("SELECT * FROM test");
for await (const row of rows) {
console.log(row.values());
}

// pipe rows
import { Transform } from "node:stream";
import { finished, pipeline } from "node:stream/promises";

const rows = await conn.queryIter("SELECT * FROM test");
const stream = rows.stream();
const transformer = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(row, _, callback) {
console.log(row.data());
},
});
await pipeline(stream, transformer);
await finished(stream);
```

## Type Mapping
Expand Down
1 change: 0 additions & 1 deletion bindings/nodejs/cucumber.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"default": {
"publishQuiet": true,
"paths": ["tests/*.feature"],
"require": ["tests/*.js"]
}
Expand Down
64 changes: 61 additions & 3 deletions bindings/nodejs/generated.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,72 @@ switch (platform) {
}
break
case 'arm':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-arm-musleabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-arm-musleabihf.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-arm-musleabihf')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-arm-gnueabihf.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-arm-gnueabihf.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-arm-gnueabihf')
}
} catch (e) {
loadError = e
}
}
break
case 'riscv64':
if (isMusl()) {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-riscv64-musl.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-riscv64-musl.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-riscv64-musl')
}
} catch (e) {
loadError = e
}
} else {
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-riscv64-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-riscv64-gnu.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-riscv64-gnu')
}
} catch (e) {
loadError = e
}
}
break
case 's390x':
localFileExisted = existsSync(
join(__dirname, 'databend-driver.linux-arm-gnueabihf.node')
join(__dirname, 'databend-driver.linux-s390x-gnu.node')
)
try {
if (localFileExisted) {
nativeBinding = require('./databend-driver.linux-arm-gnueabihf.node')
nativeBinding = require('./databend-driver.linux-s390x-gnu.node')
} else {
nativeBinding = require('@databend-driver/lib-linux-arm-gnueabihf')
nativeBinding = require('@databend-driver/lib-linux-s390x-gnu')
}
} catch (e) {
loadError = e
Expand Down
28 changes: 14 additions & 14 deletions bindings/nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

/* auto-generated by NAPI-RS */

export class ValueOptions {
export declare class ValueOptions {
variantAsObject: boolean
}
export class Client {
export declare class Client {
/** Create a new databend client with a given DSN. */
constructor(dsn: string, opts?: ValueOptions | undefined | null)
/** Get a connection from the client. */
getConn(): Promise<Connection>
}
export class Connection {
export declare class Connection {
/** Get the connection information. */
info(): Promise<ConnectionInfo>
/** Get the databend version. */
Expand All @@ -49,49 +49,49 @@ export class Connection {
*/
streamLoad(sql: string, data: Array<Array<string>>): Promise<ServerStats>
}
export class ConnectionInfo {
export declare class ConnectionInfo {
get handler(): string
get host(): string
get port(): number
get user(): string
get database(): string | null
get warehouse(): string | null
}
export class Schema {
export declare class Schema {
fields(): Array<Field>
}
export class Field {
export declare class Field {
get name(): string
get dataType(): string
}
export class RowIterator {
export declare class RowIterator {
/** Get Schema for rows. */
schema(): Schema
/**
* Fetch next row.
* Returns `None` if there are no more rows.
*/
next(): Promise<Error | Row | null>
/** Get Schema for rows. */
schema(): Schema
}
export class RowIteratorExt {
export declare class RowIteratorExt {
schema(): Schema
/**
* Fetch next row or stats.
* Returns `None` if there are no more rows.
*/
next(): Promise<Error | RowOrStats | null>
schema(): Schema
}
/** Must contain either row or stats. */
export class RowOrStats {
export declare class RowOrStats {
get row(): Row | null
get stats(): ServerStats | null
}
export class Row {
export declare class Row {
setOpts(opts: ValueOptions): void
values(): Array<any>
data(): Record<string, any>
}
export class ServerStats {
export declare class ServerStats {
get totalRows(): bigint
get totalBytes(): bigint
get readRows(): bigint
Expand Down
36 changes: 35 additions & 1 deletion bindings/nodejs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,40 @@

/// <reference types="node" />

const { Client } = require("./generated.js");
const { Readable } = require("node:stream");

const { Client, RowIterator } = require("./generated.js");

class RowsStream extends Readable {
constructor(reader, options) {
super({ objectMode: true, ...options });
this.reader = reader;
}

_read() {
this.reader
.next()
.then((item) => {
this.push(item);
})
.catch((e) => {
this.emit("error", e);
});
}
}

RowIterator.prototype[Symbol.asyncIterator] = async function* () {
while (true) {
const item = await this.next();
if (item === null) {
break;
}
yield item;
}
};

RowIterator.prototype.stream = function () {
return new RowsStream(this);
};

module.exports.Client = Client;
16 changes: 8 additions & 8 deletions bindings/nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@
"LICENSE"
],
"devDependencies": {
"@cucumber/cucumber": "^9.6.0",
"@napi-rs/cli": "^2.16.3",
"@types/node": "^18.14.5",
"prettier": "^3.0.3",
"typedoc": "^0.25.1",
"typescript": "^5.2.2"
"@cucumber/cucumber": "^11.1.0",
"@napi-rs/cli": "^2.18.4",
"@types/node": "^22.10.1",
"prettier": "^3.4.2",
"typedoc": "^0.27.4",
"typescript": "^5.7.2"
},
"engines": {
"node": ">= 10"
"node": ">= 16"
},
"scripts": {
"build": "napi build --platform --target=$NAPI_TARGET --release --js generated.js && node ./scripts/header.js",
Expand All @@ -72,5 +72,5 @@
"registry": "https://registry.npmjs.org/",
"access": "public"
},
"packageManager": "pnpm@9.14.2+sha512.6e2baf77d06b9362294152c851c4f278ede37ab1eba3a55fda317a4a17b209f4dbb973fb250a77abc463a341fcb1f17f17cfa24091c4eb319cda0d9b84278387"
"packageManager": "pnpm@9.15.0+sha512.76e2379760a4328ec4415815bcd6628dee727af3779aaa4c914e3944156c4299921a89f976381ee107d41f12cfa4b66681ca9c718f0668fa0831ed4c6d8ba56c"
}
Loading

0 comments on commit 750865d

Please sign in to comment.