Skip to content

Commit

Permalink
add reconnect for lwm2m gateway (#1075)
Browse files Browse the repository at this point in the history
* add reconnect for lwm2m gateway

* remove unused var

* update comment

* fix notify will crash when reconnect

* disable ping interval by default

* revert comment

* fix bad ping check
  • Loading branch information
rhoninl authored Dec 11, 2024
1 parent 7af19e4 commit 1771453
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 28 deletions.
114 changes: 109 additions & 5 deletions pkg/gateway/lwm2m/client/lwm2m.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
registerPath = "/rd"

observeTaskSuffix = "-ob"

reconnectInterval = 5 * time.Second
maxReconnectBackoff = 60 * time.Second
reconnectBackoffExp = 1.5
)

type Client struct {
Expand All @@ -44,6 +48,9 @@ type Client struct {

udpConnection *udpClient.Conn
taskManager *TaskManager

reconnectCh chan struct{}
stopCh chan struct{}
}

type Config struct {
Expand All @@ -55,23 +62,100 @@ type Config struct {

func NewClient(ctx context.Context, config Config) (*Client, error) {
var client = &Client{
ctx: context.TODO(),
ctx: ctx,
Config: config,
object: *NewObject(rootObjectId, nil),
taskManager: NewTaskManager(ctx),
dataCache: make(map[string]interface{}),
reconnectCh: make(chan struct{}),
stopCh: make(chan struct{}),
}

return client, nil
}

func (c *Client) Start() error {
// Initial connection
if err := c.connect(); err != nil {
return err
}

// Start connection monitor
go c.connectionMonitor()

return c.Register()
}

func (c *Client) connectionMonitor() {
backoff := reconnectInterval

for {
select {
case <-c.stopCh:
return

case <-c.reconnectCh:
logger.Info("Connection lost, attempting to reconnect...")

for {
// Try to reconnect
err := c.reconnect()
if err == nil {
logger.Info("Successfully reconnected")
backoff = reconnectInterval // Reset backoff on successful connection
break
}

logger.Errorf("Failed to reconnect: %v", err)

// Exponential backoff with max limit
backoff = time.Duration(float64(backoff) * reconnectBackoffExp)
if backoff > maxReconnectBackoff {
backoff = maxReconnectBackoff
}

select {
case <-c.stopCh:
return
case <-time.After(backoff):
continue
}
}
}
}
}

func (c *Client) reconnect() error {
// Close existing connection if any
if c.udpConnection != nil {
c.udpConnection.Close()
c.udpConnection = nil
}

// Establish new connection
if err := c.connect(); err != nil {
return err
}

// Update with the server
if err := c.Update(); err != nil {
return err
}

return nil
}

func (c *Client) connect() error {
udpClientOpts := []udp.Option{}

udpClientOpts = append(
udpClientOpts,
options.WithInactivityMonitor(time.Minute, func(cc *udpClient.Conn) {
_ = cc.Close()
logger.Warn("Connection inactive, triggering reconnect")
select {
case c.reconnectCh <- struct{}{}:
default:
}
}),
options.WithMux(c.handleRouter()),
)
Expand Down Expand Up @@ -366,13 +450,20 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str
c.taskManager.AddTask(objectId, time.Second*time.Duration(c.Settings.ObserveIntervalSec), func() {
data, err := c.object.ReadAll(objectId)
if err != nil {
logger.Errorf("failed to read data from object %s, error: %v", objectId, err)
return
}

jsonData := data.ReadAsJSON()

// check if udp connection is nil
if c.udpConnection == nil {
logger.Errorf("udp connection is nil, ignore observe")
return
}

c.dataCache[objectId] = jsonData
err = sendResponse(w.Conn(), token, obs, jsonData)
err = sendResponse(c.udpConnection, token, obs, jsonData)
if err != nil {
logger.Errorf("failed to send response: %v", err)
return
Expand All @@ -392,6 +483,12 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str

jsonData := data.ReadAsJSON()

// check if udp connection is nil
if c.udpConnection == nil {
logger.Errorf("udp connection is nil, ignore observe")
return
}

// check data is changed
if data, exists := c.dataCache[objectId]; exists {
if string(jsonData) == data {
Expand All @@ -401,10 +498,12 @@ func (c *Client) observe(w mux.ResponseWriter, token message.Token, objectId str
}

c.dataCache[objectId] = jsonData
err = sendResponse(w.Conn(), token, obs, jsonData)
err = sendResponse(c.udpConnection, token, obs, jsonData)
if err != nil {
logger.Errorf("failed to send response: %v", err)
return
}

obs++
c.taskManager.ResetTask(objectId)
})
Expand Down Expand Up @@ -449,8 +548,13 @@ func sendResponse(cc mux.Conn, token []byte, obs uint32, body string) error {
func (c *Client) CleanUp() {
c.taskManager.CancelAllTasks()
_ = c.Delete()

close(c.stopCh)
if c.udpConnection != nil {
_ = c.udpConnection.Close()
}
}

func (c *Client) isActivity() bool {
return time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec) * time.Second))
return c.udpConnection != nil && time.Now().Before(c.lastUpdatedTime.Add(time.Duration(c.Settings.LifeTimeSec)*time.Second))
}
24 changes: 10 additions & 14 deletions pkg/gateway/lwm2m/gatewaylwm2m.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,18 @@ func (g *Gateway) Start() error {
return err
}

// Register the client to the server
err := g.client.Register()
if err != nil {
logger.Errorf("Error registering client: %v", err)
return err
}

// Ping the client every pingIntervalSec seconds, by default 30 seconds
t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec))
for range t.C {
if err := g.client.Ping(); err != nil {
logger.Errorf("Error pinging client: %v", err)
g.ShutDown()
return err
if g.pingIntervalSec > 0 {
// Ping the client every pingIntervalSec seconds,by default disable
t := time.NewTicker(time.Second * time.Duration(g.pingIntervalSec))
for range t.C {
if err := g.client.Ping(); err != nil {
logger.Errorf("Error pinging client: %v", err)
g.ShutDown()
return err
}
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/api/v1alpha1/edgedevice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type LwM2MSetting struct {
PSKIdentity *string `json:"pskIdentity,omitempty"`
PSKKey *string `json:"pskKey,omitempty"`

// +kubebuilder:default=30
// +kubebuilder:default=-1
PingIntervalSec int64 `json:"pingIntervalSec,omitempty"`
// reference https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2
// +kubebuilder:default=247
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -122,7 +122,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/crd/install/config_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -121,7 +121,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/crd/install/config_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -128,7 +128,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/crd/install/shifu_install.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down Expand Up @@ -128,7 +128,7 @@ spec:
format: int64
type: integer
pingIntervalSec:
default: 30
default: -1
format: int64
type: integer
pskIdentity:
Expand Down

0 comments on commit 1771453

Please sign in to comment.