Skip to content

Commit

Permalink
fixed pull consumer start, added pull consumer params into channeltem…
Browse files Browse the repository at this point in the history
…plate
  • Loading branch information
astelmashenko committed Aug 2, 2024
1 parent ece918f commit 4c5aadc
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
13 changes: 13 additions & 0 deletions config/jetstream/302-jsm-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ spec:
sampleFrequency:
description: SampleFrequency sets the percentage of acknowledgements that should be sampled for observability. Valid values are in the range 0-100 and, for example, allows both formats of "30" and "30%".
type: string
consumerType:
description: Push or Pull based subscriptions
type: string
fetchMaxWait:
description: Pull consumer. Time to wait for a batch of messages if there is not enough of messages on a stream
type: integer
format: int64
minimum: 0
fetchBatchSize:
description: Pull consumer. Batch of messages to pull from stream at once for processing
type: integer
format: int32
minimum: 1
delivery:
description: DeliverySpec contains the default delivery spec for each subscription to this Channelable. Each subscription delivery spec, if any, overrides this global delivery spec.
type: object
Expand Down
5 changes: 5 additions & 0 deletions pkg/channel/jetstream/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ func (d *Dispatcher) subscribe(ctx context.Context, config ChannelConfig, sub Su
logger.Errorw("failed to create pull consumer", zap.Error(err))
return SubscriberStatusTypeError, err

Check warning on line 300 in pkg/channel/jetstream/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/dispatcher.go#L297-L300

Added lines #L297 - L300 were not covered by tests
}
err = consumer.(*PullConsumer).Start()
if err != nil {
logger.Errorw("failed to start pull consumer", zap.Error(err))
return SubscriberStatusTypeError, err

Check warning on line 305 in pkg/channel/jetstream/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/dispatcher.go#L302-L305

Added lines #L302 - L305 were not covered by tests
}
}

d.consumers[sub.UID] = consumer
Expand Down
9 changes: 4 additions & 5 deletions pkg/channel/jetstream/dispatcher/pull_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func (c *PullConsumer) Start() error {
if err := c.checkStart(); err != nil {
return err

Check warning on line 141 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L139-L141

Added lines #L139 - L141 were not covered by tests
}

defer close(c.closed)

Check warning on line 143 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L143

Added line #L143 was not covered by tests

ctx := logging.WithLogger(context.Background(), c.logger)

Check warning on line 145 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L145

Added line #L145 was not covered by tests
Expand All @@ -158,9 +157,9 @@ func (c *PullConsumer) Start() error {

for {

Check warning on line 158 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L158

Added line #L158 was not covered by tests
// TODO move 200 into subscription config
batch, err := c.natsConsumer.Fetch(FetchBatchSize, nats.MaxWait(200*time.Millisecond))
batch, err := c.natsConsumer.FetchBatch(FetchBatchSize, nats.MaxWait(200*time.Millisecond))
if err != nil {
return err
c.logger.Errorw("Failed to fetch messages", zap.Error(err), zap.String("consumer", c.sub.Name))

Check warning on line 162 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L160-L162

Added lines #L160 - L162 were not covered by tests
}

c.consumeMessages(ctx, batch, &wg)

Check warning on line 165 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L165

Added line #L165 was not covered by tests
Expand All @@ -172,8 +171,8 @@ func (c *PullConsumer) Start() error {
//
// This method returns once the MessageBatch has been consumed, or upon a call to Consumer.Close.
// Returning as a result of Consumer.Close results in an io.EOF error.
func (c *PullConsumer) consumeMessages(ctx context.Context, batch []*nats.Msg, wg *sync.WaitGroup) {
for _, natsMsg := range batch {
func (c *PullConsumer) consumeMessages(ctx context.Context, batch nats.MessageBatch, wg *sync.WaitGroup) {
for natsMsg := range batch.Messages() {
wg.Add(1)

Check warning on line 176 in pkg/channel/jetstream/dispatcher/pull_consumer.go

View check run for this annotation

Codecov / codecov/patch

pkg/channel/jetstream/dispatcher/pull_consumer.go#L174-L176

Added lines #L174 - L176 were not covered by tests

go func() {
Expand Down

0 comments on commit 4c5aadc

Please sign in to comment.