Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented reconnects for jetstream channels #409

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ data:
caBundle: "" # caBundle is a base64 PEM encoded CA certificate chain
secret:
name: "" # a secret containing a `ca.crt` entry.
connOpts:
retryOnFailedConnect: true # should it reconnect on failed connection
maxReconnects: 50 # max reconnect attempts
reconnectWait: 2000 # delay between reconnect attempts
```

## JetStream integration
Expand Down
20 changes: 20 additions & 0 deletions examples/config-br-default-channel-jsm.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-default-channel
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
data:
channelTemplateSpec: |
apiVersion: messaging.knative.dev/v1alpha1
kind: NatsJetStreamChannel
spec:
stream:
config:
retention: Limits
maxBytes: 1000000000
replicas: 1
consumerConfigTemplate:
deliverPolicy: New
maxDeliver: 1
5 changes: 4 additions & 1 deletion examples/config-nats.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ metadata:
data:
eventing-nats: |
url: nats://nats.nats-io.svc.cluster.local

connOpts:
retryOnFailedConnect: true
maxReconnects: 5
reconnectWaitMilliseconds: 2000

20 changes: 16 additions & 4 deletions pkg/common/config/eventingnatsconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ limitations under the License.

package config

import v1 "k8s.io/api/core/v1"
import (
v1 "k8s.io/api/core/v1"
)

// EventingNatsConfig represents the YAML configuration which can be provided in the `config-nats` ConfigMap under the
// key, "eventing-nats".
type EventingNatsConfig struct {
URL string `json:"url,omitempty"`
Auth *ENConfigAuth `json:"auth,omitempty"`
RootCA *ENConfigRootCA `json:"tls,omitempty"`
URL string `json:"url,omitempty"`
ConnOpts *ConnOpts `json:"connOpts,omitempty"`
Auth *ENConfigAuth `json:"auth,omitempty"`
RootCA *ENConfigRootCA `json:"tls,omitempty"`
}

// ENConfigAuth provides configuration on how the client should authenticate itself to the server.
Expand Down Expand Up @@ -60,3 +63,12 @@ type ENConfigRootCA struct {
// Secret is a reference to an existing Secret where the controller will extract a certificate by the "ca.crt" key.
Secret *v1.LocalObjectReference `json:"secret,omitempty"`
}

type ConnOpts struct {
// MaxReconnects how many attempts to reconnect
MaxReconnects int `json:"maxReconnects,omitempty"`
// RetryOnFailedConnect should retry on failed reconnect
RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty"`
// ReconnectWaitMilliseconds time between reconnects in milliseconds
ReconnectWaitMilliseconds int `json:"reconnectWaitMilliseconds,omitempty"`
}
33 changes: 32 additions & 1 deletion pkg/common/nats/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"encoding/base64"
"errors"
"fmt"
"time"

"knative.dev/pkg/logging"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,6 +45,8 @@ var (
)

func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*nats.Conn, error) {
logger := logging.FromContext(ctx)

url := config.URL
if url == "" {
url = constants.DefaultNatsURL
Expand All @@ -54,7 +59,7 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*

secrets := coreV1Client.Secrets(getNamespace(ctx))

var opts []nats.Option
opts := []nats.Option{nats.Name("kn jsm dispatcher")}

if config.Auth != nil {
o, err := buildAuthOption(ctx, *config.Auth, secrets)
Expand All @@ -74,6 +79,32 @@ func NewNatsConn(ctx context.Context, config commonconfig.EventingNatsConfig) (*
opts = append(opts, o)
}

// reconnection options
if config.ConnOpts != nil && config.ConnOpts.RetryOnFailedConnect {
reconnectWait := time.Duration(config.ConnOpts.ReconnectWaitMilliseconds) * time.Millisecond
logger.Infof("Configuring retries: %#v", config.ConnOpts)
opts = append(opts, nats.RetryOnFailedConnect(config.ConnOpts.RetryOnFailedConnect))
opts = append(opts, nats.ReconnectWait(reconnectWait))
opts = append(opts, nats.MaxReconnects(config.ConnOpts.MaxReconnects))
opts = append(opts, nats.CustomReconnectDelay(func(attempts int) time.Duration {
if (config.ConnOpts.MaxReconnects - attempts) < 0 {
logger.Fatalf("Failed to recconect to Nats, no attempts left")
}
logger.Debugf("Reconnect attempts left: %d", config.ConnOpts.MaxReconnects-attempts)
return reconnectWait
}))
opts = append(opts, nats.ReconnectJitter(1000, time.Millisecond))
opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
logger.Warnf("Disconnected from JSM: err=%v", err)
logger.Warnf("Disconnected from JSM: will attempt reconnects for %d", config.ConnOpts.MaxReconnects)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
logger.Infof("Reconnected to JSM [%s]", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
logger.Fatal("Exiting, no JSM servers available")
}))
}
return nats.Connect(url, opts...)
}

Expand Down