diff --git a/internal/pkg/infrastructure/postgres/client_test.go b/internal/pkg/infrastructure/postgres/client_test.go index f001ac2e87..587a668e2f 100644 --- a/internal/pkg/infrastructure/postgres/client_test.go +++ b/internal/pkg/infrastructure/postgres/client_test.go @@ -6,8 +6,10 @@ package postgres import ( + notificationsInterfaces "github.com/edgexfoundry/edgex-go/internal/support/notifications/infrastructure/interfaces" schedulerInterfaces "github.com/edgexfoundry/edgex-go/internal/support/scheduler/infrastructure/interfaces" ) // Check the implementation of Postgres satisfies the DB client var _ schedulerInterfaces.DBClient = &Client{} +var _ notificationsInterfaces.DBClient = &Client{} diff --git a/internal/pkg/infrastructure/postgres/consts.go b/internal/pkg/infrastructure/postgres/consts.go index 507793a53a..3701dcd5c5 100644 --- a/internal/pkg/infrastructure/postgres/consts.go +++ b/internal/pkg/infrastructure/postgres/consts.go @@ -91,4 +91,5 @@ const ( serviceNameField = "ServiceName" statusField = "Status" subscriptionNameField = "SubscriptionName" + acknowledgedField = "Acknowledged" ) diff --git a/internal/pkg/infrastructure/postgres/notification.go b/internal/pkg/infrastructure/postgres/notification.go index a643bfe4f7..a48b17c8cc 100644 --- a/internal/pkg/infrastructure/postgres/notification.go +++ b/internal/pkg/infrastructure/postgres/notification.go @@ -14,7 +14,9 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/spf13/cast" + "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests" "github.com/edgexfoundry/go-mod-core-contracts/v4/errors" "github.com/edgexfoundry/go-mod-core-contracts/v4/models" @@ -63,9 +65,12 @@ func (c *Client) NotificationById(id string) (models.Notification, errors.EdgeX) } // NotificationsByCategory queries the notification by category -func (c *Client) NotificationsByCategory(offset, limit int, category string) ([]models.Notification, errors.EdgeX) { +func (c *Client) NotificationsByCategory(offset, limit int, ack, category string) ([]models.Notification, errors.EdgeX) { offset, validLimit := getValidOffsetAndLimit(offset, limit) queryObj := map[string]any{categoryField: category} + if len(ack) != 0 { + queryObj[acknowledgedField] = cast.ToBool(ack) + } notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentByJSONFieldWithPagination(notificationTableName), queryObj, offset, validLimit) if err != nil { @@ -76,10 +81,12 @@ func (c *Client) NotificationsByCategory(offset, limit int, category string) ([] } // NotificationsByLabel queries the notification by label -func (c *Client) NotificationsByLabel(offset, limit int, label string) ([]models.Notification, errors.EdgeX) { +func (c *Client) NotificationsByLabel(offset, limit int, ack, label string) ([]models.Notification, errors.EdgeX) { offset, validLimit := getValidOffsetAndLimit(offset, limit) queryObj := map[string]any{labelsField: []string{label}} - + if len(ack) != 0 { + queryObj[acknowledgedField] = cast.ToBool(ack) + } notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentByJSONFieldWithPagination(notificationTableName), queryObj, offset, validLimit) if err != nil { return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query all notifications by label %s", label), err) @@ -89,9 +96,12 @@ func (c *Client) NotificationsByLabel(offset, limit int, label string) ([]models } // NotificationsByStatus queries the notification by status -func (c *Client) NotificationsByStatus(offset, limit int, status string) ([]models.Notification, errors.EdgeX) { +func (c *Client) NotificationsByStatus(offset, limit int, ack, status string) ([]models.Notification, errors.EdgeX) { offset, validLimit := getValidOffsetAndLimit(offset, limit) queryObj := map[string]any{statusField: status} + if len(ack) != 0 { + queryObj[acknowledgedField] = cast.ToBool(ack) + } notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentByJSONFieldWithPagination(notificationTableName), queryObj, offset, validLimit) if err != nil { @@ -102,18 +112,8 @@ func (c *Client) NotificationsByStatus(offset, limit int, status string) ([]mode } // NotificationsByTimeRange queries the notification by time range -func (c *Client) NotificationsByTimeRange(start int64, end int64, offset, limit int) ([]models.Notification, errors.EdgeX) { - validStart, validEnd, offset, validLimit, err := getValidRangeParameters(int64(start), int64(end), offset, limit) - if err != nil { - return nil, errors.NewCommonEdgeXWrapper(err) - } - - notifications, err := queryNotifications(context.Background(), c.ConnPool, sqlQueryContentWithTimeRangeAndPagination(notificationTableName), validStart, validEnd, offset, validLimit) - if err != nil { - return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by time range", err) - } - - return notifications, nil +func (c *Client) NotificationsByTimeRange(start int64, end int64, offset, limit int, ack string) ([]models.Notification, errors.EdgeX) { + return notificationsByTimeRange(c.ConnPool, start, end, offset, limit, ack) } // DeleteNotificationById deletes the notification by id @@ -127,8 +127,8 @@ func (c *Client) DeleteNotificationById(id string) errors.EdgeX { } // NotificationsByCategoriesAndLabels queries the notification by categories and labels -func (c *Client) NotificationsByCategoriesAndLabels(offset, limit int, categories []string, labels []string) ([]models.Notification, errors.EdgeX) { - return notificationsByCategoriesAndLabels(c.ConnPool, offset, limit, categories, labels) +func (c *Client) NotificationsByCategoriesAndLabels(offset, limit int, categories []string, labels []string, ack string) ([]models.Notification, errors.EdgeX) { + return notificationsByCategoriesAndLabels(c.ConnPool, offset, limit, categories, labels, ack) } // UpdateNotification updates the notification @@ -170,36 +170,86 @@ func (c *Client) DeleteProcessedNotificationsByAge(age int64) errors.EdgeX { return nil } +// DeleteNotificationByIds deletes the notification by ids +func (c *Client) DeleteNotificationByIds(ids []string) errors.EdgeX { + for _, id := range ids { + err := c.DeleteNotificationById(id) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + } + return nil +} + +// NotificationsByQueryConditions queries the notification by query conditions +func (c *Client) NotificationsByQueryConditions(offset, limit int, condition requests.NotificationQueryCondition, ack string) ([]models.Notification, errors.EdgeX) { + return notificationsByQueryConditions(c.ConnPool, offset, limit, condition, ack) +} + +// UpdateNotificationAckStatusByIds updates the notification ack status by ids +func (c *Client) UpdateNotificationAckStatusByIds(ack bool, ids []string) errors.EdgeX { + for _, id := range ids { + n, err := c.NotificationById(id) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + n.Acknowledged = ack + err = c.UpdateNotification(n) + if err != nil { + return errors.NewCommonEdgeX(errors.Kind(err), "failed to update notification ack", err) + } + } + return nil +} + // NotificationCountByCategory returns the count of notifications by category -func (c *Client) NotificationCountByCategory(category string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByCategory(category string, ack string) (uint32, errors.EdgeX) { queryObj := map[string]any{categoryField: category} + if len(ack) != 0 { + queryObj[acknowledgedField] = cast.ToBool(ack) + } return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONField(notificationTableName), queryObj) } // NotificationCountByLabel returns the count of notifications by label -func (c *Client) NotificationCountByLabel(label string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByLabel(label string, ack string) (uint32, errors.EdgeX) { queryObj := map[string]any{labelsField: []string{label}} + if len(ack) != 0 { + queryObj[acknowledgedField] = cast.ToBool(ack) + } return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONField(notificationTableName), queryObj) } // NotificationCountByStatus returns the count of notifications by status -func (c *Client) NotificationCountByStatus(status string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByStatus(status string, ack string) (uint32, errors.EdgeX) { queryObj := map[string]any{statusField: status} + if len(ack) != 0 { + queryObj[acknowledgedField] = cast.ToBool(ack) + } return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONField(notificationTableName), queryObj) } // NotificationCountByTimeRange returns the count of notifications by time range -func (c *Client) NotificationCountByTimeRange(start int64, end int64) (uint32, errors.EdgeX) { - validStart, validEnd, err := getValidStartAndEnd(start, end) +func (c *Client) NotificationCountByTimeRange(start int64, end int64, ack string) (uint32, errors.EdgeX) { + notifications, err := notificationsByTimeRange(c.ConnPool, start, end, 0, -1, ack) if err != nil { return 0, errors.NewCommonEdgeXWrapper(err) } - return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRange(notificationTableName), validStart, validEnd) + return uint32(len(notifications)), nil } // NotificationCountByCategoriesAndLabels returns the count of notifications by categories and labels -func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, labels []string) (uint32, errors.EdgeX) { - notifications, err := notificationsByCategoriesAndLabels(c.ConnPool, 0, -1, categories, labels) +func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, labels []string, ack string) (uint32, errors.EdgeX) { + notifications, err := notificationsByCategoriesAndLabels(c.ConnPool, 0, -1, categories, labels, ack) + if err != nil { + return 0, errors.NewCommonEdgeXWrapper(err) + } + return uint32(len(notifications)), nil +} + +// NotificationCountByQueryConditions returns the count of notifications by query conditions +func (c *Client) NotificationCountByQueryConditions(condition requests.NotificationQueryCondition, ack string) (uint32, errors.EdgeX) { + notifications, err := notificationsByQueryConditions(c.ConnPool, 0, -1, condition, ack) if err != nil { return 0, errors.NewCommonEdgeXWrapper(err) } @@ -258,7 +308,27 @@ func checkNotificationExists(ctx context.Context, connPool *pgxpool.Pool, id str return exists, nil } -func notificationsByCategoriesAndLabels(connPool *pgxpool.Pool, offset, limit int, categories []string, labels []string) ([]models.Notification, errors.EdgeX) { +func notificationsByTimeRange(connPool *pgxpool.Pool, start, end int64, offset, limit int, ack string) ([]models.Notification, errors.EdgeX) { + validStart, validEnd, offset, validLimit, err := getValidRangeParameters(start, end, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + args := []any{validStart, validEnd} + if len(ack) != 0 { + args = append(args, map[string]any{acknowledgedField: cast.ToBool(ack)}) + } else { + args = append(args, map[string]any{}) + } + args = append(args, offset, validLimit) + notifications, err := queryNotifications(context.Background(), connPool, sqlQueryContentWithTimeRangeAndPagination(notificationTableName), args...) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by time range", err) + } + + return notifications, nil +} + +func notificationsByCategoriesAndLabels(connPool *pgxpool.Pool, offset, limit int, categories []string, labels []string, ack string) ([]models.Notification, errors.EdgeX) { offset, validLimit := getValidOffsetAndLimit(offset, limit) sql := fmt.Sprintf(` SELECT content @@ -270,13 +340,54 @@ func notificationsByCategoriesAndLabels(connPool *pgxpool.Pool, offset, limit in SELECT content, COALESCE((content->>'%s')::bigint, 0) AS sort_key FROM %s WHERE (content -> '%s')::jsonb ?| $2::text[] - ) - ORDER BY sort_key OFFSET $3 LIMIT $4; + ) + WHERE content @> $3::jsonb + ORDER BY sort_key OFFSET $4 LIMIT $5; `, createdField, notificationTableName, categoryField, createdField, notificationTableName, labelsField) - notifications, err := queryNotifications(context.Background(), connPool, sql, categories, labels, offset, validLimit) + args := []any{categories, labels} + if len(ack) != 0 { + args = append(args, map[string]any{acknowledgedField: cast.ToBool(ack)}) + } else { + args = append(args, map[string]any{}) + } + args = append(args, offset, validLimit) + notifications, err := queryNotifications(context.Background(), connPool, sql, args...) if err != nil { return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by categories and labels", err) } return notifications, nil } + +func notificationsByQueryConditions(connPool *pgxpool.Pool, offset, limit int, condition requests.NotificationQueryCondition, ack string) ([]models.Notification, errors.EdgeX) { + offset, validLimit := getValidOffsetAndLimit(offset, limit) + + args := []any{condition.Start, condition.End} + if len(ack) != 0 { + args = append(args, map[string]any{acknowledgedField: cast.ToBool(ack)}) + } else { + args = append(args, map[string]any{}) + } + whereCategoryStatement := "" + if len(condition.Category) != 0 { + whereCategoryStatement = fmt.Sprintf("AND (content ->> '%s') = ANY($4)", categoryField) + args = append(args, condition.Category) + } + args = append(args, offset, validLimit) + + sql := fmt.Sprintf( + `SELECT content FROM %s + WHERE COALESCE((content->>'%s')::bigint, 0) BETWEEN $1 AND $2 + AND content @> $3::jsonb + %s + ORDER BY COALESCE((content->>'%s')::bigint, 0) + OFFSET $%d LIMIT $%d`, + notificationTableName, createdField, whereCategoryStatement, createdField, len(args)-1, len(args)) + + notifications, err := queryNotifications(context.Background(), connPool, sql, args...) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all notifications by query conditions", err) + } + + return notifications, nil +} diff --git a/internal/pkg/infrastructure/postgres/sql.go b/internal/pkg/infrastructure/postgres/sql.go index 3215fe8b0b..aade386f8d 100644 --- a/internal/pkg/infrastructure/postgres/sql.go +++ b/internal/pkg/infrastructure/postgres/sql.go @@ -144,7 +144,7 @@ func sqlQueryContentWithPagination(table string) string { // sqlQueryContentWithTimeRangeAndPagination returns the SQL statement for selecting content column from the table by the given time range with pagination func sqlQueryContentWithTimeRangeAndPagination(table string) string { - return fmt.Sprintf("SELECT content FROM %s WHERE COALESCE((content->>'%s')::bigint, 0) BETWEEN $1 AND $2 ORDER BY COALESCE((content->>'%s')::bigint, 0) OFFSET $3 LIMIT $4", table, createdField, createdField) + return fmt.Sprintf("SELECT content FROM %s WHERE COALESCE((content->>'%s')::bigint, 0) BETWEEN $1 AND $2 AND content @> $3::jsonb ORDER BY COALESCE((content->>'%s')::bigint, 0) OFFSET $4 LIMIT $5", table, createdField, createdField) } // sqlQueryContentByJSONField returns the SQL statement for selecting content column in the table by the given JSON query string diff --git a/internal/pkg/infrastructure/postgres/transmission.go b/internal/pkg/infrastructure/postgres/transmission.go index d492c2ba8c..55db4eaabf 100644 --- a/internal/pkg/infrastructure/postgres/transmission.go +++ b/internal/pkg/infrastructure/postgres/transmission.go @@ -83,7 +83,7 @@ func (c *Client) TransmissionsByTimeRange(start int64, end int64, offset, limit return nil, errors.NewCommonEdgeXWrapper(err) } - transmission, err := queryTransmissions(context.Background(), c.ConnPool, sqlQueryContentWithTimeRangeAndPagination(transmissionTableName), validStart, validEnd, offset, validLimit) + transmission, err := queryTransmissions(context.Background(), c.ConnPool, sqlQueryContentWithTimeRangeAndPagination(transmissionTableName), validStart, validEnd, map[string]any{}, offset, validLimit) if err != nil { return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all transmission by time range", err) } diff --git a/internal/pkg/infrastructure/redis/client.go b/internal/pkg/infrastructure/redis/client.go index d0ef28a7b9..60139a903e 100644 --- a/internal/pkg/infrastructure/redis/client.go +++ b/internal/pkg/infrastructure/redis/client.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/edgexfoundry/go-mod-core-contracts/v4/clients/logger" + "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests" "github.com/edgexfoundry/go-mod-core-contracts/v4/errors" model "github.com/edgexfoundry/go-mod-core-contracts/v4/models" @@ -1125,27 +1126,27 @@ func (c *Client) AddNotification(notification model.Notification) (model.Notific } // NotificationsByCategory queries notifications by offset, limit and category -func (c *Client) NotificationsByCategory(offset int, limit int, category string) (notifications []model.Notification, edgeXerr errors.EdgeX) { +func (c *Client) NotificationsByCategory(offset int, limit int, ack, category string) (notifications []model.Notification, edgeXerr errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - notifications, edgeXerr = notificationsByCategory(conn, offset, limit, category) + notifications, edgeXerr = notificationsByCategory(conn, offset, limit, ack, category) if edgeXerr != nil { return notifications, errors.NewCommonEdgeX(errors.Kind(edgeXerr), - fmt.Sprintf("fail to query notifications by offset %d, limit %d and category %s", offset, limit, category), edgeXerr) + fmt.Sprintf("fail to query notifications by offset %d, limit %d, ack %s, and category %s", offset, limit, ack, category), edgeXerr) } return notifications, nil } // NotificationsByLabel queries notifications by offset, limit and label -func (c *Client) NotificationsByLabel(offset int, limit int, label string) (notifications []model.Notification, edgeXerr errors.EdgeX) { +func (c *Client) NotificationsByLabel(offset int, limit int, ack, label string) (notifications []model.Notification, edgeXerr errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - notifications, edgeXerr = notificationsByLabel(conn, offset, limit, label) + notifications, edgeXerr = notificationsByLabel(conn, offset, limit, ack, label) if edgeXerr != nil { return notifications, errors.NewCommonEdgeX(errors.Kind(edgeXerr), - fmt.Sprintf("fail to query notifications by offset %d, limit %d and label %s", offset, limit, label), edgeXerr) + fmt.Sprintf("fail to query notifications by offset %d, limit %d, ack %s, and label %s", offset, limit, ack, label), edgeXerr) } return notifications, nil } @@ -1163,108 +1164,130 @@ func (c *Client) NotificationById(id string) (notification model.Notification, e } // NotificationsByStatus queries notifications by offset, limit and status -func (c *Client) NotificationsByStatus(offset int, limit int, status string) (notifications []model.Notification, edgeXerr errors.EdgeX) { +func (c *Client) NotificationsByStatus(offset int, limit int, ack, status string) (notifications []model.Notification, edgeXerr errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - notifications, edgeXerr = notificationsByStatus(conn, offset, limit, status) + notifications, edgeXerr = notificationsByStatus(conn, offset, limit, ack, status) if edgeXerr != nil { return notifications, errors.NewCommonEdgeX(errors.Kind(edgeXerr), - fmt.Sprintf("fail to query notifications by offset %d, limit %d and status %s", offset, limit, status), edgeXerr) + fmt.Sprintf("fail to query notifications by offset %d, limit %d, ack %s, and status %s", offset, limit, ack, status), edgeXerr) } return notifications, nil } -// NotificationsByTimeRange query notifications by time range, offset, and limit -func (c *Client) NotificationsByTimeRange(start int64, end int64, offset int, limit int) (notifications []model.Notification, edgeXerr errors.EdgeX) { +// NotificationsByTimeRange query notifications by time range, ack, offset, and limit +func (c *Client) NotificationsByTimeRange(start int64, end int64, offset int, limit int, ack string) (notifications []model.Notification, edgeXerr errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - notifications, edgeXerr = notificationsByTimeRange(conn, start, end, offset, limit) + notifications, edgeXerr = notificationsByTimeRange(conn, start, end, offset, limit, ack) if edgeXerr != nil { return notifications, errors.NewCommonEdgeX(errors.Kind(edgeXerr), - fmt.Sprintf("fail to query notifications by time range %v ~ %v, offset %d, and limit %d", start, end, offset, limit), edgeXerr) + fmt.Sprintf("fail to query notifications by time range %v ~ %v, offset %d, ack %s, and limit %d", start, end, offset, ack, limit), edgeXerr) } return notifications, nil } -// NotificationsByCategoriesAndLabels queries notifications by offset, limit, categories and labels -func (c *Client) NotificationsByCategoriesAndLabels(offset int, limit int, categories []string, labels []string) (notifications []model.Notification, edgeXerr errors.EdgeX) { +// NotificationsByCategoriesAndLabels queries notifications by ack, offset, limit, categories and labels +func (c *Client) NotificationsByCategoriesAndLabels(offset int, limit int, categories []string, labels []string, ack string) (notifications []model.Notification, edgeXerr errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - notifications, edgeXerr = notificationsByCategoriesAndLabels(conn, offset, limit, categories, labels) + notifications, edgeXerr = notificationsByCategoriesAndLabels(conn, offset, limit, categories, labels, ack) if edgeXerr != nil { return notifications, errors.NewCommonEdgeX(errors.Kind(edgeXerr), - fmt.Sprintf("fail to query notifications by offset %d, limit %d, categories %v and labels %v", offset, limit, categories, labels), edgeXerr) + fmt.Sprintf("fail to query notifications by offset %d, limit %d, categories %v, ack %s and labels %v", offset, limit, categories, ack, labels), edgeXerr) } return notifications, nil } // NotificationCountByCategory returns the count of Notification associated with specified category from the database -func (c *Client) NotificationCountByCategory(category string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByCategory(category, ack string) (uint32, errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - count, edgeXerr := getMemberNumber(conn, ZCARD, CreateKey(NotificationCollectionCategory, category)) + notifications, edgeXerr := notificationsByCategory(conn, 0, -1, ack, category) if edgeXerr != nil { return 0, errors.NewCommonEdgeXWrapper(edgeXerr) } - - return count, nil + return uint32(len(notifications)), nil } // NotificationCountByLabel returns the count of Notification associated with specified label from the database -func (c *Client) NotificationCountByLabel(label string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByLabel(label, ack string) (uint32, errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - count, edgeXerr := getMemberNumber(conn, ZCARD, CreateKey(NotificationCollectionLabel, label)) + notifications, edgeXerr := notificationsByLabel(conn, 0, -1, ack, label) if edgeXerr != nil { return 0, errors.NewCommonEdgeXWrapper(edgeXerr) } - - return count, nil + return uint32(len(notifications)), nil } // NotificationCountByStatus returns the count of Notification associated with specified status from the database -func (c *Client) NotificationCountByStatus(status string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByStatus(status, ack string) (uint32, errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - count, edgeXerr := getMemberNumber(conn, ZCARD, CreateKey(NotificationCollectionStatus, status)) + notifications, edgeXerr := notificationsByStatus(conn, 0, -1, ack, status) if edgeXerr != nil { return 0, errors.NewCommonEdgeXWrapper(edgeXerr) } - - return count, nil + return uint32(len(notifications)), nil } // NotificationCountByTimeRange returns the count of Notification from the database within specified time range -func (c *Client) NotificationCountByTimeRange(start int64, end int64) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByTimeRange(start int64, end int64, ack string) (uint32, errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - count, edgeXerr := getMemberCountByScoreRange(conn, NotificationCollectionCreated, start, end) + notifications, edgeXerr := notificationsByTimeRange(conn, start, end, 0, -1, ack) if edgeXerr != nil { return 0, errors.NewCommonEdgeXWrapper(edgeXerr) } - return count, nil + return uint32(len(notifications)), nil } // NotificationCountByCategoriesAndLabels returns the count of Notification associated with specified categories and labels from the database -func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, labels []string) (uint32, errors.EdgeX) { +func (c *Client) NotificationCountByCategoriesAndLabels(categories []string, labels []string, ack string) (uint32, errors.EdgeX) { conn := c.Pool.Get() defer conn.Close() - notifications, edgeXerr := notificationsByCategoriesAndLabels(conn, 0, -1, categories, labels) + notifications, edgeXerr := notificationsByCategoriesAndLabels(conn, 0, -1, categories, labels, ack) if edgeXerr != nil { - return uint32(0), errors.NewCommonEdgeX(errors.Kind(edgeXerr), fmt.Sprintf("fail to query notifications by categories %v and labels %v", categories, labels), edgeXerr) + return uint32(0), errors.NewCommonEdgeX(errors.Kind(edgeXerr), fmt.Sprintf("fail to query notifications by categories %v, labels %v, and ack %s", categories, labels, ack), edgeXerr) } return uint32(len(notifications)), nil } +// NotificationCountByQueryConditions returns the count of Notification associated with specified condition from the database +func (c *Client) NotificationCountByQueryConditions(condition requests.NotificationQueryCondition, ack string) (uint32, errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + notifications, edgeXerr := notificationByQueryConditions(conn, 0, -1, condition, ack) + if edgeXerr != nil { + return uint32(0), errors.NewCommonEdgeX(errors.Kind(edgeXerr), fmt.Sprintf("fail to query notifications by condition %v and ack %s", condition, ack), edgeXerr) + } + return uint32(len(notifications)), nil +} + +// NotificationsByQueryConditions queries notifications by offset, limit, categories and time range +func (c *Client) NotificationsByQueryConditions(offset int, limit int, condition requests.NotificationQueryCondition, + ack string) (notifications []model.Notification, edgeXerr errors.EdgeX) { + conn := c.Pool.Get() + defer conn.Close() + + notifications, edgeXerr = notificationByQueryConditions(conn, offset, limit, condition, ack) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + return notifications, nil +} + // NotificationTotalCount returns the total count of Notification from the database func (c *Client) NotificationTotalCount() (uint32, errors.EdgeX) { conn := c.Pool.Get() @@ -1407,6 +1430,17 @@ func (c *Client) DeleteNotificationById(id string) errors.EdgeX { return nil } +// DeleteNotificationById deletes notifications by ids +func (c *Client) DeleteNotificationByIds(ids []string) errors.EdgeX { + conn := c.Pool.Get() + defer conn.Close() + edgeXerr := deleteNotificationByIds(conn, ids) + if edgeXerr != nil { + return errors.NewCommonEdgeXWrapper(edgeXerr) + } + return nil +} + // UpdateNotification updates a notification func (c *Client) UpdateNotification(n model.Notification) errors.EdgeX { conn := c.Pool.Get() @@ -1414,6 +1448,18 @@ func (c *Client) UpdateNotification(n model.Notification) errors.EdgeX { return updateNotification(conn, n) } +// UpdateNotificationAckStatusByIds bulk updates acknowledgement status +func (c *Client) UpdateNotificationAckStatusByIds(ack bool, ids []string) errors.EdgeX { + conn := c.Pool.Get() + defer conn.Close() + + notifications, edgexErr := notificationByIds(conn, ids) + if edgexErr != nil { + return errors.NewCommonEdgeXWrapper(edgexErr) + } + return updateNotificationAckStatus(conn, ack, notifications) +} + // AddTransmission adds a new transmission func (c *Client) AddTransmission(t model.Transmission) (model.Transmission, errors.EdgeX) { conn := c.Pool.Get() diff --git a/internal/pkg/infrastructure/redis/dbconsts.go b/internal/pkg/infrastructure/redis/dbconsts.go index ce1e9dcade..6f7759bdba 100644 --- a/internal/pkg/infrastructure/redis/dbconsts.go +++ b/internal/pkg/infrastructure/redis/dbconsts.go @@ -37,6 +37,9 @@ const ( ZUNIONSTORE = "ZUNIONSTORE" ZINTERSTORE = "ZINTERSTORE" TYPE = "TYPE" + INFO = "INFO" + MEMORY = "MEMORY" + WEIGHTS = "WEIGHTS" ) const ( diff --git a/internal/pkg/infrastructure/redis/notification.go b/internal/pkg/infrastructure/redis/notification.go index 7247a29eda..2f4e094cff 100644 --- a/internal/pkg/infrastructure/redis/notification.go +++ b/internal/pkg/infrastructure/redis/notification.go @@ -8,14 +8,17 @@ package redis import ( "encoding/json" "fmt" + "strconv" pkgCommon "github.com/edgexfoundry/edgex-go/internal/pkg/common" "github.com/edgexfoundry/go-mod-core-contracts/v4/common" + "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests" "github.com/edgexfoundry/go-mod-core-contracts/v4/errors" "github.com/edgexfoundry/go-mod-core-contracts/v4/models" "github.com/gomodule/redigo/redis" + "github.com/google/uuid" ) const ( @@ -26,6 +29,7 @@ const ( NotificationCollectionSeverity = NotificationCollection + DBKeySeparator + common.Severity NotificationCollectionStatus = NotificationCollection + DBKeySeparator + common.Status NotificationCollectionCreated = NotificationCollection + DBKeySeparator + common.Created + NotificationCollectionAck = NotificationCollection + DBKeySeparator + common.Ack ) // notificationStoredKey return the notification's stored key which combines the collection name and object id @@ -51,6 +55,7 @@ func sendAddNotificationCmd(conn redis.Conn, storedKey string, n models.Notifica _ = conn.Send(ZADD, CreateKey(NotificationCollectionSender, n.Sender), n.Modified, storedKey) _ = conn.Send(ZADD, CreateKey(NotificationCollectionSeverity, string(n.Severity)), n.Modified, storedKey) _ = conn.Send(ZADD, CreateKey(NotificationCollectionStatus, string(n.Status)), n.Modified, storedKey) + _ = conn.Send(ZADD, CreateKey(NotificationCollectionAck, strconv.FormatBool(n.Acknowledged)), n.Modified, storedKey) return nil } @@ -84,13 +89,9 @@ func addNotification(conn redis.Conn, notification models.Notification) (models. } // notificationsByCategory queries notifications by offset, limit, and category -func notificationsByCategory(conn redis.Conn, offset int, limit int, category string) (notifications []models.Notification, edgeXerr errors.EdgeX) { - objects, err := getObjectsByRevRange(conn, CreateKey(NotificationCollectionCategory, category), offset, limit) - if err != nil { - return notifications, errors.NewCommonEdgeXWrapper(err) - } - - return convertObjectsToNotifications(objects) +func notificationsByCategory(conn redis.Conn, offset int, limit int, ack, category string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + redisKey := CreateKey(NotificationCollectionCategory, category) + return getNotificationsByRedisKeyAndAck(conn, offset, limit, ack, redisKey) } func convertObjectsToNotifications(objects [][]byte) (notifications []models.Notification, edgeXerr errors.EdgeX) { @@ -107,13 +108,9 @@ func convertObjectsToNotifications(objects [][]byte) (notifications []models.Not } // notificationsByLabel queries notifications by offset, limit, and label -func notificationsByLabel(conn redis.Conn, offset int, limit int, label string) (notifications []models.Notification, edgeXerr errors.EdgeX) { - objects, err := getObjectsByRevRange(conn, CreateKey(NotificationCollectionLabel, label), offset, limit) - if err != nil { - return notifications, errors.NewCommonEdgeXWrapper(err) - } - - return convertObjectsToNotifications(objects) +func notificationsByLabel(conn redis.Conn, offset int, limit int, ack, label string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + redisKey := CreateKey(NotificationCollectionLabel, label) + return getNotificationsByRedisKeyAndAck(conn, offset, limit, ack, redisKey) } // notificationById query notification by id from DB @@ -125,23 +122,102 @@ func notificationById(conn redis.Conn, id string) (notification models.Notificat return } +// notificationByIds query notification by ids from DB +func notificationByIds(conn redis.Conn, ids []string) (notifications []models.Notification, edgexErr errors.EdgeX) { + var storeKeys []string + for _, id := range ids { + storeKeys = append(storeKeys, notificationStoredKey(id)) + } + objects, edgexErr := getObjectsByIds(conn, pkgCommon.ConvertStringsToInterfaces(storeKeys)) + if edgexErr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgexErr) + } + return convertObjectsToNotifications(objects) +} + // notificationsByStatus queries notifications by offset, limit, and status -func notificationsByStatus(conn redis.Conn, offset int, limit int, status string) (notifications []models.Notification, edgeXerr errors.EdgeX) { - objects, err := getObjectsByRevRange(conn, CreateKey(NotificationCollectionStatus, status), offset, limit) - if err != nil { - return notifications, errors.NewCommonEdgeXWrapper(err) +func notificationsByStatus(conn redis.Conn, offset int, limit int, ack, status string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + redisKey := CreateKey(NotificationCollectionStatus, status) + return getNotificationsByRedisKeyAndAck(conn, offset, limit, ack, redisKey) +} + +// notificationsByTimeRange query notifications by time range, offset, limit, and ack +func notificationsByTimeRange(conn redis.Conn, startTime int64, endTime int64, offset int, limit int, ack string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + redisKey := NotificationCollectionCreated + if len(ack) > 0 { + args := redis.Args{} + cacheSet := uuid.New().String() + defer func() { + // delete cache set + _, _ = conn.Do(DEL, cacheSet) + }() + command := ZINTERSTORE + args = args.Add(cacheSet, 2, redisKey, CreateKey(NotificationCollectionAck, ack), WEIGHTS, 1, 0) + _, err := conn.Do(command, args...) + if err != nil { + return notifications, errors.NewCommonEdgeX(errors.KindDatabaseError, + fmt.Sprintf("failed to execute %s command with args %v", command, args), err) + } + redisKey = cacheSet } - return convertObjectsToNotifications(objects) + objects, edgeXerr := getObjectsByScoreRange(conn, redisKey, startTime, endTime, offset, limit) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + notifications, edgeXerr = convertObjectsToNotifications(objects) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + return notifications, nil } -// notificationsByTimeRange query notifications by time range, offset, and limit -func notificationsByTimeRange(conn redis.Conn, startTime int64, endTime int64, offset int, limit int) (notifications []models.Notification, edgeXerr errors.EdgeX) { - objects, edgeXerr := getObjectsByScoreRange(conn, NotificationCollectionCreated, startTime, endTime, offset, limit) +// notificationByQueryConditions query notifications by offset, limit, categories and time range +func notificationByQueryConditions(conn redis.Conn, offset, limit int, condition requests.NotificationQueryCondition, + ack string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + if len(condition.Category) == 0 { + return notificationsByTimeRange(conn, condition.Start, condition.End, offset, limit, ack) + } + + cacheSetUnionCategory := uuid.New().String() + cacheSetIntersectionCreatedAndCategory := uuid.New().String() + cacheSetIntersectionAck := uuid.New().String() + cacheSet := cacheSetIntersectionCreatedAndCategory + defer func() { + // delete cache set + _, _ = conn.Do(DEL, cacheSetUnionCategory, cacheSetIntersectionCreatedAndCategory, cacheSetIntersectionAck) + }() + + var redisKeys []string + for _, c := range condition.Category { + redisKeys = append(redisKeys, CreateKey(NotificationCollectionCategory, c)) + } + args := redis.Args{} + args = args.Add(cacheSetUnionCategory, len(redisKeys)) + for _, key := range redisKeys { + args = args.Add(key) + } + // find all notifications by category and store the result to cache + command := ZUNIONSTORE + _, err := conn.Do(command, args...) + if err != nil { + return notifications, errors.NewCommonEdgeX(errors.KindDatabaseError, + fmt.Sprintf("failed to execute %s command with args %v", command, args), err) + } + + if len(ack) > 0 { + cacheSet = cacheSetIntersectionAck + } + + objects, edgeXerr := getObjectsByScoreRange(conn, cacheSet, condition.Start, condition.End, offset, limit) if edgeXerr != nil { - return notifications, edgeXerr + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) } - return convertObjectsToNotifications(objects) + notifications, edgeXerr = convertObjectsToNotifications(objects) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + return notifications, nil } // sendDeleteNotificationCmd sends redis command to delete a notification @@ -158,6 +234,7 @@ func sendDeleteNotificationCmd(conn redis.Conn, storedKey string, n models.Notif _ = conn.Send(ZREM, CreateKey(NotificationCollectionSender, n.Sender), storedKey) _ = conn.Send(ZREM, CreateKey(NotificationCollectionSeverity, string(n.Severity)), storedKey) _ = conn.Send(ZREM, CreateKey(NotificationCollectionStatus, string(n.Status)), storedKey) + _ = conn.Send(ZREM, CreateKey(NotificationCollectionAck, strconv.FormatBool(n.Acknowledged)), storedKey) } // deleteNotificationById deletes the notification by id and all of its associated transmissions @@ -188,6 +265,34 @@ func deleteNotificationById(conn redis.Conn, id string) errors.EdgeX { return nil } +// deleteNotificationByIds deletes the notification by id and all of its associated transmissions +func deleteNotificationByIds(conn redis.Conn, ids []string) errors.EdgeX { + notifications, edgexErr := notificationByIds(conn, ids) + if edgexErr != nil { + return errors.NewCommonEdgeXWrapper(edgexErr) + } + var transmissions []models.Transmission + for _, notification := range notifications { + trans, edgexErr := transmissionsByNotificationId(conn, 0, -1, notification.Id) + if edgexErr != nil { + return errors.NewCommonEdgeXWrapper(edgexErr) + } + transmissions = append(transmissions, trans...) + } + _ = conn.Send(MULTI) + for _, notification := range notifications { + sendDeleteNotificationCmd(conn, notificationStoredKey(notification.Id), notification) + } + for _, transmission := range transmissions { + sendDeleteTransmissionCmd(conn, transmissionStoredKey(transmission.Id), transmission) + } + _, err := conn.Do(EXEC) + if err != nil { + return errors.NewCommonEdgeX(errors.KindDatabaseError, "notification deletion failed", err) + } + return nil +} + // updateNotification updates a notification func updateNotification(conn redis.Conn, n models.Notification) errors.EdgeX { oldNotification, edgeXerr := notificationById(conn, n.Id) @@ -210,7 +315,35 @@ func updateNotification(conn redis.Conn, n models.Notification) errors.EdgeX { return nil } -func notificationsByCategoriesAndLabels(conn redis.Conn, offset int, limit int, categories []string, labels []string) (notifications []models.Notification, edgeXerr errors.EdgeX) { +// updateNotificationAckStatus bulk updates acknowledgement status +func updateNotificationAckStatus(conn redis.Conn, ack bool, notifications []models.Notification) errors.EdgeX { + _ = conn.Send(MULTI) + for _, n := range notifications { + storedKey := notificationStoredKey(n.Id) + sendDeleteNotificationCmd(conn, storedKey, n) + n.Modified = pkgCommon.MakeTimestamp() + n.Acknowledged = ack + edgexErr := sendAddNotificationCmd(conn, storedKey, n) + if edgexErr != nil { + return errors.NewCommonEdgeXWrapper(edgexErr) + } + } + _, err := conn.Do(EXEC) + if err != nil { + return errors.NewCommonEdgeX(errors.KindDatabaseError, "notification acknowledgement status update failed", err) + } + return nil +} + +func notificationsByCategoriesAndLabels(conn redis.Conn, offset int, limit int, categories []string, labels []string, ack string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + cacheSetUnion := uuid.New().String() + cacheSetIntersection := uuid.New().String() + cacheSet := cacheSetUnion + defer func() { + // delete cache set + _, _ = conn.Do(DEL, cacheSetUnion, cacheSetIntersection) + }() + var redisKeys []string for _, c := range categories { redisKeys = append(redisKeys, CreateKey(NotificationCollectionCategory, c)) @@ -219,11 +352,32 @@ func notificationsByCategoriesAndLabels(conn redis.Conn, offset int, limit int, redisKeys = append(redisKeys, CreateKey(NotificationCollectionLabel, label)) } - objects, err := unionObjectsByKeys(conn, offset, limit, redisKeys...) - if err != nil { - return notifications, errors.NewCommonEdgeXWrapper(err) + args := redis.Args{} + args = args.Add(cacheSetUnion, len(redisKeys)) + for _, key := range redisKeys { + args = args.Add(key) } - return convertObjectsToNotifications(objects) + + if len(ack) > 0 { + var redisKeys []string + redisKeys = append(redisKeys, cacheSetUnion, CreateKey(NotificationCollectionAck, ack)) + args := redis.Args{} + args = args.Add(cacheSetIntersection, len(redisKeys)) + for _, key := range redisKeys { + args = args.Add(key) + } + cacheSet = cacheSetIntersection + } + + objects, edgeXerr := getObjectsByRevRange(conn, cacheSet, offset, limit) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + notifications, edgeXerr = convertObjectsToNotifications(objects) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + return notifications, nil } // notificationAndTransmissionStoreKeys return the store keys of the notification and transmission that are older than age. @@ -380,6 +534,22 @@ func (c *Client) DeleteProcessedNotificationsByAge(age int64) (err errors.EdgeX) return nil } +func getNotificationsByRedisKeyAndAck(conn redis.Conn, offset, limit int, ack, redisKey string) (notifications []models.Notification, edgeXerr errors.EdgeX) { + var objects [][]byte + if len(ack) > 0 { + objects, edgeXerr = intersectionObjectsByKeys(conn, offset, limit, redisKey, CreateKey(NotificationCollectionAck, ack)) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + } else { + objects, edgeXerr = getObjectsByRevRange(conn, redisKey, offset, limit) + if edgeXerr != nil { + return notifications, errors.NewCommonEdgeXWrapper(edgeXerr) + } + } + return convertObjectsToNotifications(objects) +} + func latestNotificationByOffset(conn redis.Conn, offset int) (notification models.Notification, edgeXerr errors.EdgeX) { objects, err := getObjectsByRevRange(conn, NotificationCollection, offset, 1) if err != nil { diff --git a/internal/pkg/infrastructure/redis/queries.go b/internal/pkg/infrastructure/redis/queries.go index 668c532476..c6b52075f5 100644 --- a/internal/pkg/infrastructure/redis/queries.go +++ b/internal/pkg/infrastructure/redis/queries.go @@ -224,6 +224,7 @@ func getMemberNumber(conn redis.Conn, command string, key string) (uint32, error } // unionObjectsByValues returns the keys of the set resulting from the union of all the given sets. +// nolint // unused but could be useful func unionObjectsByKeys(conn redis.Conn, offset int, limit int, redisKeys ...string) ([][]byte, errors.EdgeX) { return objectsByKeys(conn, ZUNIONSTORE, offset, limit, redisKeys...) } diff --git a/internal/support/notifications/application/notification.go b/internal/support/notifications/application/notification.go index 26e15c8760..52ae2437f9 100644 --- a/internal/support/notifications/application/notification.go +++ b/internal/support/notifications/application/notification.go @@ -18,6 +18,7 @@ import ( bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v4/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v4/di" "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos" + "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests" "github.com/edgexfoundry/go-mod-core-contracts/v4/errors" "github.com/edgexfoundry/go-mod-core-contracts/v4/models" @@ -46,14 +47,14 @@ func AddNotification(n models.Notification, ctx context.Context, dic *di.Contain return addedNotification.Id, nil } -// NotificationsByCategory queries notifications with offset, limit, and category -func NotificationsByCategory(offset, limit int, category string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { +// NotificationsByCategory queries notifications with offset, limit, ack, and category +func NotificationsByCategory(offset, limit int, ack string, category string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { if category == "" { return notifications, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "category is empty", nil) } dbClient := container.DBClientFrom(dic.Get) - totalCount, err = dbClient.NotificationCountByCategory(category) + totalCount, err = dbClient.NotificationCountByCategory(category, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } @@ -62,25 +63,21 @@ func NotificationsByCategory(offset, limit int, category string, dic *di.Contain return []dtos.Notification{}, totalCount, err } - notificationModels, err := dbClient.NotificationsByCategory(offset, limit, category) + notificationModels, err := dbClient.NotificationsByCategory(offset, limit, ack, category) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } - notifications = make([]dtos.Notification, len(notificationModels)) - for i, n := range notificationModels { - notifications[i] = dtos.FromNotificationModelToDTO(n) - } - return notifications, totalCount, nil + return dtos.FromNotificationModelsToDTOs(notificationModels), totalCount, nil } -// NotificationsByLabel queries notifications with offset, limit, and label -func NotificationsByLabel(offset, limit int, label string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { +// NotificationsByLabel queries notifications with offset, limit, ack and label +func NotificationsByLabel(offset, limit int, ack string, label string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { if label == "" { return notifications, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "label is empty", nil) } dbClient := container.DBClientFrom(dic.Get) - totalCount, err = dbClient.NotificationCountByLabel(label) + totalCount, err = dbClient.NotificationCountByLabel(label, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } @@ -89,15 +86,11 @@ func NotificationsByLabel(offset, limit int, label string, dic *di.Container) (n return []dtos.Notification{}, totalCount, err } - notificationModels, err := dbClient.NotificationsByLabel(offset, limit, label) + notificationModels, err := dbClient.NotificationsByLabel(offset, limit, ack, label) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } - notifications = make([]dtos.Notification, len(notificationModels)) - for i, n := range notificationModels { - notifications[i] = dtos.FromNotificationModelToDTO(n) - } - return notifications, totalCount, nil + return dtos.FromNotificationModelsToDTOs(notificationModels), totalCount, nil } // NotificationById queries notification by ID @@ -118,14 +111,14 @@ func NotificationById(id string, dic *di.Container) (notification dtos.Notificat return notification, nil } -// NotificationsByStatus queries notifications with offset, limit, and status -func NotificationsByStatus(offset, limit int, status string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { +// NotificationsByStatus queries notifications with offset, limit, ack and status +func NotificationsByStatus(offset, limit int, status, ack string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { if status == "" { return notifications, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "status is empty", nil) } dbClient := container.DBClientFrom(dic.Get) - totalCount, err = dbClient.NotificationCountByStatus(status) + totalCount, err = dbClient.NotificationCountByStatus(status, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } @@ -134,22 +127,18 @@ func NotificationsByStatus(offset, limit int, status string, dic *di.Container) return []dtos.Notification{}, totalCount, err } - notificationModels, err := dbClient.NotificationsByStatus(offset, limit, status) + notificationModels, err := dbClient.NotificationsByStatus(offset, limit, ack, status) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } - notifications = make([]dtos.Notification, len(notificationModels)) - for i, n := range notificationModels { - notifications[i] = dtos.FromNotificationModelToDTO(n) - } - return notifications, totalCount, nil + return dtos.FromNotificationModelsToDTOs(notificationModels), totalCount, nil } // NotificationsByTimeRange query notifications with offset, limit and time range -func NotificationsByTimeRange(start int64, end int64, offset int, limit int, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { +func NotificationsByTimeRange(start int64, end int64, offset int, limit int, ack string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { dbClient := container.DBClientFrom(dic.Get) - totalCount, err = dbClient.NotificationCountByTimeRange(start, end) + totalCount, err = dbClient.NotificationCountByTimeRange(start, end, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } @@ -158,15 +147,11 @@ func NotificationsByTimeRange(start int64, end int64, offset int, limit int, dic return []dtos.Notification{}, totalCount, err } - notificationModels, err := dbClient.NotificationsByTimeRange(start, end, offset, limit) + notificationModels, err := dbClient.NotificationsByTimeRange(start, end, offset, limit, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } - notifications = make([]dtos.Notification, len(notificationModels)) - for i, n := range notificationModels { - notifications[i] = dtos.FromNotificationModelToDTO(n) - } - return notifications, totalCount, nil + return dtos.FromNotificationModelsToDTOs(notificationModels), totalCount, nil } // DeleteNotificationById deletes the notification by id and all of its associated transmissions @@ -186,8 +171,18 @@ func DeleteNotificationById(id string, dic *di.Container) errors.EdgeX { return nil } +// DeleteNotificationByIds deletes the notifications by ids and all of their associated transmissions +func DeleteNotificationByIds(ids []string, dic *di.Container) errors.EdgeX { + dbClient := container.DBClientFrom(dic.Get) + err := dbClient.DeleteNotificationByIds(ids) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + return nil +} + // NotificationsBySubscriptionName queries notifications by offset, limit and subscriptionName -func NotificationsBySubscriptionName(offset, limit int, subscriptionName string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { +func NotificationsBySubscriptionName(offset, limit int, subscriptionName, ack string, dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { if subscriptionName == "" { return notifications, totalCount, errors.NewCommonEdgeX(errors.KindContractInvalid, "subscriptionName is empty", nil) } @@ -197,7 +192,7 @@ func NotificationsBySubscriptionName(offset, limit int, subscriptionName string, if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } - totalCount, err = dbClient.NotificationCountByCategoriesAndLabels(subscription.Categories, subscription.Labels) + totalCount, err = dbClient.NotificationCountByCategoriesAndLabels(subscription.Categories, subscription.Labels, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } @@ -206,15 +201,11 @@ func NotificationsBySubscriptionName(offset, limit int, subscriptionName string, return []dtos.Notification{}, totalCount, err } - notificationModels, err := dbClient.NotificationsByCategoriesAndLabels(offset, limit, subscription.Categories, subscription.Labels) + notificationModels, err := dbClient.NotificationsByCategoriesAndLabels(offset, limit, subscription.Categories, subscription.Labels, ack) if err != nil { return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) } - notifications = make([]dtos.Notification, len(notificationModels)) - for i, n := range notificationModels { - notifications[i] = dtos.FromNotificationModelToDTO(n) - } - return notifications, totalCount, nil + return dtos.FromNotificationModelsToDTOs(notificationModels), totalCount, nil } // CleanupNotificationsByAge invokes the infrastructure layer function to remove notifications that are older than age. And the corresponding transmissions will also be deleted @@ -241,6 +232,36 @@ func DeleteProcessedNotificationsByAge(age int64, dic *di.Container) errors.Edge return nil } +// NotificationByQueryConditions queries notifications with offset, limit, ack, categories, and time range +func NotificationByQueryConditions(offset, limit int, ack string, conditions requests.NotificationQueryCondition, + dic *di.Container) (notifications []dtos.Notification, totalCount uint32, err errors.EdgeX) { + dbClient := container.DBClientFrom(dic.Get) + + totalCount, err = dbClient.NotificationCountByQueryConditions(conditions, ack) + if err != nil { + return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) + } + cont, err := utils.CheckCountRange(totalCount, offset, limit) + if !cont { + return []dtos.Notification{}, totalCount, err + } + + notificationModels, err := dbClient.NotificationsByQueryConditions(offset, limit, conditions, ack) + if err != nil { + return notifications, totalCount, errors.NewCommonEdgeXWrapper(err) + } + return dtos.FromNotificationModelsToDTOs(notificationModels), totalCount, nil +} + +func UpdateNotificationAckStatus(ack bool, ids []string, dic *di.Container) errors.EdgeX { + dbClient := container.DBClientFrom(dic.Get) + err := dbClient.UpdateNotificationAckStatusByIds(ack, ids) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + return nil +} + // AsyncPurgeNotification purge notifications and related transmissions according to the retention capability. func AsyncPurgeNotification(interval time.Duration, ctx context.Context, dic *di.Container) { asyncPurgeNotificationOnce.Do(func() { diff --git a/internal/support/notifications/controller/http/notification.go b/internal/support/notifications/controller/http/notification.go index 9606798492..4366cab7ac 100644 --- a/internal/support/notifications/controller/http/notification.go +++ b/internal/support/notifications/controller/http/notification.go @@ -6,11 +6,14 @@ package http import ( + "fmt" + "io" "math" "net/http" "strconv" + "strings" - "github.com/edgexfoundry/edgex-go/internal/io" + edgexIO "github.com/edgexfoundry/edgex-go/internal/io" "github.com/edgexfoundry/edgex-go/internal/pkg" "github.com/edgexfoundry/edgex-go/internal/pkg/correlation" "github.com/edgexfoundry/edgex-go/internal/pkg/utils" @@ -29,15 +32,19 @@ import ( "github.com/labstack/echo/v4" ) +const ( + defaultEnd = int64(7289539200000) // December 31st 2200, 12:00:00 +) + type NotificationController struct { - reader io.DtoReader + reader edgexIO.DtoReader dic *di.Container } // NewNotificationController creates and initializes an NotificationController func NewNotificationController(dic *di.Container) *NotificationController { return &NotificationController{ - reader: io.NewJsonDtoReader(), + reader: edgexIO.NewJsonDtoReader(), dic: dic, } } @@ -120,7 +127,11 @@ func (nc *NotificationController) NotificationsByCategory(c echo.Context) error if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } - notifications, totalCount, err := application.NotificationsByCategory(offset, limit, category, nc.dic) + ack, err := parseAckStatusQueryString(r) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + notifications, totalCount, err := application.NotificationsByCategory(offset, limit, ack, category, nc.dic) if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } @@ -144,7 +155,11 @@ func (nc *NotificationController) NotificationsByLabel(c echo.Context) error { if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } - notifications, totalCount, err := application.NotificationsByLabel(offset, limit, label, nc.dic) + ack, err := parseAckStatusQueryString(r) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + notifications, totalCount, err := application.NotificationsByLabel(offset, limit, ack, label, nc.dic) if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } @@ -168,7 +183,11 @@ func (nc *NotificationController) NotificationsByStatus(c echo.Context) error { if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } - notifications, totalCount, err := application.NotificationsByStatus(offset, limit, status, nc.dic) + ack, err := parseAckStatusQueryString(r) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + notifications, totalCount, err := application.NotificationsByStatus(offset, limit, status, ack, nc.dic) if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } @@ -190,7 +209,11 @@ func (nc *NotificationController) NotificationsByTimeRange(c echo.Context) error if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } - notifications, totalCount, err := application.NotificationsByTimeRange(start, end, offset, limit, nc.dic) + ack, err := parseAckStatusQueryString(r) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + notifications, totalCount, err := application.NotificationsByTimeRange(start, end, offset, limit, ack, nc.dic) if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } @@ -220,6 +243,30 @@ func (nc *NotificationController) DeleteNotificationById(c echo.Context) error { return pkg.EncodeAndWriteResponse(response, w, lc) } +// DeleteNotificationByIds deletes the notifications by ids and all of their associated transmissions +func (nc *NotificationController) DeleteNotificationByIds(c echo.Context) error { + lc := container.LoggingClientFrom(nc.dic.Get) + r := c.Request() + w := c.Response() + ctx := r.Context() + + // URL parameters + idsStr := c.Param(common.Ids) + if len(idsStr) == 0 { + return utils.WriteErrorResponse(w, ctx, lc, errors.NewCommonEdgeX(errors.KindContractInvalid, "ids is empty", nil), "") + } + + ids := strings.Split(idsStr, common.CommaSeparator) + err := application.DeleteNotificationByIds(ids, nc.dic) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + + response := commonDTO.NewBaseResponse("", "", http.StatusOK) + utils.WriteHttpHeader(w, ctx, http.StatusOK) + return pkg.EncodeAndWriteResponse(response, w, lc) +} + // NotificationsBySubscriptionName queries notifications by offset, limit and subscriptionName func (nc *NotificationController) NotificationsBySubscriptionName(c echo.Context) error { lc := container.LoggingClientFrom(nc.dic.Get) @@ -235,7 +282,11 @@ func (nc *NotificationController) NotificationsBySubscriptionName(c echo.Context if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } - notifications, totalCount, err := application.NotificationsBySubscriptionName(offset, limit, subscriptionName, nc.dic) + ack, err := parseAckStatusQueryString(r) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + notifications, totalCount, err := application.NotificationsBySubscriptionName(offset, limit, subscriptionName, ack, nc.dic) if err != nil { return utils.WriteErrorResponse(w, ctx, lc, err, "") } @@ -310,3 +361,81 @@ func (nc *NotificationController) DeleteProcessedNotificationsByAge(c echo.Conte // encode and send out the response return pkg.EncodeAndWriteResponse(response, w, lc) } + +func (nc *NotificationController) NotificationsByQueryConditions(c echo.Context) error { + lc := container.LoggingClientFrom(nc.dic.Get) + r := c.Request() + w := c.Response() + ctx := r.Context() + + var reqDTO requestDTO.GetNotificationRequest + edgexErr := nc.reader.Read(r.Body, &reqDTO) + if edgexErr != nil { + if strings.Contains(edgexErr.Error(), io.EOF.Error()) { + edgexErr = errors.NewCommonEdgeX(errors.KindContractInvalid, "invalid request body", edgexErr) + } + return utils.WriteErrorResponse(w, ctx, lc, edgexErr, "") + } + + if reqDTO.QueryCondition.End <= reqDTO.QueryCondition.Start { + lc.Warnf(fmt.Sprintf("QueryCondition.End %d is not allowed to be less than QueryCondition.Start %d. "+ + "Use default value %d for QueryCondition.End.", reqDTO.QueryCondition.End, reqDTO.QueryCondition.Start, defaultEnd)) + reqDTO.QueryCondition.End = defaultEnd + } + + // parse URL query string for offset, limit, and ack + config := notificationContainer.ConfigurationFrom(nc.dic.Get) + offset, limit, _, edgexErr := utils.ParseGetAllObjectsRequestQueryString(c, 0, math.MaxInt32, -1, config.Service.MaxResultCount) + if edgexErr != nil { + return utils.WriteErrorResponse(w, ctx, lc, edgexErr, "") + } + ack, err := parseAckStatusQueryString(r) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + + notifications, totalCount, edgexErr := application.NotificationByQueryConditions(offset, limit, ack, reqDTO.QueryCondition, nc.dic) + if edgexErr != nil { + return utils.WriteErrorResponse(w, ctx, lc, edgexErr, "") + } + + response := responseDTO.NewMultiNotificationsResponse("", "", http.StatusOK, totalCount, notifications) + utils.WriteHttpHeader(w, ctx, http.StatusOK) + return pkg.EncodeAndWriteResponse(response, w, lc) +} + +func (nc *NotificationController) AcknowledgeNotificationByIds(c echo.Context) error { + return nc.updateAckByIds(c, true) +} + +func (nc *NotificationController) UnacknowledgeNotificationByIds(c echo.Context) error { + return nc.updateAckByIds(c, false) +} + +func (nc *NotificationController) updateAckByIds(c echo.Context, ack bool) error { + lc := container.LoggingClientFrom(nc.dic.Get) + r := c.Request() + w := c.Response() + ctx := r.Context() + idsStr := c.Param(common.Ids) + if len(idsStr) == 0 { + return utils.WriteErrorResponse(w, ctx, lc, errors.NewCommonEdgeX(errors.KindContractInvalid, "ids is empty", nil), "") + } + ids := strings.Split(idsStr, common.CommaSeparator) + err := application.UpdateNotificationAckStatus(ack, ids, nc.dic) + if err != nil { + return utils.WriteErrorResponse(w, ctx, lc, err, "") + } + response := commonDTO.NewBaseResponse("", "", http.StatusOK) + utils.WriteHttpHeader(w, ctx, http.StatusOK) + return pkg.EncodeAndWriteResponse(response, w, lc) +} + +// parseAckStatusQueryString parses ack from the query parameters and check if the value is valid. +func parseAckStatusQueryString(r *http.Request) (ack string, err errors.EdgeX) { + ack = utils.ParseQueryStringToString(r, common.Ack, "") + if len(ack) > 0 && ack != common.ValueTrue && ack != common.ValueFalse { + err = errors.NewCommonEdgeX(errors.KindContractInvalid, "invalid ack value", nil) + } + return +} diff --git a/internal/support/notifications/controller/http/notification_test.go b/internal/support/notifications/controller/http/notification_test.go index c7350d558a..3e328a9a6f 100644 --- a/internal/support/notifications/controller/http/notification_test.go +++ b/internal/support/notifications/controller/http/notification_test.go @@ -8,6 +8,8 @@ package http import ( "encoding/json" "fmt" + "github.com/stretchr/testify/mock" + "io" "net/http" "net/http/httptest" "strings" @@ -222,9 +224,9 @@ func TestNotificationsByCategory(t *testing.T) { expectedNotificationCount := uint32(0) dic := mockDic() dbClientMock := &dbMock.DBClient{} - dbClientMock.On("NotificationCountByCategory", testCategory).Return(expectedNotificationCount, nil) - dbClientMock.On("NotificationsByCategory", 0, 20, testCategory).Return([]models.Notification{}, nil) - dbClientMock.On("NotificationsByCategory", 0, 1, testCategory).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationCountByCategory", testCategory, "").Return(expectedNotificationCount, nil) + dbClientMock.On("NotificationsByCategory", 0, 20, "", testCategory).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationsByCategory", 0, 1, "", testCategory).Return([]models.Notification{}, nil) dic.Update(di.ServiceConstructorMap{ container.DBClientInterfaceName: func(get di.Get) interface{} { return dbClientMock @@ -297,9 +299,9 @@ func TestNotificationsByLabel(t *testing.T) { expectedNotificationCount := uint32(0) dic := mockDic() dbClientMock := &dbMock.DBClient{} - dbClientMock.On("NotificationCountByLabel", testLabel).Return(expectedNotificationCount, nil) - dbClientMock.On("NotificationsByLabel", 0, 20, testLabel).Return([]models.Notification{}, nil) - dbClientMock.On("NotificationsByLabel", 0, 1, testLabel).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationCountByLabel", testLabel, "").Return(expectedNotificationCount, nil) + dbClientMock.On("NotificationsByLabel", 0, 20, "", testLabel).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationsByLabel", 0, 1, "", testLabel).Return([]models.Notification{}, nil) dic.Update(di.ServiceConstructorMap{ container.DBClientInterfaceName: func(get di.Get) interface{} { return dbClientMock @@ -372,9 +374,9 @@ func TestNotificationsByStatus(t *testing.T) { expectedNotificationCount := uint32(0) dic := mockDic() dbClientMock := &dbMock.DBClient{} - dbClientMock.On("NotificationCountByStatus", testStatus).Return(expectedNotificationCount, nil) - dbClientMock.On("NotificationsByStatus", 0, 20, testStatus).Return([]models.Notification{}, nil) - dbClientMock.On("NotificationsByStatus", 0, 1, testStatus).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationCountByStatus", testStatus, "").Return(expectedNotificationCount, nil) + dbClientMock.On("NotificationsByStatus", 0, 20, "", testStatus).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationsByStatus", 0, 1, "", testStatus).Return([]models.Notification{}, nil) dic.Update(di.ServiceConstructorMap{ container.DBClientInterfaceName: func(get di.Get) interface{} { return dbClientMock @@ -446,8 +448,8 @@ func TestNotificationsByTimeRange(t *testing.T) { expectedNotificationCount := uint32(0) dic := mockDic() dbClientMock := &dbMock.DBClient{} - dbClientMock.On("NotificationCountByTimeRange", int64(0), int64(100)).Return(expectedNotificationCount, nil) - dbClientMock.On("NotificationsByTimeRange", int64(0), int64(100), 0, 10).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationCountByTimeRange", int64(0), int64(100), "").Return(expectedNotificationCount, nil) + dbClientMock.On("NotificationsByTimeRange", int64(0), int64(100), 0, 10, "").Return([]models.Notification{}, nil) dic.Update(di.ServiceConstructorMap{ container.DBClientInterfaceName: func(get di.Get) interface{} { return dbClientMock @@ -510,7 +512,6 @@ func TestNotificationsByTimeRange(t *testing.T) { assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") assert.Equal(t, testCase.expectedStatusCode, int(res.StatusCode), "Response status code not as expected") - assert.Equal(t, testCase.expectedCount, len(res.Notifications), "Device count not as expected") assert.Equal(t, testCase.expectedTotalCount, res.TotalCount, "Response total count not as expected") assert.Empty(t, res.Message, "Message should be empty when it is successful") } @@ -587,9 +588,9 @@ func TestNotificationsBySubscriptionName(t *testing.T) { dic := mockDic() dbClientMock := &dbMock.DBClient{} dbClientMock.On("SubscriptionByName", subscription.Name).Return(subscription, nil) - dbClientMock.On("NotificationCountByCategoriesAndLabels", subscription.Categories, subscription.Labels).Return(expectedNotificationCount, nil) - dbClientMock.On("NotificationsByCategoriesAndLabels", 0, 20, subscription.Categories, subscription.Labels).Return([]models.Notification{}, nil) - dbClientMock.On("NotificationsByCategoriesAndLabels", 0, 1, subscription.Categories, subscription.Labels).Return([]models.Notification{}, nil) + dbClientMock.On("NotificationCountByCategoriesAndLabels", subscription.Categories, subscription.Labels, "").Return(expectedNotificationCount, nil) + dbClientMock.On("NotificationsByCategoriesAndLabels", 0, 20, subscription.Categories, subscription.Labels, "").Return([]models.Notification{}, uint32(0), nil) + dbClientMock.On("NotificationsByCategoriesAndLabels", 0, 1, subscription.Categories, subscription.Labels, "").Return([]models.Notification{}, uint32(0), nil) dic.Update(di.ServiceConstructorMap{ container.DBClientInterfaceName: func(get di.Get) interface{} { return dbClientMock @@ -826,3 +827,233 @@ func TestDeleteNotificationByAge(t *testing.T) { }) } } + +func TestNotificationsByQueryConditions(t *testing.T) { + start := int64(1556985370116) + end := int64(1756985370116) + expectedTotalCount := uint32(10) + queryCondition := requests.NotificationQueryCondition{Category: []string{"test"}, Start: start, End: end} + queryConditionNoCategory := requests.NotificationQueryCondition{Category: []string{}, Start: start, End: end} + queryConditionWithoutCategoryField := requests.NotificationQueryCondition{Start: start, End: end} + queryConditionWithoutStartField := requests.NotificationQueryCondition{Category: []string{"test"}, End: end} + queryConditionWithoutEndField := requests.NotificationQueryCondition{Category: []string{"test"}, Start: start} + queryConditionDbError := requests.NotificationQueryCondition{Category: []string{"dbError"}, Start: start, End: end} + var notifications []models.Notification + methodNotificationsByQueryConditions := "NotificationsByQueryConditions" + dic := mockDic() + dbClientMock := &dbMock.DBClient{} + dbClientMock.On(methodNotificationsByQueryConditions, 0, 20, queryCondition, "").Return(notifications, nil) + dbClientMock.On(methodNotificationsByQueryConditions, 0, 20, queryConditionNoCategory, "").Return(notifications, nil) + dbClientMock.On(methodNotificationsByQueryConditions, 0, 20, queryConditionWithoutCategoryField, "").Return( + notifications, nil) + dbClientMock.On(methodNotificationsByQueryConditions, 0, 20, queryConditionWithoutStartField, "").Return( + notifications, nil) + dbClientMockDefaultEndValue := queryConditionWithoutEndField + dbClientMockDefaultEndValue.End = defaultEnd + dbClientMock.On(methodNotificationsByQueryConditions, 0, 20, dbClientMockDefaultEndValue, "").Return( + notifications, nil) + dbClientMock.On(methodNotificationsByQueryConditions, 0, 20, queryConditionDbError, "").Return([]models.Notification{}, + errors.NewCommonEdgeX(errors.KindDatabaseError, "DB error", nil)) + dbClientMock.On("NotificationCountByQueryConditions", mock.Anything, "").Return(expectedTotalCount, nil) + dic.Update(di.ServiceConstructorMap{ + container.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + nc := NewNotificationController(dic) + assert.NotNil(t, nc) + tests := []struct { + name string + offset string + limit string + condition *requests.NotificationQueryCondition + errorExpected bool + expectedTotalCount uint32 + expectedStatusCode int + }{ + {"Valid", "0", "20", &queryCondition, false, expectedTotalCount, http.StatusOK}, + {"Valid - category no specified", "0", "20", &queryConditionNoCategory, false, expectedTotalCount, http.StatusOK}, + {"Valid - without category field", "0", "20", &queryConditionWithoutCategoryField, false, expectedTotalCount, http.StatusOK}, + {"Valid - without start field", "0", "20", &queryConditionWithoutStartField, false, expectedTotalCount, http.StatusOK}, + {"Valid - without end field", "0", "20", &queryConditionWithoutEndField, false, expectedTotalCount, http.StatusOK}, + {"Invalid - empty request body", "0", "20", nil, true, 0, http.StatusBadRequest}, + {"Invalid - offset with unparsable format", "aaa", "20", &queryCondition, true, 0, http.StatusBadRequest}, + {"Invalid - limit with unparsable format", "0", "aaa", &queryCondition, true, 0, http.StatusBadRequest}, + {"Invalid - database error", "0", "20", &queryConditionDbError, true, 0, http.StatusInternalServerError}, + } + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + var reader io.Reader + if testCase.condition != nil { + reqDTO := requests.GetNotificationRequest{} + reqDTO.ApiVersion = common.ApiVersion + reqDTO.QueryCondition = *testCase.condition + byteData, err := json.Marshal(reqDTO) + require.NoError(t, err) + reader = strings.NewReader(string(byteData)) + } else { + reader = http.NoBody + } + + req, err := http.NewRequest(http.MethodGet, common.ApiNotificationRoute, reader) + req.Header.Set(common.ContentType, common.ContentTypeJSON) + require.NoError(t, err) + query := req.URL.Query() + query.Add(common.Offset, testCase.offset) + query.Add(common.Limit, testCase.limit) + req.URL.RawQuery = query.Encode() + + // Act + recorder := httptest.NewRecorder() + c := echo.New().NewContext(req, recorder) + err = nc.NotificationsByQueryConditions(c) + require.NoError(t, err) + + // Assert + if testCase.errorExpected { + var res commonDTO.BaseResponse + err = json.Unmarshal(recorder.Body.Bytes(), &res) + require.NoError(t, err) + assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.expectedStatusCode, res.StatusCode, "Response status code not as expected") + assert.NotEmpty(t, res.Message, "Response message doesn't contain the error message") + } else { + var res responseDTO.MultiNotificationsResponse + err = json.Unmarshal(recorder.Body.Bytes(), &res) + require.NoError(t, err) + assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.expectedStatusCode, res.StatusCode, "Response status code not as expected") + assert.Empty(t, res.Message, "Message should be empty when it is successful") + assert.Equal(t, testCase.expectedTotalCount, res.TotalCount, "Total count not as expected") + } + }) + + } +} + +func TestDeleteNotificationByIds(t *testing.T) { + ids := []string{"1793b2b5-1873-44da-9dbc-73a8bcb1f567", "f76e4602-419d-4a90-a7e4-4110c781eb0e"} + var noId []string + dbError := []string{"dbError"} + + dic := mockDic() + dbClientMock := &dbMock.DBClient{} + dbClientMock.On("DeleteNotificationByIds", ids).Return(nil) + dbClientMock.On("DeleteNotificationByIds", dbError).Return(errors.NewCommonEdgeX( + errors.KindDatabaseError, "DB error", nil)) + dic.Update(di.ServiceConstructorMap{ + container.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + + controller := NewNotificationController(dic) + require.NotNil(t, controller) + + tests := []struct { + name string + ids []string + expectedStatusCode int + }{ + {"Valid - delete notification by ids", ids, http.StatusOK}, + {"Invalid - ids parameter is empty", noId, http.StatusBadRequest}, + {"Invalid - database error", dbError, http.StatusInternalServerError}, + } + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + reqPath := fmt.Sprintf("%s/%s", common.ApiNotificationByIdsRoute, testCase.ids) + req, err := http.NewRequest(http.MethodDelete, reqPath, http.NoBody) + require.NoError(t, err) + + // Act + recorder := httptest.NewRecorder() + c := echo.New().NewContext(req, recorder) + c.SetParamNames(common.Ids) + c.SetParamValues(strings.Join(testCase.ids[:], common.CommaSeparator)) + err = controller.DeleteNotificationByIds(c) + require.NoError(t, err) + + var res commonDTO.BaseResponse + err = json.Unmarshal(recorder.Body.Bytes(), &res) + require.NoError(t, err) + + // Assert + assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.expectedStatusCode, int(res.StatusCode), "Response status code not as expected") + if testCase.expectedStatusCode == http.StatusOK { + assert.Empty(t, res.Message, "Message should be empty when it is successful") + } else { + assert.NotEmpty(t, res.Message, "Response message doesn't contain the error message") + } + }) + } +} + +func TestUpdateNotificationAckStatusByIds(t *testing.T) { + ids := []string{"1793b2b5-1873-44da-9dbc-73a8bcb1f567", "f76e4602-419d-4a90-a7e4-4110c781eb0e"} + var noId []string + dbError := []string{"dbError"} + + dic := mockDic() + dbClientMock := &dbMock.DBClient{} + dbClientMock.On("UpdateNotificationAckStatusByIds", true, ids).Return(nil) + dbClientMock.On("UpdateNotificationAckStatusByIds", true, dbError).Return(errors.NewCommonEdgeX( + errors.KindDatabaseError, "DB error", nil)) + dbClientMock.On("UpdateNotificationAckStatusByIds", false, ids).Return(nil) + dbClientMock.On("UpdateNotificationAckStatusByIds", false, dbError).Return(errors.NewCommonEdgeX( + errors.KindDatabaseError, "DB error", nil)) + dic.Update(di.ServiceConstructorMap{ + container.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + + controller := NewNotificationController(dic) + require.NotNil(t, controller) + + tests := []struct { + name string + controller func(c echo.Context) error + ids []string + expectedStatusCode int + }{ + {"Valid - acknowledge succeeded", controller.AcknowledgeNotificationByIds, ids, http.StatusOK}, + {"Invalid - ids parameter is empty", controller.AcknowledgeNotificationByIds, noId, http.StatusBadRequest}, + {"Invalid - database error", controller.AcknowledgeNotificationByIds, dbError, http.StatusInternalServerError}, + {"Valid - unacknowledge succeeded", controller.UnacknowledgeNotificationByIds, ids, http.StatusOK}, + {"Invalid - ids parameter is empty", controller.UnacknowledgeNotificationByIds, noId, http.StatusBadRequest}, + {"Invalid - database error", controller.UnacknowledgeNotificationByIds, dbError, http.StatusInternalServerError}, + } + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + reqPath := fmt.Sprintf("%s/%s", common.ApiNotificationAcknowledgeByIdsRoute, testCase.ids) + req, err := http.NewRequest(http.MethodPut, reqPath, http.NoBody) + require.NoError(t, err) + + // Act + recorder := httptest.NewRecorder() + c := echo.New().NewContext(req, recorder) + c.SetParamNames(common.Ids) + c.SetParamValues(strings.Join(testCase.ids[:], common.CommaSeparator)) + err = testCase.controller(c) + require.NoError(t, err) + + var res commonDTO.BaseResponse + err = json.Unmarshal(recorder.Body.Bytes(), &res) + require.NoError(t, err) + + // Assert + assert.Equal(t, common.ApiVersion, res.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.expectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.expectedStatusCode, int(res.StatusCode), "Response status code not as expected") + if testCase.expectedStatusCode == http.StatusOK { + assert.Empty(t, res.Message, "Message should be empty when it is successful") + } else { + assert.NotEmpty(t, res.Message, "Response message doesn't contain the error message") + } + }) + } +} diff --git a/internal/support/notifications/infrastructure/interfaces/db.go b/internal/support/notifications/infrastructure/interfaces/db.go index 8b177947dc..f113ecd56b 100644 --- a/internal/support/notifications/infrastructure/interfaces/db.go +++ b/internal/support/notifications/infrastructure/interfaces/db.go @@ -6,6 +6,7 @@ package interfaces import ( + "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests" "github.com/edgexfoundry/go-mod-core-contracts/v4/errors" "github.com/edgexfoundry/go-mod-core-contracts/v4/models" ) @@ -30,20 +31,24 @@ type DBClient interface { AddNotification(n models.Notification) (models.Notification, errors.EdgeX) NotificationById(id string) (models.Notification, errors.EdgeX) - NotificationsByCategory(offset, limit int, category string) ([]models.Notification, errors.EdgeX) - NotificationsByLabel(offset, limit int, label string) ([]models.Notification, errors.EdgeX) - NotificationsByStatus(offset, limit int, status string) ([]models.Notification, errors.EdgeX) - NotificationsByTimeRange(start int64, end int64, offset int, limit int) ([]models.Notification, errors.EdgeX) + NotificationsByCategory(offset, limit int, ack, category string) ([]models.Notification, errors.EdgeX) + NotificationsByLabel(offset, limit int, ack, label string) ([]models.Notification, errors.EdgeX) + NotificationsByStatus(offset, limit int, ack, status string) ([]models.Notification, errors.EdgeX) + NotificationsByTimeRange(start int64, end int64, offset int, limit int, ack string) ([]models.Notification, errors.EdgeX) + NotificationsByQueryConditions(offset, limit int, condition requests.NotificationQueryCondition, ack string) ([]models.Notification, errors.EdgeX) DeleteNotificationById(id string) errors.EdgeX - NotificationsByCategoriesAndLabels(offset, limit int, categories []string, labels []string) ([]models.Notification, errors.EdgeX) + DeleteNotificationByIds(ids []string) errors.EdgeX + NotificationsByCategoriesAndLabels(offset, limit int, categories []string, labels []string, ack string) ([]models.Notification, errors.EdgeX) UpdateNotification(s models.Notification) errors.EdgeX + UpdateNotificationAckStatusByIds(ack bool, ids []string) errors.EdgeX CleanupNotificationsByAge(age int64) errors.EdgeX DeleteProcessedNotificationsByAge(age int64) errors.EdgeX - NotificationCountByCategory(category string) (uint32, errors.EdgeX) - NotificationCountByLabel(label string) (uint32, errors.EdgeX) - NotificationCountByStatus(status string) (uint32, errors.EdgeX) - NotificationCountByTimeRange(start int64, end int64) (uint32, errors.EdgeX) - NotificationCountByCategoriesAndLabels(categories []string, labels []string) (uint32, errors.EdgeX) + NotificationCountByCategory(category string, ack string) (uint32, errors.EdgeX) + NotificationCountByLabel(label string, ack string) (uint32, errors.EdgeX) + NotificationCountByStatus(status string, ack string) (uint32, errors.EdgeX) + NotificationCountByTimeRange(start int64, end int64, ack string) (uint32, errors.EdgeX) + NotificationCountByCategoriesAndLabels(categories []string, labels []string, ack string) (uint32, errors.EdgeX) + NotificationCountByQueryConditions(condition requests.NotificationQueryCondition, ack string) (uint32, errors.EdgeX) NotificationTotalCount() (uint32, errors.EdgeX) LatestNotificationByOffset(offset uint32) (models.Notification, errors.EdgeX) diff --git a/internal/support/notifications/infrastructure/interfaces/mocks/DBClient.go b/internal/support/notifications/infrastructure/interfaces/mocks/DBClient.go index 6290a9d1eb..edf5f2fa36 100644 --- a/internal/support/notifications/infrastructure/interfaces/mocks/DBClient.go +++ b/internal/support/notifications/infrastructure/interfaces/mocks/DBClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.43.0. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package mocks @@ -8,6 +8,8 @@ import ( mock "github.com/stretchr/testify/mock" models "github.com/edgexfoundry/go-mod-core-contracts/v4/models" + + requests "github.com/edgexfoundry/go-mod-core-contracts/v4/dtos/requests" ) // DBClient is an autogenerated mock type for the DBClient type @@ -214,6 +216,26 @@ func (_m *DBClient) DeleteNotificationById(id string) errors.EdgeX { return r0 } +// DeleteNotificationByIds provides a mock function with given fields: ids +func (_m *DBClient) DeleteNotificationByIds(ids []string) errors.EdgeX { + ret := _m.Called(ids) + + if len(ret) == 0 { + panic("no return value specified for DeleteNotificationByIds") + } + + var r0 errors.EdgeX + if rf, ok := ret.Get(0).(func([]string) errors.EdgeX); ok { + r0 = rf(ids) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(errors.EdgeX) + } + } + + return r0 +} + // DeleteProcessedNotificationsByAge provides a mock function with given fields: age func (_m *DBClient) DeleteProcessedNotificationsByAge(age int64) errors.EdgeX { ret := _m.Called(age) @@ -334,9 +356,9 @@ func (_m *DBClient) NotificationById(id string) (models.Notification, errors.Edg return r0, r1 } -// NotificationCountByCategoriesAndLabels provides a mock function with given fields: categories, labels -func (_m *DBClient) NotificationCountByCategoriesAndLabels(categories []string, labels []string) (uint32, errors.EdgeX) { - ret := _m.Called(categories, labels) +// NotificationCountByCategoriesAndLabels provides a mock function with given fields: categories, labels, ack +func (_m *DBClient) NotificationCountByCategoriesAndLabels(categories []string, labels []string, ack string) (uint32, errors.EdgeX) { + ret := _m.Called(categories, labels, ack) if len(ret) == 0 { panic("no return value specified for NotificationCountByCategoriesAndLabels") @@ -344,17 +366,17 @@ func (_m *DBClient) NotificationCountByCategoriesAndLabels(categories []string, var r0 uint32 var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func([]string, []string) (uint32, errors.EdgeX)); ok { - return rf(categories, labels) + if rf, ok := ret.Get(0).(func([]string, []string, string) (uint32, errors.EdgeX)); ok { + return rf(categories, labels, ack) } - if rf, ok := ret.Get(0).(func([]string, []string) uint32); ok { - r0 = rf(categories, labels) + if rf, ok := ret.Get(0).(func([]string, []string, string) uint32); ok { + r0 = rf(categories, labels, ack) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func([]string, []string) errors.EdgeX); ok { - r1 = rf(categories, labels) + if rf, ok := ret.Get(1).(func([]string, []string, string) errors.EdgeX); ok { + r1 = rf(categories, labels, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -364,9 +386,9 @@ func (_m *DBClient) NotificationCountByCategoriesAndLabels(categories []string, return r0, r1 } -// NotificationCountByCategory provides a mock function with given fields: category -func (_m *DBClient) NotificationCountByCategory(category string) (uint32, errors.EdgeX) { - ret := _m.Called(category) +// NotificationCountByCategory provides a mock function with given fields: category, ack +func (_m *DBClient) NotificationCountByCategory(category string, ack string) (uint32, errors.EdgeX) { + ret := _m.Called(category, ack) if len(ret) == 0 { panic("no return value specified for NotificationCountByCategory") @@ -374,17 +396,17 @@ func (_m *DBClient) NotificationCountByCategory(category string) (uint32, errors var r0 uint32 var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(string) (uint32, errors.EdgeX)); ok { - return rf(category) + if rf, ok := ret.Get(0).(func(string, string) (uint32, errors.EdgeX)); ok { + return rf(category, ack) } - if rf, ok := ret.Get(0).(func(string) uint32); ok { - r0 = rf(category) + if rf, ok := ret.Get(0).(func(string, string) uint32); ok { + r0 = rf(category, ack) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(string) errors.EdgeX); ok { - r1 = rf(category) + if rf, ok := ret.Get(1).(func(string, string) errors.EdgeX); ok { + r1 = rf(category, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -394,9 +416,9 @@ func (_m *DBClient) NotificationCountByCategory(category string) (uint32, errors return r0, r1 } -// NotificationCountByLabel provides a mock function with given fields: label -func (_m *DBClient) NotificationCountByLabel(label string) (uint32, errors.EdgeX) { - ret := _m.Called(label) +// NotificationCountByLabel provides a mock function with given fields: label, ack +func (_m *DBClient) NotificationCountByLabel(label string, ack string) (uint32, errors.EdgeX) { + ret := _m.Called(label, ack) if len(ret) == 0 { panic("no return value specified for NotificationCountByLabel") @@ -404,17 +426,17 @@ func (_m *DBClient) NotificationCountByLabel(label string) (uint32, errors.EdgeX var r0 uint32 var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(string) (uint32, errors.EdgeX)); ok { - return rf(label) + if rf, ok := ret.Get(0).(func(string, string) (uint32, errors.EdgeX)); ok { + return rf(label, ack) } - if rf, ok := ret.Get(0).(func(string) uint32); ok { - r0 = rf(label) + if rf, ok := ret.Get(0).(func(string, string) uint32); ok { + r0 = rf(label, ack) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(string) errors.EdgeX); ok { - r1 = rf(label) + if rf, ok := ret.Get(1).(func(string, string) errors.EdgeX); ok { + r1 = rf(label, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -424,9 +446,39 @@ func (_m *DBClient) NotificationCountByLabel(label string) (uint32, errors.EdgeX return r0, r1 } -// NotificationCountByStatus provides a mock function with given fields: status -func (_m *DBClient) NotificationCountByStatus(status string) (uint32, errors.EdgeX) { - ret := _m.Called(status) +// NotificationCountByQueryConditions provides a mock function with given fields: condition, ack +func (_m *DBClient) NotificationCountByQueryConditions(condition requests.NotificationQueryCondition, ack string) (uint32, errors.EdgeX) { + ret := _m.Called(condition, ack) + + if len(ret) == 0 { + panic("no return value specified for NotificationCountByQueryConditions") + } + + var r0 uint32 + var r1 errors.EdgeX + if rf, ok := ret.Get(0).(func(requests.NotificationQueryCondition, string) (uint32, errors.EdgeX)); ok { + return rf(condition, ack) + } + if rf, ok := ret.Get(0).(func(requests.NotificationQueryCondition, string) uint32); ok { + r0 = rf(condition, ack) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(requests.NotificationQueryCondition, string) errors.EdgeX); ok { + r1 = rf(condition, ack) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + +// NotificationCountByStatus provides a mock function with given fields: status, ack +func (_m *DBClient) NotificationCountByStatus(status string, ack string) (uint32, errors.EdgeX) { + ret := _m.Called(status, ack) if len(ret) == 0 { panic("no return value specified for NotificationCountByStatus") @@ -434,17 +486,17 @@ func (_m *DBClient) NotificationCountByStatus(status string) (uint32, errors.Edg var r0 uint32 var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(string) (uint32, errors.EdgeX)); ok { - return rf(status) + if rf, ok := ret.Get(0).(func(string, string) (uint32, errors.EdgeX)); ok { + return rf(status, ack) } - if rf, ok := ret.Get(0).(func(string) uint32); ok { - r0 = rf(status) + if rf, ok := ret.Get(0).(func(string, string) uint32); ok { + r0 = rf(status, ack) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(string) errors.EdgeX); ok { - r1 = rf(status) + if rf, ok := ret.Get(1).(func(string, string) errors.EdgeX); ok { + r1 = rf(status, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -454,9 +506,9 @@ func (_m *DBClient) NotificationCountByStatus(status string) (uint32, errors.Edg return r0, r1 } -// NotificationCountByTimeRange provides a mock function with given fields: start, end -func (_m *DBClient) NotificationCountByTimeRange(start int64, end int64) (uint32, errors.EdgeX) { - ret := _m.Called(start, end) +// NotificationCountByTimeRange provides a mock function with given fields: start, end, ack +func (_m *DBClient) NotificationCountByTimeRange(start int64, end int64, ack string) (uint32, errors.EdgeX) { + ret := _m.Called(start, end, ack) if len(ret) == 0 { panic("no return value specified for NotificationCountByTimeRange") @@ -464,17 +516,17 @@ func (_m *DBClient) NotificationCountByTimeRange(start int64, end int64) (uint32 var r0 uint32 var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(int64, int64) (uint32, errors.EdgeX)); ok { - return rf(start, end) + if rf, ok := ret.Get(0).(func(int64, int64, string) (uint32, errors.EdgeX)); ok { + return rf(start, end, ack) } - if rf, ok := ret.Get(0).(func(int64, int64) uint32); ok { - r0 = rf(start, end) + if rf, ok := ret.Get(0).(func(int64, int64, string) uint32); ok { + r0 = rf(start, end, ack) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(int64, int64) errors.EdgeX); ok { - r1 = rf(start, end) + if rf, ok := ret.Get(1).(func(int64, int64, string) errors.EdgeX); ok { + r1 = rf(start, end, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -514,9 +566,9 @@ func (_m *DBClient) NotificationTotalCount() (uint32, errors.EdgeX) { return r0, r1 } -// NotificationsByCategoriesAndLabels provides a mock function with given fields: offset, limit, categories, labels -func (_m *DBClient) NotificationsByCategoriesAndLabels(offset int, limit int, categories []string, labels []string) ([]models.Notification, errors.EdgeX) { - ret := _m.Called(offset, limit, categories, labels) +// NotificationsByCategoriesAndLabels provides a mock function with given fields: offset, limit, categories, labels, ack +func (_m *DBClient) NotificationsByCategoriesAndLabels(offset int, limit int, categories []string, labels []string, ack string) ([]models.Notification, errors.EdgeX) { + ret := _m.Called(offset, limit, categories, labels, ack) if len(ret) == 0 { panic("no return value specified for NotificationsByCategoriesAndLabels") @@ -524,19 +576,19 @@ func (_m *DBClient) NotificationsByCategoriesAndLabels(offset int, limit int, ca var r0 []models.Notification var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(int, int, []string, []string) ([]models.Notification, errors.EdgeX)); ok { - return rf(offset, limit, categories, labels) + if rf, ok := ret.Get(0).(func(int, int, []string, []string, string) ([]models.Notification, errors.EdgeX)); ok { + return rf(offset, limit, categories, labels, ack) } - if rf, ok := ret.Get(0).(func(int, int, []string, []string) []models.Notification); ok { - r0 = rf(offset, limit, categories, labels) + if rf, ok := ret.Get(0).(func(int, int, []string, []string, string) []models.Notification); ok { + r0 = rf(offset, limit, categories, labels, ack) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]models.Notification) } } - if rf, ok := ret.Get(1).(func(int, int, []string, []string) errors.EdgeX); ok { - r1 = rf(offset, limit, categories, labels) + if rf, ok := ret.Get(1).(func(int, int, []string, []string, string) errors.EdgeX); ok { + r1 = rf(offset, limit, categories, labels, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -546,9 +598,9 @@ func (_m *DBClient) NotificationsByCategoriesAndLabels(offset int, limit int, ca return r0, r1 } -// NotificationsByCategory provides a mock function with given fields: offset, limit, category -func (_m *DBClient) NotificationsByCategory(offset int, limit int, category string) ([]models.Notification, errors.EdgeX) { - ret := _m.Called(offset, limit, category) +// NotificationsByCategory provides a mock function with given fields: offset, limit, ack, category +func (_m *DBClient) NotificationsByCategory(offset int, limit int, ack string, category string) ([]models.Notification, errors.EdgeX) { + ret := _m.Called(offset, limit, ack, category) if len(ret) == 0 { panic("no return value specified for NotificationsByCategory") @@ -556,19 +608,19 @@ func (_m *DBClient) NotificationsByCategory(offset int, limit int, category stri var r0 []models.Notification var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(int, int, string) ([]models.Notification, errors.EdgeX)); ok { - return rf(offset, limit, category) + if rf, ok := ret.Get(0).(func(int, int, string, string) ([]models.Notification, errors.EdgeX)); ok { + return rf(offset, limit, ack, category) } - if rf, ok := ret.Get(0).(func(int, int, string) []models.Notification); ok { - r0 = rf(offset, limit, category) + if rf, ok := ret.Get(0).(func(int, int, string, string) []models.Notification); ok { + r0 = rf(offset, limit, ack, category) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]models.Notification) } } - if rf, ok := ret.Get(1).(func(int, int, string) errors.EdgeX); ok { - r1 = rf(offset, limit, category) + if rf, ok := ret.Get(1).(func(int, int, string, string) errors.EdgeX); ok { + r1 = rf(offset, limit, ack, category) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -578,9 +630,9 @@ func (_m *DBClient) NotificationsByCategory(offset int, limit int, category stri return r0, r1 } -// NotificationsByLabel provides a mock function with given fields: offset, limit, label -func (_m *DBClient) NotificationsByLabel(offset int, limit int, label string) ([]models.Notification, errors.EdgeX) { - ret := _m.Called(offset, limit, label) +// NotificationsByLabel provides a mock function with given fields: offset, limit, ack, label +func (_m *DBClient) NotificationsByLabel(offset int, limit int, ack string, label string) ([]models.Notification, errors.EdgeX) { + ret := _m.Called(offset, limit, ack, label) if len(ret) == 0 { panic("no return value specified for NotificationsByLabel") @@ -588,19 +640,19 @@ func (_m *DBClient) NotificationsByLabel(offset int, limit int, label string) ([ var r0 []models.Notification var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(int, int, string) ([]models.Notification, errors.EdgeX)); ok { - return rf(offset, limit, label) + if rf, ok := ret.Get(0).(func(int, int, string, string) ([]models.Notification, errors.EdgeX)); ok { + return rf(offset, limit, ack, label) } - if rf, ok := ret.Get(0).(func(int, int, string) []models.Notification); ok { - r0 = rf(offset, limit, label) + if rf, ok := ret.Get(0).(func(int, int, string, string) []models.Notification); ok { + r0 = rf(offset, limit, ack, label) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]models.Notification) } } - if rf, ok := ret.Get(1).(func(int, int, string) errors.EdgeX); ok { - r1 = rf(offset, limit, label) + if rf, ok := ret.Get(1).(func(int, int, string, string) errors.EdgeX); ok { + r1 = rf(offset, limit, ack, label) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -610,9 +662,41 @@ func (_m *DBClient) NotificationsByLabel(offset int, limit int, label string) ([ return r0, r1 } -// NotificationsByStatus provides a mock function with given fields: offset, limit, status -func (_m *DBClient) NotificationsByStatus(offset int, limit int, status string) ([]models.Notification, errors.EdgeX) { - ret := _m.Called(offset, limit, status) +// NotificationsByQueryConditions provides a mock function with given fields: offset, limit, condition, ack +func (_m *DBClient) NotificationsByQueryConditions(offset int, limit int, condition requests.NotificationQueryCondition, ack string) ([]models.Notification, errors.EdgeX) { + ret := _m.Called(offset, limit, condition, ack) + + if len(ret) == 0 { + panic("no return value specified for NotificationsByQueryConditions") + } + + var r0 []models.Notification + var r1 errors.EdgeX + if rf, ok := ret.Get(0).(func(int, int, requests.NotificationQueryCondition, string) ([]models.Notification, errors.EdgeX)); ok { + return rf(offset, limit, condition, ack) + } + if rf, ok := ret.Get(0).(func(int, int, requests.NotificationQueryCondition, string) []models.Notification); ok { + r0 = rf(offset, limit, condition, ack) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]models.Notification) + } + } + + if rf, ok := ret.Get(1).(func(int, int, requests.NotificationQueryCondition, string) errors.EdgeX); ok { + r1 = rf(offset, limit, condition, ack) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + +// NotificationsByStatus provides a mock function with given fields: offset, limit, ack, status +func (_m *DBClient) NotificationsByStatus(offset int, limit int, ack string, status string) ([]models.Notification, errors.EdgeX) { + ret := _m.Called(offset, limit, ack, status) if len(ret) == 0 { panic("no return value specified for NotificationsByStatus") @@ -620,19 +704,19 @@ func (_m *DBClient) NotificationsByStatus(offset int, limit int, status string) var r0 []models.Notification var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(int, int, string) ([]models.Notification, errors.EdgeX)); ok { - return rf(offset, limit, status) + if rf, ok := ret.Get(0).(func(int, int, string, string) ([]models.Notification, errors.EdgeX)); ok { + return rf(offset, limit, ack, status) } - if rf, ok := ret.Get(0).(func(int, int, string) []models.Notification); ok { - r0 = rf(offset, limit, status) + if rf, ok := ret.Get(0).(func(int, int, string, string) []models.Notification); ok { + r0 = rf(offset, limit, ack, status) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]models.Notification) } } - if rf, ok := ret.Get(1).(func(int, int, string) errors.EdgeX); ok { - r1 = rf(offset, limit, status) + if rf, ok := ret.Get(1).(func(int, int, string, string) errors.EdgeX); ok { + r1 = rf(offset, limit, ack, status) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -642,9 +726,9 @@ func (_m *DBClient) NotificationsByStatus(offset int, limit int, status string) return r0, r1 } -// NotificationsByTimeRange provides a mock function with given fields: start, end, offset, limit -func (_m *DBClient) NotificationsByTimeRange(start int64, end int64, offset int, limit int) ([]models.Notification, errors.EdgeX) { - ret := _m.Called(start, end, offset, limit) +// NotificationsByTimeRange provides a mock function with given fields: start, end, offset, limit, ack +func (_m *DBClient) NotificationsByTimeRange(start int64, end int64, offset int, limit int, ack string) ([]models.Notification, errors.EdgeX) { + ret := _m.Called(start, end, offset, limit, ack) if len(ret) == 0 { panic("no return value specified for NotificationsByTimeRange") @@ -652,19 +736,19 @@ func (_m *DBClient) NotificationsByTimeRange(start int64, end int64, offset int, var r0 []models.Notification var r1 errors.EdgeX - if rf, ok := ret.Get(0).(func(int64, int64, int, int) ([]models.Notification, errors.EdgeX)); ok { - return rf(start, end, offset, limit) + if rf, ok := ret.Get(0).(func(int64, int64, int, int, string) ([]models.Notification, errors.EdgeX)); ok { + return rf(start, end, offset, limit, ack) } - if rf, ok := ret.Get(0).(func(int64, int64, int, int) []models.Notification); ok { - r0 = rf(start, end, offset, limit) + if rf, ok := ret.Get(0).(func(int64, int64, int, int, string) []models.Notification); ok { + r0 = rf(start, end, offset, limit, ack) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]models.Notification) } } - if rf, ok := ret.Get(1).(func(int64, int64, int, int) errors.EdgeX); ok { - r1 = rf(start, end, offset, limit) + if rf, ok := ret.Get(1).(func(int64, int64, int, int, string) errors.EdgeX); ok { + r1 = rf(start, end, offset, limit, ack) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(errors.EdgeX) @@ -1310,6 +1394,26 @@ func (_m *DBClient) UpdateNotification(s models.Notification) errors.EdgeX { return r0 } +// UpdateNotificationAckStatusByIds provides a mock function with given fields: ack, ids +func (_m *DBClient) UpdateNotificationAckStatusByIds(ack bool, ids []string) errors.EdgeX { + ret := _m.Called(ack, ids) + + if len(ret) == 0 { + panic("no return value specified for UpdateNotificationAckStatusByIds") + } + + var r0 errors.EdgeX + if rf, ok := ret.Get(0).(func(bool, []string) errors.EdgeX); ok { + r0 = rf(ack, ids) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(errors.EdgeX) + } + } + + return r0 +} + // UpdateSubscription provides a mock function with given fields: s func (_m *DBClient) UpdateSubscription(s models.Subscription) errors.EdgeX { ret := _m.Called(s) diff --git a/internal/support/notifications/router.go b/internal/support/notifications/router.go index bdc5390c91..d066f408be 100644 --- a/internal/support/notifications/router.go +++ b/internal/support/notifications/router.go @@ -40,8 +40,10 @@ func LoadRestRoutes(r *echo.Echo, dic *di.Container, serviceName string) { // Notification nc := notificationsController.NewNotificationController(dic) r.POST(common.ApiNotificationRoute, nc.AddNotification, authenticationHook) + r.GET(common.ApiNotificationRoute, nc.NotificationsByQueryConditions, authenticationHook) r.GET(common.ApiNotificationByIdEchoRoute, nc.NotificationById, authenticationHook) r.DELETE(common.ApiNotificationByIdEchoRoute, nc.DeleteNotificationById, authenticationHook) + r.DELETE(common.ApiNotificationByIdsRoute, nc.DeleteNotificationByIds, authenticationHook) r.GET(common.ApiNotificationByCategoryEchoRoute, nc.NotificationsByCategory, authenticationHook) r.GET(common.ApiNotificationByLabelEchoRoute, nc.NotificationsByLabel, authenticationHook) r.GET(common.ApiNotificationByStatusEchoRoute, nc.NotificationsByStatus, authenticationHook) @@ -50,6 +52,8 @@ func LoadRestRoutes(r *echo.Echo, dic *di.Container, serviceName string) { r.DELETE(common.ApiNotificationCleanupByAgeEchoRoute, nc.CleanupNotificationsByAge, authenticationHook) r.DELETE(common.ApiNotificationCleanupRoute, nc.CleanupNotifications, authenticationHook) r.DELETE(common.ApiNotificationByAgeEchoRoute, nc.DeleteProcessedNotificationsByAge, authenticationHook) + r.PUT(common.ApiNotificationAcknowledgeByIdsRoute, nc.AcknowledgeNotificationByIds, authenticationHook) + r.PUT(common.ApiNotificationUnacknowledgeByIdsRoute, nc.UnacknowledgeNotificationByIds, authenticationHook) // Transmission trans := notificationsController.NewTransmissionController(dic) diff --git a/openapi/support-notifications.yaml b/openapi/support-notifications.yaml index 9bcda0b604..18648d3329 100644 --- a/openapi/support-notifications.yaml +++ b/openapi/support-notifications.yaml @@ -200,6 +200,9 @@ components: - NEW - PROCESSED - ESCALATED + acknowledged: + description: "The acknowledgement status of the notification. Accepted values are: true, false, and empty. The default value is empty, and it means both of true and false." + type: boolean CreateNotification: description: "Defines the content included in a notification" type: object @@ -566,7 +569,32 @@ components: example: "secret-value" required: - key - - value + - value + QueryNotificationRequest: + allOf: + - $ref: '#/components/schemas/BaseRequest' + description: A request for querying notifications by the given conditions. + type: object + properties: + queryCondition: + $ref: '#/components/schemas/QueryConditions' + required: + - queryCondition + QueryConditions: + description: "QueryConditions for querying the notifications of interest" + type: object + properties: + category: + description: Categorizes the notification + type: array + items: + type: string + start: + description: Unix timestamp (nanoseconds) indicating the start of a date/time range + type: integer + end: + description: Unix timestamp (nanoseconds) indicating the end of a date/time range + type: integer parameters: offsetParam: in: query @@ -586,6 +614,18 @@ components: minimum: -1 default: 20 description: "The numbers of items to return. Specify -1 will return all remaining items after offset. The maximum will be the MaxResultCount as defined in the configuration of service." + ackParam: + in: query + name: ack + required: false + schema: + type: string + default: "" + enum: + - "" + - "true" + - "false" + description: "The acknowledgement status of the notifications you wish to load. Accepted values are: true, false, and empty. The default value is empty, and it means both of true and false." correlatedRequestHeader: in: header name: X-Correlation-ID @@ -895,6 +935,74 @@ paths: /notification: parameters: - $ref: '#/components/parameters/correlatedRequestHeader' + get: + summary: "Get a paginated list of notifications associated with the specified category within a given time range" + parameters: + - $ref: '#/components/parameters/offsetParam' + - $ref: '#/components/parameters/limitParam' + - $ref: '#/components/parameters/ackParam' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/QueryNotificationRequest' + example: + apiVersion: "v2" + requestId: "bc979763-afde-492c-b0a2-79ff3025b6de" + queryCondition: + category: ["Disconnection","DEVICE_CHANGED"] + start: 1656662808 + end: 1738632171 + responses: + '200': + description: "OK" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/MultiNotificationsResponse' + examples: + MultiNotificationResponseExample: + $ref: '#/components/examples/MultiNotificationResponseExample' + '400': + description: "Request is in an invalid state" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 400Example: + $ref: '#/components/examples/400Example' + '416': + description: "Request range is not satisfiable" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 416Example: + $ref: '#/components/examples/416Example' + '500': + description: "An unexpected error occurred on the server" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 500Example: + $ref: '#/components/examples/500Example' post: summary: "Adds one or more notifications to be sent." requestBody: @@ -966,6 +1074,7 @@ paths: description: "The ending timestamp of the range of notifications to be returned." - $ref: '#/components/parameters/offsetParam' - $ref: '#/components/parameters/limitParam' + - $ref: '#/components/parameters/ackParam' get: summary: "Allows querying of notifications by their creation timestamp within a given time range, sorted in descending order. Results are paginated." responses: @@ -1076,6 +1185,7 @@ paths: description: "The category of the notifications you wish to load." - $ref: '#/components/parameters/offsetParam' - $ref: '#/components/parameters/limitParam' + - $ref: '#/components/parameters/ackParam' get: summary: "Returns a paginated list of notifications associated with the given category." responses: @@ -1138,6 +1248,7 @@ paths: description: "The label of the notifications you wish to load." - $ref: '#/components/parameters/offsetParam' - $ref: '#/components/parameters/limitParam' + - $ref: '#/components/parameters/ackParam' get: summary: "Returns a paginated list of notifications associated with the given label." responses: @@ -1301,6 +1412,153 @@ paths: examples: 500Example: $ref: '#/components/examples/500Example' + /notification/acknowledge/ids/{ids}: + parameters: + - $ref: '#/components/parameters/correlatedRequestHeader' + - name: ids + in: path + required: true + schema: + type: string + description: "The notification IDs concatenate with comma." + example: "4b61880e-aa14-4dd4-8082-76635ce64351,ca510d91-fc0c-40d3-a768-494d44fcd61e" + put: + summary: "Updates the acknowledgment status of one or more notifications to true" + responses: + '200': + description: "Update successful" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 200Example: + $ref: '#/components/examples/200Example' + '400': + description: "Request is in an invalid state" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 400Example: + $ref: '#/components/examples/400Example' + '500': + description: An unexpected error occurred on the server + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 500Example: + $ref: '#/components/examples/500Example' + /notification/unacknowledge/ids/{ids}: + parameters: + - $ref: '#/components/parameters/correlatedRequestHeader' + - name: ids + in: path + required: true + schema: + type: string + description: "The notification IDs concatenate with comma." + example: "4b61880e-aa14-4dd4-8082-76635ce64351,ca510d91-fc0c-40d3-a768-494d44fcd61e" + put: + summary: "Updates the acknowledgment status of one or more notifications to false" + responses: + '200': + description: "Update successful" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 200Example: + $ref: '#/components/examples/200Example' + '400': + description: "Request is in an invalid state" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 400Example: + $ref: '#/components/examples/400Example' + '500': + description: An unexpected error occurred on the server + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 500Example: + $ref: '#/components/examples/500Example' + /notification/ids/{ids}: + parameters: + - $ref: '#/components/parameters/correlatedRequestHeader' + - name: ids + in: path + required: true + schema: + type: string + description: "The notification IDs concatenate with comma." + example: "4b61880e-aa14-4dd4-8082-76635ce64351,ca510d91-fc0c-40d3-a768-494d44fcd61e" + delete: + summary: "Deletes one or more notifications and their associated transmissions by ID." + responses: + '200': + description: "Delete successful" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 200Example: + $ref: '#/components/examples/200Example' + '400': + description: "Request is in an invalid state" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 400Example: + $ref: '#/components/examples/400Example' + '500': + description: "An unexpected error occurred on the server" + headers: + X-Correlation-ID: + $ref: '#/components/headers/correlatedResponseHeader' + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + 500Example: + $ref: '#/components/examples/500Example' /notification/status/{status}: parameters: - $ref: '#/components/parameters/correlatedRequestHeader' @@ -1316,6 +1574,7 @@ paths: description: "The status of the notifications you wish to load." - $ref: '#/components/parameters/offsetParam' - $ref: '#/components/parameters/limitParam' + - $ref: '#/components/parameters/ackParam' get: summary: "Returns a paginated list of notifications with the specified status." responses: @@ -1378,6 +1637,7 @@ paths: description: "The name of the subscription." - $ref: '#/components/parameters/offsetParam' - $ref: '#/components/parameters/limitParam' + - $ref: '#/components/parameters/ackParam' get: summary: "Returns a paginated list of notifications to which the specified subscription is interested." responses: @@ -2177,7 +2437,7 @@ paths: $ref: '#/components/schemas/TransmissionResponse' examples: MultiTransmissionResponseExample: - $ref: '#/components/examples/MultiTransmissionResponseExample' + $ref: '#/components/examples/MultiTransmissionResponseExample' '400': description: "Request is in an invalid state" headers: