From 37ff4662d9a88ccd9c153032284eb9de4c1a85eb Mon Sep 17 00:00:00 2001
From: Gabriele Santomaggio
Date: Tue, 7 Jan 2025 15:46:46 +0100
Subject: [PATCH] Improve producer edge cases

* remove the code duplication in the producer close path, whether the producer is closed correctly or not
* make the unconfirmed-message operations more atomic, to avoid rare cases where they ran outside the mutex lock
* increase the timeout for the Reliable producers and consumers; it was too low. The producer now also waits for pending messages to be flushed when it is closed
* protect the publish-confirm channel with a lock, so its validity can be checked and race conditions during closing are avoided
* handle the case where the producer is not found during confirmation; this can happen when the producer is closed before all the messages are confirmed

Signed-off-by: Gabriele Santomaggio
---
 examples/getting_started.go | 4 +-
 examples/reliable/README.md | 4 +
 examples/reliable/reliable_client.go | 63 +++++++----
 pkg/ha/ha_consumer.go | 58 ++++++----
 pkg/ha/ha_consumer_test.go | 9 +-
 pkg/ha/ha_publisher.go | 67 ++++++++----
 pkg/ha/ha_publisher_test.go | 8 +-
 pkg/ha/reliable_common.go | 23 +++-
 pkg/stream/blocking_queue.go | 42 ++++++--
 pkg/stream/client.go | 70 +++++++-----
 pkg/stream/constants.go | 1 +
 pkg/stream/consumer.go | 2 +-
 pkg/stream/coordinator.go | 16 ++-
 pkg/stream/producer.go | 130 +++++++++++++----------
 pkg/stream/producer_test.go | 5 +-
 pkg/stream/server_frame.go | 13 +--
 pkg/stream/super_stream_producer_test.go | 5 +-
 17 files changed, 330 insertions(+), 190 deletions(-)

diff --git a/examples/getting_started.go b/examples/getting_started.go
index 0d9a7cbd..7fabd0a2 100644
--- a/examples/getting_started.go
+++ b/examples/getting_started.go
@@ -111,7 +111,9 @@ func main() {
 	channelClose := consumer.NotifyClose()
 	// channelClose receives all the closing events, here you can handle the
 	// client reconnection or just log
-	defer consumerClose(channelClose)
+	go func() {
+		consumerClose(channelClose)
+	}()
 
 	fmt.Println("Press any key to stop ")
 	_, _ = reader.ReadString('\n')
diff --git a/examples/reliable/README.md b/examples/reliable/README.md
index 4f70387b..abfe40e9 100644
--- a/examples/reliable/README.md
+++ b/examples/reliable/README.md
@@ -12,6 +12,10 @@ Note:
 - The `unConfirmedMessages` are not persisted, so if the application is restarted, the `unConfirmedMessages` will be lost.
 - The `unConfirmedMessages` order is not guaranteed
 - The `unConfirmedMessages` can grow indefinitely if the broker is unavailable for a long time.
+- The `re-send` is an option that can be enabled by setting `enableResend` to `true`.
+
+The example enables Go's `pprof`; you can check it at the URL localhost:6060/debug/pprof/.
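A minimal, self-contained sketch of how the example wires up `pprof` (the same `net/http/pprof` import and `ListenAndServe` call that the `reliable_client.go` diff below adds); the blank import registers the profiling handlers on the default mux, so visiting localhost:6060/debug/pprof/ shows goroutine, heap and block profiles while the example runs:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // blank import registers the /debug/pprof/ handlers on http.DefaultServeMux
)

func main() {
	// Serve the profiling endpoints on localhost:6060, as the reliable example does.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// The real example starts producers and consumers here; this sketch just blocks.
	select {}
}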
+The purpose is to check the resources used by the application in case of reconnection. The `reliable_common.go/retry` function performs different checks, because different events can occur while the broker restarts; please check:
diff --git a/examples/reliable/reliable_client.go b/examples/reliable/reliable_client.go
index 54344a3c..365d5ca7 100644
--- a/examples/reliable/reliable_client.go
+++ b/examples/reliable/reliable_client.go
@@ -4,7 +4,11 @@ import (
 	"bufio"
 	"errors"
 	"fmt"
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
+	"log"
+	"net/http"
 	"os"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -13,6 +17,7 @@ import (
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
+	_ "net/http/pprof"
 )
 
 // The ha producer and consumer provide a way to auto-reconnect in case of connection problems
@@ -33,19 +38,24 @@ var reSent int32
 const enableResend = false
 
 func main() {
+	go func() {
+		log.Println(http.ListenAndServe("localhost:6060", nil))
+	}()
+	// Your application code here
+
 	// Tune the parameters to test the reliability
-	const messagesToSend = 10_000_000
-	const numberOfProducers = 1
+	const messagesToSend = 50_000_000
+	const numberOfProducers = 5
 	const concurrentProducers = 2
 	const numberOfConsumers = 1
 	const sendDelay = 1 * time.Millisecond
-	const delayEachMessages = 200
-	const maxProducersPerClient = 4
+	const delayEachMessages = 500
+	const maxProducersPerClient = 2
 	const maxConsumersPerClient = 2
 	//
 	reader := bufio.NewReader(os.Stdin)
-	//stream.SetLevelInfo(logs.DEBUG)
+	stream.SetLevelInfo(logs.DEBUG)
 
 	fmt.Println("Reliable Producer/Consumer example")
 	fmt.Println("Connecting to RabbitMQ streaming ...")
@@ -72,27 +82,52 @@ func main() {
 	}
 
 	err = env.DeclareStream(streamName,
 		&stream.StreamOptions{
-			MaxLengthBytes: stream.ByteCapacity{}.GB(2),
+			MaxLengthBytes: stream.ByteCapacity{}.GB(1),
 		},
 	)
 
 	CheckErr(err)
+	var producers []*ha.ReliableProducer
+	var consumers []*ha.ReliableConsumer
 	isRunning := true
 	go func() {
 		for isRunning {
 			totalConfirmed := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
 			expectedMessages := messagesToSend * numberOfProducers * concurrentProducers * 2
+			fmt.Printf("********************************************\n")
 			fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC850),
 				expectedMessages, numberOfProducers, concurrentProducers, numberOfConsumers)
 			fmt.Printf("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n",
 				sent, atomic.LoadInt32(&reSent), atomic.LoadInt32(&confirmed), atomic.LoadInt32(&fail), totalConfirmed)
 			fmt.Printf("Total Consumed: %d - Per consumer: %d \n", atomic.LoadInt32(&consumed),
 				atomic.LoadInt32(&consumed)/numberOfConsumers)
+
+			for _, producer := range producers {
+				fmt.Printf("%s, status: %s \n",
+					producer.GetInfo(), producer.GetStatusAsString())
+
+			}
+			for _, consumer := range consumers {
+				fmt.Printf("%s, status: %s \n",
+					consumer.GetInfo(), consumer.GetStatusAsString())
+			}
+			fmt.Printf("go-routine: %d\n", runtime.NumGoroutine())
 			fmt.Printf("********************************************\n")
 			time.Sleep(5 * time.Second)
 		}
 	}()
-	var producers []*ha.ReliableProducer
+
+	for i := 0; i < numberOfConsumers; i++ {
+		consumer, err := ha.NewReliableConsumer(env,
+			streamName,
+			stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()),
+			func(consumerContext stream.ConsumerContext, message *amqp.Message) {
+				
atomic.AddInt32(&consumed, 1) + }) + CheckErr(err) + consumers = append(consumers, consumer) + } + for i := 0; i < numberOfProducers; i++ { var mutex = sync.Mutex{} // Here we store the messages that have not been confirmed @@ -151,20 +186,6 @@ func main() { } }() } - var consumers []*ha.ReliableConsumer - - for i := 0; i < numberOfConsumers; i++ { - go func(name string) { - consumer, err := ha.NewReliableConsumer(env, - streamName, - stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()), - func(consumerContext stream.ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&consumed, 1) - }) - CheckErr(err) - consumers = append(consumers, consumer) - }(streamName) - } fmt.Println("Press enter to close the connections.") _, _ = reader.ReadString('\n') diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 6a71dcd3..24e8bb9d 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -28,28 +28,41 @@ type ReliableConsumer struct { bootstrap bool } +func (c *ReliableConsumer) GetStatusAsString() string { + switch c.GetStatus() { + case StatusOpen: + return "Open" + case StatusClosed: + return "Closed" + case StatusStreamDoesNotExist: + return "StreamDoesNotExist" + case StatusReconnecting: + return "Reconnecting" + default: + return "Unknown" + } +} + func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { - for event := range channelClose { - if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { - c.setStatus(StatusReconnecting) - logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) - c.bootstrap = false - err, reconnected := retry(0, c) - if err != nil { - logs.LogInfo(""+ - "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err) - } - if reconnected { - c.setStatus(StatusOpen) - } else { - c.setStatus(StatusClosed) - } - + event := <-channelClose + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + c.setStatus(StatusReconnecting) + logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo()) + c.bootstrap = false + err, reconnected := retry(1, c) + if err != nil { + logs.LogInfo(""+ + "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err) + } + if reconnected { + c.setStatus(StatusOpen) } else { - logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", c.getInfo(), event.Reason) c.setStatus(StatusClosed) } + } else { + logs.LogInfo("[Reliable] - %s closed normally. 
Reason: %s", c.getInfo(), event.Reason) + c.setStatus(StatusClosed) } }() } @@ -72,11 +85,13 @@ func NewReliableConsumer(env *stream.Environment, streamName string, if consumerOptions == nil { return nil, fmt.Errorf("the consumer options is mandatory") } - + logs.LogDebug("[Reliable] - creating %s", res.getInfo()) err := res.newConsumer() if err == nil { + res.setStatus(StatusOpen) } + logs.LogDebug("[Reliable] - created %s", res.getInfo()) return res, err } @@ -124,14 +139,15 @@ func (c *ReliableConsumer) newConsumer() error { c.bootstrap, offset) consumer, err := c.env.NewConsumer(c.streamName, func(consumerContext stream.ConsumerContext, message *amqp.Message) { c.mutexConnection.Lock() - c.currentPosition = consumerContext.Consumer.GetOffset() c.mutexConnection.Unlock() + c.messagesHandler(consumerContext, message) }, c.consumerOptions.SetOffset(offset)) if err != nil { return err } + channelNotifyClose := consumer.NotifyClose() c.handleNotifyClose(channelNotifyClose) c.consumer = consumer @@ -146,3 +162,7 @@ func (c *ReliableConsumer) Close() error { } return nil } + +func (c *ReliableConsumer) GetInfo() string { + return c.getInfo() +} diff --git a/pkg/ha/ha_consumer_test.go b/pkg/ha/ha_consumer_test.go index 0153f70e..d175e845 100644 --- a/pkg/ha/ha_consumer_test.go +++ b/pkg/ha/ha_consumer_test.go @@ -108,11 +108,13 @@ var _ = Describe("Reliable Consumer", func() { // kill the connection errDrop := test_helper.DropConnection(connectionToDrop, "15672") Expect(errDrop).NotTo(HaveOccurred()) - - time.Sleep(2 * time.Second) // we give some time to the client to reconnect - Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + /// just give some time to raise the event + time.Sleep(1200 * time.Millisecond) + Eventually(func() int { return consumer.GetStatus() }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusOpen)) + Expect(consumer.GetStatusAsString()).To(Equal("Open")) Expect(consumer.Close()).NotTo(HaveOccurred()) Expect(consumer.GetStatus()).To(Equal(StatusClosed)) + Expect(consumer.GetStatusAsString()).To(Equal("Closed")) }) It("Delete the stream should close the consumer", func() { @@ -123,6 +125,7 @@ var _ = Describe("Reliable Consumer", func() { Expect(err).NotTo(HaveOccurred()) Expect(consumer).NotTo(BeNil()) Expect(consumer.GetStatus()).To(Equal(StatusOpen)) + Expect(consumer.GetStatusAsString()).To(Equal("Open")) Expect(envForRConsumer.DeleteStream(streamForRConsumer)).NotTo(HaveOccurred()) Eventually(func() int { return consumer.GetStatus() diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index f28db2ca..e6a0b1db 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -24,29 +24,31 @@ func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishCo func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { go func() { - for event := range channelClose { - if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { - p.setStatus(StatusReconnecting) - logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", p.getInfo()) - err, reconnected := retry(0, p) - if err != nil { - logs.LogInfo( - "[Reliable] - %s won't be reconnected. 
Error: %s", p.getInfo(), err) - } - if reconnected { - p.setStatus(StatusOpen) - } else { - p.setStatus(StatusClosed) - } + event := <-channelClose + if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) { + p.setStatus(StatusReconnecting) + waitTime := randomWaitWithBackoff(1) + logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting in %d milliseconds waiting pending messages", p.getInfo(), waitTime) + time.Sleep(time.Duration(waitTime) * time.Millisecond) + err, reconnected := retry(1, p) + if err != nil { + logs.LogInfo( + "[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err) + } + if reconnected { + p.setStatus(StatusOpen) } else { - logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason) p.setStatus(StatusClosed) } - - p.reconnectionSignal.L.Lock() - p.reconnectionSignal.Broadcast() - p.reconnectionSignal.L.Unlock() + } else { + logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason) + p.setStatus(StatusClosed) } + + p.reconnectionSignal.L.Lock() + p.reconnectionSignal.Broadcast() + p.reconnectionSignal.L.Unlock() + logs.LogInfo("[Reliable] - %s reconnection signal sent", p.getInfo()) }() } @@ -61,6 +63,7 @@ type ReliableProducer struct { producerOptions *stream.ProducerOptions count int32 confirmMessageHandler ConfirmMessageHandler + channelPublishConfirm stream.ChannelPublishConfirm mutex *sync.Mutex mutexStatus *sync.Mutex status int @@ -98,14 +101,17 @@ func NewReliableProducer(env *stream.Environment, streamName string, } func (p *ReliableProducer) newProducer() error { + p.mutex.Lock() + defer p.mutex.Unlock() producer, err := p.env.NewProducer(p.streamName, p.producerOptions) if err != nil { + return err } - channelPublishConfirm := producer.NotifyPublishConfirmation() + p.channelPublishConfirm = producer.NotifyPublishConfirmation() + p.handlePublishConfirm(p.channelPublishConfirm) channelNotifyClose := producer.NotifyClose() p.handleNotifyClose(channelNotifyClose) - p.handlePublishConfirm(channelPublishConfirm) p.producer = producer return err } @@ -180,6 +186,21 @@ func (p *ReliableProducer) GetStatus() int { return p.status } +func (p *ReliableProducer) GetStatusAsString() string { + switch p.GetStatus() { + case StatusOpen: + return "Open" + case StatusClosed: + return "Closed" + case StatusStreamDoesNotExist: + return "StreamDoesNotExist" + case StatusReconnecting: + return "Reconnecting" + default: + return "Unknown" + } +} + // IReliable interface func (p *ReliableProducer) setStatus(value int) { p.mutexStatus.Lock() @@ -224,3 +245,7 @@ func (p *ReliableProducer) Close() error { } return nil } + +func (p *ReliableProducer) GetInfo() string { + return p.getInfo() +} diff --git a/pkg/ha/ha_publisher_test.go b/pkg/ha/ha_publisher_test.go index 131aeb2e..3c44455b 100644 --- a/pkg/ha/ha_publisher_test.go +++ b/pkg/ha/ha_publisher_test.go @@ -179,6 +179,12 @@ var _ = Describe("Reliable Producer", func() { }, time.Second*5, time.Millisecond). Should(BeTrue()) + // wait for the producer to be in reconnecting state + Eventually(func() bool { + return producer.GetStatusAsString() == "Reconnecting" + }, time.Second*5, time.Millisecond). 
+ Should(BeTrue()) + go sendMsg() go sendMsg() @@ -195,7 +201,7 @@ var _ = Describe("Reliable Producer", func() { Expect(envForRProducer.DeleteStream(streamForRProducer)).NotTo(HaveOccurred()) Eventually(func() int { return producer.GetStatus() - }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusClosed)) + }, "21s").WithPolling(300 * time.Millisecond).Should(Equal(StatusClosed)) }) diff --git a/pkg/ha/reliable_common.go b/pkg/ha/reliable_common.go index 5f71aca7..c4e02b73 100644 --- a/pkg/ha/reliable_common.go +++ b/pkg/ha/reliable_common.go @@ -25,6 +25,7 @@ type IReliable interface { getNewInstance() newEntityInstance getTimeOut() time.Duration getStreamName() string + GetStatusAsString() string } // Retry is a function that retries the IReliable to the stream @@ -41,11 +42,12 @@ type IReliable interface { // In both cases it retries the reconnection func retry(backoff int, reliable IReliable) (error, bool) { - sleepValue := rand.Intn(int((reliable.getTimeOut().Seconds()-2+1)+2)*1000) + backoff*1000 - logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), sleepValue) - time.Sleep(time.Duration(sleepValue) * time.Millisecond) + waitTime := randomWaitWithBackoff(backoff) + logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), waitTime) + time.Sleep(time.Duration(waitTime) * time.Millisecond) streamMetaData, errS := reliable.getEnv().StreamMetaData(reliable.getStreamName()) if errors.Is(errS, stream.StreamDoesNotExist) { + logs.LogInfo("[Reliable] - The stream %s does not exist for %s. Stopping it", reliable.getStreamName(), reliable.getInfo()) return errS, false } if errors.Is(errS, stream.StreamNotAvailable) { @@ -75,3 +77,18 @@ func retry(backoff int, reliable IReliable) (error, bool) { return result, true } + +func randomWaitWithBackoff(attempt int) int { + baseWait := 2_000 + rand.Intn(7_000) + + // Calculate the wait time considering the number of attempts + waitTime := baseWait * (1 << (attempt - 1)) // Exponential back-off + + // Cap the wait time at a maximum of 20 seconds + if waitTime > 20_000 { + waitTime = 20_000 + } + + return waitTime + +} diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index 822ccccc..d949b6e0 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -2,6 +2,7 @@ package stream import ( "errors" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "sync/atomic" "time" ) @@ -9,17 +10,15 @@ import ( var ErrBlockingQueueStopped = errors.New("blocking queue stopped") type BlockingQueue[T any] struct { - queue chan T - capacity int - status int32 + queue chan T + status int32 } // NewBlockingQueue initializes a new BlockingQueue with the given capacity func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] { return &BlockingQueue[T]{ - queue: make(chan T, capacity), - capacity: capacity, - status: 0, + queue: make(chan T, capacity), + status: 0, } } @@ -28,7 +27,8 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error { if bq.IsStopped() { return ErrBlockingQueueStopped } - bq.queue <- item // This will block if the queue is full + bq.queue <- item + return nil } @@ -39,7 +39,11 @@ func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T { return zeroValue } select { - case item := <-bq.queue: + case item, ok := <-bq.queue: + if !ok { + var zeroValue T // Zero value of type T + return zeroValue + } return item case 
<-time.After(timeout): var zeroValue T // Zero value of type T @@ -56,12 +60,30 @@ func (bq *BlockingQueue[T]) IsEmpty() bool { } // Stop stops the queue from accepting new items -// but allows the existing items to be processed +// but allows some pending items. // Stop is different from Close in that it allows the // existing items to be processed. -// That avoids the need to drain the queue before closing it. +// Drain the queue to be sure there are not pending messages func (bq *BlockingQueue[T]) Stop() { atomic.StoreInt32(&bq.status, 1) + // drain the queue. To be sure there are not pending messages + // in the queue. + // it does not matter if we lose some messages here + // since there is the unConfirmed map to handle the messages + isActive := true + for isActive { + select { + case <-bq.queue: + // do nothing + case <-time.After(10 * time.Millisecond): + isActive = false + return + default: + isActive = false + return + } + } + logs.LogDebug("BlockingQueue stopped") } func (bq *BlockingQueue[T]) Close() { diff --git a/pkg/stream/client.go b/pkg/stream/client.go index eda08a98..9e25a992 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -69,6 +69,8 @@ type Client struct { socketCallTimeout time.Duration availableFeatures *availableFeatures serverProperties map[string]string + + doneTimeoutTicker chan struct{} } func newClient(connectionName string, broker *Broker, @@ -103,6 +105,7 @@ func newClient(connectionName string, broker *Broker, }, socketCallTimeout: rpcTimeOut, availableFeatures: newAvailableFeatures(), + doneTimeoutTicker: make(chan struct{}, 1), } c.setConnectionName(connectionName) return c @@ -416,43 +419,54 @@ func (c *Client) heartBeat() { ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second) tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second) - resp := c.coordinator.NewResponseWitName("heartbeat") var heartBeatMissed int32 - + doneSendingTimeoutTicker := make(chan struct{}, 1) + wg := sync.WaitGroup{} + wg.Add(2) go func() { - for c.socket.isOpen() { - <-tickerHeartbeat.C - if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { - v := atomic.AddInt32(&heartBeatMissed, 1) - logs.LogWarn("Missing heart beat: %d", v) - if v >= 2 { - logs.LogWarn("Too many heartbeat missing: %d", v) - c.Close() + wg.Done() + select { + case <-c.doneTimeoutTicker: + doneSendingTimeoutTicker <- struct{}{} + ticker.Stop() + tickerHeartbeat.Stop() + return + case <-tickerHeartbeat.C: + for c.socket.isOpen() { + if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { + v := atomic.AddInt32(&heartBeatMissed, 1) + logs.LogWarn("Missing heart beat: %d", v) + if v >= 2 { + logs.LogWarn("Too many heartbeat missing: %d", v) + c.Close() + } + } else { + atomic.StoreInt32(&heartBeatMissed, 0) } - } else { - atomic.StoreInt32(&heartBeatMissed, 0) } - } - tickerHeartbeat.Stop() + }() go func() { + wg.Done() for { select { - case code := <-resp.code: - if code.id == closeChannel { - _ = c.coordinator.RemoveResponseByName("heartbeat") - } - ticker.Stop() + case <-doneSendingTimeoutTicker: + logs.LogDebug("Stopping sending heartbeat") return - case <-ticker.C: + case _, ok := <-ticker.C: + if !ok { + return + } logs.LogDebug("Sending heart beat: %s", time.Now()) c.sendHeartbeat() } } }() + wg.Wait() + } func (c *Client) sendHeartbeat() { @@ -464,12 +478,8 @@ func (c *Client) sendHeartbeat() { func (c *Client) closeHartBeat() { 
c.destructor.Do(func() { - r, err := c.coordinator.GetResponseByName("heartbeat") - if err != nil { - logs.LogDebug("error removing heartbeat: %s", err) - } else { - r.code <- Code{id: closeChannel} - } + c.doneTimeoutTicker <- struct{}{} + close(c.doneTimeoutTicker) }) } @@ -506,10 +516,9 @@ func (c *Client) Close() error { close(c.metadataListener) c.metadataListener = nil } - + c.closeHartBeat() if c.getSocket().isOpen() { - c.closeHartBeat() res := c.coordinator.NewResponse(CommandClose) length := 2 + 2 + 4 + 2 + 2 + len("OK") var b = bytes.NewBuffer(make([]byte, 0, length+4)) @@ -988,7 +997,10 @@ func (c *Client) DeclareSubscriber(streamName string, return } - case chunk := <-consumer.response.chunkForConsumer: + case chunk, ok := <-consumer.response.chunkForConsumer: + if !ok { + return + } for _, offMessage := range chunk.offsetMessages { consumer.setCurrentOffset(offMessage.offset) if canDispatch(offMessage) { diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 83cdd73f..a19ab723 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -118,6 +118,7 @@ const ( MetaDataUpdate = "metadata Data update" LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" + DeletePublisher = "deletePublisher" StreamTcpPort = "5552" diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 0fc20c30..e9ff54bc 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -332,7 +332,7 @@ func (consumer *Consumer) Close() error { if err.Err != nil && err.isTimeout { return err.Err } - consumer.response.code <- Code{id: closeChannel} + errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{ Command: CommandUnsubscribe, StreamName: consumer.GetStreamName(), diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 4d6f1cf9..a2d9b198 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -73,6 +73,7 @@ func (coordinator *Coordinator) NewProducer( doneTimeoutTicker: make(chan struct{}, 1), status: open, pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize), + confirmMutex: &sync.Mutex{}, } coordinator.producers[lastId] = producer return producer, err @@ -83,14 +84,19 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) if err != nil { return err } + consumer.setStatus(closed) reason.StreamName = consumer.GetStreamName() reason.Name = consumer.GetName() if closeHandler := consumer.GetCloseHandler(); closeHandler != nil { closeHandler <- reason close(closeHandler) + closeHandler = nil } + close(consumer.response.chunkForConsumer) + close(consumer.response.code) + return nil } func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { @@ -99,15 +105,7 @@ func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error if err != nil { return err } - reason.StreamName = producer.GetStreamName() - reason.Name = producer.GetName() - if producer.closeHandler != nil { - producer.closeHandler <- reason - } - - producer.stopAndWaitPendingSequencesQueue() - - return nil + return producer.close(reason) } func (coordinator *Coordinator) RemoveResponseById(id interface{}) error { diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index dd30408c..a6bdb773 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -59,18 +59,21 @@ type messageSequence struct { } type Producer struct { - id uint8 - options *ProducerOptions - onClose onInternalClose - unConfirmed *unConfirmed - sequence int64 - mutex 
*sync.RWMutex - publishConfirm chan []*ConfirmationStatus + id uint8 + options *ProducerOptions + onClose onInternalClose + unConfirmed *unConfirmed + sequence int64 + mutex *sync.RWMutex + closeHandler chan Event status int confirmationTimeoutTicker *time.Ticker doneTimeoutTicker chan struct{} + confirmMutex *sync.Mutex + publishConfirmation chan []*ConfirmationStatus + pendingSequencesQueue *BlockingQueue[*messageSequence] } @@ -207,17 +210,13 @@ func (producer *Producer) lenUnConfirmed() int { // NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer. func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm { ch := make(chan []*ConfirmationStatus, 1) - producer.publishConfirm = ch + producer.publishConfirmation = ch return ch } // NotifyClose returns a channel that receives the close event of the producer. func (producer *Producer) NotifyClose() ChannelClose { - // starting from 1.5 the closeHandler is a channel with a buffer of 0 - // because the handler is blocking and the user can handle in a better way the close event - // for example the HA producer can immediately set the status to reconnecting - // and deal with the inflight messages - ch := make(chan Event) + ch := make(chan Event, 1) producer.closeHandler = ch return ch } @@ -244,23 +243,20 @@ func (producer *Producer) getStatus() int { func (producer *Producer) startUnconfirmedMessagesTimeOutTask() { go func() { - isActive := true - for isActive { + for { select { case <-producer.doneTimeoutTicker: - isActive = false + logs.LogDebug("producer %d timeout thread closed", producer.id) return case <-producer.confirmationTimeoutTicker.C: // check the unconfirmed messages and remove the one that are expired if producer.getStatus() == open { toRemove := producer.unConfirmed.extractWithTimeOut(producer.options.ConfirmationTimeOut) if len(toRemove) > 0 { - if producer.publishConfirm != nil { - producer.publishConfirm <- toRemove - } + producer.sendConfirmationStatus(toRemove) } } else { - isActive = false + logs.LogInfo("producer %d confirmationTimeoutTicker closed", producer.id) return } } @@ -269,6 +265,23 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() { } +func (producer *Producer) sendConfirmationStatus(status []*ConfirmationStatus) { + producer.confirmMutex.Lock() + defer producer.confirmMutex.Unlock() + if producer.publishConfirmation != nil { + producer.publishConfirmation <- status + } +} + +func (producer *Producer) closeConfirmationStatus() { + producer.confirmMutex.Lock() + defer producer.confirmMutex.Unlock() + if producer.publishConfirmation != nil { + close(producer.publishConfirmation) + producer.publishConfirmation = nil + } +} + // processPendingSequencesQueue aggregates the messages sequence in the queue and sends them to the server // messages coming form the Send method through the pendingSequencesQueue func (producer *Producer) processPendingSequencesQueue() { @@ -358,23 +371,14 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { if err != nil { return err } + producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) + if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize { - if producer.publishConfirm != nil { - producer.publishConfirm <- []*ConfirmationStatus{ - { - inserted: time.Now(), - message: streamMessage, - producerID: producer.GetID(), - publishingId: messageSeq.publishingId, - confirmed: false, - err: FrameTooLarge, - 
errorCode: responseCodeFrameTooLarge, - }, - } - } + tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) + producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) return FrameTooLarge } - producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) + // se the processPendingSequencesQueue function err = producer.pendingSequencesQueue.Enqueue(messageSeq) if err != nil { @@ -410,14 +414,11 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error for _, msg := range messagesSequence { m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge) - if producer.publishConfirm != nil { - producer.publishConfirm <- []*ConfirmationStatus{m} - } + producer.sendConfirmationStatus([]*ConfirmationStatus{m}) } return FrameTooLarge } - //producer.unConfirmed.addFromSequences(messagesSequence, producer.GetID()) // all the messages are unconfirmed return producer.internalBatchSend(messagesSequence) } @@ -553,19 +554,15 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq err := producer.options.client.socket.writer.Flush() //writeAndFlush(b.Bytes()) if err != nil { logs.LogError("Producer BatchSend error during flush: %s", err) - producer.setStatus(closed) return err } return nil } func (producer *Producer) flushUnConfirmedMessages() { - timeOut := producer.unConfirmed.extractWithTimeOut(time.Duration(0)) - if producer.publishConfirm != nil { - if len(timeOut) > 0 { - producer.publishConfirm <- timeOut - } + if len(timeOut) > 0 { + producer.sendConfirmationStatus(timeOut) } } @@ -579,20 +576,49 @@ func (producer *Producer) GetLastPublishingId() (int64, error) { // Close closes the producer and returns an error if the producer could not be closed. func (producer *Producer) Close() error { + + return producer.close(Event{ + Command: CommandDeletePublisher, + StreamName: producer.GetStreamName(), + Name: producer.GetName(), + Reason: DeletePublisher, + Err: nil, + }) +} +func (producer *Producer) close(reason Event) error { + if producer.getStatus() == closed { return AlreadyClosed } producer.setStatus(closed) + reason.StreamName = producer.GetStreamName() + reason.Name = producer.GetName() + if producer.closeHandler != nil { + producer.closeHandler <- reason + close(producer.closeHandler) + producer.closeHandler = nil + } + + producer.stopAndWaitPendingSequencesQueue() + + producer.closeConfirmationStatus() + + if producer.options == nil { + return nil + } + _ = producer.options.client.coordinator.RemoveProducerById(producer.id, reason) + if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") } - err := producer.options.client.deletePublisher(producer.id) - if err != nil { - logs.LogError("error delete Publisher on closing: %s", err) + // remove from the server only if the producer exists + if reason.Reason == DeletePublisher { + _ = producer.options.client.deletePublisher(producer.id) } + if producer.options.client.coordinator.ProducersCount() == 0 { err := producer.options.client.Close() if err != nil { @@ -626,6 +652,7 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() { producer.waitForInflightMessages() // Close the pendingSequencesQueue. 
It closes the channel producer.pendingSequencesQueue.Close() + } func (producer *Producer) waitForInflightMessages() { @@ -703,14 +730,5 @@ func (c *Client) deletePublisher(publisherId byte) error { writeByte(b, publisherId) errWrite := c.handleWrite(b.Bytes(), resp) - err := c.coordinator.RemoveProducerById(publisherId, Event{ - Command: CommandDeletePublisher, - Reason: "deletePublisher", - Err: nil, - }) - if err != nil { - logs.LogWarn("producer id: %d already removed", publisherId) - } - return errWrite.Err } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 39c0a283..c58f2107 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -6,7 +6,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "sync/atomic" @@ -218,10 +217,8 @@ var _ = Describe("Streaming Producers", func() { go func(ch ChannelPublishConfirm, p *Producer) { defer GinkgoRecover() for ids := range ch { - for i, msg := range ids { + for _, msg := range ids { atomic.AddInt64(&nRecv, 1) - logs.LogInfo(fmt.Sprintf("Message i: %d confirmed - publishingId %d, iteration %d", - i, msg.GetPublishingId(), atomic.LoadInt64(&nRecv))) Expect(msg.GetError()).NotTo(HaveOccurred()) Expect(msg.GetProducerID()).To(Equal(p.id)) Expect(msg.GetPublishingId()).To(Equal(atomic.LoadInt64(&nRecv))) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 5bd16767..04780cad 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -140,7 +140,6 @@ func (c *Client) handleResponse() { } } } - } func (c *Client) handleSaslHandshakeResponse(streamingRes *ReaderProtocol, r *bufio.Reader) interface{} { @@ -267,13 +266,7 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in publishingIdCount-- } - //producer.removeFromConfirmationStatus(unConfirmedRecv) - - //producer.mutex.Lock() - if producer.publishConfirm != nil { - producer.publishConfirm <- unConfirmedRecv - } - //producer.mutex.Unlock() + producer.sendConfirmationStatus(unConfirmedRecv) return 0 } @@ -474,8 +467,8 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) { } else { unConfirmedMessage := producer.unConfirmed.extractWithError(publishingId, code) - if producer.publishConfirm != nil && unConfirmedMessage != nil { - producer.publishConfirm <- []*ConfirmationStatus{unConfirmedMessage} + if unConfirmedMessage != nil { + producer.sendConfirmationStatus([]*ConfirmationStatus{unConfirmedMessage}) } } publishingErrorCount-- diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index bc54103a..7a4f9252 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -267,7 +267,8 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() for chq := range ch { if chq.Event.Reason == SocketClosed { time.Sleep(2 * time.Second) - Expect(chq.Context.ConnectPartition(chq.Partition)).NotTo(HaveOccurred()) + err := chq.Context.ConnectPartition(chq.Partition) + Expect(err).NotTo(HaveOccurred()) time.Sleep(1 * time.Second) mutex.Lock() reconnectedMap[chq.Partition] = true @@ -289,7 +290,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() time.Sleep(1 * time.Second) Eventually(func() bool { mutex.Lock(); defer mutex.Unlock(); return len(reconnectedMap) 
== 1 }, - 300*time.Millisecond).WithTimeout(5 * time.Second).Should(BeTrue()) + 300*time.Millisecond).WithTimeout(50 * time.Second).Should(BeTrue()) Eventually(func() bool { return len(superProducer.getProducers()) == 3
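
For reference, a minimal sketch of the capped exponential back-off that `pkg/ha/reliable_common.go` introduces with `randomWaitWithBackoff`: the base wait is a random value of roughly 2 to 9 seconds, it doubles with every attempt, and it is clamped at 20 seconds. The `main` function here is only a hypothetical driver to show the growth:

package main

import (
	"fmt"
	"math/rand"
)

// Same shape as the patch's randomWaitWithBackoff in pkg/ha/reliable_common.go.
func randomWaitWithBackoff(attempt int) int {
	baseWait := 2_000 + rand.Intn(7_000) // 2_000..8_999 milliseconds

	// Exponential back-off: attempt 1 waits baseWait, attempt 2 waits 2x, attempt 3 waits 4x, ...
	waitTime := baseWait * (1 << (attempt - 1))

	// Cap the wait time at 20 seconds so repeated failures never wait longer than that.
	if waitTime > 20_000 {
		waitTime = 20_000
	}
	return waitTime
}

func main() {
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Printf("attempt %d -> wait %d ms\n", attempt, randomWaitWithBackoff(attempt))
	}
	// With a base of ~6000 ms this prints roughly 6000, 12000, 20000 (capped), 20000 (capped).
}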
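The "publish confirm channel with a lock" change is easier to see in isolation. Below is a minimal sketch of the pattern used by `sendConfirmationStatus`/`closeConfirmationStatus` in `pkg/stream/producer.go`, with simplified stand-in types (`confirmation` and `producerSketch` are illustrative names, not the library's): every send and the close take the same mutex, and the channel is set to nil on close, so a confirmation that arrives after the producer is closed is dropped instead of panicking on a closed channel.

package main

import (
	"fmt"
	"sync"
)

type confirmation struct{ publishingID int64 }

type producerSketch struct {
	confirmMutex        sync.Mutex
	publishConfirmation chan []*confirmation
}

// sendConfirmations mirrors sendConfirmationStatus: it only writes to the
// channel while holding the lock and while the channel is still valid.
func (p *producerSketch) sendConfirmations(batch []*confirmation) {
	p.confirmMutex.Lock()
	defer p.confirmMutex.Unlock()
	if p.publishConfirmation != nil {
		p.publishConfirmation <- batch
	}
}

// closeConfirmations mirrors closeConfirmationStatus: close once, then mark
// the channel invalid so later confirmations become no-ops.
func (p *producerSketch) closeConfirmations() {
	p.confirmMutex.Lock()
	defer p.confirmMutex.Unlock()
	if p.publishConfirmation != nil {
		close(p.publishConfirmation)
		p.publishConfirmation = nil
	}
}

func main() {
	p := &producerSketch{publishConfirmation: make(chan []*confirmation, 1)}
	done := make(chan struct{})
	ch := p.publishConfirmation
	go func() {
		for batch := range ch {
			fmt.Println("confirmed:", len(batch), "message(s)")
		}
		close(done)
	}()

	p.sendConfirmations([]*confirmation{{publishingID: 1}})
	p.closeConfirmations()
	<-done
	p.sendConfirmations([]*confirmation{{publishingID: 2}}) // late confirmation: dropped safely
}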
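The `BlockingQueue.Stop()` change drains whatever is still buffered once the queue is flagged as stopped; losing those items is acceptable because the producer's unConfirmed map still tracks them. A simplified, non-generic sketch of that pattern (the real queue is generic over T and also keeps a short `time.After` window while draining):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errStopped = errors.New("blocking queue stopped")

type blockingQueue struct {
	queue  chan int
	status int32 // 0 = running, 1 = stopped
}

func (bq *blockingQueue) enqueue(v int) error {
	if atomic.LoadInt32(&bq.status) == 1 {
		return errStopped
	}
	bq.queue <- v // blocks when the buffer is full
	return nil
}

// stop flags the queue as stopped and drains anything still buffered so that
// no sender stays blocked on a full channel.
func (bq *blockingQueue) stop() {
	atomic.StoreInt32(&bq.status, 1)
	for {
		select {
		case <-bq.queue:
			// discard; the caller keeps its own record of in-flight items
		default:
			return
		}
	}
}

func main() {
	bq := &blockingQueue{queue: make(chan int, 4)}
	_ = bq.enqueue(1)
	_ = bq.enqueue(2)
	bq.stop()
	fmt.Println("enqueue after stop:", bq.enqueue(3))  // blocking queue stopped
	fmt.Println("items left buffered:", len(bq.queue)) // 0
}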
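Finally, a compact sketch of the "single close path" idea from the first bullet of the commit message: the public `Close()` and the coordinator's cleanup both funnel into one `close(reason)` that notifies the close handler, stops the pending queue and closes the confirmation channel exactly once. The types and field names below are simplified placeholders, not the library's:

package main

import (
	"errors"
	"fmt"
)

type event struct{ Command, Reason string }

type closerSketch struct {
	closed       bool
	closeHandler chan event
}

// close is the single tear-down path used both by the public API and by the
// coordinator when it removes the producer.
func (c *closerSketch) close(reason event) error {
	if c.closed {
		return errors.New("already closed")
	}
	c.closed = true
	if c.closeHandler != nil {
		c.closeHandler <- reason // notify listeners exactly once
		close(c.closeHandler)
		c.closeHandler = nil
	}
	// ...stop the pending-sequences queue, close the confirmation channel...
	return nil
}

// Close is the user-facing entry point; it reuses the same path with a
// "deletePublisher" reason so the server-side publisher is also removed.
func (c *closerSketch) Close() error {
	return c.close(event{Command: "deletePublisher", Reason: "deletePublisher"})
}

func main() {
	c := &closerSketch{closeHandler: make(chan event, 1)}
	fmt.Println(c.Close()) // <nil>
	fmt.Println(c.Close()) // already closed: the duplicated close logic is gone
}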