Skip to content

Commit

Permalink
feat(data): new API to search Readings by multiple resource names
Browse files Browse the repository at this point in the history
Add a new Get API that will accept multiple resource names in the payload as the query criteria.
/reading/device/name/{deviceName}/start/{start}/end/{end}

the query payload would be:

{
  "resourceNames": ["r1", "r2", "r3"]
}

If the resourceNames or the payload is empty, return all the Readings meet the deviceName and in the start/end timestamp

Signed-off-by: Jude Hung <[email protected]>
  • Loading branch information
judehung committed Oct 20, 2021
1 parent 8641170 commit d6a824c
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 16 deletions.
36 changes: 29 additions & 7 deletions internal/core/data/application/reading.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ func ReadingsByTimeRange(start int, end int, offset int, limit int, dic *di.Cont

if err != nil {
return readings, totalCount, errors.NewCommonEdgeXWrapper(err)
} else {
return readings, totalCount, nil
}
return readings, totalCount, nil
}

func convertReadingModelsToDTOs(readingModels []models.Reading) (readings []dtos.BaseReading, err errors.EdgeX) {
Expand Down Expand Up @@ -140,9 +139,8 @@ func ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int,

if err != nil {
return readings, totalCount, errors.NewCommonEdgeXWrapper(err)
} else {
return readings, totalCount, nil
}
return readings, totalCount, nil
}

// ReadingsByDeviceNameAndResourceName query readings with offset, limit, device name and its associated resource name
Expand All @@ -165,9 +163,8 @@ func ReadingsByDeviceNameAndResourceName(deviceName string, resourceName string,

if err != nil {
return readings, totalCount, errors.NewCommonEdgeXWrapper(err)
} else {
return readings, totalCount, nil
}
return readings, totalCount, nil
}

// ReadingsByDeviceNameAndResourceNameAndTimeRange query readings with offset, limit, device name, its associated resource name and specified time range
Expand All @@ -190,7 +187,32 @@ func ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName string, resource

if err != nil {
return readings, totalCount, errors.NewCommonEdgeXWrapper(err)
}
return readings, totalCount, nil
}

// ReadingsByDeviceNameAndResourceNamesAndTimeRange query readings with offset, limit, device name, its associated resource name and specified time range
func ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int, dic *di.Container) (readings []dtos.BaseReading, totalCount uint32, err errors.EdgeX) {
if deviceName == "" {
return readings, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
}

dbClient := container.DBClientFrom(dic.Get)
var readingModels []models.Reading
if resourceNames != nil && len(resourceNames) > 0 {
readingModels, totalCount, err = dbClient.ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName, resourceNames, start, end, offset, limit)
} else {
return readings, totalCount, nil
readingModels, err = dbClient.ReadingsByDeviceNameAndTimeRange(deviceName, start, end, offset, limit)
if err == nil {
totalCount, err = dbClient.ReadingCountByDeviceNameAndTimeRange(deviceName, start, end)
}
}

if err == nil {
readings, err = convertReadingModelsToDTOs(readingModels)
}
if err != nil {
return readings, totalCount, errors.NewCommonEdgeXWrapper(err)
}
return readings, totalCount, nil
}
72 changes: 63 additions & 9 deletions internal/core/data/controller/http/reading.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,34 @@
package http

import (
"fmt"
"math"
"net/http"

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/io"
"github.com/edgexfoundry/edgex-go/internal/pkg"
"github.com/edgexfoundry/edgex-go/internal/pkg/utils"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/gorilla/mux"

"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
responseDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/pkg"
"github.com/edgexfoundry/edgex-go/internal/pkg/utils"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/gorilla/mux"
)

type ReadingController struct {
dic *di.Container
reader io.DtoReader
dic *di.Container
}

// NewReadingController creates and initializes a ReadingController
func NewReadingController(dic *di.Container) *ReadingController {
return &ReadingController{
dic: dic,
reader: io.NewJsonDtoReader(),
dic: dic,
}
}

Expand Down Expand Up @@ -246,3 +249,54 @@ func (rc *ReadingController) ReadingsByDeviceNameAndResourceNameAndTimeRange(w h
utils.WriteHttpHeader(w, ctx, http.StatusOK)
pkg.Encode(response, w, lc)
}

func (rc *ReadingController) ReadingsByDeviceNameAndResourceNamesAndTimeRange(w http.ResponseWriter, r *http.Request) {
lc := container.LoggingClientFrom(rc.dic.Get)
ctx := r.Context()
config := dataContainer.ConfigurationFrom(rc.dic.Get)

vars := mux.Vars(r)
deviceName := vars[common.Name]

// parse time range (start, end), offset, and limit from incoming request
start, end, offset, limit, err := utils.ParseTimeRangeOffsetLimit(r, 0, math.MaxInt32, -1, config.Service.MaxResultCount)
if err != nil {
utils.WriteErrorResponse(w, ctx, lc, err, "")
return
}

var queryPayload map[string]interface{}
if r.Body != http.NoBody { //only parse request body when there are contents provided
err = rc.reader.Read(r.Body, &queryPayload)
if err != nil {
utils.WriteErrorResponse(w, ctx, lc, err, "")
return
}
}

var resourceNames []string
if val, ok := queryPayload[common.ResourceNames]; ok { //look for
switch t := val.(type) {
case []interface{}:
for _, v := range t {
if strVal, ok := v.(string); ok {
resourceNames = append(resourceNames, strVal)
}
}
default:
err = errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("query criteria [%v] not in expected format", common.ResourceNames), nil)
utils.WriteErrorResponse(w, ctx, lc, err, "")
return
}
}

readings, totalCount, err := application.ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName, resourceNames, start, end, offset, limit, rc.dic)
if err != nil {
utils.WriteErrorResponse(w, ctx, lc, err, "")
return
}

response := responseDTO.NewMultiReadingsResponse("", "", http.StatusOK, totalCount, readings)
utils.WriteHttpHeader(w, ctx, http.StatusOK)
pkg.Encode(response, w, lc)
}
3 changes: 3 additions & 0 deletions internal/core/data/infrastructure/interfaces/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ type DBClient interface {
ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX)
ReadingCountByTimeRange(start int, end int) (uint32, errors.EdgeX)
ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX)
ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX)
ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX)
ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX)
}
80 changes: 80 additions & 0 deletions internal/core/data/infrastructure/interfaces/mocks/DBClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/core/data/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func LoadRestRoutes(r *mux.Router, dic *di.Container) {
r.HandleFunc(common.ApiReadingByResourceNameAndTimeRangeRoute, rc.ReadingsByResourceNameAndTimeRange).Methods(http.MethodGet)
r.HandleFunc(common.ApiReadingByDeviceNameAndResourceNameRoute, rc.ReadingsByDeviceNameAndResourceName).Methods(http.MethodGet)
r.HandleFunc(common.ApiReadingByDeviceNameAndResourceNameAndTimeRangeRoute, rc.ReadingsByDeviceNameAndResourceNameAndTimeRange).Methods(http.MethodGet)
r.HandleFunc(common.ApiReadingByDeviceNameAndTimeRangeRoute, rc.ReadingsByDeviceNameAndResourceNamesAndTimeRange).Methods(http.MethodGet)

r.Use(correlation.ManageHeader)
r.Use(correlation.LoggingMiddleware(container.LoggingClientFrom(dic.Get)))
Expand Down
38 changes: 38 additions & 0 deletions internal/pkg/infrastructure/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,44 @@ func (c *Client) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName stri
return readings, nil
}

func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) (readings []model.Reading, totalCount uint32, err errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

readings, totalCount, err = readingsByDeviceNameAndResourceNamesAndTimeRange(conn, deviceName, resourceNames, start, end, offset, limit)
if err != nil {
return readings, totalCount, errors.NewCommonEdgeX(errors.Kind(err),
fmt.Sprintf("fail to query readings by deviceName %s, resourceNames %v and time range %v ~ %v", deviceName, resourceNames, start, end), err)
}

return readings, totalCount, nil
}

func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) (readings []model.Reading, err errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

readings, err = readingsByDeviceNameAndTimeRange(conn, deviceName, start, end, offset, limit)
if err != nil {
return readings, errors.NewCommonEdgeX(errors.Kind(err),
fmt.Sprintf("fail to query readings by deviceName %s, and time range %v ~ %v", deviceName, start, end), err)
}

return readings, nil
}

func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

count, edgeXerr := getMemberCountByScoreRange(conn, CreateKey(ReadingsCollectionDeviceName, deviceName), start, end)
if edgeXerr != nil {
return 0, errors.NewCommonEdgeXWrapper(edgeXerr)
}

return count, nil
}

// AddProvisionWatcher adds a new provision watcher
func (c *Client) AddProvisionWatcher(pw model.ProvisionWatcher) (model.ProvisionWatcher, errors.EdgeX) {
conn := c.Pool.Get()
Expand Down
41 changes: 41 additions & 0 deletions internal/pkg/infrastructure/redis/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,47 @@ func objectsByKeys(conn redis.Conn, setMethod string, offset int, limit int, red
return objects, nil
}

// unionObjectsByKeysAndScoreRange returns objects resulting from the union of all the given sets with specified score range, offset, and limit
func unionObjectsByKeysAndScoreRange(conn redis.Conn, start, end, offset, limit int, redisKeys ...string) ([][]byte, uint32, errors.EdgeX) {
return objectsByKeysAndScoreRange(conn, ZUNIONSTORE, start, end, offset, limit, redisKeys...)
}

// objectsByKeysAndScoreRange returns objects resulting from the set method of all the given sets with specified score range, offset, and limit. The data set method could be either ZINTERSTORE or ZUNIONSTORE
func objectsByKeysAndScoreRange(conn redis.Conn, setMethod string, start, end, offset, limit int, redisKeys ...string) (objects [][]byte, totalCount uint32, edgeXerr errors.EdgeX) {
// build up the redis command arguments
args := redis.Args{}
cacheSet := uuid.New().String()
args = append(args, cacheSet)
args = append(args, strconv.Itoa(len(redisKeys)))
for _, key := range redisKeys {
args = args.Add(key)
}

// create a temporary sorted set, cacheSet, resulting from the specified setMethod
_, err := conn.Do(setMethod, args...)
if err != nil {
return nil, totalCount, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to execute %s command with args %v", setMethod, args), err)
}

// get the total count of the temporary sorted set
if totalCount, edgeXerr = getMemberCountByScoreRange(conn, cacheSet, start, end); edgeXerr != nil {
return nil, totalCount, edgeXerr
}

// get objects from the temporary sorted set
if objects, edgeXerr = getObjectsByScoreRange(conn, cacheSet, start, end, offset, limit); edgeXerr != nil {
return nil, totalCount, edgeXerr
}

// clean up unused temporary sorted set
_, err = redis.Int(conn.Do(DEL, cacheSet))
if err != nil {
return nil, totalCount, errors.NewCommonEdgeX(errors.KindDatabaseError, "cache set deletion failed", err)
}

return objects, totalCount, nil
}

// idFromStoredKey extracts Id from the store key
func idFromStoredKey(storeKey string) string {
substrings := strings.Split(storeKey, DBKeySeparator)
Expand Down
Loading

0 comments on commit d6a824c

Please sign in to comment.