diff --git a/README.md b/README.md index ed03c043..eb7f9bea 100644 --- a/README.md +++ b/README.md @@ -255,9 +255,7 @@ With `ProducerOptions` is possible to customize the Producer behaviour: ```golang type ProducerOptions struct { Name string // Producer name, it is useful to handle deduplication messages - QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-size aggregation, low value reduce the latency, high value increase the throughput - BatchPublishingDelay int // Period to send a batch of messages. } ``` @@ -296,9 +294,32 @@ other producers ### `Send` vs `BatchSend` -The `BatchSend` is the primitive to send the messages, `Send` introduces a smart layer to publish messages and internally uses `BatchSend`. +The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation. +`Send` introduces a smart layer to publish messages and internally uses `BatchSend`. -The `Send` interface works in most of the cases, In some condition is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ). +Starting from the version 1.5.0 the `Send` uses a different approach to send messages: Dynamic send.
+ +What should you use?
+The `Send` method is the best choice in most cases:
+- It is asynchronous +- It is smart to aggregate the messages in a batch with a low-latency +- It is smart to split the messages in case the size is bigger than `requestedMaxFrameSize` +- You can play with `BatchSize` parameter to increase the throughput + +The `BatchSend` is useful in case you need to manage the aggregation by yourself.
+It gives you more control over the aggregation process:
+- It is synchronous +- It is up to the user to manage the aggregation +- It is up to the user to split the messages in case the size is bigger than `requestedMaxFrameSize` +- It can be faster than `Send` in case the aggregation is managed by the user. + +#### Throughput vs Latency
+With both methods you can have low-latency and/or high-throughput.
+`Send` is the best choice for low latency when you do not want to manage the aggregation yourself. +With `BatchSend` you have more control.
+ + +The performance test tool can help you understand the differences between `Send` and `BatchSend`.
See also "Client performances" example in the [examples](./examples/performances/) directory @@ -354,6 +375,11 @@ See also "Getting started" example in the [examples](./examples/) directory ### Deduplication +Deduplication is a feature that lets you avoid duplicated messages.
+It is enabled by the user by setting the producer name with the options:
+```golang +producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetProducerName("my_producer")) +``` The stream plugin can handle deduplication data, see this blog post for more details: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the [examples](./examples/) directory.
diff --git a/examples/deduplication/deduplication.go b/examples/deduplication/deduplication.go index 1ef2568c..686a3801 100644 --- a/examples/deduplication/deduplication.go +++ b/examples/deduplication/deduplication.go @@ -35,7 +35,10 @@ func main() { } producer, err := env.NewProducer(streamName, - stream.NewProducerOptions().SetProducerName("myProducer")) // producer name is mandatory to handle the deduplication + stream.NewProducerOptions(). + // producer name is mandatory to handle the deduplication + // don't use the producer name if you don't need the deduplication + SetProducerName("myProducer")) CheckErr(err) diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index a3e5d943..1d130977 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -51,8 +51,8 @@ func newTCPParameterDefault() *TCPParameters { return &TCPParameters{ RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, - WriteBuffer: 8192, - ReadBuffer: 8192, + WriteBuffer: defaultWriteSocketBuffer, + ReadBuffer: defaultReadSocketBuffer, NoDelay: true, tlsConfig: nil, } diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 7e1f5c38..8ce2de3f 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -592,7 +592,6 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) ( if res.Err == nil { producer.startUnconfirmedMessagesTimeOutTask() producer.processSendingMessages() - //producer.startPublishTask() } return producer, res.Err } diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 26ad5b5e..d44fd4e1 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -94,8 +94,8 @@ const ( LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" /// - defaultWriteSocketBuffer = 8092 - defaultReadSocketBuffer = 65536 + defaultWriteSocketBuffer = 8192 + defaultReadSocketBuffer = 8192 defaultQueuePublisherSize = 10000 minQueuePublisherSize = 100 maxQueuePublisherSize = 1_000_000 diff --git 
a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 0376aa7b..f17db260 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -55,8 +55,10 @@ func (coordinator *Coordinator) NewProducer( coordinator.mutex.Lock() defer coordinator.mutex.Unlock() dynSize := 10000 + tickerTime := defaultConfirmationTimeOut if parameters != nil { dynSize = parameters.BatchSize + tickerTime = parameters.ConfirmationTimeOut } var lastId, err = coordinator.getNextProducerItem() @@ -64,11 +66,13 @@ func (coordinator *Coordinator) NewProducer( return nil, err } var producer = &Producer{id: lastId, - options: parameters, - mutex: &sync.RWMutex{}, - unConfirmed: newUnConfirmed(5), - status: open, - dynamicSendCh: make(chan *messageSequence, dynSize), + options: parameters, + mutex: &sync.RWMutex{}, + unConfirmed: newUnConfirmed(), + timeoutTicker: time.NewTicker(tickerTime), + doneTimeoutTicker: make(chan struct{}), + status: open, + dynamicSendCh: make(chan *messageSequence, dynSize), } coordinator.producers[lastId] = producer return producer, err diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index bbf66018..1e6aff0d 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -59,15 +59,17 @@ type messageSequence struct { } type Producer struct { - id uint8 - options *ProducerOptions - onClose onInternalClose - unConfirmed *unConfirmed - sequence int64 - mutex *sync.RWMutex - publishConfirm chan []*ConfirmationStatus - closeHandler chan Event - status int + id uint8 + options *ProducerOptions + onClose onInternalClose + unConfirmed *unConfirmed + sequence int64 + mutex *sync.RWMutex + publishConfirm chan []*ConfirmationStatus + closeHandler chan Event + status int + timeoutTicker *time.Ticker + doneTimeoutTicker chan struct{} dynamicSendCh chan *messageSequence } @@ -85,24 +87,43 @@ func NewProducerFilter(filterValue FilterValue) *ProducerFilter { } type ProducerOptions struct { - client *Client - streamName string - Name string // Producer 
name, valid for deduplication - QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server - BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() - BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() - SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id - Compression Compression // Compression type, it is valid only if SubEntrySize > 1 - ConfirmationTimeOut time.Duration // Time to wait for the confirmation - ClientProvidedName string // Client provider name that will be shown in the management UI - Filter *ProducerFilter // Enable the filter feature, by default is disabled. Pointer nil -} - + client *Client + streamName string + // Producer name. You need to set it to enable the deduplication feature. + // Deduplication is a feature that allows the producer to avoid sending duplicate messages to the stream. + // see: https://www.rabbitmq.com/blog/2021/07/28/rabbitmq-streams-message-deduplication for more information. + // Don't use it if you don't need the deduplication. + Name string + // Deprecated: starting from 1.5.0 the QueueSize is deprecated, and it will be removed in the next releases + // It is not used anymore given the dynamic batching + QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server + BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() + // Deprecated: starting from 1.5.0 the BatchPublishingDelay is deprecated, and it will be removed in the next releases + // It is not used anymore given the dynamic batching + BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. 
Valid only for the method Send() + // Size of sub Entry, to aggregate more subEntry using one publishing id + SubEntrySize int + // Compression type, it is valid only if SubEntrySize > 1 + // The messages can be compressed before sending them to the server + Compression Compression + // Time to wait for the confirmation, see the unConfirmed structure + ConfirmationTimeOut time.Duration + // Client provider name that will be shown in the management UI + ClientProvidedName string + // Enable the filter feature, by default is disabled. Pointer nil + Filter *ProducerFilter +} + +// SetProducerName sets the producer name. This name is used to enable the deduplication feature. +// See ProducerOptions.Name for more details. +// Don't use it if you don't need the deduplication. func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions { po.Name = name return po } +// Deprecated: starting from 1.5.0 the SetQueueSize is deprecated, and it will be removed in the next releases +// It is not used anymore given the dynamic batching func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions { po.QueueSize = size return po @@ -116,36 +137,44 @@ func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions { return po } +// Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases +// It is not used anymore given the dynamic batching func (po *ProducerOptions) SetBatchPublishingDelay(size int) *ProducerOptions { po.BatchPublishingDelay = size return po } +// SetSubEntrySize See the ProducerOptions.SubEntrySize for more details func (po *ProducerOptions) SetSubEntrySize(size int) *ProducerOptions { po.SubEntrySize = size return po } +// SetCompression sets the compression for the producer.
See ProducerOptions.Compression for more details func (po *ProducerOptions) SetCompression(compression Compression) *ProducerOptions { po.Compression = compression return po } +// SetConfirmationTimeOut sets the time to wait for the confirmation. See ProducerOptions.ConfirmationTimeOut for more details func (po *ProducerOptions) SetConfirmationTimeOut(duration time.Duration) *ProducerOptions { po.ConfirmationTimeOut = duration return po } +// SetClientProvidedName sets the client provided name that will be shown in the management UI func (po *ProducerOptions) SetClientProvidedName(name string) *ProducerOptions { po.ClientProvidedName = name return po } +// SetFilter sets the filter for the producer. See ProducerOptions.Filter for more details func (po *ProducerOptions) SetFilter(filter *ProducerFilter) *ProducerOptions { po.Filter = filter return po } +// IsFilterEnabled returns true if the filter is enabled func (po *ProducerOptions) IsFilterEnabled() bool { return po.Filter != nil } @@ -168,17 +197,7 @@ func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus { } func (producer *Producer) addUnConfirmedSequences(message []*messageSequence, producerID uint8) { - for _, msg := range message { - producer.unConfirmed.add(msg.publishingId, - &ConfirmationStatus{ - inserted: time.Now(), - message: *msg.refMessage, - producerID: producerID, - publishingId: msg.publishingId, - confirmed: false, - }) - } - + producer.unConfirmed.addBatch(message, producerID) } func (producer *Producer) addUnConfirmed(sequence int64, message message.StreamMessage, producerID uint8) { producer.unConfirmed.add(sequence, &ConfirmationStatus{ @@ -196,12 +215,8 @@ func (po *ProducerOptions) isSubEntriesBatching() bool { func (producer *Producer) removeFromConfirmationStatus(status []*ConfirmationStatus) { - for _, msg := range status { - producer.unConfirmed.remove(msg.publishingId) - for _, linked := range msg.linkedTo { - producer.unConfirmed.remove(linked.publishingId) - } - 
} + producer.unConfirmed.removeBatch(status) + } func (producer *Producer) removeUnConfirmed(sequence int64) { @@ -216,12 +231,14 @@ func (producer *Producer) getUnConfirmed(sequence int64) *ConfirmationStatus { return producer.unConfirmed.get(sequence) } +// NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer. func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm { ch := make(chan []*ConfirmationStatus, 1) producer.publishConfirm = ch return ch } +// NotifyClose returns a channel that receives the close event of the producer. func (producer *Producer) NotifyClose() ChannelClose { ch := make(chan Event, 1) producer.closeHandler = ch @@ -250,28 +267,36 @@ func (producer *Producer) getStatus() int { func (producer *Producer) startUnconfirmedMessagesTimeOutTask() { go func() { - for producer.getStatus() == open { - time.Sleep(2 * time.Second) - toRemove := make([]*ConfirmationStatus, 0) - // check the unconfirmed messages and remove the one that are expired - for _, msg := range producer.unConfirmed.getAll() { - if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut { - msg.err = ConfirmationTimoutError - msg.errorCode = timeoutError - msg.confirmed = false - toRemove = append(toRemove, msg) + for { + select { + case <-producer.doneTimeoutTicker: + time.Sleep(1 * time.Second) + producer.flushUnConfirmedMessages(timeoutError, ConfirmationTimoutError) + return + case <-producer.timeoutTicker.C: + toRemove := make([]*ConfirmationStatus, 0) + // check the unconfirmed messages and remove the one that are expired + if producer.getStatus() == open { + m := producer.unConfirmed.getAll() + for _, msg := range m { + if time.Since(msg.inserted) > producer.options.ConfirmationTimeOut { + msg.err = ConfirmationTimoutError + msg.errorCode = timeoutError + msg.confirmed = false + toRemove = append(toRemove, msg) + } + } + + if len(toRemove) > 0 { + 
producer.removeFromConfirmationStatus(toRemove) + if producer.publishConfirm != nil { + producer.publishConfirm <- toRemove + } + } } } - if len(toRemove) > 0 { - producer.removeFromConfirmationStatus(toRemove) - if producer.publishConfirm != nil { - producer.publishConfirm <- toRemove - } - } } - time.Sleep(1 * time.Second) - producer.flushUnConfirmedMessages(timeoutError, ConfirmationTimoutError) }() } @@ -279,17 +304,17 @@ func (producer *Producer) processSendingMessages() { // the queueMessages is the buffer to accumulate the messages - // queueMessages is shared between the two goroutines + // queueMessages is shared between the two goroutines. + // it contains the messages that are sent by the first goroutine queueMessages := make([]*messageSequence, 0) + // the chSignal is used to signal the two goroutines. + // When chSignal is sent by the second goroutine, the first goroutine sends the messages + chSignal := make(chan struct{}, producer.options.BatchSize) + totalBufferToSend := 0 mutexQueue := sync.RWMutex{} - chSignal := make(chan struct{}, producer.options.BatchSize) - maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize - // temporary variables to calculate the average - sent := 0 - frames := 0 - iterations := 0 + maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize // the waitGroup is used to wait for the two goroutines // the first goroutine is the one that sends the messages @@ -312,14 +337,7 @@ func (producer *Producer) processSendingMessages() { if err != nil { logs.LogError("Producer %d, error during batch send: %s", producer.GetID(), err) } - sent += len(queueMessages) queueMessages = queueMessages[:0] - iterations++ - if iterations > 0 && iterations%1000000 == 0 { - logs.LogInfo("Producer %d, average messages: %d, frames %d, sent:%d", - producer.GetID(), sent/iterations, frames/iterations, sent) - //} - } totalBufferToSend = initBufferPublishSize mutexQueue.Unlock() } @@ -330,8 +348,11 @@ func (producer 
*Producer) processSendingMessages() { waitGroup.Add(1) /// accumulate the messages in a buffer + // the batch messages are sent with the messages that are accumulated in the buffer. + // The buffer is reset when the messages are sent. go func() { waitGroup.Done() + // Receive the messages from the dynamicSendCh used by Send() function for msg := range producer.dynamicSendCh { toSend := false mutexQueue.Lock() @@ -350,7 +371,7 @@ func (producer *Producer) processSendingMessages() { chSignal <- struct{}{} } - // the channel is closed, so we can close the signal channel + // the dynamicSendCh channel is closed, so we can close the signal channel close(chSignal) }() @@ -389,8 +410,10 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str } // Send sends a message to the stream and returns an error if the message could not be sent. -// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay -// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached. +// The Send is asynchronous. The message is sent to a channel ant then other goroutines aggregate and sent the messages +// The Send is dynamic so the number of messages sent decided internally based on the BatchSize +// and the messages contained in the buffer. The aggregation is up to the client. 
+// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { @@ -413,14 +436,16 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { return FrameTooLarge } producer.addUnConfirmed(messageSeq.publishingId, streamMessage, producer.GetID()) + // se the processSendingMessages function producer.dynamicSendCh <- messageSeq return nil } // BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent. -// The method is synchronous. The aggregation is up to the user. The user has to aggregate the messages +// The method is synchronous.The aggregation is up to the user. The user has to aggregate the messages // and send them in a batch. // BatchSend is not affected by the BatchSize and BatchPublishingDelay options. +// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize var messagesSequence = make([]*messageSequence, 0) @@ -441,6 +466,8 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error // if totalBufferToSend+initBufferPublishSize > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // all the messages are unconfirmed for _, msg := range messagesSequence { if producer.publishConfirm != nil { unConfirmedMessage := &ConfirmationStatus{ @@ -455,10 +482,11 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error producer.publishConfirm <- []*ConfirmationStatus{unConfirmedMessage} } } - return FrameTooLarge } + producer.addUnConfirmedSequences(messagesSequence, producer.GetID()) + // all the messages are 
unconfirmed return producer.internalBatchSend(messagesSequence) } @@ -539,9 +567,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c return subEntries, nil } -/// the producer id is always the producer.GetID(). This function is needed only for testing -// some condition, like simulate publish error, see - +// / the producer id is always the producer.GetID(). This function is needed only for testing +// some condition, like simulate publish error. func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error { producer.options.client.socket.mutex.Lock() defer producer.options.client.socket.mutex.Unlock() @@ -618,9 +645,14 @@ func (producer *Producer) flushUnConfirmedMessages(errorCode uint16, err error) } +// GetLastPublishingId returns the last publishing id sent by the producer given the producer name. +// this function is useful when you need to know the last message sent by the producer in case of +// deduplication. func (producer *Producer) GetLastPublishingId() (int64, error) { return producer.options.client.queryPublisherSequence(producer.GetName(), producer.GetStreamName()) } + +// Close closes the producer and returns an error if the producer could not be closed. 
func (producer *Producer) Close() error { if producer.getStatus() == closed { return AlreadyClosed @@ -629,6 +661,10 @@ func (producer *Producer) Close() error { producer.waitForInflightMessages() producer.setStatus(closed) + producer.timeoutTicker.Stop() + producer.doneTimeoutTicker <- struct{}{} + close(producer.doneTimeoutTicker) + if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") } @@ -668,7 +704,7 @@ func (producer *Producer) waitForInflightMessages() { logs.LogDebug("waitForInflightMessages, channel: %d - unconfirmed len: %d - retry: %d", channelLength, producer.lenUnConfirmed(), tentatives) - time.Sleep(time.Duration(2*producer.options.BatchPublishingDelay) * time.Millisecond) + time.Sleep(time.Duration(500) * time.Millisecond) tentatives++ } } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index 5162849b..c62e3a6b 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -1,108 +1,97 @@ package stream -import "sync" - -type unConfirmedPartition struct { +import ( + "sync" + "time" +) + +// unConfirmed is a structure that holds unconfirmed messages +// And unconfirmed message is a message that has been sent to the broker but not yet confirmed, +// and it is added to the unConfirmed structure as soon is possible when +// +// the Send() or BatchSend() method is called +// +// The confirmation status is updated when the confirmation is received from the broker (see server_frame.go) +// or due of timeout. The Timeout is configurable, and it is calculated client side. 
+type unConfirmed struct { messages map[int64]*ConfirmationStatus - mutex sync.Mutex + mutex sync.RWMutex } -func newUnConfirmedPartition() *unConfirmedPartition { - return &unConfirmedPartition{ - messages: make(map[int64]*ConfirmationStatus), - } -} +const DefaultUnconfirmedSize = 10000 -func (u *unConfirmedPartition) add(id int64, cf *ConfirmationStatus) { - u.mutex.Lock() - defer u.mutex.Unlock() - u.messages[id] = cf -} +func newUnConfirmed() *unConfirmed { -func (u *unConfirmedPartition) remove(id int64) { - u.mutex.Lock() - defer u.mutex.Unlock() - delete(u.messages, id) -} + r := &unConfirmed{ + messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), + mutex: sync.RWMutex{}, + } -func (u *unConfirmedPartition) get(id int64) *ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() - return u.messages[id] + return r } -func (u *unConfirmedPartition) size() int { - u.mutex.Lock() - defer u.mutex.Unlock() - return len(u.messages) -} -func (u *unConfirmedPartition) clear() { - u.mutex.Lock() - defer u.mutex.Unlock() - u.messages = make(map[int64]*ConfirmationStatus) -} +func (u *unConfirmed) addBatch(messageSequences []*messageSequence, producerID uint8) { -func (u *unConfirmedPartition) getAll() map[int64]*ConfirmationStatus { u.mutex.Lock() - defer u.mutex.Unlock() - result := make(map[int64]*ConfirmationStatus) - for i, message := range u.messages { - result[i] = message + for _, ms := range messageSequences { + u.messages[ms.publishingId] = &ConfirmationStatus{ + inserted: time.Now(), + message: *ms.refMessage, + producerID: producerID, + publishingId: ms.publishingId, + confirmed: false, + } } - return result + u.mutex.Unlock() + } -type unConfirmed struct { - partitions []*unConfirmedPartition - partitionNumber int +func (u *unConfirmed) add(id int64, cf *ConfirmationStatus) { + u.mutex.Lock() + u.messages[id] = cf + u.mutex.Unlock() } -func newUnConfirmed(partitionNumber int) *unConfirmed { - var partitions []*unConfirmedPartition - 
for i := 0; i < partitionNumber; i++ { - partitions = append(partitions, newUnConfirmedPartition()) - } - return &unConfirmed{ - partitions: partitions, - partitionNumber: partitionNumber, +func (u *unConfirmed) removeBatch(confirmationStatus []*ConfirmationStatus) { + u.mutex.Lock() + for _, cs := range confirmationStatus { + delete(u.messages, cs.publishingId) } -} + u.mutex.Unlock() -func (u *unConfirmed) add(id int64, cf *ConfirmationStatus) { - partition := id % int64(u.partitionNumber) - u.partitions[partition].add(id, cf) } func (u *unConfirmed) remove(id int64) { - partition := id % int64(u.partitionNumber) - u.partitions[partition].remove(id) + u.mutex.Lock() + delete(u.messages, id) + u.mutex.Unlock() } func (u *unConfirmed) get(id int64) *ConfirmationStatus { - partition := id % int64(u.partitionNumber) - return u.partitions[partition].get(id) + u.mutex.RLock() + defer u.mutex.RUnlock() + return u.messages[id] } func (u *unConfirmed) size() int { - size := 0 - for _, partition := range u.partitions { - size += partition.size() - } - return size + u.mutex.Lock() + defer u.mutex.Unlock() + return len(u.messages) } func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { - result := make(map[int64]*ConfirmationStatus) - for _, partition := range u.partitions { - for _, status := range partition.getAll() { - result[status.publishingId] = status - } + cloned := make(map[int64]*ConfirmationStatus) + u.mutex.RLock() + defer u.mutex.RUnlock() + for k, v := range u.messages { + cloned[k] = v } - return result + return cloned + } func (u *unConfirmed) clear() { - for _, partition := range u.partitions { - partition.clear() - } + u.mutex.Lock() + u.messages = make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize) + u.mutex.Unlock() } diff --git a/pkg/stream/producer_unconfirmed_test.go b/pkg/stream/producer_unconfirmed_test.go deleted file mode 100644 index bfda3621..00000000 --- a/pkg/stream/producer_unconfirmed_test.go +++ /dev/null @@ -1,84 +0,0 @@ 
-package stream - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "sync" -) - -var _ = Describe("UnConfirmed tests ", func() { - - It("Hash Consistency", func() { - unConfirmed := newUnConfirmed(10) - for i := 0; i < 10; i++ { - unConfirmed.add(int64(i), &ConfirmationStatus{}) - } - for i := 0; i < 10; i++ { - Expect(unConfirmed.get(int64(i))).NotTo(BeNil()) - } - Expect(len(unConfirmed.partitions)).To(Equal(10)) - Expect(unConfirmed.partitions[0].size()).To(Equal(1)) - Expect(unConfirmed.partitions[1].size()).To(Equal(1)) - Expect(unConfirmed.partitions[2].size()).To(Equal(1)) - Expect(unConfirmed.partitions[3].size()).To(Equal(1)) - Expect(unConfirmed.partitions[4].size()).To(Equal(1)) - Expect(unConfirmed.partitions[5].size()).To(Equal(1)) - Expect(unConfirmed.partitions[6].size()).To(Equal(1)) - Expect(unConfirmed.partitions[7].size()).To(Equal(1)) - Expect(unConfirmed.partitions[8].size()).To(Equal(1)) - Expect(unConfirmed.partitions[9].size()).To(Equal(1)) - unConfirmed.clear() - for i := 0; i < 10; i++ { - Expect(unConfirmed.partitions[i].size()).To(Equal(0)) - } - Expect(unConfirmed.size()).To(Equal(0)) - }) - - It("GetAll Result should be consistent", func() { - // the map should be order - // even it is not strictly necessary to be order - - for sz := 1; sz <= 10; sz++ { - unConfirmed := newUnConfirmed(sz) - for i := 0; i < 500; i++ { - unConfirmed.add(int64(i), &ConfirmationStatus{}) - } - result := unConfirmed.getAll() - exceptedValue := 0 - for i, status := range result { - Expect(i).To(Equal(status.GetPublishingId())) - Expect(i).To(Equal(int64(exceptedValue))) - exceptedValue++ - } - } - - }) - - It("GetAll Result should be consistent in multi-thread", func() { - // the map should be order in multi-thread - // even it is not strictly necessary to be order - - for sz := 1; sz <= 10; sz++ { - unConfirmed := newUnConfirmed(sz) - wait := &sync.WaitGroup{} - for i := 0; i < 500; i++ { - wait.Add(1) - go func(idx int) { - 
unConfirmed.add(int64(idx), &ConfirmationStatus{}) - wait.Done() - }(i) - } - wait.Wait() - - result := unConfirmed.getAll() - exceptedValue := 0 - for i, status := range result { - Expect(i).To(Equal(status.GetPublishingId())) - Expect(i).To(Equal(int64(exceptedValue))) - exceptedValue++ - } - } - - }) - -}) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 82a9b397..242311a2 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -251,31 +251,31 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in logs.LogWarn("can't find the producer during confirmation: %s", err) return nil } - var unConfirmed []*ConfirmationStatus + var unConfirmedRecv []*ConfirmationStatus for publishingIdCount != 0 { seq := readInt64(r) m := producer.getUnConfirmed(seq) if m != nil { m.confirmed = true - unConfirmed = append(unConfirmed, m) + unConfirmedRecv = append(unConfirmedRecv, m) // in case of sub-batch entry the client receives only // one publishingId (or sequence) // so the other messages are confirmed using the linkedTo for _, message := range m.linkedTo { message.confirmed = true - unConfirmed = append(unConfirmed, message) + unConfirmedRecv = append(unConfirmedRecv, message) } } publishingIdCount-- } - producer.removeFromConfirmationStatus(unConfirmed) + producer.removeFromConfirmationStatus(unConfirmedRecv) //producer.mutex.Lock() if producer.publishConfirm != nil { - producer.publishConfirm <- unConfirmed + producer.publishConfirm <- unConfirmedRecv } //producer.mutex.Unlock() @@ -474,7 +474,7 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) { producer, err := c.coordinator.GetProducerById(publisherId) if err != nil { logs.LogWarn("producer id %d not found, publish error :%s", publisherId, lookErrorCode(code)) - producer = &Producer{unConfirmed: newUnConfirmed(10)} + producer = &Producer{unConfirmed: newUnConfirmed()} } else { unConfirmedMessage := producer.getUnConfirmed(publishingId)