diff --git a/.github/infrastructure/docker-compose-natsstreaming.yml b/.github/infrastructure/docker-compose-natsstreaming.yml deleted file mode 100644 index 52250a567b..0000000000 --- a/.github/infrastructure/docker-compose-natsstreaming.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: '2' -services: - natsstreaming: - image: nats-streaming:latest - ports: - - "4222:4222" - - "8222:8222" \ No newline at end of file diff --git a/.github/scripts/test-info.mjs b/.github/scripts/test-info.mjs index e9b5bb129c..4b1268e1d8 100644 --- a/.github/scripts/test-info.mjs +++ b/.github/scripts/test-info.mjs @@ -414,10 +414,6 @@ const components = { conformanceSetup: 'docker-compose.sh vernemq', sourcePkg: ['pubsub/mqtt3'], }, - 'pubsub.natsstreaming': { - conformance: true, - conformanceSetup: 'docker-compose.sh natsstreaming', - }, 'pubsub.pulsar': { conformance: true, certification: true, diff --git a/go.mod b/go.mod index 8c08134464..3d06eecd96 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,6 @@ require ( github.com/nats-io/nats-server/v2 v2.9.21 github.com/nats-io/nats.go v1.28.0 github.com/nats-io/nkeys v0.4.4 - github.com/nats-io/stan.go v0.10.4 github.com/open-policy-agent/opa v0.55.0 github.com/oracle/oci-go-sdk/v54 v54.0.0 github.com/pashagolub/pgxmock/v2 v2.11.0 @@ -307,7 +306,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible // indirect github.com/nats-io/jwt/v2 v2.4.1 // indirect - github.com/nats-io/nats-streaming-server v0.25.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/oleiade/lane v1.0.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect diff --git a/go.sum b/go.sum index ee8ee19b0c..1f003cc5c4 100644 --- a/go.sum +++ b/go.sum @@ -1229,7 +1229,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-kms-wrapping/entropy v0.1.0/go.mod h1:d1g9WGtAunDNpek8jUIEJnBlbgKS1N2Q61QkHiZyR1g= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= -github.com/hashicorp/go-msgpack/v2 v2.1.0 h1:J2g2hMyjSefUPTnkLRU2MnsLLsPRB1n4Z/wJRN07GuA= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= @@ -1275,7 +1274,6 @@ github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM= github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0= -github.com/hashicorp/raft v1.5.0 h1:uNs9EfJ4FwiArZRxxfd/dQ5d33nV31/CdCHArH89hT8= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY= github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= @@ -1565,21 +1563,15 @@ github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+v github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk= github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= -github.com/nats-io/nats-streaming-server v0.25.5 h1:DX6xaPhKvVLhdpNsuEmmD+O9LfWSnw8cvxQU/H9LRy8= -github.com/nats-io/nats-streaming-server v0.25.5/go.mod h1:dSBVdHGsT/tV91lT4MWFfE6+yjRCNhRIYJpBaTHFdAo= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= -github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k= github.com/niean/gotools v0.0.0-20151221085310-ff3f51fc5c60/go.mod h1:gH2bvE9/eX49hWK7CwwL/+/y+dodduyxs5cTpBzF5v0= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= @@ -1973,7 +1965,6 @@ github.com/zouyx/agollo/v3 v3.4.5/go.mod h1:LJr3kDmm23QSW+F1Ol4TMHDa7HvJvscMdVxJ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= -go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= @@ -2103,7 +2094,6 @@ golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -2116,7 +2106,6 @@ golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= -golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= diff --git a/pubsub/natsstreaming/metadata.go b/pubsub/natsstreaming/metadata.go deleted file mode 100644 index af3825f736..0000000000 --- a/pubsub/natsstreaming/metadata.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package natsstreaming - -import ( - "time" - - "github.com/dapr/components-contrib/pubsub" -) - -type natsMetadata struct { - NatsURL string - NatsStreamingClusterID string - SubscriptionType string - NatsQueueGroupName string `mapstructure:"consumerId"` - DurableSubscriptionName string - StartAtSequence *uint64 - StartWithLastReceived string - DeliverNew string - DeliverAll string - StartAtTimeDelta time.Duration - StartAtTime string - StartAtTimeFormat string - AckWaitTime time.Duration - MaxInFlight *uint64 - ConcurrencyMode pubsub.ConcurrencyMode -} diff --git a/pubsub/natsstreaming/natsstreaming.go b/pubsub/natsstreaming/natsstreaming.go deleted file mode 100644 index 5628f1fb59..0000000000 --- a/pubsub/natsstreaming/natsstreaming.go +++ /dev/null @@ -1,372 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package natsstreaming implements NATS Streaming pubsub component -*/ -package natsstreaming - -import ( - "context" - "errors" - "fmt" - "math/rand" - "reflect" - "sync" - "sync/atomic" - "time" - - nats "github.com/nats-io/nats.go" - stan "github.com/nats-io/stan.go" - "github.com/nats-io/stan.go/pb" - - "github.com/dapr/components-contrib/metadata" - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - "github.com/dapr/kit/retry" -) - -// compulsory options. -const ( - natsURL = "natsURL" - natsStreamingClusterID = "natsStreamingClusterID" -) - -// subscription options (optional). -const ( - durableSubscriptionName = "durableSubscriptionName" - startAtSequence = "startAtSequence" - startWithLastReceived = "startWithLastReceived" - deliverAll = "deliverAll" - deliverNew = "deliverNew" - startAtTimeDelta = "startAtTimeDelta" - startAtTime = "startAtTime" - startAtTimeFormat = "startAtTimeFormat" - ackWaitTime = "ackWaitTime" - maxInFlight = "maxInFlight" -) - -// valid values for subscription options. -const ( - subscriptionTypeQueueGroup = "queue" - subscriptionTypeTopic = "topic" - startWithLastReceivedTrue = "true" - deliverAllTrue = "true" - deliverNewTrue = "true" -) - -const ( - consumerID = "consumerID" // passed in by Dapr runtime - subscriptionType = "subscriptionType" -) - -type natsStreamingPubSub struct { - metadata natsMetadata - natStreamingConn stan.Conn - - logger logger.Logger - - backOffConfig retry.Config - - closed atomic.Bool - closeCh chan struct{} - wg sync.WaitGroup -} - -// NewNATSStreamingPubSub returns a new NATS Streaming pub-sub implementation. -func NewNATSStreamingPubSub(logger logger.Logger) pubsub.PubSub { - return &natsStreamingPubSub{logger: logger, closeCh: make(chan struct{})} -} - -func parseNATSStreamingMetadata(meta pubsub.Metadata) (natsMetadata, error) { - m := natsMetadata{} - - var err error - if err = metadata.DecodeMetadata(meta.Properties, &m); err != nil { - return m, err - } - - if m.NatsURL == "" { - return m, errors.New("nats-streaming error: missing nats URL") - } - if m.NatsStreamingClusterID == "" { - return m, errors.New("nats-streaming error: missing nats streaming cluster ID") - } - - switch m.SubscriptionType { - case subscriptionTypeTopic, subscriptionTypeQueueGroup, "": - // valid values - default: - return m, errors.New("nats-streaming error: valid value for subscriptionType is topic or queue") - } - - if m.NatsQueueGroupName == "" { - return m, errors.New("nats-streaming error: missing queue group name") - } - - if m.MaxInFlight != nil && *m.MaxInFlight < 1 { - return m, errors.New("nats-streaming error: maxInFlight should be equal to or more than 1") - } - - //nolint:nestif - // subscription options - only one can be used - - // helper function to reset mutually exclusive options - clearValues := func(m *natsMetadata, indexToKeep int) { - if indexToKeep != 0 { - m.StartAtSequence = nil - } - if indexToKeep != 1 { - m.StartWithLastReceived = "" - } - if indexToKeep != 2 { - m.DeliverAll = "" - } - if indexToKeep != 3 { - m.DeliverNew = "" - } - if indexToKeep != 4 { - m.StartAtTime = "" - } - if indexToKeep != 4 { - m.StartAtTimeFormat = "" - } - } - - switch { - case m.StartAtSequence != nil: - if *m.StartAtSequence < 1 { - return m, errors.New("nats-streaming error: startAtSequence should be equal to or more than 1") - } - clearValues(&m, 0) - case m.StartWithLastReceived != "": - if m.StartWithLastReceived != startWithLastReceivedTrue { - return m, errors.New("nats-streaming error: valid value for startWithLastReceived is true") - } - clearValues(&m, 1) - case m.DeliverAll != "": - if m.DeliverAll != deliverAllTrue { - return m, errors.New("nats-streaming error: valid value for deliverAll is true") - } - clearValues(&m, 2) - case m.DeliverNew != "": - if m.DeliverNew != deliverNewTrue { - return m, errors.New("nats-streaming error: valid value for deliverNew is true") - } - clearValues(&m, 3) - case m.StartAtTime != "": - if m.StartAtTimeFormat == "" { - return m, errors.New("nats-streaming error: missing value for startAtTimeFormat") - } - clearValues(&m, 4) - } - - m.ConcurrencyMode, err = pubsub.Concurrency(meta.Properties) - if err != nil { - return m, fmt.Errorf("nats-streaming error: can't parse %s: %s", pubsub.ConcurrencyKey, err) - } - return m, nil -} - -func (n *natsStreamingPubSub) Init(_ context.Context, metadata pubsub.Metadata) error { - n.logger.Warn("⚠️ The NATS Streaming PubSub component is deprecated due to the deprecation of NATS Server, and will be removed from Dapr 1.13") - - m, err := parseNATSStreamingMetadata(metadata) - if err != nil { - return err - } - n.metadata = m - clientID := genRandomString(20) - opts := []nats.Option{nats.Name(clientID)} - natsConn, err := nats.Connect(m.NatsURL, opts...) - if err != nil { - return fmt.Errorf("nats-streaming: error connecting to nats server at %s: %s", m.NatsURL, err) - } - natStreamingConn, err := stan.Connect(m.NatsStreamingClusterID, clientID, stan.NatsConn(natsConn)) - if err != nil { - return fmt.Errorf("nats-streaming: error connecting to nats streaming server %s: %s", m.NatsStreamingClusterID, err) - } - n.logger.Debugf("connected to natsstreaming at %s", m.NatsURL) - - // Default retry configuration is used if no - // backOff properties are set. - if err := retry.DecodeConfigWithPrefix( - &n.backOffConfig, - metadata.Properties, - "backOff"); err != nil { - return err - } - - n.natStreamingConn = natStreamingConn - - return nil -} - -func (n *natsStreamingPubSub) Publish(_ context.Context, req *pubsub.PublishRequest) error { - if n.closed.Load() { - return errors.New("component is closed") - } - - err := n.natStreamingConn.Publish(req.Topic, req.Data) - if err != nil { - return fmt.Errorf("nats-streaming: error from publish: %s", err) - } - - return nil -} - -func (n *natsStreamingPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - if n.closed.Load() { - return errors.New("component is closed") - } - - natStreamingsubscriptionOptions, err := n.subscriptionOptions() - if err != nil { - return fmt.Errorf("nats-streaming: error getting subscription options %s", err) - } - - natsMsgHandler := func(natsMsg *stan.Msg) { - msg := pubsub.NewMessage{ - Topic: req.Topic, - Data: natsMsg.Data, - } - - n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence) - - f := func() { - herr := handler(ctx, &msg) - if herr == nil { - natsMsg.Ack() - } - } - - switch n.metadata.ConcurrencyMode { - case pubsub.Single: - f() - case pubsub.Parallel: - n.wg.Add(1) - go func() { - defer n.wg.Done() - f() - }() - } - } - - var subscription stan.Subscription - if n.metadata.SubscriptionType == subscriptionTypeTopic { - subscription, err = n.natStreamingConn.Subscribe(req.Topic, natsMsgHandler, natStreamingsubscriptionOptions...) - } else if n.metadata.SubscriptionType == subscriptionTypeQueueGroup { - subscription, err = n.natStreamingConn.QueueSubscribe(req.Topic, n.metadata.NatsQueueGroupName, natsMsgHandler, natStreamingsubscriptionOptions...) - } - - if err != nil { - return fmt.Errorf("nats-streaming: subscribe error %s", err) - } - - n.wg.Add(1) - go func() { - defer n.wg.Done() - select { - case <-ctx.Done(): - case <-n.closeCh: - } - err := subscription.Unsubscribe() - if err != nil { - n.logger.Warnf("nats-streaming: error while unsubscribing from topic %s: %v", req.Topic, err) - } - }() - - if n.metadata.SubscriptionType == subscriptionTypeTopic { - n.logger.Debugf("nats-streaming: subscribed to subject %s", req.Topic) - } else if n.metadata.SubscriptionType == subscriptionTypeQueueGroup { - n.logger.Debugf("nats-streaming: subscribed to subject %s with queue group %s", req.Topic, n.metadata.NatsQueueGroupName) - } - - return nil -} - -func (n *natsStreamingPubSub) subscriptionOptions() ([]stan.SubscriptionOption, error) { - var options []stan.SubscriptionOption - - if n.metadata.DurableSubscriptionName != "" { - options = append(options, stan.DurableName(n.metadata.DurableSubscriptionName)) - } - - switch { - case n.metadata.DeliverNew == deliverNewTrue: - options = append(options, stan.StartAt(pb.StartPosition_NewOnly)) //nolint:nosnakecase - case n.metadata.StartAtSequence != nil && *n.metadata.StartAtSequence >= 1: // messages index start from 1, this is a valid check - options = append(options, stan.StartAtSequence(*n.metadata.StartAtSequence)) - case n.metadata.StartWithLastReceived == startWithLastReceivedTrue: - options = append(options, stan.StartWithLastReceived()) - case n.metadata.DeliverAll == deliverAllTrue: - options = append(options, stan.DeliverAllAvailable()) - case n.metadata.StartAtTimeDelta > (1 * time.Nanosecond): // as long as its a valid time.Duration - options = append(options, stan.StartAtTimeDelta(n.metadata.StartAtTimeDelta)) - case n.metadata.StartAtTime != "": - if n.metadata.StartAtTimeFormat != "" { - startTime, err := time.Parse(n.metadata.StartAtTimeFormat, n.metadata.StartAtTime) - if err != nil { - return nil, err - } - options = append(options, stan.StartAtTime(startTime)) - } - } - - // default is auto ACK. switching to manual ACK since processing errors need to be handled - options = append(options, stan.SetManualAckMode()) - - // check if set the ack options. - if n.metadata.AckWaitTime > (1 * time.Nanosecond) { - options = append(options, stan.AckWait(n.metadata.AckWaitTime)) - } - if n.metadata.MaxInFlight != nil && *n.metadata.MaxInFlight >= 1 { - options = append(options, stan.MaxInflight(int(*n.metadata.MaxInFlight))) - } - - return options, nil -} - -const inputs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" - -// generates a random string of length 20. -func genRandomString(n int) string { - b := make([]byte, n) - s := rand.NewSource(int64(time.Now().Nanosecond())) - for i := range b { - b[i] = inputs[s.Int63()%int64(len(inputs))] - } - clientID := string(b) - - return clientID -} - -func (n *natsStreamingPubSub) Close() error { - defer n.wg.Wait() - if n.closed.CompareAndSwap(false, true) { - close(n.closeCh) - } - - return n.natStreamingConn.Close() -} - -func (n *natsStreamingPubSub) Features() []pubsub.Feature { - return nil -} - -// GetComponentMetadata returns the metadata of the component. -func (n *natsStreamingPubSub) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { - metadataStruct := natsMetadata{} - metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType) - return -} diff --git a/pubsub/natsstreaming/natsstreaming_test.go b/pubsub/natsstreaming/natsstreaming_test.go deleted file mode 100644 index a80aac865e..0000000000 --- a/pubsub/natsstreaming/natsstreaming_test.go +++ /dev/null @@ -1,448 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package natsstreaming - -import ( - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - mdata "github.com/dapr/components-contrib/metadata" - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/ptr" -) - -func TestParseNATSStreamingForMetadataMandatoryOptionsMissing(t *testing.T) { - type test struct { - name string - properties map[string]string - } - tests := []test{ - {"nats URL missing", map[string]string{ - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - }}, - {"consumer ID missing", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - subscriptionType: "topic", - }}, - {"cluster ID missing", map[string]string{ - natsURL: "nats://foo.bar:4222", - consumerID: "consumer1", - subscriptionType: "topic", - }}, - } - for _, _test := range tests { - t.Run(_test.name, func(t *testing.T) { - fakeMetaData := pubsub.Metadata{Base: mdata.Base{ - Properties: _test.properties, - }} - _, err := parseNATSStreamingMetadata(fakeMetaData) - assert.NotEmpty(t, err) - }) - } -} - -func TestParseNATSStreamingMetadataForInvalidSubscriptionOptions(t *testing.T) { - type test struct { - name string - properties map[string]string - } - - tests := []test{ - {"invalid value (less than 1) for startAtSequence", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtSequence: "0", - }}, - {"non integer value for startAtSequence", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtSequence: "foo", - }}, - {"startWithLastReceived is other than true", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startWithLastReceived: "foo", - }}, - {"deliverAll is other than true", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - deliverAll: "foo", - }}, - {"deliverNew is other than true", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - deliverNew: "foo", - }}, - {"invalid value for startAtTimeDelta", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtTimeDelta: "foo", - }}, - {"startAtTime provided without startAtTimeFormat", map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtTime: "foo", - }}, - } - - for _, _test := range tests { - t.Run(_test.name, func(t *testing.T) { - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: _test.properties}, - } - _, err := parseNATSStreamingMetadata(fakeMetaData) - assert.NotEmpty(t, err) - }) - } -} - -func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) { - type test struct { - name string - properties map[string]string - expectedMetadataName string - expectedMetadataValue string - } - - tests := []test{ - { - "using startWithLastReceived", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - pubsub.ConcurrencyKey: "single", - startWithLastReceived: "true", - }, - "startWithLastReceived", "true", - }, - - { - "using deliverAll", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - pubsub.ConcurrencyKey: "single", - deliverAll: "true", - }, - "deliverAll", "true", - }, - - { - "using deliverNew", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - pubsub.ConcurrencyKey: "single", - deliverNew: "true", - }, - "deliverNew", "true", - }, - - { - "using startAtSequence", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - pubsub.ConcurrencyKey: "single", - startAtSequence: "42", - }, - "startAtSequence", "42", - }, - - { - "using startAtTimeDelta", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - pubsub.ConcurrencyKey: "single", - startAtTimeDelta: "1h", - }, - "startAtTimeDelta", "1h", - }, - - { - "using concurrencyMode single", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtTimeDelta: "1h", - pubsub.ConcurrencyKey: "single", - }, - "concurrencyMode", "single", - }, - - { - "using concurrencyMode parallel", - map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtTimeDelta: "1h", - pubsub.ConcurrencyKey: "parallel", - }, - "concurrencyMode", "parallel", - }, - } - - for _, _test := range tests { - t.Run(_test.name, func(t *testing.T) { - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: _test.properties}, - } - m, err := parseNATSStreamingMetadata(fakeMetaData) - - assert.NoError(t, err) - - assert.NotEmpty(t, m.NatsURL) - assert.NotEmpty(t, m.NatsStreamingClusterID) - assert.NotEmpty(t, m.SubscriptionType) - assert.NotEmpty(t, m.NatsQueueGroupName) - assert.NotEmpty(t, m.ConcurrencyMode) - assert.NotEmpty(t, _test.expectedMetadataValue) - - assert.Equal(t, _test.properties[natsURL], m.NatsURL) - assert.Equal(t, _test.properties[natsStreamingClusterID], m.NatsStreamingClusterID) - assert.Equal(t, _test.properties[subscriptionType], m.SubscriptionType) - assert.Equal(t, _test.properties[consumerID], m.NatsQueueGroupName) - assert.Equal(t, _test.properties[pubsub.ConcurrencyKey], string(m.ConcurrencyMode)) - assert.Equal(t, _test.properties[_test.expectedMetadataName], _test.expectedMetadataValue) - }) - } -} - -func TestParseNATSStreamingMetadata(t *testing.T) { - t.Run("mandatory metadata provided", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - } - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - m, err := parseNATSStreamingMetadata(fakeMetaData) - - assert.NoError(t, err) - assert.NotEmpty(t, m.NatsURL) - assert.NotEmpty(t, m.NatsStreamingClusterID) - assert.NotEmpty(t, m.NatsQueueGroupName) - assert.Equal(t, fakeProperties[natsURL], m.NatsURL) - assert.Equal(t, fakeProperties[natsStreamingClusterID], m.NatsStreamingClusterID) - assert.Equal(t, fakeProperties[consumerID], m.NatsQueueGroupName) - }) - - t.Run("subscription type missing", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - } - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - _, err := parseNATSStreamingMetadata(fakeMetaData) - assert.Empty(t, err) - }) - t.Run("invalid value for subscription type", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "baz", - } - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - _, err := parseNATSStreamingMetadata(fakeMetaData) - assert.NotEmpty(t, err) - }) - t.Run("more than one subscription option provided", func(t *testing.T) { - fakeProperties := map[string]string{ - natsURL: "nats://foo.bar:4222", - natsStreamingClusterID: "testcluster", - consumerID: "consumer1", - subscriptionType: "topic", - startAtSequence: "42", - startWithLastReceived: "true", - deliverAll: "true", - } - fakeMetaData := pubsub.Metadata{ - Base: mdata.Base{Properties: fakeProperties}, - } - m, err := parseNATSStreamingMetadata(fakeMetaData) - assert.NoError(t, err) - assert.NotEmpty(t, m.NatsURL) - assert.NotEmpty(t, m.NatsStreamingClusterID) - assert.NotEmpty(t, m.SubscriptionType) - assert.NotEmpty(t, m.NatsQueueGroupName) - assert.NotEmpty(t, m.StartAtSequence) - // startWithLastReceived ignored - assert.Empty(t, m.StartWithLastReceived) - // deliverAll will be ignored - assert.Empty(t, m.DeliverAll) - - assert.Equal(t, fakeProperties[natsURL], m.NatsURL) - assert.Equal(t, fakeProperties[natsStreamingClusterID], m.NatsStreamingClusterID) - assert.Equal(t, fakeProperties[subscriptionType], m.SubscriptionType) - assert.Equal(t, fakeProperties[consumerID], m.NatsQueueGroupName) - assert.Equal(t, fakeProperties[startAtSequence], strconv.FormatUint(*m.StartAtSequence, 10)) - }) -} - -func TestSubscriptionOptionsForValidOptions(t *testing.T) { - type test struct { - name string - m natsMetadata - expectedNumberOfOptions int - } - - tests := []test{ - {"using durableSubscriptionName", natsMetadata{DurableSubscriptionName: "foobar"}, 2}, - {"durableSubscriptionName is empty", natsMetadata{DurableSubscriptionName: ""}, 1}, - {"using startAtSequence", natsMetadata{StartAtSequence: ptr.Of(uint64(42))}, 2}, - {"using startWithLastReceived", natsMetadata{StartWithLastReceived: startWithLastReceivedTrue}, 2}, - {"using deliverAll", natsMetadata{DeliverAll: deliverAllTrue}, 2}, - {"using startAtTimeDelta", natsMetadata{StartAtTimeDelta: 1 * time.Hour}, 2}, - {"using startAtTime and startAtTimeFormat", natsMetadata{StartAtTime: "Feb 3, 2013 at 7:54pm (PST)", StartAtTimeFormat: "Jan 2, 2006 at 3:04pm (MST)"}, 2}, - {"using manual ack with ackWaitTime", natsMetadata{AckWaitTime: 30 * time.Second}, 2}, - {"using manual ack with maxInFlight", natsMetadata{MaxInFlight: ptr.Of(uint64(42))}, 2}, - } - - for _, _test := range tests { - t.Run(_test.name, func(t *testing.T) { - natsStreaming := natsStreamingPubSub{metadata: _test.m} - opts, err := natsStreaming.subscriptionOptions() - assert.Empty(t, err) - assert.NotEmpty(t, opts) - assert.Equal(t, _test.expectedNumberOfOptions, len(opts)) - }) - } -} - -func TestSubscriptionOptionsForInvalidOptions(t *testing.T) { - type test struct { - name string - m natsMetadata - } - - tests := []test{ - {"startAtSequence is less than 1", natsMetadata{StartAtSequence: ptr.Of(uint64(0))}}, - {"startWithLastReceived is other than true", natsMetadata{StartWithLastReceived: "foo"}}, - {"deliverAll is other than true", natsMetadata{DeliverAll: "foo"}}, - {"deliverNew is other than true", natsMetadata{DeliverNew: "foo"}}, - {"startAtTime is empty", natsMetadata{StartAtTime: "", StartAtTimeFormat: "Jan 2, 2006 at 3:04pm (MST)"}}, - {"startAtTimeFormat is empty", natsMetadata{StartAtTime: "Feb 3, 2013 at 7:54pm (PST)", StartAtTimeFormat: ""}}, - } - - for _, _test := range tests { - t.Run(_test.name, func(t *testing.T) { - natsStreaming := natsStreamingPubSub{metadata: _test.m} - opts, err := natsStreaming.subscriptionOptions() - assert.Empty(t, err) - assert.NotEmpty(t, opts) - assert.Equal(t, 1, len(opts)) - }) - } -} - -func TestSubscriptionOptions(t *testing.T) { - // general - t.Run("manual ACK option is present by default", func(t *testing.T) { - natsStreaming := natsStreamingPubSub{metadata: natsMetadata{}} - opts, err := natsStreaming.subscriptionOptions() - assert.Empty(t, err) - assert.NotEmpty(t, opts) - assert.Equal(t, 1, len(opts)) - }) - - t.Run("only one subscription option will be honored", func(t *testing.T) { - m := natsMetadata{DeliverNew: deliverNewTrue, DeliverAll: deliverAllTrue, StartAtTimeDelta: 1 * time.Hour} - natsStreaming := natsStreamingPubSub{metadata: m} - opts, err := natsStreaming.subscriptionOptions() - assert.Empty(t, err) - assert.NotEmpty(t, opts) - assert.Equal(t, 2, len(opts)) - }) - - // invalid subscription options - - t.Run("startAtTime is invalid", func(t *testing.T) { - m := natsMetadata{StartAtTime: "foobar", StartAtTimeFormat: "Jan 2, 2006 at 3:04pm (MST)"} - natsStreaming := natsStreamingPubSub{metadata: m} - opts, err := natsStreaming.subscriptionOptions() - assert.NotEmpty(t, err) - assert.Nil(t, opts) - }) - - t.Run("startAtTimeFormat is invalid", func(t *testing.T) { - m := natsMetadata{StartAtTime: "Feb 3, 2013 at 7:54pm (PST)", StartAtTimeFormat: "foo"} - - natsStreaming := natsStreamingPubSub{metadata: m} - opts, err := natsStreaming.subscriptionOptions() - assert.NotEmpty(t, err) - assert.Nil(t, opts) - }) -} - -func TestGenRandomString(t *testing.T) { - t.Run("random client ID is not empty", func(t *testing.T) { - clientID := genRandomString(20) - assert.NotEmpty(t, clientID) - }) - - t.Run("random client ID is not nil", func(t *testing.T) { - clientID := genRandomString(20) - assert.NotNil(t, clientID) - }) - - t.Run("random client ID length is 20", func(t *testing.T) { - clientID := genRandomString(20) - assert.NotEmpty(t, clientID) - assert.NotNil(t, clientID) - assert.Equal(t, 20, len(clientID)) - }) -} diff --git a/tests/config/pubsub/natsstreaming/pubsub.yml b/tests/config/pubsub/natsstreaming/pubsub.yml deleted file mode 100644 index ff7266c255..0000000000 --- a/tests/config/pubsub/natsstreaming/pubsub.yml +++ /dev/null @@ -1,20 +0,0 @@ -apiVersion: dapr.io/v1alpha1 -kind: Component -metadata: - name: pubsub -spec: - type: pubsub.natsstreaming - version: v1 - metadata: - - name: natsURL - value: "nats://localhost:4222" - - name: natsStreamingClusterID - value: "test-cluster" - - name: subscriptionType - value: topic - - name: consumerID - value: myConsumerID - - name: ackWaitTime - value: 10s - - name: maxInFlight - value: 1 \ No newline at end of file diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 1ddb12c515..c2097d31fd 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -50,8 +50,6 @@ components: operations: [] config: checkInOrderProcessing: false - - component: natsstreaming - operations: [] - component: jetstream operations: [] - component: kafka diff --git a/tests/conformance/common.go b/tests/conformance/common.go index f8e5f93dc6..0811be1bc8 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -75,7 +75,6 @@ import ( p_kafka "github.com/dapr/components-contrib/pubsub/kafka" p_kubemq "github.com/dapr/components-contrib/pubsub/kubemq" p_mqtt3 "github.com/dapr/components-contrib/pubsub/mqtt3" - p_natsstreaming "github.com/dapr/components-contrib/pubsub/natsstreaming" p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" p_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq" p_redis "github.com/dapr/components-contrib/pubsub/redis" @@ -468,8 +467,6 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { pubsub = p_servicebustopics.NewAzureServiceBusTopics(testLogger) case "azure.servicebus.queues": pubsub = p_servicebusqueues.NewAzureServiceBusQueues(testLogger) - case "natsstreaming": - pubsub = p_natsstreaming.NewNATSStreamingPubSub(testLogger) case "jetstream": pubsub = p_jetstream.NewJetStream(testLogger) case kafka: