Skip to content

Commit

Permalink
deps!: update stream types
Browse files Browse the repository at this point in the history
libp2p streams are now explicit about the types of sync/sources they provide, showing that they are `AsyncGenerator`s and not just
`AsyncIterable`s.

Refs: achingbrain/it-stream-types#45

BREAKING CHANGE: the type of the source/sink properties have changed
  • Loading branch information
achingbrain committed Apr 18, 2023
1 parent 63d1cb8 commit 5460fd5
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 76 deletions.
10 changes: 7 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
node_modules
.nyc_output
build
dist
coverage
package-lock.json
.docs
.coverage
node_modules
package-lock.json
yarn.lock
.vscode
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# it-ws <!-- omit in toc -->

[![codecov](https://img.shields.io/codecov/c/github/alanshaw/it-ws.svg?style=flat-square)](https://codecov.io/gh/alanshaw/it-ws)
[![CI](https://img.shields.io/github/workflow/status/alanshaw/it-ws/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/alanshaw/it-ws/actions/workflows/js-test-and-release.yml)
[![CI](https://img.shields.io/github/actions/workflow/status/alanshaw/it-ws/js-test-and-release.yml?branch=master\&style=flat-square)](https://github.com/alanshaw/it-ws/actions/workflows/js-test-and-release.yml?query=branch%3Amaster)

> Simple async iterables for websocket client connections
## Table of contents <!-- omit in toc -->

- [Install](#install)
- [Browser `<script>` tag](#browser-script-tag)
- [Usage](#usage)
- [Example - client](#example---client)
- [Example - server](#example---server)
Expand All @@ -21,14 +22,22 @@
- [`import sink from 'it-ws/sink'`](#import-sink-from-it-wssink)
- [`import source from 'it-ws/source'`](#import-source-from-it-wssource)
- [License](#license)
- [Contribute](#contribute)
- [Contribution](#contribution)

## Install

```console
$ npm i it-ws
```

### Browser `<script>` tag

Loading this module through a script tag will make it's exports available as `ItWs` in the global namespace.

```html
<script src="https://unpkg.com/it-ws/dist/index.min.js"></script>
```

## Usage

### Example - client
Expand Down Expand Up @@ -259,6 +268,6 @@ Licensed under either of
- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT ([LICENSE-MIT](LICENSE-MIT) / <http://opensource.org/licenses/MIT>)

## Contribute
## Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
19 changes: 10 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
},
"files": [
"src",
"dist/src",
"dist",
"!dist/test",
"!**/*.tsbuildinfo"
],
Expand Down Expand Up @@ -176,26 +176,27 @@
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"test:electron-main": "aegir test -t electron-main",
"release": "aegir release"
"release": "aegir release",
"docs": "aegir docs"
},
"dependencies": {
"event-iterator": "^2.0.0",
"iso-url": "^1.1.2",
"it-stream-types": "^1.0.2",
"it-stream-types": "^2.0.1",
"uint8arrays": "^4.0.2",
"ws": "^8.4.0"
},
"devDependencies": {
"@types/ws": "^8.2.2",
"aegir": "^37.0.15",
"aegir": "^38.1.8",
"delay": "^5.0.0",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"it-foreach": "^1.0.0",
"it-all": "^3.0.1",
"it-drain": "^3.0.1",
"it-foreach": "^2.0.2",
"it-goodbye": "^4.0.0",
"it-map": "^2.0.0",
"it-map": "^3.0.2",
"it-ndjson": "^1.0.0",
"it-pipe": "^2.0.3",
"it-pipe": "^3.0.1",
"p-defer": "^4.0.0",
"wherearewe": "^2.0.1",
"wsurl": "^1.0.0"
Expand Down
6 changes: 3 additions & 3 deletions src/duplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import source from './source.js'
import sink from './sink.js'
import type WebSocket from './web-socket.js'
import type { SinkOptions } from './sink.js'
import type { Duplex } from 'it-stream-types'
import type { Duplex, Source } from 'it-stream-types'

export interface DuplexWebSocket extends Duplex<Uint8Array, Uint8Array, Promise<void>> {
export interface DuplexWebSocket extends Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>> {
connected: () => Promise<void>
localAddress?: string
localPort?: number
Expand Down Expand Up @@ -43,7 +43,7 @@ export default (socket: WebSocket, options?: DuplexWebSocketOptions): DuplexWebS
const duplex: DuplexWebSocket = {
sink: sink(socket, options),
source: connectedSource,
connected: async () => await connectedSource.connected(),
connected: async () => { await connectedSource.connected() },
close: async () => {
if (socket.readyState === socket.CONNECTING || socket.readyState === socket.OPEN) {
await new Promise<void>((resolve) => {
Expand Down
10 changes: 5 additions & 5 deletions src/ready.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ErrorEvent, WebSocket } from 'ws'

export default (socket: WebSocket) => {
export default async (socket: WebSocket): Promise<void> => {
// if the socket is closing or closed, return end
if (socket.readyState >= 2) {
throw new Error('socket closed')
Expand All @@ -11,18 +11,18 @@ export default (socket: WebSocket) => {
return
}

return new Promise<void>((resolve, reject) => {
function cleanup () {
await new Promise<void>((resolve, reject) => {
function cleanup (): void {
socket.removeEventListener('open', handleOpen)
socket.removeEventListener('error', handleErr)
}

function handleOpen () {
function handleOpen (): void {
cleanup()
resolve()
}

function handleErr (event: ErrorEvent) {
function handleErr (event: ErrorEvent): void {
cleanup()
reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`))
}
Expand Down
20 changes: 10 additions & 10 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,38 @@ class Server extends EventEmitter {
opts = opts ?? {}
this.server = server
this.wsServer = new WSServer({
server: server,
server,
perMessageDeflate: false,
verifyClient: opts.verifyClient
})
this.wsServer.on('connection', this.onWsServerConnection.bind(this))
}

async listen (addrInfo: { port: number } | number) {
async listen (addrInfo: { port: number } | number): Promise<WebSocketServer> {
return await new Promise<WebSocketServer>((resolve, reject) => {
this.wsServer.once('error', (e) => reject(e))
this.wsServer.once('listening', () => resolve(this))
this.wsServer.once('error', (e) => { reject(e) })
this.wsServer.once('listening', () => { resolve(this) })
this.server.listen(typeof addrInfo === 'number' ? addrInfo : addrInfo.port)
})
}

async close () {
return await new Promise<void>((resolve, reject) => {
async close (): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err != null) {
return reject(err)
reject(err); return
}

resolve()
})
})
}

address () {
address (): string | AddressInfo | null {
return this.server.address()
}

onWsServerConnection (socket: WebSocket, req: http.IncomingMessage) {
onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void {
const addr = this.wsServer.address()

if (typeof addr === 'string') {
Expand Down Expand Up @@ -96,7 +96,7 @@ export function createServer (opts?: ServerOptions): WebSocketServer {
wss.on('connection', opts.onConnection)
}

function proxy (server: http.Server, event: string) {
function proxy (server: http.Server, event: string): http.Server {
return server.on(event, (...args: any[]) => {
wss.emit(event, ...args)
})
Expand Down
10 changes: 5 additions & 5 deletions src/sink.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import ready from './ready.js'
import type { WebSocket } from 'ws'
import type { Sink } from 'it-stream-types'
import type { Sink, Source } from 'it-stream-types'

export interface SinkOptions {
closeOnEnd?: boolean
}

export default (socket: WebSocket, options: SinkOptions) => {
export default (socket: WebSocket, options: SinkOptions): Sink<Source<Uint8Array>, Promise<void>> => {
options = options ?? {}
options.closeOnEnd = options.closeOnEnd !== false

const sink: Sink<Uint8Array, Promise<void>> = async source => {
const sink: Sink<Source<Uint8Array>, Promise<void>> = async source => {
for await (const data of source) {
try {
await ready(socket)
Expand All @@ -23,7 +23,7 @@ export default (socket: WebSocket, options: SinkOptions) => {
}

if (options.closeOnEnd != null && socket.readyState <= 1) {
return await new Promise((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
socket.addEventListener('close', event => {
if (event.wasClean || event.code === 1006) {
resolve()
Expand All @@ -33,7 +33,7 @@ export default (socket: WebSocket, options: SinkOptions) => {
}
})

setTimeout(() => socket.close())
setTimeout(() => { socket.close() })
})
}
}
Expand Down
52 changes: 27 additions & 25 deletions src/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,42 @@ function isArrayBuffer (obj: any): obj is ArrayBuffer {
(obj?.constructor?.name === 'ArrayBuffer' && typeof obj?.byteLength === 'number')
}

export interface ConnectedSource extends AsyncIterable<Uint8Array> {
export interface ConnectedSource extends AsyncGenerator<Uint8Array> {
connected: () => Promise<void>
}

export default (socket: WebSocket): ConnectedSource => {
socket.binaryType = 'arraybuffer'

const connected = async () => await new Promise<void>((resolve, reject) => {
if (isConnected) {
return resolve()
}
if (connError != null) {
return reject(connError)
}

const cleanUp = (cont: () => void) => {
socket.removeEventListener('open', onOpen)
socket.removeEventListener('error', onError)
cont()
}

const onOpen = () => cleanUp(resolve)
const onError = (event: ErrorEvent) => {
cleanUp(() => reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`)))
}

socket.addEventListener('open', onOpen)
socket.addEventListener('error', onError)
})
const connected = async (): Promise<void> => {
await new Promise<void>((resolve, reject) => {
if (isConnected) {
resolve(); return
}
if (connError != null) {
reject(connError); return
}

const cleanUp = (cont: () => void): void => {
socket.removeEventListener('open', onOpen)
socket.removeEventListener('error', onError)
cont()
}

const onOpen = (): void => { cleanUp(resolve) }
const onError = (event: ErrorEvent): void => {
cleanUp(() => { reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`)) })
}

socket.addEventListener('open', onOpen)
socket.addEventListener('error', onError)
})
}

const source = (async function * () {
const messages = new EventIterator<Uint8Array>(
({ push, stop, fail }) => {
const onMessage = (event: MessageEvent) => {
const onMessage = (event: MessageEvent): void => {
let data: Uint8Array | null = null

if (typeof event.data === 'string') {
Expand All @@ -64,7 +66,7 @@ export default (socket: WebSocket): ConnectedSource => {

push(data)
}
const onError = (event: ErrorEvent) => fail(event.error ?? new Error('Socket error'))
const onError = (event: ErrorEvent): void => { fail(event.error ?? new Error('Socket error')) }

socket.addEventListener('message', onMessage)
socket.addEventListener('error', onError)
Expand Down
2 changes: 1 addition & 1 deletion src/ws-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ import { relative } from 'iso-url'
const map = { http: 'ws', https: 'wss' }
const def = 'ws'

export default (url: string, location: string | Partial<Location>) => relative(url, location, map, def)
export default (url: string, location: string | Partial<Location>): string => relative(url, location, map, def)
2 changes: 1 addition & 1 deletion test/echo-inline.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('simple echo server', () => {
[1, 2, 3],
// need a delay, because otherwise ws hangs up wrong.
// otherwise use pull-goodbye.
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => resolve(val), 10))),
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => { resolve(val) }, 10))),
(source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)),
WS.connect('ws://localhost:5678'),
ndjson.parse,
Expand Down
16 changes: 9 additions & 7 deletions test/echo.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ describe('echo', () => {
pws,
goodbye({
source: expected,
sink: async source => await pipe(
source,
(source) => each(source, (value) => {
expect(value).to.equalBytes(expected.shift())
}),
drain
)
sink: async source => {
await pipe(
source,
(source) => each(source, (value) => {
expect(value).to.equalBytes(expected.shift())
}),
drain
)
}
}),
pws
)
Expand Down
4 changes: 2 additions & 2 deletions test/helpers/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { WebSocket } from 'ws'

const port = parseInt(process.env.PORT ?? '3000', 10)

export function createTestServer () {
export function createTestServer (): WebSocketServer {
const routes: Record<string, (ws: WebSocket) => void> = {
'/read': function (ws: WebSocket) {
const values = ['a', 'b', 'c', 'd']
Expand All @@ -24,7 +24,7 @@ export function createTestServer () {
})
}
}
const wss = new WebSocketServer({ port: port })
const wss = new WebSocketServer({ port })

wss.on('connection', function (ws, req) {
if (req.url == null) {
Expand Down
2 changes: 1 addition & 1 deletion test/pass-in-server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('simple echo server', () => {
[1, 2, 3],
// need a delay, because otherwise ws hangs up wrong.
// otherwise use pull-goodbye.
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => resolve(val), 10))),
(source) => map(source, async val => await new Promise(resolve => setTimeout(() => { resolve(val) }, 10))),
(source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)),
stream,
ndjson.parse,
Expand Down
Loading

0 comments on commit 5460fd5

Please sign in to comment.