Skip to content

Commit

Permalink
[FIXED] Remove ConsumerInfo() calls in Consume() and Messages()
Browse files Browse the repository at this point in the history
… after reconnect. (#1643)

- `Consume()` and `Messages()` no longer call `ConsumerInfo()` upon reconnect.
- Ordered consumers now reset on each reconnect event.

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Jun 13, 2024
1 parent 5dbd825 commit 66b92a9
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 333 deletions.
12 changes: 6 additions & 6 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.6
github.com/klauspost/compress v1.17.8
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.11
github.com/nats-io/nats-server/v2 v2.10.16
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.14.0
golang.org/x/text v0.15.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/sys v0.17.0 // indirect
github.com/nats-io/jwt/v2 v2.5.7 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
24 changes: 12 additions & 12 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0=
github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8=
github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0=
github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
Expand All @@ -31,17 +31,17 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
195 changes: 156 additions & 39 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
stopAfterMsgsLeft chan int
withStopAfter bool
runningFetch *fetchResult
subscription *orderedSubscription
sync.Mutex
}

Expand Down Expand Up @@ -92,7 +93,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
c.userErrHandler = consumeOpts.ErrHandler
opts = append(opts, ConsumeErrHandler(c.errHandler(c.serial)))
opts = append(opts, consumeReconnectNotify(),
ConsumeErrHandler(c.errHandler(c.serial)))
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
c.stopAfter = consumeOpts.StopAfter
Expand All @@ -105,6 +107,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
consumer: c,
done: make(chan struct{}, 1),
}
c.subscription = sub
internalHandler := func(serial int) func(msg Msg) {
return func(msg Msg) {
// handler is a noop if message was delivered for a consumer with different serial
Expand Down Expand Up @@ -197,13 +200,13 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
return func(cc ConsumeContext, err error) {
c.Lock()
defer c.Unlock()
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) {
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) && !errors.Is(err, errConnected) {
c.userErrHandler(cc, err)
}
if errors.Is(err, ErrNoHeartbeat) ||
errors.Is(err, errOrderedSequenceMismatch) ||
errors.Is(err, ErrConsumerDeleted) ||
errors.Is(err, ErrConsumerNotFound) {
errors.Is(err, errConnected) {
// only reset if serial matches the current consumer serial and there is no reset in progress
if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 {
atomic.StoreUint32(&c.resetInProgress, 1)
Expand Down Expand Up @@ -235,7 +238,9 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
opts = append(opts, WithMessagesErrOnMissingHeartbeat(true))
opts = append(opts,
WithMessagesErrOnMissingHeartbeat(true),
messagesReconnectNotify())
c.stopAfterMsgsLeft = make(chan int, 1)
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
Expand All @@ -255,6 +260,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
opts: opts,
done: make(chan struct{}, 1),
}
c.subscription = sub

return sub, nil
}
Expand Down Expand Up @@ -367,6 +373,11 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
}
c.currentConsumer.Unlock()
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -397,6 +408,11 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat
c.cursor.streamSeq = c.runningFetch.sseq
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -425,6 +441,11 @@ func (c *orderedConsumer) FetchNoWait(batch int) (MessageBatch, error) {
return nil, ErrOrderedConsumerConcurrentRequests
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
consumer: c,
done: make(chan struct{}),
}
c.subscription = &sub
err := c.reset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -481,52 +502,42 @@ func (c *orderedConsumer) reset() error {
}
consName := c.currentConsumer.CachedInfo().Name
c.currentConsumer.Unlock()
var err error
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err)
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
_ = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
cancel()
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
continue
}
return err
}
break
}
}()
}

c.cursor.deliverSeq = 0
consumerConfig := c.getConsumerConfig()

var err error
var cons Consumer
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err)

backoffOpts := backoffOpts{
attempts: c.cfg.MaxResetAttempts,
initialInterval: time.Second,
factor: 2,
maxInterval: 10 * time.Second,
cancel: c.subscription.done,
}
err = retryWithBackoff(func(attempt int) (bool, error) {
isClosed := atomic.LoadUint32(&c.subscription.closed) == 1
if isClosed {
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig)
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
cancel()
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
cancel()
continue
}
cancel()
return err
return true, err
}
cancel()
break
c.currentConsumer = cons.(*pullConsumer)
return false, nil
}, backoffOpts)
if err != nil {
return err
}
c.currentConsumer = cons.(*pullConsumer)
return nil
Expand All @@ -548,6 +559,10 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
// otherwise, start from the next sequence
nextSeq = c.cursor.streamSeq + 1
}

if c.cfg.MaxResetAttempts == 0 {
c.cfg.MaxResetAttempts = -1
}
name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial)
cfg := &ConsumerConfig{
Name: name,
Expand All @@ -564,6 +579,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
} else {
cfg.FilterSubjects = c.cfg.FilterSubjects
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

if c.serial != 1 {
return cfg
Expand All @@ -589,9 +607,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
cfg.DeliverPolicy = DeliverByStartTimePolicy
cfg.OptStartTime = c.cfg.OptStartTime
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

return cfg
}
Expand All @@ -612,6 +627,20 @@ func messagesStopAfterNotify(numMsgs int, msgsLeftAfterStop chan int) PullMessag
})
}

// consumeReconnectNotify returns a PullConsumeOpt which switches on the
// notifyOnReconnect flag of the consume options it is applied to.
// The option never fails.
func consumeReconnectNotify() PullConsumeOpt {
	enableNotify := func(co *consumeOpts) error {
		co.notifyOnReconnect = true
		return nil
	}
	return pullOptFunc(enableNotify)
}

// messagesReconnectNotify returns a PullMessagesOpt which switches on the
// notifyOnReconnect flag of the consume options it is applied to.
// The option never fails.
func messagesReconnectNotify() PullMessagesOpt {
	enableNotify := func(co *consumeOpts) error {
		co.notifyOnReconnect = true
		return nil
	}
	return pullOptFunc(enableNotify)
}

// Info returns information about the ordered consumer.
// Note that this method will fetch the latest instance of the
// consumer from the server, which can be deleted by the library at any time.
Expand Down Expand Up @@ -652,3 +681,91 @@ func (c *orderedConsumer) CachedInfo() *ConsumerInfo {
}
return c.currentConsumer.info
}

// backoffOpts configures the retry schedule used by retryWithBackoff.
type backoffOpts struct {
	// total retry attempts
	// -1 for unlimited
	// must be non-zero unless customBackoff is used
	attempts int
	// initial interval after which first retry will be performed
	// defaults to 1s
	initialInterval time.Duration
	// when true, the first function execution is not performed immediately
	// but delayed by initialInterval
	disableInitialExecution bool
	// multiplier on each attempt
	// defaults to 2
	factor float64
	// max interval between retries
	// after reaching this value, all subsequent
	// retries will be performed with this interval
	// defaults to 1 minute
	maxInterval time.Duration
	// custom backoff intervals
	// if set, the function is executed once after each listed interval
	// and all other options except cancel are ignored
	// cannot be combined with attempts (attempts has to be 0)
	customBackoff []time.Duration
	// cancel channel
	// if set, retry will be canceled when this channel is closed
	cancel <-chan struct{}
}

// backoffWait blocks for d or until cancel is closed, whichever comes first.
// It reports true when the full interval elapsed and false when the wait was
// canceled. A nil cancel channel never cancels the wait.
func backoffWait(d time.Duration, cancel <-chan struct{}) bool {
	timer := time.NewTimer(d)
	// release the timer right away if the wait is canceled
	defer timer.Stop()
	select {
	case <-cancel:
		return false
	case <-timer.C:
		return true
	}
}

// retryWithBackoff repeatedly executes f according to opts until f asks to
// stop, the configured number of attempts is used up or opts.cancel is closed.
//
// f receives the zero-based attempt index and returns whether another attempt
// should be made together with the error of this attempt.
//
// The returned error is the error of the last execution of f when f stopped
// retrying or the attempts were exhausted. It is nil when the retry was
// canceled through opts.cancel while waiting. An error is returned without
// executing f when the options are invalid (attempts not set in interval mode,
// or attempts set together with customBackoff).
func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error {
	var err error
	var shouldContinue bool
	// if custom backoff is set, use it instead of other options
	if len(opts.customBackoff) > 0 {
		if opts.attempts != 0 {
			return fmt.Errorf("cannot use custom backoff intervals when attempts are set")
		}
		for i, interval := range opts.customBackoff {
			if !backoffWait(interval, opts.cancel) {
				return nil
			}
			shouldContinue, err = f(i)
			if !shouldContinue {
				return err
			}
		}
		return err
	}

	// set default options
	if opts.initialInterval == 0 {
		opts.initialInterval = 1 * time.Second
	}
	if opts.factor == 0 {
		opts.factor = 2
	}
	if opts.maxInterval == 0 {
		opts.maxInterval = 1 * time.Minute
	}
	if opts.attempts == 0 {
		return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals")
	}

	interval := opts.initialInterval
	// The initial delay happens before the loop so that it can be canceled
	// and does not use up one of the configured attempts.
	if opts.disableInitialExecution && !backoffWait(interval, opts.cancel) {
		return nil
	}
	for i := 0; ; i++ {
		shouldContinue, err = f(i)
		if !shouldContinue {
			return err
		}
		// negative attempts mean unlimited retries
		if opts.attempts > 0 && i >= opts.attempts-1 {
			return err
		}
		if !backoffWait(interval, opts.cancel) {
			return nil
		}
		interval = time.Duration(float64(interval) * opts.factor)
		if interval >= opts.maxInterval {
			interval = opts.maxInterval
		}
	}
}
Loading

0 comments on commit 66b92a9

Please sign in to comment.