From 66b92a9bb46a698c91fc3419cf8df0f31f858738 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 14 Jun 2024 00:19:04 +0200 Subject: [PATCH] [FIXED] Remove `ConsumerInfo()` calls in `Consume()` and `Messages()` after reconnect. (#1643) - `Consume()` and `Messages()` no longer call `ConsumerInfo()` on upon reconnect. - Ordered consumers now reset on each reconnect event. Signed-off-by: Piotr Piotrowski --- go_test.mod | 12 +- go_test.sum | 24 ++-- jetstream/ordered.go | 195 ++++++++++++++++++++++++++------- jetstream/pull.go | 165 ++-------------------------- jetstream/test/ordered_test.go | 109 +++++++++--------- jetstream/test/pull_test.go | 111 ++++++++++--------- test/kv_test.go | 5 +- 7 files changed, 288 insertions(+), 333 deletions(-) diff --git a/go_test.mod b/go_test.mod index 5dfd112f5..f5b731dd0 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,20 +4,20 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.6 + github.com/klauspost/compress v1.17.8 github.com/nats-io/jwt v1.2.2 - github.com/nats-io/nats-server/v2 v2.10.11 + github.com/nats-io/nats-server/v2 v2.10.16 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.3.0 - golang.org/x/text v0.14.0 + golang.org/x/text v0.15.0 google.golang.org/protobuf v1.23.0 ) require ( github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.3 // indirect - golang.org/x/crypto v0.19.0 // indirect - golang.org/x/sys v0.17.0 // indirect + github.com/nats-io/jwt/v2 v2.5.7 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/time v0.5.0 // indirect ) diff --git a/go_test.sum b/go_test.sum index d28f0f625..f89d755ba 100644 --- a/go_test.sum +++ b/go_test.sum @@ -10,16 +10,16 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= -github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= -github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0= -github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8= +github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= +github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0= +github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= @@ -31,17 +31,17 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/jetstream/ordered.go b/jetstream/ordered.go index d4e71fafc..998b83dc3 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -43,6 +43,7 @@ type ( stopAfterMsgsLeft chan int withStopAfter bool runningFetch *fetchResult + subscription *orderedSubscription sync.Mutex } @@ -92,7 +93,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } c.userErrHandler = consumeOpts.ErrHandler - opts = append(opts, ConsumeErrHandler(c.errHandler(c.serial))) + opts = append(opts, consumeReconnectNotify(), + ConsumeErrHandler(c.errHandler(c.serial))) if consumeOpts.StopAfter > 0 { c.withStopAfter = true c.stopAfter = consumeOpts.StopAfter @@ -105,6 +107,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt consumer: c, done: make(chan struct{}, 1), } + c.subscription = sub internalHandler := func(serial int) func(msg Msg) { return func(msg Msg) { // handler is a noop if message was delivered for a consumer with different serial @@ -197,13 +200,13 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err return func(cc ConsumeContext, err error) { c.Lock() defer c.Unlock() - if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) { + if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) && !errors.Is(err, errConnected) { c.userErrHandler(cc, err) } if errors.Is(err, ErrNoHeartbeat) || errors.Is(err, errOrderedSequenceMismatch) || errors.Is(err, ErrConsumerDeleted) || - errors.Is(err, ErrConsumerNotFound) { + errors.Is(err, errConnected) { // only reset if serial matches the current consumer serial and there is no reset in progress if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 { atomic.StoreUint32(&c.resetInProgress, 1) @@ -235,7 +238,9 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } - opts = append(opts, WithMessagesErrOnMissingHeartbeat(true)) + opts = append(opts, + WithMessagesErrOnMissingHeartbeat(true), + messagesReconnectNotify()) c.stopAfterMsgsLeft = make(chan int, 1) if consumeOpts.StopAfter > 0 { c.withStopAfter = true @@ -255,6 +260,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er opts: opts, done: make(chan struct{}, 1), } + c.subscription = sub return sub, nil } @@ -367,6 +373,11 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro } c.currentConsumer.Unlock() c.consumerType = consumerTypeFetch + sub := orderedSubscription{ + consumer: c, + done: make(chan struct{}), + } + c.subscription = &sub err := c.reset() if err != nil { return nil, err @@ -397,6 +408,11 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat c.cursor.streamSeq = c.runningFetch.sseq } c.consumerType = consumerTypeFetch + sub := orderedSubscription{ + consumer: c, + done: make(chan struct{}), + } + c.subscription = &sub err := c.reset() if err != nil { return nil, err @@ -425,6 +441,11 @@ func (c *orderedConsumer) FetchNoWait(batch int) (MessageBatch, error) { return nil, ErrOrderedConsumerConcurrentRequests } c.consumerType = consumerTypeFetch + sub := orderedSubscription{ + consumer: c, + done: make(chan struct{}), + } + c.subscription = &sub err := c.reset() if err != nil { return nil, err @@ -481,25 +502,11 @@ func (c *orderedConsumer) reset() error { } consName := c.currentConsumer.CachedInfo().Name c.currentConsumer.Unlock() - var err error - for i := 0; ; i++ { - if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts { - return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err) - } + go func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - err = c.jetStream.DeleteConsumer(ctx, c.stream, consName) + _ = c.jetStream.DeleteConsumer(ctx, c.stream, consName) cancel() - if err != nil { - if errors.Is(err, ErrConsumerNotFound) { - break - } - if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) { - continue - } - return err - } - break - } + }() } c.cursor.deliverSeq = 0 @@ -507,26 +514,30 @@ func (c *orderedConsumer) reset() error { var err error var cons Consumer - for i := 0; ; i++ { - if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts { - return fmt.Errorf("%w: maximum number of create consumer attempts reached: %s", ErrOrderedConsumerReset, err) + + backoffOpts := backoffOpts{ + attempts: c.cfg.MaxResetAttempts, + initialInterval: time.Second, + factor: 2, + maxInterval: 10 * time.Second, + cancel: c.subscription.done, + } + err = retryWithBackoff(func(attempt int) (bool, error) { + isClosed := atomic.LoadUint32(&c.subscription.closed) == 1 + if isClosed { + return false, nil } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig) if err != nil { - if errors.Is(err, ErrConsumerNotFound) { - cancel() - break - } - if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) { - cancel() - continue - } - cancel() - return err + return true, err } - cancel() - break + c.currentConsumer = cons.(*pullConsumer) + return false, nil + }, backoffOpts) + if err != nil { + return err } c.currentConsumer = cons.(*pullConsumer) return nil @@ -548,6 +559,10 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { // otherwise, start from the next sequence nextSeq = c.cursor.streamSeq + 1 } + + if c.cfg.MaxResetAttempts == 0 { + c.cfg.MaxResetAttempts = -1 + } name := fmt.Sprintf("%s_%d", c.namePrefix, c.serial) cfg := &ConsumerConfig{ Name: name, @@ -564,6 +579,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { } else { cfg.FilterSubjects = c.cfg.FilterSubjects } + if c.cfg.InactiveThreshold != 0 { + cfg.InactiveThreshold = c.cfg.InactiveThreshold + } if c.serial != 1 { return cfg @@ -589,9 +607,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { cfg.DeliverPolicy = DeliverByStartTimePolicy cfg.OptStartTime = c.cfg.OptStartTime } - if c.cfg.InactiveThreshold != 0 { - cfg.InactiveThreshold = c.cfg.InactiveThreshold - } return cfg } @@ -612,6 +627,20 @@ func messagesStopAfterNotify(numMsgs int, msgsLeftAfterStop chan int) PullMessag }) } +func consumeReconnectNotify() PullConsumeOpt { + return pullOptFunc(func(opts *consumeOpts) error { + opts.notifyOnReconnect = true + return nil + }) +} + +func messagesReconnectNotify() PullMessagesOpt { + return pullOptFunc(func(opts *consumeOpts) error { + opts.notifyOnReconnect = true + return nil + }) +} + // Info returns information about the ordered consumer. // Note that this method will fetch the latest instance of the // consumer from the server, which can be deleted by the library at any time. @@ -652,3 +681,91 @@ func (c *orderedConsumer) CachedInfo() *ConsumerInfo { } return c.currentConsumer.info } + +type backoffOpts struct { + // total retry attempts + // -1 for unlimited + attempts int + // initial interval after which first retry will be performed + // defaults to 1s + initialInterval time.Duration + // determines whether first function execution should be performed immediately + disableInitialExecution bool + // multiplier on each attempt + // defaults to 2 + factor float64 + // max interval between retries + // after reaching this value, all subsequent + // retries will be performed with this interval + // defaults to 1 minute + maxInterval time.Duration + // custom backoff intervals + // if set, overrides all other options except attempts + // if attempts are set, then the last interval will be used + // for all subsequent retries after reaching the limit + customBackoff []time.Duration + // cancel channel + // if set, retry will be canceled when this channel is closed + cancel <-chan struct{} +} + +func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error { + var err error + var shouldContinue bool + // if custom backoff is set, use it instead of other options + if len(opts.customBackoff) > 0 { + if opts.attempts != 0 { + return fmt.Errorf("cannot use custom backoff intervals when attempts are set") + } + for i, interval := range opts.customBackoff { + select { + case <-opts.cancel: + return nil + case <-time.After(interval): + } + shouldContinue, err = f(i) + if !shouldContinue { + return err + } + } + return err + } + + // set default options + if opts.initialInterval == 0 { + opts.initialInterval = 1 * time.Second + } + if opts.factor == 0 { + opts.factor = 2 + } + if opts.maxInterval == 0 { + opts.maxInterval = 1 * time.Minute + } + if opts.attempts == 0 { + return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals") + } + interval := opts.initialInterval + for i := 0; ; i++ { + if i == 0 && opts.disableInitialExecution { + time.Sleep(interval) + continue + } + shouldContinue, err = f(i) + if !shouldContinue { + return err + } + if opts.attempts > 0 && i >= opts.attempts-1 { + break + } + select { + case <-opts.cancel: + return nil + case <-time.After(interval): + } + interval = time.Duration(float64(interval) * opts.factor) + if interval >= opts.maxInterval { + interval = opts.maxInterval + } + } + return err +} diff --git a/jetstream/pull.go b/jetstream/pull.go index 3196ad519..001a0d183 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -14,7 +14,6 @@ package jetstream import ( - "context" "encoding/json" "errors" "fmt" @@ -103,6 +102,7 @@ type ( ThresholdBytes int StopAfter int stopAfterMsgsLeft chan int + notifyOnReconnect bool } ConsumeErrHandlerFunc func(consumeCtx ConsumeContext, err error) @@ -304,42 +304,8 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( sub.Lock() if !isConnected { isConnected = true - // try fetching consumer info several times to make sure consumer is available after reconnect - backoffOpts := backoffOpts{ - attempts: 10, - initialInterval: 1 * time.Second, - disableInitialExecution: true, - factor: 2, - maxInterval: 10 * time.Second, - cancel: sub.done, - } - err = retryWithBackoff(func(attempt int) (bool, error) { - isClosed := atomic.LoadUint32(&sub.closed) == 1 - if isClosed { - return false, nil - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err := p.Info(ctx) - if err != nil { - if sub.consumeOpts.ErrHandler != nil { - err = fmt.Errorf("[%d] attempting to fetch consumer info after reconnect: %w", attempt, err) - if attempt == backoffOpts.attempts-1 { - err = errors.Join(err, fmt.Errorf("maximum retry attempts reached")) - } - sub.consumeOpts.ErrHandler(sub, err) - } - return true, err - } - return false, nil - }, backoffOpts) - if err != nil { - if sub.consumeOpts.ErrHandler != nil { - sub.consumeOpts.ErrHandler(sub, err) - } - sub.Unlock() - sub.cleanup() - return + if sub.consumeOpts.notifyOnReconnect { + sub.errs <- errConnected } sub.fetchNext <- &pullRequest{ @@ -596,39 +562,10 @@ func (s *pullSubscription) Next() (Msg, error) { if errors.Is(err, errConnected) { if !isConnected { isConnected = true - // try fetching consumer info several times to make sure consumer is available after reconnect - backoffOpts := backoffOpts{ - attempts: 10, - initialInterval: 1 * time.Second, - disableInitialExecution: true, - factor: 2, - maxInterval: 10 * time.Second, - cancel: s.done, - } - err = retryWithBackoff(func(attempt int) (bool, error) { - isClosed := atomic.LoadUint32(&s.closed) == 1 - if isClosed { - return false, nil - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err := s.consumer.Info(ctx) - if err != nil { - if errors.Is(err, ErrConsumerNotFound) { - return false, err - } - if attempt == backoffOpts.attempts-1 { - return true, fmt.Errorf("could not get consumer info after server reconnect: %w", err) - } - return true, err - } - return false, nil - }, backoffOpts) - if err != nil { - s.Stop() - return nil, err - } + if s.consumeOpts.notifyOnReconnect { + return nil, errConnected + } s.pending.msgCount = 0 s.pending.byteCount = 0 if hbMonitor != nil { @@ -638,7 +575,7 @@ func (s *pullSubscription) Next() (Msg, error) { } if errors.Is(err, errDisconnected) { if hbMonitor != nil { - hbMonitor.Reset(2 * s.consumeOpts.Heartbeat) + hbMonitor.Stop() } isConnected = false } @@ -1058,94 +995,6 @@ func (consumeOpts *consumeOpts) setDefaults(ordered bool) error { return nil } -type backoffOpts struct { - // total retry attempts - // -1 for unlimited - attempts int - // initial interval after which first retry will be performed - // defaults to 1s - initialInterval time.Duration - // determines whether first function execution should be performed immediately - disableInitialExecution bool - // multiplier on each attempt - // defaults to 2 - factor float64 - // max interval between retries - // after reaching this value, all subsequent - // retries will be performed with this interval - // defaults to 1 minute - maxInterval time.Duration - // custom backoff intervals - // if set, overrides all other options except attempts - // if attempts are set, then the last interval will be used - // for all subsequent retries after reaching the limit - customBackoff []time.Duration - // cancel channel - // if set, retry will be canceled when this channel is closed - cancel <-chan struct{} -} - -func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error { - var err error - var shouldContinue bool - // if custom backoff is set, use it instead of other options - if len(opts.customBackoff) > 0 { - if opts.attempts != 0 { - return fmt.Errorf("cannot use custom backoff intervals when attempts are set") - } - for i, interval := range opts.customBackoff { - select { - case <-opts.cancel: - return nil - case <-time.After(interval): - } - shouldContinue, err = f(i) - if !shouldContinue { - return err - } - } - return err - } - - // set default options - if opts.initialInterval == 0 { - opts.initialInterval = 1 * time.Second - } - if opts.factor == 0 { - opts.factor = 2 - } - if opts.maxInterval == 0 { - opts.maxInterval = 1 * time.Minute - } - if opts.attempts == 0 { - return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals") - } - interval := opts.initialInterval - for i := 0; ; i++ { - if i == 0 && opts.disableInitialExecution { - time.Sleep(interval) - continue - } - shouldContinue, err = f(i) - if !shouldContinue { - return err - } - if opts.attempts > 0 && i >= opts.attempts-1 { - break - } - select { - case <-opts.cancel: - return nil - case <-time.After(interval): - } - interval = time.Duration(float64(interval) * opts.factor) - if interval >= opts.maxInterval { - interval = opts.maxInterval - } - } - return err -} - func (c *pullConsumer) getSubscription(id string) (*pullSubscription, bool) { c.Lock() defer c.Unlock() diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index 3a8b00d97..01a67a8ba 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -29,9 +29,9 @@ import ( func TestOrderedConsumerConsume(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -72,7 +72,7 @@ func TestOrderedConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() name := c.CachedInfo().Name @@ -80,7 +80,7 @@ func TestOrderedConsumerConsume(t *testing.T) { t.Fatal(err) } wg.Add(len(testMsgs)) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() l.Stop() @@ -125,13 +125,13 @@ func TestOrderedConsumerConsume(t *testing.T) { t.Fatal(err) } wg.Add(len(testMsgs)) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() l.Stop() }) - t.Run("reset consumer before receiving any messages with custom start seq", func(t *testing.T) { + t.Run("with custom start seq", func(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv) nc, err := nats.Connect(srv.ClientURL()) @@ -151,38 +151,37 @@ func TestOrderedConsumerConsume(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + publishTestMsgs(t, js) c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{DeliverPolicy: jetstream.DeliverByStartSequencePolicy, OptStartSeq: 3}) if err != nil { t.Fatalf("Unexpected error: %v", err) } wg := &sync.WaitGroup{} + wg.Add(len(testMsgs) - 2) l, err := c.Consume(func(msg jetstream.Msg) { wg.Done() }) if err != nil { t.Fatalf("Unexpected error: %v", err) } - time.Sleep(500 * time.Millisecond) + defer l.Stop() - name := c.CachedInfo().Name - if err := s.DeleteConsumer(ctx, name); err != nil { - t.Fatal(err) - } - // should receive messages with sequences 3, 4 and 5 - wg.Add(len(testMsgs) - 2) - publishTestMsgs(t, nc) wg.Wait() + time.Sleep(500 * time.Millisecond) // now delete consumer again and publish some more messages, all should be received normally - name = c.CachedInfo().Name - if err := s.DeleteConsumer(ctx, name); err != nil { + info, err := c.Info(ctx) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if err := s.DeleteConsumer(ctx, info.Config.Name); err != nil { t.Fatal(err) } wg.Add(len(testMsgs)) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() - l.Stop() }) t.Run("base usage, server shutdown", func(t *testing.T) { @@ -226,21 +225,13 @@ func TestOrderedConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() srv = restartBasicJSServer(t, srv) defer shutdownJSServerAndRemoveStorage(t, srv) - select { - case err := <-errs: - if !errors.Is(err, jetstream.ErrConsumerNotFound) { - t.Fatalf("Expected error: %v; got: %v", jetstream.ErrConsumerNotFound, err) - } - case <-time.After(5 * time.Second): - t.Fatal("timeout waiting for error") - } wg.Add(len(testMsgs)) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() l.Stop() @@ -290,7 +281,7 @@ func TestOrderedConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) select { case err := <-errs: if !errors.Is(err, jetstream.ErrNoHeartbeat) { @@ -302,7 +293,7 @@ func TestOrderedConsumerConsume(t *testing.T) { wg.Wait() wg.Add(len(testMsgs)) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() l.Stop() }) @@ -332,7 +323,7 @@ func TestOrderedConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs, err := c.Fetch(5) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -535,7 +526,7 @@ func TestOrderedConsumerConsume(t *testing.T) { } wg := &sync.WaitGroup{} wg.Add(5) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) cc, err := c.Consume(func(msg jetstream.Msg) { time.Sleep(50 * time.Millisecond) msg.Ack() @@ -553,9 +544,9 @@ func TestOrderedConsumerConsume(t *testing.T) { func TestOrderedConsumerMessages(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -592,7 +583,7 @@ func TestOrderedConsumerMessages(t *testing.T) { } defer it.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < 5; i++ { msg, err := it.Next() if err != nil { @@ -604,7 +595,7 @@ func TestOrderedConsumerMessages(t *testing.T) { if err := s.DeleteConsumer(ctx, name); err != nil { t.Fatal(err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < 5; i++ { msg, err := it.Next() if err != nil { @@ -649,7 +640,7 @@ func TestOrderedConsumerMessages(t *testing.T) { } defer it.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < 5; i++ { msg, err := it.Next() if err != nil { @@ -659,7 +650,7 @@ func TestOrderedConsumerMessages(t *testing.T) { } srv = restartBasicJSServer(t, srv) defer shutdownJSServerAndRemoveStorage(t, srv) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < 5; i++ { msg, err := it.Next() if err != nil { @@ -708,7 +699,7 @@ func TestOrderedConsumerMessages(t *testing.T) { } defer it.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < 5; i++ { msg, err := it.Next() if err != nil { @@ -716,7 +707,7 @@ func TestOrderedConsumerMessages(t *testing.T) { } msgs = append(msgs, msg) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < 5; i++ { msg, err := it.Next() if err != nil { @@ -916,7 +907,7 @@ func TestOrderedConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs, err := c.Fetch(5) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -994,7 +985,7 @@ func TestOrderedConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) go func() { time.Sleep(100 * time.Millisecond) it.Drain() @@ -1022,9 +1013,9 @@ func TestOrderedConsumerMessages(t *testing.T) { func TestOrderedConsumerFetch(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -1056,7 +1047,7 @@ func TestOrderedConsumerFetch(t *testing.T) { msgs := make([]jetstream.Msg, 0) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err := c.Fetch(5) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1072,7 +1063,7 @@ func TestOrderedConsumerFetch(t *testing.T) { if err := s.DeleteConsumer(ctx, name); err != nil { t.Fatal(err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err = c.Fetch(5) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1150,7 +1141,7 @@ func TestOrderedConsumerFetch(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err := c.Fetch(1, jetstream.FetchMaxWait(100*time.Millisecond)) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1168,9 +1159,9 @@ func TestOrderedConsumerFetch(t *testing.T) { func TestOrderedConsumerFetchBytes(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -1202,7 +1193,7 @@ func TestOrderedConsumerFetchBytes(t *testing.T) { msgs := make([]jetstream.Msg, 0) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err := c.FetchBytes(500, jetstream.FetchMaxWait(100*time.Millisecond)) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1218,7 +1209,7 @@ func TestOrderedConsumerFetchBytes(t *testing.T) { if err := s.DeleteConsumer(ctx, name); err != nil { t.Fatal(err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err = c.Fetch(500, jetstream.FetchMaxWait(100*time.Millisecond)) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1296,7 +1287,7 @@ func TestOrderedConsumerFetchBytes(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err := c.FetchBytes(500, jetstream.FetchMaxWait(100*time.Millisecond)) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1314,9 +1305,9 @@ func TestOrderedConsumerFetchBytes(t *testing.T) { func TestOrderedConsumerNext(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -1346,7 +1337,7 @@ func TestOrderedConsumerNext(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msg, err := c.Next() if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1404,9 +1395,9 @@ func TestOrderedConsumerNext(t *testing.T) { func TestOrderedConsumerFetchNoWait(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -1438,7 +1429,7 @@ func TestOrderedConsumerFetchNoWait(t *testing.T) { msgs := make([]jetstream.Msg, 0) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err := c.FetchNoWait(5) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1454,7 +1445,7 @@ func TestOrderedConsumerFetchNoWait(t *testing.T) { if err := s.DeleteConsumer(ctx, name); err != nil { t.Fatal(err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) res, err = c.FetchNoWait(5) if err != nil { t.Fatalf("Unexpected error: %s", err) diff --git a/jetstream/test/pull_test.go b/jetstream/test/pull_test.go index b46697e7a..f35aae315 100644 --- a/jetstream/test/pull_test.go +++ b/jetstream/test/pull_test.go @@ -28,9 +28,9 @@ import ( func TestPullConsumerFetch(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -61,7 +61,7 @@ func TestPullConsumerFetch(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs, err := c.Fetch(5) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -107,7 +107,7 @@ func TestPullConsumerFetch(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs, err := c.Fetch(10) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -184,7 +184,7 @@ func TestPullConsumerFetch(t *testing.T) { }() time.Sleep(10 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) select { case err := <-errs: t.Fatalf("Unexpected error: %v", err) @@ -230,7 +230,7 @@ func TestPullConsumerFetch(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } time.Sleep(100 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msg := <-msgs.Messages() if msg != nil { @@ -263,14 +263,14 @@ func TestPullConsumerFetch(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) time.Sleep(50 * time.Millisecond) msgs, err := c.FetchNoWait(10) if err != nil { t.Fatalf("Unexpected error: %v", err) } time.Sleep(100 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) var msgsNum int for range msgs.Messages() { @@ -376,7 +376,7 @@ func TestPullConsumerFetch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) // fetch 5 messages, should return normally msgs, err := c.Fetch(5, jetstream.FetchHeartbeat(50*time.Millisecond)) if err != nil { @@ -480,14 +480,13 @@ func TestPullConsumerFetch(t *testing.T) { func TestPullConsumerFetchBytes(t *testing.T) { testSubject := "FOO.123" msg := [10]byte{} - publishTestMsgs := func(t *testing.T, nc *nats.Conn, count int) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream, count int) { for i := 0; i < count; i++ { - if err := nc.Publish(testSubject, msg[:]); err != nil { + if _, err := js.Publish(context.Background(), testSubject, msg[:]); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } } - t.Run("no options, exact byte count received", func(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv) @@ -513,7 +512,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc, 5) + publishTestMsgs(t, js, 5) // actual received msg size will be 60 (payload=10 + Subject=7 + Reply=43) msgs, err := c.FetchBytes(300) if err != nil { @@ -558,7 +557,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc, 5) + publishTestMsgs(t, js, 5) // actual received msg size will be 60 (payload=10 + Subject=7 + Reply=43) msgs, err := c.FetchBytes(250) if err != nil { @@ -602,7 +601,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc, 5) + publishTestMsgs(t, js, 5) // actual received msg size will be 60 (payload=10 + Subject=7 + Reply=43) msgs, err := c.FetchBytes(30) if err != nil { @@ -647,7 +646,7 @@ func TestPullConsumerFetchBytes(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc, 5) + publishTestMsgs(t, js, 5) // actual received msg size will be 60 (payload=10 + Subject=7 + Reply=43) msgs, err := c.FetchBytes(1000, jetstream.FetchMaxWait(50*time.Millisecond)) if err != nil { @@ -779,9 +778,9 @@ func TestPullConsumerFetchBytes(t *testing.T) { func TestPullConsumerFetch_WithCluster(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -819,7 +818,7 @@ func TestPullConsumerFetch_WithCluster(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs, err := c.Fetch(5) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -867,7 +866,7 @@ func TestPullConsumerFetch_WithCluster(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } time.Sleep(100 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msg := <-msgs.Messages() if msg != nil { @@ -880,9 +879,9 @@ func TestPullConsumerFetch_WithCluster(t *testing.T) { func TestPullConsumerMessages(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -919,7 +918,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -981,7 +980,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -1042,7 +1041,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -1110,7 +1109,7 @@ func TestPullConsumerMessages(t *testing.T) { } defer it.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -1133,7 +1132,7 @@ func TestPullConsumerMessages(t *testing.T) { if !errors.Is(err, jetstream.ErrConsumerDeleted) { t.Fatalf("Expected error: %v; got: %v", jetstream.ErrConsumerDeleted, err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) time.Sleep(50 * time.Millisecond) _, err = it.Next() if !errors.Is(err, jetstream.ErrMsgIteratorClosed) { @@ -1180,7 +1179,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -1249,7 +1248,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -1490,7 +1489,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) for i := 0; i < len(testMsgs); i++ { msg, err := it.Next() if err != nil { @@ -1506,7 +1505,7 @@ func TestPullConsumerMessages(t *testing.T) { it.Stop() time.Sleep(10 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) it, err = c.Messages() if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -1599,7 +1598,7 @@ func TestPullConsumerMessages(t *testing.T) { done := make(chan struct{}) errs := make(chan error) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) go func() { for i := 0; i < 2*len(testMsgs); i++ { msg, err := it.Next() @@ -1617,7 +1616,7 @@ func TestPullConsumerMessages(t *testing.T) { srv = restartBasicJSServer(t, srv) defer shutdownJSServerAndRemoveStorage(t, srv) time.Sleep(10 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) select { case <-done: @@ -1667,7 +1666,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) errs := make(chan error) msgs := make([]jetstream.Msg, 0) @@ -1737,7 +1736,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) go func() { time.Sleep(100 * time.Millisecond) it.Stop() @@ -1792,7 +1791,7 @@ func TestPullConsumerMessages(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) go func() { time.Sleep(100 * time.Millisecond) it.Drain() @@ -1820,9 +1819,9 @@ func TestPullConsumerMessages(t *testing.T) { func TestPullConsumerConsume(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -1865,7 +1864,7 @@ func TestPullConsumerConsume(t *testing.T) { } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != len(testMsgs) { t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) @@ -1924,7 +1923,7 @@ func TestPullConsumerConsume(t *testing.T) { defer l2.Stop() wg.Add(len(testMsgs)) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs1)+len(msgs2) != len(testMsgs) { @@ -1974,7 +1973,7 @@ func TestPullConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() l.Stop() @@ -1991,7 +1990,7 @@ func TestPullConsumerConsume(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != 2*len(testMsgs) { t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) @@ -2041,7 +2040,7 @@ func TestPullConsumerConsume(t *testing.T) { } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != len(testMsgs) { @@ -2091,7 +2090,7 @@ func TestPullConsumerConsume(t *testing.T) { } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != len(testMsgs) { @@ -2144,7 +2143,7 @@ func TestPullConsumerConsume(t *testing.T) { } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != len(testMsgs) { t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) @@ -2160,7 +2159,7 @@ func TestPullConsumerConsume(t *testing.T) { case <-time.After(5 * time.Second): t.Fatalf("Timeout waiting for %v", jetstream.ErrConsumerDeleted) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) time.Sleep(50 * time.Millisecond) if len(msgs) != len(testMsgs) { t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) @@ -2197,7 +2196,7 @@ func TestPullConsumerConsume(t *testing.T) { t.Fatalf("Error on subscribe: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs := make([]jetstream.Msg, 0) wg := &sync.WaitGroup{} wg.Add(len(testMsgs)) @@ -2359,7 +2358,7 @@ func TestPullConsumerConsume(t *testing.T) { } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != len(testMsgs) { @@ -2441,7 +2440,7 @@ func TestPullConsumerConsume(t *testing.T) { } defer l.Stop() - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() if len(msgs) != len(testMsgs) { t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) @@ -2480,7 +2479,7 @@ func TestPullConsumerConsume(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2 * len(testMsgs)) msgs := make([]jetstream.Msg, 0) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) l, err := c.Consume(func(msg jetstream.Msg) { msgs = append(msgs, msg) wg.Done() @@ -2494,7 +2493,7 @@ func TestPullConsumerConsume(t *testing.T) { srv = restartBasicJSServer(t, srv) defer shutdownJSServerAndRemoveStorage(t, srv) time.Sleep(10 * time.Millisecond) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) wg.Wait() }) @@ -2524,7 +2523,7 @@ func TestPullConsumerConsume(t *testing.T) { } wg := &sync.WaitGroup{} wg.Add(2) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs := make([]jetstream.Msg, 0) cc, err := c.Consume(func(msg jetstream.Msg) { time.Sleep(80 * time.Millisecond) @@ -2572,7 +2571,7 @@ func TestPullConsumerConsume(t *testing.T) { } wg := &sync.WaitGroup{} wg.Add(5) - publishTestMsgs(t, nc) + publishTestMsgs(t, js) cc, err := c.Consume(func(msg jetstream.Msg) { time.Sleep(50 * time.Millisecond) msg.Ack() @@ -2809,9 +2808,9 @@ func TestPullConsumerConsume_WithCluster(t *testing.T) { func TestPullConsumerNext(t *testing.T) { testSubject := "FOO.123" testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} - publishTestMsgs := func(t *testing.T, nc *nats.Conn) { + publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { for _, msg := range testMsgs { - if err := nc.Publish(testSubject, []byte(msg)); err != nil { + if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { t.Fatalf("Unexpected error during publish: %s", err) } } @@ -2842,7 +2841,7 @@ func TestPullConsumerNext(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - publishTestMsgs(t, nc) + publishTestMsgs(t, js) msgs := make([]jetstream.Msg, 0) var i int diff --git a/test/kv_test.go b/test/kv_test.go index 0c8918552..a1b5ca45d 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -1509,7 +1509,6 @@ func TestKeyValueSourcing(t *testing.T) { t.Fatalf("Error creating kv: %v", err) } - // Wait half a second to make sure it has time to populate the stream from it's sources i := 0 for { status, err := kvC.Status() @@ -1520,11 +1519,11 @@ func TestKeyValueSourcing(t *testing.T) { break } else { i++ - if i > 3 { + if i > 10 { t.Fatalf("Error sourcing bucket does not contain the expected number of values") } } - time.Sleep(20 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } if _, err := kvC.Get("keyA"); err != nil {