Skip to content

Commit

Permalink
feat: Add device up/down detection
Browse files Browse the repository at this point in the history
Signed-off-by: FelixTing <[email protected]>
  • Loading branch information
FelixTing committed Jan 8, 2025
1 parent 0d02b50 commit 6a9267d
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 19 deletions.
10 changes: 8 additions & 2 deletions internal/application/callback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -9,7 +9,6 @@ package application
import (
"context"
"fmt"

"github.com/edgexfoundry/device-sdk-go/v4/internal/cache"
"github.com/edgexfoundry/device-sdk-go/v4/internal/container"

Expand Down Expand Up @@ -96,6 +95,10 @@ func AddDevice(addDeviceRequest requests.AddDeviceRequest, dic *di.Container) er
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
}

config := container.ConfigurationFrom(dic.Get)
reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)
reqFailsTracker.Set(device.Name, int(config.Device.AllowedFails))

lc.Debugf("starting AutoEvents for device %s", device.Name)
container.AutoEventManagerFrom(dic.Get).RestartForDevice(device.Name)
return nil
Expand Down Expand Up @@ -190,6 +193,9 @@ func DeleteDevice(name string, dic *di.Container) errors.EdgeX {
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
}

reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)
reqFailsTracker.Remove(device.Name)

return nil
}

Expand Down
39 changes: 31 additions & 8 deletions internal/application/command.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -34,20 +34,27 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"
)

func GetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, regexCmd bool, dic *di.Container) (*dtos.Event, errors.EdgeX) {
func GetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, regexCmd bool, dic *di.Container) (res *dtos.Event, err errors.EdgeX) {
if deviceName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
}
if commandName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil)
}
var device models.Device
defer func() {
if err != nil {
DeviceRequestFailed(deviceName, dic)
} else {
DeviceRequestSucceeded(device, dic)
}
}()

device, err := validateServiceAndDeviceState(deviceName, dic)
device, err = validateServiceAndDeviceState(deviceName, dic)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

var res *dtos.Event
_, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName)
if cmdExist {
res, err = readDeviceCommand(device, commandName, queryParams, dic)
Expand All @@ -68,20 +75,27 @@ func GetCommand(ctx context.Context, deviceName string, commandName string, quer
return res, nil
}

func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (*dtos.Event, errors.EdgeX) {
func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (event *dtos.Event, err errors.EdgeX) {
if deviceName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
}
if commandName == "" {
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil)
}
var device models.Device
defer func() {
if err != nil {
DeviceRequestFailed(deviceName, dic)
} else {
DeviceRequestSucceeded(device, dic)
}
}()

device, err := validateServiceAndDeviceState(deviceName, dic)
device, err = validateServiceAndDeviceState(deviceName, dic)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

var event *dtos.Event
_, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName)
if cmdExist {
event, err = writeDeviceCommand(device, commandName, queryParams, requests, dic)
Expand Down Expand Up @@ -447,8 +461,17 @@ func validateServiceAndDeviceState(deviceName string, dic *di.Container) (models
return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s locked", device.Name), nil)
}
// check device's OperatingState
// if it's a device return attempt, operating state is allowed to be DOWN
if device.OperatingState == models.Down {
return models.Device{}, errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil)
err := errors.NewCommonEdgeX(errors.KindServiceLocked, fmt.Sprintf("device %s OperatingState is DOWN", device.Name), nil)
config := container.ConfigurationFrom(dic.Get)
if config.Device.AllowedFails == 0 || config.Device.DeviceDownTimeout == 0 {
return models.Device{}, err
}
reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)
if reqFailsTracker.Value(deviceName) > 0 {
return models.Device{}, err
}
}

// check device's ProfileName
Expand Down
108 changes: 108 additions & 0 deletions internal/application/devicereturn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"context"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v4/di"

"github.com/edgexfoundry/go-mod-core-contracts/v4/common"
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"

"github.com/edgexfoundry/device-sdk-go/v4/internal/cache"
sdkCommon "github.com/edgexfoundry/device-sdk-go/v4/internal/common"
"github.com/edgexfoundry/device-sdk-go/v4/internal/container"
)

// deviceReturn repeatedly probes a device until it can be set back to the UP operating state.
//
// Every round first sleeps for Device.DeviceDownTimeout seconds and then:
//   - stops if the device is no longer in the device cache;
//   - stops if the cached device is already UP;
//   - stops if the device's profile is not in the profile cache;
//   - otherwise issues GetCommand against the first device resource whose ReadWrite property is R, RW or WR.
//     A successful read sets the operating state to UP and stops; a failed read starts a new round.
//
// When the profile has no readable resource at all, the operating state is set to UP without probing.
//
// DeviceRequestFailed starts this function with `go`. The loop has no exit path other than the ones listed
// above, so it keeps running for as long as the device stays unresponsive.
func deviceReturn(deviceName string, dic *di.Container) {
	lc := bootstrapContainer.LoggingClientFrom(dic.Get)
	dc := bootstrapContainer.DeviceClientFrom(dic.Get)
	config := container.ConfigurationFrom(dic.Get)

retry:
	for {
		time.Sleep(time.Duration(config.Device.DeviceDownTimeout) * time.Second)
		lc.Infof("Checking operational state for device: %s", deviceName)

		device, deviceFound := cache.Devices().ForName(deviceName)
		if !deviceFound {
			lc.Warnf("Device %s not found. Exiting retry loop.", deviceName)
			return
		}

		if device.OperatingState == models.Up {
			lc.Infof("Device %s is already operational. Exiting retry loop.", deviceName)
			return
		}

		profile, profileFound := cache.Profiles().ForName(device.ProfileName)
		if !profileFound {
			lc.Warnf("Device %s has no profile. Cannot set operational state automatically.", deviceName)
			return
		}

		for _, resource := range profile.DeviceResources {
			access := resource.Properties.ReadWrite
			if access != common.ReadWrite_R && access != common.ReadWrite_RW && access != common.ReadWrite_WR {
				// Not readable, so it cannot be used as a probe.
				continue
			}

			// Only the first readable resource is ever probed: both outcomes leave this inner loop.
			if _, err := GetCommand(context.Background(), deviceName, resource.Name, "", true, dic); err != nil {
				lc.Errorf("Device %s unresponsive: retrying in %v seconds.", deviceName, config.Device.DeviceDownTimeout)
				continue retry
			}

			lc.Infof("Device %s responsive: setting operational state to up.", deviceName)
			sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc)
			return
		}

		lc.Infof("Device %s has no readable resources. Setting operational state to up without checking.", deviceName)
		sdkCommon.UpdateOperatingState(deviceName, models.Up, lc, dc)
		return
	}
}

// DeviceRequestFailed records one failed request for the named device.
//
// It does nothing when Device.AllowedFails is 0. Otherwise it decreases the device's entry in the allowed
// request failures tracker and acts only when that call returns exactly 0:
//   - if the device is still in the device cache and not already DOWN, its operating state is updated to DOWN;
//   - if Device.DeviceDownTimeout is greater than 0, deviceReturn is started in a new goroutine to probe the
//     device until it responds again.
func DeviceRequestFailed(deviceName string, dic *di.Container) {
	config := container.ConfigurationFrom(dic.Get)
	if config.Device.AllowedFails == 0 {
		// Failure tracking is switched off.
		return
	}

	lc := bootstrapContainer.LoggingClientFrom(dic.Get)
	dc := bootstrapContainer.DeviceClientFrom(dic.Get)
	reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)

	if reqFailsTracker.Decrease(deviceName) != 0 {
		// Either failures are still allowed, or the device is untracked / already past zero.
		return
	}

	device, found := cache.Devices().ForName(deviceName)
	if !found {
		return
	}

	if device.OperatingState != models.Down {
		lc.Infof("Marking device %s non-operational", deviceName)
		sdkCommon.UpdateOperatingState(deviceName, models.Down, lc, dc)
	}

	if timeout := config.Device.DeviceDownTimeout; timeout > 0 {
		lc.Warnf("Will retry device %s in %v seconds", deviceName, timeout)
		go deviceReturn(deviceName, dic)
	}
}

// DeviceRequestSucceeded records a successful request for device d.
//
// When Device.AllowedFails is greater than 0 and the tracker value for d.Name is below AllowedFails, the
// tracker entry is reset to AllowedFails. If the passed-in device value is additionally in the DOWN operating
// state, its operating state is updated to UP. In every other case the function has no effect.
func DeviceRequestSucceeded(d models.Device, dic *di.Container) {
	config := container.ConfigurationFrom(dic.Get)
	reqFailsTracker := container.AllowedRequestFailuresTrackerFrom(dic.Get)

	// Nothing to reset when tracking is disabled or the counter is already at (or above) its full allowance.
	if config.Device.AllowedFails == 0 || reqFailsTracker.Value(d.Name) >= int(config.Device.AllowedFails) {
		return
	}

	reqFailsTracker.Set(d.Name, int(config.Device.AllowedFails))
	if d.OperatingState != models.Down {
		return
	}

	lc := bootstrapContainer.LoggingClientFrom(dic.Get)
	dc := bootstrapContainer.DeviceClientFrom(dic.Get)
	sdkCommon.UpdateOperatingState(d.Name, models.Up, lc, dc)
}
2 changes: 1 addition & 1 deletion internal/autoevent/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2019-2023 IOTech Ltd
// Copyright (C) 2019-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down
7 changes: 2 additions & 5 deletions internal/cache/devices.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -50,13 +50,10 @@ type deviceCache struct {
func newDeviceCache(devices []models.Device, dic *di.Container) DeviceCache {
defaultSize := len(devices)
dMap := make(map[string]*models.Device, defaultSize)
for i, d := range devices {
dMap[d.Name] = &devices[i]
}

dc = &deviceCache{deviceMap: dMap, dic: dic}
lastConnectedMetrics := make(map[string]gometrics.Gauge)
for _, d := range devices {
dMap[d.Name] = &d
deviceMetric := gometrics.NewGauge()
registerMetric(d.Name, deviceMetric, dic)
lastConnectedMetrics[d.Name] = deviceMetric
Expand Down
6 changes: 5 additions & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018-2023 IOTech Ltd
// Copyright (C) 2018-2025 IOTech Ltd
// Copyright (c) 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -53,6 +53,10 @@ type DeviceInfo struct {
EnableAsyncReadings bool
// Labels are properties applied to the device service to help with searching
Labels []string
// AllowedFails specifies the number of failed requests allowed before a device is marked as down.
AllowedFails uint
// DeviceDownTimeout specifies the interval in seconds between the Device Service's attempts to contact a device that is marked as down; 0 disables these attempts.
DeviceDownTimeout uint
}

// DiscoveryInfo is a struct which contains configuration of device auto discovery.
Expand Down
61 changes: 61 additions & 0 deletions internal/container/allowedfailstracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package container

import "github.com/edgexfoundry/device-sdk-go/v4/internal/syncutils"

// AllowedFailuresTracker maps a device name to a *syncutils.AtomicInt holding the number of request failures
// that device is still allowed.
//
// The struct only contains a map, so copies of a tracker value share the same underlying entries.
// NOTE(review): the map itself is read and written without any lock in this file; confirm that callers
// serialize Set/Remove against concurrent readers.
type AllowedFailuresTracker struct {
	data map[string]*syncutils.AtomicInt
}

// NewAllowedFailuresTracker returns a tracker whose map is allocated and empty.
func NewAllowedFailuresTracker() AllowedFailuresTracker {
	counters := make(map[string]*syncutils.AtomicInt)
	return AllowedFailuresTracker{data: counters}
}

// Get returns the AtomicInt stored for deviceName, or nil when the tracker has no entry for that name.
func (aft *AllowedFailuresTracker) Get(deviceName string) *syncutils.AtomicInt {
	if counter, tracked := aft.data[deviceName]; tracked {
		return counter
	}
	return nil
}

// Set stores value in the AtomicInt for deviceName, creating the entry first when the device is not tracked yet.
func (aft *AllowedFailuresTracker) Set(deviceName string, value int) {
	counter, tracked := aft.data[deviceName]
	if !tracked {
		counter = &syncutils.AtomicInt{}
		aft.data[deviceName] = counter
	}
	counter.Set(value)
}

// Decrease calls Decrease on the AtomicInt for deviceName and returns that call's result, but only while the
// current value is not negative.
// It returns -1 without touching the counter when the device is not tracked or its value is already below 0.
func (aft *AllowedFailuresTracker) Decrease(deviceName string) int {
	counter, tracked := aft.data[deviceName]
	if !tracked || counter.Value() < 0 {
		return -1
	}
	return counter.Decrease()
}

// Value reports the current value of the AtomicInt for deviceName.
// It returns -1 when the tracker has no entry for that name.
func (aft *AllowedFailuresTracker) Value(deviceName string) int {
	counter, tracked := aft.data[deviceName]
	if !tracked {
		return -1
	}
	return counter.Value()
}

// Remove deletes the tracker entry for the given device name.
// It uses the built-in delete, so removing a name that has no entry is a no-op.
func (aft *AllowedFailuresTracker) Remove(deviceName string) {
delete(aft.data, deviceName)
}
10 changes: 9 additions & 1 deletion internal/container/deviceservice.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -60,3 +60,11 @@ func DiscoveryRequestIdFrom(get di.Get) string {
}
return id
}

// AllowedRequestFailuresTrackerName is the DIC name of the allowed request failures tracker. It is derived from
// the AllowedFailuresTracker type through di.TypeInstanceToName.
var AllowedRequestFailuresTrackerName = di.TypeInstanceToName(AllowedFailuresTracker{})

// AllowedRequestFailuresTrackerFrom queries the DIC through get and returns the allowed request failures tracker.
// The type assertion is unchecked, so the call panics when the value stored under
// AllowedRequestFailuresTrackerName is not an AllowedFailuresTracker.
// The tracker is returned by value.
func AllowedRequestFailuresTrackerFrom(get di.Get) AllowedFailuresTracker {
	tracker := get(AllowedRequestFailuresTrackerName).(AllowedFailuresTracker)
	return tracker
}
5 changes: 4 additions & 1 deletion internal/controller/http/command_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2022-2023 IOTech Ltd
// Copyright (C) 2022-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -208,6 +208,9 @@ func mockDic() *di.Container {
bootstrapContainer.MetricsManagerInterfaceName: func(get di.Get) interface{} {
return mockMetricsManager
},
container.AllowedRequestFailuresTrackerName: func(get di.Get) any {
return container.NewAllowedFailuresTracker()
},
})

return dic
Expand Down
Loading

0 comments on commit 6a9267d

Please sign in to comment.