Skip to content

Commit

Permalink
change the batchSend
Browse files Browse the repository at this point in the history
automatically splits the frames.
Removes the messages too big for the frame.
Add the batch result to give more control to the user

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Dec 3, 2024
1 parent d744d95 commit 4dc144d
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 64 deletions.
2 changes: 1 addition & 1 deletion pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (p *ReliableProducer) BatchSend(batchMessages []message.StreamMessage) erro
}

p.mutex.Lock()
errW := p.producer.BatchSend(batchMessages)
_, errW := p.producer.BatchSend(batchMessages)
p.mutex.Unlock()

return p.checkWriteError(errW)
Expand Down
5 changes: 4 additions & 1 deletion pkg/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ var _ = Describe("Streaming testEnvironment", func() {

producer, err := testEnvironment.NewProducer(testStreamName, nil)
Expect(err).NotTo(HaveOccurred())
Expect(producer.BatchSend(CreateArrayMessagesForTesting(1_000))).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(1_000))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(1_000))
Expect(result.TotalFrames).To(Equal(1))
time.Sleep(time.Millisecond * 800)
Expect(producer.Close()).NotTo(HaveOccurred())

Expand Down
12 changes: 10 additions & 2 deletions pkg/stream/consumer_sac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import (
func SendMessages(testEnvironment *Environment, streamName string) {
producer, err := testEnvironment.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
Expect(producer.BatchSend(CreateArrayMessagesForTesting(30))).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(30))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(30))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
}

Expand Down Expand Up @@ -210,7 +214,11 @@ var _ = Describe("Streaming Single Active Consumer", func() {
SetAutoCommit(nil))
Expect(err).NotTo(HaveOccurred())

Expect(producer.BatchSend(CreateArrayMessagesForTesting(10))).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(10))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(10))
Expect(result.TotalFrames).To(Equal(1))

Eventually(func() int32 {
return atomic.LoadInt32(&messagesReceived)
}, 5*time.Second).Should(Equal(int32(10)),
Expand Down
45 changes: 35 additions & 10 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ var _ = Describe("Streaming Consumers", func() {
producer, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())

err = producer.BatchSend(CreateArrayMessagesForTesting(30)) // batch Send
result, err := producer.BatchSend(CreateArrayMessagesForTesting(30)) // batch Send
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(30))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
var messagesReceived int32 = 0
consumer, err := env.NewConsumer(streamName,
Expand Down Expand Up @@ -187,8 +190,11 @@ var _ = Describe("Streaming Consumers", func() {
}, NewConsumerOptions().
SetOffset(OffsetSpecification{}.First()))
Expect(err).NotTo(HaveOccurred())
err = producer.BatchSend(CreateArrayMessagesForTesting(3)) // batch Send
result, err := producer.BatchSend(CreateArrayMessagesForTesting(3)) // batch Send
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(3))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
Eventually(func() int32 {
return atomic.LoadInt32(&messagesCount)
Expand All @@ -203,8 +209,11 @@ var _ = Describe("Streaming Consumers", func() {
Expect(err).NotTo(HaveOccurred())

// Given we have produced 105 messages ...
err = producer.BatchSend(CreateArrayMessagesForTesting(105)) // batch Send
result, err := producer.BatchSend(CreateArrayMessagesForTesting(105)) // batch Send
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(105))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())

})
Expand Down Expand Up @@ -342,7 +351,11 @@ var _ = Describe("Streaming Consumers", func() {
// same SetPublishingId
// even we publish the same array more times
for i := 0; i < 10; i++ {
Expect(producer.BatchSend(arr)).NotTo(HaveOccurred())
result, err := producer.BatchSend(arr)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(len(arr)))
Expect(result.TotalFrames).To(Equal(1))

}

var messagesReceived int32 = 0
Expand Down Expand Up @@ -390,8 +403,11 @@ var _ = Describe("Streaming Consumers", func() {
_, err = env.QueryOffset("consumer_test", streamName)
Expect(err).To(HaveOccurred())

Expect(producer.BatchSend(CreateArrayMessagesForTesting(107))).
NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(107))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(107))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
var messagesReceived int32 = 0
consumer, err := env.NewConsumer(streamName,
Expand Down Expand Up @@ -448,8 +464,11 @@ var _ = Describe("Streaming Consumers", func() {
It("Check already closed", func() {
producer, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
Expect(producer.BatchSend(CreateArrayMessagesForTesting(500))).
NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(500))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(500))
Expect(result.TotalFrames).To(Equal(1))

defer func(producer *Producer) {
err := producer.Close()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -687,7 +706,10 @@ var _ = Describe("Streaming Consumers", func() {
}

for i := 0; i < 50; i++ {
Expect(producer6Batch.BatchSend(batchMessages)).NotTo(HaveOccurred())
result, err2 := producer6Batch.BatchSend(batchMessages)
Expect(err2).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(len(batchMessages)))
Expect(result.TotalFrames).To(Equal(1))
}

var messagesReceived int32
Expand Down Expand Up @@ -731,7 +753,10 @@ var _ = Describe("Streaming Consumers", func() {
// so, even we set the SetPublishingId
// it will be ignored
for i := 0; i < 10; i++ {
Expect(producer.BatchSend(arr)).NotTo(HaveOccurred())
result, err := producer.BatchSend(arr)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(len(arr)))
Expect(result.TotalFrames).To(Equal(1))
}

var messagesReceived int32 = 0
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (coordinator *Coordinator) NewProducer(
unConfirmedMessages: map[int64]*ConfirmationStatus{},
status: open,
messageSequenceCh: make(chan messageSequence, size),
adaptiveChannel: make(chan message.StreamMessage, adativeSize),
dynamicChannel: make(chan message.StreamMessage, adativeSize),
pendingMessages: pendingMessagesSequence{
messages: make([]*messageSequence, 0),
size: initBufferPublishSize,
Expand Down
5 changes: 4 additions & 1 deletion pkg/stream/filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func send(producer *Producer, state string) {
msg.ApplicationProperties = map[string]interface{}{"state": state}
messages = append(messages, msg)
}
Expect(producer.BatchSend(messages)).NotTo(HaveOccurred())
result, err := producer.BatchSend(messages)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(25))
Expect(result.TotalFrames).To(Equal(1))

}
Loading

0 comments on commit 4dc144d

Please sign in to comment.