Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed-up some tests and fixing some lint reports. #5258

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ jobs:
- name: Build NATS Server
run: go build

filestore:
name: FileStore tests
needs: [build-latest, build-supported, lint]
runs-on: ${{ vars.GHA_WORKER_MEDIUM }}
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: stable

- name: Run unit tests
run: go test -race -v -run=TestFileStore ./server/... -count=1 -vet=off -timeout=30m -failfast

js-no-cluster:
name: JetStream tests
needs: [build-latest, build-supported, lint]
Expand Down Expand Up @@ -212,8 +228,24 @@ jobs:
- name: Run unit tests
run: go test -race -v -run=TestMQTT ./server/... -count=1 -vet=off -timeout=30m -failfast

msgtrace:
name: MsgTrace tests
needs: [build-latest, build-supported, lint]
runs-on: ${{ vars.GHA_WORKER_MEDIUM }}
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: stable

- name: Run unit tests
run: go test -race -v -run=TestMsgTrace ./server/... -count=1 -vet=off -timeout=30m -failfast

server-pkg-non-js:
name: Non-JetStream/MQTT tests
name: Non-JetStream/MQTT/MsgTrace tests
needs: [build-latest, build-supported, lint]
runs-on: ${{ vars.GHA_WORKER_LARGE }}
steps:
Expand All @@ -226,7 +258,7 @@ jobs:
go-version: stable

- name: Run unit tests
run: go test -race -v -p=1 ./server/... -tags=skip_js_tests,skip_mqtt_tests -count=1 -vet=off -timeout=30m -failfast
run: go test -race -v -p=1 ./server/... -tags=skip_js_tests,skip_fs_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast
timeout-minutes: 15

non-server-pkg:
Expand Down
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
env: TEST_SUITE=compile
- name: "Run TestNoRace tests"
env: TEST_SUITE=no_race_tests
- name: "Run FileStore tests"
env: TEST_SUITE=fs_tests
- name: "Run JetStream tests"
env: TEST_SUITE=js_tests
- name: "Run JetStream cluster tests (1)"
Expand All @@ -36,7 +38,9 @@ jobs:
env: TEST_SUITE=js_super_cluster_tests
- name: "Run MQTT tests"
env: TEST_SUITE=mqtt_tests
- name: "Run non JetStream/MQTT tests from the server package"
- name: "Run Message Tracing tests"
env: TEST_SUITE=msgtrace_tests
- name: "Run all other tests from the server package"
env: TEST_SUITE=srv_pkg_non_js_tests
- name: "Run all tests from all other packages"
env: TEST_SUITE=non_srv_pkg_tests
Expand Down
19 changes: 16 additions & 3 deletions scripts/runTestsOnTravis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ elif [ "$1" = "no_race_tests" ]; then

go test -v -p=1 -run=TestNoRace ./... -count=1 -vet=off -timeout=30m -failfast

elif [ "$1" = "fs_tests" ]; then

# Run FileStore tests. By convention, all file store tests start with `TestFileStore`.

go test -race -v -run=TestFileStore ./server -count=1 -vet=off -timeout=30m -failfast
kozlovic marked this conversation as resolved.
Show resolved Hide resolved

elif [ "$1" = "js_tests" ]; then

# Run JetStream non-clustered tests. By convention, all JS tests start
Expand Down Expand Up @@ -76,13 +82,20 @@ elif [ "$1" = "mqtt_tests" ]; then

go test -race -v -run=TestMQTT ./server -count=1 -vet=off -timeout=30m -failfast

elif [ "$1" = "msgtrace_tests" ]; then

# Run Message Tracing tests. By convention, all message tracing tests start with `TestMsgTrace`.

go test -race -v -run=TestMsgTrace ./server -count=1 -vet=off -timeout=30m -failfast

elif [ "$1" = "srv_pkg_non_js_tests" ]; then

# Run all non JetStream tests in the server package. We exclude the
# JS tests by using the `skip_js_tests` build tag and MQTT tests by
# using the `skip_mqtt_tests`
# FileStgore tests by using the `skip_fs_tests` build tag, the JS tests
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
# by using `skip_js_tests`, MQTT tests by using `skip_mqtt_tests` and
# message tracing tests by using `skip_msgtrace_tests`.

go test -race -v -p=1 ./server/... -tags=skip_js_tests,skip_mqtt_tests -count=1 -vet=off -timeout=30m -failfast
go test -race -v -p=1 ./server/... -tags=skip_fs_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast

elif [ "$1" = "non_srv_pkg_tests" ]; then

Expand Down
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4413,7 +4413,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// if the message is routed, it does not initialize tracing in the
// remote.
positions := disableTraceHeaders(c, msg)
defer enableTraceHeaders(c, msg, positions)
defer enableTraceHeaders(msg, positions)
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@ func (s *Server) initEventTracking() {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
extractAccount := func(c *client, subject string, msg []byte) (string, error) {
extractAccount := func(subject string) (string, error) {
if tk := strings.Split(subject, tsep); len(tk) != accReqTokens {
return _EMPTY_, fmt.Errorf("subject %q is malformed", subject)
} else {
Expand All @@ -1143,7 +1143,7 @@ func (s *Server) initEventTracking() {
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
optz.SubszOptions.Subscriptions = true
Expand All @@ -1155,7 +1155,7 @@ func (s *Server) initEventTracking() {
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
optz.ConnzOptions.Account = acc
Expand All @@ -1166,7 +1166,7 @@ func (s *Server) initEventTracking() {
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
optz.LeafzOptions.Account = acc
Expand All @@ -1177,7 +1177,7 @@ func (s *Server) initEventTracking() {
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
optz.Account = acc
Expand All @@ -1188,7 +1188,7 @@ func (s *Server) initEventTracking() {
"INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
return s.accountInfo(acc)
Expand All @@ -1201,7 +1201,7 @@ func (s *Server) initEventTracking() {
"STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, hdr, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, hdr, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else if acc == "PING" { // Filter PING subject. Happens for server as well. But wildcards are not used
return nil, errSkipZreq
Expand Down Expand Up @@ -1908,7 +1908,7 @@ func getAcceptEncoding(hdr []byte) compressionType {
return unsupportedCompression
}

func (s *Server) zReq(c *client, reply string, hdr, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) {
func (s *Server) zReq(_ *client, reply string, hdr, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createUserCredsEx(t *testing.T, nuc *jwt.UserClaims, akp nkeys.KeyPair) nat
return nats.UserJWT(userCB, sigCB)
}

func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option {
func createUserCreds(t *testing.T, _ *Server, akp nkeys.KeyPair) nats.Option {
return createUserCredsEx(t, jwt.NewUserClaims("test"), akp)
}

Expand Down
4 changes: 2 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7817,7 +7817,7 @@ func (fs *fileStore) stop(writeState bool) error {
const errFile = "errors.txt"

// Stream our snapshot through S2 compression and tar.
func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includeConsumers bool) {
func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
defer w.Close()

enc := s2.NewWriter(w)
Expand Down Expand Up @@ -8030,7 +8030,7 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer
fs.FastState(&state)

// Stream in separate Go routine.
go fs.streamSnapshot(pw, &state, includeConsumers)
go fs.streamSnapshot(pw, includeConsumers)

return &SnapshotResult{pr, state}, nil
}
Expand Down
7 changes: 5 additions & 2 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !skip_fs_tests
// +build !skip_fs_tests

package server

import (
Expand Down Expand Up @@ -3011,7 +3014,7 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) {
fs.mu.RUnlock()
}

lastSeqForBlk := func(index int) uint64 {
lastSeqForBlk := func() uint64 {
t.Helper()
fs.mu.RLock()
defer fs.mu.RUnlock()
Expand Down Expand Up @@ -3066,7 +3069,7 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) {

// Now delete 10 messages from the end of the first block which we will expire on restart.
// We will expire up to seq 100, so delete 91-100.
lseq := lastSeqForBlk(0)
lseq := lastSeqForBlk()
for seq := lseq; seq > lseq-10; seq-- {
removed, err := fs.RemoveMsg(seq)
if err != nil || !removed {
Expand Down
2 changes: 1 addition & 1 deletion server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ func (s *Server) sendSubsToGateway(c *client, accountName string) {
// This function will then execute appropriate function based on the command
// contained in the protocol.
// <Invoked from a route connection's readLoop>
func (s *Server) processGatewayInfoFromRoute(info *Info, routeSrvID string, route *client) {
func (s *Server) processGatewayInfoFromRoute(info *Info, routeSrvID string) {
switch info.GatewayCmd {
case gatewayCmdGossip:
s.processImplicitGateway(info)
Expand Down
25 changes: 15 additions & 10 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3354,8 +3354,13 @@ func getInboundGatewayConnection(s *Server, name string) *client {
var gwsa [4]*client
var gws = gwsa[:0]
s.getInboundGatewayConnections(&gws)
if len(gws) > 0 {
return gws[0]
for _, gw := range gws {
gw.mu.Lock()
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
ok := gw.gw.name == name
gw.mu.Unlock()
if ok {
return gw
}
}
return nil
}
Expand Down Expand Up @@ -3555,7 +3560,7 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
// For this test, make sure to use inbound from A so
// A will reconnect when we send bad proto that
// causes connection to be closed.
c := getInboundGatewayConnection(sa, "A")
c := getInboundGatewayConnection(sa, "B")
// Mock an invalid protocol (account name missing)
info := &Info{
Gateway: "B",
Expand All @@ -3568,7 +3573,7 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {

orgConn := c
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
curConn := getInboundGatewayConnection(sa, "A")
curConn := getInboundGatewayConnection(sa, "B")
if orgConn == curConn {
return fmt.Errorf("Not reconnected")
}
Expand All @@ -3581,7 +3586,7 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {
// Refresh
c = nil
checkFor(t, 3*time.Second, 15*time.Millisecond, func() error {
c = getInboundGatewayConnection(sa, "A")
c = getInboundGatewayConnection(sa, "B")
if c == nil {
return fmt.Errorf("Did not reconnect")
}
Expand All @@ -3603,7 +3608,7 @@ func TestGatewaySendAllSubsBadProtocol(t *testing.T) {

orgConn = c
checkFor(t, 3*time.Second, 100*time.Millisecond, func() error {
curConn := getInboundGatewayConnection(sa, "A")
curConn := getInboundGatewayConnection(sa, "B")
if orgConn == curConn {
return fmt.Errorf("Not reconnected")
}
Expand Down Expand Up @@ -4352,15 +4357,15 @@ func TestGatewayServiceImportComplexSetup(t *testing.T) {
if c == nil || c.opts.Name != sb2.ID() {
t.Fatalf("A2 does not have outbound to B2")
}
c = getInboundGatewayConnection(sa2, "A")
c = getInboundGatewayConnection(sa2, "B")
if c != nil {
t.Fatalf("Bad setup")
}
c = sb2.getOutboundGatewayConnection("A")
if c == nil || c.opts.Name != sa1.ID() {
t.Fatalf("B2 does not have outbound to A1")
}
c = getInboundGatewayConnection(sb2, "B")
c = getInboundGatewayConnection(sb2, "A")
if c == nil || c.opts.Name != sa2.ID() {
t.Fatalf("Bad setup")
}
Expand Down Expand Up @@ -4694,15 +4699,15 @@ func TestGatewayServiceExportWithWildcards(t *testing.T) {
if c == nil || c.opts.Name != sb2.ID() {
t.Fatalf("A2 does not have outbound to B2")
}
c = getInboundGatewayConnection(sa2, "A")
c = getInboundGatewayConnection(sa2, "B")
if c != nil {
t.Fatalf("Bad setup")
}
c = sb2.getOutboundGatewayConnection("A")
if c == nil || c.opts.Name != sa1.ID() {
t.Fatalf("B2 does not have outbound to A1")
}
c = getInboundGatewayConnection(sb2, "B")
c = getInboundGatewayConnection(sb2, "A")
if c == nil || c.opts.Name != sa2.ID() {
t.Fatalf("Bad setup")
}
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3379,7 +3379,7 @@ func (s *Server) jsStreamRestoreRequest(sub *subscription, c *client, _ *Account
}

if s.JetStreamIsClustered() {
s.jsClusteredStreamRestoreRequest(ci, acc, &req, stream, subject, reply, rmsg)
s.jsClusteredStreamRestoreRequest(ci, acc, &req, subject, reply, rmsg)
return
}

Expand Down Expand Up @@ -3709,7 +3709,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun
})

// Now do the real streaming.
s.streamSnapshot(ci, acc, mset, sr, &req)
s.streamSnapshot(acc, mset, sr, &req)

end := time.Now().UTC()

Expand Down Expand Up @@ -3739,7 +3739,7 @@ const defaultSnapshotChunkSize = 128 * 1024
const defaultSnapshotWindowSize = 8 * 1024 * 1024 // 8MB

// streamSnapshot will stream out our snapshot to the reply subject.
func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
func (s *Server) streamSnapshot(acc *Account, mset *stream, sr *SnapshotResult, req *JSApiStreamSnapshotRequest) {
chunkSize := req.ChunkSize
if chunkSize == 0 {
chunkSize = defaultSnapshotChunkSize
Expand Down
Loading