Skip to content

Commit

Permalink
Cherry-picks for 2.10.17-RC.5 (#5560)
Browse files Browse the repository at this point in the history
Includes the following:

* #5552 
* #5557
* #5556
* #5558

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Jun 18, 2024
2 parents da64c7b + a526e6a commit 37f8233
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 25 deletions.
2 changes: 1 addition & 1 deletion server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (s *Server) sendStatsz(subj string) {
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
}

// Limit updates to the heartbeat interval, max one second.
// Limit updates to the heartbeat interval, max one second by default.
func (s *Server) limitStatsz(subj string) bool {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func init() {
lostQuorumCheck = 4 * hbInterval

// For statz and jetstream placement speedups as well.
statszRateLimit = time.Millisecond * 100
statszRateLimit = 0
}

// Used to setup clusters of clusters for tests.
Expand Down
84 changes: 78 additions & 6 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func TestJetStreamAddStreamOverlappingSubjects(t *testing.T) {
// Test that any overlapping subjects will fail.
expectErr(acc.addStream(&StreamConfig{Name: "foo"}))
expectErr(acc.addStream(&StreamConfig{Name: "a", Subjects: []string{"baz", "bar"}}))
expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}}))
expectErr(acc.addStream(&StreamConfig{Name: "b", Subjects: []string{">"}, NoAck: true}))
expectErr(acc.addStream(&StreamConfig{Name: "c", Subjects: []string{"baz.33"}}))
expectErr(acc.addStream(&StreamConfig{Name: "d", Subjects: []string{"*.33"}}))
expectErr(acc.addStream(&StreamConfig{Name: "e", Subjects: []string{"*.>"}}))
Expand All @@ -984,7 +984,7 @@ func TestJetStreamAddStreamOverlapWithJSAPISubjects(t *testing.T) {

expectErr := func(_ *stream, err error) {
t.Helper()
if err == nil || !strings.Contains(err.Error(), "subjects overlap") {
if err == nil || !strings.Contains(err.Error(), "subjects that overlap with jetstream api") {
t.Fatalf("Expected error but got none")
}
}
Expand Down Expand Up @@ -22616,25 +22616,40 @@ func TestJetStreamAuditStreams(t *testing.T) {
nc, js := jsClientConnect(t, s)
defer nc.Close()

jsOverlap := errors.New("subjects that overlap with jetstream api require no-ack to be true")
sysOverlap := errors.New("subjects that overlap with system api require no-ack to be true")

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JS.API.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$JSC.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api")))
require_Error(t, err, NewJSStreamInvalidConfigError(jsOverlap))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"$SYS.>"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api")))
require_Error(t, err, NewJSStreamInvalidConfigError(sysOverlap))

// These should be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

// These should all be ok if no pub ack.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST1",
Subjects: []string{"$JS.>"},
Expand All @@ -22655,4 +22670,61 @@ func TestJetStreamAuditStreams(t *testing.T) {
NoAck: true,
})
require_NoError(t, err)

// Since prior behavior did allow $JS.EVENT to be captured without no-ack, these might break
// on a server upgrade so make sure they still work ok without --no-ack.

// To avoid overlap error.
err = js.DeleteStream("TEST1")
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST4",
Subjects: []string{"$JS.EVENT.>"},
})
require_NoError(t, err)

// Also allow $SYS.ACCOUNT to be captured without no-ack, these also might break
// on a server upgrade so make sure they still work ok without --no-ack.

// To avoid overlap error.
err = js.DeleteStream("TEST3")
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST5",
Subjects: []string{"$SYS.ACCOUNT.>"},
})
require_NoError(t, err)

// We will test handling of ">" on a cluster here.
// Specific test for capturing everything which will require both no-ack and replicas of 1.
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires no-ack to be true")))

_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 3,
NoAck: true,
})
require_Error(t, err, NewJSStreamInvalidConfigError(errors.New("capturing all subjects requires replicas of 1")))

// Ths should work ok.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{">"},
Replicas: 1,
NoAck: true,
})
require_NoError(t, err)
}
18 changes: 4 additions & 14 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,9 +1029,6 @@ func (n *raft) InstallSnapshot(data []byte) error {
return errCatchupsRunning
}

var state StreamState
n.wal.FastState(&state)

if n.applied == 0 {
return errNoSnapAvailable
}
Expand Down Expand Up @@ -1077,23 +1074,16 @@ func (n *raft) installSnapshot(snap *snapshot) error {
return err
}

// Delete our previous snapshot file if it exists.
if n.snapfile != _EMPTY_ && n.snapfile != sfile {
os.Remove(n.snapfile)
}
// Remember our latest snapshot file.
n.snapfile = sfile
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
return err
}
// Remove any old snapshots.
// Do this in a go routine.
go func() {
psnaps, _ := os.ReadDir(snapDir)
for _, fi := range psnaps {
pn := fi.Name()
if pn != sn {
os.Remove(filepath.Join(snapDir, pn))
}
}
}()

return nil
}
Expand Down
21 changes: 18 additions & 3 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,7 +1486,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}

// Check for literal duplication of subject interest in config
// and no overlap with any JS API subject space
// and no overlap with any JS or SYS API subject space.
dset := make(map[string]struct{}, len(cfg.Subjects))
for _, subj := range cfg.Subjects {
// Make sure the subject is valid. Check this first.
Expand All @@ -1496,13 +1496,28 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if _, ok := dset[subj]; ok {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("duplicate subjects detected"))
}
// Check for trying to capture everything.
if subj == fwcs {
if !cfg.NoAck {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires no-ack to be true"))
}
// Capturing everything also will require R1.
if cfg.Replicas != 1 {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("capturing all subjects requires replicas of 1"))
}
}
// Also check to make sure we do not overlap with our $JS API subjects.
if !cfg.NoAck && (subjectIsSubsetMatch(subj, "$JS.>") || subjectIsSubsetMatch(subj, "$JSC.>")) {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with jetstream api"))
// We allow an exception for $JS.EVENT.> since these could have been created in the past.
if !subjectIsSubsetMatch(subj, "$JS.EVENT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with jetstream api require no-ack to be true"))
}
}
// And the $SYS subjects.
if !cfg.NoAck && subjectIsSubsetMatch(subj, "$SYS.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects overlap with system api"))
if !subjectIsSubsetMatch(subj, "$SYS.ACCOUNT.>") {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("subjects that overlap with system api require no-ack to be true"))
}
}
// Mark for duplicate check.
dset[subj] = struct{}{}
Expand Down

0 comments on commit 37f8233

Please sign in to comment.