Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #4592 to 2.9 #4651

Merged
merged 2 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16773,6 +16773,8 @@ func TestJetStreamWorkQueueSourceRestart(t *testing.T) {
sub, err := js.PullSubscribe("foo", "dur", nats.BindStream("TEST"))
require_NoError(t, err)

time.Sleep(100 * time.Millisecond)

ci, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(sent))
Expand Down
3 changes: 1 addition & 2 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,6 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
msg := fmt.Sprintf("R-MSG-%d", i+1)
for _, sname := range []string{"foo", "bar", "baz"} {
m := nats.NewMsg(sname)
m.Header.Set(nats.MsgIdHdr, sname+"-"+msg)
m.Data = []byte(msg)
if _, err := js.PublishMsg(m); err != nil {
t.Errorf("Unexpected publish error: %v", err)
Expand All @@ -1890,7 +1889,7 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
sc.clusterForName("C3").waitOnStreamLeader("$G", "MS2")
<-doneCh

checkFor(t, 15*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 15*time.Second, time.Second, func() error {
si, err := js2.StreamInfo("MS2")
if err != nil {
return err
Expand Down
62 changes: 44 additions & 18 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ type stream struct {
mirror *sourceInfo

// Sources
sources map[string]*sourceInfo
sources map[string]*sourceInfo
sourcesConsumerSetup *time.Timer

// Indicates we have direct consumers.
directs int
Expand Down Expand Up @@ -681,6 +682,11 @@ func (mset *stream) setLeader(isLeader bool) error {
return err
}
} else {
// cancel timer to create the source consumers if not fired yet
if mset.sourcesConsumerSetup != nil {
mset.sourcesConsumerSetup.Stop()
mset.sourcesConsumerSetup = nil
}
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
Expand Down Expand Up @@ -2211,7 +2217,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() {
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
time.AfterFunc(next, func() {
mset.mu.Lock()
mset.setupMirrorConsumer()
Expand Down Expand Up @@ -2530,7 +2536,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime)
}

Expand Down Expand Up @@ -3025,15 +3031,8 @@ func (mset *stream) setStartingSequenceForSource(sname string) {
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
// Resets the SourceInfo for all the sources
func (mset *stream) resetSourceInfo() {
mset.sources = make(map[string]*sourceInfo)

for _, ssi := range mset.cfg.Sources {
Expand All @@ -3043,6 +3042,20 @@ func (mset *stream) startingSequenceForSources() {
si := &sourceInfo{name: ssi.Name, iname: ssi.iname}
mset.sources[ssi.iname] = si
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}

// Always reset here.
mset.resetSourceInfo()

var state StreamState
mset.store.FastState(&state)
Expand Down Expand Up @@ -3113,6 +3126,11 @@ func (mset *stream) setupSourceConsumers() error {
}
}

// If we are no longer the leader, give up
if !mset.isLeader() {
return nil
}

mset.startingSequenceForSources()

// Setup our consumers at the proper starting position.
Expand All @@ -3138,13 +3156,21 @@ func (mset *stream) subscribeToStream() error {
}
// Check if we need to setup mirroring.
if mset.cfg.Mirror != nil {
if err := mset.setupMirrorConsumer(); err != nil {
return err
}
// setup the initial mirror sourceInfo
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name}

// delay the actual mirror consumer creation for after a delay
mset.scheduleSetupMirrorConsumerRetryAsap()
} else if len(mset.cfg.Sources) > 0 {
if err := mset.setupSourceConsumers(); err != nil {
return err
}
// Setup the initial source infos for the sources
mset.resetSourceInfo()
// Delay the actual source consumer(s) creation(s) for after a delay

mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() {
mset.mu.Lock()
mset.setupSourceConsumers()
mset.mu.Unlock()
})
}
// Check for direct get access.
// We spin up followers for clustered streams in monitorStream().
Expand Down