Skip to content

Commit

Permalink
feat: Camera Mgmt Autostart EVAM pipelines on startup, and on device …
Browse files Browse the repository at this point in the history
…added (#197)


Signed-off-by: Brian Osburn <[email protected]>
Signed-off-by: preethi-satishcandra <[email protected]>
Co-authored-by: preethi-satishcandra <[email protected]>
  • Loading branch information
brianointel and presatish authored May 17, 2023
1 parent 8bc93bd commit e60111e
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 16 deletions.
6 changes: 3 additions & 3 deletions application-services/custom/camera-management/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Intel Corporation
# Copyright (c) 2022-2023 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
#

#build stage
ARG BASE=golang:1.17-alpine3.15
ARG BASE=golang:1.18-alpine3.16
FROM ${BASE} AS builder

ARG ALPINE_PKG_BASE="make git gcc libc-dev libsodium-dev zeromq-dev"
Expand All @@ -37,7 +37,7 @@ ARG MAKE="make build-app"
RUN $MAKE

#final stage
FROM alpine:3.15
FROM alpine:3.16
LABEL license='SPDX-License-Identifier: Apache-2.0' \
copyright='Copyright (c) 2022: Intel'
LABEL Name=app-camera-management Version=${VERSION}
Expand Down
14 changes: 13 additions & 1 deletion application-services/custom/camera-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,19 @@ make run-edge-video-analytics
export WRITABLE_INSECURESECRETS_CAMERACREDENTIALS_SECRETS_PASSWORD="<password>"
```

#### 3.2 Build and run
#### 3.2 Configure Default Pipeline
Initially, all new cameras added to the system will start the default analytics pipeline as defined in the configuration file below. The desired pipeline can be changed afterward or the feature can be disabled by setting the `DefaultPipelineName` and `DefaultPipelineVersion` to empty strings.

Modify the [res/configuration.toml](res/configuration.toml) file with the name and version of the default pipeline to use when a new device is added to the system.

Note: These values can be left empty to disable the feature.
```toml
[AppCustom]
DefaultPipelineName = "object_detection" # Name of the default pipeline used when a new device is added to the system
DefaultPipelineVersion = "person" # Version of the default pipeline used when a new device is added to the system
```

#### 3.3 Build and run
```shell
# First make sure you are at the root of this example app
cd edgex-examples/application-services/custom/camera-management
Expand Down
27 changes: 23 additions & 4 deletions application-services/custom/camera-management/appcamera/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
package appcamera

import (
"net/http"
"sync"

"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/pkg/errors"
"net/http"
"sync"
)

type CameraManagementApp struct {
Expand Down Expand Up @@ -43,12 +44,30 @@ func (app *CameraManagementApp) Run() error {
return err
}

if err := app.queryAllPipelineStatuses(); err != nil {
// Subscribe to events.
err := app.service.SetDefaultFunctionsPipeline(
app.processEdgeXDeviceSystemEvent)
if err != nil {
return errors.Wrap(err, "failed to set default pipeline to processEdgeXEvent")
}

if err = app.queryAllPipelineStatuses(); err != nil {
// do not exit, just log
app.lc.Errorf("Unable to query EVAM pipeline statuses. Is EVAM running? %s", err.Error())
}

if err := app.service.MakeItRun(); err != nil {
devices, err := app.getAllDevices()
if err != nil {
app.lc.Errorf("no devices found: %s", err.Error())
} else {
for _, device := range devices {
if err = app.startDefaultPipeline(device); err != nil {
app.lc.Errorf("Error starting default pipeline for %s, %v", device.Name, err)
}
}
}

if err = app.service.MakeItRun(); err != nil {
return errors.Wrap(err, "failed to run pipeline")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type CustomConfig struct {
EvamBaseUrl string
MqttAddress string
MqttTopic string
DefaultPipelineName string
DefaultPipelineVersion string
}

// ServiceConfig a struct that wraps CustomConfig which holds the values for driver configuration
Expand Down
96 changes: 94 additions & 2 deletions application-services/custom/camera-management/appcamera/evam.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/IOTechSystems/onvif/media"
"github.com/pkg/errors"
"net/url"
"path"

"github.com/edgexfoundry/go-mod-core-contracts/v2/common"

"github.com/IOTechSystems/onvif/media"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -218,6 +223,93 @@ func (app *CameraManagementApp) getPipelineStatus(deviceName string) (interface{
return nil, nil
}

// processEdgeXDeviceSystemEvent is the function that is called when an EdgeX Device System Event is received
func (app *CameraManagementApp) processEdgeXDeviceSystemEvent(_ interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
if data == nil {
return false, fmt.Errorf("processEdgeXDeviceSystemEvent: was called without any data")
}

systemEvent, ok := data.(dtos.SystemEvent)
if !ok {
return false, fmt.Errorf("type received %T is not a SystemEvent", data)
}

if systemEvent.Type != common.DeviceSystemEventType {
return false, fmt.Errorf("system event type is not " + common.DeviceSystemEventType)
}

device := dtos.Device{}
err := systemEvent.DecodeDetails(&device)
if err != nil {
return false, fmt.Errorf("failed to decode device details: %v", err)
}

switch systemEvent.Action {
case common.DeviceSystemEventActionAdd:
if err = app.startDefaultPipeline(device); err != nil {
return false, err
}
case common.DeviceSystemEventActionDelete:
// stop any running pipelines for the deleted device
if info, found := app.getPipelineInfo(device.Name); found {
if err = app.stopPipeline(device.Name, info.Id); err != nil {
return false, fmt.Errorf("error stopping pipleline for device %s, %v", device.Name, err)
}
}
default:
app.lc.Debugf("System event action %s is not handled", systemEvent.Action)
}

return false, nil
}

func (app *CameraManagementApp) startDefaultPipeline(device dtos.Device) error {
pipelineRunning := app.isPipelineRunning(device.Name)

if pipelineRunning {
app.lc.Debugf("pipeline is already running for device %s", device.Name)
return nil
}

app.lc.Debugf("pipeline is not running for device %s", device.Name)

if app.config.AppCustom.DefaultPipelineName == "" || app.config.AppCustom.DefaultPipelineVersion == "" {
app.lc.Warnf("no default pipeline name/version specified, skip starting pipeline for device %s", device.Name)
return nil
}

startPipelineRequest := StartPipelineRequest{
PipelineName: app.config.AppCustom.DefaultPipelineName,
PipelineVersion: app.config.AppCustom.DefaultPipelineVersion,
}

protocol, ok := device.Protocols["Onvif"]
if ok {
app.lc.Debugf("Onvif protocol information found for device: %s message: %v", device.Name, protocol)
profileResponse, err := app.getProfiles(device.Name)
if err != nil {
return fmt.Errorf("failed to get profiles for device %s, message: %v", device.Name, err)

}

app.lc.Debugf("Onvif profile information found for device: %s message: %v", device.Name, profileResponse)
startPipelineRequest.Onvif = &OnvifPipelineConfig{
ProfileToken: string(profileResponse.Profiles[0].Token),
}
} else if _, ok := device.Protocols["USB"]; ok {
app.lc.Debugf("Usb protocol found for device: %s", device.Name)
startPipelineRequest.USB = &USBStartStreamingRequest{}
}

app.lc.Debugf("Starting default pipeline for device %s", device.Name)
if err := app.startPipeline(device.Name, startPipelineRequest); err != nil {
return fmt.Errorf("pipeline failed to start for device %s, message: %v", device.Name, err)

}

return nil
}

// queryAllPipelineStatuses queries EVAM for all pipeline statuses, attempts to link them to devices, and then
// insert them into the pipeline map.
func (app *CameraManagementApp) queryAllPipelineStatuses() error {
Expand Down
8 changes: 5 additions & 3 deletions application-services/custom/camera-management/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2022 Intel Corporation
// Copyright (c) 2022-2023 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,17 +18,19 @@ package main

import (
"fmt"
"os"

appsdk "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg"
"github.com/edgexfoundry/edgex-examples/application-services/custom/camera-management/appcamera"
"os"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
)

const (
serviceKey = "app-camera-management"
)

func main() {
service, ok := appsdk.NewAppService(serviceKey)
service, ok := appsdk.NewAppServiceWithTargetType(serviceKey, &dtos.SystemEvent{})
if !ok {
fmt.Printf("error: unable to create new app service %s!\n", serviceKey)
os.Exit(-1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,44 @@ TokenFile = "/tmp/edgex/secrets/app-camera-management/secrets-token.json"
Host = "localhost"
Port = 59882

[Trigger]
Type="http"

[AppCustom]
OnvifDeviceServiceName = "device-onvif-camera"
USBDeviceServiceName = "device-usb-camera"
EvamBaseUrl = "http://localhost:8080"
MqttAddress = "edgex-mqtt-broker:1883"
MqttTopic = "incoming/data/edge-video-analytics/inference-event"
DefaultPipelineName = "object_detection" # Name of the default pipeline used when a new device is added to the system; can be left blank to disable feature
DefaultPipelineVersion = "person" # Version of the default pipeline used when a new device is added to the system; can be left blank to disable feature

[Trigger]
Type="edgex-messagebus"
[Trigger.EdgexMessageBus]
Type = "redis"
[Trigger.EdgexMessageBus.SubscribeHost]
Host = "localhost"
Port = 6379
Protocol = "redis"
AuthMode = "usernamepassword" # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
SubscribeTopics="edgex/system-events/#/device/#"

[Trigger.EdgexMessageBus.Optional]
authmode = "usernamepassword" # required for redis messagebus (secure or insecure).
secretname = "redisdb"
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
ClientId ="app-camera-management"
Qos = "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once)
KeepAlive = "10" # Seconds (must be 2 or greater)
Retained = "false"
AutoReconnect = "true"
ConnectTimeout = "5" # Seconds
SkipCertVerify = "false"
# Default NATS Specific options that need to be here to enable environment variable overrides of them
Format = "nats"
RetryOnFailedConnect = "true"
QueueGroup = ""
Durable = ""
AutoProvision = "true"
Deliver = "new"
DefaultPubRetryAttempts = "2"
Subject = "edgex/#" # Required for NATS JetStream only for stream autoprovisioning

0 comments on commit e60111e

Please sign in to comment.