Skip to content

Commit

Permalink
update-golangcilint-config
Browse files Browse the repository at this point in the history
  • Loading branch information
BoskyWSMFN committed Sep 30, 2024
1 parent d834cde commit 8b0a6b3
Show file tree
Hide file tree
Showing 8 changed files with 3,003 additions and 656 deletions.
3,089 changes: 2,720 additions & 369 deletions .golangci.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion logger/dev.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build enable_zap_logger
//go:build enable_zap_logger

package logger

Expand Down
3 changes: 2 additions & 1 deletion logger/prod.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build !enable_zap_logger
//go:build !enable_zap_logger

package logger

Expand All @@ -8,5 +8,6 @@ type Logger = *zap.Logger

var Log = func() Logger {
l, _ := zap.NewProduction()

return l
}()
151 changes: 74 additions & 77 deletions nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,39 @@ package nats

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/imperiuse/advanced-nats-client/v1/logger"
"github.com/imperiuse/advanced-nats-client/v1/serializable"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/imperiuse/advanced-nats-client/v1/logger"
"github.com/imperiuse/advanced-nats-client/v1/serializable"
)

const (
// MaxReconnectDefault - max reconnect try cnt.
MaxReconnectDefault = -1 // infinity
// MaxReconnectDefault - max number of reconnect attempts.
MaxReconnectDefault = -1 // infinite

// ReconnectWaitDefault - reconnect w8 timeout default value.
// ReconnectWaitDefault - default reconnect wait timeout value.
ReconnectWaitDefault = 5 * time.Second

// ReconnectJitterDefault - reconnect w8 jitter timeout default value.
ReconnectJitterDefault = time.Second * 1
// ReconnectJitterDefault - default reconnect jitter wait timeout value.
ReconnectJitterDefault = 1 * time.Second

// ReconnectJitterTLSDefault - reconnect w8 jitter TLS timeout default value.
ReconnectJitterTLSDefault = time.Second * 2
// ReconnectJitterTLSDefault - default reconnect jitter TLS wait timeout value.
ReconnectJitterTLSDefault = 2 * time.Second
)

// ErrEmptyMsg - empty msg. nats msg is nil.
var ErrEmptyMsg = errors.New("empty msg. nats msg is nil")
// ErrEmptyMsg - error returned when the NATS message is nil.
var ErrEmptyMsg = errors.New("empty message: NATS message is nil")

// DefaultDSN - default nats url and port.
// DefaultDSN - default NATS URL and port.
var DefaultDSN = []URL{nats.DefaultURL}

// SimpleNatsClientI _ .
// SimpleNatsClientI - interface defining basic operations of a NATS client.
type SimpleNatsClientI interface {
UseCustomLogger(logger.Logger)
Ping(context.Context, Subj) (bool, error)
Expand All @@ -47,7 +49,7 @@ type SimpleNatsClientI interface {

//go:generate mockery --name=PureNatsConnI
type (
// PureNatsConnI - pure nats conn interface.
// PureNatsConnI - interface for pure NATS connection.
PureNatsConnI interface {
RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error)
Subscribe(subj string, msgHandler MsgHandler) (*Subscription, error)
Expand All @@ -61,32 +63,32 @@ type (
// QueueGroup - queue group name.
QueueGroup string

// client - wrapper for pure nats.Conn, so it's own nats client library for reduce code.
// Client - wrapper around the pure nats.Conn, a custom NATS client to reduce repetitive code.
client struct {
log logger.Logger
dsn []URL
dsn []URL //nolint:unused
conn PureNatsConnI

pureNC *Conn // pure nats connection, for some special stuff, doesn't matter in all
pureNC *Conn // pure NATS connection, used for special operations, generally not required
}

// URL - url name.
URL = string // dsn url like this -> "nats://127.0.0.1:4222"
// URL - URL string.
URL = string // DSN URL, e.g., "nats://127.0.0.1:4222"

// Option - nats.Msg.
// Option - alias for nats.Option.
Option = nats.Option
// MsgHandler - nats.MsgHandler.
// MsgHandler - alias for nats.MsgHandler.
MsgHandler = nats.MsgHandler
// Msg - nats.Msg.
// Msg - alias for nats.Msg.
Msg = nats.Msg
// Subscription - nats.Subscription.
// Subscription - alias for nats.Subscription.
Subscription = nats.Subscription
// Conn - nats.Conn struct.
// Conn - alias for nats.Conn.
Conn = nats.Conn

// Handler - pure NATS Msg, request reply.
// Handler - handler function to process pure NATS messages and reply.
Handler = func(*Msg, Serializable) Serializable
// Serializable - serializable.
// Serializable - serializable interface.
Serializable = serializable.Serializable
)

Expand All @@ -95,19 +97,19 @@ const (
pongMsg = "pong"
)

// New - main "constructor" function.
// nolint golint
// New - main constructor function.
// nolint: golint
func New(dsn []URL, options ...Option) (*client, error) {
c := NewDefaultClient().addDSN(dsn)

// Default settings for internal NATS client
// Default settings for the internal NATS client
if len(options) == 0 {
options = c.defaultNatsOptions()
}

conn, err := createNatsConn(dsn, options...)
if err != nil {
return nil, errors.Wrap(err, "can't create nats conn. nats unavailbale?")
return nil, fmt.Errorf("unable to create NATS connection: is NATS unavailable?: %w", err)
}

c.conn = conn
Expand All @@ -116,22 +118,20 @@ func New(dsn []URL, options ...Option) (*client, error) {
return c, nil
}

func createNatsConn(dsn []URL, option ...Option) (*nats.Conn, error) {
// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
conn, err := nats.Connect(strings.Join(dsn, ", "), option...)
func createNatsConn(dsn []URL, options ...Option) (*nats.Conn, error) {
// Normally, the library returns an error when attempting to connect if there is no server running.
// The RetryOnFailedConnect option sets the connection to a reconnecting state if the initial connection attempt fails.
conn, err := nats.Connect(strings.Join(dsn, ", "), options...)
if err != nil {
// Should not return an error even if it can't connect, but you still
// need to check in case there are some configuration errors.
return nil, errors.Wrap(err, "create nats.Connect")
// Should not return an error even if it can't connect immediately, but still check for configuration errors.
return nil, fmt.Errorf("error creating nats.Connect: %w", err)
}

return conn, nil
}

// NewDefaultClient empty default client.
// nolint golint
// NewDefaultClient creates an empty default client.
// nolint: golint
func NewDefaultClient() *client {
return &client{
dsn: DefaultDSN,
Expand All @@ -153,106 +153,107 @@ func (c *client) defaultNatsOptions() []Option {
nats.ReconnectJitter(ReconnectJitterDefault, ReconnectJitterTLSDefault),
nats.ReconnectWait(ReconnectWaitDefault),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
c.log.Warn("[DisconnectErrHandler] Disconnect", zap.Error(err))
c.log.Warn("[DisconnectErrHandler] Disconnected", zap.Error(err))
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
c.log.Warn("[ReconnectHandler] Reconnect", zap.String("ConnUrl", nc.ConnectedUrl()))
c.log.Warn("[ReconnectHandler] Reconnected", zap.String("ConnUrl", nc.ConnectedUrl()))
}),
nats.ClosedHandler(func(nc *nats.Conn) {
c.log.Warn("[ClosedHandler] Close handler", zap.Error(nc.LastError()))
c.log.Warn("[ClosedHandler] Connection closed", zap.Error(nc.LastError()))
}),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
c.log.Warn("[ErrorHandler]", zap.Error(err))
c.log.Warn("[ErrorHandler] NATS error", zap.Error(err))
}),
}
}

// UseCustomLogger - register your own logger instance of zap.Logger.
// UseCustomLogger registers a custom logger instance.
func (c *client) UseCustomLogger(log logger.Logger) {
c.log = log
}

// Ping - send synchronous request with ctx deadline or timeout
// e.g. ctx, _ := context.WithTimeout(context.Background(), time.Second).
// Ping sends a synchronous request with a context deadline or timeout.
func (c *client) Ping(ctx context.Context, subj Subj) (bool, error) {
c.log.Debug("Ping", zap.String("subj", string(subj)))

msg, err := c.conn.RequestWithContext(ctx, string(subj), []byte(pingMsg))
if err != nil {
return false, errors.Wrap(err, "c.conn.RequestWithContext")
return false, fmt.Errorf("c.conn.RequestWithContext: %w", err)
}

if msg == nil {
return false, errors.Wrap(ErrEmptyMsg, "msg == nil")
return false, fmt.Errorf("msg is nil: %w", ErrEmptyMsg)
}

return string(msg.Data) == pongMsg, nil
}

// PongHandler - register simple pong handler (use to peer-to-peer topics).
// PongHandler registers a simple pong handler (used for peer-to-peer topics).
func (c *client) PongHandler(subj Subj) (*Subscription, error) {
c.log.Debug("[PongHandler]", zap.String("subj", string(subj)))

return c.conn.Subscribe(string(subj), func(msg *Msg) {
if msg == nil {
c.log.Debug("Nil msg", zap.String("subj", string(subj)))
c.log.Debug("Received nil msg", zap.String("subj", string(subj)))

return
}

if string(msg.Data) == pingMsg {
_ = msg.Respond([]byte(pongMsg))
}
})
}

// PongQueueHandler - register pong handler with QueueGroup (use to 1 to Many).
// PongQueueHandler registers a pong handler with QueueGroup (used for 1-to-many communication).
func (c *client) PongQueueHandler(subj Subj, queue QueueGroup) (*Subscription, error) {
c.log.Debug("[PongQueueHandler]", zap.String("subj", string(subj)), zap.String("queue", string(queue)))

return c.conn.QueueSubscribe(string(subj), string(queue), func(msg *Msg) {
if msg == nil {
return
}

if string(msg.Data) == pingMsg {
_ = msg.Respond([]byte(pongMsg))
}
})
}

// Request - send synchronous msg to topic subj, and wait reply from another topic (e.g. Request-Reply Nats pattern).
func (c *client) Request(ctx context.Context, subj Subj, request Serializable, reply Serializable) error {
// Request sends a synchronous message to a subject and waits for a reply (following the Request-Reply NATS pattern).
func (c *client) Request(ctx context.Context, subj Subj, request, reply Serializable) error {
c.log.Debug("[Request]", zap.String("subj", string(subj)), zap.Any("data", request))

byteData, err := request.Marshal()
if err != nil {
return errors.Wrap(err, "request.Marshal()")
return fmt.Errorf("request.Marshal(): %w", err)
}

msg, err := c.conn.RequestWithContext(ctx, string(subj), byteData)
if err != nil {
return errors.Wrap(err, "c.conn.RequestWithContext")
return fmt.Errorf("c.conn.RequestWithContext: %w", err)
}

if msg == nil {
return errors.Wrap(ErrEmptyMsg, "Request")
return fmt.Errorf("*client.Request(): %w", ErrEmptyMsg)
}

return reply.Unmarshal(msg.Data)
}

// ReplyHandler - register for asynchronous msgHandler func for process Nats Msg.
// ReplyHandler registers an asynchronous message handler for processing NATS messages.
func (c *client) ReplyHandler(subj Subj, awaitData Serializable, msgHandler Handler) (*Subscription, error) {
return c.conn.Subscribe(string(subj), func(msg *nats.Msg) {
if msg == nil {
c.log.Warn("[ReplyHandler] Nil msg", zap.String("subj", string(subj)))
c.log.Warn("[ReplyHandler] Received nil message", zap.String("subj", string(subj)))

return
}

awaitData.Reset() // Important! For use clean struct
awaitData.Reset() // Important! Clean struct before use.

if err := awaitData.Unmarshal(msg.Data); err != nil {
c.log.Error("[ReplyHandler] Unmarshal",
c.log.Error("[ReplyHandler] Unmarshal failed",
zap.String("subj", string(subj)),
zap.Any("msg", msg),
zap.Error(err),
Expand All @@ -265,7 +266,7 @@ func (c *client) ReplyHandler(subj Subj, awaitData Serializable, msgHandler Hand

data, err := replyData.Marshal()
if err != nil {
c.log.Error("[ReplyHandler] Marshall",
c.log.Error("[ReplyHandler] Marshal failed",
zap.String("subj", string(subj)),
zap.Any("data", replyData),
zap.Error(err),
Expand All @@ -275,29 +276,27 @@ func (c *client) ReplyHandler(subj Subj, awaitData Serializable, msgHandler Hand
}

if err = msg.Respond(data); err != nil {
c.log.Error("[ReplyHandler] Respond",
c.log.Error("[ReplyHandler] Response failed",
zap.String("subj", string(subj)),
zap.Error(err),
)

return
}
})
}

// ReplyQueueHandler - register queue for asynchronous msgHandler func for process Nats Msg.
// ReplyQueueHandler registers a queue for asynchronous message processing with a handler.
func (c *client) ReplyQueueHandler(subj Subj, qG QueueGroup, awaitData Serializable, h Handler) (*Subscription, error) {
return c.conn.QueueSubscribe(string(subj), string(qG), func(msg *nats.Msg) {
if msg == nil {
c.log.Warn("[ReplyQueueHandler] Nil msg", zap.String("subj", string(subj)))
c.log.Warn("[ReplyQueueHandler] Received nil message", zap.String("subj", string(subj)))

return
}

awaitData.Reset() // Important! For use clean struct
awaitData.Reset() // Important! Clean struct before use.

if err := awaitData.Unmarshal(msg.Data); err != nil {
c.log.Error("[ReplyQueueHandler] Unmarshal",
c.log.Error("[ReplyQueueHandler] Unmarshal failed",
zap.String("subj", string(subj)),
zap.String("qGroup", string(qG)),
zap.Any("msg", msg),
Expand All @@ -311,7 +310,7 @@ func (c *client) ReplyQueueHandler(subj Subj, qG QueueGroup, awaitData Serializa

data, err := replyData.Marshal()
if err != nil {
c.log.Error("[ReplyQueueHandler] Marshall",
c.log.Error("[ReplyQueueHandler] Marshal failed",
zap.String("subj", string(subj)),
zap.String("qGroup", string(qG)),
zap.Any("data", replyData),
Expand All @@ -322,28 +321,26 @@ func (c *client) ReplyQueueHandler(subj Subj, qG QueueGroup, awaitData Serializa
}

if err = msg.Respond(data); err != nil {
c.log.Error("[ReplyQueueHandler] Respond",
c.log.Error("[ReplyQueueHandler] Response failed",
zap.String("subj", string(subj)),
zap.String("qGroup", string(qG)),
zap.Error(err),
)

return
}
})
}

// NatsConn - return pure Nats Conn (pointer to struct).
// NatsConn returns the pure NATS connection (pointer to the connection struct).
func (c *client) NatsConn() *Conn {
return c.pureNC
}

// Close - Drain and Close workaround.
// Close drains and closes the NATS connection.
func (c *client) Close() error {
defer c.conn.Close()

if err := c.conn.Drain(); err != nil {
return errors.Wrap(err, "c.conn.Drain")
return fmt.Errorf("c.conn.Drain: %w", err)
}

return nil
Expand Down
Loading

0 comments on commit 8b0a6b3

Please sign in to comment.