Skip to content

Commit

Permalink
Fix: delete publisher when producer is removed from manager
Browse files Browse the repository at this point in the history
Signed-off-by: Aitor Perez Cedres <[email protected]>
  • Loading branch information
Zerpet committed Dec 15, 2023
1 parent 6ef17b7 commit 3dcf74d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/stream/producer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (p *producerManager) removeProducer(id int) error {
if id < 0 {
return errNegativeId
}
ctx, cancel := maybeApplyDefaultTimeout(context.Background())
defer cancel()
p.clientM.Lock()
_ = p.client.DeletePublisher(ctx, uint8(id))
// TODO log error if not nil
p.clientM.Unlock()
p.producers[id] = nil
p.producerCount -= 1

Expand Down
11 changes: 11 additions & 0 deletions pkg/stream/producer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ var _ = Describe("ProducerManager", func() {
prepareMockDeclarePublisher(fakeRawClient, 1)
prepareMockDeclarePublisher(fakeRawClient, 1)
prepareMockDeclarePublisher(fakeRawClient, 2)
prepareMockDeletePublisher(fakeRawClient, 1)
prepareMockDeletePublisher(fakeRawClient, 2)
pm := newProducerManager(0, EnvironmentConfiguration{
MaxProducersByConnection: 5,
})
Expand Down Expand Up @@ -144,6 +146,7 @@ var _ = Describe("ProducerManager", func() {
It("closes the connection", func() {
fakeRawClient.EXPECT().Close(gomock.Any())
prepareMockDeclarePublisher(fakeRawClient, 0)
prepareMockDeletePublisher(fakeRawClient, 0) // called during producer.Close() callback
pm := newProducerManager(0, EnvironmentConfiguration{MaxProducersByConnection: 5})
pm.client = fakeRawClient
pm.open = true
Expand All @@ -170,3 +173,11 @@ func prepareMockDeclarePublisher(m *MockRawClient, publisherId uint8) {
gomock.AssignableToTypeOf(""),
)
}

func prepareMockDeletePublisher(m *MockRawClient, publisherId uint8) {
ctxType := reflect.TypeOf((*context.Context)(nil)).Elem()
m.EXPECT().DeletePublisher(
gomock.AssignableToTypeOf(ctxType),
gomock.Eq(publisherId),
)
}

0 comments on commit 3dcf74d

Please sign in to comment.