Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve batch send performances #366

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/reliable/reliable_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)
Expand All @@ -33,18 +32,18 @@ var reSent int32

func main() {
// Tune the parameters to test the reliability
const messagesToSend = 5_000_000
const numberOfProducers = 2
const messagesToSend = 10_000_000
const numberOfProducers = 4
const concurrentProducers = 2
const numberOfConsumers = 2
const numberOfConsumers = 4
const sendDelay = 1 * time.Millisecond
const delayEachMessages = 200
const maxProducersPerClient = 4
const maxConsumersPerClient = 2
//

reader := bufio.NewReader(os.Stdin)
stream.SetLevelInfo(logs.DEBUG)
//stream.SetLevelInfo(logs.DEBUG)
fmt.Println("Reliable Producer/Consumer example")
fmt.Println("Connecting to RabbitMQ streaming ...")

Expand Down
2 changes: 1 addition & 1 deletion perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func init() {
func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().StringSliceVarP(&rabbitmqBrokerUrl, "uris", "", []string{stream.LocalhostUriConnection}, "Broker URLs")
baseCmd.PersistentFlags().IntVarP(&publishers, "publishers", "", 1, "Number of Publishers")
baseCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "", 100, "Batch Size, from 1 to 200")
baseCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "", 200, "Batch Size, from 1 to 300")
baseCmd.PersistentFlags().IntVarP(&subEntrySize, "sub-entry-size", "", 1, "SubEntry size, default 1. > 1 Enable the subEntryBatch")
baseCmd.PersistentFlags().StringVarP(&compression, "compression", "", "", "Compression for sub batching, none,gzip,lz4,snappy,zstd")
baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "", 1, "Number of Consumers")
Expand Down
4 changes: 2 additions & 2 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func startSimulation() error {
stream.SetLevelInfo(logs.DEBUG)
}

if batchSize < 1 || batchSize > 200 {
logError("Invalid batchSize, must be from 1 to 200, value:%d", batchSize)
if batchSize < 1 || batchSize > 300 {
logError("Invalid batchSize, must be from 1 to 300, value:%d", batchSize)
os.Exit(1)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (compression Compression) Lz4() Compression {
}

type subEntry struct {
messages []messageSequence
messages []*messageSequence
publishingId int64 // need to store the publishingId useful in case of aggregation

unCompressedSize int
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ var _ = Describe("Compression algorithms", func() {
messagePayload[i] = 99
}

message := messageSequence{
message := &messageSequence{
messageBytes: messagePayload,
unCompressedSize: len(messagePayload),
publishingId: 0,
}

entries = &subEntries{
items: []*subEntry{{
messages: []messageSequence{message},
messages: []*messageSequence{message},
publishingId: 0,
unCompressedSize: len(messagePayload) + 4,
sizeInBytes: 0,
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (coordinator *Coordinator) NewProducer(
}
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.Mutex{},
mutex: &sync.RWMutex{},
mutexPending: &sync.Mutex{},
unConfirmedMessages: map[int64]*ConfirmationStatus{},
status: open,
messageSequenceCh: make(chan messageSequence, size),
pendingMessages: pendingMessagesSequence{
messages: make([]messageSequence, 0),
messages: make([]*messageSequence, 0),
size: initBufferPublishSize,
}}
coordinator.producers[lastId] = producer
Expand Down
87 changes: 63 additions & 24 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 {
}

type pendingMessagesSequence struct {
messages []messageSequence
messages []*messageSequence
size int
}

Expand All @@ -60,6 +60,7 @@ type messageSequence struct {
unCompressedSize int
publishingId int64
filterValue string
refMessage *message.StreamMessage
}

type Producer struct {
Expand All @@ -68,7 +69,7 @@ type Producer struct {
onClose onInternalClose
unConfirmedMessages map[int64]*ConfirmationStatus
sequence int64
mutex *sync.Mutex
mutex *sync.RWMutex
mutexPending *sync.Mutex
publishConfirm chan []*ConfirmationStatus
closeHandler chan Event
Expand Down Expand Up @@ -170,11 +171,27 @@ func NewProducerOptions() *ProducerOptions {
}

func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus {
producer.mutex.Lock()
defer producer.mutex.Unlock()
producer.mutex.RLock()
defer producer.mutex.RUnlock()
return producer.unConfirmedMessages
}

func (producer *Producer) addUnConfirmedSequences(message []*messageSequence, producerID uint8) {
producer.mutex.Lock()
defer producer.mutex.Unlock()

for _, msg := range message {
producer.unConfirmedMessages[msg.publishingId] =
&ConfirmationStatus{
inserted: time.Now(),
message: *msg.refMessage,
producerID: producerID,
publishingId: msg.publishingId,
confirmed: false,
}
}

}
func (producer *Producer) addUnConfirmed(sequence int64, message message.StreamMessage, producerID uint8) {
producer.mutex.Lock()
defer producer.mutex.Unlock()
Expand All @@ -191,6 +208,18 @@ func (po *ProducerOptions) isSubEntriesBatching() bool {
return po.SubEntrySize > 1
}

func (producer *Producer) removeFromConfirmationStatus(status []*ConfirmationStatus) {
producer.mutex.Lock()
defer producer.mutex.Unlock()

for _, msg := range status {
delete(producer.unConfirmedMessages, msg.publishingId)
for _, linked := range msg.linkedTo {
delete(producer.unConfirmedMessages, linked.publishingId)
}
}
}

func (producer *Producer) removeUnConfirmed(sequence int64) {
producer.mutex.Lock()
defer producer.mutex.Unlock()
Expand All @@ -210,13 +239,13 @@ func (producer *Producer) lenPendingMessages() int {
}

func (producer *Producer) getUnConfirmed(sequence int64) *ConfirmationStatus {
producer.mutex.Lock()
defer producer.mutex.Unlock()
producer.mutex.RLock()
defer producer.mutex.RUnlock()
return producer.unConfirmedMessages[sequence]
}

func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm {
ch := make(chan []*ConfirmationStatus)
ch := make(chan []*ConfirmationStatus, 1)
producer.publishConfirm = ch
return ch
}
Expand Down Expand Up @@ -263,19 +292,26 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() {
go func() {
for producer.getStatus() == open {
time.Sleep(2 * time.Second)
producer.mutex.Lock()
toRemove := make([]*ConfirmationStatus, 0)
// check the unconfirmed messages and remove the one that are expired
// use the RLock to avoid blocking the producer
producer.mutex.RLock()
for _, msg := range producer.unConfirmedMessages {
if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut {
msg.err = ConfirmationTimoutError
msg.errorCode = timeoutError
msg.confirmed = false
if producer.publishConfirm != nil {
producer.publishConfirm <- []*ConfirmationStatus{msg}
}
delete(producer.unConfirmedMessages, msg.publishingId)
toRemove = append(toRemove, msg)
}
}
producer.mutex.RUnlock()

if len(toRemove) > 0 {
producer.removeFromConfirmationStatus(toRemove)
if producer.publishConfirm != nil {
producer.publishConfirm <- toRemove
}
}
producer.mutex.Unlock()
}
time.Sleep(5 * time.Second)
producer.flushUnConfirmedMessages(timeoutError, ConfirmationTimoutError)
Expand Down Expand Up @@ -312,7 +348,7 @@ func (producer *Producer) startPublishTask() {
}

producer.pendingMessages.size += msg.unCompressedSize
producer.pendingMessages.messages = append(producer.pendingMessages.messages, msg)
producer.pendingMessages.messages = append(producer.pendingMessages.messages, &msg)
if len(producer.pendingMessages.messages) >= (producer.options.BatchSize) {
producer.sendBufferedMessages()
}
Expand Down Expand Up @@ -384,7 +420,7 @@ func (producer *Producer) assignPublishingID(message message.StreamMessage) int6
// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and
// calls BatchSend internally.
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
var messagesSequence = make([]messageSequence, len(batchMessages))
var messagesSequence = make([]*messageSequence, len(batchMessages))
totalBufferToSend := 0
for i, batchMessage := range batchMessages {
messageBytes, err := batchMessage.MarshalBinary()
Expand All @@ -398,16 +434,17 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error

sequence := producer.assignPublishingID(batchMessage)
totalBufferToSend += len(messageBytes)
messagesSequence[i] = messageSequence{
messagesSequence[i] = &messageSequence{
messageBytes: messageBytes,
unCompressedSize: len(messageBytes),
publishingId: sequence,
filterValue: filterValue,
refMessage: &batchMessage,
}

producer.addUnConfirmed(sequence, batchMessage, producer.id)
}

producer.addUnConfirmedSequences(messagesSequence, producer.GetID())

if totalBufferToSend+initBufferPublishSize > producer.options.client.tuneState.requestedMaxFrameSize {
for _, msg := range messagesSequence {

Expand All @@ -432,11 +469,11 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
func (producer *Producer) GetID() uint8 {
return producer.id
}
func (producer *Producer) internalBatchSend(messagesSequence []messageSequence) error {
func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error {
return producer.internalBatchSendProdId(messagesSequence, producer.GetID())
}

func (producer *Producer) simpleAggregation(messagesSequence []messageSequence, b *bufio.Writer) {
func (producer *Producer) simpleAggregation(messagesSequence []*messageSequence, b *bufio.Writer) {
for _, msg := range messagesSequence {
r := msg.messageBytes
writeBLong(b, msg.publishingId) // publishingId
Expand All @@ -459,13 +496,15 @@ func (producer *Producer) subEntryAggregation(aggregation subEntries, b *bufio.W
}
}

func (producer *Producer) aggregateEntities(msgs []messageSequence, size int, compression Compression) (subEntries, error) {
func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, compression Compression) (subEntries, error) {
subEntries := subEntries{}

var entry *subEntry
for _, msg := range msgs {
if len(subEntries.items) == 0 || len(entry.messages) >= size {
entry = &subEntry{}
entry = &subEntry{
messages: make([]*messageSequence, 0),
}
entry.publishingId = -1
subEntries.items = append(subEntries.items, entry)
}
Expand Down Expand Up @@ -506,7 +545,7 @@ func (producer *Producer) aggregateEntities(msgs []messageSequence, size int, co
/// the producer id is always the producer.GetID(). This function is needed only for testing
// some condition, like simulate publish error, see

func (producer *Producer) internalBatchSendProdId(messagesSequence []messageSequence, producerID uint8) error {
func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error {
producer.options.client.socket.mutex.Lock()
defer producer.options.client.socket.mutex.Unlock()
if producer.getStatus() == closed {
Expand Down Expand Up @@ -656,7 +695,7 @@ func (producer *Producer) GetName() string {
return producer.options.Name
}

func (producer *Producer) sendWithFilter(messagesSequence []messageSequence, producerID uint8) error {
func (producer *Producer) sendWithFilter(messagesSequence []*messageSequence, producerID uint8) error {
frameHeaderLength := initBufferPublishSize
var msgLen int
for _, msg := range messagesSequence {
Expand Down
Loading
Loading