Skip to content

Commit

Permalink
[IMPROVED] Use errors.New instead of fmt.Errorf where possible (#1707)
Browse files Browse the repository at this point in the history
  • Loading branch information
canack authored Sep 19, 2024
1 parent a06b6a9 commit 8bd1736
Show file tree
Hide file tree
Showing 21 changed files with 63 additions and 59 deletions.
2 changes: 1 addition & 1 deletion jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (Je
}
}
if apiPrefix == "" {
return nil, fmt.Errorf("API prefix cannot be empty")
return nil, errors.New("API prefix cannot be empty")
}
if !strings.HasSuffix(apiPrefix, ".") {
jsOpts.apiPrefix = fmt.Sprintf("%s.", apiPrefix)
Expand Down
4 changes: 2 additions & 2 deletions jetstream/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,13 @@ func TestPullConsumer_checkPending(t *testing.T) {
}
ok <- struct{}{}
case <-time.After(1 * time.Second):
errs <- fmt.Errorf("Timeout")
errs <- errors.New("Timeout")
return
}
} else {
select {
case <-prChan:
errs <- fmt.Errorf("Unexpected pull request")
errs <- errors.New("Unexpected pull request")
case <-time.After(100 * time.Millisecond):
ok <- struct{}{}
return
Expand Down
5 changes: 3 additions & 2 deletions jetstream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jetstream
import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -434,15 +435,15 @@ func parsePending(msg *nats.Msg) (int, int, error) {
if msgsLeftStr != "" {
msgsLeft, err = strconv.Atoi(msgsLeftStr)
if err != nil {
return 0, 0, fmt.Errorf("nats: invalid format of Nats-Pending-Messages")
return 0, 0, errors.New("nats: invalid format of Nats-Pending-Messages")
}
}
bytesLeftStr := msg.Header.Get("Nats-Pending-Bytes")
var bytesLeft int
if bytesLeftStr != "" {
bytesLeft, err = strconv.Atoi(bytesLeftStr)
if err != nil {
return 0, 0, fmt.Errorf("nats: invalid format of Nats-Pending-Bytes")
return 0, 0, errors.New("nats: invalid format of Nats-Pending-Bytes")
}
}
return msgsLeft, bytesLeft, nil
Expand Down
4 changes: 2 additions & 2 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error {
// if custom backoff is set, use it instead of other options
if len(opts.customBackoff) > 0 {
if opts.attempts != 0 {
return fmt.Errorf("cannot use custom backoff intervals when attempts are set")
return errors.New("cannot use custom backoff intervals when attempts are set")
}
for i, interval := range opts.customBackoff {
select {
Expand All @@ -774,7 +774,7 @@ func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error {
opts.maxInterval = 1 * time.Minute
}
if opts.attempts == 0 {
return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals")
return errors.New("retry attempts have to be set when not using custom backoff intervals")
}
interval := opts.initialInterval
for i := 0; ; i++ {
Expand Down
4 changes: 2 additions & 2 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ func parseMessagesOpts(ordered bool, opts ...PullMessagesOpt) (*consumeOpts, err

func (consumeOpts *consumeOpts) setDefaults(ordered bool) error {
if consumeOpts.MaxBytes != unset && consumeOpts.MaxMessages != unset {
return fmt.Errorf("only one of MaxMessages and MaxBytes can be specified")
return errors.New("only one of MaxMessages and MaxBytes can be specified")
}
if consumeOpts.MaxBytes != unset {
// when max_bytes is used, set batch size to a very large number
Expand Down Expand Up @@ -1007,7 +1007,7 @@ func (consumeOpts *consumeOpts) setDefaults(ordered bool) error {
}
}
if consumeOpts.Heartbeat > consumeOpts.Expires/2 {
return fmt.Errorf("the value of Heartbeat must be less than 50%% of expiry")
return errors.New("the value of Heartbeat must be less than 50%% of expiry")
}
return nil
}
10 changes: 5 additions & 5 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,24 +504,24 @@ func convertDirectGetMsgResponseToMsg(name string, r *nats.Msg) (*RawStreamMsg,
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
return nil, fmt.Errorf("nats: response should have headers")
return nil, errors.New("nats: response should have headers")
}
stream := r.Header.Get(StreamHeader)
if stream == "" {
return nil, fmt.Errorf("nats: missing stream header")
return nil, errors.New("nats: missing stream header")
}

seqStr := r.Header.Get(SequenceHeader)
if seqStr == "" {
return nil, fmt.Errorf("nats: missing sequence header")
return nil, errors.New("nats: missing sequence header")
}
seq, err := strconv.ParseUint(seqStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", seqStr, err)
}
timeStr := r.Header.Get(TimeStampHeaer)
if timeStr == "" {
return nil, fmt.Errorf("nats: missing timestamp header")
return nil, errors.New("nats: missing timestamp header")
}

tm, err := time.Parse(time.RFC3339Nano, timeStr)
Expand All @@ -530,7 +530,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *nats.Msg) (*RawStreamMsg,
}
subj := r.Header.Get(SubjectHeader)
if subj == "" {
return nil, fmt.Errorf("nats: missing subject header")
return nil, errors.New("nats: missing subject header")
}
return &RawStreamMsg{
Subject: subj,
Expand Down
5 changes: 3 additions & 2 deletions jetstream/stream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package jetstream

import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -584,7 +585,7 @@ func (alg StoreCompression) MarshalJSON() ([]byte, error) {
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
return nil, errors.New("unknown compression algorithm")
}
return json.Marshal(str)
}
Expand All @@ -600,7 +601,7 @@ func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
return errors.New("unknown compression algorithm")
}
return nil
}
2 changes: 1 addition & 1 deletion jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ func TestKeyValueMirrorCrossDomains(t *testing.T) {
checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
_, err := kv.Get(context.Background(), key)
if err == nil {
return fmt.Errorf("Expected key to be gone")
return errors.New("Expected key to be gone")
}
if !errors.Is(err, jetstream.ErrKeyNotFound) {
return err
Expand Down
2 changes: 1 addition & 1 deletion jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
errs <- fmt.Errorf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
}
case <-time.After(5 * time.Second):
errs <- fmt.Errorf("Did not receive completion signal")
errs <- errors.New("Did not receive completion signal")
}
wg.Done()
}(ack)
Expand Down
32 changes: 16 additions & 16 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
o.ttl = js.opts.wait
}
if o.stallWait > 0 {
return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
return nil, errors.New("nats: stall wait cannot be set to sync publish")
}

if o.id != _EMPTY_ {
Expand Down Expand Up @@ -1143,7 +1143,7 @@ func RetryAttempts(num int) PubOpt {
func StallWait(ttl time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
if ttl <= 0 {
return fmt.Errorf("nats: stall wait should be more than 0")
return errors.New("nats: stall wait should be more than 0")
}
opts.stallWait = ttl
return nil
Expand Down Expand Up @@ -1501,11 +1501,11 @@ func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode boo
// Prevent an user from attempting to create a queue subscription on
// a JS consumer that was not created with a deliver group.
if queue != _EMPTY_ {
return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group")
return _EMPTY_, errors.New("cannot create a queue subscription for a consumer without a deliver group")
} else if info.PushBound {
// Need to reject a non queue subscription to a non queue consumer
// if the consumer is already bound.
return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription")
return _EMPTY_, errors.New("consumer is already bound to a subscription")
}
} else {
// If the JS consumer has a deliver group, we need to fail a non queue
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// If no stream name is specified, the subject cannot be empty.
if subj == _EMPTY_ && o.stream == _EMPTY_ {
return nil, fmt.Errorf("nats: subject required")
return nil, errors.New("nats: subject required")
}

// Note that these may change based on the consumer info response we may get.
Expand All @@ -1629,7 +1629,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// would subscribe to and server would send on.
if o.cfg.Heartbeat > 0 || o.cfg.FlowControl {
// Not making this a public ErrXXX in case we allow in the future.
return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control")
return nil, errors.New("nats: queue subscription doesn't support idle heartbeat nor flow control")
}

// If this is a queue subscription and no consumer nor durable name was specified,
Expand Down Expand Up @@ -1667,31 +1667,31 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if o.ordered {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
return nil, errors.New("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
return nil, errors.New("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
return nil, errors.New("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
return nil, errors.New("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
return nil, errors.New("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
return nil, errors.New("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
return nil, errors.New("nats: can not use pull mode for an ordered consumer")
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
Expand Down Expand Up @@ -2425,7 +2425,7 @@ func Description(description string) SubOpt {
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.cfg.Durable != _EMPTY_ {
return fmt.Errorf("nats: option Durable set more than once")
return errors.New("nats: option Durable set more than once")
}
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
Expand Down Expand Up @@ -3950,7 +3950,7 @@ func (alg StoreCompression) MarshalJSON() ([]byte, error) {
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
return nil, errors.New("unknown compression algorithm")
}
return json.Marshal(str)
}
Expand All @@ -3966,7 +3966,7 @@ func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
return errors.New("unknown compression algorithm")
}
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,27 +1330,27 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
return nil, fmt.Errorf("nats: response should have headers")
return nil, errors.New("nats: response should have headers")
}
stream := r.Header.Get(JSStream)
if stream == _EMPTY_ {
return nil, fmt.Errorf("nats: missing stream header")
return nil, errors.New("nats: missing stream header")
}

// Mirrors can now answer direct gets, so removing check for name equality.
// TODO(dlc) - We could have server also have a header with origin and check that?

seqStr := r.Header.Get(JSSequence)
if seqStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing sequence header")
return nil, errors.New("nats: missing sequence header")
}
seq, err := strconv.ParseUint(seqStr, 10, 64)
if err != nil {
return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", seqStr, err)
}
timeStr := r.Header.Get(JSTimeStamp)
if timeStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing timestamp header")
return nil, errors.New("nats: missing timestamp header")
}
// Temporary code: the server in main branch is sending with format
// "2006-01-02 15:04:05.999999999 +0000 UTC", but will be changed
Expand All @@ -1365,7 +1365,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
}
subj := r.Header.Get(JSSubject)
if subj == _EMPTY_ {
return nil, fmt.Errorf("nats: missing subject header")
return nil, errors.New("nats: missing subject header")
}
return &RawStreamMsg{
Subject: subj,
Expand Down
6 changes: 3 additions & 3 deletions micro/test/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestAddService(t *testing.T) {
}

if test.givenConfig.ErrorHandler != nil {
go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops"))
go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, errors.New("oops"))
select {
case <-errService:
case <-time.After(1 * time.Second):
Expand Down Expand Up @@ -536,7 +536,7 @@ func TestAddService(t *testing.T) {
}
}
if test.natsErrorHandler != nil {
go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, fmt.Errorf("oops"))
go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.asyncErrorSubject}, errors.New("oops"))
select {
case <-errService:
t.Fatalf("Expected to restore nats error handler")
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestErrHandlerSubjectMatch(t *testing.T) {
}
defer svc.Stop()

go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.errSubject}, fmt.Errorf("oops"))
go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.errSubject}, errors.New("oops"))
if test.expectServiceErr {
select {
case <-errChan:
Expand Down
6 changes: 3 additions & 3 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ func ProxyPath(path string) Option {
func CustomInboxPrefix(p string) Option {
return func(o *Options) error {
if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") {
return fmt.Errorf("nats: invalid custom prefix")
return errors.New("nats: invalid custom prefix")
}
o.InboxPrefix = p
return nil
Expand Down Expand Up @@ -1814,7 +1814,7 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
if len(nc.srvPool) == 0 {
nc.ws = isWS
} else if isWS && !nc.ws || !isWS && nc.ws {
return fmt.Errorf("mixing of websocket and non websocket URLs is not allowed")
return errors.New("mixing of websocket and non websocket URLs is not allowed")
}

var tlsName string
Expand Down Expand Up @@ -5792,7 +5792,7 @@ func NkeyOptionFromSeed(seedFile string) (Option, error) {
return nil, err
}
if !nkeys.IsValidPublicUserKey(pub) {
return nil, fmt.Errorf("nats: Not a valid nkey user seed")
return nil, errors.New("nats: Not a valid nkey user seed")
}
sigCB := func(nonce []byte) ([]byte, error) {
return sigHandler(nonce, seedFile)
Expand Down
4 changes: 2 additions & 2 deletions test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ func (d *checkPoolUpdatedDialer) Dial(network, address string) (net.Conn, error)
doReal = true
} else if d.final {
d.ra++
return nil, fmt.Errorf("On purpose")
return nil, errors.New("On purpose")
} else {
d.ra++
if d.ra == 15 {
Expand All @@ -698,7 +698,7 @@ func (d *checkPoolUpdatedDialer) Dial(network, address string) (net.Conn, error)
d.conn = c
return c, nil
}
return nil, fmt.Errorf("On purpose")
return nil, errors.New("On purpose")
}

func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
Expand Down
Loading

0 comments on commit 8bd1736

Please sign in to comment.