Skip to content

Latest commit

 

History

History

message-bus

message-bus

Provider independent, high-level, opinionated messaging library

installation

npm install @totalsoft/message-bus

philosophy

The message bus is a high level api for messaging communication that abstracts away from its consumers some of the involving complexity like:

  • Messaging transport (Nats, Kafka, etc)
  • Message SerDes
  • Topic Registry
  • Message envelope

transport

By default the message bus uses the Nats streaming transport. When working with other transports, you need to globally set the transport, before using the message bus api.

const { messageBus, useTransport, transport } = require('@totalsoft/message-bus');

useTransport(transport.rusi) //or transport.nats or transport.jetstream
const msgBus = messageBus() //now every message bus instance points to that transport
await msgBus.publish('some_subject', {});

Built-in transport options:

serDes

When publishing and receiving messages the message bus needs to serialize / deserialize messages. By default it uses a built-in Json serializer. Should you need to hook in your custom serializer, you can globally set your custom serializer, before using the message bus api.

const { messageBus, useSerDes, serDes } = require('@totalsoft/message-bus');

useSerDes(customize(serDes))
const msgBus = messageBus() //now every message bus instance points to that serDes
await msgBus.publish('some_subject', {});

publish

const { messageBus } = require('@totalsoft/message-bus');

const userUpdatedEvent = { userId: 5, userName:'rpopovici' }
const correlationId = 'some-correlation-id'
const tenantId = 'some-tenant-id'
const msgBus = messageBus()

await msgBus.publish('USER_UPDATED', userUpdatedEvent, {correlationId, tenantId});

subscribe

const { messageBus, SubscriptionOptions } = require('@totalsoft/message-bus');

const handler = console.log
const msgBus = messageBus()

const subscription = await msgBus.subscribe('USER_UPDATED', handler, SubscriptionOptions.STREAM_PROCESSOR)

The last optional parameter subscription options is a high level configuration of the subscription type. See below.

subscription options

When subscribing to a stream you ca opt in one of the following:

  • STREAM_PROCESSOR: typical event driven subscriptions; durable, at-least-once, within a queue group
  • PUB_SUB: lite weight, non-durable, at-most-once, within a queue group
  • RPC: lite weight, non-durable, at-most-once, without queue group, used in send-command-and-wait-for-event scenarios

request / response over messaging

const { messageBus } = require('@totalsoft/message-bus');

const correlationId = 'some-correlation-id'
const tenantId = 'some-tenant-id'

const updateUserCommand = { userId: 5, userName:'rpopovici' }
const msgBus = messageBus()

const [topic, event] = await msgBus.sendCommandAndReceiveEvent(
    'UPDATE_USER', updateUserCommand,
    ['USER_UPDATED', 'UPDATE_USER_FAILED'],
    {correlationId, tenantId}
)

environment variables

Messaging__TopicPrefix="deprecated_please_use_Messaging__Env" Messaging__Env="messaging_env" Messaging__Source="your_service_name" Messaging__Transport="jetstream_nats_or_rusi"

NATS_URL="your_nats_url" NATS_CLUSTER="your_nats_cluster" NATS_CLIENT_ID="your_nats_client_id" NATS_Q_GROUP="your_q_group" NATS_DURABLE_NAME="durable" NATS_STREAM_PROCESSOR_MaxInflight="1" NATS_STREAM_PROCESSOR_AckWait="5000" NATS_PUB_SUB_MaxInflight="100" NATS_PUB_SUB_AckWait="5000" NATS_RPC_MaxInflight="1" NATS_RPC_AckWait="5000"

RUSI_GRPC_ENDPOINT="localhost:50003" RUSI_GRPC_PORT="50003" RUSI_PUB_SUB_NAME="natsstreaming-pubsub" RUSI_STREAM_PROCESSOR_MaxConcurrentMessages="1" RUSI_STREAM_PROCESSOR_AckWaitTime="5000" RUSI_PUB_SUB_MaxConcurrentMessages="100" RUSI_PUB_SUB_AckWaitTime="5000" RUSI_RPC_MaxConcurrentMessages="1" RUSI_RPC_AckWaitTime="5000"

JETSTREAM_URL, JETSTREAM_CLIENT_ID, JETSTREAM_COMMANDS_STREAM, JETSTREAM_EVENTS_STREAM, JETSTREAM_STREAM_PROCESSOR_MaxConcurrentMessages = '1', JETSTREAM_STREAM_PROCESSOR_AckWaitTime = '5000000000', // 5 seconds JETSTREAM_PUB_SUB_MaxConcurrentMessages = '100', JETSTREAM_PUB_SUB_AckWaitTime = '5000000000', // 5 seconds JETSTREAM_RPC_MaxConcurrentMessages = '1', JETSTREAM_RPC_AckWaitTime = '5000000000' // 5 seconds