Skip to content

Commit

Permalink
[IMPROVED] Avoid stream locks for certain operations that could block…
Browse files Browse the repository at this point in the history
… routes/gateways. (#4933)

Avoid needing the lock for mset subscriptions to avoid possibly blocking
routes/gateways.
Also increase timeout for consumer creation for sources since might take
a bit longer.

 Signed-off-by: Derek Collison <[email protected]>

---------

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Jan 9, 2024
1 parent faed593 commit ddb262a
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 102 deletions.
14 changes: 7 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -707,16 +707,16 @@ func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
}

func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname string, ca *consumerAssignment, isRecovering bool, action ConsumerAction) (*consumer, error) {
mset.mu.RLock()
s, jsa, tierName, cfg, acc, closed := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc, mset.closed
retention := cfg.Retention
mset.mu.RUnlock()

// Check if this stream has closed.
if closed {
if mset.closed.Load() {
return nil, NewJSStreamInvalidError()
}

mset.mu.RLock()
s, jsa, tierName, cfg, acc := mset.srv, mset.jsa, mset.tier, mset.cfg, mset.acc
retention := cfg.Retention
mset.mu.RUnlock()

// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
// This can happen on startup with restored state where on meta replay we still do not have
// the assignment. Running in single server mode this always returns true.
Expand Down
2 changes: 1 addition & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
4 changes: 2 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3717,7 +3717,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *

// We will place sequence number and size of chunk sent in the reply.
ackSubj := fmt.Sprintf(jsSnapshotAckT, mset.name(), nuid.Next())
ackSub, _ := mset.subscribeInternalUnlocked(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
ackSub, _ := mset.subscribeInternal(ackSubj+".>", func(_ *subscription, _ *client, _ *Account, subject, _ string, _ []byte) {
cs, _ := strconv.Atoi(tokenAt(subject, 6))
// This is very crude and simple, but ok for now.
// This only matters when sending multiple chunks.
Expand All @@ -3728,7 +3728,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
}
}
})
defer mset.unsubscribeUnlocked(ackSub)
defer mset.unsubscribe(ackSub)

// TODO(dlc) - Add in NATS-Chunked-Sequence header

Expand Down
14 changes: 4 additions & 10 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7748,21 +7748,15 @@ func (mset *stream) hasCatchupPeers() bool {
}

func (mset *stream) setCatchingUp() {
mset.mu.Lock()
mset.catchup = true
mset.mu.Unlock()
mset.catchup.Store(true)
}

func (mset *stream) clearCatchingUp() {
mset.mu.Lock()
mset.catchup = false
mset.mu.Unlock()
mset.catchup.Store(false)
}

func (mset *stream) isCatchingUp() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.catchup
return mset.catchup.Load()
}

// Determine if a non-leader is current.
Expand All @@ -7771,7 +7765,7 @@ func (mset *stream) isCurrent() bool {
if mset.node == nil {
return true
}
return mset.node.Current() && !mset.catchup
return mset.node.Current() && !mset.catchup.Load()
}

// Maximum requests for the whole server that can be in flight at the same time.
Expand Down
2 changes: 1 addition & 1 deletion server/memstore_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion server/norace_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2023 The NATS Authors
// Copyright 2018-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion server/store.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
Loading

0 comments on commit ddb262a

Please sign in to comment.