Skip to content

Commit

Permalink
Enable Superstream for consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ablease committed Nov 9, 2023
1 parent 338083c commit 4e8dd78
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (c *Consumer) subscribeProperties() raw.SubscribeProperties {
if c.opts.SingleActiveConsumer {
s["single-active-consumer"] = "true"
}
if c.opts.SuperStream {
s["super-stream"] = c.Stream
}
return s
}

Expand Down
29 changes: 28 additions & 1 deletion pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,34 @@ var _ = Describe("Smart Consumer", func() {
})

Describe("SuperStream Consumer", func() {
It("can enable the feature", func() {
subscribeProperties := raw.SubscribeProperties{"super-stream": "test-stream"}
chunkChan := make(chan *raw.Chunk)
gomock.InOrder(fakeRawClient.EXPECT().
NotifyChunk(gomock.AssignableToTypeOf(chunkChan)).Return(chunkChan),
fakeRawClient.EXPECT().
Subscribe(
gomock.AssignableToTypeOf(ctxType), //context
gomock.Eq("test-stream"), // stream
gomock.Eq(uint16(1)), // offsetType
gomock.Eq(uint8(1)), // subscriptionId
gomock.Eq(uint16(2)), // credit
gomock.Eq(subscribeProperties), // properties
gomock.Eq(uint64(1)), // offset
))

opts := &ConsumerOptions{
SuperStream: true,
OffsetType: uint16(1),
SubscriptionId: uint8(1),
Credit: uint16(2),
Offset: uint64(1),
}
handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) {
fmt.Printf("messages: %s\n", message.Data)
}
consumer, _ := NewConsumer("test-stream", fakeRawClient, handleMessages, opts)
Expect(consumer.Subscribe(context.Background())).To(Succeed())
})
})

})

0 comments on commit 4e8dd78

Please sign in to comment.