rabbitmq/rabbitmq-stream-go-client

RabbitMQ Stream GO Client


Go client for RabbitMQ Stream Queues

Overview

Go client for RabbitMQ Stream Queues

Installing

 go get -u github.com/rabbitmq/rabbitmq-stream-go-client

Imports:

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly

Run server with Docker


You may need a server to test locally. Let's start the broker:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
    rabbitmq:3-management

The broker should start in a few seconds. When it’s ready, enable the stream plugin and stream_management:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management

Management UI: http://localhost:15672/
Stream uri: rabbitmq-stream://guest:guest@localhost:5552

Getting started for the impatient

See getting started example.

Examples

See examples directory for more use cases.

Usage

Connect

The standard way to connect to a single node:

env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	CheckErr(err)

You can set the number of producers per connection; the default value is 1:

env, err := stream.NewEnvironment(
	stream.NewEnvironmentOptions().
		SetMaxProducersPerClient(2))

You can set the number of consumers per connection; the default value is 1:

env, err := stream.NewEnvironment(
	stream.NewEnvironmentOptions().
		SetMaxConsumersPerClient(2))

To get the best performance, use the default values. Note about multiple consumers per connection: the I/O thread is shared across the consumers, so one slow consumer can affect the performance of the others.

Multi hosts

It is possible to define multiple hosts. If one fails to connect, the client tries another one at random.

addresses := []string{
		"rabbitmq-stream://guest:guest@host1:5552/%2f",
		"rabbitmq-stream://guest:guest@host2:5552/%2f",
		"rabbitmq-stream://guest:guest@host3:5552/%2f"}

env, err := stream.NewEnvironment(
			stream.NewEnvironmentOptions().SetUris(addresses))
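When the host list comes from configuration, the address slice can be generated instead of typed by hand. The sketch below uses only the standard library; buildStreamURIs is a hypothetical helper, not part of the client, and it only reproduces the URI layout shown above. Note that url.PathEscape emits %2F in upper case; percent-encoding is case-insensitive, so it denotes the same virtual host as %2f.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// buildStreamURIs returns one rabbitmq-stream:// URI per host.
// Hypothetical helper: the name and signature are assumptions made for
// this example; only the URI layout comes from the snippet above.
func buildStreamURIs(user, password string, hosts []string, port int, vhost string) []string {
	uris := make([]string, 0, len(hosts))
	for _, host := range hosts {
		uris = append(uris, fmt.Sprintf("rabbitmq-stream://%s@%s/%s",
			url.UserPassword(user, password).String(),  // escapes special characters in credentials
			net.JoinHostPort(host, strconv.Itoa(port)), // host:port
			url.PathEscape(vhost)))                     // "/" becomes "%2F"
	}
	return uris
}

func main() {
	hosts := []string{"host1", "host2", "host3"}
	for _, uri := range buildStreamURIs("guest", "guest", hosts, 5552, "/") {
		fmt.Println(uri)
	}
}
```

The resulting slice can be passed to SetUris as in the example above.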

Load Balancer

The stream client is supposed to reach all the hostnames. When the nodes sit behind a load balancer, you can use the stream.AddressResolver parameter in this way:

addressResolver := stream.AddressResolver{
		Host: "load-balancer-ip",
		Port: 5552,
	}
env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost(addressResolver.Host).
			SetPort(addressResolver.Port).
			SetAddressResolver(addressResolver))

In this configuration the client retries the connection until it reaches the right node.

This RabbitMQ blog post explains the details.

See also the "Using a load balancer" example in the examples directory.

TLS

To configure TLS you need to set the IsTLS parameter:

env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5551). // standard TLS port
			SetUser("guest").
			SetPassword("guest").
			IsTLS(true).
			SetTLSConfig(&tls.Config{}),
	)

The tls.Config type comes from Go's standard crypto/tls package: https://pkg.go.dev/crypto/tls
See also the "Getting started TLS" example in the examples directory.

It is also possible to configure TLS through the URI scheme:

env, err := stream.NewEnvironment(
				stream.NewEnvironmentOptions().
				SetUri("rabbitmq-stream+tls://guest:guest@localhost:5551/").
				SetTLSConfig(&tls.Config{}),
)

SASL Mechanisms

To configure SASL, set the mechanism with SetSaslConfiguration on the environment options:

cfg := new(tls.Config)
cfg.ServerName = "my_server_name"
cfg.RootCAs = x509.NewCertPool()

if ca, err := os.ReadFile("certs/ca_certificate.pem"); err == nil {
	cfg.RootCAs.AppendCertsFromPEM(ca)
}

if cert, err := tls.LoadX509KeyPair("certs/client/cert.pem", "certs/client/key.pem"); err == nil {
	cfg.Certificates = append(cfg.Certificates, cert)
}

env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
	SetUri("rabbitmq-stream+tls://my_server_name:5551/").
	IsTLS(true).
	SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL
	SetTLSConfig(cfg))
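The snippet above silently skips a certificate that fails to load, which can make a handshake failure hard to diagnose. As a minimal sketch using only the standard library, the same tls.Config can be built by a function that returns the first error instead. newClientTLSConfig is a hypothetical helper name, and the file paths are the ones from the example above.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// newClientTLSConfig builds a client tls.Config with a custom CA and a
// client certificate. Hypothetical helper, not part of the stream client.
func newClientTLSConfig(serverName, caFile, certFile, keyFile string) (*tls.Config, error) {
	ca, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA certificate: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(ca) {
		return nil, errors.New("no valid certificate found in CA file")
	}
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}
	return &tls.Config{
		ServerName:   serverName,
		RootCAs:      pool,
		Certificates: []tls.Certificate{cert},
	}, nil
}

func main() {
	cfg, err := newClientTLSConfig("my_server_name",
		"certs/ca_certificate.pem", "certs/client/cert.pem", "certs/client/key.pem")
	if err != nil {
		fmt.Println("TLS setup failed:", err)
		return
	}
	fmt.Println("TLS config ready for", cfg.ServerName)
}
```

The returned value can then be passed to SetTLSConfig as shown above.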

Streams

To define streams, use the environment interfaces DeclareStream and DeleteStream.

It is highly recommended to define stream retention policies during the stream creation, like MaxLengthBytes or MaxAge:

err = env.DeclareStream(streamName,
		stream.NewStreamOptions().
		SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))

The function DeclareStream doesn't return an error if a stream is already defined with the same parameters. Note that it returns a precondition-failed error when the stream exists with different parameters. Use StreamExists to check if a stream exists.

Streams Statistics

To get stream statistics you need to use the environment.StreamStats method.

stats, err := environment.StreamStats(testStreamName)

// FirstOffset - The first offset in the stream.
// Returns the first offset in the stream,
// or an error if there is no first offset yet.

firstOffset, err := stats.FirstOffset() // first offset of the stream

// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
//	It is the offset of the first message in the last chunk confirmed by a quorum of the stream
//	cluster members (leader and replicas).
//
//	The committed chunk ID is a good indication of what the last offset of a stream can be at a
//	given time. The value can be stale as soon as the application reads it though, as the committed
//	chunk ID for a stream that is published to changes all the time.

committedChunkId, err := stats.CommittedChunkId()

Publish messages

To publish a message you need a *stream.Producer instance:

producer, err :=  env.NewProducer("my-stream", nil)

With ProducerOptions it is possible to customize the Producer behaviour:

type ProducerOptions struct {
	Name                 string // Producer name, useful for message deduplication
	QueueSize            int    // Internal queue to handle back-pressure; a low value reduces the back-pressure on the server
	BatchSize            int    // Batch-size aggregation; a low value reduces latency, a high value increases throughput
	BatchPublishingDelay int    // Period to send a batch of messages
}

The client provides two interfaces to send messages. Send:

var message message.StreamMessage
message = amqp.NewMessage([]byte("hello"))
err = producer.Send(message)

and BatchSend:

var messages []message.StreamMessage
for z := 0; z < 10; z++ {
  messages = append(messages, amqp.NewMessage([]byte("hello")))
}
err = producer.BatchSend(messages)

producer.Send:

  • accepts one message as parameter
  • automatically aggregates the messages
  • automatically splits the messages in case the size is bigger than requestedMaxFrameSize
  • automatically splits the messages based on batch-size
  • sends the messages in case nothing happens in producer-send-timeout
  • is asynchronous

producer.BatchSend:

  • accepts an array of messages as parameter
  • is synchronous
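The aggregation rules listed for producer.Send (split on batch size, split on maximum frame size) can be sketched as follows. This is an illustration of the idea under simplified assumptions, not the client's implementation: splitIntoBatches is a hypothetical function, sizes count payload bytes only, and a message larger than the limit is simply placed alone in its own batch.

```go
package main

import "fmt"

// splitIntoBatches groups messages so that each batch holds at most
// batchSize messages and at most maxFrameSize payload bytes.
// Hypothetical helper for illustration; protocol overhead is ignored.
func splitIntoBatches(msgs [][]byte, batchSize, maxFrameSize int) [][][]byte {
	var batches [][][]byte
	var current [][]byte
	currentBytes := 0
	for _, m := range msgs {
		full := len(current) >= batchSize || currentBytes+len(m) > maxFrameSize
		if len(current) > 0 && full {
			// Close the current batch before it would exceed a limit.
			batches = append(batches, current)
			current, currentBytes = nil, 0
		}
		current = append(current, m)
		currentBytes += len(m)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	var msgs [][]byte
	for i := 0; i < 10; i++ {
		msgs = append(msgs, []byte("hello"))
	}
	for i, b := range splitIntoBatches(msgs, 4, 1024) {
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```

Each resulting batch is the kind of slice that BatchSend accepts in one call, after wrapping the payloads in messages.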

Close the producer with producer.Close(). The producer is removed from the server, and the TCP connection is closed if there are no other producers.

Send vs BatchSend

BatchSend is the primitive to send messages. Send adds a smart layer on top of it to publish messages and internally uses BatchSend.

The Send interface works in most cases. In some conditions it is about 15/20 slower than BatchSend. See also this thread.

See also "Client performances" example in the examples directory

Publish Confirmation

For each publish the server sends back to the client the confirmation or an error. The client provides an interface to receive the confirmation:

//optional publish confirmation channel
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm)

func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
	go func() {
		for confirmed := range confirms {
			for _, msg := range confirmed {
				if msg.IsConfirmed() {
					fmt.Printf("message %s stored \n  ", msg.GetMessage().GetData())
				} else {
					fmt.Printf("message %s failed \n  ", msg.GetMessage().GetData())
				}
			}
		}
	}()
}

In the MessageStatus struct you can find two publishing IDs:

//first one
messageStatus.GetMessage().GetPublishingId()
// second one
messageStatus.GetPublishingId()

The first one is provided by the user for special cases like Deduplication. The second one is assigned automatically by the client. In case the user specifies the publishingId with:

msg = amqp.NewMessage([]byte("mymessage"))
msg.SetPublishingId(18) // <---

then messageStatus.GetMessage().HasPublishingId() is true, and
the values messageStatus.GetMessage().GetPublishingId() and messageStatus.GetPublishingId() are the same.

See also "Getting started" example in the examples directory
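An application that needs to know which messages are still waiting for a confirmation can keep its own bookkeeping keyed by publishing ID. The sketch below is one possible shape for it, using only the standard library; pendingTracker and its methods are hypothetical names, and wiring confirmed into the confirmation handler shown above is left to the application.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pendingTracker remembers messages that were sent but not yet confirmed.
// Hypothetical helper, not part of the stream client.
type pendingTracker struct {
	mu      sync.Mutex
	pending map[int64][]byte // publishing ID -> message body
}

func newPendingTracker() *pendingTracker {
	return &pendingTracker{pending: make(map[int64][]byte)}
}

// sent records a message right before it is handed to the producer.
func (p *pendingTracker) sent(id int64, body []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pending[id] = body
}

// confirmed forgets a message once its confirmation has arrived.
func (p *pendingTracker) confirmed(id int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, id)
}

// unconfirmed returns the publishing IDs still waiting, in ascending order.
func (p *pendingTracker) unconfirmed() []int64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	ids := make([]int64, 0, len(p.pending))
	for id := range p.pending {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	t := newPendingTracker()
	for id := int64(1); id <= 3; id++ {
		t.sent(id, []byte("hello"))
	}
	t.confirmed(2)
	fmt.Println(t.unconfirmed()) // prints [1 3]
}
```

The mutex matters because confirmations are delivered on a different goroutine than the one that sends.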

Deduplication

The stream plugin can deduplicate messages; see this blog post for more details: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the examples directory.
Run it more than once; the message count will always be 10.

To retrieve the last sequence ID for a producer you can use:

publishingId, err := producer.GetLastPublishingId()
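The rule behind deduplication is that, for a given producer name, the broker keeps only messages whose publishing ID is higher than the last one it accepted. The toy model below illustrates why re-running the example leaves the count at 10. It is an in-memory sketch with hypothetical names (dedupLog, publish) and does not talk to a broker.

```go
package main

import "fmt"

// dedupLog is a toy in-memory stand-in for a stream with deduplication.
// It is not part of the client; it only models the filtering rule.
type dedupLog struct {
	lastID  map[string]int64 // highest publishing ID accepted per producer name
	entries []string
}

func newDedupLog() *dedupLog {
	return &dedupLog{lastID: make(map[string]int64)}
}

// publish appends body unless this producer already published an ID
// greater than or equal to id. It reports whether the message was stored.
func (l *dedupLog) publish(producer string, id int64, body string) bool {
	if last, seen := l.lastID[producer]; seen && id <= last {
		return false // duplicate: silently dropped
	}
	l.lastID[producer] = id
	l.entries = append(l.entries, body)
	return true
}

func main() {
	log := newDedupLog()
	// Two runs of the same program publish IDs 0..9 under the same name.
	for run := 0; run < 2; run++ {
		for id := int64(0); id < 10; id++ {
			log.publish("my-producer", id, fmt.Sprintf("message %d", id))
		}
	}
	fmt.Println(len(log.entries)) // prints 10
}
```

This is also why a named producer typically resumes from the value returned by GetLastPublishingId plus one.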

Sub Entries Batching

Sub-entry batching sets the number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
You can find a "Sub Entries Batching" example in the examples directory.

The default compression is None (no compression), but you can choose a different kind of compression: GZIP, SNAPPY, LZ4, or ZSTD.
Compression is valid only if SubEntrySize > 1.

producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
		SetSubEntrySize(100).
		SetCompression(stream.Compression{}.Gzip()))
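To get a feel for what compressing a sub-entry can save, the following sketch compresses a batch of similar message bodies with the standard compress/gzip package and compares sizes. It is only a size experiment: compressBatch and decompress are hypothetical helpers, and the bodies are simply concatenated, whereas the real wire format keeps message boundaries.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressBatch gzips the concatenated message bodies.
// Hypothetical helper for a size comparison, not the client's encoding.
func compressBatch(msgs [][]byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	for _, m := range msgs {
		if _, err := zw.Write(m); err != nil {
			return nil, err
		}
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress reverses compressBatch and returns the concatenated bodies.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	msgs := make([][]byte, 100) // same count as SetSubEntrySize(100) above
	raw := 0
	for i := range msgs {
		msgs[i] = []byte("hello")
		raw += len(msgs[i])
	}
	packed, err := compressBatch(msgs)
	if err != nil {
		fmt.Println("compress failed:", err)
		return
	}
	fmt.Printf("raw: %d bytes, gzip: %d bytes\n", raw, len(packed))
}
```

Real payloads are less repetitive than this, so the gain depends heavily on the data.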

Publish Filtering

Stream filtering is a new feature in RabbitMQ 3.13. It saves bandwidth between the broker and consuming applications when those applications need only a subset of the messages of a stream. See this blog post for more details. The blog post contains a Java example, but the Go client works similarly. See the Filtering example in the examples directory.

Consume messages

To consume messages from a stream, use the NewConsumer interface, for example:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
	fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
}

consumer, err := env.NewConsumer(
		"my-stream",
		handleMessages,
		....

With ConsumerOptions it is possible to customize the consumer behaviour.

  stream.NewConsumerOptions().
  SetConsumerName("my_consumer").                  // set a consumer name
  SetCRCCheck(false).  // Enable/Disable the CRC control.
  SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning

Disabling the CRC control can improve performance.

See also "Offset Start" example in the examples directory

Close the consumer with consumer.Close(). The consumer is removed from the server, and the TCP connection is closed if there are no other consumers.

Manual Track Offset

The server can store the current delivered offset for a given consumer, in this way:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		if atomic.AddInt32(&count, 1)%1000 == 0 {
			err := consumerContext.Consumer.StoreOffset()  // commit all messages up to the current message's offset
			....

consumer, err := env.NewConsumer(
..
stream.NewConsumerOptions().
			SetConsumerName("my_consumer"). // <------

A consumer must have a name to be able to store offsets.
Note: AVOID storing the offset for every single message; it reduces performance.

See also "Offset Tracking" example in the examples directory

The server can also store a previously delivered offset rather than the current delivered offset, in this way:

processMessageAsync := func(consumer stream.Consumer, message *amqp.Message, offset int64) {
    ....
    err := consumer.StoreCustomOffset(offset)  // commit all messages up to this offset
    ....

This is useful when messages must be processed asynchronously and the original message handler cannot be blocked. In that case the current or latest delivered offset cannot be stored the way the handleMessages function above does.
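With asynchronous processing, messages can finish out of order, so the offset that is safe to store is the highest one below which everything has completed. The sketch below tracks that value. It is a hypothetical helper (ackTracker) built on the assumption that the offsets handed to the workers are contiguous; the value it returns is the kind of number one would pass to StoreCustomOffset.

```go
package main

import "fmt"

// ackTracker computes the highest offset that is safe to store when
// messages complete out of order. Hypothetical helper; it assumes the
// delivered offsets are contiguous starting at first.
type ackTracker struct {
	next int64          // lowest offset not yet completed
	done map[int64]bool // completed offsets at or above next
}

func newAckTracker(first int64) *ackTracker {
	return &ackTracker{next: first, done: make(map[int64]bool)}
}

// complete marks offset as processed. It returns the highest offset for
// which every earlier offset is also processed, and whether that value
// advanced with this call.
func (t *ackTracker) complete(offset int64) (committable int64, advanced bool) {
	t.done[offset] = true
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
		advanced = true
	}
	return t.next - 1, advanced
}

func main() {
	t := newAckTracker(0)
	// Workers finish in the order 1, 0, 3, 2.
	for _, off := range []int64{1, 0, 3, 2} {
		if c, ok := t.complete(off); ok {
			fmt.Printf("offset %d done, safe to store up to %d\n", off, c)
		} else {
			fmt.Printf("offset %d done, nothing new to store\n", off)
		}
	}
}
```

Storing only when the value advances, and not on every message, also keeps the number of store calls low.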

Automatic Track Offset

The following snippet shows how to enable automatic tracking with the defaults:

stream.NewConsumerOptions().
			SetConsumerName("my_consumer").
			SetAutoCommit(stream.NewAutoCommitStrategy() ...

nil is also a valid value; the default values will be used:

stream.NewConsumerOptions().
			SetConsumerName("my_consumer").
			SetAutoCommit(nil) ...

Set the consumer name (mandatory for offset tracking)

The automatic tracking strategy has the following available settings:

  • message count before storage: the client will store the offset after the specified number of messages,
    right after the execution of the message handler. The default is every 10,000 messages.

  • flush interval: the client will make sure to store the last received offset at the specified interval.
    This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds.

Those settings are configurable, as shown in the following snippet:

stream.NewConsumerOptions().
	// set a consumer name
	SetConsumerName("my_consumer").
	SetAutoCommit(stream.NewAutoCommitStrategy().
		SetCountBeforeStorage(50). // store the offset every 50 messages
		SetFlushInterval(10*time.Second)). // store the offset every 10 seconds
	SetOffset(stream.OffsetSpecification{}.First()))

See also "Automatic Offset Tracking" example in the examples directory

Get consumer offset

It is possible to query the consumer offset using:

offset, err := env.QueryOffset("consumer_name", "streamName")

An error is returned if the offset doesn't exist.

Consume Filtering

Stream filtering is a new feature in RabbitMQ 3.13. It saves bandwidth between the broker and consuming applications when those applications need only a subset of the messages of a stream. See this blog post for more details. The blog post contains a Java example, but the Go client works similarly. See the Filtering example in the examples directory.

Single Active Consumer

The Single Active Consumer pattern ensures that only one consumer processes messages from a stream at a time. See the Single Active Consumer example.

To create a consumer with the Single Active Consumer pattern, you need to set the SingleActiveConsumer option:

consumerName := "MyFirstGroupConsumer"
consumerUpdate := func(isActive bool) stream.OffsetSpecification {..}
stream.NewConsumerOptions().
	SetConsumerName(consumerName).
	SetSingleActiveConsumer(
		stream.NewSingleActiveConsumer(consumerUpdate))

The ConsumerUpdate function is called when the consumer is promoted.
The new consumer will restart consuming from the offset returned by the consumerUpdate function.
It is up to the user to decide the offset to return.
One way is to store the offset server side and restart from the last stored offset.
The Single Active Consumer example uses the server side offset to restart the consumer.

The ConsumerName is mandatory to enable the SAC. It is how different groups of consumers are created.
Different groups of consumers can consume the same stream at the same time.

NewConsumerOptions().SetOffset() is not necessary when the SAC is active, because the ConsumerUpdate function replaces the value.

See also this post for more details: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams

Handle Close

The client provides an interface to handle the producer/consumer close.

channelClose := consumer.NotifyClose()
defer consumerClose(channelClose)
func consumerClose(channelClose stream.ChannelClose) {
	event := <-channelClose
	fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason)
}

In this way it is possible to handle fail-over.

Reliable Producer and Reliable Consumer

The ReliableProducer and ReliableConsumer are built on top of the standard producer/consumer.
Both use the standard events to handle the close, so you can also write your own code to handle fail-over.

Features:

  • [Both] auto-reconnect in case of disconnection.
  • [Both] check if stream exists, if not they close the ReliableProducer and ReliableConsumer.
  • [Both] check if the stream has a valid leader and replicas, if not they retry until the stream is ready.
  • [ReliableProducer] handle the unconfirmed messages automatically in case of failure.
  • [ReliableConsumer] restart from the last offset in case of restart.

You can find a "Reliable" example in the examples directory.

Super Stream

The Super Stream feature is new in RabbitMQ 3.11. It lets you create a stream with multiple partitions.
Each partition is a separate stream, but the client sees the Super Stream as a single stream.
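One way to picture how a message ends up in a partition is hash-based routing: a routing key is hashed and the result selects one of the partition streams. The sketch below uses FNV-1a from the standard library as a stand-in hash; it is not necessarily the hash the client uses, and the partition names and the partitionFor helper are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a routing key to one of the partition streams.
// Hypothetical helper: FNV-1a is a stand-in hash chosen because it is in
// the standard library, not the client's routing implementation.
func partitionFor(routingKey string, partitions []string) string {
	h := fnv.New32a()
	h.Write([]byte(routingKey)) // Write on a hash.Hash never returns an error
	return partitions[int(h.Sum32()%uint32(len(partitions)))]
}

func main() {
	// Assumed partition names for a super stream called "invoices".
	partitions := []string{"invoices-0", "invoices-1", "invoices-2"}
	for _, key := range []string{"customer-1", "customer-2", "customer-3"} {
		fmt.Printf("%s -> %s\n", key, partitionFor(key, partitions))
	}
}
```

The property that matters is that the same key always lands in the same partition, which preserves ordering per key.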

You can find a "Super Stream" example in the examples directory.

In this blog post you can find more details: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams

You can also read the Java stream client documentation: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams

  • The code is written in Java but the same concepts are valid for the Go client.
  • The Go client has the same features as the Java client.

Super Stream supports publish-filtering and consume-filtering features.

Offset tracking is supported for the Super Stream consumer.
In the same way as the standard stream, you can use the SetAutoCommit or SetManualCommit option to enable/disable the automatic offset tracking.

In the super stream consumer message handler it is possible to identify the partition, the consumer and the offset:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
	....
	consumerContext.Consumer.GetName()       // consumer name
	consumerContext.Consumer.GetOffset()     // current offset
	consumerContext.Consumer.GetStreamName() // stream name (partition name)
	....
}

Manual tracking API:

  • consumerContext.Consumer.StoreOffset(): stores the current offset.
  • consumerContext.Consumer.StoreCustomOffset(xxx): stores a custom offset.

As with the standard stream, avoid storing the offset for every single message: it reduces performance.

Performance test tool

The performance test tool is useful for running tests. See also the Java Performance tool.

To install it, download the release for your platform from GitHub:

Mac:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz

Linux:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz

Windows:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip

Execute stream-perf-test --help to see the parameters. By default it executes a test with one producer and one consumer.

Here is an example:

stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/  --fixed-body 400 --time 10

Performance test tool Docker

A Docker image is available: pivotalrabbitmq/go-stream-perf-test. To test it:

Run the server in host mode:

 docker run -it --rm --name rabbitmq --network host \
    rabbitmq:3-management

Enable the plugin:

 docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

Then run the Docker image:

docker run -it --network host  pivotalrabbitmq/go-stream-perf-test

To see all the parameters:

docker run -it --network host  pivotalrabbitmq/go-stream-perf-test --help

Build from source

make build

To execute the tests you need a Docker image. You can use:

make rabbitmq-server

to run a ready rabbitmq-server with streams enabled for tests.

Then run make test.