diff --git a/README.md b/README.md index eb7f9bea..7c42a335 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv - [Run server with Docker](#run-server-with-docker) - [Getting started for impatient](#getting-started-for-impatient) - [Examples](#examples) +- [Client best practices](#client-best-practices) - [Usage](#usage) * [Connect](#connect) * [Multi hosts](#multi-hosts) @@ -67,7 +68,7 @@ You may need a server to test locally. Let's start the broker: ```shell docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \ - rabbitmq:3-management + rabbitmq:4-management ``` The broker should start in a few seconds. When it’s ready, enable the `stream` plugin and `stream_management`: ```shell @@ -85,6 +86,11 @@ See [getting started](./examples/getting_started.go) example. See [examples](./examples/) directory for more use cases. +### Client best practices + +This client provides a set of best practices to use the client in the best way.
+See [best practices](./best_practices/README.md) for more details. + # Usage ### Connect @@ -252,12 +258,7 @@ producer, err := env.NewProducer("my-stream", nil) ``` With `ProducerOptions` is possible to customize the Producer behaviour: -```golang -type ProducerOptions struct { - Name string // Producer name, it is useful to handle deduplication messages - BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput -} -``` + The client provides two interfaces to send messages. `send`: @@ -275,29 +276,13 @@ for z := 0; z < 10; z++ { err = producer.BatchSend(messages) ``` - -`producer.Send`: -- accepts one message as parameter -- automatically aggregates the messages -- automatically splits the messages in case the size is bigger than `requestedMaxFrameSize` -- automatically splits the messages based on batch-size -- sends the messages in case nothing happens in `producer-send-timeout` -- is asynchronous - -`producer.BatchSend`: -- accepts an array messages as parameter -- is synchronous - -Close the producer: -`producer.Close()` the producer is removed from the server. TCP connection is closed if there aren't -other producers - ### `Send` vs `BatchSend` The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation. `Send` introduces a smart layer to publish messages and internally uses `BatchSend`. -Starting from the version 1.5.0 the `Send` uses a different approach to send messages: Dynamic send.
+Starting from version 1.5.0, the `Send` uses a dynamic send. +The client sends the message buffer regardless of any timeout.
What should you use?
The `Send` method is the best choice for most of the cases:
@@ -318,10 +303,8 @@ With both methods you can have low-latency and/or high-throughput.
The `Send` is the best choice for low-latency without care about aggregation. With `BatchSend` you have more control.
- -Performance test tool can help you to understand the differences between `Send` and `BatchSend`
- -See also "Client performances" example in the [examples](./examples/performances/) directory +Performance test tool can help you to test `Send` and `BatchSend`
+See also the [Performance test tool](#performance-test-tool) section. ### Publish Confirmation @@ -371,8 +354,6 @@ the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.Get See also "Getting started" example in the [examples](./examples/) directory - - ### Deduplication The deduplication is a feature that allows to avoid the duplication of messages.
@@ -622,55 +603,10 @@ Like the standard stream, you should avoid to store the offset for each single m ### Performance test tool Performance test tool it is useful to execute tests. +The performance test tool is in the [perfTest](./perfTest) directory.
See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool -To install you can download the version from github: - -Mac: -``` -https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz -``` - -Linux: -``` -https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz -``` - -Windows -``` -https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip -``` - -execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer. - -here an example: -```shell -stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10 -``` - -### Performance test tool Docker -A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it: - -Run the server is host mode: -```shell - docker run -it --rm --name rabbitmq --network host \ - rabbitmq:3-management -``` -enable the plugin: -``` - docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream -``` -then run the docker image: -```shell -docker run -it --network host pivotalrabbitmq/go-stream-perf-test -``` - -To see all the parameters: -```shell -docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help -``` - ### Build form source ```shell diff --git a/best_practices/README.md b/best_practices/README.md new file mode 100644 index 00000000..cb3512f1 --- /dev/null +++ b/best_practices/README.md @@ -0,0 +1,106 @@ +Client best practices +===================== + +The scope of this document is to provide a set of best practices for the client applications that use the Go client library.
+ + +#### General recommendations +- Messages are not thread-safe, you should not share the same message between different go-routines or different Send/BatchSend calls. +- Use the producer name only if you need deduplication. +- Avoid to store the consumer offset to the server too often. +- `Send` works well in most of the cases, use `BatchSend` when you need more control. +- Connections/producers/consumers are designed to be long-lived. You should avoid creating and closing them too often. +- The library is generally thread-safe,even it is better to use one producer/consumer per go-routine. + +#### Default configuration + +The default configuration of the client library is designed to be used in most of the cases. +No particular tuning is required. Just follow the [Getting started](../examples/getting_started.go) example. + +#### Multiple producers and consumers + +Each connection can support multiple producers and consumers, you can reduce the number of connections by using the same connection for multiple producers and consumers.
+With: +```golang + SetMaxConsumersPerClient(10). + SetMaxConsumersPerClient(10) +``` +The TCP connection will be shared between the producers and consumers. +Note about consumers: One slow consumer can block the others, so it is important: +- To have a good balance between the number of consumers and the speed of the consumers. +- work application side to avoid slow consumers, for example, by using a go-routines/buffers. + +#### High throughput + +To achieve high throughput, you should use one producer per connection, and one consumer per connection. +This will avoid lock contention between the producers when sending messages and between the consumers when receiving messages. + +The method `Send` is usually enough to achieve high throughput. +In some case you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details. + +#### Low latency + +To achieve Low latency, you should use one producer per connection, and one consumer per connection. + +The method `Send` is the best choice to achieve low latency. Default values are tuned for low latency. +You can change the `BatchSize` parameter to increase or reduce the max number of messages sent in one batch. +Note: Since the client uses dynamic send, the `BatchSize` parameter is a hint to the client, the client can send less than the `BatchSize`. + +#### Store several text based messages + +In case you want to store logs, text-based or big messages, you can use the `Sub Entries Batching` method. +Where it is possible to store multiple messages in one entry and compress the entry with different algorithms. +It is useful to reduce the disk space and the network bandwidth. +See the `Sub Entries Batching` documentation for more details.
+ +#### Store several small messages + +In case you want to store a lot of small messages, you can use the `BatchSend` method. +Where it is possible to store multiple messages in one entry. This will avoid creating small chunks on the server side.
+ + +#### Avoid duplications + +In case you want to store messages with deduplication, you need to set the producer name and the deduplication id. +See the `Deduplication` documentation for more details.
+ + +#### Consumer fail over + +In case you want to have a consumer fail over, you can use the `Single Active Consumer` method. +Where only one consumer is active at a time, and the other consumers are in standby mode. + +#### Reliable producer and consumer + +The client library provides a reliable producer and consumer, where the producer and consumer can recover from a connection failure. +See the `Reliable` documentation for more details.
+ + +#### Scaling the streams + +In case you want to scale the streams, you can use the `Super Stream` method. +Where you can have multiple streams and only one stream is active at a time. +See the `Super Stream` documentation for more details.
+ + +#### Filtering the data when consuming + +In case you want to filter the data when consuming, you can use the `Stream Filtering` method. +Where you can filter the data based on the metadata. +See the `Stream Filtering` documentation for more details.
+ + +#### Using a load balancer + +In case you want to use a load balancer, you can use the `Using a load balancer` method. +In Kubernetes, you can use the service name as load balancer dns. +See the `Using a load balancer` documentation for more details.
+ + + + + + + + + diff --git a/examples/README.md b/examples/README.md index 3abbfd2f..96315143 100644 --- a/examples/README.md +++ b/examples/README.md @@ -16,5 +16,3 @@ Stream examples - [Single Active Consumer](./single_active_consumer) - Single Active Consumer example - [Reliable](./reliable) - Reliable Producer and Reliable Consumer example - [Super Stream](./super_stream) - Super Stream example with Single Active Consumer - - [Client performances](./performances) - Client performances example - diff --git a/examples/performances/README.md b/examples/performances/README.md deleted file mode 100644 index 9059fe41..00000000 --- a/examples/performances/README.md +++ /dev/null @@ -1,38 +0,0 @@ -## Client performances - -This document describes how to tune the parameters of the client to: -- Increase the throughput -- And/or reduce the latency -- And/or reduce the disk space used by the messages. - -### Throughput and Latency - -The parameters that can be tuned are: -- `SetBatchSize(batchSize)` and `SetBatchPublishingDelay(100)` when use the `Send()` method -- The number of the messages when use the `BatchSend()` method - -In this example you can play with the parameters to see the impact on the performances. -There is not a magic formula to find the best parameters, you need to test and find the best values for your use case. - -### How to run the example -``` -go run performances.go async 1000000 100; -``` - -### Disk space used by the messages - -The client supports also the batch entry size and the compression: -`SetSubEntrySize(500).SetCompression(stream.Compression{}...` -These parameters can be used to reduce the space used by the messages due of the compression and the batch entry size. - - -### Default values - -The default producer values are meant to be a good trade-off between throughput and latency. -You can tune the parameters to increase the throughput, reduce the latency or reduce the disk space used by the messages. - - - -### Load tests -To execute load tests, you can use the official load test tool: -https://github.com/rabbitmq/rabbitmq-stream-perf-test \ No newline at end of file diff --git a/examples/performances/performances.go b/examples/performances/performances.go deleted file mode 100644 index 3bdadfb6..00000000 --- a/examples/performances/performances.go +++ /dev/null @@ -1,133 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "github.com/google/uuid" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" - "os" - "strconv" - "sync/atomic" - "time" -) - -func CheckErr(err error) { - if err != nil { - fmt.Printf("%s ", err) - os.Exit(1) - } -} - -var messagesConfirmed int32 - -func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { - go func() { - for confirmed := range confirms { - for _, msg := range confirmed { - if msg.IsConfirmed() { - atomic.AddInt32(&messagesConfirmed, 1) - } - } - } - }() -} - -func main() { - - useSyncBatch := os.Args[1] == "sync" - useAsyncSend := os.Args[1] == "async" - - messagesToSend, err := strconv.Atoi(os.Args[2]) - CheckErr(err) - batchSize, err := strconv.Atoi(os.Args[3]) - messagesToSend = messagesToSend / batchSize - - reader := bufio.NewReader(os.Stdin) - fmt.Println("RabbitMQ performance example") - - // Connect to the broker ( or brokers ) - env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetHost("localhost"). - SetPort(5552). - SetUser("guest"). - SetPassword("guest")) - CheckErr(err) - fmt.Println("Connected to the RabbitMQ server") - - streamName := uuid.New().String() - err = env.DeclareStream(streamName, - &stream.StreamOptions{ - MaxLengthBytes: stream.ByteCapacity{}.GB(2), - }, - ) - CheckErr(err) - fmt.Printf("Created Stream: %s \n", streamName) - - producer, err := env.NewProducer(streamName, - stream.NewProducerOptions(). - SetBatchSize(batchSize). - SetBatchPublishingDelay(100)) - CheckErr(err) - - chPublishConfirm := producer.NotifyPublishConfirmation() - handlePublishConfirm(chPublishConfirm) - fmt.Printf("------------------------------------------\n\n") - fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend*batchSize, len("hello_world")) - var averageLatency time.Duration - var messagesConsumed int32 - handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&messagesConsumed, 1) - var latency time.Time - err := latency.UnmarshalBinary(message.Data[0]) - CheckErr(err) - averageLatency += time.Since(latency) - } - _, err = env.NewConsumer(streamName, handleMessages, stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First())) - CheckErr(err) - - start := time.Now() - - // here the client sends the messages in batch and it is up to the user to aggregate the messages - if useSyncBatch { - var arr []message.StreamMessage - for i := 0; i < messagesToSend; i++ { - for i := 0; i < batchSize; i++ { - latency, err := time.Now().MarshalBinary() - CheckErr(err) - arr = append(arr, amqp.NewMessage(latency)) - } - err := producer.BatchSend(arr) - CheckErr(err) - arr = arr[:0] - } - } - - // here the client aggregates the messages based on the batch size and batch publishing delay - if useAsyncSend { - for i := 0; i < messagesToSend; i++ { - for i := 0; i < batchSize; i++ { - latency, err := time.Now().MarshalBinary() - CheckErr(err) - err = producer.Send(amqp.NewMessage(latency)) - CheckErr(err) - } - } - } - - duration := time.Since(start) - fmt.Println("Press any key to report and stop ") - _, _ = reader.ReadString('\n') - fmt.Printf("------------------------------------------\n\n") - fmt.Printf("Sent %d messages in %s. Confirmed: %d avarage latency: %s \n", messagesToSend*batchSize, duration, messagesConfirmed, averageLatency/time.Duration(messagesConsumed)) - fmt.Printf("------------------------------------------\n\n") - - time.Sleep(200 * time.Millisecond) - CheckErr(err) - err = env.DeleteStream(streamName) - CheckErr(err) - err = env.Close() - CheckErr(err) -} diff --git a/perfTest/REAMDE.md b/perfTest/REAMDE.md new file mode 100644 index 00000000..03dbe9ce --- /dev/null +++ b/perfTest/REAMDE.md @@ -0,0 +1,91 @@ +Go stream performances test +=== + +This test is to measure the performance of the stream package. + +#### Install the performance test tool +To install you can download the version from GitHub: + +Mac: +``` +https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz +``` + +Linux: +``` +https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz +``` + +Windows +``` +https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip +``` + +execute `stream-perf-test --help` to see the parameters. By default it executes a test with one producer, one consumer in `BatchSend` mode. + +here an example: +```shell +stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/ --fixed-body 400 --time 10 +``` + +### Performance test tool Docker +A docker image is available: `pivotalrabbitmq/go-stream-perf-test`, to test it: + +Run the server is host mode: +```shell + docker run -it --rm --name rabbitmq --network host \ + rabbitmq:4-management +``` +enable the plugin: +``` + docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream +``` +then run the docker image: +```shell +docker run -it --network host pivotalrabbitmq/go-stream-perf-test +``` + +To see all the parameters: +```shell +docker run -it --network host pivotalrabbitmq/go-stream-perf-test --help +``` + +### Examples + +#### 1. Simple test +1 producer, 1 consumer, 1 stream, 1GB max length +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB +``` + +#### 2. Multiple producers and consumers +3 producers, 2 consumers, 1 stream, 2GB max length +```shell +stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB +``` + +#### 3. Fixed body size +1 producer, 1 consumer, 1 stream, 1GB max length, 400 bytes body +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --fixed-body 400 +``` + +#### 4. Test async-send +By default, the test uses the `BatchSend` mode, to test the `Send` mode: +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --async-send +``` + +#### 5. Test fixed rate and async-send +This test is useful to test the latency, the producer sends messages at a fixed rate. +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --async-send --rate 100 +``` + +#### 6. Batch Size + +Batch Size is valid only for `Send` mode, it is the max number of messages sent in a batch. +```shell +stream-perf-test --streams my_stream --max-length-bytes 1GB --async-send --batch-size 100 +``` + diff --git a/pkg/amqp/types.go b/pkg/amqp/types.go index 3196e8ab..be59a157 100644 --- a/pkg/amqp/types.go +++ b/pkg/amqp/types.go @@ -413,6 +413,12 @@ type Message struct { doneSignal chan struct{} } +// AMQP10 is an AMQP 1.0 message with the necessary fields to work with the +// stream package. +// This is a wrapper around the amqp.Message struct. +// AMQP10 is not thread-safe. +// You should avoid to share the same message between multiple go routines and between multi Send/BatchSend calls. +// Each Send/BatchSend call should use a new message. type AMQP10 struct { publishingId int64 hasPublishingId bool diff --git a/pkg/message/interface.go b/pkg/message/interface.go index 23314d3c..b5b615a3 100644 --- a/pkg/message/interface.go +++ b/pkg/message/interface.go @@ -2,6 +2,10 @@ package message import "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" +// StreamMessage is the interface that wraps the basic methods to interact with a message +// in the context of a stream. +// Currently, the StreamMessage interface is implemented by the amqp.Message struct. +// The implementations are not meant to be thread-safe. type StreamMessage interface { MarshalBinary() ([]byte, error) UnmarshalBinary(data []byte) error