Skip to content

Commit

Permalink
[IMPROVED] Removed calling consumer info on reconnect.
Browse files Browse the repository at this point in the history
- `Consume()` and `Messages()` no longer call `ConsumerInfo()` on upon reconnect.
- Ordered consumers now reset on each reconnect event.

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jun 13, 2024
1 parent 6642c1b commit 4b04df2
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 341 deletions.
158 changes: 0 additions & 158 deletions jetstream/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,164 +117,6 @@ func TestValidateSubject(t *testing.T) {
}
}

func TestRetryWithBackoff(t *testing.T) {
tests := []struct {
name string
givenOpts backoffOpts
withError bool
timeout time.Duration
cancelAfter time.Duration
successfulAttemptNum int
expectedAttemptsCount int
}{
{
name: "infinite attempts, 5 tries before success",
givenOpts: backoffOpts{
attempts: -1,
initialInterval: 10 * time.Millisecond,
maxInterval: 60 * time.Millisecond,
},
withError: false,
successfulAttemptNum: 5,
// 0ms + 10ms + 20ms + 40ms + 60ms = 130ms
timeout: 200 * time.Millisecond,
expectedAttemptsCount: 5,
},
{
name: "infinite attempts, 5 tries before success, without initial execution",
givenOpts: backoffOpts{
attempts: -1,
initialInterval: 10 * time.Millisecond,
disableInitialExecution: true,
factor: 2,
maxInterval: 60 * time.Millisecond,
},
withError: false,
successfulAttemptNum: 5,
// 10ms + 20ms + 40ms + 60ms + 60 = 190ms
timeout: 250 * time.Millisecond,
expectedAttemptsCount: 5,
},
{
name: "5 attempts, unsuccessful",
givenOpts: backoffOpts{
attempts: 5,
initialInterval: 10 * time.Millisecond,
factor: 2,
maxInterval: 60 * time.Millisecond,
},
withError: true,
// 0ms + 10ms + 20ms + 40ms + 60ms = 130ms
timeout: 200 * time.Millisecond,
expectedAttemptsCount: 5,
},
{
name: "custom backoff values, should override other settings",
givenOpts: backoffOpts{
initialInterval: 2 * time.Second,
factor: 2,
maxInterval: 100 * time.Millisecond,
customBackoff: []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond, 40 * time.Millisecond, 50 * time.Millisecond},
},
withError: false,
successfulAttemptNum: 4,
// 10ms + 20ms + 30ms + 40ms = 100ms
timeout: 150 * time.Millisecond,
expectedAttemptsCount: 4,
},
{
name: "no custom backoff, with cancel",
givenOpts: backoffOpts{
attempts: -1,
initialInterval: 100 * time.Millisecond,
factor: 1,
},
withError: false,
cancelAfter: 150 * time.Millisecond,
timeout: 200 * time.Millisecond,
expectedAttemptsCount: 2,
},
{
name: "custom backoff, with cancel",
givenOpts: backoffOpts{
customBackoff: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond, 100 * time.Millisecond},
},
cancelAfter: 150 * time.Millisecond,
expectedAttemptsCount: 1,
timeout: 200 * time.Millisecond,
},
{
name: "attempts num not provided",
givenOpts: backoffOpts{
initialInterval: 100 * time.Millisecond,
factor: 1,
},
withError: true,
timeout: 1 * time.Second,
expectedAttemptsCount: 1,
},
{
name: "custom backoff, but attempts num provided",
givenOpts: backoffOpts{
customBackoff: []time.Duration{100 * time.Millisecond, 100 * time.Millisecond, 100 * time.Millisecond},
attempts: 5,
},
withError: true,
timeout: 1 * time.Second,
expectedAttemptsCount: 1,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok := make(chan struct{})
errs := make(chan error, 1)

var cancelChan chan struct{}
if test.cancelAfter != 0 {
cancelChan = make(chan struct{})
test.givenOpts.cancel = cancelChan
}
var count int
go func() {
err := retryWithBackoff(func(attempt int) (bool, error) {
count = attempt
if test.successfulAttemptNum != 0 && attempt == test.successfulAttemptNum-1 {
return false, nil
}
return true, fmt.Errorf("error %d", attempt)
}, test.givenOpts)
if err != nil {
errs <- err
return
}
close(ok)
}()
if test.cancelAfter > 0 {
go func() {
time.Sleep(test.cancelAfter)
close(cancelChan)
}()
}
select {
case <-ok:
if test.withError {
t.Fatal("Expected error; got nil")
}
case err := <-errs:
if !test.withError {
t.Fatalf("Unexpected error: %v", err)
}
case <-time.After(test.timeout):
t.Fatalf("Timeout after %v", test.timeout)
}
if count != test.expectedAttemptsCount-1 {
t.Fatalf("Invalid count; want: %d; got: %d", test.expectedAttemptsCount, count)
}
})
}
}

func TestPullConsumer_checkPending(t *testing.T) {
tests := []struct {
name string
Expand Down
55 changes: 31 additions & 24 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
c.userErrHandler = consumeOpts.ErrHandler
opts = append(opts, ConsumeErrHandler(c.errHandler(c.serial)))
opts = append(opts, consumeReconnectNotify(),
ConsumeErrHandler(c.errHandler(c.serial)))
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
c.stopAfter = consumeOpts.StopAfter
Expand Down Expand Up @@ -196,17 +197,21 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
return func(cc ConsumeContext, err error) {
c.Lock()
defer c.Unlock()
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) {
fmt.Println("errHandler", err, serial, c.serial)
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) && !errors.Is(err, errConnected) {
c.userErrHandler(cc, err)
}
if errors.Is(err, ErrNoHeartbeat) ||
errors.Is(err, errOrderedSequenceMismatch) ||
errors.Is(err, ErrConsumerDeleted) ||
errors.Is(err, ErrConsumerNotFound) {
errors.Is(err, errConnected) {
fmt.Println("reset", err)
// only reset if serial matches the current consumer serial and there is no reset in progress
if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 {
atomic.StoreUint32(&c.resetInProgress, 1)
fmt.Println("reset start")
c.doReset <- struct{}{}
fmt.Println("reset done")
}
}
}
Expand Down Expand Up @@ -234,7 +239,9 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
opts = append(opts, WithMessagesErrOnMissingHeartbeat(true))
opts = append(opts,
WithMessagesErrOnMissingHeartbeat(true),
messagesReconnectNotify())
c.stopAfterMsgsLeft = make(chan int, 1)
if consumeOpts.StopAfter > 0 {
c.withStopAfter = true
Expand Down Expand Up @@ -467,25 +474,11 @@ func (c *orderedConsumer) reset() error {
}
consName := c.currentConsumer.CachedInfo().Name
c.currentConsumer.Unlock()
var err error
for i := 0; ; i++ {
if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts {
return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err)
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
_ = c.jetStream.DeleteConsumer(ctx, c.stream, consName)
cancel()
if err != nil {
if errors.Is(err, ErrConsumerNotFound) {
break
}
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
continue
}
return err
}
break
}
}()
}

c.cursor.deliverSeq = 0
Expand Down Expand Up @@ -550,6 +543,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
} else {
cfg.FilterSubjects = c.cfg.FilterSubjects
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

if c.serial != 1 {
return cfg
Expand All @@ -575,9 +571,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig {
cfg.DeliverPolicy = DeliverByStartTimePolicy
cfg.OptStartTime = c.cfg.OptStartTime
}
if c.cfg.InactiveThreshold != 0 {
cfg.InactiveThreshold = c.cfg.InactiveThreshold
}

return cfg
}
Expand All @@ -598,6 +591,20 @@ func messagesStopAfterNotify(numMsgs int, msgsLeftAfterStop chan int) PullMessag
})
}

func consumeReconnectNotify() PullConsumeOpt {
return pullOptFunc(func(opts *consumeOpts) error {
opts.notifyOnReconnect = true
return nil
})
}

func messagesReconnectNotify() PullMessagesOpt {
return pullOptFunc(func(opts *consumeOpts) error {
opts.notifyOnReconnect = true
return nil
})
}

// Info returns information about the ordered consumer.
// Note that this method will fetch the latest instance of the
// consumer from the server, which can be deleted by the library at any time.
Expand Down
Loading

0 comments on commit 4b04df2

Please sign in to comment.