diff --git a/internal/ldap/dn_test.go b/internal/ldap/dn_test.go index c9db86c098..0e791fbc38 100644 --- a/internal/ldap/dn_test.go +++ b/internal/ldap/dn_test.go @@ -1,5 +1,7 @@ // Copyright (c) 2011-2015 Michael Mitton (mmitton@gmail.com) // Portions copyright (c) 2015-2016 go-ldap Authors +// Static-Check Fixes Copyright 2024 The NATS Authors + package ldap import ( @@ -53,7 +55,7 @@ func TestSuccessfulDNParsing(t *testing.T) { for test, answer := range testcases { dn, err := ParseDN(test) if err != nil { - t.Errorf(err.Error()) + t.Error(err.Error()) continue } if !reflect.DeepEqual(dn, &answer) { diff --git a/server/certidp/certidp.go b/server/certidp/certidp.go index c1d9678231..a26618577b 100644 --- a/server/certidp/certidp.go +++ b/server/certidp/certidp.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -222,7 +222,7 @@ func CertOCSPEligible(link *ChainLink) bool { if link == nil || link.Leaf.Raw == nil || len(link.Leaf.Raw) == 0 { return false } - if link.Leaf.OCSPServer == nil || len(link.Leaf.OCSPServer) == 0 { + if len(link.Leaf.OCSPServer) == 0 { return false } urls := getWebEndpoints(link.Leaf.OCSPServer) diff --git a/server/certidp/ocsp_responder.go b/server/certidp/ocsp_responder.go index 6e210f2b5d..ad6c2651bc 100644 --- a/server/certidp/ocsp_responder.go +++ b/server/certidp/ocsp_responder.go @@ -1,4 +1,4 @@ -// Copyright 2023 The NATS Authors +// Copyright 2023-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -15,6 +15,7 @@ package certidp import ( "encoding/base64" + "errors" "fmt" "io" "net/http" @@ -26,7 +27,7 @@ import ( func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, error) { if link == nil || link.Leaf == nil || link.Issuer == nil || opts == nil || log == nil { - return nil, fmt.Errorf(ErrInvalidChainlink) + return nil, errors.New(ErrInvalidChainlink) } timeout := time.Duration(opts.Timeout * float64(time.Second)) @@ -59,7 +60,7 @@ func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, responders := *link.OCSPWebEndpoints if len(responders) == 0 { - return nil, fmt.Errorf(ErrNoAvailOCSPServers) + return nil, errors.New(ErrNoAvailOCSPServers) } var raw []byte diff --git a/server/client.go b/server/client.go index 19886e7090..fa0b445d27 100644 --- a/server/client.go +++ b/server/client.go @@ -2972,7 +2972,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri if err := im.acc.sl.Insert(&nsub); err != nil { errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name) c.Debugf(errs) - return nil, fmt.Errorf(errs) + return nil, errors.New(errs) } // Update our route map here. But only if we are not a leaf node or a hub leafnode. diff --git a/server/filestore.go b/server/filestore.go index 6b959a34fc..f858c1a436 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -239,6 +239,7 @@ type msgBlock struct { noTrack bool needSync bool syncAlways bool + noCompact bool closed bool // Used to mock write failures. @@ -3959,6 +3960,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( mb.bytes = 0 } + // Allow us to check compaction again. + mb.noCompact = false + // Mark as dirty for stream state. fs.dirty++ @@ -4075,7 +4079,7 @@ func (mb *msgBlock) shouldCompactInline() bool { // Ignores 2MB minimum. // Lock should be held. func (mb *msgBlock) shouldCompactSync() bool { - return mb.bytes*2 < mb.rbytes + return mb.bytes*2 < mb.rbytes && !mb.noCompact } // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block. @@ -4184,7 +4188,12 @@ func (mb *msgBlock) compact() { mb.needSync = true // Capture the updated rbytes. - mb.rbytes = uint64(len(nbuf)) + if rbytes := uint64(len(nbuf)); rbytes == mb.rbytes { + // No change, so set our noCompact bool here to avoid attempting to continually compress in syncBlocks. + mb.noCompact = true + } else { + mb.rbytes = rbytes + } // Remove any seqs from the beginning of the blk. for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ { diff --git a/server/filestore_test.go b/server/filestore_test.go index 1ab23b09ec..d986062204 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7379,6 +7379,83 @@ func TestFileStoreCheckSkipFirstBlockNotLoadOldBlocks(t *testing.T) { require_Equal(t, loaded, 1) } +func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 256, SyncInterval: 250 * time.Millisecond}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("hello") + + // 6 msgs per block. + // Fill 2 blocks. + for i := 0; i < 12; i++ { + fs.StoreMsg("foo.BB", nil, msg) + } + // Create third block with just one message in it. + fs.StoreMsg("foo.BB", nil, msg) + + // Should have created 3 blocks. + require_Equal(t, fs.numMsgBlocks(), 3) + + // Now delete a bunch that will will fill up 3 block with tombstones. + for _, seq := range []uint64{2, 3, 4, 5, 8, 9, 10, 11} { + _, err = fs.RemoveMsg(seq) + require_NoError(t, err) + } + // Now make sure we add 4th block so syncBlocks will try to compress. + for i := 0; i < 6; i++ { + fs.StoreMsg("foo.BB", nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 4) + + // All should have compact set. + fs.mu.Lock() + // Only check first 3 blocks. + for i := 0; i < 3; i++ { + mb := fs.blks[i] + mb.mu.Lock() + shouldCompact := mb.shouldCompactSync() + mb.mu.Unlock() + if !shouldCompact { + fs.mu.Unlock() + t.Fatalf("Expected should compact to be true for %d, got false", mb.getIndex()) + } + } + fs.mu.Unlock() + + // Let sync run. + time.Sleep(300 * time.Millisecond) + + // We want to make sure the last block, which is filled with tombstones and is not compactable, returns false now. + fs.mu.Lock() + for _, mb := range fs.blks { + mb.mu.Lock() + shouldCompact := mb.shouldCompactSync() + mb.mu.Unlock() + if shouldCompact { + fs.mu.Unlock() + t.Fatalf("Expected should compact to be false for %d, got true", mb.getIndex()) + } + } + fs.mu.Unlock() + + // Now remove some from block 3 and verify that compact is not suppressed. + _, err = fs.RemoveMsg(13) + require_NoError(t, err) + + fs.mu.Lock() + mb := fs.blks[2] // block 3. + mb.mu.Lock() + noCompact := mb.noCompact + mb.mu.Unlock() + fs.mu.Unlock() + // Verify that since we deleted a message we should be considered for compaction again in syncBlocks(). + require_False(t, noCompact) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/gateway_test.go b/server/gateway_test.go index 9dbe0230d8..4c8eb774c6 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2020 The NATS Authors +// Copyright 2018-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -211,7 +211,7 @@ func waitCh(t *testing.T, ch chan bool, errTxt string) { case <-ch: return case <-time.After(5 * time.Second): - t.Fatalf(errTxt) + t.Fatal(errTxt) } } @@ -5055,7 +5055,7 @@ func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) { select { case e := <-errCh: if e != nil { - t.Fatalf(e.Error()) + t.Fatal(e.Error()) } case <-time.After(time.Second): t.Fatalf("Did not get replies") diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 2a3eebef50..e3b2c4007f 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5857,15 +5857,15 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { select { case dl := <-loggers[0].dbgCh: if strings.Contains(dl, condition) { - errCh <- fmt.Errorf(condition) + errCh <- errors.New(condition) } case dl := <-loggers[1].dbgCh: if strings.Contains(dl, condition) { - errCh <- fmt.Errorf(condition) + errCh <- errors.New(condition) } case dl := <-loggers[2].dbgCh: if strings.Contains(dl, condition) { - errCh <- fmt.Errorf(condition) + errCh <- errors.New(condition) } case <-ctx.Done(): return diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 44a6a81559..3dbb5bccdb 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "runtime" + "slices" "strconv" "strings" "sync" @@ -2543,3 +2544,396 @@ func TestJetStreamClusterKeyValueDesyncAfterHardKill(t *testing.T) { err = checkState(t, c, "$G", "KV_inconsistency") require_NoError(t, err) } + +func TestJetStreamClusterKeyValueSync(t *testing.T) { + t.Skip("Too long for CI at the moment") + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.LameDuckDuration = 15 * time.Second + s.opts.LameDuckGracePeriod = -15 * time.Second + s.optsMu.Unlock() + } + s := c.randomNonLeader() + connect := func(t *testing.T) (*nats.Conn, nats.JetStreamContext) { + return jsClientConnect(t, s) + } + + const accountName = "$G" + const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + createData := func(n int) []byte { + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + return b + } + getOrCreateKvStore := func(kvname string) (nats.KeyValue, error) { + _, js := connect(t) + kvExists := false + existingKvnames := js.KeyValueStoreNames() + for existingKvname := range existingKvnames { + if existingKvname == kvname { + kvExists = true + break + } + } + if !kvExists { + return js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: kvname, + Replicas: 3, + Storage: nats.FileStorage, + }) + } else { + return js.KeyValue(kvname) + } + } + abs := func(x int64) int64 { + if x < 0 { + return -x + } + return x + } + var counter int64 + var errorCounter int64 + + getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail { + t.Helper() + srv := c.streamLeader(accountName, streamName) + if srv == nil { + return nil + } + jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) + require_NoError(t, err) + for _, acc := range jsz.AccountDetails { + if acc.Name == accountName { + for _, stream := range acc.Streams { + if stream.Name == streamName { + return &stream + } + } + } + } + t.Error("Could not find account details") + return nil + } + checkState := func(t *testing.T, c *cluster, accountName, streamName string) error { + t.Helper() + + leaderSrv := c.streamLeader(accountName, streamName) + if leaderSrv == nil { + return fmt.Errorf("no leader server found for stream %q", streamName) + } + streamLeader := getStreamDetails(t, c, accountName, streamName) + if streamLeader == nil { + return fmt.Errorf("no leader found for stream %q", streamName) + } + var errs []error + for _, srv := range c.servers { + if srv == leaderSrv { + // Skip self + continue + } + acc, err := srv.LookupAccount(accountName) + require_NoError(t, err) + stream, err := acc.lookupStream(streamName) + require_NoError(t, err) + state := stream.state() + + if state.Msgs != streamLeader.State.Msgs { + err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages", + streamName, leaderSrv, streamLeader.State.Msgs, + srv, state.Msgs, + ) + errs = append(errs, err) + } + if state.FirstSeq != streamLeader.State.FirstSeq { + err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d", + streamName, leaderSrv, streamLeader.State.FirstSeq, + srv, state.FirstSeq, + ) + errs = append(errs, err) + } + if state.LastSeq != streamLeader.State.LastSeq { + err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d", + streamName, leaderSrv, streamLeader.State.LastSeq, + srv, state.LastSeq, + ) + errs = append(errs, err) + } + if state.NumDeleted != streamLeader.State.NumDeleted { + err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n", + streamName, leaderSrv, streamLeader.State.NumDeleted, + srv, state.NumDeleted, streamLeader.State, state, + ) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil + } + + checkMsgsEqual := func(t *testing.T, accountName, streamName string) error { + // Gather all the streams replicas and compare contents. + msets := make(map[*Server]*stream) + for _, s := range c.servers { + acc, err := s.LookupAccount(accountName) + if err != nil { + return err + } + mset, err := acc.lookupStream(streamName) + if err != nil { + return err + } + msets[s] = mset + } + + str := getStreamDetails(t, c, accountName, streamName) + if str == nil { + return fmt.Errorf("could not get stream leader state") + } + state := str.State + for seq := state.FirstSeq; seq <= state.LastSeq; seq++ { + var msgId string + var smv StoreMsg + for replica, mset := range msets { + mset.mu.RLock() + sm, err := mset.store.LoadMsg(seq, &smv) + mset.mu.RUnlock() + if err != nil { + if err == ErrStoreMsgNotFound || err == errDeletedMsg { + // Skip these. + } else { + t.Logf("WRN: Error loading message (seq=%d) from stream %q on replica %q: %v", seq, streamName, replica, err) + } + continue + } + if msgId == _EMPTY_ { + msgId = string(sm.hdr) + } else if msgId != string(sm.hdr) { + t.Errorf("MsgIds do not match for seq %d on stream %q: %q vs %q", seq, streamName, msgId, sm.hdr) + } + } + } + return nil + } + + keyUpdater := func(ctx context.Context, cancel context.CancelFunc, kvname string, numKeys int) { + kv, err := getOrCreateKvStore(kvname) + if err != nil { + t.Fatalf("[%s]:%v", kvname, err) + } + for i := 0; i < numKeys; i++ { + key := fmt.Sprintf("key-%d", i) + kv.Create(key, createData(160)) + } + lastData := make(map[string][]byte) + revisions := make(map[string]uint64) + for { + select { + case <-ctx.Done(): + return + default: + } + r := rand.Intn(numKeys) + key := fmt.Sprintf("key-%d", r) + + for i := 0; i < 5; i++ { + _, err := kv.Get(key) + if err != nil { + atomic.AddInt64(&errorCounter, 1) + if err == nats.ErrKeyNotFound { + t.Logf("WRN: Key not found! [%s/%s] - [%s]", kvname, key, err) + cancel() + } + } + } + + k, err := kv.Get(key) + if err != nil { + atomic.AddInt64(&errorCounter, 1) + } else { + if revisions[key] != 0 && abs(int64(k.Revision())-int64(revisions[key])) < 2 { + lastDataVal, ok := lastData[key] + if ok && k.Revision() == revisions[key] && slices.Compare(lastDataVal, k.Value()) != 0 { + t.Logf("data loss [%s/%s][rev:%d] expected:[%v] is:[%v]", kvname, key, revisions[key], string(lastDataVal), string(k.Value())) + } + } + newData := createData(160) + revisions[key], err = kv.Update(key, newData, k.Revision()) + if err != nil && err != nats.ErrTimeout { + atomic.AddInt64(&errorCounter, 1) + } else { + lastData[key] = newData + } + atomic.AddInt64(&counter, 1) + } + } + } + + streamCount := 50 + keysCount := 100 + streamPrefix := "IKV" + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // The keyUpdaters will run for less time. + kctx, kcancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer kcancel() + + var wg sync.WaitGroup + var streams []string + for i := 0; i < streamCount; i++ { + streamName := fmt.Sprintf("%s-%d", streamPrefix, i) + streams = append(streams, "KV_"+streamName) + + wg.Add(1) + go func(i int) { + defer wg.Done() + keyUpdater(kctx, cancel, streamName, keysCount) + }(i) + } + + debug := false + nc2, _ := jsClientConnect(t, s) + if debug { + go func() { + for range time.NewTicker(5 * time.Second).C { + select { + case <-ctx.Done(): + return + default: + } + for _, str := range streams { + leaderSrv := c.streamLeader(accountName, str) + if leaderSrv == nil { + continue + } + streamLeader := getStreamDetails(t, c, accountName, str) + if streamLeader == nil { + continue + } + t.Logf("|------------------------------------------------------------------------------------------------------------------------|") + lstate := streamLeader.State + t.Logf("| %-10s | %-10s | msgs:%-10d | bytes:%-10d | deleted:%-10d | first:%-10d | last:%-10d |", + str, leaderSrv.String()+"*", lstate.Msgs, lstate.Bytes, lstate.NumDeleted, lstate.FirstSeq, lstate.LastSeq, + ) + for _, srv := range c.servers { + if srv == leaderSrv { + continue + } + acc, err := srv.LookupAccount(accountName) + if err != nil { + continue + } + stream, err := acc.lookupStream(str) + if err != nil { + t.Logf("Error looking up stream %s on %s replica", str, srv) + continue + } + state := stream.state() + + unsynced := lstate.Msgs != state.Msgs || lstate.Bytes != state.Bytes || + lstate.NumDeleted != state.NumDeleted || lstate.FirstSeq != state.FirstSeq || lstate.LastSeq != state.LastSeq + + var result string + if unsynced { + result = "UNSYNCED" + } + t.Logf("| %-10s | %-10s | msgs:%-10d | bytes:%-10d | deleted:%-10d | first:%-10d | last:%-10d | %s", + str, srv, state.Msgs, state.Bytes, state.NumDeleted, state.FirstSeq, state.LastSeq, result, + ) + } + } + t.Logf("|------------------------------------------------------------------------------------------------------------------------| %v", nc2.ConnectedUrl()) + } + }() + } + + checkStreams := func(t *testing.T) { + for _, str := range streams { + checkFor(t, time.Minute, 500*time.Millisecond, func() error { + return checkState(t, c, accountName, str) + }) + checkFor(t, time.Minute, 500*time.Millisecond, func() error { + return checkMsgsEqual(t, accountName, str) + }) + } + } + +Loop: + for range time.NewTicker(30 * time.Second).C { + select { + case <-ctx.Done(): + break Loop + default: + } + rollout := func(t *testing.T) { + for _, s := range c.servers { + // For graceful mode + s.lameDuckMode() + s.WaitForShutdown() + s = c.restartServer(s) + + hctx, hcancel := context.WithTimeout(context.Background(), 15*time.Second) + defer hcancel() + + Healthz: + for range time.NewTicker(2 * time.Second).C { + select { + case <-hctx.Done(): + default: + } + + status := s.healthz(nil) + if status.StatusCode == 200 { + break Healthz + } + } + c.waitOnClusterReady() + checkStreams(t) + } + } + rollout(t) + checkStreams(t) + } + wg.Wait() + checkStreams(t) +} + +func TestJetStreamClusterKeyValueLastSeqMismatch(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: "mismatch", + Replicas: 3, + }) + require_NoError(t, err) + + revision, err := kv.Create("foo", []byte("1")) + require_NoError(t, err) + require_Equal(t, revision, 1) + + revision, err = kv.Create("bar", []byte("2")) + require_NoError(t, err) + require_Equal(t, revision, 2) + + // Now delete foo from sequence 1. + // This needs to be low level remove (or system level) to test the condition we want here. + err = js.DeleteMsg("KV_mismatch", 1) + require_Error(t, err) + + // Now say we want to update baz but iff last was revision 1. + _, err = kv.Update("baz", []byte("3"), uint64(1)) + require_Error(t, err) + require_Equal(t, err.Error(), `nats: wrong last sequence: 0`) +} diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 4c1fba80fd..1ad01d8f4d 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22412,7 +22412,7 @@ func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) { } // https://github.com/nats-io/nats-server/issues/4878 -func TestInterestConsumerFilterEdit(t *testing.T) { +func TestJetStreamInterestStreamConsumerFilterEdit(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/stream.go b/server/stream.go index 95b18e1514..cdcf6e6aa0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4397,8 +4397,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, if sm != nil { fseq = sm.seq } - if err == ErrStoreMsgNotFound && seq == 0 { - fseq, err = 0, nil + if err == ErrStoreMsgNotFound { + if seq == 0 { + fseq, err = 0, nil + } else { + // Do not bump clfs in case message was not found and could have been deleted. + var ss StreamState + store.FastState(&ss) + if seq <= ss.LastSeq { + fseq, err = seq, nil + } + } } if err != nil || fseq != seq { mset.mu.Unlock() diff --git a/server/stree/node48.go b/server/stree/node48.go index fe7ef54352..7099edd58b 100644 --- a/server/stree/node48.go +++ b/server/stree/node48.go @@ -51,9 +51,9 @@ func (n *node48) isFull() bool { return n.size >= 48 } func (n *node48) grow() node { nn := newNode256(n.prefix) - for c := byte(0); c < 255; c++ { - if i := n.key[c]; i > 0 { - nn.addChild(c, n.child[i-1]) + for c := 0; c < len(n.key); c++ { + if i := n.key[byte(c)]; i > 0 { + nn.addChild(byte(c), n.child[i-1]) } } return nn @@ -69,9 +69,9 @@ func (n *node48) deleteChild(c byte) { last := byte(n.size - 1) if i < last { n.child[i] = n.child[last] - for c := byte(0); c <= 255; c++ { - if n.key[c] == last+1 { - n.key[c] = i + 1 + for ic := 0; ic < len(n.key); ic++ { + if n.key[byte(ic)] == last+1 { + n.key[byte(ic)] = i + 1 break } } @@ -87,9 +87,9 @@ func (n *node48) shrink() node { return nil } nn := newNode16(nil) - for c := byte(0); c < 255; c++ { - if i := n.key[c]; i > 0 { - nn.addChild(c, n.child[i-1]) + for c := 0; c < len(n.key); c++ { + if i := n.key[byte(c)]; i > 0 { + nn.addChild(byte(c), n.child[i-1]) } } return nn diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go index bbe2619919..7207e7e691 100644 --- a/server/stree/stree_test.go +++ b/server/stree/stree_test.go @@ -760,6 +760,15 @@ func TestSubjectTreeNode48(t *testing.T) { require_Equal(t, iterations, 2) require_True(t, gotB) require_True(t, gotC) + + // Check for off-by-one on byte 255 as found by staticcheck, see + // https://github.com/nats-io/nats-server/pull/5826. + n.addChild(255, &c) + require_Equal(t, n.key[255], 3) + grown := n.grow().(*node256) + require_True(t, grown.findChild(255) != nil) + shrunk := n.shrink().(*node16) + require_True(t, shrunk.findChild(255) != nil) } func TestSubjectTreeMatchNoCallbackDupe(t *testing.T) { diff --git a/server/sublist_test.go b/server/sublist_test.go index 966401b5f1..beab433412 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2023 The NATS Authors +// Copyright 2016-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -984,7 +984,7 @@ func TestSublistRaceOnMatch(t *testing.T) { wg.Wait() select { case e := <-errCh: - t.Fatalf(e.Error()) + t.Fatal(e.Error()) default: } } diff --git a/server/websocket.go b/server/websocket.go index 1752942303..6fce09dd9f 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -667,7 +667,7 @@ func (c *client) wsHandleProtocolError(message string) error { buf := wsCreateCloseMessage(wsCloseStatusProtocolError, message) c.wsEnqueueControlMessage(wsCloseMessage, buf) nbPoolPut(buf) // wsEnqueueControlMessage has taken a copy. - return fmt.Errorf(message) + return errors.New(message) } // Create a close message with the given `status` and `body`. diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index 4047f3d33d..b7da3a73a9 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -1,4 +1,4 @@ -// Copyright 2013-2019 The NATS Authors +// Copyright 2013-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -321,7 +321,7 @@ func TestRequestsAcrossRoutes(t *testing.T) { nc1.Flush() if err := checkExpectedSubs(1, srvA, srvB); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } var resp string @@ -372,7 +372,7 @@ func TestRequestsAcrossRoutesToQueues(t *testing.T) { }) if err := checkExpectedSubs(2, srvA, srvB); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } var resp string diff --git a/test/leafnode_test.go b/test/leafnode_test.go index b1bfb132bb..a945173281 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2020 The NATS Authors +// Copyright 2019-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -4376,13 +4376,13 @@ func TestLeafnodeHeaders(t *testing.T) { snc, err := nats.Connect(srv.ClientURL()) if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } defer snc.Close() lnc, err := nats.Connect(leaf.ClientURL()) if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } defer lnc.Close() @@ -4407,7 +4407,7 @@ func TestLeafnodeHeaders(t *testing.T) { } err = snc.PublishMsg(msg) if err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } smsg, err := ssub.NextMsg(time.Second) diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 174c6d9a43..c8cc0b79a5 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -892,7 +892,7 @@ func TestNewRouteSinglePublishOnNewAccount(t *testing.T) { expectA(pongRe) if err := checkExpectedSubs(1, srvB); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } clientB := createClientConn(t, optsB.Host, optsB.Port) @@ -1097,7 +1097,7 @@ func testNewRouteStreamImport(t *testing.T, duplicateSub bool) { // a subscription on "foo" for account $foo due to import. // So total of 2 subs. if err := checkExpectedSubs(2, srvA); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } // Send on clientA @@ -1122,7 +1122,7 @@ func testNewRouteStreamImport(t *testing.T, duplicateSub bool) { expectB(pongRe) if err := checkExpectedSubs(0, srvA); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } sendA("PUB foo 2\r\nok\r\nPING\r\n") diff --git a/test/norace_test.go b/test/norace_test.go index c14a56a0d3..eb5252e2a6 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -1,4 +1,4 @@ -// Copyright 2019-2020 The NATS Authors +// Copyright 2019-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -96,7 +96,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) { clientBExpect(pongRe) if err := checkExpectedSubs(totalPerServer, srvA, srvB); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } routes := fmt.Sprintf(` @@ -113,7 +113,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) { checkClusterFormed(t, srvA, srvB) if err := checkExpectedSubs(2*totalPerServer, srvA, srvB); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } checkSlowConsumers := func(t *testing.T) { @@ -166,7 +166,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) { defer requestorOnB.Close() if err := checkExpectedSubs(2*totalPerServer+2, srvA, srvB); err != nil { - t.Fatalf(err.Error()) + t.Fatal(err.Error()) } totalReplies := 120000