Skip to content

Commit

Permalink
Merge pull request edgexfoundry#4676 from weichou1229/issue-4675
Browse files Browse the repository at this point in the history
feat: implement notification retention feature
  • Loading branch information
cloudxxx8 authored Sep 13, 2023
2 parents 27f8dfd + b213c8a commit 507b856
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 39 deletions.
5 changes: 5 additions & 0 deletions cmd/support-notifications/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ MessageBus:
Database:
Name: notifications

Retention:
Enabled: false
Interval: 30m # Purging interval defines when the database should be rid of notifications above the high watermark.
MaxCap: 5000 # The maximum capacity defines where the high watermark of notifications should be detected for purging the amount of the notifications to the minimum capacity.
MinCap: 4000 # The minimum capacity defines where the total count of notifications should be returned to during purging.
25 changes: 25 additions & 0 deletions internal/pkg/infrastructure/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,31 @@ func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, lab
return uint32(len(notifications)), nil
}

// NotificationTotalCount returns the total count of Notification from the database
func (c *Client) NotificationTotalCount() (uint32, errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

count, edgeXerr := getMemberNumber(conn, ZCARD, NotificationCollection)
if edgeXerr != nil {
return 0, errors.NewCommonEdgeXWrapper(edgeXerr)
}

return count, nil
}

// LatestNotificationByOffset returns a latest notification by offset
func (c *Client) LatestNotificationByOffset(offset uint32) (model.Notification, errors.EdgeX) {
conn := c.Pool.Get()
defer conn.Close()

notification, edgeXerr := latestNotificationByOffset(conn, int(offset))
if edgeXerr != nil {
return model.Notification{}, errors.NewCommonEdgeXWrapper(edgeXerr)
}
return notification, nil
}

// SubscriptionTotalCount returns the total count of Subscription from the database
func (c *Client) SubscriptionTotalCount() (uint32, errors.EdgeX) {
conn := c.Pool.Get()
Expand Down
18 changes: 18 additions & 0 deletions internal/pkg/infrastructure/redis/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,21 @@ func (c *Client) DeleteProcessedNotificationsByAge(age int64) (err errors.EdgeX)
go c.asyncDeleteTransmissionByStoreKeys(transStoreKeys)
return nil
}

func latestNotificationByOffset(conn redis.Conn, offset int) (notification models.Notification, edgeXerr errors.EdgeX) {
objects, err := getObjectsByRevRange(conn, NotificationCollectionCreated, offset, 1)
if err != nil {
return notification, errors.NewCommonEdgeXWrapper(err)
}
notifications, err := convertObjectsToNotifications(objects)
if err != nil {
return notification, errors.NewCommonEdgeXWrapper(err)
}
if len(notifications) > 1 {
return notification, errors.NewCommonEdgeX(errors.KindServerError, "the query result should not greater than one notification", nil)
}
if len(notifications) == 0 {
return notification, errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, fmt.Sprintf("notification not found from the offset %d", offset), nil)
}
return notifications[0], nil
}
55 changes: 54 additions & 1 deletion internal/support/notifications/application/notification.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
//
// Copyright (C) 2021 IOTech Ltd
// Copyright (C) 2021-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"context"
"fmt"
"sync"
"time"

"github.com/edgexfoundry/edgex-go/internal/pkg/correlation"
"github.com/edgexfoundry/edgex-go/internal/support/notifications/container"
Expand All @@ -20,6 +23,8 @@ import (
"github.com/google/uuid"
)

var asyncPurgeNotificationOnce sync.Once

// The AddNotification function accepts the new Notification model from the controller function
// and then invokes AddNotification function of infrastructure layer to add new Notification
func AddNotification(n models.Notification, ctx context.Context, dic *di.Container) (id string, edgeXerr errors.EdgeX) {
Expand Down Expand Up @@ -195,3 +200,51 @@ func DeleteProcessedNotificationsByAge(age int64, dic *di.Container) errors.Edge
}
return nil
}

// AsyncPurgeNotification purge notifications and related transmissions according to the retention capability.
func AsyncPurgeNotification(interval time.Duration, ctx context.Context, dic *di.Container) {
asyncPurgeNotificationOnce.Do(func() {
go func() {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
timer := time.NewTimer(interval)
for {
timer.Reset(interval)
select {
case <-ctx.Done():
lc.Info("Exiting notification retention")
return
case <-timer.C:
err := purgeNotification(dic)
if err != nil {
lc.Errorf("Failed to purge notifications and transmissions, %v", err)
break
}
}
}
}()
})
}

func purgeNotification(dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
dbClient := container.DBClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
total, err := dbClient.NotificationTotalCount()
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), "failed to query notification total count, %v", err)
}
if total >= config.Retention.MaxCap {
lc.Debugf("Purging the notification amount %d to the minimum capacity %d", total, config.Retention.MinCap)
notification, err := dbClient.LatestNotificationByOffset(config.Retention.MinCap)
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query notification with offset '%d'", config.Retention.MinCap), err)
}
now := time.Now().UnixMilli()
age := now - notification.Created
err = dbClient.CleanupNotificationsByAge(age)
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to delete notifications and transmissions by age '%d'", age), err)
}
}
return nil
}
69 changes: 69 additions & 0 deletions internal/support/notifications/application/notification_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//
// Copyright (C) 2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/edgex-go/internal/support/notifications/config"
"github.com/edgexfoundry/edgex-go/internal/support/notifications/container"
dbMock "github.com/edgexfoundry/edgex-go/internal/support/notifications/infrastructure/interfaces/mocks"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
)

func TestPurgeNotification(t *testing.T) {
configuration := &config.ConfigurationStruct{
Retention: config.NotificationRetention{
Enabled: true,
Interval: "1s",
MaxCap: 5,
MinCap: 3,
},
}
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return configuration
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return logger.NewMockClient()
},
})

tests := []struct {
name string
notificationCount uint32
}{
{"invoke notification purging", configuration.Retention.MaxCap},
{"not invoke notification purging", configuration.Retention.MinCap},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
dbClientMock := &dbMock.DBClient{}
notification := models.Notification{}
dbClientMock.On("LatestNotificationByOffset", configuration.Retention.MinCap).Return(notification, nil)
dbClientMock.On("NotificationTotalCount").Return(testCase.notificationCount, nil)
dbClientMock.On("CleanupNotificationsByAge", mock.Anything).Return(nil)
dic.Update(di.ServiceConstructorMap{
container.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})
err := purgeNotification(dic)
require.NoError(t, err)
if testCase.notificationCount >= configuration.Retention.MaxCap {
dbClientMock.AssertCalled(t, "CleanupNotificationsByAge", mock.Anything)
} else {
dbClientMock.AssertNotCalled(t, "CleanupNotificationsByAge", mock.Anything)
}
})
}
}
8 changes: 8 additions & 0 deletions internal/support/notifications/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ConfigurationStruct struct {
Service bootstrapConfig.ServiceInfo
MessageBus bootstrapConfig.MessageBusInfo
Smtp SmtpInfo
Retention NotificationRetention
}

type WritableInfo struct {
Expand All @@ -52,6 +53,13 @@ type SmtpInfo struct {
AuthMode string
}

type NotificationRetention struct {
Enabled bool
Interval string
MaxCap uint32
MinCap uint32
}

// UpdateFromRaw converts configuration received from the registry to a service-specific configuration struct which is
// then used to overwrite the service's existing configuration struct.
func (c *ConfigurationStruct) UpdateFromRaw(rawConfig interface{}) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type DBClient interface {
NotificationCountByStatus(status string) (uint32, errors.EdgeX)
NotificationCountByTimeRange(start int, end int) (uint32, errors.EdgeX)
NotificationCountByCategoriesAndLabels(categories []string, labels []string) (uint32, errors.EdgeX)
NotificationTotalCount() (uint32, errors.EdgeX)
LatestNotificationByOffset(offset uint32) (models.Notification, errors.EdgeX)

AddTransmission(trans models.Transmission) (models.Transmission, errors.EdgeX)
UpdateTransmission(trans models.Transmission) errors.EdgeX
Expand Down
Loading

0 comments on commit 507b856

Please sign in to comment.