Skip to content

Commit

Permalink
Validate connector name and pipeline description through API (#1242)
Browse files Browse the repository at this point in the history
* validate connector name when creating a connector through the API

* validate pipeline description character limit to be 250 characters

* add validation for name, description and id of pipeline

* add validation for name and id of connectors

* lint fix

* regexp pattern fix

* regexp pattern reformat to string

* resolve pr conversations, use multierr to return err in validateConnector

* modify regex to correctly match and add tests for validation in connector service_test.go

* modify and add tests for service_test.go
increase delay in persister_test.go

* update regex to confide with provisioning test

* lint fix

* Update pkg/pipeline/errors.go

Co-authored-by: Lovro Mažgon <[email protected]>

* regex fix

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
AdamHaffar and lovromazgon authored Nov 29, 2023
1 parent e06af99 commit 01fafc5
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pkg/connector/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ var (
ErrInvalidConnectorStateType = cerrors.New("invalid connector state type")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")
ErrConnectorRunning = cerrors.New("connector is running")
ErrInvalidCharacters = cerrors.New("connector ID contains invalid characters")
ErrIDOverLimit = cerrors.New("connector ID is over the character limit (64)")
ErrNameOverLimit = cerrors.New("connector name is over the character limit (64)")
ErrNameMissing = cerrors.New("must provide a connector name")
ErrIDMissing = cerrors.New("must provide a connector ID")
)
34 changes: 33 additions & 1 deletion pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package connector

import (
"context"
"regexp"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/multierror"
)

var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:]*$`)

// Service manages connectors.
type Service struct {
logger log.CtxLogger
Expand Down Expand Up @@ -111,6 +115,11 @@ func (s *Service) Create(
cfg Config,
p ProvisionType,
) (*Instance, error) {
err := s.validateConnector(cfg, id)
if err != nil {
return nil, cerrors.Errorf("connector is invalid: %w", err)
}

// determine the path of the Connector binary
if plugin == "" {
return nil, cerrors.New("must provide a plugin")
Expand Down Expand Up @@ -142,7 +151,7 @@ func (s *Service) Create(
}

// persist instance
err := s.store.Set(ctx, id, conn)
err = s.store.Set(ctx, id, conn)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,3 +284,26 @@ func (s *Service) SetState(ctx context.Context, id string, state any) (*Instance

return conn, err
}
func (s *Service) validateConnector(cfg Config, id string) error {
// contains all the errors occurred while provisioning configuration files.
var multierr error

if cfg.Name == "" {
multierr = multierror.Append(multierr, ErrNameMissing)
}
if len(cfg.Name) > 64 {
multierr = multierror.Append(multierr, ErrNameOverLimit)
}
if id == "" {
multierr = multierror.Append(multierr, ErrIDMissing)
}
matched := idRegex.MatchString(id)
if !matched {
multierr = multierror.Append(multierr, ErrInvalidCharacters)
}
if len(id) > 64 {
multierr = multierror.Append(multierr, ErrIDOverLimit)
}

return multierr
}
125 changes: 123 additions & 2 deletions pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ func TestService_CreateDLQ(t *testing.T) {
TypeDestination,
"test-plugin",
uuid.NewString(),
Config{},
Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
ProvisionTypeDLQ,
)
is.NoErr(err)
Expand Down Expand Up @@ -271,7 +274,8 @@ func TestService_CreateError(t *testing.T) {
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}}
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -290,6 +294,123 @@ func TestService_CreateError(t *testing.T) {
}
}

func TestService_Create_ValidateSuccess(t *testing.T) {
is := is.New(t)
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}

service := NewService(logger, db, nil)

testCases := []struct {
name string
connID string
data Config
}{{
name: "valid config name",
connID: uuid.NewString(),
data: Config{
Name: "Name#@-/_0%$",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "valid connector ID",
connID: "Aa0-_",
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
got, err := service.Create(
ctx,
tt.connID,
TypeSource,
"test-plugin",
uuid.NewString(),
tt.data,
ProvisionTypeAPI,
)
is.True(got != nil)
is.Equal(err, nil)
})
}
}

func TestService_Create_ValidateError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}

service := NewService(logger, db, nil)

testCases := []struct {
name string
connID string
errType error
data Config
}{{
name: "empty config name",
connID: uuid.NewString(),
errType: ErrNameMissing,
data: Config{
Name: "",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "connector name over 64 characters",
connID: uuid.NewString(),
errType: ErrNameOverLimit,
data: Config{
Name: "aaaaaaaaa1bbbbbbbbb2ccccccccc3ddddddddd4eeeeeeeee5fffffffff6ggggg",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "connector ID over 64 characters",
connID: "aaaaaaaaa1bbbbbbbbb2ccccccccc3ddddddddd4eeeeeeeee5fffffffff6ggggg",
errType: ErrIDOverLimit,
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "invalid characters in connector ID",
connID: "a%bc",
errType: ErrInvalidCharacters,
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "empty connector ID",
connID: "",
errType: ErrIDMissing,
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
got, err := service.Create(
ctx,
tt.connID,
TypeSource,
"test-plugin",
uuid.NewString(),
tt.data,
ProvisionTypeAPI,
)
is.True(cerrors.Is(err, tt.errType))
is.Equal(got, nil)
})
}
}

func TestService_GetInstanceNotFound(t *testing.T) {
is := is.New(t)
ctx := context.Background()
Expand Down
23 changes: 14 additions & 9 deletions pkg/pipeline/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ package pipeline
import "github.com/conduitio/conduit/pkg/foundation/cerrors"

var (
ErrTimeout = cerrors.New("operation timed out")
ErrGracefulShutdown = cerrors.New("graceful shutdown")
ErrPipelineRunning = cerrors.New("pipeline is running")
ErrPipelineNotRunning = cerrors.New("pipeline not running")
ErrInstanceNotFound = cerrors.New("pipeline instance not found")
ErrNameMissing = cerrors.New("must provide a pipeline name")
ErrNameAlreadyExists = cerrors.New("pipeline name already exists")
ErrConnectorIDNotFound = cerrors.New("connector ID not found")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")
ErrTimeout = cerrors.New("operation timed out")
ErrGracefulShutdown = cerrors.New("graceful shutdown")
ErrPipelineRunning = cerrors.New("pipeline is running")
ErrPipelineNotRunning = cerrors.New("pipeline not running")
ErrInstanceNotFound = cerrors.New("pipeline instance not found")
ErrNameMissing = cerrors.New("must provide a pipeline name")
ErrIDMissing = cerrors.New("must provide a pipeline ID")
ErrNameAlreadyExists = cerrors.New("pipeline name already exists")
ErrInvalidCharacters = cerrors.New("pipeline ID contains invalid characters")
ErrNameOverLimit = cerrors.New("pipeline name is over the character limit (64)")
ErrIDOverLimit = cerrors.New("pipeline ID is over the character limit (64)")
ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)")
ErrConnectorIDNotFound = cerrors.New("connector ID not found")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")
)
43 changes: 37 additions & 6 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package pipeline

import (
"context"
"regexp"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/multierror"
)

var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:]*$`)

type FailureEvent struct {
// ID is the ID of the pipeline which failed.
ID string
Expand Down Expand Up @@ -111,11 +115,9 @@ func (s *Service) Get(_ context.Context, id string) (*Instance, error) {
// Create will create a new pipeline instance with the given config and return
// it if it was successfully saved to the database.
func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error) {
if cfg.Name == "" {
return nil, ErrNameMissing
}
if s.instanceNames[cfg.Name] {
return nil, ErrNameAlreadyExists
err := s.validatePipeline(cfg, id)
if err != nil {
return nil, cerrors.Errorf("pipeline is invalid: %w", err)
}

t := time.Now()
Expand All @@ -129,7 +131,7 @@ func (s *Service) Create(ctx context.Context, id string, cfg Config, p Provision
DLQ: DefaultDLQ,
}

err := s.store.Set(ctx, pl.ID, pl)
err = s.store.Set(ctx, pl.ID, pl)
if err != nil {
return nil, cerrors.Errorf("failed to save pipeline with ID %q: %w", pl.ID, err)
}
Expand Down Expand Up @@ -326,3 +328,32 @@ func (s *Service) notify(pipelineID string, err error) {
handler(e)
}
}
func (s *Service) validatePipeline(cfg Config, id string) error {
// contains all the errors occurred while provisioning configuration files.
var multierr error

if cfg.Name == "" {
multierr = multierror.Append(multierr, ErrNameMissing)
}
if s.instanceNames[cfg.Name] {
multierr = multierror.Append(multierr, ErrNameAlreadyExists)
}
if len(cfg.Name) > 64 {
multierr = multierror.Append(multierr, ErrNameOverLimit)
}
if len(cfg.Description) > 8192 {
multierr = multierror.Append(multierr, ErrDescriptionOverLimit)
}
if id == "" {
multierr = multierror.Append(multierr, ErrIDMissing)
}
matched := idRegex.MatchString(id)
if !matched {
multierr = multierror.Append(multierr, ErrInvalidCharacters)
}
if len(id) > 64 {
multierr = multierror.Append(multierr, ErrIDOverLimit)
}

return multierr
}
Loading

0 comments on commit 01fafc5

Please sign in to comment.