Skip to content

Commit

Permalink
implemented reconnects for jetstream channels
Browse files Browse the repository at this point in the history
  • Loading branch information
astelmashenko committed Jul 7, 2023
1 parent 67f77c7 commit 5d8efcf
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 6 deletions.
4 changes: 4 additions & 0 deletions docs/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ data:
caBundle: "" # caBundle is a base64 PEM encoded CA certificate chain
secret:
name: "" # a secret containing a `ca.crt` entry.
connOpts:
retryOnFailedConnect: true # should it reconnect on failed connection
maxReconnects: 50 # max reconnect attempts
reconnectWait: 2000 # delay between reconnect attempts
```
## JetStream integration
Expand Down
20 changes: 20 additions & 0 deletions examples/config-br-default-channel-jsm.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-default-channel
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
data:
channelTemplateSpec: |
apiVersion: messaging.knative.dev/v1alpha1
kind: NatsJetStreamChannel
spec:
stream:
config:
retention: Limits
maxBytes: 1000000000
replicas: 1
consumerConfigTemplate:
deliverPolicy: New
maxDeliver: 1
5 changes: 4 additions & 1 deletion examples/config-nats.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ metadata:
data:
eventing-nats: |
url: nats://nats.nats-io.svc.cluster.local
connOpts:
retryOnFailedConnect: true
maxReconnects: 5
reconnectWaitMilliseconds: 2000
20 changes: 16 additions & 4 deletions pkg/common/config/eventingnatsconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ limitations under the License.

package config

import v1 "k8s.io/api/core/v1"
import (
v1 "k8s.io/api/core/v1"
)

// EventingNatsConfig represents the YAML configuration which can be provided in the `config-nats` ConfigMap under the
// key, "eventing-nats".
type EventingNatsConfig struct {
URL string `json:"url,omitempty"`
Auth *ENConfigAuth `json:"auth,omitempty"`
RootCA *ENConfigRootCA `json:"tls,omitempty"`
URL string `json:"url,omitempty"`
ConnOpts *ConnOpts `json:"connOpts,omitempty"`
Auth *ENConfigAuth `json:"auth,omitempty"`
RootCA *ENConfigRootCA `json:"tls,omitempty"`
}

// ENConfigAuth provides configuration on how the client should authenticate itself to the server.
Expand Down Expand Up @@ -60,3 +63,12 @@ type ENConfigRootCA struct {
// Secret is a reference to an existing Secret where the controller will extract a certificate by the "ca.crt" key.
Secret *v1.LocalObjectReference `json:"secret,omitempty"`
}

type ConnOpts struct {
// MaxReconnects how many attempts to reconnect
MaxReconnects int `json:"maxReconnects,omitempty"`
// RetryOnFailedConnect should retry on failed reconnect
RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty"`
// ReconnectWaitMilliseconds time between reconnects in milliseconds
ReconnectWaitMilliseconds int `json:"reconnectWaitMilliseconds,omitempty"`
}
33 changes: 32 additions & 1 deletion pkg/common/nats/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"encoding/base64"
"errors"
"fmt"
"time"

"knative.dev/pkg/logging"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,6 +45,8 @@ var (
)

func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*nats.Conn, error) {
logger := logging.FromContext(ctx)

url := config.URL
if url == "" {
url = constants.DefaultNatsURL
Expand All @@ -54,7 +59,7 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*

secrets := coreV1Client.Secrets(getNamespace(ctx))

var opts []nats.Option
opts := []nats.Option{nats.Name("kn jsm dispatcher")}

if config.Auth != nil {
o, err := buildAuthOption(ctx, *config.Auth, secrets)
Expand All @@ -74,6 +79,32 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*
opts = append(opts, o)
}

// reconnection options
if config.ConnOpts != nil && config.ConnOpts.RetryOnFailedConnect {
reconnectWait := time.Duration(config.ConnOpts.ReconnectWaitMilliseconds) * time.Millisecond
logger.Infof("Configuring retries: %#v", config.ConnOpts)
opts = append(opts, nats.RetryOnFailedConnect(config.ConnOpts.RetryOnFailedConnect))
opts = append(opts, nats.ReconnectWait(reconnectWait))
opts = append(opts, nats.MaxReconnects(config.ConnOpts.MaxReconnects))
opts = append(opts, nats.CustomReconnectDelay(func(attempts int) time.Duration {
if (config.ConnOpts.MaxReconnects - attempts) < 0 {
logger.Fatalf("Failed to recconect to Nats, no attempts left")
}
logger.Debugf("Reconnect attempts left: %d", config.ConnOpts.MaxReconnects-attempts)
return reconnectWait
}))
opts = append(opts, nats.ReconnectJitter(1000, time.Millisecond))
opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
logger.Warnf("Disconnected from JSM: err=%v", err)
logger.Warnf("Disconnected from JSM: will attempt reconnects for %d", config.ConnOpts.MaxReconnects)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
logger.Infof("Reconnected to JSM [%s]", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
logger.Fatal("Exiting, no JSM servers available")
}))
}
return nats.Connect(url, opts...)
}

Expand Down

0 comments on commit 5d8efcf

Please sign in to comment.