Skip to content

Commit

Permalink
feat: allow to broadcast other type of data than object
Browse files Browse the repository at this point in the history
Closes #6
  • Loading branch information
RomainLanz committed Mar 4, 2024
1 parent 6ae2584 commit 3c2e1d4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 17 deletions.
40 changes: 32 additions & 8 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,25 @@

import { Transform } from 'node:stream'
import type { IncomingMessage, OutgoingHttpHeaders } from 'node:http'
import type { Broadcastable } from './types/main.js'

function dataToString(data: Broadcastable): string {
if (typeof data === 'object') {
return dataToString(JSON.stringify(data))
}

if (typeof data === 'number' || typeof data === 'boolean') {
return `data: ${data}\n`
}

function dataString(data: string | object): string {
if (typeof data === 'object') return dataString(JSON.stringify(data))
return data
.split(/\r\n|\r|\n/)
.map((line) => `data: ${line}\n`)
.join('')
}

interface Message {
data: string | object
data: Broadcastable
comment?: string
event?: string
id?: string
Expand Down Expand Up @@ -83,12 +91,28 @@ export class Stream extends Transform {
_encoding: string,
callback: (error?: Error | null, data?: any) => void
) {
if (message.comment) this.push(`: ${message.comment}\n`)
if (message.event) this.push(`event: ${message.event}\n`)
if (message.id) this.push(`id: ${message.id}\n`)
if (message.retry) this.push(`retry: ${message.retry}\n`)
if (message.data) this.push(dataString(message.data))
if (message.comment) {
this.push(`: ${message.comment}\n`)
}

if (message.event) {
this.push(`event: ${message.event}\n`)
}

if (message.id) {
this.push(`id: ${message.id}\n`)
}

if (message.retry) {
this.push(`retry: ${message.retry}\n`)
}

if (message.data) {
this.push(dataToString(message.data))
}

this.push('\n')

callback()
}

Expand Down
14 changes: 5 additions & 9 deletions src/transmit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import { Stream } from './stream.js'
import { StorageBag } from './storage_bag.js'
import { SecureChannelStore } from './secure_channel_store.js'
import type { HttpContext } from '@adonisjs/core/http'
import type { TransmitConfig, Transport } from './types/main.js'
import type { Broadcastable, TransmitConfig, Transport } from './types/main.js'

interface TransmitLifecycleHooks {
connect: { uid: string; ctx: HttpContext }
disconnect: { uid: string; ctx: HttpContext }
broadcast: { channel: string; payload: Record<string, unknown> }
broadcast: { channel: string; payload: Broadcastable }
subscribe: { uid: string; channel: string; ctx: HttpContext }
unsubscribe: { uid: string; channel: string; ctx: HttpContext }
}
Expand Down Expand Up @@ -163,11 +163,7 @@ export class Transmit {
}
}

#broadcastLocally(
channel: string,
payload: Record<string, unknown>,
senderUid?: string | string[]
) {
#broadcastLocally(channel: string, payload: Broadcastable, senderUid?: string | string[]) {
const subscribers = this.#storage.findByChannel(channel)

for (const subscriber of subscribers) {
Expand All @@ -183,11 +179,11 @@ export class Transmit {
}
}

broadcastExcept(channel: string, payload: Record<string, unknown>, senderUid: string | string[]) {
broadcastExcept(channel: string, payload: Broadcastable, senderUid: string | string[]) {
return this.#broadcastLocally(channel, payload, senderUid)
}

broadcast(channel: string, payload?: Record<string, unknown>) {
broadcast(channel: string, payload?: Broadcastable) {
if (!payload) {
payload = {}
}
Expand Down
5 changes: 5 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*/
export type Duration = number | string

/**
* A Broadcastable is a value that can be broadcasted to other clients
*/
export type Broadcastable = Record<string, unknown> | string | number | boolean | null

export interface Transport {
send(channel: string, payload: any): Promise<void>
subscribe(channel: string, handler: any): Promise<void>
Expand Down
40 changes: 40 additions & 0 deletions tests/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,44 @@ test.group('Stream', () => {

stream.pipe(sink, undefined, { 'X-Foo': 'bar' })
})

test('should correctly send the data when it is an object', async ({ assert }) => {
const stream = new Stream(randomUUID())
const sink = new Sink()
stream.pipe(sink)

stream.writeMessage({ data: { channel: 'foo', payload: 'bar' } })

assert.equal(sink.content, [`:ok\n\n`, `data: {"channel":"foo","payload":"bar"}\n\n`].join(''))
})

test('should correctly send the data when it is a number', async ({ assert }) => {
const stream = new Stream(randomUUID())
const sink = new Sink()
stream.pipe(sink)

stream.writeMessage({ data: 42 })

assert.equal(sink.content, [`:ok\n\n`, `data: 42\n\n`].join(''))
})

test('should correctly send the data when it is a string', async ({ assert }) => {
const stream = new Stream(randomUUID())
const sink = new Sink()
stream.pipe(sink)

stream.writeMessage({ data: 'foo' })

assert.equal(sink.content, [`:ok\n\n`, `data: foo\n\n`].join(''))
})

test('should correctly send the data when it is a boolean', async ({ assert }) => {
const stream = new Stream(randomUUID())
const sink = new Sink()
stream.pipe(sink)

stream.writeMessage({ data: true })

assert.equal(sink.content, [`:ok\n\n`, `data: true\n\n`].join(''))
})
})

0 comments on commit 3c2e1d4

Please sign in to comment.