From c92d9c149acad4904603b5df5e608a3e59cd11d7 Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Fri, 23 Aug 2024 22:00:52 +0200
Subject: [PATCH 1/4] Correct test name

Signed-off-by: Maurice van Veen
---
 server/jetstream_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/jetstream_test.go b/server/jetstream_test.go
index 4c1fba80fd..1ad01d8f4d 100644
--- a/server/jetstream_test.go
+++ b/server/jetstream_test.go
@@ -22412,7 +22412,7 @@ func TestJetStreamSubjectFilteredPurgeClearsPendingAcks(t *testing.T) {
 }
 
 // https://github.com/nats-io/nats-server/issues/4878
-func TestInterestConsumerFilterEdit(t *testing.T) {
+func TestJetStreamInterestStreamConsumerFilterEdit(t *testing.T) {
   s := RunBasicJetStreamServer(t)
   defer s.Shutdown()

From 208fdcd7d03b5750eeb4003465bb6e88eec8cef3 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sun, 25 Aug 2024 14:11:47 -0700
Subject: [PATCH 2/4] [FIXED] New staticcheck fixes (#5826)

@neilalexander I do believe this uncovered two off by one conditions in
node48 under stree. Could you take a look, kept that change separate.

---------

Signed-off-by: Derek Collison
Signed-off-by: Neil Twigg
Co-authored-by: Neil Twigg
---
 internal/ldap/dn_test.go           |  4 +++-
 server/certidp/certidp.go          |  4 ++--
 server/certidp/ocsp_responder.go   |  7 ++++---
 server/client.go                   |  2 +-
 server/gateway_test.go             |  6 +++---
 server/jetstream_cluster_3_test.go |  6 +++---
 server/stree/node48.go             | 18 +++++++++---------
 server/stree/stree_test.go         |  9 +++++++++
 server/sublist_test.go             |  4 ++--
 server/websocket.go                |  2 +-
 test/client_cluster_test.go        |  6 +++---
 test/leafnode_test.go              |  8 ++++----
 test/new_routes_test.go            |  6 +++---
 test/norace_test.go                |  8 ++++----
 14 files changed, 51 insertions(+), 39 deletions(-)
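Illustration (not part of the applied diff): the two off-by-one conditions
mentioned above come from walking node48's 256-entry key table with a
byte-typed index, since `c < 255` stops one short of index 255 while
`c <= 255` can never terminate because a byte wraps back to 0. A minimal
standalone Go sketch of the pitfall and of the int-indexed form the node48
change switches to (hypothetical example, not server code):

  package main

  import "fmt"

  func main() {
    var table [256]int
    visited := 0
    // Index with an int and convert to byte only where needed; a
    // byte-typed counter cannot cover all of 0..255 and still terminate.
    for c := 0; c < len(table); c++ {
      _ = byte(c)
      visited++
    }
    fmt.Println(visited) // 256, index 255 included
  }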
diff --git a/internal/ldap/dn_test.go b/internal/ldap/dn_test.go
index c9db86c098..0e791fbc38 100644
--- a/internal/ldap/dn_test.go
+++ b/internal/ldap/dn_test.go
@@ -1,5 +1,7 @@
 // Copyright (c) 2011-2015 Michael Mitton (mmitton@gmail.com)
 // Portions copyright (c) 2015-2016 go-ldap Authors
+// Static-Check Fixes Copyright 2024 The NATS Authors
+
 package ldap
 
 import (
@@ -53,7 +55,7 @@ func TestSuccessfulDNParsing(t *testing.T) {
   for test, answer := range testcases {
     dn, err := ParseDN(test)
     if err != nil {
-      t.Errorf(err.Error())
+      t.Error(err.Error())
       continue
     }
     if !reflect.DeepEqual(dn, &answer) {
diff --git a/server/certidp/certidp.go b/server/certidp/certidp.go
index c1d9678231..a26618577b 100644
--- a/server/certidp/certidp.go
+++ b/server/certidp/certidp.go
@@ -1,4 +1,4 @@
-// Copyright 2023 The NATS Authors
+// Copyright 2023-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -222,7 +222,7 @@ func CertOCSPEligible(link *ChainLink) bool {
   if link == nil || link.Leaf.Raw == nil || len(link.Leaf.Raw) == 0 {
     return false
   }
-  if link.Leaf.OCSPServer == nil || len(link.Leaf.OCSPServer) == 0 {
+  if len(link.Leaf.OCSPServer) == 0 {
     return false
   }
   urls := getWebEndpoints(link.Leaf.OCSPServer)
diff --git a/server/certidp/ocsp_responder.go b/server/certidp/ocsp_responder.go
index 6e210f2b5d..ad6c2651bc 100644
--- a/server/certidp/ocsp_responder.go
+++ b/server/certidp/ocsp_responder.go
@@ -1,4 +1,4 @@
-// Copyright 2023 The NATS Authors
+// Copyright 2023-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -15,6 +15,7 @@ package certidp
 
 import (
   "encoding/base64"
+  "errors"
   "fmt"
   "io"
   "net/http"
@@ -26,7 +27,7 @@ import (
 
 func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, error) {
   if link == nil || link.Leaf == nil || link.Issuer == nil || opts == nil || log == nil {
-    return nil, fmt.Errorf(ErrInvalidChainlink)
+    return nil, errors.New(ErrInvalidChainlink)
   }
 
   timeout := time.Duration(opts.Timeout * float64(time.Second))
@@ -59,7 +60,7 @@ func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte,
   responders := *link.OCSPWebEndpoints
   if len(responders) == 0 {
-    return nil, fmt.Errorf(ErrNoAvailOCSPServers)
+    return nil, errors.New(ErrNoAvailOCSPServers)
   }
 
   var raw []byte
diff --git a/server/client.go b/server/client.go
index 19886e7090..fa0b445d27 100644
--- a/server/client.go
+++ b/server/client.go
@@ -2972,7 +2972,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
   if err := im.acc.sl.Insert(&nsub); err != nil {
     errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
     c.Debugf(errs)
-    return nil, fmt.Errorf(errs)
+    return nil, errors.New(errs)
   }
 
   // Update our route map here. But only if we are not a leaf node or a hub leafnode.
diff --git a/server/gateway_test.go b/server/gateway_test.go
index 9dbe0230d8..4c8eb774c6 100644
--- a/server/gateway_test.go
+++ b/server/gateway_test.go
@@ -1,4 +1,4 @@
-// Copyright 2018-2020 The NATS Authors
+// Copyright 2018-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -211,7 +211,7 @@ func waitCh(t *testing.T, ch chan bool, errTxt string) {
   case <-ch:
     return
   case <-time.After(5 * time.Second):
-    t.Fatalf(errTxt)
+    t.Fatal(errTxt)
   }
 }
 
@@ -5055,7 +5055,7 @@ func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) {
   select {
   case e := <-errCh:
     if e != nil {
-      t.Fatalf(e.Error())
+      t.Fatal(e.Error())
     }
   case <-time.After(time.Second):
     t.Fatalf("Did not get replies")
   }
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 2a3eebef50..e3b2c4007f 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -5857,15 +5857,15 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
       select {
       case dl := <-loggers[0].dbgCh:
         if strings.Contains(dl, condition) {
-          errCh <- fmt.Errorf(condition)
+          errCh <- errors.New(condition)
         }
       case dl := <-loggers[1].dbgCh:
         if strings.Contains(dl, condition) {
-          errCh <- fmt.Errorf(condition)
+          errCh <- errors.New(condition)
         }
       case dl := <-loggers[2].dbgCh:
         if strings.Contains(dl, condition) {
-          errCh <- fmt.Errorf(condition)
+          errCh <- errors.New(condition)
         }
       case <-ctx.Done():
         return
diff --git a/server/stree/node48.go b/server/stree/node48.go
index fe7ef54352..7099edd58b 100644
--- a/server/stree/node48.go
+++ b/server/stree/node48.go
@@ -51,9 +51,9 @@ func (n *node48) isFull() bool { return n.size >= 48 }
 
 func (n *node48) grow() node {
   nn := newNode256(n.prefix)
-  for c := byte(0); c < 255; c++ {
-    if i := n.key[c]; i > 0 {
-      nn.addChild(c, n.child[i-1])
+  for c := 0; c < len(n.key); c++ {
+    if i := n.key[byte(c)]; i > 0 {
+      nn.addChild(byte(c), n.child[i-1])
     }
   }
   return nn
@@ -69,9 +69,9 @@ func (n *node48) deleteChild(c byte) {
   last := byte(n.size - 1)
   if i < last {
     n.child[i] = n.child[last]
-    for c := byte(0); c <= 255; c++ {
-      if n.key[c] == last+1 {
-        n.key[c] = i + 1
+    for ic := 0; ic < len(n.key); ic++ {
+      if n.key[byte(ic)] == last+1 {
+        n.key[byte(ic)] = i + 1
         break
       }
     }
@@ -87,9 +87,9 @@ func (n *node48) shrink() node {
     return nil
   }
   nn := newNode16(nil)
-  for c := byte(0); c < 255; c++ {
-    if i := n.key[c]; i > 0 {
-      nn.addChild(c, n.child[i-1])
+  for c := 0; c < len(n.key); c++ {
+    if i := n.key[byte(c)]; i > 0 {
+      nn.addChild(byte(c), n.child[i-1])
     }
   }
   return nn
diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go
index bbe2619919..7207e7e691 100644
--- a/server/stree/stree_test.go
+++ b/server/stree/stree_test.go
@@ -760,6 +760,15 @@ func TestSubjectTreeNode48(t *testing.T) {
   require_Equal(t, iterations, 2)
   require_True(t, gotB)
   require_True(t, gotC)
+
+  // Check for off-by-one on byte 255 as found by staticcheck, see
+  // https://github.com/nats-io/nats-server/pull/5826.
+  n.addChild(255, &c)
+  require_Equal(t, n.key[255], 3)
+  grown := n.grow().(*node256)
+  require_True(t, grown.findChild(255) != nil)
+  shrunk := n.shrink().(*node16)
+  require_True(t, shrunk.findChild(255) != nil)
 }
 
 func TestSubjectTreeMatchNoCallbackDupe(t *testing.T) {
diff --git a/server/sublist_test.go b/server/sublist_test.go
index 966401b5f1..beab433412 100644
--- a/server/sublist_test.go
+++ b/server/sublist_test.go
@@ -1,4 +1,4 @@
-// Copyright 2016-2023 The NATS Authors
+// Copyright 2016-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -984,7 +984,7 @@ func TestSublistRaceOnMatch(t *testing.T) {
   wg.Wait()
   select {
   case e := <-errCh:
-    t.Fatalf(e.Error())
+    t.Fatal(e.Error())
   default:
   }
 }
diff --git a/server/websocket.go b/server/websocket.go
index 1752942303..6fce09dd9f 100644
--- a/server/websocket.go
+++ b/server/websocket.go
@@ -667,7 +667,7 @@ func (c *client) wsHandleProtocolError(message string) error {
   buf := wsCreateCloseMessage(wsCloseStatusProtocolError, message)
   c.wsEnqueueControlMessage(wsCloseMessage, buf)
   nbPoolPut(buf) // wsEnqueueControlMessage has taken a copy.
-  return fmt.Errorf(message)
+  return errors.New(message)
 }
 
 // Create a close message with the given `status` and `body`.
diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go
index 4047f3d33d..b7da3a73a9 100644
--- a/test/client_cluster_test.go
+++ b/test/client_cluster_test.go
@@ -1,4 +1,4 @@
-// Copyright 2013-2019 The NATS Authors
+// Copyright 2013-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -321,7 +321,7 @@ func TestRequestsAcrossRoutes(t *testing.T) {
   nc1.Flush()
 
   if err := checkExpectedSubs(1, srvA, srvB); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   var resp string
@@ -372,7 +372,7 @@ func TestRequestsAcrossRoutesToQueues(t *testing.T) {
   })
 
   if err := checkExpectedSubs(2, srvA, srvB); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   var resp string
diff --git a/test/leafnode_test.go b/test/leafnode_test.go
index b1bfb132bb..a945173281 100644
--- a/test/leafnode_test.go
+++ b/test/leafnode_test.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2020 The NATS Authors
+// Copyright 2019-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -4376,13 +4376,13 @@ func TestLeafnodeHeaders(t *testing.T) {
 
   snc, err := nats.Connect(srv.ClientURL())
   if err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
   defer snc.Close()
 
   lnc, err := nats.Connect(leaf.ClientURL())
   if err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
   defer lnc.Close()
 
@@ -4407,7 +4407,7 @@ func TestLeafnodeHeaders(t *testing.T) {
   }
   err = snc.PublishMsg(msg)
   if err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   smsg, err := ssub.NextMsg(time.Second)
diff --git a/test/new_routes_test.go b/test/new_routes_test.go
index 174c6d9a43..c8cc0b79a5 100644
--- a/test/new_routes_test.go
+++ b/test/new_routes_test.go
@@ -892,7 +892,7 @@ func TestNewRouteSinglePublishOnNewAccount(t *testing.T) {
   expectA(pongRe)
 
   if err := checkExpectedSubs(1, srvB); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   clientB := createClientConn(t, optsB.Host, optsB.Port)
@@ -1097,7 +1097,7 @@ func testNewRouteStreamImport(t *testing.T, duplicateSub bool) {
   // a subscription on "foo" for account $foo due to import.
   // So total of 2 subs.
   if err := checkExpectedSubs(2, srvA); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   // Send on clientA
@@ -1122,7 +1122,7 @@ func testNewRouteStreamImport(t *testing.T, duplicateSub bool) {
   expectB(pongRe)
 
   if err := checkExpectedSubs(0, srvA); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   sendA("PUB foo 2\r\nok\r\nPING\r\n")
diff --git a/test/norace_test.go b/test/norace_test.go
index c14a56a0d3..eb5252e2a6 100644
--- a/test/norace_test.go
+++ b/test/norace_test.go
@@ -1,4 +1,4 @@
-// Copyright 2019-2020 The NATS Authors
+// Copyright 2019-2024 The NATS Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // You may obtain a copy of the License at
@@ -96,7 +96,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
   clientBExpect(pongRe)
 
   if err := checkExpectedSubs(totalPerServer, srvA, srvB); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   routes := fmt.Sprintf(`
@@ -113,7 +113,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
   checkClusterFormed(t, srvA, srvB)
 
   if err := checkExpectedSubs(2*totalPerServer, srvA, srvB); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   checkSlowConsumers := func(t *testing.T) {
@@ -166,7 +166,7 @@ func TestNoRaceRouteSendSubs(t *testing.T) {
   defer requestorOnB.Close()
 
   if err := checkExpectedSubs(2*totalPerServer+2, srvA, srvB); err != nil {
-    t.Fatalf(err.Error())
+    t.Fatal(err.Error())
   }
 
   totalReplies := 120000

From 5ee2bd401f44f161d2133b18e046349c999997ef Mon Sep 17 00:00:00 2001
From: Waldemar Quevedo
Date: Mon, 26 Aug 2024 07:39:19 -0700
Subject: [PATCH 3/4] Do not bump clfs on seq mismatch when before stream LastSeq (#5821)

Sometimes during a stream catchup after a restart, when applying entries,
the clfs could have been bumped due to "msg not found" errors from likely
deleted messages, causing a stream replica to remain out of sync.

Fixes #5205

---------

Signed-off-by: Waldemar Quevedo
Signed-off-by: Derek Collison
Co-authored-by: Derek Collison
---
 server/jetstream_cluster_4_test.go | 394 +++++++++++++++++++++++++++++
 server/stream.go                   |  13 +-
 2 files changed, 405 insertions(+), 2 deletions(-)
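Illustration (a simplified sketch, not the server's actual code path): the
stream.go change below only relaxes the case where the message referenced by
an expected-last-sequence check is missing from the store. A standalone Go
helper showing just that gate, with made-up names:

  package main

  import "fmt"

  // acceptMissingExpectedSeq sketches the gate added below: a message that is
  // not found but whose sequence is still within the stream's history was most
  // likely deleted, so the mismatch should not count as a failed apply (clfs).
  func acceptMissingExpectedSeq(expectedSeq, streamLastSeq uint64) bool {
    if expectedSeq == 0 {
      return true // no prior message was expected for the subject
    }
    return expectedSeq <= streamLastSeq
  }

  func main() {
    fmt.Println(acceptMissingExpectedSeq(1, 2)) // true: seq 1 existed and was likely deleted
    fmt.Println(acceptMissingExpectedSeq(5, 2)) // false: ahead of the stream, still an error
  }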
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 44a6a81559..3dbb5bccdb 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -24,6 +24,7 @@ import (
   "os"
   "path/filepath"
   "runtime"
+  "slices"
   "strconv"
   "strings"
   "sync"
@@ -2543,3 +2544,396 @@ func TestJetStreamClusterKeyValueDesyncAfterHardKill(t *testing.T) {
   err = checkState(t, c, "$G", "KV_inconsistency")
   require_NoError(t, err)
 }
+
+func TestJetStreamClusterKeyValueSync(t *testing.T) {
+  t.Skip("Too long for CI at the moment")
+  c := createJetStreamClusterExplicit(t, "R3S", 3)
+  defer c.shutdown()
+
+  for _, s := range c.servers {
+    s.optsMu.Lock()
+    s.opts.LameDuckDuration = 15 * time.Second
+    s.opts.LameDuckGracePeriod = -15 * time.Second
+    s.optsMu.Unlock()
+  }
+  s := c.randomNonLeader()
+  connect := func(t *testing.T) (*nats.Conn, nats.JetStreamContext) {
+    return jsClientConnect(t, s)
+  }
+
+  const accountName = "$G"
+  const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+  createData := func(n int) []byte {
+    b := make([]byte, n)
+    for i := range b {
+      b[i] = letterBytes[rand.Intn(len(letterBytes))]
+    }
+    return b
+  }
+  getOrCreateKvStore := func(kvname string) (nats.KeyValue, error) {
+    _, js := connect(t)
+    kvExists := false
+    existingKvnames := js.KeyValueStoreNames()
+    for existingKvname := range existingKvnames {
+      if existingKvname == kvname {
+        kvExists = true
+        break
+      }
+    }
+    if !kvExists {
+      return js.CreateKeyValue(&nats.KeyValueConfig{
+        Bucket:   kvname,
+        Replicas: 3,
+        Storage:  nats.FileStorage,
+      })
+    } else {
+      return js.KeyValue(kvname)
+    }
+  }
+  abs := func(x int64) int64 {
+    if x < 0 {
+      return -x
+    }
+    return x
+  }
+  var counter int64
+  var errorCounter int64
+
+  getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail {
+    t.Helper()
+    srv := c.streamLeader(accountName, streamName)
+    if srv == nil {
+      return nil
+    }
+    jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
+    require_NoError(t, err)
+    for _, acc := range jsz.AccountDetails {
+      if acc.Name == accountName {
+        for _, stream := range acc.Streams {
+          if stream.Name == streamName {
+            return &stream
+          }
+        }
+      }
+    }
+    t.Error("Could not find account details")
+    return nil
+  }
+  checkState := func(t *testing.T, c *cluster, accountName, streamName string) error {
+    t.Helper()
+
+    leaderSrv := c.streamLeader(accountName, streamName)
+    if leaderSrv == nil {
+      return fmt.Errorf("no leader server found for stream %q", streamName)
+    }
+    streamLeader := getStreamDetails(t, c, accountName, streamName)
+    if streamLeader == nil {
+      return fmt.Errorf("no leader found for stream %q", streamName)
+    }
+    var errs []error
+    for _, srv := range c.servers {
+      if srv == leaderSrv {
+        // Skip self
+        continue
+      }
+      acc, err := srv.LookupAccount(accountName)
+      require_NoError(t, err)
+      stream, err := acc.lookupStream(streamName)
+      require_NoError(t, err)
+      state := stream.state()
+
+      if state.Msgs != streamLeader.State.Msgs {
+        err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages",
+          streamName, leaderSrv, streamLeader.State.Msgs,
+          srv, state.Msgs,
+        )
+        errs = append(errs, err)
+      }
+      if state.FirstSeq != streamLeader.State.FirstSeq {
+        err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d",
+          streamName, leaderSrv, streamLeader.State.FirstSeq,
+          srv, state.FirstSeq,
+        )
+        errs = append(errs, err)
+      }
+      if state.LastSeq != streamLeader.State.LastSeq {
+        err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d",
+          streamName, leaderSrv, streamLeader.State.LastSeq,
+          srv, state.LastSeq,
+        )
+        errs = append(errs, err)
+      }
+      if state.NumDeleted != streamLeader.State.NumDeleted {
+        err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n",
+          streamName, leaderSrv, streamLeader.State.NumDeleted,
+          srv, state.NumDeleted, streamLeader.State, state,
+        )
+        errs = append(errs, err)
+      }
+    }
+    if len(errs) > 0 {
+      return errors.Join(errs...)
+    }
+    return nil
+  }
+
+  checkMsgsEqual := func(t *testing.T, accountName, streamName string) error {
+    // Gather all the streams replicas and compare contents.
+    msets := make(map[*Server]*stream)
+    for _, s := range c.servers {
+      acc, err := s.LookupAccount(accountName)
+      if err != nil {
+        return err
+      }
+      mset, err := acc.lookupStream(streamName)
+      if err != nil {
+        return err
+      }
+      msets[s] = mset
+    }
+
+    str := getStreamDetails(t, c, accountName, streamName)
+    if str == nil {
+      return fmt.Errorf("could not get stream leader state")
+    }
+    state := str.State
+    for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
+      var msgId string
+      var smv StoreMsg
+      for replica, mset := range msets {
+        mset.mu.RLock()
+        sm, err := mset.store.LoadMsg(seq, &smv)
+        mset.mu.RUnlock()
+        if err != nil {
+          if err == ErrStoreMsgNotFound || err == errDeletedMsg {
+            // Skip these.
+          } else {
+            t.Logf("WRN: Error loading message (seq=%d) from stream %q on replica %q: %v", seq, streamName, replica, err)
+          }
+          continue
+        }
+        if msgId == _EMPTY_ {
+          msgId = string(sm.hdr)
+        } else if msgId != string(sm.hdr) {
+          t.Errorf("MsgIds do not match for seq %d on stream %q: %q vs %q", seq, streamName, msgId, sm.hdr)
+        }
+      }
+    }
+    return nil
+  }
+
+  keyUpdater := func(ctx context.Context, cancel context.CancelFunc, kvname string, numKeys int) {
+    kv, err := getOrCreateKvStore(kvname)
+    if err != nil {
+      t.Fatalf("[%s]:%v", kvname, err)
+    }
+    for i := 0; i < numKeys; i++ {
+      key := fmt.Sprintf("key-%d", i)
+      kv.Create(key, createData(160))
+    }
+    lastData := make(map[string][]byte)
+    revisions := make(map[string]uint64)
+    for {
+      select {
+      case <-ctx.Done():
+        return
+      default:
+      }
+      r := rand.Intn(numKeys)
+      key := fmt.Sprintf("key-%d", r)
+
+      for i := 0; i < 5; i++ {
+        _, err := kv.Get(key)
+        if err != nil {
+          atomic.AddInt64(&errorCounter, 1)
+          if err == nats.ErrKeyNotFound {
+            t.Logf("WRN: Key not found! [%s/%s] - [%s]", kvname, key, err)
+            cancel()
+          }
+        }
+      }
+
+      k, err := kv.Get(key)
+      if err != nil {
+        atomic.AddInt64(&errorCounter, 1)
+      } else {
+        if revisions[key] != 0 && abs(int64(k.Revision())-int64(revisions[key])) < 2 {
+          lastDataVal, ok := lastData[key]
+          if ok && k.Revision() == revisions[key] && slices.Compare(lastDataVal, k.Value()) != 0 {
+            t.Logf("data loss [%s/%s][rev:%d] expected:[%v] is:[%v]", kvname, key, revisions[key], string(lastDataVal), string(k.Value()))
+          }
+        }
+        newData := createData(160)
+        revisions[key], err = kv.Update(key, newData, k.Revision())
+        if err != nil && err != nats.ErrTimeout {
+          atomic.AddInt64(&errorCounter, 1)
+        } else {
+          lastData[key] = newData
+        }
+        atomic.AddInt64(&counter, 1)
+      }
+    }
+  }
+
+  streamCount := 50
+  keysCount := 100
+  streamPrefix := "IKV"
+
+  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+  defer cancel()
+
+  // The keyUpdaters will run for less time.
+  kctx, kcancel := context.WithTimeout(context.Background(), 3*time.Minute)
+  defer kcancel()
+
+  var wg sync.WaitGroup
+  var streams []string
+  for i := 0; i < streamCount; i++ {
+    streamName := fmt.Sprintf("%s-%d", streamPrefix, i)
+    streams = append(streams, "KV_"+streamName)
+
+    wg.Add(1)
+    go func(i int) {
+      defer wg.Done()
+      keyUpdater(kctx, cancel, streamName, keysCount)
+    }(i)
+  }
+
+  debug := false
+  nc2, _ := jsClientConnect(t, s)
+  if debug {
+    go func() {
+      for range time.NewTicker(5 * time.Second).C {
+        select {
+        case <-ctx.Done():
+          return
+        default:
+        }
+        for _, str := range streams {
+          leaderSrv := c.streamLeader(accountName, str)
+          if leaderSrv == nil {
+            continue
+          }
+          streamLeader := getStreamDetails(t, c, accountName, str)
+          if streamLeader == nil {
+            continue
+          }
+          t.Logf("|------------------------------------------------------------------------------------------------------------------------|")
+          lstate := streamLeader.State
+          t.Logf("| %-10s | %-10s | msgs:%-10d | bytes:%-10d | deleted:%-10d | first:%-10d | last:%-10d |",
+            str, leaderSrv.String()+"*", lstate.Msgs, lstate.Bytes, lstate.NumDeleted, lstate.FirstSeq, lstate.LastSeq,
+          )
+          for _, srv := range c.servers {
+            if srv == leaderSrv {
+              continue
+            }
+            acc, err := srv.LookupAccount(accountName)
+            if err != nil {
+              continue
+            }
+            stream, err := acc.lookupStream(str)
+            if err != nil {
+              t.Logf("Error looking up stream %s on %s replica", str, srv)
+              continue
+            }
+            state := stream.state()
+
+            unsynced := lstate.Msgs != state.Msgs || lstate.Bytes != state.Bytes ||
+              lstate.NumDeleted != state.NumDeleted || lstate.FirstSeq != state.FirstSeq || lstate.LastSeq != state.LastSeq
+
+            var result string
+            if unsynced {
+              result = "UNSYNCED"
+            }
+            t.Logf("| %-10s | %-10s | msgs:%-10d | bytes:%-10d | deleted:%-10d | first:%-10d | last:%-10d | %s",
+              str, srv, state.Msgs, state.Bytes, state.NumDeleted, state.FirstSeq, state.LastSeq, result,
+            )
+          }
+        }
+        t.Logf("|------------------------------------------------------------------------------------------------------------------------| %v", nc2.ConnectedUrl())
+      }
+    }()
+  }
+
+  checkStreams := func(t *testing.T) {
+    for _, str := range streams {
+      checkFor(t, time.Minute, 500*time.Millisecond, func() error {
+        return checkState(t, c, accountName, str)
+      })
+      checkFor(t, time.Minute, 500*time.Millisecond, func() error {
+        return checkMsgsEqual(t, accountName, str)
+      })
+    }
+  }
+
+Loop:
+  for range time.NewTicker(30 * time.Second).C {
+    select {
+    case <-ctx.Done():
+      break Loop
+    default:
+    }
+    rollout := func(t *testing.T) {
+      for _, s := range c.servers {
+        // For graceful mode
+        s.lameDuckMode()
+        s.WaitForShutdown()
+        s = c.restartServer(s)
+
+        hctx, hcancel := context.WithTimeout(context.Background(), 15*time.Second)
+        defer hcancel()
+
+      Healthz:
+        for range time.NewTicker(2 * time.Second).C {
+          select {
+          case <-hctx.Done():
+          default:
+          }
+
+          status := s.healthz(nil)
+          if status.StatusCode == 200 {
+            break Healthz
+          }
+        }
+        c.waitOnClusterReady()
+        checkStreams(t)
+      }
+    }
+    rollout(t)
+    checkStreams(t)
+  }
+  wg.Wait()
+  checkStreams(t)
+}
+
+func TestJetStreamClusterKeyValueLastSeqMismatch(t *testing.T) {
+  c := createJetStreamClusterExplicit(t, "R3S", 3)
+  defer c.shutdown()
+
+  nc, js := jsClientConnect(t, c.randomServer())
+  defer nc.Close()
+
+  kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
+    Bucket:   "mismatch",
+    Replicas: 3,
+  })
+  require_NoError(t, err)
+
+  revision, err := kv.Create("foo", []byte("1"))
+  require_NoError(t, err)
+  require_Equal(t, revision, 1)
+
+  revision, err = kv.Create("bar", []byte("2"))
+  require_NoError(t, err)
+  require_Equal(t, revision, 2)
+
+  // Now delete foo from sequence 1.
+  // This needs to be low level remove (or system level) to test the condition we want here.
+  err = js.DeleteMsg("KV_mismatch", 1)
+  require_Error(t, err)
+
+  // Now say we want to update baz but iff last was revision 1.
+  _, err = kv.Update("baz", []byte("3"), uint64(1))
+  require_Error(t, err)
+  require_Equal(t, err.Error(), `nats: wrong last sequence: 0`)
+}
diff --git a/server/stream.go b/server/stream.go
index 95b18e1514..cdcf6e6aa0 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -4397,8 +4397,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
     if sm != nil {
       fseq = sm.seq
     }
-    if err == ErrStoreMsgNotFound && seq == 0 {
-      fseq, err = 0, nil
+    if err == ErrStoreMsgNotFound {
+      if seq == 0 {
+        fseq, err = 0, nil
+      } else {
+        // Do not bump clfs in case message was not found and could have been deleted.
+        var ss StreamState
+        store.FastState(&ss)
+        if seq <= ss.LastSeq {
+          fseq, err = seq, nil
+        }
+      }
     }
     if err != nil || fseq != seq {
       mset.mu.Unlock()

From 0b5e1ecb37bb867ca16d30bcd0a5952f5d624f61 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 26 Aug 2024 12:36:35 -0700
Subject: [PATCH 4/4] When a block can not be compressed, mark it as such to
 avoid syncBlocks constantly trying to compress on each iteration.

We had reports of disk spikes and memory pops queued off of syncBlocks, and
it looks like they were coming from blocks that were no longer compressible
but would be loaded each time and go through the process.

Signed-off-by: Derek Collison
---
 server/filestore.go      | 13 +++++--
 server/filestore_test.go | 77 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 2 deletions(-)
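Illustration (a standalone sketch with made-up names, not the actual msgBlock
type): the idea below is simply to remember when a compaction attempt produced
no savings, so the periodic sync pass stops re-reading and re-trying the same
block every interval.

  package main

  import "fmt"

  // blk stands in for msgBlock: bytes is live message data, rbytes is the raw
  // block size on disk, and noCompact remembers a compaction that saved nothing.
  type blk struct {
    bytes, rbytes uint64
    noCompact     bool
  }

  // shouldCompactSync mirrors the new guard: skip blocks already proven incompressible.
  func (b *blk) shouldCompactSync() bool { return b.bytes*2 < b.rbytes && !b.noCompact }

  // compactTo records the result of a rewrite; an unchanged size marks the block.
  func (b *blk) compactTo(newSize uint64) {
    if newSize == b.rbytes {
      b.noCompact = true
    } else {
      b.rbytes = newSize
    }
  }

  func main() {
    b := &blk{bytes: 10, rbytes: 100}
    fmt.Println(b.shouldCompactSync()) // true on the first pass
    b.compactTo(b.rbytes)              // rewrite produced no savings
    fmt.Println(b.shouldCompactSync()) // false afterwards, so the sync pass skips it
  }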
diff --git a/server/filestore.go b/server/filestore.go
index 6b959a34fc..f858c1a436 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -239,6 +239,7 @@ type msgBlock struct {
   noTrack    bool
   needSync   bool
   syncAlways bool
+  noCompact  bool
   closed     bool
 
   // Used to mock write failures.
@@ -3959,6 +3960,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
     mb.bytes = 0
   }
 
+  // Allow us to check compaction again.
+  mb.noCompact = false
+
   // Mark as dirty for stream state.
   fs.dirty++
 
@@ -4075,7 +4079,7 @@ func (mb *msgBlock) shouldCompactInline() bool {
 // Ignores 2MB minimum.
 // Lock should be held.
 func (mb *msgBlock) shouldCompactSync() bool {
-  return mb.bytes*2 < mb.rbytes
+  return mb.bytes*2 < mb.rbytes && !mb.noCompact
 }
 
 // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
@@ -4184,7 +4188,12 @@ func (mb *msgBlock) compact() {
   mb.needSync = true
 
   // Capture the updated rbytes.
-  mb.rbytes = uint64(len(nbuf))
+  if rbytes := uint64(len(nbuf)); rbytes == mb.rbytes {
+    // No change, so set our noCompact bool here to avoid attempting to continually compress in syncBlocks.
+    mb.noCompact = true
+  } else {
+    mb.rbytes = rbytes
+  }
 
   // Remove any seqs from the beginning of the blk.
   for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ {
diff --git a/server/filestore_test.go b/server/filestore_test.go
index 1ab23b09ec..d986062204 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -7379,6 +7379,83 @@ func TestFileStoreCheckSkipFirstBlockNotLoadOldBlocks(t *testing.T) {
   require_Equal(t, loaded, 1)
 }
 
+func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) {
+  sd := t.TempDir()
+  fs, err := newFileStore(
+    FileStoreConfig{StoreDir: sd, BlockSize: 256, SyncInterval: 250 * time.Millisecond},
+    StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
+  require_NoError(t, err)
+  defer fs.Stop()
+
+  msg := []byte("hello")
+
+  // 6 msgs per block.
+  // Fill 2 blocks.
+  for i := 0; i < 12; i++ {
+    fs.StoreMsg("foo.BB", nil, msg)
+  }
+  // Create third block with just one message in it.
+  fs.StoreMsg("foo.BB", nil, msg)
+
+  // Should have created 3 blocks.
+  require_Equal(t, fs.numMsgBlocks(), 3)
+
+  // Now delete a bunch that will fill up 3 blocks with tombstones.
+  for _, seq := range []uint64{2, 3, 4, 5, 8, 9, 10, 11} {
+    _, err = fs.RemoveMsg(seq)
+    require_NoError(t, err)
+  }
+  // Now make sure we add a 4th block so syncBlocks will try to compress.
+  for i := 0; i < 6; i++ {
+    fs.StoreMsg("foo.BB", nil, msg)
+  }
+  require_Equal(t, fs.numMsgBlocks(), 4)
+
+  // All should have compact set.
+  fs.mu.Lock()
+  // Only check first 3 blocks.
+  for i := 0; i < 3; i++ {
+    mb := fs.blks[i]
+    mb.mu.Lock()
+    shouldCompact := mb.shouldCompactSync()
+    mb.mu.Unlock()
+    if !shouldCompact {
+      fs.mu.Unlock()
+      t.Fatalf("Expected should compact to be true for %d, got false", mb.getIndex())
+    }
+  }
+  fs.mu.Unlock()
+
+  // Let sync run.
+  time.Sleep(300 * time.Millisecond)
+
+  // We want to make sure the last block, which is filled with tombstones and is not compactable, returns false now.
+  fs.mu.Lock()
+  for _, mb := range fs.blks {
+    mb.mu.Lock()
+    shouldCompact := mb.shouldCompactSync()
+    mb.mu.Unlock()
+    if shouldCompact {
+      fs.mu.Unlock()
+      t.Fatalf("Expected should compact to be false for %d, got true", mb.getIndex())
+    }
+  }
+  fs.mu.Unlock()
+
+  // Now remove some from block 3 and verify that compact is not suppressed.
+  _, err = fs.RemoveMsg(13)
+  require_NoError(t, err)
+
+  fs.mu.Lock()
+  mb := fs.blks[2] // block 3.
+  mb.mu.Lock()
+  noCompact := mb.noCompact
+  mb.mu.Unlock()
+  fs.mu.Unlock()
+  // Verify that since we deleted a message we should be considered for compaction again in syncBlocks().
+  require_False(t, noCompact)
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Benchmarks
 ///////////////////////////////////////////////////////////////////////////