Skip to content

Commit

Permalink
feat: migrate bus implementation
Browse files Browse the repository at this point in the history
Closes #4
  • Loading branch information
RomainLanz committed Mar 8, 2024
1 parent 2f141d5 commit ff763c0
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 77 deletions.
2 changes: 1 addition & 1 deletion index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

export { configure } from './configure.js'
export { Transmit } from './src/transmit.js'
export { RedisTransport } from './src/transports/redis_transport.js'
export { defineConfig } from './src/define_config.js'
export { stubsRoot } from './stubs/main.js'
export { redis } from '@rlanz/bus/drivers/redis'
25 changes: 13 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,29 @@
},
"dependencies": {
"@poppinss/matchit": "^3.1.2",
"@poppinss/utils": "^6.7.1",
"emittery": "^1.0.1"
"@poppinss/utils": "^6.7.2",
"@rlanz/bus": "^0.0.5",
"emittery": "^1.0.3"
},
"devDependencies": {
"@adonisjs/assembler": "^7.1.0",
"@adonisjs/core": "^6.2.0",
"@adonisjs/eslint-config": "^1.2.1",
"@adonisjs/prettier-config": "^1.2.1",
"@adonisjs/redis": "^8.0.0",
"@adonisjs/tsconfig": "^1.2.1",
"@adonisjs/assembler": "^7.2.3",
"@adonisjs/core": "^6.3.1",
"@adonisjs/eslint-config": "^1.3.0",
"@adonisjs/prettier-config": "^1.3.0",
"@adonisjs/redis": "^8.0.1",
"@adonisjs/tsconfig": "^1.3.0",
"@japa/assert": "2.1.0",
"@japa/runner": "3.1.1",
"@swc/core": "1.3.105",
"@types/node": "^20.11.5",
"@types/node": "^20.11.25",
"c8": "^9.1.0",
"copyfiles": "^2.4.1",
"del-cli": "^5.1.0",
"eslint": "^8.56.0",
"eslint": "^8.57.0",
"np": "^9.2.0",
"prettier": "^3.2.4",
"prettier": "^3.2.5",
"ts-node": "^10.9.2",
"typescript": "^5.2.2"
"typescript": "^5.4.2"
},
"peerDependencies": {
"@adonisjs/core": "^6.2.0",
Expand Down
18 changes: 9 additions & 9 deletions providers/transmit_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,21 @@

import '../src/types/extended.js'
import { Transmit } from '../src/transmit.js'
import { RedisTransport } from '../src/transports/redis_transport.js'
import type { ApplicationService } from '@adonisjs/core/types'
import type { TransmitConfig, Transport } from '../src/types/main.js'
import type { TransmitConfig } from '../src/types/main.js'
import type { Transport } from '@rlanz/bus/types/main'

export default class TransmitProvider {
constructor(protected app: ApplicationService) {}

register() {
this.app.container.singleton(RedisTransport, async () => {
const redis = await this.app.container.make('redis')

return new RedisTransport(redis)
})

this.app.container.singleton('transmit', async () => {
const config = this.app.config.get<TransmitConfig>('transmit', {})

let transport: Transport | null = null

if (config.transport) {
transport = await this.app.container.make(config.transport.driver)
transport = config.transport.driver()
}

return new Transmit(config, transport)
Expand Down Expand Up @@ -70,4 +64,10 @@ export default class TransmitProvider {
return ctx.response.noContent()
})
}

async shutdown() {
const transmit = await this.app.container.make('transmit')

await transmit.closeBusConnection()
}
}
39 changes: 23 additions & 16 deletions src/transmit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
*/

import Emittery from 'emittery'
import { Bus } from '@rlanz/bus'
import string from '@poppinss/utils/string'
import { Stream } from './stream.js'
import { StreamChannelRepository } from './stream_channel_repository.js'
import { SecureChannelStore } from './secure_channel_store.js'
import type { HttpContext } from '@adonisjs/core/http'
import type { Broadcastable, TransmitConfig, Transport } from './types/main.js'
import type { Transport } from '@rlanz/bus/types/main'
import type { Broadcastable, TransmitConfig } from './types/main.js'

interface TransmitLifecycleHooks {
connect: { uid: string; ctx: HttpContext }
Expand Down Expand Up @@ -44,7 +46,7 @@ export class Transmit {
* The transport provider to synchronize messages and subscriptions
* across multiple instance.
*/
#transport: Transport | null
readonly #bus: Bus | null

/**
* The config for the transmit instance.
Expand All @@ -60,15 +62,18 @@ export class Transmit {
this.#config = config
this.#storage = new StreamChannelRepository()
this.#secureChannelStore = new SecureChannelStore()
this.#transport = transport
this.#bus = transport ? new Bus(transport, { retryQueue: { enabled: true } }) : null
this.#emittery = new Emittery()

// @ts-ignore
void this.#transport?.subscribe(this.#config.transport.channel, (message) => {
const { channel, payload } = JSON.parse(message)
void this.#bus?.subscribe<{ channel: string; payload: Broadcastable }>(
// TODO: Create a computed config type
this.#config.transport!.channel!,
(message) => {
const { channel, payload } = message

void this.#broadcastLocally(channel, payload)
})
void this.#broadcastLocally(channel, payload)
}
)

if (this.#config.pingInterval) {
const intervalValue =
Expand Down Expand Up @@ -188,18 +193,20 @@ export class Transmit {
payload = {}
}

if (this.#transport) {
void this.#transport.send(this.#config.transport!.channel!, {
channel,
payload,
})
} else {
this.#broadcastLocally(channel, payload)
}
void this.#bus?.publish(this.#config.transport!.channel!, {
channel,
payload,
})

this.#broadcastLocally(channel, payload)

void this.#emittery.emit('broadcast', { channel, payload })
}

closeBusConnection() {
return this.#bus?.disconnect()
}

on<T extends keyof TransmitLifecycleHooks>(
event: T,
callback: (payload: TransmitLifecycleHooks[T]) => void
Expand Down
31 changes: 0 additions & 31 deletions src/transports/redis_transport.ts

This file was deleted.

12 changes: 4 additions & 8 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* file that was distributed with this source code.
*/

import type { TransportFactory } from '@rlanz/bus/types/main'

/**
* A Duration can be a number in milliseconds or a string formatted as a duration
*
Expand All @@ -19,15 +21,9 @@ export type Duration = number | string
/**
* A Broadcastable is a value that can be broadcasted to other clients
*/
export type Broadcastable = Record<string, unknown> | string | number | boolean | null

export interface Transport {
send: (channel: string, payload: any) => Promise<void>
subscribe: (channel: string, handler: any) => Promise<void>
unsubscribe: (channel: string) => Promise<void>
}
export type Broadcastable = { [key: string]: Broadcastable } | string | number | boolean | null

export interface TransmitConfig {
pingInterval?: Duration | false
transport: null | { driver: new (...args: any[]) => Transport; channel?: string }
transport: null | { driver: TransportFactory; channel?: string }
}

0 comments on commit ff763c0

Please sign in to comment.