diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index c58f210..d2a0cd6 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -47,6 +47,37 @@ var _ = Describe("Streaming Producers", func() { Expect(producer.Close()).NotTo(HaveOccurred()) }) + It("Close Publishers before send is competed", func() { + // force a high batch size to test the close + // during the close the producer should send the messages in the buffer + //and flush as timeout the messages can't be sent + producer, err := testEnvironment.NewProducer(testProducerStream, NewProducerOptions().SetBatchSize(500)) + ch := producer.NotifyPublishConfirmation() + var confirmedMessages int32 + var failedMessages int32 + + go func(ch ChannelPublishConfirm) { + for confirmed := range ch { + for _, msg := range confirmed { + if msg.IsConfirmed() { + atomic.AddInt32(&confirmedMessages, 1) + } else { + atomic.AddInt32(&failedMessages, 1) + } + } + + } + }(ch) + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 1_000; i++ { + Expect(producer.Send(amqp.NewMessage([]byte("test")))).NotTo(HaveOccurred()) + } + Expect(producer.Close()).NotTo(HaveOccurred()) + Eventually(func() int32 { + return atomic.LoadInt32(&confirmedMessages) + atomic.LoadInt32(&failedMessages) + }).WithPolling(200 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(int32(1_000))) + }) + It("Multi-thread newProducer/Send", func() { var wg sync.WaitGroup for i := 0; i < 10; i++ { diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 04780ca..14d3cfb 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -240,33 +240,39 @@ func (c *Client) commandOpen(readProtocol *ReaderProtocol, r *bufio.Reader) { } func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) interface{} { - + producerFound := false readProtocol.PublishID = readByte(r) - //readProtocol.PublishingIdCount = ReadIntFromReader(testEnvironment.reader) publishingIdCount, _ := readUInt(r) - //var _publishingId int64 producer, err := c.coordinator.GetProducerById(readProtocol.PublishID) + producerFound = err == nil if err != nil { - logs.LogWarn("can't find the producer during confirmation: %s", err) - return nil + logs.LogWarn("can't find the producer during confirmation: %s. Id %d", err, readProtocol.PublishID) } + + // even the producer is not found we need to read the publishingId + // to empty the buffer. + // The producer here could not exist because the producer is closed before the confirmations are received var unConfirmedRecv []*ConfirmationStatus for publishingIdCount != 0 { seq := readInt64(r) + if producerFound { - m := producer.unConfirmed.extractWithConfirm(seq) - if m != nil { - unConfirmedRecv = append(unConfirmedRecv, m) + m := producer.unConfirmed.extractWithConfirm(seq) + if m != nil { + unConfirmedRecv = append(unConfirmedRecv, m) - // in case of sub-batch entry the client receives only - // one publishingId (or sequence) - // so the other messages are confirmed using the linkedTo - unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) + // in case of sub-batch entry the client receives only + // one publishingId (or sequence) + // so the other messages are confirmed using the linkedTo + unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) + } } + publishingIdCount-- } - - producer.sendConfirmationStatus(unConfirmedRecv) + if producerFound { + producer.sendConfirmationStatus(unConfirmedRecv) + } return 0 }