Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic JetStream backend. #143

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Current implementations and their status
| Nats streaming | beta |
| Proximo | alpha |
| Freezer | alpha |
| JetStreams | alpha |

Additional resources
----------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion freezer/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestAll(t *testing.T) {

defer k.Kill()

testshared.TestAll(t, k)
testshared.TestAll(t, k, false)
}

type testServer struct {
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ require (
github.com/gofrs/uuid v3.2.0+incompatible
github.com/google/uuid v1.1.1
github.com/hashicorp/go-multierror v1.0.0
github.com/nats-io/jwt v0.3.0 // indirect
github.com/nats-io/nats-streaming-server v0.16.2
github.com/nats-io/nats.go v1.9.0
github.com/nats-io/nats.go v1.11.0
github.com/nats-io/stan.go v0.5.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.2.1
Expand All @@ -20,6 +21,8 @@ require (
github.com/uw-labs/proximo v0.0.0-20190913093050-8229af78f5dd
github.com/uw-labs/straw v0.0.0-20200213162553-01e9a0f94f69
github.com/uw-labs/sync v0.0.0-20190307114256-1bb306bf6e71
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.27.1 // indirect
)
63 changes: 23 additions & 40 deletions go.sum

Large diffs are not rendered by default.

84 changes: 81 additions & 3 deletions internal/testshared/testshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ type TestServer interface {

// TestAll is the main entrypoint from the backend implmenentation tests to
// call, and will run each test as a sub-test.
func TestAll(t *testing.T, ts TestServer) {
func TestAll(t *testing.T, ts TestServer, testDurable bool) {
t.Helper()

for _, x := range []func(t *testing.T, ts TestServer){
tests := []func(t *testing.T, ts TestServer){
testOnePublisherOneMessageOneConsumer,
testOnePublisherOneMessageTwoConsumers,
testPublisherShouldNotBlock,
Expand All @@ -42,7 +42,12 @@ func TestAll(t *testing.T, ts TestServer) {
testConsumeStatusFail,
testPublishMultipleMessagesOneConsumer,
testOnePublisherOneConsumerConsumeWithoutAckingDiscardedPayload,
} {
}
if testDurable {
tests = append(tests, testDurableConsumer)
}

for _, x := range tests {
f := func(t *testing.T) {
x(t, ts)
}
Expand Down Expand Up @@ -539,6 +544,79 @@ func testOnePublisherOneConsumerConsumeWithoutAckingDiscardedPayload(t *testing.
}
}

func testDurableConsumer(t *testing.T, ts TestServer) {
assert := assert.New(t)

topic := generateID()
consumerID := generateID()

cons := ts.NewConsumer(topic, consumerID)
prod := ts.NewProducer(topic)

consCtx, consCancel := context.WithCancel(context.Background())
consMsgs := make(chan substrate.Message)
consAcks := make(chan substrate.Message)
consErrs := make(chan error, 1)
go func() {
consErrs <- cons.ConsumeMessages(consCtx, consMsgs, consAcks)
consCancel()
}()

prodCtx, prodCancel := context.WithCancel(context.Background())

prodMsgs := make(chan substrate.Message, 1024)
prodAcks := make(chan substrate.Message, 1024)
prodErrs := make(chan error, 1)
go func() {
prodErrs <- prod.PublishMessages(prodCtx, prodAcks, prodMsgs)
prodCancel()
}()

// Publish 30 messages.
for i := 0; i < 30; i++ {
messageText := fmt.Sprintf("messageText-%d", i)
m := testMessage(messageText)
produceAndCheckAck(prodCtx, t, prodMsgs, prodAcks, &m)
}

// Consume 15 message and close connection.
for i := 0; i < 15; i++ {
messageText := fmt.Sprintf("messageText-%d", i)
msgStr := consumeAndAck(consCtx, t, consMsgs, consAcks)
assert.Equal(messageText, msgStr)
}
consCancel()
if err := <-consErrs; err != context.Canceled {
t.Errorf("unexpected error from consume : %s", err)
}
assert.NoError(cons.Close())

// Read the remaining 15 messages to check that subscription is durable.
cons = ts.NewConsumer(topic, consumerID)
consCtx, consCancel = context.WithCancel(context.Background())
consMsgs = make(chan substrate.Message)
consAcks = make(chan substrate.Message)
consErrs = make(chan error, 1)
go func() {
consErrs <- cons.ConsumeMessages(consCtx, consMsgs, consAcks)
consCancel()
}()

for i := 15; i < 30; i++ {
messageText := fmt.Sprintf("messageText-%d", i)
msgStr := consumeAndAck(consCtx, t, consMsgs, consAcks)
assert.Equal(messageText, msgStr)
}
consCancel()
if err := <-consErrs; err != context.Canceled {
t.Errorf("unexpected error from consume : %s", err)
}
prodCancel()
if err := <-prodErrs; err != context.Canceled {
t.Errorf("unexpected error from produce : %s", err)
}
}

// Helper functions below here

func connectSendmessageAndClose(t *testing.T, ts TestServer, topic string, messageText string, msgID string) {
Expand Down
211 changes: 211 additions & 0 deletions jetstream/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package jetstream

import (
"bytes"
"context"
"fmt"
"os/exec"
"testing"
"time"

"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"

"github.com/uw-labs/substrate"
"github.com/uw-labs/substrate/internal/testshared"
)

func TestAll(t *testing.T) {
k, err := runServer()
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
k.Kill()
})
t.Run("partitioned", testPartitioned(k))

testshared.TestAll(t, &testServer{}, true)
}

func testPartitioned(k *testServer) func(*testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

k.ensureTopic("PARTITIONED", "PARTITIONED.*")
p, err := NewAsyncMessageSink(AsyncMessageSinkConfig{
URL: "http://0.0.0.0:4222",
Topic: "PARTITIONED",
Partitioned: true,
})
assert.NoError(t, err)
s := substrate.NewSynchronousMessageSink(p)
defer func() {
assert.NoError(t, s.Close())
}()
assert.NoError(t, s.PublishMessage(ctx, &testMsg{
key: []byte("v1"),
data: []byte("v1-started"),
}))
assert.NoError(t, s.PublishMessage(ctx, &testMsg{
key: []byte("v2"),
data: []byte("v2-started"),
}))

c, err := NewAsyncMessageSource(AsyncMessageSourceConfig{
URL: "http://0.0.0.0:4222",
Topic: "PARTITIONED",
ConsumerGroup: "consumer-1",
Offset: OffsetOldest,
Partition: "*",
})
assert.NoError(t, err)
sc := substrate.NewSynchronousMessageSource(c)
defer sc.Close()

payloads := make([]string, 0, 2)
expErr := fmt.Errorf("success")
assert.Equal(t, expErr, sc.ConsumeMessages(ctx, func(ctx context.Context, m substrate.Message) error {
payloads = append(payloads, string(m.Data()))
if len(payloads) == 2 {
return expErr
}
return nil
}))
assert.Equal(t, []string{"v1-started", "v2-started"}, payloads)
}
}

type testServer struct {
containerName string
}

func (ks *testServer) NewConsumer(topic string, groupID string) substrate.AsyncMessageSource {
ks.ensureTopic(topic)
source, err := NewAsyncMessageSource(AsyncMessageSourceConfig{
URL: "http://0.0.0.0:4222",
Topic: topic,
ConsumerGroup: groupID,
Offset: OffsetOldest,
AckWait: time.Second,
})
if err != nil {
panic(err)
}
return source
}

func (ks *testServer) NewProducer(topic string) substrate.AsyncMessageSink {
ks.ensureTopic(topic)
conf := AsyncMessageSinkConfig{
URL: "http://0.0.0.0:4222",
Topic: topic,
}

sink, err := NewAsyncMessageSink(conf)
if err != nil {
panic(err)
}
return sink
}

func (ks *testServer) ensureTopic(topic string, subjects ...string) {
conn, err := nats.Connect("http://0.0.0.0:4222")
if err != nil {
panic(err)
}
defer conn.Close()

js, err := conn.JetStream()
if err != nil {
panic(err)
}
if _, err := js.AddStream(&nats.StreamConfig{
Name: topic,
Subjects: subjects,
}); err != nil {
panic(err)
}
}

func (ks *testServer) TestEnd() {}

func (ks *testServer) Kill() error {
cmd := exec.Command("docker", "rm", "-f", ks.containerName)

out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("error removing container: %s", out)
}

return nil
}

func runServer() (*testServer, error) {
containerName := uuid.New().String()

cmd := exec.CommandContext(
context.Background(),
"docker",
"run",
"-d",
"--rm",
"--name", containerName,
"-p", "4222:4222",
"nats",
"-js",
)
if err := cmd.Run(); err != nil {
return nil, err
}

// wait for container to start up
loop:
for {
portCmd := exec.Command("docker", "port", containerName, "4222/tcp")

out, err := portCmd.CombinedOutput()
switch {
case err == nil:
break loop
case bytes.Contains(out, []byte("No such container:")):
// Still starting up. Wait a while.
time.Sleep(time.Millisecond * 100)
default:
return nil, err
}
}

ks := &testServer{containerName}
for {
if err := ks.canConsume(); err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
return ks, nil
}

func (ks *testServer) canConsume() error {
conn, err := nats.Connect("http://0.0.0.0:4222")
if err != nil {
return err
}
conn.Close()
return nil
}

type testMsg struct {
key []byte
data []byte
}

func (msg *testMsg) Key() []byte {
return msg.key
}

func (msg *testMsg) Data() []byte {
return msg.data
}
29 changes: 29 additions & 0 deletions jetstream/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package jetstream

import (
"fmt"

"github.com/nats-io/nats.go"

"github.com/uw-labs/substrate"
)

func natsStatus(nc *nats.Conn) (*substrate.Status, error) {
if nc == nil {
return &substrate.Status{
Problems: []string{"no nats connection"},
Working: false,
}, nil
}

if nc.IsConnected() {
return &substrate.Status{
Working: true,
}, nil
}

return &substrate.Status{
Problems: []string{fmt.Sprintf("nats not connected - last error: %v", nc.LastError())},
Working: false,
}, nil
}
Loading