Skip to content

Commit

Permalink
[ADDED] CleanupPublisher method for removing internal JetStream subsc…
Browse files Browse the repository at this point in the history
…ription (#1690)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Aug 12, 2024
1 parent af10233 commit c76a9a7
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 12 deletions.
3 changes: 3 additions & 0 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ var (
// of an ordered consumer which was not yet created.
ErrOrderedConsumerNotCreated JetStreamError = &jsError{message: "consumer instance not yet created"}

// ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called.
ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"}

// KeyValue Errors

// ErrKeyExists is returned when attempting to create a key that already
Expand Down
46 changes: 46 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ type (
// outstanding asynchronously published messages are acknowledged by the
// server.
PublishAsyncComplete() <-chan struct{}

// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamContextClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
CleanupPublisher()
}

// StreamManager provides CRUD API for managing streams. It is available as
Expand Down Expand Up @@ -1032,6 +1045,39 @@ func wrapContextWithoutDeadline(ctx context.Context) (context.Context, context.C
return context.WithTimeout(ctx, defaultAPITimeout)
}

// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamContextClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
func (js *jetStream) CleanupPublisher() {
js.cleanupReplySub()
js.publisher.Lock()
errCb := js.publisher.aecb
for id, paf := range js.publisher.acks {
paf.err = ErrJetStreamPublisherClosed
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
// call error handler after releasing the mutex to avoid contention
defer errCb(js, paf.msg, ErrJetStreamPublisherClosed)
}
delete(js.publisher.acks, id)
}
if js.publisher.doneCh != nil {
close(js.publisher.doneCh)
js.publisher.doneCh = nil
}
js.publisher.Unlock()
}

func (js *jetStream) cleanupReplySub() {
if js.publisher == nil {
return
Expand Down
8 changes: 2 additions & 6 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -467,11 +467,7 @@ func (js *jetStream) resetPendingAcksOnReconnect() {
paf.errCh <- paf.err
}
if errCb != nil {
js.publisher.Unlock()
// clear reply subject so that new one is created on republish
paf.msg.Reply = ""
errCb(js, paf.msg, nats.ErrDisconnected)
js.publisher.Lock()
defer errCb(js, paf.msg, nats.ErrDisconnected)
}
delete(js.publisher.acks, id)
}
Expand Down
124 changes: 123 additions & 1 deletion jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -1963,3 +1963,125 @@ func TestConsumerConfigMatches(t *testing.T) {
t.Fatalf("ConsumerConfig doesn't match")
}
}

func TestJetStreamCleanupPublisher(t *testing.T) {

t.Run("cleanup js publisher", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

// Create a stream
if _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

numSubs := nc.NumSubscriptions()
if _, err := js.PublishAsync("FOO", []byte("hello")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

if numSubs+1 != nc.NumSubscriptions() {
t.Fatalf("Expected an additional subscription after publish, got %d", nc.NumSubscriptions())
}

js.CleanupPublisher()

if numSubs != nc.NumSubscriptions() {
t.Fatalf("Expected subscriptions to be back to original count")
}
})

t.Run("cleanup js publisher, cancel pending acks", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

cbErr := make(chan error, 10)
js, err := jetstream.New(nc, jetstream.WithPublishAsyncErrHandler(func(js jetstream.JetStream, m *nats.Msg, err error) {
cbErr <- err
}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create a stream with NoAck so that we can test that we cancel ack futures.
if _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}, NoAck: true}); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

numSubs := nc.NumSubscriptions()

var acks []jetstream.PubAckFuture
for i := 0; i < 10; i++ {
ack, err := js.PublishAsync("FOO", []byte("hello"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
acks = append(acks, ack)
}

asyncComplete := js.PublishAsyncComplete()
select {
case <-asyncComplete:
t.Fatalf("Should not complete, NoAck is set")
case <-time.After(200 * time.Millisecond):
}

if numSubs+1 != nc.NumSubscriptions() {
t.Fatalf("Expected an additional subscription after publish, got %d", nc.NumSubscriptions())
}

js.CleanupPublisher()

if numSubs != nc.NumSubscriptions() {
t.Fatalf("Expected subscriptions to be back to original count")
}

// check that PublishAsyncComplete channel is closed
select {
case <-asyncComplete:
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// check that all ack futures are cancelled
for _, ack := range acks {
select {
case err := <-ack.Err():
if !errors.Is(err, jetstream.ErrJetStreamPublisherClosed) {
t.Fatalf("Expected JetStreamContextClosed error, got %v", err)
}
case <-ack.Ok():
t.Fatalf("Expected error on the ack future")
case <-time.After(200 * time.Millisecond):
t.Fatalf("Expected an error on the ack future")
}
}

// check that async error handler is called for each pending ack
for i := 0; i < 10; i++ {
select {
case err := <-cbErr:
if !errors.Is(err, jetstream.ErrJetStreamPublisherClosed) {
t.Fatalf("Expected JetStreamContextClosed error, got %v", err)
}
case <-time.After(200 * time.Millisecond):
t.Fatalf("Expected errors to be passed from the async handler")
}
}
})

}
52 changes: 47 additions & 5 deletions js.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -58,6 +58,19 @@ type JetStream interface {
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}

// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamPublisherClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
CleanupPublisher()

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
Expand Down Expand Up @@ -719,10 +732,7 @@ func (js *js) resetPendingAcksOnReconnect() {
paf.errCh <- paf.err
}
if errCb != nil {
// clear reply subject so that new one is created on republish
js.mu.Unlock()
errCb(js, paf.msg, ErrDisconnected)
js.mu.Lock()
defer errCb(js, paf.msg, ErrDisconnected)
}
delete(js.pafs, id)
}
Expand All @@ -734,6 +744,38 @@ func (js *js) resetPendingAcksOnReconnect() {
}
}

// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamContextClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
func (js *js) CleanupPublisher() {
js.cleanupReplySub()
js.mu.Lock()
errCb := js.opts.aecb
for id, paf := range js.pafs {
paf.err = ErrJetStreamPublisherClosed
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
defer errCb(js, paf.msg, ErrJetStreamPublisherClosed)
}
delete(js.pafs, id)
}
if js.dch != nil {
close(js.dch)
js.dch = nil
}
js.mu.Unlock()
}

func (js *js) cleanupReplySub() {
js.mu.Lock()
if js.rsub != nil {
Expand Down
3 changes: 3 additions & 0 deletions jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ var (
// ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription
ErrSubscriptionClosed JetStreamError = &jsError{message: "subscription closed"}

// ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called.
ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"}

// Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
Expand Down
Loading

0 comments on commit c76a9a7

Please sign in to comment.