Skip to content

Commit

Permalink
[ADDED] ReplaceDurable option
Browse files Browse the repository at this point in the history
When the subscription request for a durable subscription times out
or fail on the client side, but it was accepted in the server, then
if the application tries to restart the subscription request again
it will fail with a "duplicate durable subscription" error until
the connection is closed.

This new option allows the user to decide how the server should behave
when processing a duplicate durable subscription. If disabled, the
default, it behaves as described above, that is, it will reject
the second subscription request and return the "duplicate durable"
error.
If enabled, if the server detects that this is a duplicate, it will
close the active one and accept the new one. It is a suspend followed
by a resume.

From the client perspective, if this is done in the context of #1135,
then everything works well since the original subscription in the
client was actually not started due to subscription request failure.
However, if user try to create multiple duplicate durable subscriptions
for subscription requests (Subscribe() calls) that did not fail, then
their application will not be notified that the subscriptions that are
being replaced are replaced, but they will simply stop receiving messages
on those.

Resolves #1135

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Jan 5, 2021
1 parent 4ddbef0 commit be195d3
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 0 deletions.
50 changes: 50 additions & 0 deletions server/clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8031,3 +8031,53 @@ func TestClusteringAddRemoveClusterNodesWithBootstrap(t *testing.T) {
}
}
}

func TestClusteringDurableReplaced(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.ReplaceDurable = true
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Wait for it to bootstrap
getLeader(t, 10*time.Second, s1)

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2sOpts.ReplaceDurable = true
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

// Configure third server.
s3sOpts := getTestDefaultOptsForClustering("c", false)
s3sOpts.ReplaceDurable = true
s3 := runServerWithOpts(t, s3sOpts, nil)
defer s3.Shutdown()

testDurableReplaced(t, s1)

s1.Shutdown()
newLeader := getLeader(t, 2*time.Second, s2, s3)
c, err := newLeader.lookupOrCreateChannel("foo")
if err != nil {
t.Fatalf("Error looking up channel: %v", err)
}
if subs := c.ss.getAllSubs(); len(subs) != 0 {
t.Fatalf("Expected 0 sub, got %v", len(subs))
}
c.ss.RLock()
lenDur := len(c.ss.durables)
c.ss.RUnlock()
if lenDur != 1 {
t.Fatalf("Expected 1 durable, got %v", lenDur)
}
}
5 changes: 5 additions & 0 deletions server/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func ProcessConfigFile(configFile string, opts *Options) error {
return err
}
opts.NKeySeedFile = v.(string)
case "replace_durable", "replace_durables", "replace_duplicate_durable", "replace_duplicate_durables":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.ReplaceDurable = v.(bool)
}
}
return nil
Expand Down
1 change: 1 addition & 0 deletions server/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ func TestParseWrongTypes(t *testing.T) {
expectFailureFor(t, "ft_group: 123", wrongTypeErr)
expectFailureFor(t, "partitioning: 123", wrongTypeErr)
expectFailureFor(t, "syslog_name: 123", wrongTypeErr)
expectFailureFor(t, "replace_durable: 123", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_channels:false}", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_msgs:false}", wrongTypeErr)
expectFailureFor(t, "store_limits:{max_bytes:false}", wrongTypeErr)
Expand Down
33 changes: 33 additions & 0 deletions server/partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,3 +1027,36 @@ func TestPartitionsCleanInvalidConns(t *testing.T) {
t.Fatalf("Should not be more than %v, got %v", maxKnownInvalidConns, mlen)
}
}

func TestPartitionsDurableReplaced(t *testing.T) {
setPartitionsVarsForTest()
defer resetDefaultPartitionsVars()

clientCheckTimeout = 150 * time.Millisecond
defer func() { clientCheckTimeout = defaultClientCheckTimeout }()

// For this test, create a single NATS server to which both servers connect to.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

fooSubj := "foo"
barSubj := "bar"

opts1 := GetDefaultOptions()
opts1.NATSServerURL = "nats://127.0.0.1:4222"
opts1.Partitioning = true
opts1.ReplaceDurable = true
opts1.StoreLimits.AddPerChannel(fooSubj, &stores.ChannelLimits{})
s1 := runServerWithOpts(t, opts1, nil)
defer s1.Shutdown()

opts2 := GetDefaultOptions()
opts2.NATSServerURL = "nats://127.0.0.1:4222"
opts2.Partitioning = true
opts2.ReplaceDurable = true
opts2.StoreLimits.AddPerChannel(barSubj, &stores.ChannelLimits{})
s2 := runServerWithOpts(t, opts2, nil)
defer s2.Shutdown()

testDurableReplaced(t, s1)
}
47 changes: 47 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ type Options struct {
EncryptionKey []byte // Encryption key. The environment NATS_STREAMING_ENCRYPTION_KEY takes precedence and is the preferred way to provide the key.
Clustering ClusteringOptions
NATSClientOpts []nats.Option
ReplaceDurable bool // If true, the subscription request for a durable subscription will replace the current durable instead of failing with duplicate durable error.
}

// Clone returns a deep copy of the Options object.
Expand Down Expand Up @@ -5109,6 +5110,11 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
// until we are done with this subscription. This will also stop
// the delete timer if one was set.
c, preventDelete, err := s.lookupOrCreateChannelPreventDelete(sr.Subject)
// For durable subscriptions, and if allowed, if this is going to be a
// duplicate, close the current durable and accept the new one.
if err == nil && sr.DurableName != "" && sr.QGroup == "" && s.opts.ReplaceDurable {
err = s.closeDurableIfDuplicate(c, sr)
}
if err == nil {
// If clustered, thread operations through Raft.
if s.isClustered {
Expand Down Expand Up @@ -5173,6 +5179,47 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
s.subStartCh <- &subStartInfo{c: c, sub: sub, qs: qs, isDurable: sub.IsDurable}
}

// This will close (and replicate the close operation if running in cluster mode)
// the current durable subscription matching this subscription request information.
// This should be invoked only if ReplaceDurable option is enabled.
// It is used in case users want to be able to "resend" a subscription request
// if the original request failed, due to timeout for instance. It could be that
// the server accepted the original, sent the response back but the client library
// gave up on it due to its own timeout. In that case, trying to issue the same
// subscription request would lead to a "duplicate durable" error and the only choice
// would be to close the connection.
func (s *StanServer) closeDurableIfDuplicate(c *channel, sr *pb.SubscriptionRequest) error {
var duplicate bool
var ackInbox string
ss := c.ss
ss.RLock()
sub := ss.durables[durableKey(sr)]
if sub != nil {
sub.RLock()
duplicate = sub.ClientID != ""
ackInbox = sub.AckInbox
sub.RUnlock()
}
ss.RUnlock()
if !duplicate {
return nil
}
creq := &pb.UnsubscribeRequest{
ClientID: sr.ClientID,
Subject: sr.Subject,
Inbox: ackInbox,
}
var err error
if s.isClustered {
err = s.replicateCloseSubscription(creq)
} else {
s.closeMu.Lock()
err = s.unsubscribe(creq, true)
s.closeMu.Unlock()
}
return err
}

type subStateTraceCtx struct {
clientID string
isRemove bool
Expand Down
51 changes: 51 additions & 0 deletions server/server_durable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -918,3 +919,53 @@ func TestPersistentStoreDurableClosedStatusOnRestart(t *testing.T) {
// Restart one last time
dur = restartDurable()
}

func TestDurableReplaced(t *testing.T) {
opts := GetDefaultOptions()
opts.ReplaceDurable = true
s := runServerWithOpts(t, opts, nil)
defer s.Shutdown()

testDurableReplaced(t, s)
}

func testDurableReplaced(t *testing.T, s *StanServer) {
sc := NewDefaultConnection(t)
defer sc.Close()

count := int32(0)
cb := func(_ *stan.Msg) {
atomic.AddInt32(&count, 1)
}
dur1, err := sc.Subscribe("foo", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Unable to start durable: %v", err)
}

dur2, err := sc.Subscribe("foo", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Unable to start durable: %v", err)
}

sc.Publish("foo", []byte("msg"))
time.Sleep(150 * time.Millisecond)

if c := atomic.LoadInt32(&count); c != 1 {
t.Fatalf("Received %v messages!", c)
}

c, err := s.lookupOrCreateChannel("foo")
if err != nil {
t.Fatalf("Error on lookup: %v", err)
}
if subs := c.ss.getAllSubs(); len(subs) > 1 {
t.Fatalf("Should have only 1 sub, got %v", len(subs))
}

if err := dur1.Close(); err == nil {
t.Fatal("Expected error on close of dur1, but did not get one")
}
if err := dur2.Close(); err != nil {
t.Fatalf("Error on close: %v", err)
}
}

0 comments on commit be195d3

Please sign in to comment.