Skip to content

Commit

Permalink
Merge pull request #104 from nitrictech/fix/plugin-checks
Browse files Browse the repository at this point in the history
Re-instate plugin registration checks for adapters.
  • Loading branch information
jyecusch authored Aug 2, 2021
2 parents 003f0de + e749464 commit 04eb070
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 1 deletion.
24 changes: 24 additions & 0 deletions pkg/adapters/grpc/document_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,19 @@ type DocumentServiceServer struct {
documentPlugin document.DocumentService
}

func (s *DocumentServiceServer) checkPluginRegistered() error {
if s.documentPlugin == nil {
return NewPluginNotRegisteredError("Document")
}

return nil
}

func (s *DocumentServiceServer) Get(ctx context.Context, req *pb.DocumentGetRequest) (*pb.DocumentGetResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

key := keyFromWire(req.Key)

doc, err := s.documentPlugin.Get(key)
Expand All @@ -49,6 +61,10 @@ func (s *DocumentServiceServer) Get(ctx context.Context, req *pb.DocumentGetRequ
}

func (s *DocumentServiceServer) Set(ctx context.Context, req *pb.DocumentSetRequest) (*pb.DocumentSetResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

key := keyFromWire(req.Key)

err := s.documentPlugin.Set(key, req.GetContent().AsMap())
Expand All @@ -60,6 +76,10 @@ func (s *DocumentServiceServer) Set(ctx context.Context, req *pb.DocumentSetRequ
}

func (s *DocumentServiceServer) Delete(ctx context.Context, req *pb.DocumentDeleteRequest) (*pb.DocumentDeleteResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

key := keyFromWire(req.Key)

err := s.documentPlugin.Delete(key)
Expand All @@ -71,6 +91,10 @@ func (s *DocumentServiceServer) Delete(ctx context.Context, req *pb.DocumentDele
}

func (s *DocumentServiceServer) Query(ctx context.Context, req *pb.DocumentQueryRequest) (*pb.DocumentQueryResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

collection := collectionFromWire(req.Collection)
expressions := make([]document.QueryExpression, len(req.GetExpressions()))
for i, exp := range req.GetExpressions() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/adapters/grpc/utils.go → pkg/adapters/grpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ func NewGrpcError(operation string, err error) error {
return status.Errorf(codes.Internal, "%s: %v", operation, err)
}
}

// Provides generic error for unregistered plugins
func NewPluginNotRegisteredError(plugin string) error {
return status.Errorf(codes.Unimplemented, "%s plugin not registered", plugin)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
. "github.com/onsi/gomega"
)

var _ = Describe("GRPC Utils", func() {
var _ = Describe("GRPC Errors", func() {
Context("GrpcError", func() {
When("sdk.IllegalArgumentError", func() {
It("Should report GRPC IllegalArgument error", func() {
Expand All @@ -40,4 +40,13 @@ var _ = Describe("GRPC Utils", func() {
})
})
})

Context("PluginNotRegisteredError", func() {
When("Creating a New PluginNotRegisteredError", func() {
It("Should contain the name of the plugin", func() {
err := grpc.NewPluginNotRegisteredError("Document")
Expect(err.Error()).To(ContainSubstring("rpc error: code = Unimplemented desc = Document plugin not registered"))
})
})
})
})
24 changes: 24 additions & 0 deletions pkg/adapters/grpc/event_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,19 @@ type EventServiceServer struct {
eventPlugin events.EventService
}

func (s *EventServiceServer) checkPluginRegistered() error {
if s.eventPlugin == nil {
return NewPluginNotRegisteredError("Event")
}

return nil
}

func (s *EventServiceServer) Publish(ctx context.Context, req *pb.EventPublishRequest) (*pb.EventPublishResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

// auto generate an ID if we did not receive one
var ID = req.GetEvent().GetId()
if ID == "" {
Expand Down Expand Up @@ -60,7 +72,19 @@ type TopicServiceServer struct {
eventPlugin events.EventService
}

func (s *TopicServiceServer) checkPluginRegistered() error {
if s.eventPlugin == nil {
return NewPluginNotRegisteredError("Event")
}

return nil
}

func (s *TopicServiceServer) List(context.Context, *pb.TopicListRequest) (*pb.TopicListResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

if res, err := s.eventPlugin.ListTopics(); err == nil {
topics := make([]*pb.NitricTopic, len(res))
for i, topicName := range res {
Expand Down
24 changes: 24 additions & 0 deletions pkg/adapters/grpc/queue_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,19 @@ type QueueServiceServer struct {
plugin queue.QueueService
}

func (s *QueueServiceServer) checkPluginRegistered() error {
if s.plugin == nil {
return NewPluginNotRegisteredError("Queue")
}

return nil
}

func (s *QueueServiceServer) Send(ctx context.Context, req *pb.QueueSendRequest) (*pb.QueueSendResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

task := req.GetTask()

nitricTask := queue.NitricTask{
Expand All @@ -46,6 +58,10 @@ func (s *QueueServiceServer) Send(ctx context.Context, req *pb.QueueSendRequest)
}

func (s *QueueServiceServer) SendBatch(ctx context.Context, req *pb.QueueSendBatchRequest) (*pb.QueueSendBatchResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

// Translate tasks
tasks := make([]queue.NitricTask, len(req.GetTasks()))
for i, task := range req.GetTasks() {
Expand Down Expand Up @@ -78,6 +94,10 @@ func (s *QueueServiceServer) SendBatch(ctx context.Context, req *pb.QueueSendBat
}

func (s *QueueServiceServer) Receive(ctx context.Context, req *pb.QueueReceiveRequest) (*pb.QueueReceiveResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

// Convert gRPC request to plugin params
depth := uint32(req.GetDepth())
popOptions := queue.ReceiveOptions{
Expand Down Expand Up @@ -111,6 +131,10 @@ func (s *QueueServiceServer) Receive(ctx context.Context, req *pb.QueueReceiveRe
}

func (s *QueueServiceServer) Complete(ctx context.Context, req *pb.QueueCompleteRequest) (*pb.QueueCompleteResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

// Convert gRPC request to plugin params
queueName := req.GetQueue()
leaseId := req.GetLeaseId()
Expand Down
20 changes: 20 additions & 0 deletions pkg/adapters/grpc/storage_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,19 @@ type StorageServiceServer struct {
storagePlugin storage.StorageService
}

func (s *StorageServiceServer) checkPluginRegistered() error {
if s.storagePlugin == nil {
return NewPluginNotRegisteredError("Storage")
}

return nil
}

func (s *StorageServiceServer) Write(ctx context.Context, req *pb.StorageWriteRequest) (*pb.StorageWriteResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

if err := s.storagePlugin.Write(req.GetBucketName(), req.GetKey(), req.GetBody()); err == nil {
return &pb.StorageWriteResponse{}, nil
} else {
Expand All @@ -36,6 +48,10 @@ func (s *StorageServiceServer) Write(ctx context.Context, req *pb.StorageWriteRe
}

func (s *StorageServiceServer) Read(ctx context.Context, req *pb.StorageReadRequest) (*pb.StorageReadResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

if object, err := s.storagePlugin.Read(req.GetBucketName(), req.GetKey()); err == nil {
return &pb.StorageReadResponse{
Body: object,
Expand All @@ -46,6 +62,10 @@ func (s *StorageServiceServer) Read(ctx context.Context, req *pb.StorageReadRequ
}

func (s *StorageServiceServer) Delete(ctx context.Context, req *pb.StorageDeleteRequest) (*pb.StorageDeleteResponse, error) {
if err := s.checkPluginRegistered(); err != nil {
return nil, err
}

if err := s.storagePlugin.Delete(req.GetBucketName(), req.GetKey()); err == nil {
return &pb.StorageDeleteResponse{}, nil
} else {
Expand Down

0 comments on commit 04eb070

Please sign in to comment.