Skip to content

Commit

Permalink
Create consumer on OrderedConsumer, add missing tests (#1317)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Jun 23, 2023
1 parent 3003ca6 commit 7c2bded
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 12 deletions.
4 changes: 4 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
if cfg.OptStartSeq != 0 {
oc.cursor.streamSeq = cfg.OptStartSeq - 1
}
err := oc.reset()
if err != nil {
return nil, err
}

return oc, nil
}
Expand Down
7 changes: 5 additions & 2 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var errOrderedSequenceMismatch = errors.New("sequence mismatch")
// Consume can be used to continuously receive messages and handle them with the provided callback function
func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil {
c.consumerType = consumerTypeConsume
err := c.reset()
if err != nil {
return nil, err
Expand All @@ -78,6 +77,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
if c.consumerType == consumerTypeFetch {
return nil, ErrOrderConsumerUsedAsFetch
}
c.consumerType = consumerTypeConsume
consumeOpts, err := parseConsumeOpts(opts...)
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
Expand Down Expand Up @@ -156,7 +156,6 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err
// Messages returns [MessagesContext], allowing continuously iterating over messages on a stream.
func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) {
if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil {
c.consumerType = consumerTypeConsume
err := c.reset()
if err != nil {
return nil, err
Expand All @@ -167,6 +166,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
if c.consumerType == consumerTypeFetch {
return nil, ErrOrderConsumerUsedAsFetch
}
c.consumerType = consumerTypeConsume
consumeOpts, err := parseMessagesOpts(opts...)
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
Expand Down Expand Up @@ -236,12 +236,15 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
if c.consumerType == consumerTypeConsume {
return nil, ErrOrderConsumerUsedAsConsume
}
c.currentConsumer.Lock()
if c.runningFetch != nil {
if !c.runningFetch.done {
c.currentConsumer.Unlock()
return nil, ErrOrderedConsumerConcurrentRequests
}
c.cursor.streamSeq = c.runningFetch.sseq
}
c.currentConsumer.Unlock()
c.consumerType = consumerTypeFetch
err := c.reset()
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,9 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
defer close(res.msgs)
for {
if receivedMsgs == req.Batch || (req.MaxBytes != 0 && receivedBytes == req.MaxBytes) {
p.Lock()
res.done = true
p.Unlock()
return
}
select {
Expand Down
4 changes: 4 additions & 0 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig)
if cfg.OptStartSeq != 0 {
oc.cursor.streamSeq = cfg.OptStartSeq - 1
}
err := oc.reset()
if err != nil {
return nil, err
}

return oc, nil
}
Expand Down
16 changes: 16 additions & 0 deletions jetstream/test/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,19 @@ func restartBasicJSServer(t *testing.T, s *server.Server) *server.Server {
s.WaitForShutdown()
return RunServerWithOptions(opts)
}

func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
t.Helper()
timeout := time.Now().Add(totalWait)
var err error
for time.Now().Before(timeout) {
err = f()
if err == nil {
return
}
time.Sleep(sleepDur)
}
if err != nil {
t.Fatal(err.Error())
}
}
148 changes: 148 additions & 0 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -262,6 +263,153 @@ func TestCreateStream(t *testing.T) {
}
}

func TestCreateStreamMirrorCrossDomains(t *testing.T) {
test := []struct {
name string
streamConfig *jetstream.StreamConfig
}{
{
name: "create stream mirror cross domains",
streamConfig: &jetstream.StreamConfig{
Name: "MIRROR",
Mirror: &jetstream.StreamSource{
Name: "TEST",
Domain: "HUB",
},
},
},
{
name: "create stream with source cross domains",
streamConfig: &jetstream.StreamConfig{
Name: "MIRROR",
Sources: []*jetstream.StreamSource{
{
Name: "TEST",
Domain: "HUB",
},
},
},
},
}

for _, test := range test {
t.Run(test.name, func(t *testing.T) {
conf := createConfFile(t, []byte(`
server_name: HUB
listen: 127.0.0.1:-1
jetstream: { domain: HUB }
leafnodes { listen: 127.0.0.1:7422 }
}`))
defer os.Remove(conf)
srv, _ := RunServerWithConfig(conf)
defer shutdownJSServerAndRemoveStorage(t, srv)

lconf := createConfFile(t, []byte(`
server_name: LEAF
listen: 127.0.0.1:-1
jetstream: { domain:LEAF }
leafnodes {
remotes = [ { url: "leaf://127.0.0.1" } ]
}
}`))
defer os.Remove(lconf)
ln, _ := RunServerWithConfig(lconf)
defer shutdownJSServerAndRemoveStorage(t, ln)

nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish(ctx, "foo", []byte("msg1")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish(ctx, "foo", []byte("msg2")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}

lnc, err := nats.Connect(ln.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer lnc.Close()
ljs, err := jetstream.New(lnc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

ccfg := *test.streamConfig
_, err = ljs.CreateStream(ctx, ccfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(test.streamConfig, &ccfg) {
t.Fatalf("Did not expect config to be altered: %+v vs %+v", test.streamConfig, ccfg)
}

// Make sure we sync.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
lStream, err := ljs.Stream(ctx, "MIRROR")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if lStream.CachedInfo().State.Msgs == 2 {
return nil
}
return fmt.Errorf("Did not get synced messages: %d", lStream.CachedInfo().State.Msgs)
})
if _, err := ljs.Publish(ctx, "foo", []byte("msg3")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
lStream, err := ljs.Stream(ctx, "MIRROR")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if lStream.CachedInfo().State.Msgs != 3 {
t.Fatalf("Expected 3 msgs in stream; got: %d", lStream.CachedInfo().State.Msgs)
}

rjs, err := jetstream.NewWithDomain(lnc, "HUB")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = rjs.Stream(ctx, "TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := rjs.Publish(ctx, "foo", []byte("msg4")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rStream, err := rjs.Stream(ctx, "TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if rStream.CachedInfo().State.Msgs != 4 {
t.Fatalf("Expected 3 msgs in stream; got: %d", rStream.CachedInfo().State.Msgs)
}
})
}
}

func TestUpdateStream(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading

0 comments on commit 7c2bded

Please sign in to comment.