From 1433ee335b6d7647cc9a28f8a6c24fffa224873b Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 4 Apr 2024 12:15:05 +0100 Subject: [PATCH] Common store test for `LoadNextMsgMulti` It concerns me a little that by duplicating the same test for both file and memory stores, that differences could slip in later that mean we are no longer testing for consistency between the two stores. This approach allows us to write tests that only use the `StreamStore` API and have them tested against all store permutations. Signed-off-by: Neil Twigg --- server/filestore_test.go | 61 --------------------- server/memstore_test.go | 64 ---------------------- server/store_test.go | 111 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 125 deletions(-) create mode 100644 server/store_test.go diff --git a/server/filestore_test.go b/server/filestore_test.go index 936b3ef33fc..de25e7728f7 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -6755,67 +6755,6 @@ func TestFileStoreMsgBlockFirstAndLastSeqCorrupt(t *testing.T) { require_Equal(t, lseq, 10) } -func TestFileStoreMsgLoadNextMsgMulti(t *testing.T) { - fs, err := newFileStore( - FileStoreConfig{StoreDir: t.TempDir()}, - StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) - require_NoError(t, err) - defer fs.Stop() - - // Put 1k msgs in - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - fs.StoreMsg(subj, nil, []byte("ZZZ")) - } - - var smv StoreMsg - // Do multi load next with 1 wc entry. - sl := NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.>")}) - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Now do multi load next with 1000 literal subjects. - sl = NewSublistWithCache() - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - sl.Insert(&subscription{subject: []byte(subj)}) - } - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Check that we can pull out 3 individuals. - sl = NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.2")}) - sl.Insert(&subscription{subject: []byte("foo.222")}) - sl.Insert(&subscription{subject: []byte("foo.999")}) - sm, seq, err := fs.LoadNextMsgMulti(sl, 1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.2") - require_Equal(t, seq, 3) - sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.222") - require_Equal(t, seq, 223) - sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.999") - require_Equal(t, seq, 1000) - _, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) - require_Error(t, err) - require_Equal(t, seq, 1000) -} - /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/memstore_test.go b/server/memstore_test.go index 2d35262b855..3c049233447 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1051,67 +1051,3 @@ func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) { require_Equal(t, state.LastSeq, 10) require_Equal(t, state.Msgs, 1) } - -func TestMemStoreMsgLoadNextMsgMulti(t *testing.T) { - cfg := &StreamConfig{ - Name: "zzz", - Subjects: []string{"foo.*"}, - Storage: MemoryStorage, - } - ms, err := newMemStore(cfg) - require_NoError(t, err) - defer ms.Stop() - - // Put 1k msgs in - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - ms.StoreMsg(subj, nil, []byte("ZZZ")) - } - - var smv StoreMsg - // Do multi load next with 1 wc entry. - sl := NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.>")}) - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := ms.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Now do multi load next with 1000 literal subjects. - sl = NewSublistWithCache() - for i := 0; i < 1000; i++ { - subj := fmt.Sprintf("foo.%d", i) - sl.Insert(&subscription{subject: []byte(subj)}) - } - for i, seq := 0, uint64(1); i < 1000; i++ { - sm, nseq, err := ms.LoadNextMsgMulti(sl, seq, &smv) - require_NoError(t, err) - require_True(t, sm.subj == fmt.Sprintf("foo.%d", i)) - require_Equal(t, nseq, seq) - seq++ - } - - // Check that we can pull out 3 individuals. - sl = NewSublistWithCache() - sl.Insert(&subscription{subject: []byte("foo.2")}) - sl.Insert(&subscription{subject: []byte("foo.222")}) - sl.Insert(&subscription{subject: []byte("foo.999")}) - sm, seq, err := ms.LoadNextMsgMulti(sl, 1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.2") - require_Equal(t, seq, 3) - sm, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.222") - require_Equal(t, seq, 223) - sm, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) - require_NoError(t, err) - require_True(t, sm.subj == "foo.999") - require_Equal(t, seq, 1000) - _, seq, err = ms.LoadNextMsgMulti(sl, seq+1, &smv) - require_Error(t, err) - require_Equal(t, seq, 1000) -} diff --git a/server/store_test.go b/server/store_test.go new file mode 100644 index 00000000000..fdf4690a993 --- /dev/null +++ b/server/store_test.go @@ -0,0 +1,111 @@ +// Copyright 2012-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_store_tests +// +build !skip_store_tests + +package server + +import ( + "fmt" + "testing" +) + +func testAllStoreAllPermutations(t *testing.T, compressionAndEncryption bool, cfg StreamConfig, fn func(t *testing.T, fs StreamStore)) { + t.Run("Memory", func(t *testing.T) { + cfg.Storage = MemoryStorage + fs, err := newMemStore(&cfg) + require_NoError(t, err) + defer fs.Stop() + fn(t, fs) + }) + t.Run("File", func(t *testing.T) { + cfg.Storage = FileStorage + if compressionAndEncryption { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fs, err := newFileStore(fcfg, cfg) + require_NoError(t, err) + defer fs.Stop() + fn(t, fs) + }) + } else { + fs, err := newFileStore(FileStoreConfig{ + StoreDir: t.TempDir(), + }, cfg) + require_NoError(t, err) + defer fs.Stop() + fn(t, fs) + } + }) +} + +func TestStoreMsgLoadNextMsgMulti(t *testing.T) { + testAllStoreAllPermutations( + t, false, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}}, + func(t *testing.T, fs StreamStore) { + // Put 1k msgs in + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, []byte("ZZZ")) + } + + var smv StoreMsg + // Do multi load next with 1 wc entry. + sl := NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.>")}) + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Now do multi load next with 1000 literal subjects. + sl = NewSublistWithCache() + for i := 0; i < 1000; i++ { + subj := fmt.Sprintf("foo.%d", i) + sl.Insert(&subscription{subject: []byte(subj)}) + } + for i, seq := 0, uint64(1); i < 1000; i++ { + sm, nseq, err := fs.LoadNextMsgMulti(sl, seq, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, fmt.Sprintf("foo.%d", i)) + require_Equal(t, nseq, seq) + seq++ + } + + // Check that we can pull out 3 individuals. + sl = NewSublistWithCache() + sl.Insert(&subscription{subject: []byte("foo.2")}) + sl.Insert(&subscription{subject: []byte("foo.222")}) + sl.Insert(&subscription{subject: []byte("foo.999")}) + sm, seq, err := fs.LoadNextMsgMulti(sl, 1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo.2") + require_Equal(t, seq, 3) + sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo.222") + require_Equal(t, seq, 223) + sm, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_NoError(t, err) + require_Equal(t, sm.subj, "foo.999") + require_Equal(t, seq, 1000) + _, seq, err = fs.LoadNextMsgMulti(sl, seq+1, &smv) + require_Error(t, err) + require_Equal(t, seq, 1000) + }, + ) +}