diff --git a/internal/core/data/application/reading.go b/internal/core/data/application/reading.go index 26ae06e262..c69b5cea30 100644 --- a/internal/core/data/application/reading.go +++ b/internal/core/data/application/reading.go @@ -97,9 +97,8 @@ func ReadingsByTimeRange(start int, end int, offset int, limit int, dic *di.Cont if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) - } else { - return readings, totalCount, nil } + return readings, totalCount, nil } func convertReadingModelsToDTOs(readingModels []models.Reading) (readings []dtos.BaseReading, err errors.EdgeX) { @@ -140,9 +139,8 @@ func ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) - } else { - return readings, totalCount, nil } + return readings, totalCount, nil } // ReadingsByDeviceNameAndResourceName query readings with offset, limit, device name and its associated resource name @@ -165,9 +163,8 @@ func ReadingsByDeviceNameAndResourceName(deviceName string, resourceName string, if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) - } else { - return readings, totalCount, nil } + return readings, totalCount, nil } // ReadingsByDeviceNameAndResourceNameAndTimeRange query readings with offset, limit, device name, its associated resource name and specified time range @@ -190,7 +187,32 @@ func ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName string, resource if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) + } + return readings, totalCount, nil +} + +// ReadingsByDeviceNameAndResourceNamesAndTimeRange query readings with offset, limit, device name, its associated resource name and specified time range +func ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int, dic *di.Container) (readings []dtos.BaseReading, totalCount uint32, err errors.EdgeX) { + if deviceName == "" { + return readings, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil) + } + + dbClient := container.DBClientFrom(dic.Get) + var readingModels []models.Reading + if resourceNames != nil && len(resourceNames) > 0 { + readingModels, totalCount, err = dbClient.ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName, resourceNames, start, end, offset, limit) } else { - return readings, totalCount, nil + readingModels, err = dbClient.ReadingsByDeviceNameAndTimeRange(deviceName, start, end, offset, limit) + if err == nil { + totalCount, err = dbClient.ReadingCountByDeviceNameAndTimeRange(deviceName, start, end) + } + } + + if err == nil { + readings, err = convertReadingModelsToDTOs(readingModels) + } + if err != nil { + return readings, totalCount, errors.NewCommonEdgeXWrapper(err) } + return readings, totalCount, nil } diff --git a/internal/core/data/controller/http/reading.go b/internal/core/data/controller/http/reading.go index 6fb34cff94..d3d650ca8c 100644 --- a/internal/core/data/controller/http/reading.go +++ b/internal/core/data/controller/http/reading.go @@ -6,31 +6,34 @@ package http import ( + "fmt" "math" "net/http" + "github.com/edgexfoundry/edgex-go/internal/core/data/application" + dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" + "github.com/edgexfoundry/edgex-go/internal/io" + "github.com/edgexfoundry/edgex-go/internal/pkg" + "github.com/edgexfoundry/edgex-go/internal/pkg/utils" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" - "github.com/gorilla/mux" - "github.com/edgexfoundry/go-mod-core-contracts/v2/common" commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common" responseDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses" - - "github.com/edgexfoundry/edgex-go/internal/core/data/application" - dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" - "github.com/edgexfoundry/edgex-go/internal/pkg" - "github.com/edgexfoundry/edgex-go/internal/pkg/utils" + "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" + "github.com/gorilla/mux" ) type ReadingController struct { - dic *di.Container + reader io.DtoReader + dic *di.Container } // NewReadingController creates and initializes a ReadingController func NewReadingController(dic *di.Container) *ReadingController { return &ReadingController{ - dic: dic, + reader: io.NewJsonDtoReader(), + dic: dic, } } @@ -246,3 +249,54 @@ func (rc *ReadingController) ReadingsByDeviceNameAndResourceNameAndTimeRange(w h utils.WriteHttpHeader(w, ctx, http.StatusOK) pkg.Encode(response, w, lc) } + +func (rc *ReadingController) ReadingsByDeviceNameAndResourceNamesAndTimeRange(w http.ResponseWriter, r *http.Request) { + lc := container.LoggingClientFrom(rc.dic.Get) + ctx := r.Context() + config := dataContainer.ConfigurationFrom(rc.dic.Get) + + vars := mux.Vars(r) + deviceName := vars[common.Name] + + // parse time range (start, end), offset, and limit from incoming request + start, end, offset, limit, err := utils.ParseTimeRangeOffsetLimit(r, 0, math.MaxInt32, -1, config.Service.MaxResultCount) + if err != nil { + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + + var queryPayload map[string]interface{} + if r.Body != http.NoBody { //only parse request body when there are contents provided + err = rc.reader.Read(r.Body, &queryPayload) + if err != nil { + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + } + + var resourceNames []string + if val, ok := queryPayload[common.ResourceNames]; ok { //look for + switch t := val.(type) { + case []interface{}: + for _, v := range t { + if strVal, ok := v.(string); ok { + resourceNames = append(resourceNames, strVal) + } + } + default: + err = errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("query criteria [%v] not in expected format", common.ResourceNames), nil) + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + } + + readings, totalCount, err := application.ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName, resourceNames, start, end, offset, limit, rc.dic) + if err != nil { + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + + response := responseDTO.NewMultiReadingsResponse("", "", http.StatusOK, totalCount, readings) + utils.WriteHttpHeader(w, ctx, http.StatusOK) + pkg.Encode(response, w, lc) +} diff --git a/internal/core/data/infrastructure/interfaces/db.go b/internal/core/data/infrastructure/interfaces/db.go index 97f879c3fc..31f966705b 100644 --- a/internal/core/data/infrastructure/interfaces/db.go +++ b/internal/core/data/infrastructure/interfaces/db.go @@ -38,4 +38,7 @@ type DBClient interface { ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) ReadingCountByTimeRange(start int, end int) (uint32, errors.EdgeX) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) + ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) + ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) + ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) } diff --git a/internal/core/data/infrastructure/interfaces/mocks/DBClient.go b/internal/core/data/infrastructure/interfaces/mocks/DBClient.go index 1f08778d9f..ef52a1f424 100644 --- a/internal/core/data/infrastructure/interfaces/mocks/DBClient.go +++ b/internal/core/data/infrastructure/interfaces/mocks/DBClient.go @@ -352,6 +352,29 @@ func (_m *DBClient) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceNa return r0, r1 } +// ReadingCountByDeviceNameAndTimeRange provides a mock function with given fields: deviceName, start, end +func (_m *DBClient) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + ret := _m.Called(deviceName, start, end) + + var r0 uint32 + if rf, ok := ret.Get(0).(func(string, int, int) uint32); ok { + r0 = rf(deviceName, start, end) + } else { + r0 = ret.Get(0).(uint32) + } + + var r1 errors.EdgeX + if rf, ok := ret.Get(1).(func(string, int, int) errors.EdgeX); ok { + r1 = rf(deviceName, start, end) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + // ReadingCountByResourceName provides a mock function with given fields: resourceName func (_m *DBClient) ReadingCountByResourceName(resourceName string) (uint32, errors.EdgeX) { ret := _m.Called(resourceName) @@ -519,6 +542,63 @@ func (_m *DBClient) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName s return r0, r1 } +// ReadingsByDeviceNameAndResourceNamesAndTimeRange provides a mock function with given fields: deviceName, resourceNames, start, end, offset, limit +func (_m *DBClient) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start int, end int, offset int, limit int) ([]models.Reading, uint32, errors.EdgeX) { + ret := _m.Called(deviceName, resourceNames, start, end, offset, limit) + + var r0 []models.Reading + if rf, ok := ret.Get(0).(func(string, []string, int, int, int, int) []models.Reading); ok { + r0 = rf(deviceName, resourceNames, start, end, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]models.Reading) + } + } + + var r1 uint32 + if rf, ok := ret.Get(1).(func(string, []string, int, int, int, int) uint32); ok { + r1 = rf(deviceName, resourceNames, start, end, offset, limit) + } else { + r1 = ret.Get(1).(uint32) + } + + var r2 errors.EdgeX + if rf, ok := ret.Get(2).(func(string, []string, int, int, int, int) errors.EdgeX); ok { + r2 = rf(deviceName, resourceNames, start, end, offset, limit) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).(errors.EdgeX) + } + } + + return r0, r1, r2 +} + +// ReadingsByDeviceNameAndTimeRange provides a mock function with given fields: deviceName, start, end, offset, limit +func (_m *DBClient) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]models.Reading, errors.EdgeX) { + ret := _m.Called(deviceName, start, end, offset, limit) + + var r0 []models.Reading + if rf, ok := ret.Get(0).(func(string, int, int, int, int) []models.Reading); ok { + r0 = rf(deviceName, start, end, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]models.Reading) + } + } + + var r1 errors.EdgeX + if rf, ok := ret.Get(1).(func(string, int, int, int, int) errors.EdgeX); ok { + r1 = rf(deviceName, start, end, offset, limit) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + // ReadingsByResourceName provides a mock function with given fields: offset, limit, resourceName func (_m *DBClient) ReadingsByResourceName(offset int, limit int, resourceName string) ([]models.Reading, errors.EdgeX) { ret := _m.Called(offset, limit, resourceName) diff --git a/internal/core/data/router.go b/internal/core/data/router.go index 7c25d4632a..4a2ba850fb 100644 --- a/internal/core/data/router.go +++ b/internal/core/data/router.go @@ -51,6 +51,7 @@ func LoadRestRoutes(r *mux.Router, dic *di.Container) { r.HandleFunc(common.ApiReadingByResourceNameAndTimeRangeRoute, rc.ReadingsByResourceNameAndTimeRange).Methods(http.MethodGet) r.HandleFunc(common.ApiReadingByDeviceNameAndResourceNameRoute, rc.ReadingsByDeviceNameAndResourceName).Methods(http.MethodGet) r.HandleFunc(common.ApiReadingByDeviceNameAndResourceNameAndTimeRangeRoute, rc.ReadingsByDeviceNameAndResourceNameAndTimeRange).Methods(http.MethodGet) + r.HandleFunc(common.ApiReadingByDeviceNameAndTimeRangeRoute, rc.ReadingsByDeviceNameAndResourceNamesAndTimeRange).Methods(http.MethodGet) r.Use(correlation.ManageHeader) r.Use(correlation.LoggingMiddleware(container.LoggingClientFrom(dic.Get))) diff --git a/internal/pkg/infrastructure/redis/client.go b/internal/pkg/infrastructure/redis/client.go index c06c5d2dd0..0abb2d3b9a 100644 --- a/internal/pkg/infrastructure/redis/client.go +++ b/internal/pkg/infrastructure/redis/client.go @@ -678,6 +678,44 @@ func (c *Client) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName stri return readings, nil } +func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) (readings []model.Reading, totalCount uint32, err errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + readings, totalCount, err = readingsByDeviceNameAndResourceNamesAndTimeRange(conn, deviceName, resourceNames, start, end, offset, limit) + if err != nil { + return readings, totalCount, errors.NewCommonEdgeX(errors.Kind(err), + fmt.Sprintf("fail to query readings by deviceName %s, resourceNames %v and time range %v ~ %v", deviceName, resourceNames, start, end), err) + } + + return readings, totalCount, nil +} + +func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) (readings []model.Reading, err errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + readings, err = readingsByDeviceNameAndTimeRange(conn, deviceName, start, end, offset, limit) + if err != nil { + return readings, errors.NewCommonEdgeX(errors.Kind(err), + fmt.Sprintf("fail to query readings by deviceName %s, and time range %v ~ %v", deviceName, start, end), err) + } + + return readings, nil +} + +func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + count, edgeXerr := getMemberCountByScoreRange(conn, CreateKey(ReadingsCollectionDeviceName, deviceName), start, end) + if edgeXerr != nil { + return 0, errors.NewCommonEdgeXWrapper(edgeXerr) + } + + return count, nil +} + // AddProvisionWatcher adds a new provision watcher func (c *Client) AddProvisionWatcher(pw model.ProvisionWatcher) (model.ProvisionWatcher, errors.EdgeX) { conn := c.Pool.Get() diff --git a/internal/pkg/infrastructure/redis/queries.go b/internal/pkg/infrastructure/redis/queries.go index 2a561cea67..b1e70b9292 100644 --- a/internal/pkg/infrastructure/redis/queries.go +++ b/internal/pkg/infrastructure/redis/queries.go @@ -258,6 +258,47 @@ func objectsByKeys(conn redis.Conn, setMethod string, offset int, limit int, red return objects, nil } +// unionObjectsByKeysAndScoreRange returns objects resulting from the union of all the given sets with specified score range, offset, and limit +func unionObjectsByKeysAndScoreRange(conn redis.Conn, start, end, offset, limit int, redisKeys ...string) ([][]byte, uint32, errors.EdgeX) { + return objectsByKeysAndScoreRange(conn, ZUNIONSTORE, start, end, offset, limit, redisKeys...) +} + +// objectsByKeysAndScoreRange returns objects resulting from the set method of all the given sets with specified score range, offset, and limit. The data set method could be either ZINTERSTORE or ZUNIONSTORE +func objectsByKeysAndScoreRange(conn redis.Conn, setMethod string, start, end, offset, limit int, redisKeys ...string) (objects [][]byte, totalCount uint32, edgeXerr errors.EdgeX) { + // build up the redis command arguments + args := redis.Args{} + cacheSet := uuid.New().String() + args = append(args, cacheSet) + args = append(args, strconv.Itoa(len(redisKeys))) + for _, key := range redisKeys { + args = args.Add(key) + } + + // create a temporary sorted set, cacheSet, resulting from the specified setMethod + _, err := conn.Do(setMethod, args...) + if err != nil { + return nil, totalCount, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to execute %s command with args %v", setMethod, args), err) + } + + // get the total count of the temporary sorted set + if totalCount, edgeXerr = getMemberCountByScoreRange(conn, cacheSet, start, end); edgeXerr != nil { + return nil, totalCount, edgeXerr + } + + // get objects from the temporary sorted set + if objects, edgeXerr = getObjectsByScoreRange(conn, cacheSet, start, end, offset, limit); edgeXerr != nil { + return nil, totalCount, edgeXerr + } + + // clean up unused temporary sorted set + _, err = redis.Int(conn.Do(DEL, cacheSet)) + if err != nil { + return nil, totalCount, errors.NewCommonEdgeX(errors.KindDatabaseError, "cache set deletion failed", err) + } + + return objects, totalCount, nil +} + // idFromStoredKey extracts Id from the store key func idFromStoredKey(storeKey string) string { substrings := strings.Split(storeKey, DBKeySeparator) diff --git a/internal/pkg/infrastructure/redis/reading.go b/internal/pkg/infrastructure/redis/reading.go index a8a7895625..a17c1b592e 100644 --- a/internal/pkg/infrastructure/redis/reading.go +++ b/internal/pkg/infrastructure/redis/reading.go @@ -234,6 +234,20 @@ func readingsByDeviceNameAndResourceNameAndTimeRange(conn redis.Conn, deviceName return convertObjectsToReadings(objects) } +func readingsByDeviceNameAndResourceNamesAndTimeRange(conn redis.Conn, deviceName string, resourceNames []string, startTime int, endTime int, offset int, limit int) (readings []models.Reading, totalCount uint32, err errors.EdgeX) { + var redisKeys []string + for _, resourceName := range resourceNames { + redisKeys = append(redisKeys, CreateKey(ReadingsCollectionDeviceNameResourceName, deviceName, resourceName)) + } + + objects, totalCount, err := unionObjectsByKeysAndScoreRange(conn, startTime, endTime, offset, limit, redisKeys...) + if err != nil { + return readings, totalCount, err + } + readings, err = convertObjectsToReadings(objects) + return readings, totalCount, err +} + // readingsByTimeRange query readings by time range, offset, and limit func readingsByTimeRange(conn redis.Conn, startTime int, endTime int, offset int, limit int) (readings []models.Reading, edgeXerr errors.EdgeX) { objects, edgeXerr := getObjectsByScoreRange(conn, ReadingsCollectionOrigin, startTime, endTime, offset, limit) @@ -251,6 +265,14 @@ func readingsByResourceNameAndTimeRange(conn redis.Conn, resourceName string, st return convertObjectsToReadings(objects) } +func readingsByDeviceNameAndTimeRange(conn redis.Conn, deviceName string, startTime int, endTime int, offset int, limit int) (readings []models.Reading, edgeXerr errors.EdgeX) { + objects, edgeXerr := getObjectsByScoreRange(conn, CreateKey(ReadingsCollectionDeviceName, deviceName), startTime, endTime, offset, limit) + if edgeXerr != nil { + return readings, edgeXerr + } + return convertObjectsToReadings(objects) +} + func convertObjectsToReadings(objects [][]byte) (readings []models.Reading, edgeXerr errors.EdgeX) { readings = make([]models.Reading, len(objects)) var alias struct {