Skip to content

Commit

Permalink
Merge pull request #2840 from judehung/issue-2839
Browse files Browse the repository at this point in the history
feat(data): Implement DELETE /event/scrub V2 API
  • Loading branch information
cloudxxx8 authored Nov 19, 2020
2 parents ef0f593 + 7f998f8 commit 1173b5c
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 35 deletions.
11 changes: 4 additions & 7 deletions internal/core/data/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,17 @@ import (
"fmt"
"sync"

"github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/startup"
"github.com/edgexfoundry/go-mod-bootstrap/di"

dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/core/data/v2"
v2DataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/v2/bootstrap/container"
errorContainer "github.com/edgexfoundry/edgex-go/internal/pkg/container"
"github.com/edgexfoundry/edgex-go/internal/pkg/errorconcept"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/startup"
"github.com/edgexfoundry/go-mod-bootstrap/di"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/clients/metadata"
"github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
"github.com/edgexfoundry/go-mod-messaging/messaging"
msgTypes "github.com/edgexfoundry/go-mod-messaging/pkg/types"

Expand Down
14 changes: 13 additions & 1 deletion internal/core/data/v2/application/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
v2DataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/v2/bootstrap/container"
"github.com/edgexfoundry/edgex-go/internal/pkg/correlation"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/di"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
Expand Down Expand Up @@ -158,6 +157,19 @@ func EventCountByDevice(deviceName string, dic *di.Container) (uint32, errors.Ed
return count, nil
}

// The DeletePushedEvents function will be invoked by controller functions
// and then invokes DeletePushedEvents function in the infrastructure layer to remove
// all events that have been pushed (pushed timestamp is greater than zero)
func DeletePushedEvents(dic *di.Container) errors.EdgeX {
dbClient := v2DataContainer.DBClientFrom(dic.Get)

err := dbClient.DeletePushedEvents()
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
return nil
}

// UpdateEventPushedById updates event pushed timestamp per incoming event id
func UpdateEventPushedById(id string, dic *di.Container) errors.EdgeX {
dbClient := v2DataContainer.DBClientFrom(dic.Get)
Expand Down
15 changes: 15 additions & 0 deletions internal/core/data/v2/application/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func newMockDB(persist bool) *dbMock.DBClient {
myMock.On("DeleteEventById", testUUIDString).Return(nil)
myMock.On("EventTotalCount").Return(testEventCount, nil)
myMock.On("EventCountByDevice", testDeviceName).Return(testEventCount, nil)
myMock.On("DeletePushedEvents").Return(nil)
}

return myMock
Expand Down Expand Up @@ -257,3 +258,17 @@ func TestEventCountByDevice(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, testEventCount, count, "Event total count is not expected")
}

func TestDeletePushedEvents(t *testing.T) {
dbClientMock := newMockDB(true)

dic := mocks.NewMockDIC()
dic.Update(di.ServiceConstructorMap{
v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})

err := DeletePushedEvents(dic)
require.NoError(t, err)
}
25 changes: 25 additions & 0 deletions internal/core/data/v2/controller/http/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,31 @@ func (ec *EventController) EventCountByDevice(w http.ResponseWriter, r *http.Req
pkg.Encode(eventResponse, w, lc) // encode and send out the response
}

func (ec *EventController) DeletePushedEvents(w http.ResponseWriter, r *http.Request) {
// retrieve all the service injections from bootstrap
lc := container.LoggingClientFrom(ec.dic.Get)

ctx := r.Context()
correlationId := correlation.FromContext(ctx)

var response interface{}
var statusCode int

err := application.DeletePushedEvents(ec.dic)
if err != nil {
lc.Debug(err.DebugMessages(), clients.CorrelationHeader, correlationId)
response = commonDTO.NewBaseResponse("", err.Message(), err.Code())
statusCode = err.Code()
} else {
response = commonDTO.NewBaseResponse("", "", http.StatusAccepted)
statusCode = http.StatusAccepted
}

utils.WriteHttpHeader(w, ctx, statusCode)
// encode and send out the response
pkg.Encode(response, w, lc)
}

func (ec *EventController) UpdateEventPushedById(w http.ResponseWriter, r *http.Request) {
// retrieve all the service injections from bootstrap
lc := container.LoggingClientFrom(ec.dic.Get)
Expand Down
27 changes: 27 additions & 0 deletions internal/core/data/v2/controller/http/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,33 @@ func TestEventCountByDevice(t *testing.T) {
assert.Equal(t, deviceName, actualResponse.DeviceName, "Device name in the response body is not expected")
}

func TestDeletePushedEvents(t *testing.T) {
dbClientMock := &dbMock.DBClient{}
dbClientMock.On("DeletePushedEvents").Return(nil)

dic := mocks.NewMockDIC()
dic.Update(di.ServiceConstructorMap{
v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})
ec := NewEventController(dic)

req, err := http.NewRequest(http.MethodDelete, v2.ApiEventScrubRoute, http.NoBody)
require.NoError(t, err)

recorder := httptest.NewRecorder()
handler := http.HandlerFunc(ec.DeletePushedEvents)
handler.ServeHTTP(recorder, req)

var actualResponse common.BaseResponse
err = json.Unmarshal(recorder.Body.Bytes(), &actualResponse)

assert.Equal(t, v2.ApiVersion, actualResponse.ApiVersion, "API Version not as expected")
assert.Equal(t, http.StatusAccepted, recorder.Result().StatusCode, "HTTP status code not as expected")
assert.Empty(t, actualResponse.Message, "Message should be empty when it is successful")
}

func TestUpdateEventPushedById(t *testing.T) {
expectedResponseCode := http.StatusMultiStatus

Expand Down
1 change: 1 addition & 0 deletions internal/core/data/v2/infrastructure/interfaces/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ type DBClient interface {
UpdateEventPushedById(id string) errors.EdgeX
AllEvents(offset int, limit int) ([]model.Event, errors.EdgeX)
EventsByDeviceName(offset int, limit int, name string) ([]model.Event, errors.EdgeX)
DeletePushedEvents() errors.EdgeX
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/core/data/v2/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func LoadRestRoutes(r *mux.Router, dic *di.Container) {
r.HandleFunc(v2Constant.ApiEventPushRoute, ec.UpdateEventPushedById).Methods(http.MethodPut)
r.HandleFunc(v2Constant.ApiAllEventRoute, ec.AllEvents).Methods(http.MethodGet)
r.HandleFunc(v2Constant.ApiEventByDeviceNameRoute, ec.EventsByDeviceName).Methods(http.MethodGet)
r.HandleFunc(v2Constant.ApiEventScrubRoute, ec.DeletePushedEvents).Methods(http.MethodDelete)

r.Use(correlation.ManageHeader)
r.Use(correlation.OnResponseComplete)
Expand Down
46 changes: 24 additions & 22 deletions internal/pkg/v2/infrastructure/redis/dbcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,31 @@ package redis
// Redis commmands used in this project
// Reference: https://redis.io/commands
const (
MULTI = "MULTI"
SET = "SET"
GET = "GET"
EXISTS = "EXISTS"
DEL = "DEL"
HSET = "HSET"
HGET = "HGET"
HEXISTS = "HEXISTS"
HDEL = "HDEL"
SADD = "SADD"
SREM = "SREM"
ZADD = "ZADD"
ZREM = "ZREM"
EXEC = "EXEC"
ZRANGE = "ZRANGE"
ZREVRANGE = "ZREVRANGE"
MGET = "MGET"
ZCARD = "ZCARD"
ZCOUNT = "ZCOUNT"
UNLINK = "UNLINK"
MULTI = "MULTI"
SET = "SET"
GET = "GET"
EXISTS = "EXISTS"
DEL = "DEL"
HSET = "HSET"
HGET = "HGET"
HEXISTS = "HEXISTS"
HDEL = "HDEL"
SADD = "SADD"
SREM = "SREM"
ZADD = "ZADD"
ZREM = "ZREM"
EXEC = "EXEC"
ZRANGE = "ZRANGE"
ZREVRANGE = "ZREVRANGE"
MGET = "MGET"
ZCARD = "ZCARD"
ZCOUNT = "ZCOUNT"
UNLINK = "UNLINK"
ZRANGEBYSCORE = "ZRANGEBYSCORE"
)

const (
InfiniteMin = "-inf"
InfiniteMax = "+inf"
InfiniteMin = "-inf"
InfiniteMax = "+inf"
GreaterThanZero = "(0"
)
100 changes: 99 additions & 1 deletion internal/pkg/v2/infrastructure/redis/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,86 @@ const (
EventsCollectionReadings = EventsCollection + ":readings"
)

// asyncDeleteEventsByIds deletes all events with given event Ids. This function is implemented to be run as a separate
// goroutine in the background to achieve better performance, so this function return nothing. When encountering any
// errors during deletion, this function will simply log the error.
func (c *Client) asyncDeleteEventsByIds(eventIds []string) {
conn := c.Pool.Get()
defer conn.Close()

//start a transaction to get all events
events, edgeXerr := getObjectsByIds(conn, common.ConvertStringsToInterfaces(eventIds))
if edgeXerr != nil {
c.loggingClient.Error(fmt.Sprintf("Deleted events failed while retrieving objects by Ids. Err: %s", edgeXerr.DebugMessages()))
return
}

// iterate each events for deletion in batch
queriesInQueue := 0
e := models.Event{}
_ = conn.Send(MULTI)
for i, event := range events {
err := json.Unmarshal(event, &e)
if err != nil {
c.loggingClient.Error(fmt.Sprintf("unable to marshal event. Err: %s", err.Error()))
continue
}
storedKey := eventStoredKey(e.Id)
_ = conn.Send(UNLINK, storedKey)
_ = conn.Send(UNLINK, fmt.Sprintf("%s:%s", EventsCollectionReadings, e.Id))
_ = conn.Send(ZREM, EventsCollection, storedKey)
_ = conn.Send(ZREM, EventsCollectionCreated, storedKey)
_ = conn.Send(ZREM, EventsCollectionPushed, storedKey)
_ = conn.Send(ZREM, fmt.Sprintf("%s:%s", EventsCollectionDeviceName, e.DeviceName), storedKey)
queriesInQueue++

if queriesInQueue >= c.BatchSize {
_, err = conn.Do(EXEC)
if err != nil {
c.loggingClient.Error(fmt.Sprintf("unable to execute batch event deletion. Err: %s", err.Error()))
continue
}
// reset queriesInQueue to zero if EXEC is successfully executed without error
queriesInQueue = 0
// rerun another transaction when event iteration is not finished
if i < len(events)-1 {
_ = conn.Send(MULTI)
}
}
}

if queriesInQueue > 0 {
_, err := conn.Do(EXEC)
if err != nil {
c.loggingClient.Error(fmt.Sprintf("unable to execute batch event deletion. Err: %s", err.Error()))
}
}
}

// DeletePushedEvents deletes all pushed events and corresponding readings. This function is implemented to starts up
// two goroutines to delete readings and events in the bckground to achieve better performance.
func (c *Client) DeletePushedEvents() (edgeXerr errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

eventIds, readingIds, err := getPushedEventReadingIds(conn)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
c.loggingClient.Debug(fmt.Sprintf("Prepare to delete %v readings", len(readingIds)))
go c.asyncDeleteReadingsByIds(readingIds)
c.loggingClient.Debug(fmt.Sprintf("Prepare to delete %v events", len(eventIds)))
go c.asyncDeleteEventsByIds(eventIds)

return nil
}

// ************************** DB HELPER FUNCTIONS ***************************
// eventStoredKey return the event's stored key which combines the collection name and object id
func eventStoredKey(id string) string {
return fmt.Sprintf("%s:%s", EventsCollection, id)
}

// ************************** DB HELPER FUNCTIONS ***************************
func addEvent(conn redis.Conn, e models.Event) (addedEvent models.Event, edgeXerr errors.EdgeX) {
// query Event by Id first to avoid the Id conflict
_, edgeXerr = eventById(conn, e.Id)
Expand Down Expand Up @@ -131,6 +205,30 @@ func deleteEventById(conn redis.Conn, id string) (edgeXerr errors.EdgeX) {
return edgeXerr
}

func getPushedEventReadingIds(conn redis.Conn) (eventIds []string, readingIds []string, edgeXerr errors.EdgeX) {
pushedEventIds, err := redis.Strings(conn.Do(ZRANGEBYSCORE, EventsCollectionPushed, GreaterThanZero, InfiniteMax))
if err != nil {
return nil, nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "retrieve all pushed event ids failed", err)
}
pushedEvents, edgeXerr := getObjectsByIds(conn, common.ConvertStringsToInterfaces(pushedEventIds))
if edgeXerr != nil {
return nil, nil, edgeXerr
}
e := models.Event{}
for _, pushedEvent := range pushedEvents {
err = json.Unmarshal(pushedEvent, &e)
if err != nil {
return nil, nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to marshal event", err)
}
rIds, err := redis.Strings(conn.Do(ZRANGE, fmt.Sprintf("%s:%s", EventsCollectionReadings, e.Id), 0, -1))
if err != nil {
return nil, nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("retrieve all reading Ids of pushed event %s failed", e.Id), err)
}
readingIds = append(readingIds, rIds...)
}
return pushedEventIds, readingIds, nil
}

func eventById(conn redis.Conn, id string) (event models.Event, edgeXerr errors.EdgeX) {
edgeXerr = getObjectById(conn, eventStoredKey(id), &event)
if edgeXerr != nil {
Expand Down
Loading

0 comments on commit 1173b5c

Please sign in to comment.