Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE: Add Publish/Subscribe to channels #2497

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions channels/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

grpcChannelsV1 "github.com/absmach/magistrala/internal/grpc/channels/v1"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/go-kit/kit/endpoint"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (client grpcClient) Authorize(ctx context.Context, req *grpcChannelsV1.Auth
clientID: req.GetClientId(),
clientType: req.GetClientType(),
channelID: req.GetChannelId(),
permission: req.GetPermission(),
connType: connections.ConnType(req.GetType()),
})
if err != nil {
return &grpcChannelsV1.AuthzRes{}, decodeError(err)
Expand All @@ -88,7 +89,7 @@ func encodeAuthorizeRequest(_ context.Context, grpcReq interface{}) (interface{}
ClientId: req.clientID,
ClientType: req.clientType,
ChannelId: req.channelID,
Permission: req.permission,
Type: uint32(req.connType),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion channels/api/grpc/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func authorizeEndpoint(svc channels.Service) endpoint.Endpoint {
ClientID: req.clientID,
ClientType: req.clientType,
ChannelID: req.channelID,
Permission: req.permission,
Type: req.connType,
}); err != nil {
return authorizeRes{}, err
}
Expand Down
4 changes: 3 additions & 1 deletion channels/api/grpc/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

package grpc

import "github.com/absmach/magistrala/pkg/connections"

type authorizeReq struct {
domainID string
channelID string
clientID string
clientType string
permission string
connType connections.ConnType
}
type removeThingConnectionsReq struct {
thingID string
Expand Down
7 changes: 6 additions & 1 deletion channels/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
channels "github.com/absmach/magistrala/channels/private"
grpcChannelsV1 "github.com/absmach/magistrala/internal/grpc/channels/v1"
"github.com/absmach/magistrala/pkg/apiutil"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
kitgrpc "github.com/go-kit/kit/transport/grpc"
Expand Down Expand Up @@ -59,12 +60,16 @@ func (s *grpcServer) Authorize(ctx context.Context, req *grpcChannelsV1.AuthzReq
func decodeAuthorizeRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*grpcChannelsV1.AuthzReq)

connType := connections.ConnType(req.GetType())
if err := connections.CheckConnType(connType); err != nil {
return nil, err
}
return authorizeReq{
domainID: req.GetDomainId(),
clientID: req.GetClientId(),
clientType: req.GetClientType(),
channelID: req.GetChannelId(),
permission: req.GetPermission(),
connType: connType,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions channels/api/http/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func connectChannelThingsEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Connect(ctx, session, []string{req.channelID}, req.ThingIds); err != nil {
if err := svc.Connect(ctx, session, []string{req.channelID}, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand All @@ -300,7 +300,7 @@ func disconnectChannelThingsEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Disconnect(ctx, session, []string{req.channelID}, req.ThingIds); err != nil {
if err := svc.Disconnect(ctx, session, []string{req.channelID}, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand All @@ -320,7 +320,7 @@ func connectEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Connect(ctx, session, req.ChannelIds, req.ThingIds); err != nil {
if err := svc.Connect(ctx, session, req.ChannelIds, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand All @@ -340,7 +340,7 @@ func disconnectEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Disconnect(ctx, session, req.ChannelIds, req.ThingIds); err != nil {
if err := svc.Disconnect(ctx, session, req.ChannelIds, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand Down
61 changes: 43 additions & 18 deletions channels/api/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
package http

import (
"strings"

"github.com/absmach/magistrala/channels"
"github.com/absmach/magistrala/internal/api"
"github.com/absmach/magistrala/pkg/apiutil"
"github.com/absmach/magistrala/pkg/connections"
mgclients "github.com/absmach/magistrala/things"
)

Expand All @@ -19,7 +22,9 @@ func (req createChannelReq) validate() error {
return apiutil.ErrNameSize
}
if req.Channel.ID != "" {
return api.ValidateUUID(req.Channel.ID)
if strings.TrimSpace(req.Channel.ID) == "" {
return apiutil.ErrMissingChannelID
}
}

return nil
Expand All @@ -35,8 +40,8 @@ func (req createChannelsReq) validate() error {
}
for _, channel := range req.Channels {
if channel.ID != "" {
if err := api.ValidateUUID(channel.ID); err != nil {
return err
if strings.TrimSpace(channel.ID) == "" {
return apiutil.ErrMissingChannelID
}
}
if len(channel.Name) > api.MaxNameSize {
Expand Down Expand Up @@ -163,19 +168,16 @@ func (req changeChannelStatusReq) validate() error {

type connectChannelThingsRequest struct {
channelID string
ThingIds []string `json:"thing_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.ConnType `json:"types,omitempty"`
}

func (req *connectChannelThingsRequest) validate() error {

if req.channelID == "" {
if req.channelID == "" || strings.TrimSpace(req.channelID) == "" {
return apiutil.ErrMissingID
}

if err := api.ValidateUUID(req.channelID); err != nil {
return err
}

if len(req.ThingIds) == 0 {
return apiutil.ErrMissingID
}
Expand All @@ -185,12 +187,18 @@ func (req *connectChannelThingsRequest) validate() error {
return err
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

type disconnectChannelThingsRequest struct {
channelID string
ThingIds []string `json:"thing_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.ConnType `json:"types,omitempty"`
}

func (req *disconnectChannelThingsRequest) validate() error {
Expand All @@ -211,21 +219,27 @@ func (req *disconnectChannelThingsRequest) validate() error {
return err
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

type connectRequest struct {
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.ConnType `json:"types,omitempty"`
}

func (req *connectRequest) validate() error {
if len(req.ChannelIds) == 0 {
return apiutil.ErrMissingID
}
for _, cid := range req.ChannelIds {
if err := api.ValidateUUID(cid); err != nil {
return err
if strings.TrimSpace(cid) == "" {
return apiutil.ErrMissingChannelID
}
}

Expand All @@ -234,16 +248,22 @@ func (req *connectRequest) validate() error {
}

for _, tid := range req.ThingIds {
if err := api.ValidateUUID(tid); err != nil {
return err
if strings.TrimSpace(tid) == "" {
return apiutil.ErrMissingChannelID
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

type disconnectRequest struct {
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.ConnType `json:"types,omitempty"`
}

func (req *disconnectRequest) validate() error {
Expand All @@ -265,6 +285,11 @@ func (req *disconnectRequest) validate() error {
return err
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

Expand Down
8 changes: 5 additions & 3 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/absmach/magistrala/pkg/authn"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/roles"
clients "github.com/absmach/magistrala/things"
)
Expand Down Expand Up @@ -57,14 +58,15 @@ type Connection struct {
ThingID string
ChannelID string
DomainID string
Type connections.ConnType
}

type AuthzReq struct {
DomainID string
ChannelID string
ClientID string
ClientType string
Permission string
Type connections.ConnType
}

//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand Down Expand Up @@ -101,10 +103,10 @@ type Service interface {
RemoveChannel(ctx context.Context, session authn.Session, id string) error

// Connect adds things to the channels list of connected things.
Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error
Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connType []connections.ConnType) error

// Disconnect removes things from the channels list of connected things.
Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error
Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connType []connections.ConnType) error

SetParentGroup(ctx context.Context, session authn.Session, parentGroupID string, id string) error

Expand Down
5 changes: 5 additions & 0 deletions channels/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/absmach/magistrala/channels"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/events"
)

Expand Down Expand Up @@ -260,26 +261,30 @@ func (dce removeChannelEvent) Encode() (map[string]interface{}, error) {
type connectEvent struct {
chIDs []string
thIDs []string
types []connections.ConnType
}

func (ce connectEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": channelConnect,
"thing_ids": ce.thIDs,
"channel_ids": ce.chIDs,
"types": ce.types,
}, nil
}

type disconnectEvent struct {
chIDs []string
thIDs []string
types []connections.ConnType
}

func (de disconnectEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": channelDisconnect,
"thing_ids": de.thIDs,
"channel_ids": de.chIDs,
"types": de.types,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions channels/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/absmach/magistrala/channels"
"github.com/absmach/magistrala/pkg/authn"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
rmEvents "github.com/absmach/magistrala/pkg/roles/rolemanager/events"
Expand Down Expand Up @@ -178,12 +179,12 @@ func (es *eventStore) RemoveChannel(ctx context.Context, session authn.Session,
return nil
}

func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error {
if err := es.svc.Connect(ctx, session, chIDs, thIDs); err != nil {
func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connTypes []connections.ConnType) error {
if err := es.svc.Connect(ctx, session, chIDs, thIDs, connTypes); err != nil {
return err
}

event := connectEvent{chIDs, thIDs}
event := connectEvent{chIDs, thIDs, connTypes}

if err := es.Publish(ctx, event); err != nil {
return err
Expand All @@ -192,12 +193,12 @@ func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs,
return nil
}

func (es *eventStore) Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error {
if err := es.svc.Disconnect(ctx, session, chIDs, thIDs); err != nil {
func (es *eventStore) Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connTypes []connections.ConnType) error {
if err := es.svc.Disconnect(ctx, session, chIDs, thIDs, connTypes); err != nil {
return err
}

event := disconnectEvent{chIDs, thIDs}
event := disconnectEvent{chIDs, thIDs, connTypes}

if err := es.Publish(ctx, event); err != nil {
return err
Expand Down
Loading