Improve producer edge cases
* Remove the code duplication between the paths where the producer is closed correctly and where it is not.

* Make the unconfirmed-messages handling more atomic, to avoid rare cases where unconfirmed operations ran outside the mutex lock.

* Increase the timeout for the Reliable producers and consumers; it was too low. The producer now also waits for pending messages to be flushed when it is closed.

* Refactor the publish-confirm channel with a lock that checks whether the channel is still valid, avoiding race conditions during closing (see the sketch below).

* Handle the case where the producer is not found during confirmation; this can happen when the producer is closed before all the messages are confirmed.
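A minimal Go sketch of the two confirmation-related points above (the names `coordinator`, `producerState` and `confirmEntry` are invented for illustration and are not the client's internals): the publish-confirm channel is only used while a lock that also tracks the open/closed state is held, and a confirmation arriving for an already-closed producer is logged and dropped.

```go
// Sketch only: shows the locking pattern, not the real client code.
package sketch

import (
	"log"
	"sync"
)

type confirmEntry struct {
	publishingID uint64
	confirmed    bool
}

type producerState struct {
	mu        sync.Mutex
	open      bool
	confirmCh chan confirmEntry // buffered; drained by the application's confirm handler
}

type coordinator struct {
	mu        sync.Mutex
	producers map[uint8]*producerState // keyed by publisher id
}

// handleConfirm runs when the broker sends a publish-confirm frame.
func (c *coordinator) handleConfirm(publisherID uint8, entry confirmEntry) {
	c.mu.Lock()
	p, ok := c.producers[publisherID]
	c.mu.Unlock()
	if !ok {
		// The producer was closed before all its messages were confirmed:
		// log and drop instead of failing on a missing producer.
		log.Printf("confirmation for unknown producer %d, ignored", publisherID)
		return
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.open {
		return // the channel may already be closed; never send on it
	}
	select {
	case p.confirmCh <- entry:
	default:
		// sketch simplification: drop when the handler is not draining fast enough
	}
}

// Close flips the open flag and closes the channel under the same lock,
// so handleConfirm can never race with the close.
func (p *producerState) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.open {
		return
	}
	p.open = false
	close(p.confirmCh)
}
```

Keeping the validity check and the channel operation inside one critical section is what removes the race during close.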

Signed-off-by: Gabriele Santomaggio <[email protected]>
Gsantomaggio committed Jan 7, 2025
1 parent 57b8473 commit 37ff466
Showing 17 changed files with 330 additions and 190 deletions.
4 changes: 3 additions & 1 deletion examples/getting_started.go
@@ -111,7 +111,9 @@ func main() {
channelClose := consumer.NotifyClose()
// channelClose receives all the closing events, here you can handle the
// client reconnection or just log
defer consumerClose(channelClose)
go func() {
consumerClose(channelClose)
}()

fmt.Println("Press any key to stop ")
_, _ = reader.ReadString('\n')
4 changes: 4 additions & 0 deletions examples/reliable/README.md
@@ -12,6 +12,10 @@ Note:
- The `unConfirmedMessages` are not persisted, so if the application is restarted, the `unConfirmedMessages` will be lost.
- The `unConfirmedMessages` order is not guaranteed
- The `unConfirmedMessages` can grow indefinitely if the broker is unavailable for a long time.
- The `re-send` is an option that can be enabled by setting `enableResend` to `true` (see the sketch below).
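A minimal sketch of the idea behind `enableResend` (the `Publisher` interface and `unconfirmedStore` type are placeholders invented here, not identifiers from the example): unconfirmed messages are kept in an in-memory map and published again when a confirmation reports a failure.

```go
// Sketch only: tracks unconfirmed messages in memory and re-sends on failure.
package sketch

import "sync"

// Publisher abstracts whatever sends a message to the stream
// (for example a reliable producer); it is a placeholder for this sketch.
type Publisher interface {
	Send(payload []byte) error
}

type unconfirmedStore struct {
	mu      sync.Mutex
	pending map[uint64][]byte // publishing id -> payload, kept only in memory
}

func newUnconfirmedStore() *unconfirmedStore {
	return &unconfirmedStore{pending: map[uint64][]byte{}}
}

// Track remembers a message until its confirmation arrives.
func (s *unconfirmedStore) Track(id uint64, payload []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending[id] = payload
}

// OnConfirmation removes the entry and, when resending is enabled,
// publishes the failed message again.
func (s *unconfirmedStore) OnConfirmation(id uint64, confirmed bool, enableResend bool, p Publisher) error {
	s.mu.Lock()
	payload, ok := s.pending[id]
	delete(s.pending, id)
	s.mu.Unlock()

	if confirmed || !ok || !enableResend {
		return nil
	}
	// Not confirmed: send it again; the new attempt gets a new publishing id
	// and must be tracked again by the caller.
	return p.Send(payload)
}
```

This also makes the caveats above concrete: the map lives only in memory, iteration order is not guaranteed, and it keeps growing while the broker is unreachable.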

The example enables the Go `pprof` endpoint; you can check it at localhost:6060/debug/pprof/. <br/>
The goal is to check the resources used by the application in case of reconnection.


The `reliable_common.go/retry` function performs different checks, because different events can happen while the broker restarts; please check:
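As a rough illustration of the retry shape (a hedged sketch with invented names such as `retrySketch`; it is not `reliable_common.go` itself): the operation is retried with a randomized, growing wait, and a non-recoverable error such as a deleted stream stops the reconnection attempts.

```go
// Sketch only: retry with randomized backoff, stopping on non-recoverable errors.
package sketch

import (
	"errors"
	"math/rand"
	"time"
)

var errStreamDoesNotExist = errors.New("stream does not exist")

// retrySketch retries op with a randomized, growing wait between attempts.
// A "stream does not exist" error is treated as non-recoverable, mirroring
// the reliable clients, which stop reconnecting in that case.
func retrySketch(maxAttempts int, op func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if errors.Is(err, errStreamDoesNotExist) {
			return err // non-recoverable: do not keep reconnecting
		}
		// randomized backoff: the base wait grows with the attempt number
		wait := time.Duration(rand.Intn(500)+attempt*1000) * time.Millisecond
		time.Sleep(wait)
	}
	return err
}
```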
63 changes: 42 additions & 21 deletions examples/reliable/reliable_client.go
@@ -4,7 +4,11 @@ import (
"bufio"
"errors"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"log"
"net/http"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -13,6 +17,7 @@ import (
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
_ "net/http/pprof"
)

// The ha producer and consumer provide a way to auto-reconnect in case of connection problems
@@ -33,19 +38,24 @@ var reSent int32
const enableResend = false

func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Your application code here

// Tune the parameters to test the reliability
const messagesToSend = 10_000_000
const numberOfProducers = 1
const messagesToSend = 50_000_000
const numberOfProducers = 5
const concurrentProducers = 2
const numberOfConsumers = 1
const sendDelay = 1 * time.Millisecond
const delayEachMessages = 200
const maxProducersPerClient = 4
const delayEachMessages = 500
const maxProducersPerClient = 2
const maxConsumersPerClient = 2
//

reader := bufio.NewReader(os.Stdin)
//stream.SetLevelInfo(logs.DEBUG)
stream.SetLevelInfo(logs.DEBUG)
fmt.Println("Reliable Producer/Consumer example")
fmt.Println("Connecting to RabbitMQ streaming ...")

@@ -72,27 +82,52 @@ func main() {
}
err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
MaxLengthBytes: stream.ByteCapacity{}.GB(1),
},
)
CheckErr(err)

var producers []*ha.ReliableProducer
var consumers []*ha.ReliableConsumer
isRunning := true
go func() {
for isRunning {
totalConfirmed := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
expectedMessages := messagesToSend * numberOfProducers * concurrentProducers * 2
fmt.Printf("********************************************\n")
fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC850),
expectedMessages, numberOfProducers, concurrentProducers, numberOfConsumers)
fmt.Printf("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n",
sent, atomic.LoadInt32(&reSent), atomic.LoadInt32(&confirmed), atomic.LoadInt32(&fail), totalConfirmed)
fmt.Printf("Total Consumed: %d - Per consumer: %d \n", atomic.LoadInt32(&consumed),
atomic.LoadInt32(&consumed)/numberOfConsumers)

for _, producer := range producers {
fmt.Printf("%s, status: %s \n",
producer.GetInfo(), producer.GetStatusAsString())

}
for _, consumer := range consumers {
fmt.Printf("%s, status: %s \n",
consumer.GetInfo(), consumer.GetStatusAsString())
}
fmt.Printf("go-routine: %d\n", runtime.NumGoroutine())
fmt.Printf("********************************************\n")
time.Sleep(5 * time.Second)
}
}()
var producers []*ha.ReliableProducer

for i := 0; i < numberOfConsumers; i++ {
consumer, err := ha.NewReliableConsumer(env,
streamName,
stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()),
func(consumerContext stream.ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&consumed, 1)
})
CheckErr(err)
consumers = append(consumers, consumer)
}

for i := 0; i < numberOfProducers; i++ {
var mutex = sync.Mutex{}
// Here we store the messages that have not been confirmed
@@ -151,20 +186,6 @@ func main() {
}
}()
}
var consumers []*ha.ReliableConsumer

for i := 0; i < numberOfConsumers; i++ {
go func(name string) {
consumer, err := ha.NewReliableConsumer(env,
streamName,
stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()),
func(consumerContext stream.ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&consumed, 1)
})
CheckErr(err)
consumers = append(consumers, consumer)
}(streamName)
}

fmt.Println("Press enter to close the connections.")
_, _ = reader.ReadString('\n')
58 changes: 39 additions & 19 deletions pkg/ha/ha_consumer.go
@@ -28,28 +28,41 @@ type ReliableConsumer struct {
bootstrap bool
}

func (c *ReliableConsumer) GetStatusAsString() string {
switch c.GetStatus() {
case StatusOpen:
return "Open"
case StatusClosed:
return "Closed"
case StatusStreamDoesNotExist:
return "StreamDoesNotExist"
case StatusReconnecting:
return "Reconnecting"
default:
return "Unknown"
}
}

func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
for event := range channelClose {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
c.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo())
c.bootstrap = false
err, reconnected := retry(0, c)
if err != nil {
logs.LogInfo(""+
"[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err)
}
if reconnected {
c.setStatus(StatusOpen)
} else {
c.setStatus(StatusClosed)
}

event := <-channelClose
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
c.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo())
c.bootstrap = false
err, reconnected := retry(1, c)
if err != nil {
logs.LogInfo(""+
"[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err)
}
if reconnected {
c.setStatus(StatusOpen)
} else {
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", c.getInfo(), event.Reason)
c.setStatus(StatusClosed)
}
} else {
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", c.getInfo(), event.Reason)
c.setStatus(StatusClosed)
}
}()
}
@@ -72,11 +85,13 @@ func NewReliableConsumer(env *stream.Environment, streamName string,
if consumerOptions == nil {
return nil, fmt.Errorf("the consumer options is mandatory")
}

logs.LogDebug("[Reliable] - creating %s", res.getInfo())
err := res.newConsumer()
if err == nil {

res.setStatus(StatusOpen)
}
logs.LogDebug("[Reliable] - created %s", res.getInfo())
return res, err
}

@@ -124,14 +139,15 @@ func (c *ReliableConsumer) newConsumer() error {
c.bootstrap, offset)
consumer, err := c.env.NewConsumer(c.streamName, func(consumerContext stream.ConsumerContext, message *amqp.Message) {
c.mutexConnection.Lock()

c.currentPosition = consumerContext.Consumer.GetOffset()
c.mutexConnection.Unlock()

c.messagesHandler(consumerContext, message)
}, c.consumerOptions.SetOffset(offset))
if err != nil {
return err
}

channelNotifyClose := consumer.NotifyClose()
c.handleNotifyClose(channelNotifyClose)
c.consumer = consumer
@@ -146,3 +162,7 @@ func (c *ReliableConsumer) Close() error {
}
return nil
}

func (c *ReliableConsumer) GetInfo() string {
return c.getInfo()
}
9 changes: 6 additions & 3 deletions pkg/ha/ha_consumer_test.go
@@ -108,11 +108,13 @@ var _ = Describe("Reliable Consumer", func() {
// kill the connection
errDrop := test_helper.DropConnection(connectionToDrop, "15672")
Expect(errDrop).NotTo(HaveOccurred())

time.Sleep(2 * time.Second) // we give some time to the client to reconnect
Expect(consumer.GetStatus()).To(Equal(StatusOpen))
/// just give some time to raise the event
time.Sleep(1200 * time.Millisecond)
Eventually(func() int { return consumer.GetStatus() }, "15s").WithPolling(300 * time.Millisecond).Should(Equal(StatusOpen))
Expect(consumer.GetStatusAsString()).To(Equal("Open"))
Expect(consumer.Close()).NotTo(HaveOccurred())
Expect(consumer.GetStatus()).To(Equal(StatusClosed))
Expect(consumer.GetStatusAsString()).To(Equal("Closed"))
})

It("Delete the stream should close the consumer", func() {
@@ -123,6 +125,7 @@ var _ = Describe("Reliable Consumer", func() {
Expect(err).NotTo(HaveOccurred())
Expect(consumer).NotTo(BeNil())
Expect(consumer.GetStatus()).To(Equal(StatusOpen))
Expect(consumer.GetStatusAsString()).To(Equal("Open"))
Expect(envForRConsumer.DeleteStream(streamForRConsumer)).NotTo(HaveOccurred())
Eventually(func() int {
return consumer.GetStatus()
67 changes: 46 additions & 21 deletions pkg/ha/ha_publisher.go
@@ -24,29 +24,31 @@ func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishCo

func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
for event := range channelClose {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
p.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", p.getInfo())
err, reconnected := retry(0, p)
if err != nil {
logs.LogInfo(
"[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err)
}
if reconnected {
p.setStatus(StatusOpen)
} else {
p.setStatus(StatusClosed)
}
event := <-channelClose
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
p.setStatus(StatusReconnecting)
waitTime := randomWaitWithBackoff(1)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting in %d milliseconds waiting pending messages", p.getInfo(), waitTime)
time.Sleep(time.Duration(waitTime) * time.Millisecond)
err, reconnected := retry(1, p)
if err != nil {
logs.LogInfo(
"[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err)
}
if reconnected {
p.setStatus(StatusOpen)
} else {
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason)
p.setStatus(StatusClosed)
}

p.reconnectionSignal.L.Lock()
p.reconnectionSignal.Broadcast()
p.reconnectionSignal.L.Unlock()
} else {
logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", p.getInfo(), event.Reason)
p.setStatus(StatusClosed)
}

p.reconnectionSignal.L.Lock()
p.reconnectionSignal.Broadcast()
p.reconnectionSignal.L.Unlock()
logs.LogInfo("[Reliable] - %s reconnection signal sent", p.getInfo())
}()
}

@@ -61,6 +63,7 @@ type ReliableProducer struct {
producerOptions *stream.ProducerOptions
count int32
confirmMessageHandler ConfirmMessageHandler
channelPublishConfirm stream.ChannelPublishConfirm
mutex *sync.Mutex
mutexStatus *sync.Mutex
status int
@@ -98,14 +101,17 @@ func NewReliableProducer(env *stream.Environment, streamName string,
}

func (p *ReliableProducer) newProducer() error {
p.mutex.Lock()
defer p.mutex.Unlock()
producer, err := p.env.NewProducer(p.streamName, p.producerOptions)
if err != nil {

return err
}
channelPublishConfirm := producer.NotifyPublishConfirmation()
p.channelPublishConfirm = producer.NotifyPublishConfirmation()
p.handlePublishConfirm(p.channelPublishConfirm)
channelNotifyClose := producer.NotifyClose()
p.handleNotifyClose(channelNotifyClose)
p.handlePublishConfirm(channelPublishConfirm)
p.producer = producer
return err
}
@@ -180,6 +186,21 @@ func (p *ReliableProducer) GetStatus() int {
return p.status
}

func (p *ReliableProducer) GetStatusAsString() string {
switch p.GetStatus() {
case StatusOpen:
return "Open"
case StatusClosed:
return "Closed"
case StatusStreamDoesNotExist:
return "StreamDoesNotExist"
case StatusReconnecting:
return "Reconnecting"
default:
return "Unknown"
}
}

// IReliable interface
func (p *ReliableProducer) setStatus(value int) {
p.mutexStatus.Lock()
@@ -224,3 +245,7 @@ func (p *ReliableProducer) Close() error {
}
return nil
}

func (p *ReliableProducer) GetInfo() string {
return p.getInfo()
}