From e60111ee67c8cb52d8e823e5108f3080451ef200 Mon Sep 17 00:00:00 2001 From: brianointel Date: Wed, 17 May 2023 09:32:44 -0700 Subject: [PATCH] feat: Camera Mgmt Autostart EVAM pipelines on startup, and on device added (#197) Signed-off-by: Brian Osburn Signed-off-by: preethi-satishcandra Co-authored-by: preethi-satishcandra --- .../custom/camera-management/Dockerfile | 6 +- .../custom/camera-management/README.md | 14 ++- .../custom/camera-management/appcamera/app.go | 27 +++++- .../camera-management/appcamera/config.go | 2 + .../camera-management/appcamera/evam.go | 96 ++++++++++++++++++- .../custom/camera-management/main.go | 8 +- .../camera-management/res/configuration.toml | 38 +++++++- 7 files changed, 175 insertions(+), 16 deletions(-) diff --git a/application-services/custom/camera-management/Dockerfile b/application-services/custom/camera-management/Dockerfile index 417118bf..12de6064 100644 --- a/application-services/custom/camera-management/Dockerfile +++ b/application-services/custom/camera-management/Dockerfile @@ -1,5 +1,5 @@ # -# Copyright (c) 2022 Intel Corporation +# Copyright (c) 2022-2023 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ # #build stage -ARG BASE=golang:1.17-alpine3.15 +ARG BASE=golang:1.18-alpine3.16 FROM ${BASE} AS builder ARG ALPINE_PKG_BASE="make git gcc libc-dev libsodium-dev zeromq-dev" @@ -37,7 +37,7 @@ ARG MAKE="make build-app" RUN $MAKE #final stage -FROM alpine:3.15 +FROM alpine:3.16 LABEL license='SPDX-License-Identifier: Apache-2.0' \ copyright='Copyright (c) 2022: Intel' LABEL Name=app-camera-management Version=${VERSION} diff --git a/application-services/custom/camera-management/README.md b/application-services/custom/camera-management/README.md index f89acf4b..9024581c 100644 --- a/application-services/custom/camera-management/README.md +++ b/application-services/custom/camera-management/README.md @@ -173,7 +173,19 @@ make run-edge-video-analytics export WRITABLE_INSECURESECRETS_CAMERACREDENTIALS_SECRETS_PASSWORD="" ``` -#### 3.2 Build and run +#### 3.2 Configure Default Pipeline +Initially, all new cameras added to the system will start the default analytics pipeline as defined in the configuration file below. The desired pipeline can be changed afterward or the feature can be disabled by setting the `DefaultPipelineName` and `DefaultPipelineVersion` to empty strings. + +Modify the [res/configuration.toml](res/configuration.toml) file with the name and version of the default pipeline to use when a new device is added to the system. + +Note: These values can be left empty to disable the feature. + ```toml +[AppCustom] +DefaultPipelineName = "object_detection" # Name of the default pipeline used when a new device is added to the system +DefaultPipelineVersion = "person" # Version of the default pipeline used when a new device is added to the system + ``` + +#### 3.3 Build and run ```shell # First make sure you are at the root of this example app cd edgex-examples/application-services/custom/camera-management diff --git a/application-services/custom/camera-management/appcamera/app.go b/application-services/custom/camera-management/appcamera/app.go index 7fbfbd57..a51967d2 100644 --- a/application-services/custom/camera-management/appcamera/app.go +++ b/application-services/custom/camera-management/appcamera/app.go @@ -6,11 +6,12 @@ package appcamera import ( + "net/http" + "sync" + "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" "github.com/pkg/errors" - "net/http" - "sync" ) type CameraManagementApp struct { @@ -43,12 +44,30 @@ func (app *CameraManagementApp) Run() error { return err } - if err := app.queryAllPipelineStatuses(); err != nil { + // Subscribe to events. + err := app.service.SetDefaultFunctionsPipeline( + app.processEdgeXDeviceSystemEvent) + if err != nil { + return errors.Wrap(err, "failed to set default pipeline to processEdgeXEvent") + } + + if err = app.queryAllPipelineStatuses(); err != nil { // do not exit, just log app.lc.Errorf("Unable to query EVAM pipeline statuses. Is EVAM running? %s", err.Error()) } - if err := app.service.MakeItRun(); err != nil { + devices, err := app.getAllDevices() + if err != nil { + app.lc.Errorf("no devices found: %s", err.Error()) + } else { + for _, device := range devices { + if err = app.startDefaultPipeline(device); err != nil { + app.lc.Errorf("Error starting default pipeline for %s, %v", device.Name, err) + } + } + } + + if err = app.service.MakeItRun(); err != nil { return errors.Wrap(err, "failed to run pipeline") } diff --git a/application-services/custom/camera-management/appcamera/config.go b/application-services/custom/camera-management/appcamera/config.go index 05574615..b55add59 100644 --- a/application-services/custom/camera-management/appcamera/config.go +++ b/application-services/custom/camera-management/appcamera/config.go @@ -12,6 +12,8 @@ type CustomConfig struct { EvamBaseUrl string MqttAddress string MqttTopic string + DefaultPipelineName string + DefaultPipelineVersion string } // ServiceConfig a struct that wraps CustomConfig which holds the values for driver configuration diff --git a/application-services/custom/camera-management/appcamera/evam.go b/application-services/custom/camera-management/appcamera/evam.go index bc56f0ff..da5c105f 100644 --- a/application-services/custom/camera-management/appcamera/evam.go +++ b/application-services/custom/camera-management/appcamera/evam.go @@ -9,10 +9,15 @@ import ( "context" "encoding/json" "fmt" - "github.com/IOTechSystems/onvif/media" - "github.com/pkg/errors" "net/url" "path" + + "github.com/edgexfoundry/go-mod-core-contracts/v2/common" + + "github.com/IOTechSystems/onvif/media" + "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" + "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos" + "github.com/pkg/errors" ) const ( @@ -218,6 +223,93 @@ func (app *CameraManagementApp) getPipelineStatus(deviceName string) (interface{ return nil, nil } +// processEdgeXDeviceSystemEvent is the function that is called when an EdgeX Device System Event is received +func (app *CameraManagementApp) processEdgeXDeviceSystemEvent(_ interfaces.AppFunctionContext, data interface{}) (bool, interface{}) { + if data == nil { + return false, fmt.Errorf("processEdgeXDeviceSystemEvent: was called without any data") + } + + systemEvent, ok := data.(dtos.SystemEvent) + if !ok { + return false, fmt.Errorf("type received %T is not a SystemEvent", data) + } + + if systemEvent.Type != common.DeviceSystemEventType { + return false, fmt.Errorf("system event type is not " + common.DeviceSystemEventType) + } + + device := dtos.Device{} + err := systemEvent.DecodeDetails(&device) + if err != nil { + return false, fmt.Errorf("failed to decode device details: %v", err) + } + + switch systemEvent.Action { + case common.DeviceSystemEventActionAdd: + if err = app.startDefaultPipeline(device); err != nil { + return false, err + } + case common.DeviceSystemEventActionDelete: + // stop any running pipelines for the deleted device + if info, found := app.getPipelineInfo(device.Name); found { + if err = app.stopPipeline(device.Name, info.Id); err != nil { + return false, fmt.Errorf("error stopping pipleline for device %s, %v", device.Name, err) + } + } + default: + app.lc.Debugf("System event action %s is not handled", systemEvent.Action) + } + + return false, nil +} + +func (app *CameraManagementApp) startDefaultPipeline(device dtos.Device) error { + pipelineRunning := app.isPipelineRunning(device.Name) + + if pipelineRunning { + app.lc.Debugf("pipeline is already running for device %s", device.Name) + return nil + } + + app.lc.Debugf("pipeline is not running for device %s", device.Name) + + if app.config.AppCustom.DefaultPipelineName == "" || app.config.AppCustom.DefaultPipelineVersion == "" { + app.lc.Warnf("no default pipeline name/version specified, skip starting pipeline for device %s", device.Name) + return nil + } + + startPipelineRequest := StartPipelineRequest{ + PipelineName: app.config.AppCustom.DefaultPipelineName, + PipelineVersion: app.config.AppCustom.DefaultPipelineVersion, + } + + protocol, ok := device.Protocols["Onvif"] + if ok { + app.lc.Debugf("Onvif protocol information found for device: %s message: %v", device.Name, protocol) + profileResponse, err := app.getProfiles(device.Name) + if err != nil { + return fmt.Errorf("failed to get profiles for device %s, message: %v", device.Name, err) + + } + + app.lc.Debugf("Onvif profile information found for device: %s message: %v", device.Name, profileResponse) + startPipelineRequest.Onvif = &OnvifPipelineConfig{ + ProfileToken: string(profileResponse.Profiles[0].Token), + } + } else if _, ok := device.Protocols["USB"]; ok { + app.lc.Debugf("Usb protocol found for device: %s", device.Name) + startPipelineRequest.USB = &USBStartStreamingRequest{} + } + + app.lc.Debugf("Starting default pipeline for device %s", device.Name) + if err := app.startPipeline(device.Name, startPipelineRequest); err != nil { + return fmt.Errorf("pipeline failed to start for device %s, message: %v", device.Name, err) + + } + + return nil +} + // queryAllPipelineStatuses queries EVAM for all pipeline statuses, attempts to link them to devices, and then // insert them into the pipeline map. func (app *CameraManagementApp) queryAllPipelineStatuses() error { diff --git a/application-services/custom/camera-management/main.go b/application-services/custom/camera-management/main.go index 48e0ebee..c8f5e8c1 100644 --- a/application-services/custom/camera-management/main.go +++ b/application-services/custom/camera-management/main.go @@ -1,5 +1,5 @@ // -// Copyright (c) 2022 Intel Corporation +// Copyright (c) 2022-2023 Intel Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,9 +18,11 @@ package main import ( "fmt" + "os" + appsdk "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg" "github.com/edgexfoundry/edgex-examples/application-services/custom/camera-management/appcamera" - "os" + "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos" ) const ( @@ -28,7 +30,7 @@ const ( ) func main() { - service, ok := appsdk.NewAppService(serviceKey) + service, ok := appsdk.NewAppServiceWithTargetType(serviceKey, &dtos.SystemEvent{}) if !ok { fmt.Printf("error: unable to create new app service %s!\n", serviceKey) os.Exit(-1) diff --git a/application-services/custom/camera-management/res/configuration.toml b/application-services/custom/camera-management/res/configuration.toml index c17875f1..48be18f8 100644 --- a/application-services/custom/camera-management/res/configuration.toml +++ b/application-services/custom/camera-management/res/configuration.toml @@ -102,12 +102,44 @@ TokenFile = "/tmp/edgex/secrets/app-camera-management/secrets-token.json" Host = "localhost" Port = 59882 -[Trigger] -Type="http" - [AppCustom] OnvifDeviceServiceName = "device-onvif-camera" USBDeviceServiceName = "device-usb-camera" EvamBaseUrl = "http://localhost:8080" MqttAddress = "edgex-mqtt-broker:1883" MqttTopic = "incoming/data/edge-video-analytics/inference-event" +DefaultPipelineName = "object_detection" # Name of the default pipeline used when a new device is added to the system; can be left blank to disable feature +DefaultPipelineVersion = "person" # Version of the default pipeline used when a new device is added to the system; can be left blank to disable feature + +[Trigger] +Type="edgex-messagebus" + [Trigger.EdgexMessageBus] + Type = "redis" + [Trigger.EdgexMessageBus.SubscribeHost] + Host = "localhost" + Port = 6379 + Protocol = "redis" + AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure). + SecretName = "redisdb" + SubscribeTopics="edgex/system-events/#/device/#" + + [Trigger.EdgexMessageBus.Optional] + authmode = "usernamepassword" # required for redis messagebus (secure or insecure). + secretname = "redisdb" + # Default MQTT Specific options that need to be here to enable environment variable overrides of them + ClientId ="app-camera-management" + Qos = "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once) + KeepAlive = "10" # Seconds (must be 2 or greater) + Retained = "false" + AutoReconnect = "true" + ConnectTimeout = "5" # Seconds + SkipCertVerify = "false" + # Default NATS Specific options that need to be here to enable environment variable overrides of them + Format = "nats" + RetryOnFailedConnect = "true" + QueueGroup = "" + Durable = "" + AutoProvision = "true" + Deliver = "new" + DefaultPubRetryAttempts = "2" + Subject = "edgex/#" # Required for NATS JetStream only for stream autoprovisioning