Skip to content
This repository has been archived by the owner on Nov 7, 2023. It is now read-only.

Commit

Permalink
Update deviceController to reconcile the deviceService between the op…
Browse files Browse the repository at this point in the history
…enyurt and edge
  • Loading branch information
qclc committed Sep 7, 2021
1 parent e764d4b commit 08ba4c2
Show file tree
Hide file tree
Showing 22 changed files with 1,132 additions and 244 deletions.
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
# Build the manager binary
FROM golang:1.15 as builder
FROM golang:1.16 as builder

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download

#RUN go mod download
RUN GOPROXY=https://goproxy.cn go mod download
# Copy the go source
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY clients/ clients/

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager main.go
Expand Down
2 changes: 1 addition & 1 deletion PROJECT
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
domain: openyurt.io
layout: go.kubebuilder.io/v3
projectName: device-controller
repo: github.com/charleszheng44/device-controller
repo: github.com/openyurtio/device-controller
resources:
- api:
crdVersion: v1
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha1/deviceprofile_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type DeviceProfileSpec struct {
// Model of the device
Model string `json:"model,omitempty"`
// EdgeXLabels used to search for groups of profiles on EdgeX Foundry
EdgeXLabels []string `json:"labels,omitempty"`
Labels []string `json:"labels,omitempty"`
DeviceResources []DeviceResource `json:"deviceResources,omitempty"`

// TODO support the following field
Expand Down
45 changes: 36 additions & 9 deletions api/v1alpha1/deviceservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
)

const (
DeviceServiceFinalizer = "v1alpha1.deviceService.finalizer"
// DeviceServiceSyncedCondition indicates that the deviceService exists in both OpenYurt and edge platform
DeviceServiceSyncedCondition clusterv1.ConditionType = "DeviceServiceSynced"
// DeviceServiceManagingCondition indicates that the deviceService is being managed by cloud and its field are being reconciled
DeviceServiceManagingCondition clusterv1.ConditionType = "DeviceServiceManaging"
)

type Addressable struct {
Expand Down Expand Up @@ -47,15 +56,8 @@ type Addressable struct {

// DeviceServiceSpec defines the desired state of DeviceService
type DeviceServiceSpec struct {
// Information describing the device
Description string `json:"description,omitempty"`
// the Id assigned by the EdgeX foundry
// TODO store this field in the status
Id string `json:"id,omitempty"`
// TODO store this field in the status
LastConnected int64 `json:"lastConnected,omitempty"`
// time in milliseconds that the device last reported data to the core
// TODO store this field in the status
LastReported int64 `json:"lastReported,omitempty"`
// operational state - either enabled or disabled
OperatingState OperatingState `json:"operatingState,omitempty"`
// tags or other labels applied to the device service for search or other
Expand All @@ -66,11 +68,28 @@ type DeviceServiceSpec struct {
Addressable Addressable `json:"addressable,omitempty"`
// Device Service Admin State
AdminState AdminState `json:"adminState,omitempty"`
// True means deviceService is managed by cloud, cloud can update the related fields
// False means cloud can't update the fields
Managed bool `json:"managed,omitempty"`
// NodePool indicates which nodePool the deviceService comes from
NodePool string `json:"nodePool,omitempty"`
}

// DeviceServiceStatus defines the observed state of DeviceService
type DeviceServiceStatus struct {
AddedToEdgeX bool `json:"addedToEdgeX,omitempty"`
// Synced indicates whether the device already exists on both OpenYurt and edge platform
Synced bool `json:"synced,omitempty"`
// the Id assigned by the edge platform
EdgeId string `json:"edgeId,omitempty"`
// time in milliseconds that the device last reported data to the core
LastConnected int64 `json:"lastConnected,omitempty"`
// time in milliseconds that the device last reported data to the core
LastReported int64 `json:"lastReported,omitempty"`
// Device Service Admin State
AdminState AdminState `json:"adminState,omitempty"`
// current deviceService state
// +optional
Conditions clusterv1.Conditions `json:"conditions,omitempty"`
}

//+kubebuilder:object:root=true
Expand All @@ -85,6 +104,14 @@ type DeviceService struct {
Status DeviceServiceStatus `json:"status,omitempty"`
}

func (ds *DeviceService) SetConditions(conditions clusterv1.Conditions) {
ds.Status.Conditions = conditions
}

func (ds *DeviceService) GetConditions() clusterv1.Conditions {
return ds.Status.Conditions
}

//+kubebuilder:object:root=true

// DeviceServiceList contains a list of DeviceService
Expand Down
10 changes: 9 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

248 changes: 248 additions & 0 deletions clients/edgex-foundry/deviceservice_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package edgex_foudry

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"

"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/go-logr/logr"
"github.com/go-resty/resty/v2"
"github.com/openyurtio/device-controller/api/v1alpha1"
edgeCli "github.com/openyurtio/device-controller/clients"
)

type EdgexDeviceServiceClient struct {
*resty.Client
CoreMetaClient ClientURL
logr.Logger
}

func NewEdgexDeviceServiceClient(coreMetaClient ClientURL, log logr.Logger) *EdgexDeviceServiceClient {
return &EdgexDeviceServiceClient{
Client: resty.New(),
CoreMetaClient: coreMetaClient,
Logger: log,
}
}

// Create function sends a POST request to EdgeX to add a new deviceService
func (eds *EdgexDeviceServiceClient) Create(ctx context.Context, deviceservice *v1alpha1.DeviceService, options edgeCli.CreateOptions) (*v1alpha1.DeviceService, error) {
ds := toEdgexDeviceService(deviceservice)
eds.V(5).Info("will add the DeviceServices",
"DeviceService", ds.Name)
dpJson, err := json.Marshal(&ds)
if err != nil {
return nil, err
}
postPath := fmt.Sprintf("http://%s:%d%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath)
resp, err := eds.R().
SetBody(dpJson).Post(postPath)
if err != nil {
return nil, err
} else if resp.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("create deviceService on edgex foundry failed, the response is : %s", resp.Body())
}
createdDs := deviceservice.DeepCopy()
createdDs.Status.EdgeId = string(resp.Body())
return createdDs, err
}

// Delete function sends a request to EdgeX to delete a deviceService
func (eds *EdgexDeviceServiceClient) Delete(ctx context.Context, name string, option edgeCli.DeleteOptions) error {
eds.V(5).Info("will delete the DeviceService",
"DeviceService", name)
delURL := fmt.Sprintf("http://%s:%d%s/name/%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, name)
resp, err := eds.R().Delete(delURL)
if err != nil {
return err
}
if string(resp.Body()) != "true" {
return errors.New(string(resp.Body()))
}
return nil
}

// Update is used to set the admin or operating state of the deviceService by unique name of the deviceService.
// TODO support to update other fields
func (eds *EdgexDeviceServiceClient) Update(ctx context.Context, ds *v1alpha1.DeviceService, options edgeCli.UpdateOptions) (*v1alpha1.DeviceService, error) {
actualDSName := getEdgeDeviceServiceName(ds)
putBaseURL := fmt.Sprintf("http://%s:%d%s/name/%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, actualDSName)
if ds == nil {
return nil, nil
}
if ds.Spec.AdminState != "" {
amURL := fmt.Sprintf("%s/adminstate/%s", putBaseURL, ds.Spec.AdminState)
if rep, err := resty.New().R().SetHeader("Content-Type", "application/json").Put(amURL); err != nil {
return nil, err
} else if rep.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("failed to update deviceService: %s, get response: %s", actualDSName, string(rep.Body()))
}
}
if ds.Spec.OperatingState != "" {
opURL := fmt.Sprintf("%s/opstate/%s", putBaseURL, ds.Spec.OperatingState)
if rep, err := resty.New().R().
SetHeader("Content-Type", "application/json").Put(opURL); err != nil {
return nil, err
} else if rep.StatusCode() != http.StatusOK {
return nil, fmt.Errorf("failed to update deviceService: %s, get response: %s", actualDSName, string(rep.Body()))
}
}

return ds, nil
}

// Get is used to query the deviceService information corresponding to the deviceService name
func (eds *EdgexDeviceServiceClient) Get(ctx context.Context, name string, options edgeCli.GetOptions) (*v1alpha1.DeviceService, error) {
eds.V(5).Info("will get DeviceServices",
"DeviceService", name)
var ds v1alpha1.DeviceService
getURL := fmt.Sprintf("http://%s:%d%s/name/%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, name)
resp, err := eds.R().Get(getURL)
if err != nil {
return &ds, err
}
if string(resp.Body()) == "Item not found\n" ||
strings.HasPrefix(string(resp.Body()), "no item found") {
return &ds, errors.New("Item not found")
}
var dp models.DeviceService
err = json.Unmarshal(resp.Body(), &dp)
ds = toKubeDeviceService(dp)
return &ds, err
}

// List is used to get all deviceService objects on edge platform
// The Hanoi version currently supports only a single label and does not support other filters
func (eds *EdgexDeviceServiceClient) List(ctx context.Context, options edgeCli.ListOptions) ([]v1alpha1.DeviceService, error) {
eds.V(5).Info("will list DeviceServices")
lp := fmt.Sprintf("http://%s:%d%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath)
if options.LabelSelector != nil {
if _, ok := options.LabelSelector["label"]; ok {
lp = strings.Join([]string{lp, strings.Join([]string{"label", options.LabelSelector["label"]}, "/")}, "/")
}
}
resp, err := eds.R().
EnableTrace().
Get(lp)
if err != nil {
return nil, err
}
dss := []models.DeviceService{}
if err := json.Unmarshal(resp.Body(), &dss); err != nil {
return nil, err
}
var res []v1alpha1.DeviceService
for _, ds := range dss {
res = append(res, toKubeDeviceService(ds))
}
return res, nil
}

// CreateAddressable function sends a POST request to EdgeX to add a new addressable
func (eds *EdgexDeviceServiceClient) CreateAddressable(ctx context.Context, addressable *v1alpha1.Addressable, options edgeCli.CreateOptions) (*v1alpha1.Addressable, error) {
as := toEdgeXAddressable(addressable)
eds.V(5).Info("will add the Addressables",
"Addressable", as.Name)
dpJson, err := json.Marshal(&as)
if err != nil {
return nil, err
}
postPath := fmt.Sprintf("http://%s:%d%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath)
resp, err := eds.R().
SetBody(dpJson).Post(postPath)
if err != nil {
return nil, err
}
createdAddr := addressable.DeepCopy()
createdAddr.Id = string(resp.Body())
return createdAddr, err
}

// DeleteAddressable function sends a request to EdgeX to delete a addressable
func (eds *EdgexDeviceServiceClient) DeleteAddressable(ctx context.Context, name string, options edgeCli.DeleteOptions) error {
eds.V(5).Info("will delete the Addressable",
"Addressable", name)
delURL := fmt.Sprintf("http://%s:%d%s/name/%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath, name)
resp, err := eds.R().Delete(delURL)
if err != nil {
return err
}
if string(resp.Body()) != "true" {
return errors.New(string(resp.Body()))
}
return nil
}

// UpdateAddressable is used to update the addressable on edgex foundry
func (eds *EdgexDeviceServiceClient) UpdateAddressable(ctx context.Context, device *v1alpha1.Addressable, options edgeCli.UpdateOptions) (*v1alpha1.Addressable, error) {
return nil, nil
}

// GetAddressable is used to query the addressable information corresponding to the addressable name
func (eds *EdgexDeviceServiceClient) GetAddressable(ctx context.Context, name string, options edgeCli.GetOptions) (*v1alpha1.Addressable, error) {
eds.V(5).Info("will get Addressables",
"Addressable", name)
var addressable v1alpha1.Addressable
getURL := fmt.Sprintf("http://%s:%d%s/name/%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath, name)
resp, err := eds.R().Get(getURL)
if err != nil {
return &addressable, err
}
if string(resp.Body()) == "Item not found\n" {
return &addressable, errors.New("Item not found")
}
var maddr models.Addressable
err = json.Unmarshal(resp.Body(), &maddr)
addressable = toKubeAddressable(maddr)
return &addressable, err
}

// ListAddressables is used to get all addressable objects on edge platform
func (eds *EdgexDeviceServiceClient) ListAddressables(ctx context.Context, options edgeCli.ListOptions) ([]v1alpha1.Addressable, error) {
eds.V(5).Info("will list Addressables")
lp := fmt.Sprintf("http://%s:%d%s",
eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath)
resp, err := eds.R().
EnableTrace().
Get(lp)
if err != nil {
return nil, err
}
ass := []models.Addressable{}
if err := json.Unmarshal(resp.Body(), &ass); err != nil {
return nil, err
}
var res []v1alpha1.Addressable
for i := range ass {
res = append(res, toKubeAddressable(ass[i]))
}
return res, nil
}
Loading

0 comments on commit 08ba4c2

Please sign in to comment.