Skip to content

Commit

Permalink
add publish subscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Arvindh <[email protected]>
  • Loading branch information
arvindh123 committed Oct 30, 2024
1 parent 41328b1 commit b47d3f6
Show file tree
Hide file tree
Showing 35 changed files with 327 additions and 142 deletions.
5 changes: 3 additions & 2 deletions channels/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

grpcChannelsV1 "github.com/absmach/magistrala/internal/grpc/channels/v1"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/go-kit/kit/endpoint"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (client grpcClient) Authorize(ctx context.Context, req *grpcChannelsV1.Auth
clientID: req.GetClientId(),
clientType: req.GetClientType(),
channelID: req.GetChannelId(),
permission: req.GetPermission(),
connType: connections.Type(req.GetType()),
})
if err != nil {
return &grpcChannelsV1.AuthzRes{}, decodeError(err)
Expand All @@ -88,7 +89,7 @@ func encodeAuthorizeRequest(_ context.Context, grpcReq interface{}) (interface{}
ClientId: req.clientID,
ClientType: req.clientType,
ChannelId: req.channelID,
Permission: req.permission,
Type: uint32(req.connType),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion channels/api/grpc/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func authorizeEndpoint(svc channels.Service) endpoint.Endpoint {
ClientID: req.clientID,
ClientType: req.clientType,
ChannelID: req.channelID,
Permission: req.permission,
Type: req.connType,
}); err != nil {
return authorizeRes{}, err
}
Expand Down
4 changes: 3 additions & 1 deletion channels/api/grpc/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

package grpc

import "github.com/absmach/magistrala/pkg/connections"

type authorizeReq struct {
domainID string
channelID string
clientID string
clientType string
permission string
connType connections.Type
}
type removeThingConnectionsReq struct {
thingID string
Expand Down
7 changes: 6 additions & 1 deletion channels/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
channels "github.com/absmach/magistrala/channels/private"
grpcChannelsV1 "github.com/absmach/magistrala/internal/grpc/channels/v1"
"github.com/absmach/magistrala/pkg/apiutil"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
kitgrpc "github.com/go-kit/kit/transport/grpc"
Expand Down Expand Up @@ -59,12 +60,16 @@ func (s *grpcServer) Authorize(ctx context.Context, req *grpcChannelsV1.AuthzReq
func decodeAuthorizeRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*grpcChannelsV1.AuthzReq)

connType := connections.Type(req.GetType())
if err := connType.CheckType(); err != nil {
return nil, err
}
return authorizeReq{
domainID: req.GetDomainId(),
clientID: req.GetClientId(),
clientType: req.GetClientType(),
channelID: req.GetChannelId(),
permission: req.GetPermission(),
connType: connType,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions channels/api/http/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func connectChannelThingsEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Connect(ctx, session, []string{req.channelID}, req.ThingIds); err != nil {
if err := svc.Connect(ctx, session, []string{req.channelID}, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand All @@ -300,7 +300,7 @@ func disconnectChannelThingsEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Disconnect(ctx, session, []string{req.channelID}, req.ThingIds); err != nil {
if err := svc.Disconnect(ctx, session, []string{req.channelID}, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand All @@ -320,7 +320,7 @@ func connectEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Connect(ctx, session, req.ChannelIds, req.ThingIds); err != nil {
if err := svc.Connect(ctx, session, req.ChannelIds, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand All @@ -340,7 +340,7 @@ func disconnectEndpoint(svc channels.Service) endpoint.Endpoint {
return nil, svcerr.ErrAuthentication
}

if err := svc.Disconnect(ctx, session, req.ChannelIds, req.ThingIds); err != nil {
if err := svc.Disconnect(ctx, session, req.ChannelIds, req.ThingIds, req.Types); err != nil {
return nil, err
}

Expand Down
61 changes: 43 additions & 18 deletions channels/api/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
package http

import (
"strings"

"github.com/absmach/magistrala/channels"
"github.com/absmach/magistrala/internal/api"
"github.com/absmach/magistrala/pkg/apiutil"
mgclients "github.com/absmach/magistrala/pkg/clients"
"github.com/absmach/magistrala/pkg/connections"
)

type createChannelReq struct {
Expand All @@ -19,7 +22,9 @@ func (req createChannelReq) validate() error {
return apiutil.ErrNameSize
}
if req.Channel.ID != "" {
return api.ValidateUUID(req.Channel.ID)
if strings.TrimSpace(req.Channel.ID) == "" {
return apiutil.ErrMissingChannelID
}
}

return nil
Expand All @@ -35,8 +40,8 @@ func (req createChannelsReq) validate() error {
}
for _, channel := range req.Channels {
if channel.ID != "" {
if err := api.ValidateUUID(channel.ID); err != nil {
return err
if strings.TrimSpace(channel.ID) == "" {
return apiutil.ErrMissingChannelID
}
}
if len(channel.Name) > api.MaxNameSize {
Expand Down Expand Up @@ -163,19 +168,16 @@ func (req changeChannelStatusReq) validate() error {

type connectChannelThingsRequest struct {
channelID string
ThingIds []string `json:"thing_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.Type `json:"types,omitempty"`
}

func (req *connectChannelThingsRequest) validate() error {

if req.channelID == "" {
if req.channelID == "" || strings.TrimSpace(req.channelID) == "" {
return apiutil.ErrMissingID
}

if err := api.ValidateUUID(req.channelID); err != nil {
return err
}

if len(req.ThingIds) == 0 {
return apiutil.ErrMissingID
}
Expand All @@ -185,12 +187,18 @@ func (req *connectChannelThingsRequest) validate() error {
return err
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

type disconnectChannelThingsRequest struct {
channelID string
ThingIds []string `json:"thing_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.Type `json:"types,omitempty"`
}

func (req *disconnectChannelThingsRequest) validate() error {
Expand All @@ -211,21 +219,27 @@ func (req *disconnectChannelThingsRequest) validate() error {
return err
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

type connectRequest struct {
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.Type `json:"types,omitempty"`
}

func (req *connectRequest) validate() error {
if len(req.ChannelIds) == 0 {
return apiutil.ErrMissingID
}
for _, cid := range req.ChannelIds {
if err := api.ValidateUUID(cid); err != nil {
return err
if strings.TrimSpace(cid) == "" {
return apiutil.ErrMissingChannelID
}
}

Expand All @@ -234,16 +248,22 @@ func (req *connectRequest) validate() error {
}

for _, tid := range req.ThingIds {
if err := api.ValidateUUID(tid); err != nil {
return err
if strings.TrimSpace(tid) == "" {
return apiutil.ErrMissingChannelID
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

type disconnectRequest struct {
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
ChannelIds []string `json:"channel_ids,omitempty"`
ThingIds []string `json:"thing_ids,omitempty"`
Types []connections.Type `json:"types,omitempty"`
}

func (req *disconnectRequest) validate() error {
Expand All @@ -265,6 +285,11 @@ func (req *disconnectRequest) validate() error {
return err
}
}

if len(req.Types) == 0 {
return apiutil.ErrMissingConnectionType
}

return nil
}

Expand Down
8 changes: 5 additions & 3 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/absmach/magistrala/pkg/authn"
"github.com/absmach/magistrala/pkg/clients"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/roles"
)

Expand Down Expand Up @@ -57,14 +58,15 @@ type Connection struct {
ThingID string
ChannelID string
DomainID string
Type connections.Type
}

type AuthzReq struct {
DomainID string
ChannelID string
ClientID string
ClientType string
Permission string
Type connections.Type
}

//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand Down Expand Up @@ -101,10 +103,10 @@ type Service interface {
RemoveChannel(ctx context.Context, session authn.Session, id string) error

// Connect adds things to the channels list of connected things.
Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error
Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connType []connections.Type) error

// Disconnect removes things from the channels list of connected things.
Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error
Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connType []connections.Type) error

SetParentGroup(ctx context.Context, session authn.Session, parentGroupID string, id string) error

Expand Down
5 changes: 5 additions & 0 deletions channels/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/absmach/magistrala/channels"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/events"
)

Expand Down Expand Up @@ -260,26 +261,30 @@ func (dce removeChannelEvent) Encode() (map[string]interface{}, error) {
type connectEvent struct {
chIDs []string
thIDs []string
types []connections.Type
}

func (ce connectEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": channelConnect,
"thing_ids": ce.thIDs,
"channel_ids": ce.chIDs,
"types": ce.types,
}, nil
}

type disconnectEvent struct {
chIDs []string
thIDs []string
types []connections.Type
}

func (de disconnectEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": channelDisconnect,
"thing_ids": de.thIDs,
"channel_ids": de.chIDs,
"types": de.types,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions channels/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/absmach/magistrala/channels"
"github.com/absmach/magistrala/pkg/authn"
"github.com/absmach/magistrala/pkg/connections"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
rmEvents "github.com/absmach/magistrala/pkg/roles/rolemanager/events"
Expand Down Expand Up @@ -178,12 +179,12 @@ func (es *eventStore) RemoveChannel(ctx context.Context, session authn.Session,
return nil
}

func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error {
if err := es.svc.Connect(ctx, session, chIDs, thIDs); err != nil {
func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connTypes []connections.Type) error {
if err := es.svc.Connect(ctx, session, chIDs, thIDs, connTypes); err != nil {
return err
}

event := connectEvent{chIDs, thIDs}
event := connectEvent{chIDs, thIDs, connTypes}

if err := es.Publish(ctx, event); err != nil {
return err
Expand All @@ -192,12 +193,12 @@ func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs,
return nil
}

func (es *eventStore) Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string) error {
if err := es.svc.Disconnect(ctx, session, chIDs, thIDs); err != nil {
func (es *eventStore) Disconnect(ctx context.Context, session authn.Session, chIDs, thIDs []string, connTypes []connections.Type) error {
if err := es.svc.Disconnect(ctx, session, chIDs, thIDs, connTypes); err != nil {
return err
}

event := disconnectEvent{chIDs, thIDs}
event := disconnectEvent{chIDs, thIDs, connTypes}

if err := es.Publish(ctx, event); err != nil {
return err
Expand Down
Loading

0 comments on commit b47d3f6

Please sign in to comment.