From dff1392c4be1e731651b8f30ca47908e53ddd988 Mon Sep 17 00:00:00 2001 From: Jude Hung Date: Tue, 19 Oct 2021 09:28:33 +0800 Subject: [PATCH] feat(data): new API to search Readings by multiple resource names Add a new Get API that will accept multiple resource names in the payload as the query criteria. /reading/device/name/{deviceName}/start/{start}/end/{end} the query payload would be: { "resourceNames": ["r1", "r2", "r3"] } If the resourceNames or the payload is empty, return all the Readings meet the deviceName and in the start/end timestamp Unit tests are added and openapi doc is updated Signed-off-by: Jude Hung --- go.mod | 2 +- go.sum | 4 +- internal/core/data/application/reading.go | 36 +++++-- internal/core/data/controller/http/reading.go | 72 ++++++++++++-- .../core/data/controller/http/reading_test.go | 93 +++++++++++++++++++ .../core/data/infrastructure/interfaces/db.go | 3 + .../interfaces/mocks/DBClient.go | 80 ++++++++++++++++ internal/core/data/router.go | 1 + internal/pkg/infrastructure/redis/client.go | 38 ++++++++ internal/pkg/infrastructure/redis/queries.go | 41 ++++++++ internal/pkg/infrastructure/redis/reading.go | 22 +++++ openapi/v2/core-data.yaml | 88 ++++++++++++++++++ 12 files changed, 461 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 576750e96e..064e58685c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/edgexfoundry/edgex-go require ( bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690 github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0 - github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.23 + github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.25 github.com/edgexfoundry/go-mod-messaging/v2 v2.0.1 github.com/edgexfoundry/go-mod-registry/v2 v2.0.0 github.com/edgexfoundry/go-mod-secrets/v2 v2.0.0 diff --git a/go.sum b/go.sum index 25117ec21d..fa03fbd57f 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0/go.mod h1:E5KLeFEwTuIbjrKMCIn github.com/edgexfoundry/go-mod-configuration/v2 v2.0.0 h1:S1rUyJPJWSvlkNHLbHLFWBvqzH+ShO1xPp4HH7Pvn9I= github.com/edgexfoundry/go-mod-configuration/v2 v2.0.0/go.mod h1:NZnjQtCdQtPH/eoOuTKA+8+FsoUU3WSlZ5JpXWuiYQQ= github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0/go.mod h1:pfXURRetgIto0GR0sCjDrfa71hqJ1wxmQWi/mOzWfWU= -github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.23 h1:kVg8RVx2MaOOctd3I6XLy0N3exOZ3muyK4IHRyD9Dzs= -github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.23/go.mod h1:I6UhBPCREubcU0ouIGBdZlNG5Xx4NijUVN5rvEtD03k= +github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.25 h1:AFQD5sbxpAfwESF/SXApyq7piSDgoioWLL5D3GY8qvw= +github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.1-dev.25/go.mod h1:I6UhBPCREubcU0ouIGBdZlNG5Xx4NijUVN5rvEtD03k= github.com/edgexfoundry/go-mod-messaging/v2 v2.0.1 h1:8nT3CiPLIft5RmR+vbmXBW9Kbz7TqPZ6C8QuQ6TTn6w= github.com/edgexfoundry/go-mod-messaging/v2 v2.0.1/go.mod h1:bLKWB9yeOHLZoQtHLZlGwz8MjsMJIvHDFce7CcUb4fE= github.com/edgexfoundry/go-mod-registry/v2 v2.0.0 h1:FCodcfCo3EqgINbGa9Rn6LqbiwkdT2FPKgCnk81GbFs= diff --git a/internal/core/data/application/reading.go b/internal/core/data/application/reading.go index 26ae06e262..eebc1b2c80 100644 --- a/internal/core/data/application/reading.go +++ b/internal/core/data/application/reading.go @@ -97,9 +97,8 @@ func ReadingsByTimeRange(start int, end int, offset int, limit int, dic *di.Cont if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) - } else { - return readings, totalCount, nil } + return readings, totalCount, nil } func convertReadingModelsToDTOs(readingModels []models.Reading) (readings []dtos.BaseReading, err errors.EdgeX) { @@ -140,9 +139,8 @@ func ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) - } else { - return readings, totalCount, nil } + return readings, totalCount, nil } // ReadingsByDeviceNameAndResourceName query readings with offset, limit, device name and its associated resource name @@ -165,9 +163,8 @@ func ReadingsByDeviceNameAndResourceName(deviceName string, resourceName string, if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) - } else { - return readings, totalCount, nil } + return readings, totalCount, nil } // ReadingsByDeviceNameAndResourceNameAndTimeRange query readings with offset, limit, device name, its associated resource name and specified time range @@ -190,7 +187,32 @@ func ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName string, resource if err != nil { return readings, totalCount, errors.NewCommonEdgeXWrapper(err) + } + return readings, totalCount, nil +} + +// ReadingsByDeviceNameAndResourceNamesAndTimeRange query readings with offset, limit, device name, its associated resource name and specified time range +func ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int, dic *di.Container) (readings []dtos.BaseReading, totalCount uint32, err errors.EdgeX) { + if deviceName == "" { + return readings, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil) + } + + dbClient := container.DBClientFrom(dic.Get) + var readingModels []models.Reading + if len(resourceNames) > 0 { + readingModels, totalCount, err = dbClient.ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName, resourceNames, start, end, offset, limit) } else { - return readings, totalCount, nil + readingModels, err = dbClient.ReadingsByDeviceNameAndTimeRange(deviceName, start, end, offset, limit) + if err == nil { + totalCount, err = dbClient.ReadingCountByDeviceNameAndTimeRange(deviceName, start, end) + } + } + + if err == nil { + readings, err = convertReadingModelsToDTOs(readingModels) + } + if err != nil { + return readings, totalCount, errors.NewCommonEdgeXWrapper(err) } + return readings, totalCount, nil } diff --git a/internal/core/data/controller/http/reading.go b/internal/core/data/controller/http/reading.go index 6fb34cff94..d3d650ca8c 100644 --- a/internal/core/data/controller/http/reading.go +++ b/internal/core/data/controller/http/reading.go @@ -6,31 +6,34 @@ package http import ( + "fmt" "math" "net/http" + "github.com/edgexfoundry/edgex-go/internal/core/data/application" + dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" + "github.com/edgexfoundry/edgex-go/internal/io" + "github.com/edgexfoundry/edgex-go/internal/pkg" + "github.com/edgexfoundry/edgex-go/internal/pkg/utils" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" - "github.com/gorilla/mux" - "github.com/edgexfoundry/go-mod-core-contracts/v2/common" commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common" responseDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses" - - "github.com/edgexfoundry/edgex-go/internal/core/data/application" - dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" - "github.com/edgexfoundry/edgex-go/internal/pkg" - "github.com/edgexfoundry/edgex-go/internal/pkg/utils" + "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" + "github.com/gorilla/mux" ) type ReadingController struct { - dic *di.Container + reader io.DtoReader + dic *di.Container } // NewReadingController creates and initializes a ReadingController func NewReadingController(dic *di.Container) *ReadingController { return &ReadingController{ - dic: dic, + reader: io.NewJsonDtoReader(), + dic: dic, } } @@ -246,3 +249,54 @@ func (rc *ReadingController) ReadingsByDeviceNameAndResourceNameAndTimeRange(w h utils.WriteHttpHeader(w, ctx, http.StatusOK) pkg.Encode(response, w, lc) } + +func (rc *ReadingController) ReadingsByDeviceNameAndResourceNamesAndTimeRange(w http.ResponseWriter, r *http.Request) { + lc := container.LoggingClientFrom(rc.dic.Get) + ctx := r.Context() + config := dataContainer.ConfigurationFrom(rc.dic.Get) + + vars := mux.Vars(r) + deviceName := vars[common.Name] + + // parse time range (start, end), offset, and limit from incoming request + start, end, offset, limit, err := utils.ParseTimeRangeOffsetLimit(r, 0, math.MaxInt32, -1, config.Service.MaxResultCount) + if err != nil { + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + + var queryPayload map[string]interface{} + if r.Body != http.NoBody { //only parse request body when there are contents provided + err = rc.reader.Read(r.Body, &queryPayload) + if err != nil { + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + } + + var resourceNames []string + if val, ok := queryPayload[common.ResourceNames]; ok { //look for + switch t := val.(type) { + case []interface{}: + for _, v := range t { + if strVal, ok := v.(string); ok { + resourceNames = append(resourceNames, strVal) + } + } + default: + err = errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("query criteria [%v] not in expected format", common.ResourceNames), nil) + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + } + + readings, totalCount, err := application.ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName, resourceNames, start, end, offset, limit, rc.dic) + if err != nil { + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + + response := responseDTO.NewMultiReadingsResponse("", "", http.StatusOK, totalCount, readings) + utils.WriteHttpHeader(w, ctx, http.StatusOK) + pkg.Encode(response, w, lc) +} diff --git a/internal/core/data/controller/http/reading_test.go b/internal/core/data/controller/http/reading_test.go index bc62142ecb..c63976ba88 100644 --- a/internal/core/data/controller/http/reading_test.go +++ b/internal/core/data/controller/http/reading_test.go @@ -2,8 +2,10 @@ package http import ( "encoding/json" + "io" "net/http" "net/http/httptest" + "strings" "testing" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" @@ -589,3 +591,94 @@ func TestReadingsByDeviceNameAndResourceNameAndTimeRange(t *testing.T) { }) } } + +func TestReadingsByDeviceNameAndResourceNamesAndTimeRange(t *testing.T) { + totalCount := uint32(0) + testResourceNames := []string{"resource01", "resource02"} + emptyPayload := make(map[string]interface{}) + testResourceNamesPayload := emptyPayload + testResourceNamesPayload[common.ResourceNames] = testResourceNames + dic := mocks.NewMockDIC() + dbClientMock := &dbMock.DBClient{} + dbClientMock.On("ReadingCountByDeviceNameAndTimeRange", TestDeviceName, 0, 100).Return(totalCount, nil) + dbClientMock.On("ReadingsByDeviceNameAndTimeRange", TestDeviceName, 0, 100, 0, 10).Return([]models.Reading{}, nil) + dbClientMock.On("ReadingsByDeviceNameAndResourceNamesAndTimeRange", TestDeviceName, testResourceNames, 0, 100, 0, 10).Return([]models.Reading{}, totalCount, nil) + dic.Update(di.ServiceConstructorMap{ + container.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + rc := NewReadingController(dic) + assert.NotNil(t, rc) + + tests := []struct { + name string + deviceName string + payload map[string]interface{} + start string + end string + offset string + limit string + errorExpected bool + expectedTotalCount uint32 + expectedStatusCode int + }{ + {"Valid - provide deviceName and nil resourceNames", TestDeviceName, nil, "0", "100", "0", "10", false, totalCount, http.StatusOK}, + {"Valid - provide deviceName and empty resourceNames", TestDeviceName, emptyPayload, "0", "100", "0", "10", false, totalCount, http.StatusOK}, + {"Valid - provide deviceName and resourceNames", TestDeviceName, testResourceNamesPayload, "0", "100", "0", "10", false, totalCount, http.StatusOK}, + {"Invalid - empty deviceName", "", testResourceNamesPayload, "0", "100", "0", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - invalid start format", TestDeviceName, testResourceNamesPayload, "aaa", "100", "0", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - invalid end format", TestDeviceName, testResourceNamesPayload, "0", "bbb", "0", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - empty start", TestDeviceName, testResourceNamesPayload, "", "100", "0", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - empty end", TestDeviceName, testResourceNamesPayload, "0", "", "0", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - end before start", TestDeviceName, testResourceNamesPayload, "10", "0", "0", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - invalid offset format", TestDeviceName, testResourceNamesPayload, "0", "100", "aaa", "10", true, totalCount, http.StatusBadRequest}, + {"Invalid - invalid limit format", TestDeviceName, testResourceNamesPayload, "0", "100", "0", "aaa", true, totalCount, http.StatusBadRequest}, + } + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + var reader io.Reader + if testCase.payload != nil { + byteData, err := toByteArray(common.ContentTypeJSON, testCase.payload) + require.NoError(t, err) + reader = strings.NewReader(string(byteData)) + } else { + reader = http.NoBody + } + req, err := http.NewRequest(http.MethodGet, common.ApiReadingByDeviceNameAndTimeRangeRoute, reader) + req.Header.Set(common.ContentType, common.ContentTypeJSON) + require.NoError(t, err) + query := req.URL.Query() + query.Add(common.Offset, testCase.offset) + query.Add(common.Limit, testCase.limit) + req.URL.RawQuery = query.Encode() + req = mux.SetURLVars(req, map[string]string{common.Name: testCase.deviceName, common.Start: testCase.start, common.End: testCase.end}) + require.NoError(t, err) + + // Act + recorder := httptest.NewRecorder() + handler := http.HandlerFunc(rc.ReadingsByDeviceNameAndResourceNamesAndTimeRange) + handler.ServeHTTP(recorder, req) + + // Assert + if testCase.errorExpected { + var res commonDTO.BaseResponse + err = json.Unmarshal(recorder.Body.Bytes(), &res) + require.NoError(t, err) + assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.expectedStatusCode, res.StatusCode, "Response status code not as expected") + assert.NotEmpty(t, res.Message, "Response message doesn't contain the error message") + } else { + var res responseDTO.MultiReadingsResponse + err = json.Unmarshal(recorder.Body.Bytes(), &res) + require.NoError(t, err) + assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.expectedStatusCode, res.StatusCode, "Response status code not as expected") + assert.Empty(t, res.Message, "Message should be empty when it is successful") + assert.Equal(t, testCase.expectedTotalCount, res.TotalCount, "Total count not as expected") + } + }) + } +} diff --git a/internal/core/data/infrastructure/interfaces/db.go b/internal/core/data/infrastructure/interfaces/db.go index 97f879c3fc..31f966705b 100644 --- a/internal/core/data/infrastructure/interfaces/db.go +++ b/internal/core/data/infrastructure/interfaces/db.go @@ -38,4 +38,7 @@ type DBClient interface { ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) ReadingCountByTimeRange(start int, end int) (uint32, errors.EdgeX) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) + ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) + ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) + ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) } diff --git a/internal/core/data/infrastructure/interfaces/mocks/DBClient.go b/internal/core/data/infrastructure/interfaces/mocks/DBClient.go index 1f08778d9f..ef52a1f424 100644 --- a/internal/core/data/infrastructure/interfaces/mocks/DBClient.go +++ b/internal/core/data/infrastructure/interfaces/mocks/DBClient.go @@ -352,6 +352,29 @@ func (_m *DBClient) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceNa return r0, r1 } +// ReadingCountByDeviceNameAndTimeRange provides a mock function with given fields: deviceName, start, end +func (_m *DBClient) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + ret := _m.Called(deviceName, start, end) + + var r0 uint32 + if rf, ok := ret.Get(0).(func(string, int, int) uint32); ok { + r0 = rf(deviceName, start, end) + } else { + r0 = ret.Get(0).(uint32) + } + + var r1 errors.EdgeX + if rf, ok := ret.Get(1).(func(string, int, int) errors.EdgeX); ok { + r1 = rf(deviceName, start, end) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + // ReadingCountByResourceName provides a mock function with given fields: resourceName func (_m *DBClient) ReadingCountByResourceName(resourceName string) (uint32, errors.EdgeX) { ret := _m.Called(resourceName) @@ -519,6 +542,63 @@ func (_m *DBClient) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName s return r0, r1 } +// ReadingsByDeviceNameAndResourceNamesAndTimeRange provides a mock function with given fields: deviceName, resourceNames, start, end, offset, limit +func (_m *DBClient) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start int, end int, offset int, limit int) ([]models.Reading, uint32, errors.EdgeX) { + ret := _m.Called(deviceName, resourceNames, start, end, offset, limit) + + var r0 []models.Reading + if rf, ok := ret.Get(0).(func(string, []string, int, int, int, int) []models.Reading); ok { + r0 = rf(deviceName, resourceNames, start, end, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]models.Reading) + } + } + + var r1 uint32 + if rf, ok := ret.Get(1).(func(string, []string, int, int, int, int) uint32); ok { + r1 = rf(deviceName, resourceNames, start, end, offset, limit) + } else { + r1 = ret.Get(1).(uint32) + } + + var r2 errors.EdgeX + if rf, ok := ret.Get(2).(func(string, []string, int, int, int, int) errors.EdgeX); ok { + r2 = rf(deviceName, resourceNames, start, end, offset, limit) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).(errors.EdgeX) + } + } + + return r0, r1, r2 +} + +// ReadingsByDeviceNameAndTimeRange provides a mock function with given fields: deviceName, start, end, offset, limit +func (_m *DBClient) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]models.Reading, errors.EdgeX) { + ret := _m.Called(deviceName, start, end, offset, limit) + + var r0 []models.Reading + if rf, ok := ret.Get(0).(func(string, int, int, int, int) []models.Reading); ok { + r0 = rf(deviceName, start, end, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]models.Reading) + } + } + + var r1 errors.EdgeX + if rf, ok := ret.Get(1).(func(string, int, int, int, int) errors.EdgeX); ok { + r1 = rf(deviceName, start, end, offset, limit) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + // ReadingsByResourceName provides a mock function with given fields: offset, limit, resourceName func (_m *DBClient) ReadingsByResourceName(offset int, limit int, resourceName string) ([]models.Reading, errors.EdgeX) { ret := _m.Called(offset, limit, resourceName) diff --git a/internal/core/data/router.go b/internal/core/data/router.go index 7c25d4632a..4a2ba850fb 100644 --- a/internal/core/data/router.go +++ b/internal/core/data/router.go @@ -51,6 +51,7 @@ func LoadRestRoutes(r *mux.Router, dic *di.Container) { r.HandleFunc(common.ApiReadingByResourceNameAndTimeRangeRoute, rc.ReadingsByResourceNameAndTimeRange).Methods(http.MethodGet) r.HandleFunc(common.ApiReadingByDeviceNameAndResourceNameRoute, rc.ReadingsByDeviceNameAndResourceName).Methods(http.MethodGet) r.HandleFunc(common.ApiReadingByDeviceNameAndResourceNameAndTimeRangeRoute, rc.ReadingsByDeviceNameAndResourceNameAndTimeRange).Methods(http.MethodGet) + r.HandleFunc(common.ApiReadingByDeviceNameAndTimeRangeRoute, rc.ReadingsByDeviceNameAndResourceNamesAndTimeRange).Methods(http.MethodGet) r.Use(correlation.ManageHeader) r.Use(correlation.LoggingMiddleware(container.LoggingClientFrom(dic.Get))) diff --git a/internal/pkg/infrastructure/redis/client.go b/internal/pkg/infrastructure/redis/client.go index 016cbb6ecd..e67fc7d7e6 100644 --- a/internal/pkg/infrastructure/redis/client.go +++ b/internal/pkg/infrastructure/redis/client.go @@ -678,6 +678,44 @@ func (c *Client) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName stri return readings, nil } +func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) (readings []model.Reading, totalCount uint32, err errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + readings, totalCount, err = readingsByDeviceNameAndResourceNamesAndTimeRange(conn, deviceName, resourceNames, start, end, offset, limit) + if err != nil { + return readings, totalCount, errors.NewCommonEdgeX(errors.Kind(err), + fmt.Sprintf("fail to query readings by deviceName %s, resourceNames %v and time range %v ~ %v", deviceName, resourceNames, start, end), err) + } + + return readings, totalCount, nil +} + +func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) (readings []model.Reading, err errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + readings, err = readingsByDeviceNameAndTimeRange(conn, deviceName, start, end, offset, limit) + if err != nil { + return readings, errors.NewCommonEdgeX(errors.Kind(err), + fmt.Sprintf("fail to query readings by deviceName %s, and time range %v ~ %v", deviceName, start, end), err) + } + + return readings, nil +} + +func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + count, edgeXerr := getMemberCountByScoreRange(conn, CreateKey(ReadingsCollectionDeviceName, deviceName), start, end) + if edgeXerr != nil { + return 0, errors.NewCommonEdgeXWrapper(edgeXerr) + } + + return count, nil +} + // AddProvisionWatcher adds a new provision watcher func (c *Client) AddProvisionWatcher(pw model.ProvisionWatcher) (model.ProvisionWatcher, errors.EdgeX) { conn := c.Pool.Get() diff --git a/internal/pkg/infrastructure/redis/queries.go b/internal/pkg/infrastructure/redis/queries.go index 2a561cea67..b1e70b9292 100644 --- a/internal/pkg/infrastructure/redis/queries.go +++ b/internal/pkg/infrastructure/redis/queries.go @@ -258,6 +258,47 @@ func objectsByKeys(conn redis.Conn, setMethod string, offset int, limit int, red return objects, nil } +// unionObjectsByKeysAndScoreRange returns objects resulting from the union of all the given sets with specified score range, offset, and limit +func unionObjectsByKeysAndScoreRange(conn redis.Conn, start, end, offset, limit int, redisKeys ...string) ([][]byte, uint32, errors.EdgeX) { + return objectsByKeysAndScoreRange(conn, ZUNIONSTORE, start, end, offset, limit, redisKeys...) +} + +// objectsByKeysAndScoreRange returns objects resulting from the set method of all the given sets with specified score range, offset, and limit. The data set method could be either ZINTERSTORE or ZUNIONSTORE +func objectsByKeysAndScoreRange(conn redis.Conn, setMethod string, start, end, offset, limit int, redisKeys ...string) (objects [][]byte, totalCount uint32, edgeXerr errors.EdgeX) { + // build up the redis command arguments + args := redis.Args{} + cacheSet := uuid.New().String() + args = append(args, cacheSet) + args = append(args, strconv.Itoa(len(redisKeys))) + for _, key := range redisKeys { + args = args.Add(key) + } + + // create a temporary sorted set, cacheSet, resulting from the specified setMethod + _, err := conn.Do(setMethod, args...) + if err != nil { + return nil, totalCount, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to execute %s command with args %v", setMethod, args), err) + } + + // get the total count of the temporary sorted set + if totalCount, edgeXerr = getMemberCountByScoreRange(conn, cacheSet, start, end); edgeXerr != nil { + return nil, totalCount, edgeXerr + } + + // get objects from the temporary sorted set + if objects, edgeXerr = getObjectsByScoreRange(conn, cacheSet, start, end, offset, limit); edgeXerr != nil { + return nil, totalCount, edgeXerr + } + + // clean up unused temporary sorted set + _, err = redis.Int(conn.Do(DEL, cacheSet)) + if err != nil { + return nil, totalCount, errors.NewCommonEdgeX(errors.KindDatabaseError, "cache set deletion failed", err) + } + + return objects, totalCount, nil +} + // idFromStoredKey extracts Id from the store key func idFromStoredKey(storeKey string) string { substrings := strings.Split(storeKey, DBKeySeparator) diff --git a/internal/pkg/infrastructure/redis/reading.go b/internal/pkg/infrastructure/redis/reading.go index a8a7895625..a17c1b592e 100644 --- a/internal/pkg/infrastructure/redis/reading.go +++ b/internal/pkg/infrastructure/redis/reading.go @@ -234,6 +234,20 @@ func readingsByDeviceNameAndResourceNameAndTimeRange(conn redis.Conn, deviceName return convertObjectsToReadings(objects) } +func readingsByDeviceNameAndResourceNamesAndTimeRange(conn redis.Conn, deviceName string, resourceNames []string, startTime int, endTime int, offset int, limit int) (readings []models.Reading, totalCount uint32, err errors.EdgeX) { + var redisKeys []string + for _, resourceName := range resourceNames { + redisKeys = append(redisKeys, CreateKey(ReadingsCollectionDeviceNameResourceName, deviceName, resourceName)) + } + + objects, totalCount, err := unionObjectsByKeysAndScoreRange(conn, startTime, endTime, offset, limit, redisKeys...) + if err != nil { + return readings, totalCount, err + } + readings, err = convertObjectsToReadings(objects) + return readings, totalCount, err +} + // readingsByTimeRange query readings by time range, offset, and limit func readingsByTimeRange(conn redis.Conn, startTime int, endTime int, offset int, limit int) (readings []models.Reading, edgeXerr errors.EdgeX) { objects, edgeXerr := getObjectsByScoreRange(conn, ReadingsCollectionOrigin, startTime, endTime, offset, limit) @@ -251,6 +265,14 @@ func readingsByResourceNameAndTimeRange(conn redis.Conn, resourceName string, st return convertObjectsToReadings(objects) } +func readingsByDeviceNameAndTimeRange(conn redis.Conn, deviceName string, startTime int, endTime int, offset int, limit int) (readings []models.Reading, edgeXerr errors.EdgeX) { + objects, edgeXerr := getObjectsByScoreRange(conn, CreateKey(ReadingsCollectionDeviceName, deviceName), startTime, endTime, offset, limit) + if edgeXerr != nil { + return readings, edgeXerr + } + return convertObjectsToReadings(objects) +} + func convertObjectsToReadings(objects [][]byte) (readings []models.Reading, edgeXerr errors.EdgeX) { readings = make([]models.Reading, len(objects)) var alias struct { diff --git a/openapi/v2/core-data.yaml b/openapi/v2/core-data.yaml index 578418a64d..d93bb67bc1 100644 --- a/openapi/v2/core-data.yaml +++ b/openapi/v2/core-data.yaml @@ -1706,6 +1706,94 @@ paths: examples: 500Example: $ref: '#/components/examples/500Example' + /reading/device/name/{deviceName}/start/{start}/end/{end}: + parameters: + - $ref: '#/components/parameters/correlatedRequestHeader' + - name: deviceName + in: path + required: true + schema: + type: string + description: "The device name name of readings" + - name: start + in: path + required: true + schema: + type: integer + description: "Unix timestamp (nanoseconds) indicating the start of a date/time range" + - name: end + in: path + required: true + schema: + type: integer + description: "Unix timestamp (nanoseconds) indicating the end of a date/time range" + - $ref: '#/components/parameters/offsetParam' + - $ref: '#/components/parameters/limitParam' + get: + summary: "Return a paginated range of readings by deviceName and specified time range while also allowing multiple resource names specified in the request body as query criteria. If resource names or request body is empty, return all the readings that meet deviceName and specified time range." + requestBody: + required: false + content: + application/json: + schema: + $ref: '#/components/schemas/AddEventRequest' + example: + resourceNames: + - resource-001 + - resource-002 + responses: + '200': + description: "OK" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/MultiReadingsResponse' + examples: + MultiReadingsExample: + $ref: '#/components/examples/ReadingsByResourceNameAndTimeRangeExample' + '400': + description: "Request is in an invalid state." + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ErrorResponse' + examples: + 400Example: + $ref: '#/components/examples/400Example' + '416': + description: "Request range is not satisfiable" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 416Example: + $ref: '#/components/examples/416Example' + '500': + description: "An unexpected error occurred on the server" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ErrorResponse' + examples: + 500Example: + $ref: '#/components/examples/500Example' /config: get: summary: "Returns the current configuration of the service."