Skip to content

Commit

Permalink
Improve producer edge cases
Browse files Browse the repository at this point in the history
* remove the code duplication when the producer is closed correctly or not

* make the unconfirmed operation more atomic to avoid rare cases when the unconfirmed operations were out of the mutex lock

* Increase the timeout for the Reliable producers and consumers. It was too low. Now, there is one more wait for the producer to wait for pending messages to be flushed when closed.

* refactor publish confirm channel with a lock to check if it is valid or not and avoid race conditions during the closing

* handle when the producer is not found during the confirmation. It can happen when the producer is closed before all the messages are confirmed

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Jan 7, 2025
1 parent 37ff466 commit 5218fc3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
31 changes: 31 additions & 0 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ var _ = Describe("Streaming Producers", func() {
Expect(producer.Close()).NotTo(HaveOccurred())
})

It("Close Publishers before send is competed", func() {
// force a high batch size to test the close
// during the close the producer should send the messages in the buffer
//and flush as timeout the messages can't be sent
producer, err := testEnvironment.NewProducer(testProducerStream, NewProducerOptions().SetBatchSize(500))
ch := producer.NotifyPublishConfirmation()
var confirmedMessages int32
var failedMessages int32

go func(ch ChannelPublishConfirm) {
for confirmed := range ch {
for _, msg := range confirmed {
if msg.IsConfirmed() {
atomic.AddInt32(&confirmedMessages, 1)
} else {
atomic.AddInt32(&failedMessages, 1)
}
}

}
}(ch)
Expect(err).NotTo(HaveOccurred())
for i := 0; i < 1_000; i++ {
Expect(producer.Send(amqp.NewMessage([]byte("test")))).NotTo(HaveOccurred())
}
Expect(producer.Close()).NotTo(HaveOccurred())
Eventually(func() int32 {
return atomic.LoadInt32(&confirmedMessages) + atomic.LoadInt32(&failedMessages)
}).WithPolling(200 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(int32(1_000)))
})

It("Multi-thread newProducer/Send", func() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down
34 changes: 20 additions & 14 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,33 +240,39 @@ func (c *Client) commandOpen(readProtocol *ReaderProtocol, r *bufio.Reader) {
}

func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) interface{} {

producerFound := false
readProtocol.PublishID = readByte(r)
//readProtocol.PublishingIdCount = ReadIntFromReader(testEnvironment.reader)
publishingIdCount, _ := readUInt(r)
//var _publishingId int64
producer, err := c.coordinator.GetProducerById(readProtocol.PublishID)
producerFound = err == nil
if err != nil {
logs.LogWarn("can't find the producer during confirmation: %s", err)
return nil
logs.LogWarn("can't find the producer during confirmation: %s. Id %d", err, readProtocol.PublishID)
}

// even the producer is not found we need to read the publishingId
// to empty the buffer.
// The producer here could not exist because the producer is closed before the confirmations are received
var unConfirmedRecv []*ConfirmationStatus
for publishingIdCount != 0 {
seq := readInt64(r)
if producerFound {

m := producer.unConfirmed.extractWithConfirm(seq)
if m != nil {
unConfirmedRecv = append(unConfirmedRecv, m)
m := producer.unConfirmed.extractWithConfirm(seq)
if m != nil {
unConfirmedRecv = append(unConfirmedRecv, m)

// in case of sub-batch entry the client receives only
// one publishingId (or sequence)
// so the other messages are confirmed using the linkedTo
unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...)
// in case of sub-batch entry the client receives only
// one publishingId (or sequence)
// so the other messages are confirmed using the linkedTo
unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...)
}
}

publishingIdCount--
}

producer.sendConfirmationStatus(unConfirmedRecv)
if producerFound {
producer.sendConfirmationStatus(unConfirmedRecv)
}

return 0
}
Expand Down

0 comments on commit 5218fc3

Please sign in to comment.