Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: device events #197

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions application-services/custom/camera-management/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Intel Corporation
# Copyright (c) 2022-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
#

#build stage
ARG BASE=golang:1.17-alpine3.15
ARG BASE=golang:1.18-alpine3.16
FROM ${BASE} AS builder

ARG ALPINE_PKG_BASE="make git gcc libc-dev libsodium-dev zeromq-dev"
Expand All @@ -37,7 +37,7 @@ ARG MAKE="make build-app"
RUN $MAKE

#final stage
FROM alpine:3.15
FROM alpine:3.16
LABEL license='SPDX-License-Identifier: Apache-2.0' \
copyright='Copyright (c) 2022: Intel'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
copyright='Copyright (c) 2022: Intel'
copyright='Copyright (c) 2023: Intel'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@presatish , missed this one in your last commit. ;-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lenny-intel I purposefully didn't update it as I was not completely sure if that had to be changed :-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@presatish , we typical do update it when we update the one for the file.

LABEL Name=app-camera-management Version=${VERSION}
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
14 changes: 13 additions & 1 deletion application-services/custom/camera-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,19 @@ make run-edge-video-analytics
export WRITABLE_INSECURESECRETS_CAMERACREDENTIALS_SECRETS_PASSWORD="<password>"
```

#### 3.2 Build and run
#### 3.2 Configure Default Pipeline
Initially, all new cameras added to the system will start the default analytics pipeline as defined in the configuration file below. The desired pipeline can be changed afterward or the feature can be disabled by setting the `DefaultPipelineName` and `DefaultPipelineVersion` to empty strings.

Modify the [res/configuration.toml](res/configuration.toml) file with the name and version of the default pipeline to use when a new device is added to the system.

Note: These values can be left empty to disable the feature.
```toml
[AppCustom]
DefaultPipelineName = "object_detection" # Name of the default pipeline used when a new device is added to the system
DefaultPipelineVersion = "person" # Version of the default pipeline used when a new device is added to the system
```

#### 3.3 Build and run
```shell
# First make sure you are at the root of this example app
cd edgex-examples/application-services/custom/camera-management
Expand Down
27 changes: 23 additions & 4 deletions application-services/custom/camera-management/appcamera/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
package appcamera

import (
"net/http"
"sync"

"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/pkg/errors"
"net/http"
"sync"
)

type CameraManagementApp struct {
Expand Down Expand Up @@ -43,12 +44,30 @@ func (app *CameraManagementApp) Run() error {
return err
}

if err := app.queryAllPipelineStatuses(); err != nil {
// Subscribe to events.
err := app.service.SetDefaultFunctionsPipeline(
app.processEdgeXDeviceSystemEvent)
if err != nil {
return errors.Wrap(err, "failed to set default pipeline to processEdgeXEvent")
}

if err = app.queryAllPipelineStatuses(); err != nil {
// do not exit, just log
app.lc.Errorf("Unable to query EVAM pipeline statuses. Is EVAM running? %s", err.Error())
}

if err := app.service.MakeItRun(); err != nil {
devices, err := app.getAllDevices()
if err != nil {
app.lc.Errorf("no devices found: %s", err.Error())
} else {
for _, device := range devices {
if err = app.startDefaultPipeline(device); err != nil {
app.lc.Errorf("Error starting default pipeline for %s, %v", device.Name, err)
}
}
}

if err = app.service.MakeItRun(); err != nil {
return errors.Wrap(err, "failed to run pipeline")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type CustomConfig struct {
EvamBaseUrl string
MqttAddress string
MqttTopic string
DefaultPipelineName string
DefaultPipelineVersion string
}

// ServiceConfig a struct that wraps CustomConfig which holds the values for driver configuration
Expand Down
96 changes: 94 additions & 2 deletions application-services/custom/camera-management/appcamera/evam.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/IOTechSystems/onvif/media"
"github.com/pkg/errors"
"net/url"
"path"

"github.com/edgexfoundry/go-mod-core-contracts/v2/common"

"github.com/IOTechSystems/onvif/media"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -218,6 +223,93 @@ func (app *CameraManagementApp) getPipelineStatus(deviceName string) (interface{
return nil, nil
}

// processEdgeXDeviceSystemEvent is the function that is called when an EdgeX Device System Event is received
func (app *CameraManagementApp) processEdgeXDeviceSystemEvent(_ interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
if data == nil {
return false, fmt.Errorf("processEdgeXDeviceSystemEvent: was called without any data")
}

systemEvent, ok := data.(dtos.SystemEvent)
if !ok {
return false, fmt.Errorf("type received %T is not a SystemEvent", data)
}

if systemEvent.Type != common.DeviceSystemEventType {
return false, fmt.Errorf("system event type is not " + common.DeviceSystemEventType)
}

device := dtos.Device{}
err := systemEvent.DecodeDetails(&device)
if err != nil {
return false, fmt.Errorf("failed to decode device details: %v", err)
}

switch systemEvent.Action {
case common.DeviceSystemEventActionAdd:
if err = app.startDefaultPipeline(device); err != nil {
return false, err
}
case common.DeviceSystemEventActionDelete:
// stop any running pipelines for the deleted device
if info, found := app.getPipelineInfo(device.Name); found {
if err = app.stopPipeline(device.Name, info.Id); err != nil {
return false, fmt.Errorf("error stopping pipleline for device %s, %v", device.Name, err)
}
}
default:
app.lc.Debugf("System event action %s is not handled", systemEvent.Action)
}

return false, nil
}

func (app *CameraManagementApp) startDefaultPipeline(device dtos.Device) error {
pipelineRunning := app.isPipelineRunning(device.Name)

if pipelineRunning {
app.lc.Debugf("pipeline is already running for device %s", device.Name)
return nil
}

app.lc.Debugf("pipeline is not running for device %s", device.Name)

if app.config.AppCustom.DefaultPipelineName == "" || app.config.AppCustom.DefaultPipelineVersion == "" {
app.lc.Warnf("no default pipeline name/version specified, skip starting pipeline for device %s", device.Name)
return nil
}

startPipelineRequest := StartPipelineRequest{
PipelineName: app.config.AppCustom.DefaultPipelineName,
PipelineVersion: app.config.AppCustom.DefaultPipelineVersion,
}

protocol, ok := device.Protocols["Onvif"]
if ok {
app.lc.Debugf("Onvif protocol information found for device: %s message: %v", device.Name, protocol)
profileResponse, err := app.getProfiles(device.Name)
if err != nil {
return fmt.Errorf("failed to get profiles for device %s, message: %v", device.Name, err)

}

app.lc.Debugf("Onvif profile information found for device: %s message: %v", device.Name, profileResponse)
startPipelineRequest.Onvif = &OnvifPipelineConfig{
ProfileToken: string(profileResponse.Profiles[0].Token),
}
} else if _, ok := device.Protocols["USB"]; ok {
app.lc.Debugf("Usb protocol found for device: %s", device.Name)
startPipelineRequest.USB = &USBStartStreamingRequest{}
}

app.lc.Debugf("Starting default pipeline for device %s", device.Name)
if err := app.startPipeline(device.Name, startPipelineRequest); err != nil {
return fmt.Errorf("pipeline failed to start for device %s, message: %v", device.Name, err)

}

return nil
}

// queryAllPipelineStatuses queries EVAM for all pipeline statuses, attempts to link them to devices, and then
// insert them into the pipeline map.
func (app *CameraManagementApp) queryAllPipelineStatuses() error {
Expand Down
8 changes: 5 additions & 3 deletions application-services/custom/camera-management/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2022 Intel Corporation
// Copyright (c) 2022-2023 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,17 +18,19 @@ package main

lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
import (
"fmt"
"os"

appsdk "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg"
"github.com/edgexfoundry/edgex-examples/application-services/custom/camera-management/appcamera"
"os"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
)

const (
serviceKey = "app-camera-management"
)

func main() {
service, ok := appsdk.NewAppService(serviceKey)
service, ok := appsdk.NewAppServiceWithTargetType(serviceKey, &dtos.SystemEvent{})
if !ok {
fmt.Printf("error: unable to create new app service %s!\n", serviceKey)
os.Exit(-1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,44 @@ TokenFile = "/tmp/edgex/secrets/app-camera-management/secrets-token.json"
Host = "localhost"
Port = 59882

[Trigger]
Type="http"

[AppCustom]
OnvifDeviceServiceName = "device-onvif-camera"
USBDeviceServiceName = "device-usb-camera"
EvamBaseUrl = "http://localhost:8080"
MqttAddress = "edgex-mqtt-broker:1883"
MqttTopic = "incoming/data/edge-video-analytics/inference-event"
DefaultPipelineName = "object_detection" # Name of the default pipeline used when a new device is added to the system; can be left blank to disable feature
DefaultPipelineVersion = "person" # Version of the default pipeline used when a new device is added to the system; can be left blank to disable feature

[Trigger]
Type="edgex-messagebus"
[Trigger.EdgexMessageBus]
Type = "redis"
[Trigger.EdgexMessageBus.SubscribeHost]
Host = "localhost"
Port = 6379
Protocol = "redis"
AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
SubscribeTopics="edgex/system-events/#/device/#"

[Trigger.EdgexMessageBus.Optional]
authmode = "usernamepassword" # required for redis messagebus (secure or insecure).
secretname = "redisdb"
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
ClientId ="app-camera-management"
Qos = "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once)
KeepAlive = "10" # Seconds (must be 2 or greater)
Retained = "false"
AutoReconnect = "true"
ConnectTimeout = "5" # Seconds
SkipCertVerify = "false"
# Default NATS Specific options that need to be here to enable environment variable overrides of them
Format = "nats"
RetryOnFailedConnect = "true"
QueueGroup = ""
Durable = ""
AutoProvision = "true"
Deliver = "new"
DefaultPubRetryAttempts = "2"
Subject = "edgex/#" # Required for NATS JetStream only for stream autoprovisioning