Skip to content

Commit

Permalink
Add meaningful HTTP/gRPC codes (#69)
Browse files Browse the repository at this point in the history
* Add meaningful HTTP status codes using gRPC

* Update API docs with error status and responses

* Add http/grpc status tests

* Make codeForError function private

* Update api proto formatting and version/license
  • Loading branch information
jmar910 authored Jan 19, 2022
1 parent 0012b6e commit d073a89
Show file tree
Hide file tree
Showing 11 changed files with 1,579 additions and 207 deletions.
1 change: 1 addition & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ plugins:
opt:
- logtostderr=true
- allow_repeated_fields_in_body=true
- disable_default_errors=true
6 changes: 3 additions & 3 deletions buf.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ deps:
owner: googleapis
repository: googleapis
branch: main
commit: a53a098b1b97483f909bdeae7139efe0
digest: b1-jscswo15-9kAMV8GguLOSSPZ2bU-U5x4X2IKpB73wNI=
create_time: 2021-11-13T15:08:47.460243Z
commit: 0039142542b74d11b0ae5a1e9fd4d523
digest: b1-AzPoqjtlLpx5dVojvgRauI1re5i4LPsghzU0KO-16o4=
create_time: 2022-01-05T15:03:17.46488Z
- remote: buf.build
owner: grpc-ecosystem
repository: grpc-gateway
Expand Down
13 changes: 7 additions & 6 deletions pkg/web/api/connector_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/web/api/fromproto"
"github.com/conduitio/conduit/pkg/web/api/status"
"github.com/conduitio/conduit/pkg/web/api/toproto"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"google.golang.org/grpc"
Expand Down Expand Up @@ -69,13 +70,13 @@ func (c *ConnectorAPIv1) GetConnector(
req *apiv1.GetConnectorRequest,
) (*apiv1.GetConnectorResponse, error) {
if req.Id == "" {
return nil, cerrors.ErrEmptyID
return nil, status.ConnectorError(cerrors.ErrEmptyID)
}

// fetch the connector from the ConnectorOrchestrator
pr, err := c.cs.Get(ctx, req.Id)
if err != nil {
return nil, cerrors.Errorf("failed to get connector by ID: %w", err)
return nil, status.ConnectorError(cerrors.Errorf("failed to get connector by ID: %w", err))
}

resp := toproto.Connector(pr)
Expand All @@ -96,7 +97,7 @@ func (c *ConnectorAPIv1) CreateConnector(
)

if err != nil {
return nil, cerrors.Errorf("failed to create connector: %w", err)
return nil, status.ConnectorError(cerrors.Errorf("failed to create connector: %w", err))
}

co := toproto.Connector(created)
Expand All @@ -114,7 +115,7 @@ func (c *ConnectorAPIv1) UpdateConnector(

old, err := c.cs.Get(ctx, req.Id)
if err != nil {
return nil, cerrors.Errorf("failed to get connector by ID: %w", err)
return nil, status.ConnectorError(cerrors.Errorf("failed to get connector by ID: %w", err))
}

config := fromproto.ConnectorConfig(
Expand All @@ -127,7 +128,7 @@ func (c *ConnectorAPIv1) UpdateConnector(
updated, err := c.cs.Update(ctx, req.Id, config)

if err != nil {
return nil, cerrors.Errorf("failed to update connector: %w", err)
return nil, status.ConnectorError(cerrors.Errorf("failed to update connector: %w", err))
}

co := toproto.Connector(updated)
Expand All @@ -139,7 +140,7 @@ func (c *ConnectorAPIv1) DeleteConnector(ctx context.Context, req *apiv1.DeleteC
err := c.cs.Delete(ctx, req.Id)

if err != nil {
return nil, cerrors.Errorf("failed to delete connector: %w", err)
return nil, status.ConnectorError(cerrors.Errorf("failed to delete connector: %w", err))
}

return &apiv1.DeleteConnectorResponse{}, nil
Expand Down
15 changes: 8 additions & 7 deletions pkg/web/api/pipeline_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/web/api/fromproto"
"github.com/conduitio/conduit/pkg/web/api/status"
"github.com/conduitio/conduit/pkg/web/api/toproto"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"google.golang.org/grpc"
Expand Down Expand Up @@ -66,13 +67,13 @@ func (p *PipelineAPIv1) GetPipeline(
req *apiv1.GetPipelineRequest,
) (*apiv1.GetPipelineResponse, error) {
if req.Id == "" {
return nil, cerrors.ErrEmptyID
return nil, status.PipelineError(cerrors.ErrEmptyID)
}

// fetch the pipeline from the PipelineOrchestrator
pl, err := p.ps.Get(ctx, req.Id)
if err != nil {
return nil, cerrors.Errorf("failed to get pipeline by ID: %w", err)
return nil, status.PipelineError(cerrors.Errorf("failed to get pipeline by ID: %w", err))
}

// setup an empty pipeline to hydrate.
Expand Down Expand Up @@ -108,7 +109,7 @@ func (p *PipelineAPIv1) CreatePipeline(
// create the pipeline
created, err := p.ps.Create(ctx, cfg)
if err != nil {
return nil, cerrors.Errorf("failed to create pipeline: %w", err)
return nil, status.PipelineError(cerrors.Errorf("failed to create pipeline: %w", err))
}

// translate persisted config to proto response
Expand All @@ -129,7 +130,7 @@ func (p *PipelineAPIv1) UpdatePipeline(
updated, err := p.ps.Update(ctx, req.Id, cfg)

if err != nil {
return nil, cerrors.Errorf("failed to update pipeline: %w", err)
return nil, status.PipelineError(cerrors.Errorf("failed to update pipeline: %w", err))
}

pl := toproto.Pipeline(updated)
Expand All @@ -144,7 +145,7 @@ func (p *PipelineAPIv1) DeletePipeline(
err := p.ps.Delete(ctx, req.Id)

if err != nil {
return nil, cerrors.Errorf("failed to delete pipeline: %w", err)
return nil, status.PipelineError(cerrors.Errorf("failed to delete pipeline: %w", err))
}

return &apiv1.DeletePipelineResponse{}, nil
Expand All @@ -156,7 +157,7 @@ func (p *PipelineAPIv1) StartPipeline(
) (*apiv1.StartPipelineResponse, error) {
err := p.ps.Start(ctx, req.Id)
if err != nil {
return nil, cerrors.Errorf("failed to start pipeline: %w", err)
return nil, status.PipelineError(cerrors.Errorf("failed to start pipeline: %w", err))
}

return &apiv1.StartPipelineResponse{}, nil
Expand All @@ -168,7 +169,7 @@ func (p *PipelineAPIv1) StopPipeline(
) (*apiv1.StopPipelineResponse, error) {
err := p.ps.Stop(ctx, req.Id)
if err != nil {
return nil, cerrors.Errorf("failed to stop pipeline: %w", err)
return nil, status.PipelineError(cerrors.Errorf("failed to stop pipeline: %w", err))
}

return &apiv1.StopPipelineResponse{}, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/web/api/processor_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/processor"
"github.com/conduitio/conduit/pkg/web/api/fromproto"
"github.com/conduitio/conduit/pkg/web/api/status"
"github.com/conduitio/conduit/pkg/web/api/toproto"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"google.golang.org/grpc"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (p *ProcessorAPIv1) GetProcessor(
// fetch the processor from the ProcessorOrchestrator
pr, err := p.ps.Get(ctx, req.Id)
if err != nil {
return nil, cerrors.Errorf("failed to get processor by ID: %w", err)
return nil, status.ProcessorError(cerrors.Errorf("failed to get processor by ID: %w", err))
}

resp := toproto.Processor(pr)
Expand All @@ -106,7 +107,7 @@ func (p *ProcessorAPIv1) CreateProcessor(
)

if err != nil {
return nil, cerrors.Errorf("failed to create processor: %w", err)
return nil, status.ProcessorError(cerrors.Errorf("failed to create processor: %w", err))
}

pr := toproto.Processor(created)
Expand All @@ -125,7 +126,7 @@ func (p *ProcessorAPIv1) UpdateProcessor(
updated, err := p.ps.Update(ctx, req.Id, fromproto.ProcessorConfig(req.Config))

if err != nil {
return nil, cerrors.Errorf("failed to update processor: %w", err)
return nil, status.ProcessorError(cerrors.Errorf("failed to update processor: %w", err))
}

pr := toproto.Processor(updated)
Expand All @@ -137,7 +138,7 @@ func (p *ProcessorAPIv1) DeleteProcessor(ctx context.Context, req *apiv1.DeleteP
err := p.ps.Delete(ctx, req.Id)

if err != nil {
return nil, cerrors.Errorf("failed to delete processor: %w", err)
return nil, status.ProcessorError(cerrors.Errorf("failed to delete processor: %w", err))
}

return &apiv1.DeleteProcessorResponse{}, nil
Expand Down
95 changes: 95 additions & 0 deletions pkg/web/api/status/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
"github.com/conduitio/conduit/pkg/processor"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

func PipelineError(err error) error {
var code codes.Code

switch {
case cerrors.Is(err, pipeline.ErrNameMissing):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrInstanceNotFound):
code = codes.NotFound
default:
code = codeFromError(err)
}

return grpcstatus.Error(code, err.Error())
}

func ConnectorError(err error) error {
var code codes.Code

switch {
case cerrors.Is(err, connector.ErrInvalidConnectorType):
code = codes.InvalidArgument
case cerrors.Is(err, connector.ErrInstanceNotFound):
code = codes.NotFound
default:
code = codeFromError(err)
}

return grpcstatus.Error(code, err.Error())
}

func ProcessorError(err error) error {
var code codes.Code

switch {
case cerrors.Is(err, orchestrator.ErrInvalidProcessorParentType):
code = codes.InvalidArgument
case cerrors.Is(err, processor.ErrInstanceNotFound):
code = codes.NotFound
default:
code = codeFromError(err)
}

return grpcstatus.Error(code, err.Error())
}

func codeFromError(err error) codes.Code {
switch {
case cerrors.Is(err, cerrors.ErrNotImpl):
return codes.Unimplemented
case cerrors.Is(err, cerrors.ErrEmptyID):
return codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrPipelineRunning):
return codes.FailedPrecondition
case cerrors.Is(err, pipeline.ErrPipelineNotRunning):
return codes.FailedPrecondition
case cerrors.Is(err, pipeline.ErrNameAlreadyExists):
return codes.AlreadyExists
case cerrors.Is(err, connector.ErrConnectorRunning):
return codes.FailedPrecondition
case cerrors.Is(err, orchestrator.ErrPipelineHasConnectorsAttached):
return codes.FailedPrecondition
case cerrors.Is(err, orchestrator.ErrPipelineHasProcessorsAttached):
return codes.FailedPrecondition
case cerrors.Is(err, orchestrator.ErrConnectorHasProcessorsAttached):
return codes.FailedPrecondition
default:
return codes.Internal
}
}
Loading

0 comments on commit d073a89

Please sign in to comment.