Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Acknowledged field to support-notification to enhance the API #5034

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/pkg/infrastructure/postgres/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
package postgres

import (
notificationsInterfaces "github.com/edgexfoundry/edgex-go/internal/support/notifications/infrastructure/interfaces"
schedulerInterfaces "github.com/edgexfoundry/edgex-go/internal/support/scheduler/infrastructure/interfaces"
)

// Check the implementation of Postgres satisfies the DB client
var _ schedulerInterfaces.DBClient = &Client{}
var _ notificationsInterfaces.DBClient = &Client{}
1 change: 1 addition & 0 deletions internal/pkg/infrastructure/postgres/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,5 @@ const (
serviceNameField = "ServiceName"
statusField = "Status"
subscriptionNameField = "SubscriptionName"
acknowledgedField = "Acknowledged"
)
171 changes: 141 additions & 30 deletions internal/pkg/infrastructure/postgres/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/spf13/cast"

"github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests"
"github.com/edgexfoundry/go-mod-core-contracts/v4/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v4/models"

Expand Down Expand Up @@ -63,9 +65,12 @@ func (c *Client) NotificationById(id string) (models.Notification, errors.EdgeX)
}

// NotificationsByCategory queries the notification by category
func (c *Client) NotificationsByCategory(offset, limit int, category string) ([]models.Notification, errors.EdgeX) {
func (c *Client) NotificationsByCategory(offset, limit int, ack, category string) ([]models.Notification, errors.EdgeX) {
offset, validLimit := getValidOffsetAndLimit(offset, limit)
queryObj := map[string]any{categoryField: category}
if len(ack) != 0 {
queryObj[acknowledgedField] = cast.ToBool(ack)
}

notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentByJSONFieldWithPagination(notificationTableName), queryObj, offset, validLimit)
if err != nil {
Expand All @@ -76,10 +81,12 @@ func (c *Client) NotificationsByCategory(offset, limit int, category string) ([]
}

// NotificationsByLabel queries the notification by label
func (c *Client) NotificationsByLabel(offset, limit int, label string) ([]models.Notification, errors.EdgeX) {
func (c *Client) NotificationsByLabel(offset, limit int, ack, label string) ([]models.Notification, errors.EdgeX) {
offset, validLimit := getValidOffsetAndLimit(offset, limit)
queryObj := map[string]any{labelsField: []string{label}}

if len(ack) != 0 {
queryObj[acknowledgedField] = cast.ToBool(ack)
}
notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentByJSONFieldWithPagination(notificationTableName), queryObj, offset, validLimit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query all notifications by label %s", label), err)
Expand All @@ -89,9 +96,12 @@ func (c *Client) NotificationsByLabel(offset, limit int, label string) ([]models
}

// NotificationsByStatus queries the notification by status
func (c *Client) NotificationsByStatus(offset, limit int, status string) ([]models.Notification, errors.EdgeX) {
func (c *Client) NotificationsByStatus(offset, limit int, ack, status string) ([]models.Notification, errors.EdgeX) {
offset, validLimit := getValidOffsetAndLimit(offset, limit)
queryObj := map[string]any{statusField: status}
if len(ack) != 0 {
queryObj[acknowledgedField] = cast.ToBool(ack)
}

notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentByJSONFieldWithPagination(notificationTableName), queryObj, offset, validLimit)
if err != nil {
Expand All @@ -102,18 +112,8 @@ func (c *Client) NotificationsByStatus(offset, limit int, status string) ([]mode
}

// NotificationsByTimeRange queries the notification by time range
func (c *Client) NotificationsByTimeRange(start int64, end int64, offset, limit int) ([]models.Notification, errors.EdgeX) {
validStart, validEnd, offset, validLimit, err := getValidRangeParameters(int64(start), int64(end), offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}

notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentWithTimeRangeAndPagination(notificationTableName), validStart, validEnd, offset, validLimit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by time range", err)
}

return notifications, nil
func (c *Client) NotificationsByTimeRange(start int64, end int64, offset, limit int, ack string) ([]models.Notification, errors.EdgeX) {
return notificationsByTimeRange(c.ConnPool, start, end, offset, limit, ack)
}

// DeleteNotificationById deletes the notification by id
Expand All @@ -127,8 +127,8 @@ func (c *Client) DeleteNotificationById(id string) errors.EdgeX {
}

// NotificationsByCategoriesAndLabels queries the notification by categories and labels
func (c *Client) NotificationsByCategoriesAndLabels(offset, limit int, categories []string, labels []string) ([]models.Notification, errors.EdgeX) {
return notificationsByCategoriesAndLabels(c.ConnPool, offset, limit, categories, labels)
func (c *Client) NotificationsByCategoriesAndLabels(offset, limit int, categories []string, labels []string, ack string) ([]models.Notification, errors.EdgeX) {
return notificationsByCategoriesAndLabels(c.ConnPool, offset, limit, categories, labels, ack)
}

// UpdateNotification updates the notification
Expand Down Expand Up @@ -170,36 +170,86 @@ func (c *Client) DeleteProcessedNotificationsByAge(age int64) errors.EdgeX {
return nil
}

// DeleteNotificationByIds deletes the notification by ids
func (c *Client) DeleteNotificationByIds(ids []string) errors.EdgeX {
for _, id := range ids {
err := c.DeleteNotificationById(id)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
}
return nil
}

// NotificationsByQueryConditions queries the notification by query conditions
func (c *Client) NotificationsByQueryConditions(offset, limit int, condition requests.NotificationQueryCondition, ack string) ([]models.Notification, errors.EdgeX) {
return notificationsByQueryConditions(c.ConnPool, offset, limit, condition, ack)
}

// UpdateNotificationAckStatusByIds updates the notification ack status by ids
func (c *Client) UpdateNotificationAckStatusByIds(ack bool, ids []string) errors.EdgeX {
for _, id := range ids {
n, err := c.NotificationById(id)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
n.Acknowledged = ack
err = c.UpdateNotification(n)
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), "failed to update notification ack", err)
}
}
return nil
}

// NotificationCountByCategory returns the count of notifications by category
func (c *Client) NotificationCountByCategory(category string) (uint32, errors.EdgeX) {
func (c *Client) NotificationCountByCategory(category string, ack string) (uint32, errors.EdgeX) {
queryObj := map[string]any{categoryField: category}
if len(ack) != 0 {
queryObj[acknowledgedField] = cast.ToBool(ack)
}
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONField(notificationTableName), queryObj)
}

// NotificationCountByLabel returns the count of notifications by label
func (c *Client) NotificationCountByLabel(label string) (uint32, errors.EdgeX) {
func (c *Client) NotificationCountByLabel(label string, ack string) (uint32, errors.EdgeX) {
queryObj := map[string]any{labelsField: []string{label}}
if len(ack) != 0 {
queryObj[acknowledgedField] = cast.ToBool(ack)
}
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONField(notificationTableName), queryObj)
}

// NotificationCountByStatus returns the count of notifications by status
func (c *Client) NotificationCountByStatus(status string) (uint32, errors.EdgeX) {
func (c *Client) NotificationCountByStatus(status string, ack string) (uint32, errors.EdgeX) {
queryObj := map[string]any{statusField: status}
if len(ack) != 0 {
queryObj[acknowledgedField] = cast.ToBool(ack)
}
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONField(notificationTableName), queryObj)
}

// NotificationCountByTimeRange returns the count of notifications by time range
func (c *Client) NotificationCountByTimeRange(start int64, end int64) (uint32, errors.EdgeX) {
validStart, validEnd, err := getValidStartAndEnd(start, end)
func (c *Client) NotificationCountByTimeRange(start int64, end int64, ack string) (uint32, errors.EdgeX) {
notifications, err := notificationsByTimeRange(c.ConnPool, start, end, 0, -1, ack)
if err != nil {
return 0, errors.NewCommonEdgeXWrapper(err)
}
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRange(notificationTableName), validStart, validEnd)
return uint32(len(notifications)), nil
}

// NotificationCountByCategoriesAndLabels returns the count of notifications by categories and labels
func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, labels []string) (uint32, errors.EdgeX) {
notifications, err := notificationsByCategoriesAndLabels(c.ConnPool, 0, -1, categories, labels)
func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, labels []string, ack string) (uint32, errors.EdgeX) {
notifications, err := notificationsByCategoriesAndLabels(c.ConnPool, 0, -1, categories, labels, ack)
if err != nil {
return 0, errors.NewCommonEdgeXWrapper(err)
}
return uint32(len(notifications)), nil
}

// NotificationCountByQueryConditions returns the count of notifications by query conditions
func (c *Client) NotificationCountByQueryConditions(condition requests.NotificationQueryCondition, ack string) (uint32, errors.EdgeX) {
notifications, err := notificationsByQueryConditions(c.ConnPool, 0, -1, condition, ack)
if err != nil {
return 0, errors.NewCommonEdgeXWrapper(err)
}
Expand Down Expand Up @@ -258,7 +308,27 @@ func checkNotificationExists(ctx context.Context, connPool *pgxpool.Pool, id str
return exists, nil
}

func notificationsByCategoriesAndLabels(connPool *pgxpool.Pool, offset, limit int, categories []string, labels []string) ([]models.Notification, errors.EdgeX) {
func notificationsByTimeRange(connPool *pgxpool.Pool, start, end int64, offset, limit int, ack string) ([]models.Notification, errors.EdgeX) {
validStart, validEnd, offset, validLimit, err := getValidRangeParameters(start, end, offset, limit)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
args := []any{validStart, validEnd}
if len(ack) != 0 {
args = append(args, map[string]any{acknowledgedField: cast.ToBool(ack)})
} else {
args = append(args, map[string]any{})
}
args = append(args, offset, validLimit)
notifications, err := queryNotifications(context.Background(), connPool, sqlQueryContentWithTimeRangeAndPagination(notificationTableName), args...)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by time range", err)
}

return notifications, nil
}

func notificationsByCategoriesAndLabels(connPool *pgxpool.Pool, offset, limit int, categories []string, labels []string, ack string) ([]models.Notification, errors.EdgeX) {
offset, validLimit := getValidOffsetAndLimit(offset, limit)
sql := fmt.Sprintf(`
SELECT content
Expand All @@ -270,13 +340,54 @@ func notificationsByCategoriesAndLabels(connPool *pgxpool.Pool, offset, limit in
SELECT content, COALESCE((content->>'%s')::bigint, 0) AS sort_key
FROM %s
WHERE (content -> '%s')::jsonb ?| $2::text[]
)
ORDER BY sort_key OFFSET $3 LIMIT $4;
)
WHERE content @> $3::jsonb
ORDER BY sort_key OFFSET $4 LIMIT $5;
`, createdField, notificationTableName, categoryField, createdField, notificationTableName, labelsField)
notifications, err := queryNotifications(context.Background(), connPool, sql, categories, labels, offset, validLimit)
args := []any{categories, labels}
if len(ack) != 0 {
args = append(args, map[string]any{acknowledgedField: cast.ToBool(ack)})
} else {
args = append(args, map[string]any{})
}
args = append(args, offset, validLimit)
notifications, err := queryNotifications(context.Background(), connPool, sql, args...)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by categories and labels", err)
}

return notifications, nil
}

func notificationsByQueryConditions(connPool *pgxpool.Pool, offset, limit int, condition requests.NotificationQueryCondition, ack string) ([]models.Notification, errors.EdgeX) {
offset, validLimit := getValidOffsetAndLimit(offset, limit)

args := []any{condition.Start, condition.End}
if len(ack) != 0 {
args = append(args, map[string]any{acknowledgedField: cast.ToBool(ack)})
} else {
args = append(args, map[string]any{})
}
whereCategoryStatement := ""
if len(condition.Category) != 0 {
whereCategoryStatement = fmt.Sprintf("AND (content ->> '%s') = ANY($4)", categoryField)
args = append(args, condition.Category)
}
args = append(args, offset, validLimit)

sql := fmt.Sprintf(
`SELECT content FROM %s
WHERE COALESCE((content->>'%s')::bigint, 0) BETWEEN $1 AND $2
AND content @> $3::jsonb
%s
ORDER BY COALESCE((content->>'%s')::bigint, 0)
OFFSET $%d LIMIT $%d`,
notificationTableName, createdField, whereCategoryStatement, createdField, len(args)-1, len(args))

notifications, err := queryNotifications(context.Background(), connPool, sql, args...)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by query conditions", err)
}

return notifications, nil
}
2 changes: 1 addition & 1 deletion internal/pkg/infrastructure/postgres/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func sqlQueryContentWithPagination(table string) string {

// sqlQueryContentWithTimeRangeAndPagination returns the SQL statement for selecting content column from the table by the given time range with pagination
func sqlQueryContentWithTimeRangeAndPagination(table string) string {
return fmt.Sprintf("SELECT content FROM %s WHERE COALESCE((content->>'%s')::bigint, 0) BETWEEN $1 AND $2 ORDER BY COALESCE((content->>'%s')::bigint, 0) OFFSET $3 LIMIT $4", table, createdField, createdField)
return fmt.Sprintf("SELECT content FROM %s WHERE COALESCE((content->>'%s')::bigint, 0) BETWEEN $1 AND $2 AND content @> $3::jsonb ORDER BY COALESCE((content->>'%s')::bigint, 0) OFFSET $4 LIMIT $5", table, createdField, createdField)
}

// sqlQueryContentByJSONField returns the SQL statement for selecting content column in the table by the given JSON query string
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/infrastructure/postgres/transmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *Client) TransmissionsByTimeRange(start int64, end int64, offset, limit
return nil, errors.NewCommonEdgeXWrapper(err)
}

transmission, err := queryTransmissions(context.Background(), c.ConnPool, sqlQueryContentWithTimeRangeAndPagination(transmissionTableName), validStart, validEnd, offset, validLimit)
transmission, err := queryTransmissions(context.Background(), c.ConnPool, sqlQueryContentWithTimeRangeAndPagination(transmissionTableName), validStart, validEnd, map[string]any{}, offset, validLimit)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all transmission by time range", err)
}
Expand Down
Loading
Loading