diff --git a/pkg/plugin/connector/builtin/registry.go b/pkg/plugin/connector/builtin/registry.go index 9ffd90836..efd36f709 100644 --- a/pkg/plugin/connector/builtin/registry.go +++ b/pkg/plugin/connector/builtin/registry.go @@ -46,16 +46,6 @@ var DefaultBuiltinConnectors = map[string]sdk.Connector{ "github.com/conduitio/conduit-connector-s3": s3.Connector, } -type Registry struct { - logger log.CtxLogger - - connectors map[string]sdk.Connector - // plugins stores plugin blueprints in a 2D map, first key is the plugin - // name, the second key is the plugin version - plugins map[string]map[string]blueprint - service *connutils.SchemaService -} - type blueprint struct { fullName plugin.FullName specification pconnector.Specification @@ -89,6 +79,16 @@ func newDispenserFactory(conn sdk.Connector) dispenserFactory { } } +type Registry struct { + logger log.CtxLogger + + connectors map[string]sdk.Connector + // plugins stores plugin blueprints in a 2D map, first key is the plugin + // name, the second key is the plugin version + plugins map[string]map[string]blueprint + service *connutils.SchemaService +} + func NewRegistry(logger log.CtxLogger, connectors map[string]sdk.Connector, service *connutils.SchemaService) *Registry { logger = logger.WithComponentFromType(Registry{}) // The built-in plugins use Conduit's own schema service diff --git a/pkg/plugin/connector/builtin/registry_test.go b/pkg/plugin/connector/builtin/registry_test.go new file mode 100644 index 000000000..132a1f673 --- /dev/null +++ b/pkg/plugin/connector/builtin/registry_test.go @@ -0,0 +1,110 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builtin + +import ( + "context" + "testing" + + "github.com/conduitio/conduit-connector-protocol/pconnector" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/matryer/is" +) + +func TestRegistry_InitList(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + underTest := NewRegistry(log.Nop(), DefaultBuiltinConnectors, nil) + underTest.Init(ctx) + + specs := underTest.List() + + is.Equal(len(DefaultBuiltinConnectors), len(specs)) + for _, gotSpec := range specs { + wantSpec := DefaultBuiltinConnectors["github.com/conduitio/conduit-connector-"+gotSpec.Name].NewSpecification() + is.Equal( + "", + cmp.Diff( + pconnector.Specification(wantSpec), + gotSpec, + cmpopts.IgnoreFields(pconnector.Specification{}, "SourceParams", "DestinationParams"), + ), + ) + } +} + +func TestRegistry_NewDispenser_PluginNotFound(t *testing.T) { + testCases := []struct { + name string + pluginName string + wantErr bool + }{ + { + name: "non-existing plugin", + pluginName: "builtin:foobar", + wantErr: true, + }, + { + name: "plugin exists, version doesn't", + pluginName: "builtin:file@v12.34.56", + wantErr: true, + }, + { + name: "existing plugin, no builtin prefix, no version", + pluginName: "file", + wantErr: false, + }, + { + name: "existing plugin, with builtin prefix, no version", + pluginName: "builtin:file", + wantErr: false, + }, + { + name: "existing plugin, with builtin prefix, with version", + pluginName: func() string { + v := DefaultBuiltinConnectors["github.com/conduitio/conduit-connector-file"]. + NewSpecification(). + Version + return "builtin:file@" + v + }(), + wantErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + underTest := NewRegistry(log.Nop(), DefaultBuiltinConnectors, nil) + underTest.Init(ctx) + + dispenser, err := underTest.NewDispenser(log.Nop(), plugin.FullName(tc.pluginName), pconnector.PluginConfig{}) + if tc.wantErr { + is.True(cerrors.Is(err, plugin.ErrPluginNotFound)) + is.True(dispenser == nil) + return + } + + is.NoErr(err) + is.True(dispenser != nil) + }) + } +} diff --git a/pkg/plugin/connector/mock/source_interface_mocks.go b/pkg/plugin/connector/mock/source_interface_mocks.go new file mode 100644 index 000000000..8444e62f0 --- /dev/null +++ b/pkg/plugin/connector/mock/source_interface_mocks.go @@ -0,0 +1,494 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: service.go +// +// Generated by this command: +// +// mockgen -typed -source=service.go -destination=mock/source_interface_mocks.go -package=mock -mock_names=builtinReg=BuiltinReg,standaloneReg=StandaloneReg,authManager=AuthManager . builtinReg,standaloneReg,authManager +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + pconnector "github.com/conduitio/conduit-connector-protocol/pconnector" + log "github.com/conduitio/conduit/pkg/foundation/log" + plugin "github.com/conduitio/conduit/pkg/plugin" + connector "github.com/conduitio/conduit/pkg/plugin/connector" + gomock "go.uber.org/mock/gomock" +) + +// Mockregistry is a mock of registry interface. +type Mockregistry struct { + ctrl *gomock.Controller + recorder *MockregistryMockRecorder + isgomock struct{} +} + +// MockregistryMockRecorder is the mock recorder for Mockregistry. +type MockregistryMockRecorder struct { + mock *Mockregistry +} + +// NewMockregistry creates a new mock instance. +func NewMockregistry(ctrl *gomock.Controller) *Mockregistry { + mock := &Mockregistry{ctrl: ctrl} + mock.recorder = &MockregistryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mockregistry) EXPECT() *MockregistryMockRecorder { + return m.recorder +} + +// List mocks base method. +func (m *Mockregistry) List() map[plugin.FullName]pconnector.Specification { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "List") + ret0, _ := ret[0].(map[plugin.FullName]pconnector.Specification) + return ret0 +} + +// List indicates an expected call of List. +func (mr *MockregistryMockRecorder) List() *MockregistryListCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*Mockregistry)(nil).List)) + return &MockregistryListCall{Call: call} +} + +// MockregistryListCall wrap *gomock.Call +type MockregistryListCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockregistryListCall) Return(arg0 map[plugin.FullName]pconnector.Specification) *MockregistryListCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockregistryListCall) Do(f func() map[plugin.FullName]pconnector.Specification) *MockregistryListCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockregistryListCall) DoAndReturn(f func() map[plugin.FullName]pconnector.Specification) *MockregistryListCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// NewDispenser mocks base method. +func (m *Mockregistry) NewDispenser(logger log.CtxLogger, name plugin.FullName, cfg pconnector.PluginConfig) (connector.Dispenser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewDispenser", logger, name, cfg) + ret0, _ := ret[0].(connector.Dispenser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewDispenser indicates an expected call of NewDispenser. +func (mr *MockregistryMockRecorder) NewDispenser(logger, name, cfg any) *MockregistryNewDispenserCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*Mockregistry)(nil).NewDispenser), logger, name, cfg) + return &MockregistryNewDispenserCall{Call: call} +} + +// MockregistryNewDispenserCall wrap *gomock.Call +type MockregistryNewDispenserCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockregistryNewDispenserCall) Return(arg0 connector.Dispenser, arg1 error) *MockregistryNewDispenserCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockregistryNewDispenserCall) Do(f func(log.CtxLogger, plugin.FullName, pconnector.PluginConfig) (connector.Dispenser, error)) *MockregistryNewDispenserCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockregistryNewDispenserCall) DoAndReturn(f func(log.CtxLogger, plugin.FullName, pconnector.PluginConfig) (connector.Dispenser, error)) *MockregistryNewDispenserCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// BuiltinReg is a mock of builtinReg interface. +type BuiltinReg struct { + ctrl *gomock.Controller + recorder *BuiltinRegMockRecorder + isgomock struct{} +} + +// BuiltinRegMockRecorder is the mock recorder for BuiltinReg. +type BuiltinRegMockRecorder struct { + mock *BuiltinReg +} + +// NewBuiltinReg creates a new mock instance. +func NewBuiltinReg(ctrl *gomock.Controller) *BuiltinReg { + mock := &BuiltinReg{ctrl: ctrl} + mock.recorder = &BuiltinRegMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *BuiltinReg) EXPECT() *BuiltinRegMockRecorder { + return m.recorder +} + +// Init mocks base method. +func (m *BuiltinReg) Init(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init", arg0) +} + +// Init indicates an expected call of Init. +func (mr *BuiltinRegMockRecorder) Init(arg0 any) *BuiltinRegInitCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*BuiltinReg)(nil).Init), arg0) + return &BuiltinRegInitCall{Call: call} +} + +// BuiltinRegInitCall wrap *gomock.Call +type BuiltinRegInitCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *BuiltinRegInitCall) Return() *BuiltinRegInitCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *BuiltinRegInitCall) Do(f func(context.Context)) *BuiltinRegInitCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *BuiltinRegInitCall) DoAndReturn(f func(context.Context)) *BuiltinRegInitCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// List mocks base method. +func (m *BuiltinReg) List() map[plugin.FullName]pconnector.Specification { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "List") + ret0, _ := ret[0].(map[plugin.FullName]pconnector.Specification) + return ret0 +} + +// List indicates an expected call of List. +func (mr *BuiltinRegMockRecorder) List() *BuiltinRegListCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*BuiltinReg)(nil).List)) + return &BuiltinRegListCall{Call: call} +} + +// BuiltinRegListCall wrap *gomock.Call +type BuiltinRegListCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *BuiltinRegListCall) Return(arg0 map[plugin.FullName]pconnector.Specification) *BuiltinRegListCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *BuiltinRegListCall) Do(f func() map[plugin.FullName]pconnector.Specification) *BuiltinRegListCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *BuiltinRegListCall) DoAndReturn(f func() map[plugin.FullName]pconnector.Specification) *BuiltinRegListCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// NewDispenser mocks base method. +func (m *BuiltinReg) NewDispenser(logger log.CtxLogger, name plugin.FullName, cfg pconnector.PluginConfig) (connector.Dispenser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewDispenser", logger, name, cfg) + ret0, _ := ret[0].(connector.Dispenser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewDispenser indicates an expected call of NewDispenser. +func (mr *BuiltinRegMockRecorder) NewDispenser(logger, name, cfg any) *BuiltinRegNewDispenserCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*BuiltinReg)(nil).NewDispenser), logger, name, cfg) + return &BuiltinRegNewDispenserCall{Call: call} +} + +// BuiltinRegNewDispenserCall wrap *gomock.Call +type BuiltinRegNewDispenserCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *BuiltinRegNewDispenserCall) Return(arg0 connector.Dispenser, arg1 error) *BuiltinRegNewDispenserCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *BuiltinRegNewDispenserCall) Do(f func(log.CtxLogger, plugin.FullName, pconnector.PluginConfig) (connector.Dispenser, error)) *BuiltinRegNewDispenserCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *BuiltinRegNewDispenserCall) DoAndReturn(f func(log.CtxLogger, plugin.FullName, pconnector.PluginConfig) (connector.Dispenser, error)) *BuiltinRegNewDispenserCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// StandaloneReg is a mock of standaloneReg interface. +type StandaloneReg struct { + ctrl *gomock.Controller + recorder *StandaloneRegMockRecorder + isgomock struct{} +} + +// StandaloneRegMockRecorder is the mock recorder for StandaloneReg. +type StandaloneRegMockRecorder struct { + mock *StandaloneReg +} + +// NewStandaloneReg creates a new mock instance. +func NewStandaloneReg(ctrl *gomock.Controller) *StandaloneReg { + mock := &StandaloneReg{ctrl: ctrl} + mock.recorder = &StandaloneRegMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *StandaloneReg) EXPECT() *StandaloneRegMockRecorder { + return m.recorder +} + +// Init mocks base method. +func (m *StandaloneReg) Init(ctx context.Context, connUtilsAddr string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init", ctx, connUtilsAddr) +} + +// Init indicates an expected call of Init. +func (mr *StandaloneRegMockRecorder) Init(ctx, connUtilsAddr any) *StandaloneRegInitCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*StandaloneReg)(nil).Init), ctx, connUtilsAddr) + return &StandaloneRegInitCall{Call: call} +} + +// StandaloneRegInitCall wrap *gomock.Call +type StandaloneRegInitCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *StandaloneRegInitCall) Return() *StandaloneRegInitCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *StandaloneRegInitCall) Do(f func(context.Context, string)) *StandaloneRegInitCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *StandaloneRegInitCall) DoAndReturn(f func(context.Context, string)) *StandaloneRegInitCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// List mocks base method. +func (m *StandaloneReg) List() map[plugin.FullName]pconnector.Specification { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "List") + ret0, _ := ret[0].(map[plugin.FullName]pconnector.Specification) + return ret0 +} + +// List indicates an expected call of List. +func (mr *StandaloneRegMockRecorder) List() *StandaloneRegListCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*StandaloneReg)(nil).List)) + return &StandaloneRegListCall{Call: call} +} + +// StandaloneRegListCall wrap *gomock.Call +type StandaloneRegListCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *StandaloneRegListCall) Return(arg0 map[plugin.FullName]pconnector.Specification) *StandaloneRegListCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *StandaloneRegListCall) Do(f func() map[plugin.FullName]pconnector.Specification) *StandaloneRegListCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *StandaloneRegListCall) DoAndReturn(f func() map[plugin.FullName]pconnector.Specification) *StandaloneRegListCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// NewDispenser mocks base method. +func (m *StandaloneReg) NewDispenser(logger log.CtxLogger, name plugin.FullName, cfg pconnector.PluginConfig) (connector.Dispenser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewDispenser", logger, name, cfg) + ret0, _ := ret[0].(connector.Dispenser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewDispenser indicates an expected call of NewDispenser. +func (mr *StandaloneRegMockRecorder) NewDispenser(logger, name, cfg any) *StandaloneRegNewDispenserCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewDispenser", reflect.TypeOf((*StandaloneReg)(nil).NewDispenser), logger, name, cfg) + return &StandaloneRegNewDispenserCall{Call: call} +} + +// StandaloneRegNewDispenserCall wrap *gomock.Call +type StandaloneRegNewDispenserCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *StandaloneRegNewDispenserCall) Return(arg0 connector.Dispenser, arg1 error) *StandaloneRegNewDispenserCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *StandaloneRegNewDispenserCall) Do(f func(log.CtxLogger, plugin.FullName, pconnector.PluginConfig) (connector.Dispenser, error)) *StandaloneRegNewDispenserCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *StandaloneRegNewDispenserCall) DoAndReturn(f func(log.CtxLogger, plugin.FullName, pconnector.PluginConfig) (connector.Dispenser, error)) *StandaloneRegNewDispenserCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// AuthManager is a mock of authManager interface. +type AuthManager struct { + ctrl *gomock.Controller + recorder *AuthManagerMockRecorder + isgomock struct{} +} + +// AuthManagerMockRecorder is the mock recorder for AuthManager. +type AuthManagerMockRecorder struct { + mock *AuthManager +} + +// NewAuthManager creates a new mock instance. +func NewAuthManager(ctrl *gomock.Controller) *AuthManager { + mock := &AuthManager{ctrl: ctrl} + mock.recorder = &AuthManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *AuthManager) EXPECT() *AuthManagerMockRecorder { + return m.recorder +} + +// Deregister mocks base method. +func (m *AuthManager) Deregister(token string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Deregister", token) +} + +// Deregister indicates an expected call of Deregister. +func (mr *AuthManagerMockRecorder) Deregister(token any) *AuthManagerDeregisterCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Deregister", reflect.TypeOf((*AuthManager)(nil).Deregister), token) + return &AuthManagerDeregisterCall{Call: call} +} + +// AuthManagerDeregisterCall wrap *gomock.Call +type AuthManagerDeregisterCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *AuthManagerDeregisterCall) Return() *AuthManagerDeregisterCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *AuthManagerDeregisterCall) Do(f func(string)) *AuthManagerDeregisterCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *AuthManagerDeregisterCall) DoAndReturn(f func(string)) *AuthManagerDeregisterCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// GenerateNew mocks base method. +func (m *AuthManager) GenerateNew(connectorID string) string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateNew", connectorID) + ret0, _ := ret[0].(string) + return ret0 +} + +// GenerateNew indicates an expected call of GenerateNew. +func (mr *AuthManagerMockRecorder) GenerateNew(connectorID any) *AuthManagerGenerateNewCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateNew", reflect.TypeOf((*AuthManager)(nil).GenerateNew), connectorID) + return &AuthManagerGenerateNewCall{Call: call} +} + +// AuthManagerGenerateNewCall wrap *gomock.Call +type AuthManagerGenerateNewCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *AuthManagerGenerateNewCall) Return(arg0 string) *AuthManagerGenerateNewCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *AuthManagerGenerateNewCall) Do(f func(string) string) *AuthManagerGenerateNewCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *AuthManagerGenerateNewCall) DoAndReturn(f func(string) string) *AuthManagerGenerateNewCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/pkg/plugin/connector/service.go b/pkg/plugin/connector/service.go index 93785a94b..f70f42c94 100644 --- a/pkg/plugin/connector/service.go +++ b/pkg/plugin/connector/service.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mockgen -typed -source=service.go -destination=mock/source_interface_mocks.go -package=mock -mock_names=builtinReg=BuiltinReg,standaloneReg=StandaloneReg,authManager=AuthManager . builtinReg,standaloneReg,authManager + package connector import ( @@ -21,7 +23,6 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" - "github.com/conduitio/conduit/pkg/plugin/connector/connutils" ) // registry is an object that can create new plugin dispensers. We need to use @@ -51,25 +52,30 @@ type standaloneReg interface { Init(ctx context.Context, connUtilsAddr string) } +type authManager interface { + GenerateNew(connectorID string) string + Deregister(token string) +} + type PluginService struct { logger log.CtxLogger builtinReg builtinReg standaloneReg standaloneReg - tokenService *connutils.AuthManager + authManager authManager } func NewPluginService( logger log.CtxLogger, builtin builtinReg, standalone standaloneReg, - tokenService *connutils.AuthManager, + authManager authManager, ) *PluginService { return &PluginService{ logger: logger.WithComponent("connector.PluginService"), builtinReg: builtin, standaloneReg: standalone, - tokenService: tokenService, + authManager: authManager, } } @@ -86,7 +92,7 @@ func (s *PluginService) NewDispenser(logger log.CtxLogger, name string, connecto logger = logger.WithComponent("plugin") cfg := pconnector.PluginConfig{ - Token: s.tokenService.GenerateNew(connectorID), + Token: s.authManager.GenerateNew(connectorID), ConnectorID: connectorID, LogLevel: logger.GetLevel().String(), } @@ -98,7 +104,7 @@ func (s *PluginService) NewDispenser(logger log.CtxLogger, name string, connecto return DispenserWithCleanup( dispenser, - func() { s.tokenService.Deregister(cfg.Token) }, + func() { s.authManager.Deregister(cfg.Token) }, ), nil } diff --git a/pkg/plugin/connector/service_test.go b/pkg/plugin/connector/service_test.go new file mode 100644 index 000000000..f4fcc96f1 --- /dev/null +++ b/pkg/plugin/connector/service_test.go @@ -0,0 +1,301 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connector_test + +import ( + "context" + "testing" + + "github.com/conduitio/conduit-connector-protocol/pconnector" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/conduitio/conduit/pkg/plugin/connector" + "github.com/conduitio/conduit/pkg/plugin/connector/mock" + "github.com/matryer/is" + "go.uber.org/mock/gomock" +) + +func TestService_Init(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + + builtinReg := mock.NewBuiltinReg(ctrl) + builtinReg.EXPECT().Init(ctx) + + standaloneReg := mock.NewStandaloneReg(ctrl) + standaloneReg.EXPECT().Init(ctx, "test-conn-utils-address") + + underTest := connector.NewPluginService( + log.Nop(), + builtinReg, + standaloneReg, + mock.NewAuthManager(ctrl), + ) + + underTest.Init(ctx, "test-conn-utils-address") + is.NoErr(underTest.Check(ctx)) +} + +func TestService_NewDispenser(t *testing.T) { + testCases := []struct { + name string + plugin string + setup func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) + wantErr bool + }{ + { + name: "standalone plugin found", + plugin: "standalone:foobar-connector", + setup: func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) { + standaloneReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("standalone:foobar-connector")), + gomock.Any(), + ).Return(nil, nil) + }, + }, + { + name: "standalone plugin not found", + wantErr: true, + plugin: "standalone:foobar-connector", + setup: func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) { + standaloneReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("standalone:foobar-connector")), + gomock.Any(), + ).Return(nil, plugin.ErrPluginNotFound) + }, + }, + { + name: "builtin plugin found", + plugin: "builtin:foobar-connector", + setup: func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) { + builtinReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("builtin:foobar-connector")), + gomock.Any(), + ).Return(nil, nil) + }, + }, + { + name: "builtin plugin not found", + plugin: "builtin:foobar-connector", + wantErr: true, + setup: func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) { + builtinReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("builtin:foobar-connector")), + gomock.Any(), + ).Return(nil, plugin.ErrPluginNotFound) + }, + }, + { + name: "no plugin type, standalone is assumed", + plugin: "foobar-connector", + setup: func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) { + standaloneReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("foobar-connector")), + gomock.Any(), + ).Return(nil, nil) + }, + }, + { + name: "plugin without type not found, fall back to built-in", + plugin: "foobar-connector", + setup: func(builtinReg *mock.BuiltinReg, standaloneReg *mock.StandaloneReg) { + standaloneReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("foobar-connector")), + gomock.Any(), + ).Return(nil, plugin.ErrPluginNotFound) + + builtinReg.EXPECT(). + NewDispenser( + gomock.Any(), + gomock.Eq(plugin.FullName("foobar-connector")), + gomock.Any(), + ).Return(nil, nil) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + + authManager := mock.NewAuthManager(ctrl) + authManager.EXPECT().GenerateNew("foobar-connector-id") + + builtinReg := mock.NewBuiltinReg(ctrl) + builtinReg.EXPECT().Init(ctx) + + standaloneReg := mock.NewStandaloneReg(ctrl) + standaloneReg.EXPECT().Init(ctx, "test-conn-utils-address") + + if tc.setup != nil { + tc.setup(builtinReg, standaloneReg) + } + + underTest := connector.NewPluginService( + log.Nop(), + builtinReg, + standaloneReg, + authManager, + ) + + underTest.Init(ctx, "test-conn-utils-address") + _, err := underTest.NewDispenser(log.Nop(), tc.plugin, "foobar-connector-id") + if tc.wantErr { + is.True(cerrors.Is(err, plugin.ErrPluginNotFound)) + } else { + is.NoErr(err) + } + }) + } +} + +func TestService_NewDispenser_InvalidPluginPrefix(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + + authManager := mock.NewAuthManager(ctrl) + authManager.EXPECT().GenerateNew("foobar-connector-id") + + builtinReg := mock.NewBuiltinReg(ctrl) + builtinReg.EXPECT().Init(ctx) + + standaloneReg := mock.NewStandaloneReg(ctrl) + standaloneReg.EXPECT().Init(ctx, "test-conn-utils-address") + + underTest := connector.NewPluginService( + log.Nop(), + builtinReg, + standaloneReg, + authManager, + ) + + underTest.Init(ctx, "test-conn-utils-address") + _, err := underTest.NewDispenser(log.Nop(), "mistake:plugin-name", "foobar-connector-id") + is.True(err != nil) + is.Equal(`invalid plugin name prefix "mistake"`, err.Error()) +} + +func TestService_NewDispenser_Source_TokenHandling(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + conn := "builtin:foobar-connector" + connID := "foobar-connector-id" + token := "test-token" + logger := log.Nop() + + mockSourcePlugin := mock.NewSourcePlugin(ctrl) + mockSourcePlugin.EXPECT().Teardown(ctx, pconnector.SourceTeardownRequest{}) + + mockDispenser := mock.NewDispenser(ctrl) + mockDispenser.EXPECT().DispenseSource().Return(mockSourcePlugin, nil) + + builtinReg := mock.NewBuiltinReg(ctrl) + builtinReg.EXPECT().Init(ctx) + builtinReg.EXPECT(). + NewDispenser( + gomock.Any(), + plugin.FullName(conn), + pconnector.PluginConfig{ + Token: token, + ConnectorID: connID, + LogLevel: logger.GetLevel().String(), + }, + ). + Return(mockDispenser, nil) + + standaloneReg := mock.NewStandaloneReg(ctrl) + standaloneReg.EXPECT().Init(ctx, "test-conn-utils-address") + + authManager := mock.NewAuthManager(ctrl) + authManager.EXPECT().GenerateNew(connID).Return(token) + authManager.EXPECT().Deregister(token) + + underTest := connector.NewPluginService(logger, builtinReg, standaloneReg, authManager) + underTest.Init(ctx, "test-conn-utils-address") + + dispenser, err := underTest.NewDispenser(logger, conn, connID) + is.NoErr(err) + + source, err := dispenser.DispenseSource() + is.NoErr(err) + _, err = source.Teardown(ctx, pconnector.SourceTeardownRequest{}) + is.NoErr(err) +} + +func TestService_NewDispenser_Destination_TokenHandling(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + conn := "builtin:foobar-connector" + connID := "foobar-connector-id" + token := "test-token" + logger := log.Nop() + + mockDestinationPlugin := mock.NewDestinationPlugin(ctrl) + mockDestinationPlugin.EXPECT().Teardown(ctx, pconnector.DestinationTeardownRequest{}) + + mockDispenser := mock.NewDispenser(ctrl) + mockDispenser.EXPECT().DispenseDestination().Return(mockDestinationPlugin, nil) + + builtinReg := mock.NewBuiltinReg(ctrl) + builtinReg.EXPECT().Init(ctx) + builtinReg.EXPECT(). + NewDispenser( + gomock.Any(), + plugin.FullName(conn), + pconnector.PluginConfig{ + Token: token, + ConnectorID: connID, + LogLevel: logger.GetLevel().String(), + }, + ). + Return(mockDispenser, nil) + + standaloneReg := mock.NewStandaloneReg(ctrl) + standaloneReg.EXPECT().Init(ctx, "test-conn-utils-address") + + authManager := mock.NewAuthManager(ctrl) + authManager.EXPECT().GenerateNew(connID).Return(token) + authManager.EXPECT().Deregister(token) + + underTest := connector.NewPluginService(logger, builtinReg, standaloneReg, authManager) + underTest.Init(ctx, "test-conn-utils-address") + + dispenser, err := underTest.NewDispenser(logger, conn, connID) + is.NoErr(err) + + destination, err := dispenser.DispenseDestination() + is.NoErr(err) + _, err = destination.Teardown(ctx, pconnector.DestinationTeardownRequest{}) + is.NoErr(err) +}