Skip to content

Commit

Permalink
[FIXED] Remove ConsumerInfo() calls in Consume() and Messages()
Browse files Browse the repository at this point in the history
… after reconnect. (#1643)

- `Consume()` and `Messages()` no longer call `ConsumerInfo()` upon reconnect.
- Ordered consumers now reset on each reconnect event.

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Aug 15, 2024
1 parent 2009834 commit bfb6524
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 315 deletions.
195 changes: 156 additions & 39 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
stopAfterMsgsLeft chan int
withStopAfter bool
runningFetch *fetchResult
subscription *orderedSubscription
sync.Mutex
}

Expand Down Expand Up @@ -92,7 +93,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
c.userErrHandler = consumeOpts.ErrHandler
opts = append(opts, ConsumeErrHandler(c.errHandler(c.serial)))
opts = append(opts, consumeReconnectNotify(),
ConsumeErrHandler(c.errHandler(c.serial)))
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
c.stopAfter = consumeOpts.StopAfter
Expand All @@ -105,6 +107,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
consumer: c,
done: make(chan struct{}, 1),
}
c.subscription = sub
internalHandler := func(serial int) func(msg Msg) {
return func(msg Msg) {
// handler is a noop if message was delivered for a consumer with different serial
Expand Down Expand Up @@ -197,13 +200,13 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
return func(cc ConsumeContext, err error) {
c.Lock()
defer c.Unlock()
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) {
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) && !errors.Is(err, errConnected) {
c.userErrHandler(cc, err)
}
if errors.Is(err, ErrNoHeartbeat) ||
errors.Is(err, errOrderedSequenceMismatch) ||
errors.Is(err, ErrConsumerDeleted) ||
errors.Is(err, ErrConsumerNotFound) {
errors.Is(err, errConnected) {
// only reset if serial matches the current consumer serial and there is no reset in progress
if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 {
atomic.StoreUint32(&c.resetInProgress, 1)
Expand Down Expand Up @@ -235,7 +238,9 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
opts = append(opts, WithMessagesErrOnMissingHeartbeat(true))
opts = append(opts,
WithMessagesErrOnMissingHeartbeat(true),
messagesReconnectNotify())
c.stopAfterMsgsLeft = make(chan int, 1)
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
Expand All @@ -255,6 +260,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
opts: opts,
done: make(chan struct{}, 1),
}
c.subscription = sub

return sub, nil
}
Expand Down Expand Up @@ -367,6 +373,11 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
}
c.currentConsumer.Unlock()
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -397,6 +408,11 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat
c.cursor.streamSeq = c.runningFetch.sseq
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -425,6 +441,11 @@ func (c *orderedConsumer) FetchNoWait(batch int) (MessageBatch, error) {
return nil, ErrOrderedConsumerConcurrentRequests
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -481,52 +502,42 @@ func (c *orderedConsumer) reset() error {
}
consName := c.currentConsumer.CachedInfo().Name
c.currentConsumer.Unlock()
var err error
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err)
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
_ = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
cancel()
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
continue
}
return err
}
break
}
}()
}

c.cursor.deliverSeq = 0
consumerConfig := c.getConsumerConfig()

var err error
var cons Consumer
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err)

backoffOpts := backoffOpts{
attempts: c.cfg.MaxResetAttempts,
initialInterval: time.Second,
factor: 2,
maxInterval: 10 * time.Second,
cancel: c.subscription.done,
}
err = retryWithBackoff(func(attempt int) (bool, error) {
isClosed := atomic.LoadUint32(&c.subscription.closed) == 1
if isClosed {
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig)
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
cancel()
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
cancel()
continue
}
cancel()
return err
return true, err
}
cancel()
break
c.currentConsumer = cons.(*pullConsumer)
return false, nil
}, backoffOpts)
if err != nil {
return err
}
c.currentConsumer = cons.(*pullConsumer)
return nil
Expand All @@ -548,6 +559,10 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
// otherwise, start from the next sequence
nextSeq = c.cursor.streamSeq + 1
}

if c.cfg.MaxResetAttempts == 0 {
c.cfg.MaxResetAttempts = -1
}
name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial)
cfg := &ConsumerConfig{
Name: name,
Expand All @@ -564,6 +579,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
} else {
cfg.FilterSubjects = c.cfg.FilterSubjects
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

if c.serial != 1 {
return cfg
Expand All @@ -589,9 +607,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
cfg.DeliverPolicy = DeliverByStartTimePolicy
cfg.OptStartTime = c.cfg.OptStartTime
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

return cfg
}
Expand All @@ -612,6 +627,20 @@ func messagesStopAfterNotify(numMsgs int, msgsLeftAfterStop chan int) PullMessag
})
}

// consumeReconnectNotify returns an internal PullConsumeOpt which switches on
// the notifyOnReconnect flag of the consume options it is applied to.
// NOTE(review): the flag presumably makes the pull consumer surface a
// reconnect event to its error handler - confirm against the pull consumer
// implementation.
func consumeReconnectNotify() PullConsumeOpt {
	enableNotify := func(o *consumeOpts) error {
		o.notifyOnReconnect = true
		return nil
	}
	return pullOptFunc(enableNotify)
}

// messagesReconnectNotify returns an internal PullMessagesOpt which switches
// on the notifyOnReconnect flag of the consume options it is applied to.
// NOTE(review): the flag presumably makes the messages iterator surface a
// reconnect event to the ordered consumer - confirm against the pull consumer
// implementation.
func messagesReconnectNotify() PullMessagesOpt {
	enableNotify := func(o *consumeOpts) error {
		o.notifyOnReconnect = true
		return nil
	}
	return pullOptFunc(enableNotify)
}

// Info returns information about the ordered consumer.
// Note that this method will fetch the latest instance of the
// consumer from the server, which can be deleted by the library at any time.
Expand Down Expand Up @@ -652,3 +681,91 @@ func (c *orderedConsumer) CachedInfo() *ConsumerInfo {
}
return c.currentConsumer.info
}

// backoffOpts configures the behavior of retryWithBackoff.
type backoffOpts struct {
	// attempts limits how many attempt indices are used.
	// A negative value (e.g. -1) means unlimited.
	// It must be non-zero unless customBackoff is set, in which case
	// it must be left at zero.
	attempts int
	// initialInterval is the wait before the first retry
	// (and before the first execution when disableInitialExecution is set).
	// Defaults to 1s.
	initialInterval time.Duration
	// disableInitialExecution, when true, delays the first execution by
	// initialInterval instead of running it immediately. The delay occupies
	// attempt index 0, so the function is first called with index 1.
	disableInitialExecution bool
	// factor is the multiplier applied to the interval after each retry.
	// Defaults to 2.
	factor float64
	// maxInterval caps the interval between retries; once reached, all
	// subsequent retries use this interval.
	// Defaults to 1 minute.
	maxInterval time.Duration
	// customBackoff is an explicit list of intervals. If set, it overrides
	// all other timing options: each interval is waited before the
	// corresponding execution, so the function runs at most
	// len(customBackoff) times. It cannot be combined with attempts;
	// retryWithBackoff returns an error if both are set.
	customBackoff []time.Duration
	// cancel, if set, aborts the retry loop when the channel is closed
	// (or receives a value) while waiting between executions.
	cancel <-chan struct{}
}

// retryWithBackoff repeatedly calls f until it reports that no further retry
// is needed, the configured attempts are exhausted, or opts.cancel fires.
//
// f receives the zero-based attempt index and returns whether the loop should
// continue together with the error of that attempt. When f returns false, its
// error (possibly nil) is returned as is. When the attempts (or custom
// intervals) run out, the error of the last call is returned. When
// cancellation is observed during a wait, nil is returned.
//
// An error is returned without calling f if the options are inconsistent:
// customBackoff combined with attempts, or neither of them set.
func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error {
	var err error
	var shouldContinue bool

	// if custom backoff is set, use it instead of other options
	if len(opts.customBackoff) > 0 {
		if opts.attempts != 0 {
			return fmt.Errorf("cannot use custom backoff intervals when attempts are set")
		}
		for i, interval := range opts.customBackoff {
			if !backoffWait(interval, opts.cancel) {
				return nil
			}
			shouldContinue, err = f(i)
			if !shouldContinue {
				return err
			}
		}
		return err
	}

	// set default options
	if opts.initialInterval == 0 {
		opts.initialInterval = 1 * time.Second
	}
	if opts.factor == 0 {
		opts.factor = 2
	}
	if opts.maxInterval == 0 {
		opts.maxInterval = 1 * time.Minute
	}
	if opts.attempts == 0 {
		return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals")
	}

	interval := opts.initialInterval
	for i := 0; ; i++ {
		if i == 0 && opts.disableInitialExecution {
			// The initial delay must be interruptible as well, otherwise
			// a canceled caller stays blocked for the whole interval.
			if !backoffWait(interval, opts.cancel) {
				return nil
			}
			continue
		}
		shouldContinue, err = f(i)
		if !shouldContinue {
			return err
		}
		if opts.attempts > 0 && i >= opts.attempts-1 {
			break
		}
		if !backoffWait(interval, opts.cancel) {
			return nil
		}
		// grow the interval, but never beyond maxInterval
		interval = time.Duration(float64(interval) * opts.factor)
		if interval >= opts.maxInterval {
			interval = opts.maxInterval
		}
	}
	return err
}

// backoffWait blocks for d or until cancel fires, whichever comes first.
// It reports true when the full duration elapsed and false on cancellation.
// A nil cancel channel never fires. The timer is stopped on return so that
// a canceled wait does not leave a pending timer behind.
func backoffWait(d time.Duration, cancel <-chan struct{}) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-cancel:
		return false
	case <-timer.C:
		return true
	}
}
Loading

0 comments on commit bfb6524

Please sign in to comment.