diff --git a/internal/core/data/init.go b/internal/core/data/init.go index edd9ddfb10..2fe6f069ca 100644 --- a/internal/core/data/init.go +++ b/internal/core/data/init.go @@ -19,20 +19,17 @@ import ( "fmt" "sync" - "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local" - - "github.com/edgexfoundry/go-mod-bootstrap/bootstrap/container" - "github.com/edgexfoundry/go-mod-bootstrap/bootstrap/startup" - "github.com/edgexfoundry/go-mod-bootstrap/di" - dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" "github.com/edgexfoundry/edgex-go/internal/core/data/v2" v2DataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/v2/bootstrap/container" errorContainer "github.com/edgexfoundry/edgex-go/internal/pkg/container" "github.com/edgexfoundry/edgex-go/internal/pkg/errorconcept" - + "github.com/edgexfoundry/go-mod-bootstrap/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/bootstrap/startup" + "github.com/edgexfoundry/go-mod-bootstrap/di" "github.com/edgexfoundry/go-mod-core-contracts/clients" "github.com/edgexfoundry/go-mod-core-contracts/clients/metadata" + "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local" "github.com/edgexfoundry/go-mod-messaging/messaging" msgTypes "github.com/edgexfoundry/go-mod-messaging/pkg/types" diff --git a/internal/core/data/v2/application/event.go b/internal/core/data/v2/application/event.go index 622aa431f6..fb6c1380ab 100644 --- a/internal/core/data/v2/application/event.go +++ b/internal/core/data/v2/application/event.go @@ -13,7 +13,6 @@ import ( dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" v2DataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/v2/bootstrap/container" "github.com/edgexfoundry/edgex-go/internal/pkg/correlation" - "github.com/edgexfoundry/go-mod-bootstrap/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/di" "github.com/edgexfoundry/go-mod-core-contracts/clients" @@ -158,6 +157,19 @@ func EventCountByDevice(deviceName string, dic *di.Container) (uint32, errors.Ed return count, nil } +// The DeletePushedEvents function will be invoked by controller functions +// and then invokes DeletePushedEvents function in the infrastructure layer to remove +// all events that have been pushed (pushed timestamp is greater than zero) +func DeletePushedEvents(dic *di.Container) errors.EdgeX { + dbClient := v2DataContainer.DBClientFrom(dic.Get) + + err := dbClient.DeletePushedEvents() + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + return nil +} + // UpdateEventPushedById updates event pushed timestamp per incoming event id func UpdateEventPushedById(id string, dic *di.Container) errors.EdgeX { dbClient := v2DataContainer.DBClientFrom(dic.Get) diff --git a/internal/core/data/v2/application/event_test.go b/internal/core/data/v2/application/event_test.go index cfa94342aa..977683564f 100644 --- a/internal/core/data/v2/application/event_test.go +++ b/internal/core/data/v2/application/event_test.go @@ -89,6 +89,7 @@ func newMockDB(persist bool) *dbMock.DBClient { myMock.On("DeleteEventById", testUUIDString).Return(nil) myMock.On("EventTotalCount").Return(testEventCount, nil) myMock.On("EventCountByDevice", testDeviceName).Return(testEventCount, nil) + myMock.On("DeletePushedEvents").Return(nil) } return myMock @@ -257,3 +258,17 @@ func TestEventCountByDevice(t *testing.T) { require.NoError(t, err) assert.Equal(t, testEventCount, count, "Event total count is not expected") } + +func TestDeletePushedEvents(t *testing.T) { + dbClientMock := newMockDB(true) + + dic := mocks.NewMockDIC() + dic.Update(di.ServiceConstructorMap{ + v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + + err := DeletePushedEvents(dic) + require.NoError(t, err) +} diff --git a/internal/core/data/v2/controller/http/event.go b/internal/core/data/v2/controller/http/event.go index 89ccdda2c3..628bc473af 100644 --- a/internal/core/data/v2/controller/http/event.go +++ b/internal/core/data/v2/controller/http/event.go @@ -215,6 +215,31 @@ func (ec *EventController) EventCountByDevice(w http.ResponseWriter, r *http.Req pkg.Encode(eventResponse, w, lc) // encode and send out the response } +func (ec *EventController) DeletePushedEvents(w http.ResponseWriter, r *http.Request) { + // retrieve all the service injections from bootstrap + lc := container.LoggingClientFrom(ec.dic.Get) + + ctx := r.Context() + correlationId := correlation.FromContext(ctx) + + var response interface{} + var statusCode int + + err := application.DeletePushedEvents(ec.dic) + if err != nil { + lc.Debug(err.DebugMessages(), clients.CorrelationHeader, correlationId) + response = commonDTO.NewBaseResponse("", err.Message(), err.Code()) + statusCode = err.Code() + } else { + response = commonDTO.NewBaseResponse("", "", http.StatusAccepted) + statusCode = http.StatusAccepted + } + + utils.WriteHttpHeader(w, ctx, statusCode) + // encode and send out the response + pkg.Encode(response, w, lc) +} + func (ec *EventController) UpdateEventPushedById(w http.ResponseWriter, r *http.Request) { // retrieve all the service injections from bootstrap lc := container.LoggingClientFrom(ec.dic.Get) diff --git a/internal/core/data/v2/controller/http/event_test.go b/internal/core/data/v2/controller/http/event_test.go index d47dd08541..a98a3e6069 100644 --- a/internal/core/data/v2/controller/http/event_test.go +++ b/internal/core/data/v2/controller/http/event_test.go @@ -410,6 +410,33 @@ func TestEventCountByDevice(t *testing.T) { assert.Equal(t, deviceName, actualResponse.DeviceName, "Device name in the response body is not expected") } +func TestDeletePushedEvents(t *testing.T) { + dbClientMock := &dbMock.DBClient{} + dbClientMock.On("DeletePushedEvents").Return(nil) + + dic := mocks.NewMockDIC() + dic.Update(di.ServiceConstructorMap{ + v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + ec := NewEventController(dic) + + req, err := http.NewRequest(http.MethodDelete, v2.ApiEventScrubRoute, http.NoBody) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + handler := http.HandlerFunc(ec.DeletePushedEvents) + handler.ServeHTTP(recorder, req) + + var actualResponse common.BaseResponse + err = json.Unmarshal(recorder.Body.Bytes(), &actualResponse) + + assert.Equal(t, v2.ApiVersion, actualResponse.ApiVersion, "API Version not as expected") + assert.Equal(t, http.StatusAccepted, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Empty(t, actualResponse.Message, "Message should be empty when it is successful") +} + func TestUpdateEventPushedById(t *testing.T) { expectedResponseCode := http.StatusMultiStatus diff --git a/internal/core/data/v2/infrastructure/interfaces/db.go b/internal/core/data/v2/infrastructure/interfaces/db.go index 4821f90731..6b0456c1da 100644 --- a/internal/core/data/v2/infrastructure/interfaces/db.go +++ b/internal/core/data/v2/infrastructure/interfaces/db.go @@ -21,4 +21,5 @@ type DBClient interface { UpdateEventPushedById(id string) errors.EdgeX AllEvents(offset int, limit int) ([]model.Event, errors.EdgeX) EventsByDeviceName(offset int, limit int, name string) ([]model.Event, errors.EdgeX) + DeletePushedEvents() errors.EdgeX } diff --git a/internal/core/data/v2/infrastructure/interfaces/mocks/DBClient.go b/internal/core/data/v2/infrastructure/interfaces/mocks/DBClient.go index cc5def9565..fe6c984703 100644 --- a/internal/core/data/v2/infrastructure/interfaces/mocks/DBClient.go +++ b/internal/core/data/v2/infrastructure/interfaces/mocks/DBClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.2.1. DO NOT EDIT. +// Code generated by mockery v1.0.0. DO NOT EDIT. package mocks @@ -84,6 +84,22 @@ func (_m *DBClient) DeleteEventById(id string) errors.EdgeX { return r0 } +// DeletePushedEvents provides a mock function with given fields: +func (_m *DBClient) DeletePushedEvents() errors.EdgeX { + ret := _m.Called() + + var r0 errors.EdgeX + if rf, ok := ret.Get(0).(func() errors.EdgeX); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(errors.EdgeX) + } + } + + return r0 +} + // EventById provides a mock function with given fields: id func (_m *DBClient) EventById(id string) (models.Event, errors.EdgeX) { ret := _m.Called(id) diff --git a/internal/core/data/v2/router.go b/internal/core/data/v2/router.go index 1bb5e90314..991c5b2a08 100644 --- a/internal/core/data/v2/router.go +++ b/internal/core/data/v2/router.go @@ -32,6 +32,7 @@ func LoadRestRoutes(r *mux.Router, dic *di.Container) { r.HandleFunc(v2Constant.ApiEventPushRoute, ec.UpdateEventPushedById).Methods(http.MethodPut) r.HandleFunc(v2Constant.ApiAllEventRoute, ec.AllEvents).Methods(http.MethodGet) r.HandleFunc(v2Constant.ApiEventByDeviceNameRoute, ec.EventsByDeviceName).Methods(http.MethodGet) + r.HandleFunc(v2Constant.ApiEventScrubRoute, ec.DeletePushedEvents).Methods(http.MethodDelete) r.Use(correlation.ManageHeader) r.Use(correlation.OnResponseComplete) diff --git a/internal/pkg/v2/infrastructure/redis/dbcommands.go b/internal/pkg/v2/infrastructure/redis/dbcommands.go index d377fb5ef6..cde18dd0d0 100644 --- a/internal/pkg/v2/infrastructure/redis/dbcommands.go +++ b/internal/pkg/v2/infrastructure/redis/dbcommands.go @@ -3,29 +3,31 @@ package redis // Redis commmands used in this project // Reference: https://redis.io/commands const ( - MULTI = "MULTI" - SET = "SET" - GET = "GET" - EXISTS = "EXISTS" - DEL = "DEL" - HSET = "HSET" - HGET = "HGET" - HEXISTS = "HEXISTS" - HDEL = "HDEL" - SADD = "SADD" - SREM = "SREM" - ZADD = "ZADD" - ZREM = "ZREM" - EXEC = "EXEC" - ZRANGE = "ZRANGE" - ZREVRANGE = "ZREVRANGE" - MGET = "MGET" - ZCARD = "ZCARD" - ZCOUNT = "ZCOUNT" - UNLINK = "UNLINK" + MULTI = "MULTI" + SET = "SET" + GET = "GET" + EXISTS = "EXISTS" + DEL = "DEL" + HSET = "HSET" + HGET = "HGET" + HEXISTS = "HEXISTS" + HDEL = "HDEL" + SADD = "SADD" + SREM = "SREM" + ZADD = "ZADD" + ZREM = "ZREM" + EXEC = "EXEC" + ZRANGE = "ZRANGE" + ZREVRANGE = "ZREVRANGE" + MGET = "MGET" + ZCARD = "ZCARD" + ZCOUNT = "ZCOUNT" + UNLINK = "UNLINK" + ZRANGEBYSCORE = "ZRANGEBYSCORE" ) const ( - InfiniteMin = "-inf" - InfiniteMax = "+inf" + InfiniteMin = "-inf" + InfiniteMax = "+inf" + GreaterThanZero = "(0" ) diff --git a/internal/pkg/v2/infrastructure/redis/event.go b/internal/pkg/v2/infrastructure/redis/event.go index ae82dd37b4..350b3949c8 100644 --- a/internal/pkg/v2/infrastructure/redis/event.go +++ b/internal/pkg/v2/infrastructure/redis/event.go @@ -25,12 +25,86 @@ const ( EventsCollectionReadings = EventsCollection + ":readings" ) +// asyncDeleteEventsByIds deletes all events with given event Ids. This function is implemented to be run as a separate +// goroutine in the background to achieve better performance, so this function return nothing. When encountering any +// errors during deletion, this function will simply log the error. +func (c *Client) asyncDeleteEventsByIds(eventIds []string) { + conn := c.Pool.Get() + defer conn.Close() + + //start a transaction to get all events + events, edgeXerr := getObjectsByIds(conn, common.ConvertStringsToInterfaces(eventIds)) + if edgeXerr != nil { + c.loggingClient.Error(fmt.Sprintf("Deleted events failed while retrieving objects by Ids. Err: %s", edgeXerr.DebugMessages())) + return + } + + // iterate each events for deletion in batch + queriesInQueue := 0 + e := models.Event{} + _ = conn.Send(MULTI) + for i, event := range events { + err := json.Unmarshal(event, &e) + if err != nil { + c.loggingClient.Error(fmt.Sprintf("unable to marshal event. Err: %s", err.Error())) + continue + } + storedKey := eventStoredKey(e.Id) + _ = conn.Send(UNLINK, storedKey) + _ = conn.Send(UNLINK, fmt.Sprintf("%s:%s", EventsCollectionReadings, e.Id)) + _ = conn.Send(ZREM, EventsCollection, storedKey) + _ = conn.Send(ZREM, EventsCollectionCreated, storedKey) + _ = conn.Send(ZREM, EventsCollectionPushed, storedKey) + _ = conn.Send(ZREM, fmt.Sprintf("%s:%s", EventsCollectionDeviceName, e.DeviceName), storedKey) + queriesInQueue++ + + if queriesInQueue >= c.BatchSize { + _, err = conn.Do(EXEC) + if err != nil { + c.loggingClient.Error(fmt.Sprintf("unable to execute batch event deletion. Err: %s", err.Error())) + continue + } + // reset queriesInQueue to zero if EXEC is successfully executed without error + queriesInQueue = 0 + // rerun another transaction when event iteration is not finished + if i < len(events)-1 { + _ = conn.Send(MULTI) + } + } + } + + if queriesInQueue > 0 { + _, err := conn.Do(EXEC) + if err != nil { + c.loggingClient.Error(fmt.Sprintf("unable to execute batch event deletion. Err: %s", err.Error())) + } + } +} + +// DeletePushedEvents deletes all pushed events and corresponding readings. This function is implemented to starts up +// two goroutines to delete readings and events in the bckground to achieve better performance. +func (c *Client) DeletePushedEvents() (edgeXerr errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + eventIds, readingIds, err := getPushedEventReadingIds(conn) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + c.loggingClient.Debug(fmt.Sprintf("Prepare to delete %v readings", len(readingIds))) + go c.asyncDeleteReadingsByIds(readingIds) + c.loggingClient.Debug(fmt.Sprintf("Prepare to delete %v events", len(eventIds))) + go c.asyncDeleteEventsByIds(eventIds) + + return nil +} + +// ************************** DB HELPER FUNCTIONS *************************** // eventStoredKey return the event's stored key which combines the collection name and object id func eventStoredKey(id string) string { return fmt.Sprintf("%s:%s", EventsCollection, id) } -// ************************** DB HELPER FUNCTIONS *************************** func addEvent(conn redis.Conn, e models.Event) (addedEvent models.Event, edgeXerr errors.EdgeX) { // query Event by Id first to avoid the Id conflict _, edgeXerr = eventById(conn, e.Id) @@ -131,6 +205,30 @@ func deleteEventById(conn redis.Conn, id string) (edgeXerr errors.EdgeX) { return edgeXerr } +func getPushedEventReadingIds(conn redis.Conn) (eventIds []string, readingIds []string, edgeXerr errors.EdgeX) { + pushedEventIds, err := redis.Strings(conn.Do(ZRANGEBYSCORE, EventsCollectionPushed, GreaterThanZero, InfiniteMax)) + if err != nil { + return nil, nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "retrieve all pushed event ids failed", err) + } + pushedEvents, edgeXerr := getObjectsByIds(conn, common.ConvertStringsToInterfaces(pushedEventIds)) + if edgeXerr != nil { + return nil, nil, edgeXerr + } + e := models.Event{} + for _, pushedEvent := range pushedEvents { + err = json.Unmarshal(pushedEvent, &e) + if err != nil { + return nil, nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to marshal event", err) + } + rIds, err := redis.Strings(conn.Do(ZRANGE, fmt.Sprintf("%s:%s", EventsCollectionReadings, e.Id), 0, -1)) + if err != nil { + return nil, nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("retrieve all reading Ids of pushed event %s failed", e.Id), err) + } + readingIds = append(readingIds, rIds...) + } + return pushedEventIds, readingIds, nil +} + func eventById(conn redis.Conn, id string) (event models.Event, edgeXerr errors.EdgeX) { edgeXerr = getObjectById(conn, eventStoredKey(id), &event) if edgeXerr != nil { diff --git a/internal/pkg/v2/infrastructure/redis/reading.go b/internal/pkg/v2/infrastructure/redis/reading.go index 1798832c09..b2a17d8136 100644 --- a/internal/pkg/v2/infrastructure/redis/reading.go +++ b/internal/pkg/v2/infrastructure/redis/reading.go @@ -10,7 +10,6 @@ import ( "fmt" "github.com/edgexfoundry/edgex-go/internal/pkg/common" - "github.com/edgexfoundry/go-mod-core-contracts/errors" "github.com/edgexfoundry/go-mod-core-contracts/v2" "github.com/edgexfoundry/go-mod-core-contracts/v2/models" @@ -28,6 +27,62 @@ const ( var emptyBinaryValue = make([]byte, 0) +// asyncDeleteReadingsByIds deletes all readings with given reading Ids. This function is implemented to be run as a +// separate gorountine in the background to achieve better performance, so this function return nothing. When +// encountering any errors during deletion, this function will simply log the error. +func (c *Client) asyncDeleteReadingsByIds(readingIds []string) { + conn := c.Pool.Get() + defer conn.Close() + + var readings [][]byte + //start a transaction to get all readings + readings, edgeXerr := getObjectsByIds(conn, common.ConvertStringsToInterfaces(readingIds)) + if edgeXerr != nil { + c.loggingClient.Error(fmt.Sprintf("Deleted readings failed while retrieving objects by Ids. Err: %s", edgeXerr.DebugMessages())) + return + } + + // iterate each readings for deletion in batch + queriesInQueue := 0 + r := models.BaseReading{} + _ = conn.Send(MULTI) + for i, reading := range readings { + err := json.Unmarshal(reading, &r) + if err != nil { + c.loggingClient.Error(fmt.Sprintf("unable to marshal reading. Err: %s", err.Error())) + continue + } + storedKey := readingStoredKey(r.Id) + _ = conn.Send(UNLINK, storedKey) + _ = conn.Send(ZREM, ReadingsCollection, storedKey) + _ = conn.Send(ZREM, ReadingsCollectionCreated, storedKey) + _ = conn.Send(ZREM, fmt.Sprintf("%s:%s", ReadingsCollectionDeviceName, r.DeviceName), storedKey) + _ = conn.Send(ZREM, fmt.Sprintf("%s:%s", ReadingsCollectionName, r.Name), storedKey) + queriesInQueue++ + + if queriesInQueue >= c.BatchSize { + _, err = conn.Do(EXEC) + if err != nil { + c.loggingClient.Error(fmt.Sprintf("unable to execute batch reading deletion. Err: %s", err.Error())) + continue + } + // reset queriesInQueue to zero if EXEC is successfully executed without error + queriesInQueue = 0 + // rerun another transaction when reading iteration is not finished + if i < len(readings)-1 { + _ = conn.Send(MULTI) + } + } + } + + if queriesInQueue > 0 { + _, err := conn.Do(EXEC) + if err != nil { + c.loggingClient.Error(fmt.Sprintf("unable to execute batch reading deletion. Err: %s", err.Error())) + } + } +} + // readingStoredKey return the reading's stored key which combines the collection name and object id func readingStoredKey(id string) string { return fmt.Sprintf("%s:%s", ReadingsCollection, id) diff --git a/openapi/v2/core-data.yaml b/openapi/v2/core-data.yaml index 778c0eef51..1cb358e6da 100644 --- a/openapi/v2/core-data.yaml +++ b/openapi/v2/core-data.yaml @@ -1149,8 +1149,8 @@ paths: delete: summary: "Remove all pushed events and their associated readings." responses: - '200': - description: "Delete successful" + '202': + description: "Delete request accepted" headers: X-Correlation-ID: $ref: '#/components/headers/correlatedResponseHeader'