From 94fa0cb99411c202a4c9c2ae84f444a10f2e5ad8 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 23 Jul 2024 16:46:25 +0200 Subject: [PATCH] Deprecate encoded connections (#1674) Signed-off-by: Piotr Piotrowski --- README.md | 95 ------- context.go | 2 + enc.go | 34 +++ encoders/builtin/default_enc.go | 6 + encoders/builtin/gob_enc.go | 6 + encoders/builtin/json_enc.go | 6 + encoders/protobuf/protobuf_enc.go | 8 + example_test.go | 100 -------- jetstream/kv.go | 4 +- jetstream/stream_config.go | 4 +- jetstream/test/helper_test.go | 9 - jserrors.go | 2 +- jsm.go | 10 +- kv.go | 2 +- nats.go | 15 +- netchan.go | 6 + test/basic_test.go | 10 +- test/conn_test.go | 4 +- test/context_test.go | 401 +++--------------------------- test/enc_test.go | 351 ++++++++++++++++++++++++++ test/gob_test.go | 2 + test/helper_test.go | 9 - test/json_test.go | 2 + test/netchan_test.go | 11 + test/protobuf_test.go | 2 + test/reconnect_test.go | 65 +++-- test/sub_test.go | 4 +- 27 files changed, 524 insertions(+), 646 deletions(-) diff --git a/README.md b/README.md index 7cf082f13..64307b202 100644 --- a/README.md +++ b/README.md @@ -136,60 +136,6 @@ To find more information on `nats.go` JetStream API, visit The service API (`micro`) allows you to [easily build NATS services](micro/README.md) The services API is currently in beta release. -## Encoded Connections - -```go - -nc, _ := nats.Connect(nats.DefaultURL) -c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER) -defer c.Close() - -// Simple Publisher -c.Publish("foo", "Hello World") - -// Simple Async Subscriber -c.Subscribe("foo", func(s string) { - fmt.Printf("Received a message: %s\n", s) -}) - -// EncodedConn can Publish any raw Go type using the registered Encoder -type person struct { - Name string - Address string - Age int -} - -// Go type Subscriber -c.Subscribe("hello", func(p *person) { - fmt.Printf("Received a person: %+v\n", p) -}) - -me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"} - -// Go type Publisher -c.Publish("hello", me) - -// Unsubscribe -sub, err := c.Subscribe("foo", nil) -// ... -sub.Unsubscribe() - -// Requests -var response string -err = c.Request("help", "help me", &response, 10*time.Millisecond) -if err != nil { - fmt.Printf("Request failed: %v\n", err) -} - -// Replying -c.Subscribe("help", func(subj, reply string, msg string) { - c.Publish(reply, "I can help!") -}) - -// Close connection -c.Close(); -``` - ## New Authentication (Nkeys and User Credentials) This requires server with version >= 2.0.0 @@ -269,34 +215,6 @@ if err != nil { ``` -## Using Go Channels (netchan) - -```go -nc, _ := nats.Connect(nats.DefaultURL) -ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER) -defer ec.Close() - -type person struct { - Name string - Address string - Age int -} - -recvCh := make(chan *person) -ec.BindRecvChan("hello", recvCh) - -sendCh := make(chan *person) -ec.BindSendChan("hello", sendCh) - -me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"} - -// Send via Go channels -sendCh <- me - -// Receive via Go channels -who := <- recvCh -``` - ## Wildcard Subscriptions ```go @@ -463,19 +381,6 @@ msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar")) sub, err := nc.SubscribeSync("foo") msg, err := sub.NextMsgWithContext(ctx) -// Encoded Request with context -c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) -type request struct { - Message string `json:"message"` -} -type response struct { - Code int `json:"code"` -} -req := &request{Message: "Hello"} -resp := &response{} -err := c.RequestWithContext(ctx, "foo", req, resp) -``` - ## Backwards compatibility In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines. diff --git a/context.go b/context.go index 20f1782ac..c19673c18 100644 --- a/context.go +++ b/context.go @@ -217,6 +217,8 @@ func (nc *Conn) FlushWithContext(ctx context.Context) error { // RequestWithContext will create an Inbox and perform a Request // using the provided cancellation context with the Inbox reply // for the data v. A response will be decoded into the vPtr last parameter. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error { if ctx == nil { return ErrInvalidContext diff --git a/enc.go b/enc.go index 4550f618d..78bcc219f 100644 --- a/enc.go +++ b/enc.go @@ -24,7 +24,11 @@ import ( "github.com/nats-io/nats.go/encoders/builtin" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + // Encoder interface is for all register encoders +// +// Deprecated: Encoded connections are no longer supported. type Encoder interface { Encode(subject string, v any) ([]byte, error) Decode(subject string, data []byte, vPtr any) error @@ -51,6 +55,8 @@ func init() { // EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to // a nats server and have an extendable encoder system that will encode and decode messages // from raw Go types. +// +// Deprecated: Encoded connections are no longer supported. type EncodedConn struct { Conn *Conn Enc Encoder @@ -58,6 +64,8 @@ type EncodedConn struct { // NewEncodedConn will wrap an existing Connection and utilize the appropriate registered // encoder. +// +// Deprecated: Encoded connections are no longer supported. func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) { if c == nil { return nil, errors.New("nats: Nil Connection") @@ -73,6 +81,8 @@ func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) { } // RegisterEncoder will register the encType with the given Encoder. Useful for customization. +// +// Deprecated: Encoded connections are no longer supported. func RegisterEncoder(encType string, enc Encoder) { encLock.Lock() defer encLock.Unlock() @@ -80,6 +90,8 @@ func RegisterEncoder(encType string, enc Encoder) { } // EncoderForType will return the registered Encoder for the encType. +// +// Deprecated: Encoded connections are no longer supported. func EncoderForType(encType string) Encoder { encLock.Lock() defer encLock.Unlock() @@ -88,6 +100,8 @@ func EncoderForType(encType string) Encoder { // Publish publishes the data argument to the given subject. The data argument // will be encoded using the associated encoder. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) Publish(subject string, v any) error { b, err := c.Enc.Encode(subject, v) if err != nil { @@ -99,6 +113,8 @@ func (c *EncodedConn) Publish(subject string, v any) error { // PublishRequest will perform a Publish() expecting a response on the // reply subject. Use Request() for automatically waiting for a response // inline. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) PublishRequest(subject, reply string, v any) error { b, err := c.Enc.Encode(subject, v) if err != nil { @@ -110,6 +126,8 @@ func (c *EncodedConn) PublishRequest(subject, reply string, v any) error { // Request will create an Inbox and perform a Request() call // with the Inbox reply for the data v. A response will be // decoded into the vPtr Response. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error { b, err := c.Enc.Encode(subject, v) if err != nil { @@ -150,6 +168,8 @@ func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Dura // and demarshal it into the given struct, e.g. person. // There are also variants where the callback wants either the subject, or the // subject and the reply subject. +// +// Deprecated: Encoded connections are no longer supported. type Handler any // Dissect the cb Handler's signature @@ -170,6 +190,8 @@ var emptyMsgType = reflect.TypeOf(&Msg{}) // Subscribe will create a subscription on the given subject and process incoming // messages using the specified Handler. The Handler should be a func that matches // a signature from the description of Handler from above. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) { return c.subscribe(subject, _EMPTY_, cb) } @@ -177,6 +199,8 @@ func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, erro // QueueSubscribe will create a queue subscription on the given subject and process // incoming messages using the specified Handler. The Handler should be a func that // matches a signature from the description of Handler from above. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) { return c.subscribe(subject, queue, cb) } @@ -238,18 +262,24 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio } // FlushTimeout allows a Flush operation to have an associated timeout. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) { return c.Conn.FlushTimeout(timeout) } // Flush will perform a round trip to the server and return when it // receives the internal reply. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) Flush() error { return c.Conn.Flush() } // Close will close the connection to the server. This call will release // all blocking calls, such as Flush(), etc. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) Close() { c.Conn.Close() } @@ -259,11 +289,15 @@ func (c *EncodedConn) Close() { // will be drained and can not publish any additional messages. Upon draining // of the publishers, the connection will be closed. Use the ClosedCB() // option to know when the connection has moved from draining to closed. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) Drain() error { return c.Conn.Drain() } // LastError reports the last error encountered via the Connection. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) LastError() error { return c.Conn.LastError() } diff --git a/encoders/builtin/default_enc.go b/encoders/builtin/default_enc.go index c1d0f6f0b..e73113da8 100644 --- a/encoders/builtin/default_enc.go +++ b/encoders/builtin/default_enc.go @@ -26,6 +26,8 @@ import ( // turn numbers into appropriate strings that can be decoded. It will also // properly encoded and decode bools. If will encode a struct, but if you want // to properly handle structures you should use JsonEncoder. +// +// Deprecated: Encoded connections are no longer supported. type DefaultEncoder struct { // Empty } @@ -35,6 +37,8 @@ var falseB = []byte("false") var nilB = []byte("") // Encode +// +// Deprecated: Encoded connections are no longer supported. func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) { switch arg := v.(type) { case string: @@ -58,6 +62,8 @@ func (je *DefaultEncoder) Encode(subject string, v any) ([]byte, error) { } // Decode +// +// Deprecated: Encoded connections are no longer supported. func (je *DefaultEncoder) Decode(subject string, data []byte, vPtr any) error { // Figure out what it's pointing to... sData := *(*string)(unsafe.Pointer(&data)) diff --git a/encoders/builtin/gob_enc.go b/encoders/builtin/gob_enc.go index 7ecf85e4d..e2e8c3202 100644 --- a/encoders/builtin/gob_enc.go +++ b/encoders/builtin/gob_enc.go @@ -21,6 +21,8 @@ import ( // GobEncoder is a Go specific GOB Encoder implementation for EncodedConn. // This encoder will use the builtin encoding/gob to Marshal // and Unmarshal most types, including structs. +// +// Deprecated: Encoded connections are no longer supported. type GobEncoder struct { // Empty } @@ -28,6 +30,8 @@ type GobEncoder struct { // FIXME(dlc) - This could probably be more efficient. // Encode +// +// Deprecated: Encoded connections are no longer supported. func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) { b := new(bytes.Buffer) enc := gob.NewEncoder(b) @@ -38,6 +42,8 @@ func (ge *GobEncoder) Encode(subject string, v any) ([]byte, error) { } // Decode +// +// Deprecated: Encoded connections are no longer supported. func (ge *GobEncoder) Decode(subject string, data []byte, vPtr any) (err error) { dec := gob.NewDecoder(bytes.NewBuffer(data)) err = dec.Decode(vPtr) diff --git a/encoders/builtin/json_enc.go b/encoders/builtin/json_enc.go index 0540d9850..8e4c852a4 100644 --- a/encoders/builtin/json_enc.go +++ b/encoders/builtin/json_enc.go @@ -21,11 +21,15 @@ import ( // JsonEncoder is a JSON Encoder implementation for EncodedConn. // This encoder will use the builtin encoding/json to Marshal // and Unmarshal most types, including structs. +// +// Deprecated: Encoded connections are no longer supported. type JsonEncoder struct { // Empty } // Encode +// +// Deprecated: Encoded connections are no longer supported. func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) { b, err := json.Marshal(v) if err != nil { @@ -35,6 +39,8 @@ func (je *JsonEncoder) Encode(subject string, v any) ([]byte, error) { } // Decode +// +// Deprecated: Encoded connections are no longer supported. func (je *JsonEncoder) Decode(subject string, data []byte, vPtr any) (err error) { switch arg := vPtr.(type) { case *string: diff --git a/encoders/protobuf/protobuf_enc.go b/encoders/protobuf/protobuf_enc.go index 017ffc035..805657767 100644 --- a/encoders/protobuf/protobuf_enc.go +++ b/encoders/protobuf/protobuf_enc.go @@ -20,6 +20,8 @@ import ( "google.golang.org/protobuf/proto" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + // Additional index for registered Encoders. const ( PROTOBUF_ENCODER = "protobuf" @@ -33,6 +35,8 @@ func init() { // ProtobufEncoder is a protobuf implementation for EncodedConn // This encoder will use the builtin protobuf lib to Marshal // and Unmarshal structs. +// +// Deprecated: Encoded connections are no longer supported. type ProtobufEncoder struct { // Empty } @@ -43,6 +47,8 @@ var ( ) // Encode +// +// Deprecated: Encoded connections are no longer supported. func (pb *ProtobufEncoder) Encode(subject string, v any) ([]byte, error) { if v == nil { return nil, nil @@ -60,6 +66,8 @@ func (pb *ProtobufEncoder) Encode(subject string, v any) ([]byte, error) { } // Decode +// +// Deprecated: Encoded connections are no longer supported. func (pb *ProtobufEncoder) Decode(subject string, data []byte, vPtr any) error { if _, ok := vPtr.(*any); ok { return nil diff --git a/example_test.go b/example_test.go index 3ad367320..782adc414 100644 --- a/example_test.go +++ b/example_test.go @@ -227,106 +227,6 @@ func ExampleConn_Close() { nc.Close() } -// Shows how to wrap a Conn into an EncodedConn -func ExampleNewEncodedConn() { - nc, _ := nats.Connect(nats.DefaultURL) - c, _ := nats.NewEncodedConn(nc, "json") - c.Close() -} - -// EncodedConn can publish virtually anything just -// by passing it in. The encoder will be used to properly -// encode the raw Go type -func ExampleEncodedConn_Publish() { - nc, _ := nats.Connect(nats.DefaultURL) - c, _ := nats.NewEncodedConn(nc, "json") - defer c.Close() - - type person struct { - Name string - Address string - Age int - } - - me := &person{Name: "derek", Age: 22, Address: "85 Second St"} - c.Publish("hello", me) -} - -// EncodedConn's subscribers will automatically decode the -// wire data into the requested Go type using the Decode() -// method of the registered Encoder. The callback signature -// can also vary to include additional data, such as subject -// and reply subjects. -func ExampleEncodedConn_Subscribe() { - nc, _ := nats.Connect(nats.DefaultURL) - c, _ := nats.NewEncodedConn(nc, "json") - defer c.Close() - - type person struct { - Name string - Address string - Age int - } - - c.Subscribe("hello", func(p *person) { - fmt.Printf("Received a person! %+v\n", p) - }) - - c.Subscribe("hello", func(subj, reply string, p *person) { - fmt.Printf("Received a person on subject %s! %+v\n", subj, p) - }) - - me := &person{Name: "derek", Age: 22, Address: "85 Second St"} - c.Publish("hello", me) -} - -// BindSendChan() allows binding of a Go channel to a nats -// subject for publish operations. The Encoder attached to the -// EncodedConn will be used for marshaling. -func ExampleEncodedConn_BindSendChan() { - nc, _ := nats.Connect(nats.DefaultURL) - c, _ := nats.NewEncodedConn(nc, "json") - defer c.Close() - - type person struct { - Name string - Address string - Age int - } - - ch := make(chan *person) - c.BindSendChan("hello", ch) - - me := &person{Name: "derek", Age: 22, Address: "85 Second St"} - ch <- me -} - -// BindRecvChan() allows binding of a Go channel to a nats -// subject for subscribe operations. The Encoder attached to the -// EncodedConn will be used for un-marshaling. -func ExampleEncodedConn_BindRecvChan() { - nc, _ := nats.Connect(nats.DefaultURL) - c, _ := nats.NewEncodedConn(nc, "json") - defer c.Close() - - type person struct { - Name string - Address string - Age int - } - - ch := make(chan *person) - c.BindRecvChan("hello", ch) - - me := &person{Name: "derek", Age: 22, Address: "85 Second St"} - c.Publish("hello", me) - - // Receive the publish directly on a channel - who := <-ch - - fmt.Printf("%v says hello!\n", who) -} - func ExampleJetStream() { nc, err := nats.Connect("localhost") if err != nil { diff --git a/jetstream/kv.go b/jetstream/kv.go index 7a026a281..ae892b4d7 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -165,8 +165,8 @@ type ( // with the same options as Watch. WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) - // Keys will return all keys. DEPRECATED: Use ListKeys instead to avoid - // memory issues. + // Keys will return all keys. + // Deprecated: Use ListKeys instead to avoid memory issues. Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) // ListKeys will return KeyLister, allowing to retrieve all keys from diff --git a/jetstream/stream_config.go b/jetstream/stream_config.go index dd1f9d941..6eb843278 100644 --- a/jetstream/stream_config.go +++ b/jetstream/stream_config.go @@ -192,8 +192,8 @@ type ( // v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` - // Template identifies the template that manages the Stream. DEPRECATED: - // This feature is no longer supported. + // Template identifies the template that manages the Stream. + // Deprecated: This feature is no longer supported. Template string `json:"template_owner,omitempty"` } diff --git a/jetstream/test/helper_test.go b/jetstream/test/helper_test.go index a9dbae222..9b7c1b76a 100644 --- a/jetstream/test/helper_test.go +++ b/jetstream/test/helper_test.go @@ -82,15 +82,6 @@ func NewConnection(t *testing.T, port int) *nats.Conn { return nc } -// NewEConn -func NewEConn(t *testing.T) *nats.EncodedConn { - ec, err := nats.NewEncodedConn(NewDefaultConnection(t), nats.DEFAULT_ENCODER) - if err != nil { - t.Fatalf("Failed to create an encoded connection: %v\n", err) - } - return ec -} - //////////////////////////////////////////////////////////////////////////////// // Running nats server in separate Go routines //////////////////////////////////////////////////////////////////////////////// diff --git a/jserrors.go b/jserrors.go index f0285943b..2d942e771 100644 --- a/jserrors.go +++ b/jserrors.go @@ -151,7 +151,7 @@ var ( // ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription ErrSubscriptionClosed JetStreamError = &jsError{message: "subscription closed"} - // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases. + // Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases. // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") ) diff --git a/jsm.go b/jsm.go index 9eb5d4b4b..682664730 100644 --- a/jsm.go +++ b/jsm.go @@ -41,7 +41,7 @@ type JetStreamManager interface { PurgeStream(name string, opts ...JSOpt) error // StreamsInfo can be used to retrieve a list of StreamInfo objects. - // DEPRECATED: Use Streams() instead. + // Deprecated: Use Streams() instead. StreamsInfo(opts ...JSOpt) <-chan *StreamInfo // Streams can be used to retrieve a list of StreamInfo objects. @@ -86,7 +86,7 @@ type JetStreamManager interface { ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. - // DEPRECATED: Use Consumers() instead. + // Deprecated: Use Consumers() instead. ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo // Consumers is used to retrieve a list of ConsumerInfo objects. @@ -240,7 +240,7 @@ type StreamConfig struct { // v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` - // Template identifies the template that manages the Stream. DEPRECATED: + // Template identifies the template that manages the Stream. Deprecated: // This feature is no longer supported. Template string `json:"template_owner,omitempty"` } @@ -747,7 +747,7 @@ func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo { } // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. -// DEPRECATED: Use Consumers() instead. +// Deprecated: Use Consumers() instead. func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo { return jsc.Consumers(stream, opts...) } @@ -1617,7 +1617,7 @@ func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo { } // StreamsInfo can be used to retrieve a list of StreamInfo objects. -// DEPRECATED: Use Streams() instead. +// Deprecated: Use Streams() instead. func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { return jsc.Streams(opts...) } diff --git a/kv.go b/kv.go index d9f40fdee..4e7a3fdec 100644 --- a/kv.go +++ b/kv.go @@ -65,7 +65,7 @@ type KeyValue interface { // WatchAll will invoke the callback for all updates. WatchAll(opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. - // DEPRECATED: Use ListKeys instead to avoid memory issues. + // Deprecated: Use ListKeys instead to avoid memory issues. Keys(opts ...WatchOpt) ([]string, error) // ListKeys will return all keys in a channel. ListKeys(opts ...WatchOpt) (KeyLister, error) diff --git a/nats.go b/nats.go index e4faa7495..c35d16d28 100644 --- a/nats.go +++ b/nats.go @@ -160,7 +160,7 @@ func GetDefaultOptions() Options { } } -// DEPRECATED: Use GetDefaultOptions() instead. +// Deprecated: Use GetDefaultOptions() instead. // DefaultOptions is not safe for use by multiple clients. // For details see #308. var DefaultOptions = GetDefaultOptions() @@ -386,7 +386,7 @@ type Options struct { // DisconnectedCB sets the disconnected handler that is called // whenever the connection is disconnected. // Will not be called if DisconnectedErrCB is set - // DEPRECATED. Use DisconnectedErrCB which passes error that caused + // Deprecated. Use DisconnectedErrCB which passes error that caused // the disconnect event. DisconnectedCB ConnHandler @@ -450,7 +450,7 @@ type Options struct { TokenHandler AuthTokenHandler // Dialer allows a custom net.Dialer when forming connections. - // DEPRECATED: should use CustomDialer instead. + // Deprecated: should use CustomDialer instead. Dialer *net.Dialer // CustomDialer allows to specify a custom dialer (not necessarily @@ -1108,7 +1108,7 @@ func DisconnectErrHandler(cb ConnErrHandler) Option { } // DisconnectHandler is an Option to set the disconnected handler. -// DEPRECATED: Use DisconnectErrHandler. +// Deprecated: Use DisconnectErrHandler. func DisconnectHandler(cb ConnHandler) Option { return func(o *Options) error { o.DisconnectedCB = cb @@ -1280,7 +1280,7 @@ func SyncQueueLen(max int) Option { // Dialer is an Option to set the dialer which will be used when // attempting to establish a connection. -// DEPRECATED: Should use CustomDialer instead. +// Deprecated: Should use CustomDialer instead. func Dialer(dialer *net.Dialer) Option { return func(o *Options) error { o.Dialer = dialer @@ -1397,7 +1397,7 @@ func TLSHandshakeFirst() Option { // Handler processing // SetDisconnectHandler will set the disconnect event handler. -// DEPRECATED: Use SetDisconnectErrHandler +// Deprecated: Use SetDisconnectErrHandler func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) { if nc == nil { return @@ -4902,7 +4902,8 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } // Queued returns the number of queued messages in the client for this subscription. -// DEPRECATED: Use Pending() +// +// Deprecated: Use Pending() func (s *Subscription) QueuedMsgs() (int, error) { m, _, err := s.Pending() return int(m), err diff --git a/netchan.go b/netchan.go index 6b13690b4..3722d9f1b 100644 --- a/netchan.go +++ b/netchan.go @@ -23,6 +23,8 @@ import ( // Data will be encoded and decoded via the EncodedConn and its associated encoders. // BindSendChan binds a channel for send operations to NATS. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) BindSendChan(subject string, channel any) error { chVal := reflect.ValueOf(channel) if chVal.Kind() != reflect.Chan { @@ -61,11 +63,15 @@ func chPublish(c *EncodedConn, chVal reflect.Value, subject string) { } // BindRecvChan binds a channel for receive operations from NATS. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) BindRecvChan(subject string, channel any) (*Subscription, error) { return c.bindRecvChan(subject, _EMPTY_, channel) } // BindRecvQueueChan binds a channel for queue-based receive operations from NATS. +// +// Deprecated: Encoded connections are no longer supported. func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error) { return c.bindRecvChan(subject, queue, channel) } diff --git a/test/basic_test.go b/test/basic_test.go index 75a187d05..3161930a6 100644 --- a/test/basic_test.go +++ b/test/basic_test.go @@ -464,8 +464,8 @@ func TestQueueSubscriber(t *testing.T) { omsg := []byte("Hello World") nc.Publish("foo", omsg) nc.Flush() - r1, _ := s1.QueuedMsgs() - r2, _ := s2.QueuedMsgs() + r1, _, _ := s1.Pending() + r2, _, _ := s2.Pending() if (r1 + r2) != 1 { t.Fatal("Received too many messages for multiple queue subscribers") } @@ -479,8 +479,8 @@ func TestQueueSubscriber(t *testing.T) { } nc.Flush() v := uint(float32(total) * 0.15) - r1, _ = s1.QueuedMsgs() - r2, _ = s2.QueuedMsgs() + r1, _, _ = s1.Pending() + r2, _, _ = s2.Pending() if r1+r2 != total { t.Fatalf("Incorrect number of messages: %d vs %d", (r1 + r2), total) } @@ -1032,7 +1032,7 @@ func TestNilConnection(t *testing.T) { if _, err := sub.NextMsg(time.Millisecond); err == nil || err != nats.ErrBadSubscription { t.Fatalf("Expected ErrBadSubscription error, got %v\n", err) } - if _, err := sub.QueuedMsgs(); err == nil || err != nats.ErrBadSubscription { + if _, _, err := sub.Pending(); err == nil || err != nats.ErrBadSubscription { t.Fatalf("Expected ErrBadSubscription error, got %v\n", err) } if _, _, err := sub.Pending(); err == nil || err != nats.ErrBadSubscription { diff --git a/test/conn_test.go b/test/conn_test.go index afc5025b3..c7713559c 100644 --- a/test/conn_test.go +++ b/test/conn_test.go @@ -1847,8 +1847,8 @@ func TestDefaultOptionsDialer(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() - opts1 := nats.DefaultOptions - opts2 := nats.DefaultOptions + opts1 := nats.GetDefaultOptions() + opts2 := nats.GetDefaultOptions() nc1, err := opts1.Connect() if err != nil { diff --git a/test/context_test.go b/test/context_test.go index b9c2f24f6..f2df307a6 100644 --- a/test/context_test.go +++ b/test/context_test.go @@ -654,324 +654,6 @@ func TestContextSubNextMsgWithDeadline(t *testing.T) { } } -func TestContextEncodedRequestWithTimeout(t *testing.T) { - s := RunDefaultServer() - defer s.Shutdown() - - nc := NewDefaultConnection(t) - c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - t.Fatalf("Unable to create encoded connection: %v", err) - } - defer c.Close() - - deadline := time.Now().Add(100 * time.Millisecond) - ctx, cancelCB := context.WithDeadline(context.Background(), deadline) - defer cancelCB() // should always be called, not discarded, to prevent context leak - - type request struct { - Message string `json:"message"` - } - type response struct { - Code int `json:"code"` - } - c.Subscribe("slow", func(_, reply string, req *request) { - got := req.Message - expected := "Hello" - if got != expected { - t.Errorf("Expected to receive request with %q, got %q", got, expected) - } - - // simulates latency into the client so that timeout is hit. - time.Sleep(40 * time.Millisecond) - c.Publish(reply, &response{Code: 200}) - }) - - for i := 0; i < 2; i++ { - req := &request{Message: "Hello"} - resp := &response{} - err := c.RequestWithContext(ctx, "slow", req, resp) - if err != nil { - t.Fatalf("Expected encoded request with context to not fail: %s", err) - } - got := resp.Code - expected := 200 - if got != expected { - t.Errorf("Expected to receive %v, got: %v", expected, got) - } - } - - // A third request with latency would make the context - // reach the deadline. - req := &request{Message: "Hello"} - resp := &response{} - err = c.RequestWithContext(ctx, "slow", req, resp) - if err == nil { - t.Fatal("Expected request with context to reach deadline") - } - - // Reported error is "context deadline exceeded" from Context package, - // which implements net.Error Timeout interface. - type timeoutError interface { - Timeout() bool - } - timeoutErr, ok := err.(timeoutError) - if !ok || !timeoutErr.Timeout() { - t.Errorf("Expected to have a timeout error") - } - expected := `context deadline exceeded` - if !strings.Contains(err.Error(), expected) { - t.Errorf("Expected %q error, got: %q", expected, err.Error()) - } -} - -func TestContextEncodedRequestWithTimeoutCanceled(t *testing.T) { - s := RunDefaultServer() - defer s.Shutdown() - - nc := NewDefaultConnection(t) - c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - t.Fatalf("Unable to create encoded connection: %v", err) - } - defer c.Close() - - ctx, cancelCB := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancelCB() // should always be called, not discarded, to prevent context leak - - type request struct { - Message string `json:"message"` - } - type response struct { - Code int `json:"code"` - } - - c.Subscribe("fast", func(_, reply string, req *request) { - got := req.Message - expected := "Hello" - if got != expected { - t.Errorf("Expected to receive request with %q, got %q", got, expected) - } - - // simulates latency into the client so that timeout is hit. - time.Sleep(40 * time.Millisecond) - - c.Publish(reply, &response{Code: 200}) - }) - - // Fast request should not fail - req := &request{Message: "Hello"} - resp := &response{} - c.RequestWithContext(ctx, "fast", req, resp) - expectedCode := 200 - if resp.Code != expectedCode { - t.Errorf("Expected to receive %d, got: %d", expectedCode, resp.Code) - } - - // Cancel the context already so that rest of requests fail. - cancelCB() - - err = c.RequestWithContext(ctx, "fast", req, resp) - if err == nil { - t.Fatal("Expected request with timeout context to fail") - } - - // Reported error is "context canceled" from Context package, - // which is not a timeout error. - type timeoutError interface { - Timeout() bool - } - if _, ok := err.(timeoutError); ok { - t.Errorf("Expected to not have a timeout error") - } - expected := `context canceled` - if !strings.Contains(err.Error(), expected) { - t.Errorf("Expected %q error, got: %q", expected, err.Error()) - } - - // 2nd request should fail again even if fast because context has already been canceled - err = c.RequestWithContext(ctx, "fast", req, resp) - if err == nil { - t.Fatal("Expected request with timeout context to fail") - } -} - -func TestContextEncodedRequestWithCancel(t *testing.T) { - s := RunDefaultServer() - defer s.Shutdown() - - nc := NewDefaultConnection(t) - c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - t.Fatalf("Unable to create encoded connection: %v", err) - } - defer c.Close() - - ctx, cancelCB := context.WithCancel(context.Background()) - defer cancelCB() // should always be called, not discarded, to prevent context leak - - // timer which cancels the context though can also be arbitrarily extended - expirationTimer := time.AfterFunc(100*time.Millisecond, func() { - cancelCB() - }) - - type request struct { - Message string `json:"message"` - } - type response struct { - Code int `json:"code"` - } - c.Subscribe("slow", func(_, reply string, req *request) { - got := req.Message - expected := "Hello" - if got != expected { - t.Errorf("Expected to receive request with %q, got %q", got, expected) - } - - // simulates latency into the client so that timeout is hit. - time.Sleep(40 * time.Millisecond) - c.Publish(reply, &response{Code: 200}) - }) - c.Subscribe("slower", func(_, reply string, req *request) { - got := req.Message - expected := "World" - if got != expected { - t.Errorf("Expected to receive request with %q, got %q", got, expected) - } - - // we know this request will take longer so extend the timeout - expirationTimer.Reset(100 * time.Millisecond) - - // slower reply which would have hit original timeout - time.Sleep(90 * time.Millisecond) - c.Publish(reply, &response{Code: 200}) - }) - - for i := 0; i < 2; i++ { - req := &request{Message: "Hello"} - resp := &response{} - err := c.RequestWithContext(ctx, "slow", req, resp) - if err != nil { - t.Fatalf("Expected encoded request with context to not fail: %s", err) - } - got := resp.Code - expected := 200 - if got != expected { - t.Errorf("Expected to receive %v, got: %v", expected, got) - } - } - - // A third request with latency would make the context - // get canceled, but these reset the timer so deadline - // gets extended: - for i := 0; i < 10; i++ { - req := &request{Message: "World"} - resp := &response{} - err := c.RequestWithContext(ctx, "slower", req, resp) - if err != nil { - t.Fatalf("Expected request with context to not fail: %s", err) - } - got := resp.Code - expected := 200 - if got != expected { - t.Errorf("Expected to receive %d, got: %d", expected, got) - } - } - - req := &request{Message: "Hello"} - resp := &response{} - - // One more slow request will expire the timer and cause an error... - err = c.RequestWithContext(ctx, "slow", req, resp) - if err == nil { - t.Fatal("Expected request with cancellation context to fail") - } - - // ...though reported error is "context canceled" from Context package, - // which is not a timeout error. - type timeoutError interface { - Timeout() bool - } - if _, ok := err.(timeoutError); ok { - t.Errorf("Expected to not have a timeout error") - } - expected := `context canceled` - if !strings.Contains(err.Error(), expected) { - t.Errorf("Expected %q error, got: %q", expected, err.Error()) - } -} - -func TestContextEncodedRequestWithDeadline(t *testing.T) { - s := RunDefaultServer() - defer s.Shutdown() - - nc := NewDefaultConnection(t) - c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - t.Fatalf("Unable to create encoded connection: %v", err) - } - defer c.Close() - - deadline := time.Now().Add(100 * time.Millisecond) - ctx, cancelCB := context.WithDeadline(context.Background(), deadline) - defer cancelCB() // should always be called, not discarded, to prevent context leak - - type request struct { - Message string `json:"message"` - } - type response struct { - Code int `json:"code"` - } - c.Subscribe("slow", func(_, reply string, req *request) { - got := req.Message - expected := "Hello" - if got != expected { - t.Errorf("Expected to receive request with %q, got %q", got, expected) - } - - // simulates latency into the client so that timeout is hit. - time.Sleep(40 * time.Millisecond) - c.Publish(reply, &response{Code: 200}) - }) - - for i := 0; i < 2; i++ { - req := &request{Message: "Hello"} - resp := &response{} - err := c.RequestWithContext(ctx, "slow", req, resp) - if err != nil { - t.Fatalf("Expected encoded request with context to not fail: %s", err) - } - got := resp.Code - expected := 200 - if got != expected { - t.Errorf("Expected to receive %v, got: %v", expected, got) - } - } - - // A third request with latency would make the context - // reach the deadline. - req := &request{Message: "Hello"} - resp := &response{} - err = c.RequestWithContext(ctx, "slow", req, resp) - if err == nil { - t.Fatal("Expected request with context to reach deadline") - } - - // Reported error is "context deadline exceeded" from Context package, - // which implements net.Error Timeout interface. - type timeoutError interface { - Timeout() bool - } - timeoutErr, ok := err.(timeoutError) - if !ok || !timeoutErr.Timeout() { - t.Errorf("Expected to have a timeout error") - } - expected := `context deadline exceeded` - if !strings.Contains(err.Error(), expected) { - t.Errorf("Expected %q error, got: %q", expected, err.Error()) - } -} - func TestContextRequestConnClosed(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() @@ -1026,58 +708,6 @@ func TestContextBadSubscription(t *testing.T) { } } -func TestContextInvalid(t *testing.T) { - s := RunDefaultServer() - defer s.Shutdown() - - nc := NewDefaultConnection(t) - c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - t.Fatalf("Unable to create encoded connection: %v", err) - } - defer c.Close() - - //lint:ignore SA1012 testing that passing nil fails - _, err = nc.RequestWithContext(nil, "foo", []byte("")) - if err == nil { - t.Fatal("Expected request to fail with error") - } - if err != nats.ErrInvalidContext { - t.Errorf("Expected request to fail with connection closed error: %s", err) - } - - sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {}) - if err != nil { - t.Fatalf("Expected to be able to subscribe: %s", err) - } - - //lint:ignore SA1012 testing that passing nil fails - _, err = sub.NextMsgWithContext(nil) - if err == nil { - t.Fatal("Expected request to fail with error") - } - if err != nats.ErrInvalidContext { - t.Errorf("Expected request to fail with connection closed error: %s", err) - } - - type request struct { - Message string `json:"message"` - } - type response struct { - Code int `json:"code"` - } - req := &request{Message: "Hello"} - resp := &response{} - //lint:ignore SA1012 testing that passing nil fails - err = c.RequestWithContext(nil, "slow", req, resp) - if err == nil { - t.Fatal("Expected request to fail with error") - } - if err != nats.ErrInvalidContext { - t.Errorf("Expected request to fail with invalid context: %s", err) - } -} - func TestFlushWithContext(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() @@ -1148,3 +778,34 @@ func TestUnsubscribeAndNextMsgWithContext(t *testing.T) { } wg.Wait() } + +func TestContextInvalid(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + defer nc.Close() + + //lint:ignore SA1012 testing that passing nil fails + _, err := nc.RequestWithContext(nil, "foo", []byte("")) + if err == nil { + t.Fatal("Expected request to fail with error") + } + if err != nats.ErrInvalidContext { + t.Errorf("Expected request to fail with connection closed error: %s", err) + } + + sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {}) + if err != nil { + t.Fatalf("Expected to be able to subscribe: %s", err) + } + + //lint:ignore SA1012 testing that passing nil fails + _, err = sub.NextMsgWithContext(nil) + if err == nil { + t.Fatal("Expected request to fail with error") + } + if err != nats.ErrInvalidContext { + t.Errorf("Expected request to fail with connection closed error: %s", err) + } +} diff --git a/test/enc_test.go b/test/enc_test.go index e40abbf06..c8109e7af 100644 --- a/test/enc_test.go +++ b/test/enc_test.go @@ -15,7 +15,9 @@ package test import ( "bytes" + "context" "fmt" + "strings" "testing" "time" @@ -25,6 +27,8 @@ import ( "github.com/nats-io/nats.go/encoders/protobuf/testdata" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + func NewDefaultEConn(t *testing.T) *nats.EncodedConn { ec, err := nats.NewEncodedConn(NewConnection(t, TEST_PORT), nats.DEFAULT_ENCODER) if err != nil { @@ -753,3 +757,350 @@ func TestRequestGOB(t *testing.T) { t.Fatalf("Did not receive proper response, %+v", reply) } } + +func TestContextEncodedRequestWithTimeout(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + t.Fatalf("Unable to create encoded connection: %v", err) + } + defer c.Close() + + deadline := time.Now().Add(100 * time.Millisecond) + ctx, cancelCB := context.WithDeadline(context.Background(), deadline) + defer cancelCB() // should always be called, not discarded, to prevent context leak + + type request struct { + Message string `json:"message"` + } + type response struct { + Code int `json:"code"` + } + c.Subscribe("slow", func(_, reply string, req *request) { + got := req.Message + expected := "Hello" + if got != expected { + t.Errorf("Expected to receive request with %q, got %q", got, expected) + } + + // simulates latency into the client so that timeout is hit. + time.Sleep(40 * time.Millisecond) + c.Publish(reply, &response{Code: 200}) + }) + + for i := 0; i < 2; i++ { + req := &request{Message: "Hello"} + resp := &response{} + err := c.RequestWithContext(ctx, "slow", req, resp) + if err != nil { + t.Fatalf("Expected encoded request with context to not fail: %s", err) + } + got := resp.Code + expected := 200 + if got != expected { + t.Errorf("Expected to receive %v, got: %v", expected, got) + } + } + + // A third request with latency would make the context + // reach the deadline. + req := &request{Message: "Hello"} + resp := &response{} + err = c.RequestWithContext(ctx, "slow", req, resp) + if err == nil { + t.Fatal("Expected request with context to reach deadline") + } + + // Reported error is "context deadline exceeded" from Context package, + // which implements net.Error Timeout interface. + type timeoutError interface { + Timeout() bool + } + timeoutErr, ok := err.(timeoutError) + if !ok || !timeoutErr.Timeout() { + t.Errorf("Expected to have a timeout error") + } + expected := `context deadline exceeded` + if !strings.Contains(err.Error(), expected) { + t.Errorf("Expected %q error, got: %q", expected, err.Error()) + } +} + +func TestContextEncodedRequestWithTimeoutCanceled(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + t.Fatalf("Unable to create encoded connection: %v", err) + } + defer c.Close() + + ctx, cancelCB := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancelCB() // should always be called, not discarded, to prevent context leak + + type request struct { + Message string `json:"message"` + } + type response struct { + Code int `json:"code"` + } + + c.Subscribe("fast", func(_, reply string, req *request) { + got := req.Message + expected := "Hello" + if got != expected { + t.Errorf("Expected to receive request with %q, got %q", got, expected) + } + + // simulates latency into the client so that timeout is hit. + time.Sleep(40 * time.Millisecond) + + c.Publish(reply, &response{Code: 200}) + }) + + // Fast request should not fail + req := &request{Message: "Hello"} + resp := &response{} + c.RequestWithContext(ctx, "fast", req, resp) + expectedCode := 200 + if resp.Code != expectedCode { + t.Errorf("Expected to receive %d, got: %d", expectedCode, resp.Code) + } + + // Cancel the context already so that rest of requests fail. + cancelCB() + + err = c.RequestWithContext(ctx, "fast", req, resp) + if err == nil { + t.Fatal("Expected request with timeout context to fail") + } + + // Reported error is "context canceled" from Context package, + // which is not a timeout error. + type timeoutError interface { + Timeout() bool + } + if _, ok := err.(timeoutError); ok { + t.Errorf("Expected to not have a timeout error") + } + expected := `context canceled` + if !strings.Contains(err.Error(), expected) { + t.Errorf("Expected %q error, got: %q", expected, err.Error()) + } + + // 2nd request should fail again even if fast because context has already been canceled + err = c.RequestWithContext(ctx, "fast", req, resp) + if err == nil { + t.Fatal("Expected request with timeout context to fail") + } +} + +func TestContextEncodedRequestWithCancel(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + t.Fatalf("Unable to create encoded connection: %v", err) + } + defer c.Close() + + ctx, cancelCB := context.WithCancel(context.Background()) + defer cancelCB() // should always be called, not discarded, to prevent context leak + + // timer which cancels the context though can also be arbitrarily extended + expirationTimer := time.AfterFunc(100*time.Millisecond, func() { + cancelCB() + }) + + type request struct { + Message string `json:"message"` + } + type response struct { + Code int `json:"code"` + } + c.Subscribe("slow", func(_, reply string, req *request) { + got := req.Message + expected := "Hello" + if got != expected { + t.Errorf("Expected to receive request with %q, got %q", got, expected) + } + + // simulates latency into the client so that timeout is hit. + time.Sleep(40 * time.Millisecond) + c.Publish(reply, &response{Code: 200}) + }) + c.Subscribe("slower", func(_, reply string, req *request) { + got := req.Message + expected := "World" + if got != expected { + t.Errorf("Expected to receive request with %q, got %q", got, expected) + } + + // we know this request will take longer so extend the timeout + expirationTimer.Reset(100 * time.Millisecond) + + // slower reply which would have hit original timeout + time.Sleep(90 * time.Millisecond) + c.Publish(reply, &response{Code: 200}) + }) + + for i := 0; i < 2; i++ { + req := &request{Message: "Hello"} + resp := &response{} + err := c.RequestWithContext(ctx, "slow", req, resp) + if err != nil { + t.Fatalf("Expected encoded request with context to not fail: %s", err) + } + got := resp.Code + expected := 200 + if got != expected { + t.Errorf("Expected to receive %v, got: %v", expected, got) + } + } + + // A third request with latency would make the context + // get canceled, but these reset the timer so deadline + // gets extended: + for i := 0; i < 10; i++ { + req := &request{Message: "World"} + resp := &response{} + err := c.RequestWithContext(ctx, "slower", req, resp) + if err != nil { + t.Fatalf("Expected request with context to not fail: %s", err) + } + got := resp.Code + expected := 200 + if got != expected { + t.Errorf("Expected to receive %d, got: %d", expected, got) + } + } + + req := &request{Message: "Hello"} + resp := &response{} + + // One more slow request will expire the timer and cause an error... + err = c.RequestWithContext(ctx, "slow", req, resp) + if err == nil { + t.Fatal("Expected request with cancellation context to fail") + } + + // ...though reported error is "context canceled" from Context package, + // which is not a timeout error. + type timeoutError interface { + Timeout() bool + } + if _, ok := err.(timeoutError); ok { + t.Errorf("Expected to not have a timeout error") + } + expected := `context canceled` + if !strings.Contains(err.Error(), expected) { + t.Errorf("Expected %q error, got: %q", expected, err.Error()) + } +} + +func TestContextEncodedRequestWithDeadline(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + t.Fatalf("Unable to create encoded connection: %v", err) + } + defer c.Close() + + deadline := time.Now().Add(100 * time.Millisecond) + ctx, cancelCB := context.WithDeadline(context.Background(), deadline) + defer cancelCB() // should always be called, not discarded, to prevent context leak + + type request struct { + Message string `json:"message"` + } + type response struct { + Code int `json:"code"` + } + c.Subscribe("slow", func(_, reply string, req *request) { + got := req.Message + expected := "Hello" + if got != expected { + t.Errorf("Expected to receive request with %q, got %q", got, expected) + } + + // simulates latency into the client so that timeout is hit. + time.Sleep(40 * time.Millisecond) + c.Publish(reply, &response{Code: 200}) + }) + + for i := 0; i < 2; i++ { + req := &request{Message: "Hello"} + resp := &response{} + err := c.RequestWithContext(ctx, "slow", req, resp) + if err != nil { + t.Fatalf("Expected encoded request with context to not fail: %s", err) + } + got := resp.Code + expected := 200 + if got != expected { + t.Errorf("Expected to receive %v, got: %v", expected, got) + } + } + + // A third request with latency would make the context + // reach the deadline. + req := &request{Message: "Hello"} + resp := &response{} + err = c.RequestWithContext(ctx, "slow", req, resp) + if err == nil { + t.Fatal("Expected request with context to reach deadline") + } + + // Reported error is "context deadline exceeded" from Context package, + // which implements net.Error Timeout interface. + type timeoutError interface { + Timeout() bool + } + timeoutErr, ok := err.(timeoutError) + if !ok || !timeoutErr.Timeout() { + t.Errorf("Expected to have a timeout error") + } + expected := `context deadline exceeded` + if !strings.Contains(err.Error(), expected) { + t.Errorf("Expected %q error, got: %q", expected, err.Error()) + } +} + +func TestEncodedContextInvalid(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + t.Fatalf("Unable to create encoded connection: %v", err) + } + defer c.Close() + + type request struct { + Message string `json:"message"` + } + type response struct { + Code int `json:"code"` + } + req := &request{Message: "Hello"} + resp := &response{} + //lint:ignore SA1012 testing that passing nil fails + err = c.RequestWithContext(nil, "slow", req, resp) + if err == nil { + t.Fatal("Expected request to fail with error") + } + if err != nats.ErrInvalidContext { + t.Errorf("Expected request to fail with invalid context: %s", err) + } +} diff --git a/test/gob_test.go b/test/gob_test.go index c772e1074..f326f8c9f 100644 --- a/test/gob_test.go +++ b/test/gob_test.go @@ -20,6 +20,8 @@ import ( "github.com/nats-io/nats.go" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + func NewGobEncodedConn(tl TestLogger) *nats.EncodedConn { ec, err := nats.NewEncodedConn(NewConnection(tl, TEST_PORT), nats.GOB_ENCODER) if err != nil { diff --git a/test/helper_test.go b/test/helper_test.go index 7f2aedf0c..36d47e000 100644 --- a/test/helper_test.go +++ b/test/helper_test.go @@ -103,15 +103,6 @@ func NewConnection(t tLogger, port int) *nats.Conn { return nc } -// NewEConn -func NewEConn(t tLogger) *nats.EncodedConn { - ec, err := nats.NewEncodedConn(NewDefaultConnection(t), nats.DEFAULT_ENCODER) - if err != nil { - t.Fatalf("Failed to create an encoded connection: %v\n", err) - } - return ec -} - //////////////////////////////////////////////////////////////////////////////// // Running nats server in separate Go routines //////////////////////////////////////////////////////////////////////////////// diff --git a/test/json_test.go b/test/json_test.go index 4ef6f42c5..cebbf31ec 100644 --- a/test/json_test.go +++ b/test/json_test.go @@ -22,6 +22,8 @@ import ( "github.com/nats-io/nats.go/encoders/builtin" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + func NewJsonEncodedConn(tl TestLogger) *nats.EncodedConn { ec, err := nats.NewEncodedConn(NewConnection(tl, TEST_PORT), nats.JSON_ENCODER) if err != nil { diff --git a/test/netchan_test.go b/test/netchan_test.go index b7272a909..f21d32921 100644 --- a/test/netchan_test.go +++ b/test/netchan_test.go @@ -20,6 +20,17 @@ import ( "github.com/nats-io/nats.go" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + +// NewEConn +func NewEConn(t tLogger) *nats.EncodedConn { + ec, err := nats.NewEncodedConn(NewDefaultConnection(t), nats.DEFAULT_ENCODER) + if err != nil { + t.Fatalf("Failed to create an encoded connection: %v\n", err) + } + return ec +} + func TestBadChan(t *testing.T) { s := RunDefaultServer() defer s.Shutdown() diff --git a/test/protobuf_test.go b/test/protobuf_test.go index 08fdf6771..d4bee8c85 100644 --- a/test/protobuf_test.go +++ b/test/protobuf_test.go @@ -25,6 +25,8 @@ import ( pb "github.com/nats-io/nats.go/encoders/protobuf/testdata" ) +//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn + func NewProtoEncodedConn(tl TestLogger) *nats.EncodedConn { ec, err := nats.NewEncodedConn(NewConnection(tl, TEST_PORT), protobuf.PROTOBUF_ENCODER) if err != nil { diff --git a/test/reconnect_test.go b/test/reconnect_test.go index e543db72e..9fc1b2311 100644 --- a/test/reconnect_test.go +++ b/test/reconnect_test.go @@ -18,6 +18,7 @@ import ( "fmt" "net" "net/url" + "strconv" "sync" "sync/atomic" "testing" @@ -178,19 +179,15 @@ func TestBasicReconnectFunctionality(t *testing.T) { t.Fatalf("Should have connected ok: %v\n", err) } defer nc.Close() - ec, err := nats.NewEncodedConn(nc, nats.DEFAULT_ENCODER) - if err != nil { - t.Fatalf("Failed to create an encoded connection: %v\n", err) - } testString := "bar" - ec.Subscribe("foo", func(s string) { - if s != testString { + nc.Subscribe("foo", func(m *nats.Msg) { + if string(m.Data) != testString { t.Fatal("String doesn't match") } ch <- true }) - ec.Flush() + nc.Flush() ts.Shutdown() // server is stopped here... @@ -199,14 +196,14 @@ func TestBasicReconnectFunctionality(t *testing.T) { t.Fatalf("Did not get the disconnected callback on time\n") } - if err := ec.Publish("foo", testString); err != nil { + if err := nc.Publish("foo", []byte("bar")); err != nil { t.Fatalf("Failed to publish message: %v\n", err) } ts = startReconnectServer(t) defer ts.Shutdown() - if err := ec.FlushTimeout(5 * time.Second); err != nil { + if err := nc.FlushTimeout(5 * time.Second); err != nil { t.Fatalf("Error on Flush: %v", err) } @@ -215,7 +212,7 @@ func TestBasicReconnectFunctionality(t *testing.T) { } expectedReconnectCount := uint64(1) - reconnectCount := ec.Conn.Stats().Reconnects + reconnectCount := nc.Stats().Reconnects if reconnectCount != expectedReconnectCount { t.Fatalf("Reconnect count incorrect: %d vs %d\n", @@ -241,23 +238,20 @@ func TestExtendedReconnectFunctionality(t *testing.T) { t.Fatalf("Should have connected ok: %v", err) } defer nc.Close() - ec, err := nats.NewEncodedConn(nc, nats.DEFAULT_ENCODER) - if err != nil { - t.Fatalf("Failed to create an encoded connection: %v\n", err) - } + testString := "bar" received := int32(0) - ec.Subscribe("foo", func(s string) { + nc.Subscribe("foo", func(*nats.Msg) { atomic.AddInt32(&received, 1) }) - sub, _ := ec.Subscribe("foobar", func(s string) { + sub, _ := nc.Subscribe("foobar", func(*nats.Msg) { atomic.AddInt32(&received, 1) }) - ec.Publish("foo", testString) - ec.Flush() + nc.Publish("foo", []byte(testString)) + nc.Flush() ts.Shutdown() // server is stopped here.. @@ -268,18 +262,18 @@ func TestExtendedReconnectFunctionality(t *testing.T) { } // Sub while disconnected - ec.Subscribe("bar", func(s string) { + nc.Subscribe("bar", func(*nats.Msg) { atomic.AddInt32(&received, 1) }) // Unsub foobar while disconnected sub.Unsubscribe() - if err = ec.Publish("foo", testString); err != nil { + if err = nc.Publish("foo", []byte(testString)); err != nil { t.Fatalf("Received an error after disconnect: %v\n", err) } - if err = ec.Publish("bar", testString); err != nil { + if err = nc.Publish("bar", []byte(testString)); err != nil { t.Fatalf("Received an error after disconnect: %v\n", err) } @@ -292,19 +286,19 @@ func TestExtendedReconnectFunctionality(t *testing.T) { t.Fatal("Did not receive a reconnect callback message") } - if err = ec.Publish("foobar", testString); err != nil { + if err = nc.Publish("foobar", []byte(testString)); err != nil { t.Fatalf("Received an error after server restarted: %v\n", err) } - if err = ec.Publish("foo", testString); err != nil { + if err = nc.Publish("foo", []byte(testString)); err != nil { t.Fatalf("Received an error after server restarted: %v\n", err) } ch := make(chan bool) - ec.Subscribe("done", func(b bool) { + nc.Subscribe("done", func(*nats.Msg) { ch <- true }) - ec.Publish("done", true) + nc.Publish("done", nil) if e := Wait(ch); e != nil { t.Fatal("Did not receive our message") @@ -337,11 +331,6 @@ func TestQueueSubsOnReconnect(t *testing.T) { } defer nc.Close() - ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) - if err != nil { - t.Fatalf("Failed to create an encoded connection: %v\n", err) - } - // To hold results. results := make(map[int]int) var mu sync.Mutex @@ -364,25 +353,29 @@ func TestQueueSubsOnReconnect(t *testing.T) { subj := "foo.bar" qgroup := "workers" - cb := func(seqno int) { + cb := func(m *nats.Msg) { mu.Lock() defer mu.Unlock() + seqno, err := strconv.Atoi(string(m.Data)) + if err != nil { + t.Fatalf("Received an invalid sequence number: %v\n", err) + } results[seqno] = results[seqno] + 1 } // Create Queue Subscribers - ec.QueueSubscribe(subj, qgroup, cb) - ec.QueueSubscribe(subj, qgroup, cb) + nc.QueueSubscribe(subj, qgroup, cb) + nc.QueueSubscribe(subj, qgroup, cb) - ec.Flush() + nc.Flush() // Helper function to send messages and check results. sendAndCheckMsgs := func(numToSend int) { for i := 0; i < numToSend; i++ { - ec.Publish(subj, i) + nc.Publish(subj, []byte(fmt.Sprint(i))) } // Wait for processing. - ec.Flush() + nc.Flush() time.Sleep(50 * time.Millisecond) // Check Results diff --git a/test/sub_test.go b/test/sub_test.go index f0f83a8d5..1961e23d8 100644 --- a/test/sub_test.go +++ b/test/sub_test.go @@ -1120,7 +1120,7 @@ func TestAsyncSubscriptionPending(t *testing.T) { } // Test old way - q, _ := sub.QueuedMsgs() + q, _, _ := sub.Pending() if q != total && q != total-1 { t.Fatalf("Expected %d or %d, got %d", total, total-1, q) } @@ -1271,7 +1271,7 @@ func TestSyncSubscriptionPending(t *testing.T) { nc.Flush() // Test old way - q, _ := sub.QueuedMsgs() + q, _, _ := sub.Pending() if q != total && q != total-1 { t.Fatalf("Expected %d or %d, got %d", total, total-1, q) }