diff --git a/pkg/connector/service.go b/pkg/connector/service.go index 365205a18..a5ddfcc8d 100644 --- a/pkg/connector/service.go +++ b/pkg/connector/service.go @@ -191,12 +191,17 @@ func (s *Service) Delete(ctx context.Context, id string, dispenserFetcher Plugin } // Update updates the connector config. -func (s *Service) Update(ctx context.Context, id string, data Config) (*Instance, error) { +func (s *Service) Update(ctx context.Context, id string, plugin string, data Config) (*Instance, error) { conn, err := s.Get(ctx, id) if err != nil { return nil, err } + if conn.Plugin != plugin { + s.logger.Warn(ctx).Msgf("connector plugin changing from %v to %v, "+ + "this may lead to unexpected behavior and configuration issues.", conn.Plugin, plugin) + } + conn.Plugin = plugin conn.Config = data conn.UpdatedAt = time.Now().UTC() diff --git a/pkg/connector/service_test.go b/pkg/connector/service_test.go index 30b5f6268..94b9a1794 100644 --- a/pkg/connector/service_test.go +++ b/pkg/connector/service_test.go @@ -514,6 +514,7 @@ func TestService_UpdateSuccess(t *testing.T) { Name: "changed-name", Settings: map[string]string{"foo": "bar"}, } + wantPlugin := "changed-plugin" conn, err := service.Create( ctx, @@ -530,10 +531,11 @@ func TestService_UpdateSuccess(t *testing.T) { is.NoErr(err) beforeUpdate := time.Now() - got, err := service.Update(ctx, conn.ID, want) + got, err := service.Update(ctx, conn.ID, wantPlugin, want) is.NoErr(err) is.Equal(got.Config, want) + is.Equal(got.Plugin, wantPlugin) is.True(!got.UpdatedAt.Before(beforeUpdate)) } @@ -545,7 +547,7 @@ func TestService_UpdateInstanceNotFound(t *testing.T) { service := NewService(logger, db, nil) // update connector that does not exist - got, err := service.Update(ctx, uuid.NewString(), Config{}) + got, err := service.Update(ctx, uuid.NewString(), "foo-plugin", Config{}) is.True(err != nil) is.True(cerrors.Is(err, ErrInstanceNotFound)) is.Equal(got, nil) diff --git a/pkg/orchestrator/connectors.go b/pkg/orchestrator/connectors.go index bbed8f4e3..33e67f757 100644 --- a/pkg/orchestrator/connectors.go +++ b/pkg/orchestrator/connectors.go @@ -151,7 +151,7 @@ func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error { return nil } -func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config connector.Config) (*connector.Instance, error) { +func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, plugin string, config connector.Config) (*connector.Instance, error) { var r rollback.R defer r.MustExecute() txn, ctx, err := c.db.NewTransaction(ctx, true) @@ -181,12 +181,12 @@ func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config co } oldConfig := conn.Config - conn, err = c.connectors.Update(ctx, id, config) + conn, err = c.connectors.Update(ctx, id, plugin, config) if err != nil { return nil, err } r.Append(func() error { - _, err = c.connectors.Update(ctx, id, oldConfig) + _, err = c.connectors.Update(ctx, id, conn.Plugin, oldConfig) return err }) err = txn.Commit() diff --git a/pkg/orchestrator/connectors_test.go b/pkg/orchestrator/connectors_test.go index 7a448b6da..4f92e6231 100644 --- a/pkg/orchestrator/connectors_test.go +++ b/pkg/orchestrator/connectors_test.go @@ -485,11 +485,11 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) { ValidateSourceConfig(gomock.Any(), conn.Plugin, newConfig.Settings). Return(nil) consMock.EXPECT(). - Update(gomock.AssignableToTypeOf(ctxType), conn.ID, newConfig). + Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, newConfig). Return(want, nil) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Connectors.Update(ctx, conn.ID, newConfig) + got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, newConfig) is.NoErr(err) is.Equal(got, want) } @@ -507,7 +507,7 @@ func TestConnectorOrchestrator_Update_ConnectorNotExist(t *testing.T) { Return(nil, wantErr) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Connectors.Update(ctx, id, connector.Config{}) + got, err := orc.Connectors.Update(ctx, id, "test-plugin", connector.Config{}) is.True(got == nil) is.True(err != nil) is.True(cerrors.Is(err, wantErr)) @@ -537,7 +537,7 @@ func TestConnectorOrchestrator_Update_PipelineRunning(t *testing.T) { Return(pl, nil) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{}) + got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{}) is.True(got == nil) is.True(err != nil) is.Equal(pipeline.ErrPipelineRunning, err) @@ -571,11 +571,11 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) { ValidateDestinationConfig(gomock.Any(), conn.Plugin, conn.Config.Settings). Return(nil) consMock.EXPECT(). - Update(gomock.AssignableToTypeOf(ctxType), conn.ID, connector.Config{}). + Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, connector.Config{}). Return(nil, wantErr) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{}) + got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{}) is.True(got == nil) is.True(err != nil) is.True(cerrors.Is(err, wantErr)) diff --git a/pkg/orchestrator/mock/orchestrator.go b/pkg/orchestrator/mock/orchestrator.go index 15a4b552f..997915ec3 100644 --- a/pkg/orchestrator/mock/orchestrator.go +++ b/pkg/orchestrator/mock/orchestrator.go @@ -692,18 +692,18 @@ func (c *ConnectorServiceRemoveProcessorCall) DoAndReturn(f func(context.Context } // Update mocks base method. -func (m *ConnectorService) Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error) { +func (m *ConnectorService) Update(ctx context.Context, id, plugin string, c connector.Config) (*connector.Instance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, id, c) + ret := m.ctrl.Call(m, "Update", ctx, id, plugin, c) ret0, _ := ret[0].(*connector.Instance) ret1, _ := ret[1].(error) return ret0, ret1 } // Update indicates an expected call of Update. -func (mr *ConnectorServiceMockRecorder) Update(ctx, id, c any) *ConnectorServiceUpdateCall { +func (mr *ConnectorServiceMockRecorder) Update(ctx, id, plugin, c any) *ConnectorServiceUpdateCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ConnectorService)(nil).Update), ctx, id, c) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ConnectorService)(nil).Update), ctx, id, plugin, c) return &ConnectorServiceUpdateCall{Call: call} } @@ -719,13 +719,13 @@ func (c_2 *ConnectorServiceUpdateCall) Return(arg0 *connector.Instance, arg1 err } // Do rewrite *gomock.Call.Do -func (c_2 *ConnectorServiceUpdateCall) Do(f func(context.Context, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { +func (c_2 *ConnectorServiceUpdateCall) Do(f func(context.Context, string, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { c_2.Call = c_2.Call.Do(f) return c_2 } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c_2 *ConnectorServiceUpdateCall) DoAndReturn(f func(context.Context, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { +func (c_2 *ConnectorServiceUpdateCall) DoAndReturn(f func(context.Context, string, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { c_2.Call = c_2.Call.DoAndReturn(f) return c_2 } @@ -948,18 +948,18 @@ func (c *ProcessorServiceMakeRunnableProcessorCall) DoAndReturn(f func(context.C } // Update mocks base method. -func (m *ProcessorService) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) { +func (m *ProcessorService) Update(ctx context.Context, id, plugin string, cfg processor.Config) (*processor.Instance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, id, cfg) + ret := m.ctrl.Call(m, "Update", ctx, id, plugin, cfg) ret0, _ := ret[0].(*processor.Instance) ret1, _ := ret[1].(error) return ret0, ret1 } // Update indicates an expected call of Update. -func (mr *ProcessorServiceMockRecorder) Update(ctx, id, cfg any) *ProcessorServiceUpdateCall { +func (mr *ProcessorServiceMockRecorder) Update(ctx, id, plugin, cfg any) *ProcessorServiceUpdateCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ProcessorService)(nil).Update), ctx, id, cfg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ProcessorService)(nil).Update), ctx, id, plugin, cfg) return &ProcessorServiceUpdateCall{Call: call} } @@ -975,13 +975,13 @@ func (c *ProcessorServiceUpdateCall) Return(arg0 *processor.Instance, arg1 error } // Do rewrite *gomock.Call.Do -func (c *ProcessorServiceUpdateCall) Do(f func(context.Context, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { +func (c *ProcessorServiceUpdateCall) Do(f func(context.Context, string, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ProcessorServiceUpdateCall) DoAndReturn(f func(context.Context, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { +func (c *ProcessorServiceUpdateCall) DoAndReturn(f func(context.Context, string, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/orchestrator/orchestrator.go b/pkg/orchestrator/orchestrator.go index a5477d9ff..6f1468729 100644 --- a/pkg/orchestrator/orchestrator.go +++ b/pkg/orchestrator/orchestrator.go @@ -97,7 +97,7 @@ type ConnectorService interface { Get(ctx context.Context, id string) (*connector.Instance, error) Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error) Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error - Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error) + Update(ctx context.Context, id string, plugin string, c connector.Config) (*connector.Instance, error) AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error) RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error) @@ -108,7 +108,7 @@ type ProcessorService interface { Get(ctx context.Context, id string) (*processor.Instance, error) Create(ctx context.Context, id string, plugin string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error) MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error) - Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) + Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) Delete(ctx context.Context, id string) error } diff --git a/pkg/orchestrator/processors.go b/pkg/orchestrator/processors.go index e97a4f07f..06bd8b85b 100644 --- a/pkg/orchestrator/processors.go +++ b/pkg/orchestrator/processors.go @@ -137,7 +137,7 @@ func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor. return p.processors.Get(ctx, id) } -func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) { +func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) { var r rollback.R defer r.MustExecute() @@ -157,6 +157,7 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce return nil, cerrors.Errorf("processor %q cannot be updated: %w", proc.ID, ErrImmutableProvisionedByConfig) } // provisioned by API + oldPlugin := proc.Plugin oldConfig := proc.Config pl, err := p.getProcessorsPipeline(ctx, proc.Parent) @@ -168,12 +169,12 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce return nil, pipeline.ErrPipelineRunning } - proc, err = p.processors.Update(ctx, id, cfg) + proc, err = p.processors.Update(ctx, id, plugin, cfg) if err != nil { return nil, err } r.Append(func() error { - _, err = p.processors.Update(ctx, proc.ID, oldConfig) + _, err = p.processors.Update(ctx, proc.ID, oldPlugin, oldConfig) return err }) diff --git a/pkg/orchestrator/processors_test.go b/pkg/orchestrator/processors_test.go index 8fc9868d9..9e38c15d6 100644 --- a/pkg/orchestrator/processors_test.go +++ b/pkg/orchestrator/processors_test.go @@ -375,11 +375,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_Success(t *testing.T) { Get(gomock.AssignableToTypeOf(ctxType), pl.ID). Return(pl, nil) procsMock.EXPECT(). - Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config). + Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config). Return(want, nil) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Processors.Update(ctx, before.ID, newConfig) + got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig) is.NoErr(err) is.Equal(want, got) } @@ -417,7 +417,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorNotExist(t *testing.T) Return(nil, wantErr) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Processors.Update(ctx, before.ID, newConfig) + got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig) is.True(err != nil) is.True(cerrors.Is(err, wantErr)) // errors did not match") is.True(got == nil) @@ -458,7 +458,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_PipelineRunning(t *testing.T) { Return(pl, nil) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Processors.Update(ctx, before.ID, newConfig) + got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig) is.True(err != nil) is.Equal(pipeline.ErrPipelineRunning, err) is.True(got == nil) @@ -498,7 +498,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorProvisionedByConfig(t * Return(before, nil) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Processors.Update(ctx, before.ID, newConfig) + got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig) is.Equal(got, nil) is.True(err != nil) is.True(cerrors.Is(err, ErrImmutableProvisionedByConfig)) // expected ErrImmutableProvisionedByConfig @@ -550,11 +550,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_UpdateFail(t *testing.T) { Get(gomock.AssignableToTypeOf(ctxType), pl.ID). Return(pl, nil) procsMock.EXPECT(). - Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config). + Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config). Return(nil, wantErr) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Processors.Update(ctx, before.ID, newConfig) + got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig) is.True(err != nil) is.Equal(wantErr, err) is.True(got == nil) @@ -586,7 +586,7 @@ func TestProcessorOrchestrator_UpdateOnConnector_ConnectorNotExist(t *testing.T) Return(nil, wantErr) orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock) - got, err := orc.Processors.Update(ctx, want.ID, processor.Config{}) + got, err := orc.Processors.Update(ctx, want.ID, want.Plugin, processor.Config{}) is.True(err != nil) is.True(cerrors.Is(err, wantErr)) // errors did not match is.True(got == nil) diff --git a/pkg/processor/service.go b/pkg/processor/service.go index e0ab16ee8..5533f0abf 100644 --- a/pkg/processor/service.go +++ b/pkg/processor/service.go @@ -162,16 +162,25 @@ func (s *Service) Create( } // Update will update a processor instance config. -func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error) { +func (s *Service) Update(ctx context.Context, id string, plugin string, cfg Config) (*Instance, error) { instance, err := s.Get(ctx, id) if err != nil { return nil, err } + if plugin == "" { + return nil, cerrors.Errorf("could not update processor instance (ID: %s): plugin name is empty", id) + } if instance.running { return nil, cerrors.Errorf("could not update processor instance (ID: %s): %w", id, ErrProcessorRunning) } + if instance.Plugin != plugin { + s.logger.Warn(ctx).Msgf("processor plugin changing from %v to %v, "+ + "this may lead to unexpected behavior and configuration issues.", instance.Plugin, plugin) + } + + instance.Plugin = plugin instance.Config = cfg instance.UpdatedAt = time.Now() diff --git a/pkg/processor/service_test.go b/pkg/processor/service_test.go index f1eb59382..1c803f048 100644 --- a/pkg/processor/service_test.go +++ b/pkg/processor/service_test.go @@ -16,6 +16,7 @@ package processor import ( "context" + "strings" "testing" "github.com/conduitio/conduit-commons/database/inmemory" @@ -388,8 +389,9 @@ func TestService_Update_Success(t *testing.T) { "processor-config-field-2": "bar", }, } + newProcType := "new-proc-type" - got, err := service.Update(ctx, want.ID, newConfig) + got, err := service.Update(ctx, want.ID, newProcType, newConfig) is.NoErr(err) is.Equal(want, got) // same instance is returned is.Equal(newConfig, got.Config) // config was updated @@ -397,19 +399,41 @@ func TestService_Update_Success(t *testing.T) { got, err = service.Get(ctx, want.ID) is.NoErr(err) is.Equal(newConfig, got.Config) + is.Equal(newProcType, got.Plugin) } -func TestService_Update_Fail(t *testing.T) { +func TestService_Update_NonExistentProcessor(t *testing.T) { is := is.New(t) ctx := context.Background() db := &inmemory.DB{} service := NewService(log.Nop(), db, &proc_plugin.PluginService{}) - got, err := service.Update(ctx, "non-existent processor", Config{}) + got, err := service.Update(ctx, "non-existent processor", "test-processor", Config{}) is.True(cerrors.Is(err, ErrInstanceNotFound)) // expected instance not found error is.Equal(got, nil) } +func TestService_Update_EmptyPluginName(t *testing.T) { + is := is.New(t) + ctx := context.Background() + db := &inmemory.DB{} + + procType := "processor-type" + p := proc_mock.NewProcessor(gomock.NewController(t)) + p.EXPECT().Teardown(ctx).Return(nil) + + registry := newPluginService(t, map[string]sdk.Processor{procType: p}) + service := NewService(log.Nop(), db, registry) + + // create a processor instance + want, err := service.Create(ctx, uuid.NewString(), procType, Parent{}, Config{}, ProvisionTypeAPI, "") + is.NoErr(err) + + _, err = service.Update(ctx, want.ID, "", Config{}) + is.True(err != nil) + is.True(strings.Contains(err.Error(), "plugin name is empty")) +} + // newPluginService creates a registry with builders for the supplied // processors map keyed by processor type. If a value in the map is nil then a // builder will be registered that returns an error. diff --git a/pkg/provisioning/config/parser.go b/pkg/provisioning/config/parser.go index b6cf10d72..81970e923 100644 --- a/pkg/provisioning/config/parser.go +++ b/pkg/provisioning/config/parser.go @@ -61,11 +61,8 @@ var ( PipelineMutableFields = []string{"Name", "Description", "Connectors", "Processors", "DLQ"} PipelineIgnoredFields = []string{"Status"} - ConnectorImmutableFields = []string{"Type", "Plugin"} - ConnectorMutableFields = []string{"Name", "Settings", "Processors"} - - ProcessorImmutableFields = []string{"Plugin"} - ProcessorMutableFields = []string{"Settings", "Workers", "Condition"} + ConnectorImmutableFields = []string{"Type"} + ConnectorMutableFields = []string{"Name", "Settings", "Processors", "Plugin"} ) // Parser reads data from reader and parses all pipelines defined in the diff --git a/pkg/provisioning/config/parser_test.go b/pkg/provisioning/config/parser_test.go index 38407b1e4..ca9ed072b 100644 --- a/pkg/provisioning/config/parser_test.go +++ b/pkg/provisioning/config/parser_test.go @@ -50,20 +50,6 @@ func TestConnectorFields(t *testing.T) { is.Equal(wantFields, haveFields) // fields don't match, if you added a field to Connector please add it to ConnectorImmutableFields or ConnectorMutableFields } -func TestProcessorFields(t *testing.T) { - is := is.New(t) - - wantFields := extractFieldNames(reflect.TypeOf(Processor{})) - haveFields := []string{"ID"} // ID is special, it's the identifier - haveFields = append(haveFields, ProcessorImmutableFields...) - haveFields = append(haveFields, ProcessorMutableFields...) - - sort.Strings(wantFields) - sort.Strings(haveFields) - - is.Equal(wantFields, haveFields) // fields don't match, if you added a field to Processor please add it to ProcessorImmutableFields or ProcessorMutableFields -} - func extractFieldNames(t reflect.Type) []string { fields := make([]string, t.NumField()) for i := 0; i < t.NumField(); i++ { diff --git a/pkg/provisioning/import.go b/pkg/provisioning/import.go index 80bc3cc2b..ea61b3356 100644 --- a/pkg/provisioning/import.go +++ b/pkg/provisioning/import.go @@ -290,7 +290,7 @@ func (ab actionsBuilder) prepareConnectorActions(oldConfig, newConfig config.Con func (ab actionsBuilder) prepareProcessorActions(oldConfig, newConfig config.Processor, parent processor.Parent) []action { if oldConfig.ID == "" { - // no old config, it's a brand new processor + // no old config, it's a brand-new processor return []action{createProcessorAction{ cfg: newConfig, parent: parent, @@ -305,40 +305,17 @@ func (ab actionsBuilder) prepareProcessorActions(oldConfig, newConfig config.Pro }} } - // first compare whole configs + // configs match, no need to do anything if cmp.Equal(oldConfig, newConfig) { - // configs match, no need to do anything return nil } - // compare them again but ignore mutable fields, if configs are still - // different an update is not possible, we have to entirely recreate the - // processor - opts := []cmp.Option{ - cmpopts.IgnoreFields(config.Processor{}, config.ProcessorMutableFields...), - } - if cmp.Equal(oldConfig, newConfig, opts...) { - // only updatable fields don't match, we can update the processor - return []action{updateProcessorAction{ - oldConfig: oldConfig, - newConfig: newConfig, - processorService: ab.processorService, - }} - } - - // we have to delete the old processor and create a new one - return []action{ - deleteProcessorAction{ - cfg: oldConfig, - parent: parent, - processorService: ab.processorService, - }, - createProcessorAction{ - cfg: newConfig, - parent: parent, - processorService: ab.processorService, - }, - } + // the processor changed, and all parts of a processor are updateable + return []action{updateProcessorAction{ + oldConfig: oldConfig, + newConfig: newConfig, + processorService: ab.processorService, + }} } func reverseActions(actions []action) { diff --git a/pkg/provisioning/import_actions.go b/pkg/provisioning/import_actions.go index 90e0e1883..cca746d18 100644 --- a/pkg/provisioning/import_actions.go +++ b/pkg/provisioning/import_actions.go @@ -316,7 +316,7 @@ func (a updateConnectorAction) Rollback(ctx context.Context) error { } func (a updateConnectorAction) update(ctx context.Context, cfg config.Connector) error { - c, err := a.connectorService.Update(ctx, cfg.ID, connector.Config{ + c, err := a.connectorService.Update(ctx, cfg.ID, cfg.Plugin, connector.Config{ Name: cfg.Name, Settings: cfg.Settings, }) @@ -376,10 +376,15 @@ func (a updateProcessorAction) Rollback(ctx context.Context) error { } func (a updateProcessorAction) update(ctx context.Context, cfg config.Processor) error { - _, err := a.processorService.Update(ctx, cfg.ID, processor.Config{ - Settings: cfg.Settings, - Workers: cfg.Workers, - }) + _, err := a.processorService.Update( + ctx, + cfg.ID, + cfg.Plugin, + processor.Config{ + Settings: cfg.Settings, + Workers: cfg.Workers, + }, + ) if err != nil { return cerrors.Errorf("failed to update processor: %w", err) } diff --git a/pkg/provisioning/import_actions_test.go b/pkg/provisioning/import_actions_test.go index 7cd60597b..098208a95 100644 --- a/pkg/provisioning/import_actions_test.go +++ b/pkg/provisioning/import_actions_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/conduitio/conduit-commons/lang" "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/pipeline" "github.com/conduitio/conduit/pkg/processor" @@ -42,8 +43,8 @@ func TestCreatePipelineAction_Do(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: lang.Ptr(1), + WindowNackThreshold: lang.Ptr(2), }, } wantCfg := pipeline.Config{ @@ -87,8 +88,8 @@ func TestCreatePipelineAction_Rollback(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: lang.Ptr(1), + WindowNackThreshold: lang.Ptr(2), }, } @@ -113,8 +114,8 @@ func TestUpdatePipelineAction(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: lang.Ptr(1), + WindowNackThreshold: lang.Ptr(2), }, } @@ -198,8 +199,8 @@ func TestDeletePipelineAction_Do(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: lang.Ptr(1), + WindowNackThreshold: lang.Ptr(2), }, } @@ -228,8 +229,8 @@ func TestDeletePipelineAction_Rollback(t *testing.T) { DLQ: config.DLQ{ Plugin: "dlq-plugin", Settings: map[string]string{"foo": "bar"}, - WindowSize: intPtr(1), - WindowNackThreshold: intPtr(2), + WindowSize: lang.Ptr(1), + WindowNackThreshold: lang.Ptr(2), }, } wantCfg := pipeline.Config{ @@ -381,7 +382,7 @@ func TestUpdateConnectorAction(t *testing.T) { } connSrv := mock.NewConnectorService(ctrl) - connSrv.EXPECT().Update(ctx, haveCfg.ID, wantCfg).Return(instance, nil) + connSrv.EXPECT().Update(ctx, haveCfg.ID, haveCfg.Plugin, wantCfg).Return(instance, nil) connSrv.EXPECT().RemoveProcessor(ctx, haveCfg.ID, instance.ProcessorIDs[0]) connSrv.EXPECT().RemoveProcessor(ctx, haveCfg.ID, instance.ProcessorIDs[1]) connSrv.EXPECT().AddProcessor(ctx, haveCfg.ID, haveCfg.Processors[0].ID) @@ -574,7 +575,7 @@ func TestUpdateProcessorAction(t *testing.T) { } connSrv := mock.NewProcessorService(ctrl) - connSrv.EXPECT().Update(ctx, haveCfg.ID, wantCfg).Return(instance, nil) + connSrv.EXPECT().Update(ctx, haveCfg.ID, haveCfg.Plugin, wantCfg).Return(instance, nil) a := updateProcessorAction{ oldConfig: tc.oldConfig, diff --git a/pkg/provisioning/import_test.go b/pkg/provisioning/import_test.go index bd85ab461..3f958ccc6 100644 --- a/pkg/provisioning/import_test.go +++ b/pkg/provisioning/import_test.go @@ -260,17 +260,10 @@ func TestActionBuilder_Build(t *testing.T) { newConfig: newConfig, pipelineService: pipSrv, }, - deleteConnectorAction{ - cfg: oldConfig.Connectors[1], - pipelineID: oldConfig.ID, - connectorService: connSrv, - connectorPluginService: connPlugSrv, - }, - createConnectorAction{ - cfg: newConfig.Connectors[1], - pipelineID: newConfig.ID, - connectorService: connSrv, - connectorPluginService: connPlugSrv, + updateConnectorAction{ + oldConfig: oldConfig.Connectors[1], + newConfig: newConfig.Connectors[1], + connectorService: connSrv, }, updateConnectorAction{ oldConfig: oldConfig.Connectors[2], @@ -291,20 +284,9 @@ func TestActionBuilder_Build(t *testing.T) { }, processorService: procSrv, }, - deleteProcessorAction{ - cfg: oldConfig.Processors[1], - parent: processor.Parent{ - ID: oldConfig.ID, - Type: processor.ParentTypePipeline, - }, - processorService: procSrv, - }, - createProcessorAction{ - cfg: newConfig.Processors[1], - parent: processor.Parent{ - ID: newConfig.ID, - Type: processor.ParentTypePipeline, - }, + updateProcessorAction{ + oldConfig: oldConfig.Processors[1], + newConfig: newConfig.Processors[1], processorService: procSrv, }, updateProcessorAction{ @@ -595,6 +577,10 @@ func TestActionsBuilder_PrepareConnectorActions_Update(t *testing.T) { name: "different Name", oldConfig: config.Connector{ID: "config-id", Name: "old-name"}, newConfig: config.Connector{ID: "config-id", Name: "new-name"}, + }, { + name: "different Plugin", + oldConfig: config.Connector{ID: "config-id", Plugin: "old-plugin"}, + newConfig: config.Connector{ID: "config-id", Plugin: "new-plugin"}, }, { name: "different Settings", oldConfig: config.Connector{ID: "config-id", Settings: map[string]string{"foo": "bar"}}, @@ -634,10 +620,6 @@ func TestActionsBuilder_PrepareConnectorActions_Recreate(t *testing.T) { name: "different Type", oldConfig: config.Connector{ID: "config-id", Type: config.TypeSource}, newConfig: config.Connector{ID: "config-id", Type: config.TypeDestination}, - }, { - name: "different Plugin", - oldConfig: config.Connector{ID: "config-id", Plugin: "old-plugin"}, - newConfig: config.Connector{ID: "config-id", Plugin: "new-plugin"}, }} for _, tc := range testCases { @@ -764,50 +746,10 @@ func TestActionsBuilder_PrepareProcessorActions_Update(t *testing.T) { } } -func TestActionsBuilder_PrepareProcessorActions_Recreate(t *testing.T) { - logger := log.Nop() - ctrl := gomock.NewController(t) - - srv, _, _, procSrv, _, _ := newTestService(ctrl, logger) - parent := processor.Parent{ - ID: uuid.NewString(), - Type: processor.ParentTypePipeline, - } - - testCases := []struct { - name string - oldConfig config.Processor - newConfig config.Processor - }{{ - name: "different Type", - oldConfig: config.Processor{ID: "config-id", Plugin: "old-type"}, - newConfig: config.Processor{ID: "config-id", Plugin: "new-type"}, - }} - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - is := is.New(t) - want := []action{deleteProcessorAction{ - cfg: tc.oldConfig, - parent: parent, - processorService: procSrv, - }, createProcessorAction{ - cfg: tc.newConfig, - parent: parent, - processorService: procSrv, - }} - got := srv.newActionsBuilder().prepareProcessorActions(tc.oldConfig, tc.newConfig, parent) - is.Equal(got, want) - }) - } -} - // ------------- // -- HELPERS -- // ------------- -func intPtr(i int) *int { return &i } - func newTestService(ctrl *gomock.Controller, logger log.CtxLogger) (*Service, *mock.PipelineService, *mock.ConnectorService, *mock.ProcessorService, *mock.ConnectorPluginService, *mock.LifecycleService) { db := &inmemory.DB{} pipSrv := mock.NewPipelineService(ctrl) diff --git a/pkg/provisioning/interfaces.go b/pkg/provisioning/interfaces.go index 21f2142e2..a05b383c9 100644 --- a/pkg/provisioning/interfaces.go +++ b/pkg/provisioning/interfaces.go @@ -43,7 +43,7 @@ type PipelineService interface { type ConnectorService interface { Get(ctx context.Context, id string) (*connector.Instance, error) Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error) - Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error) + Update(ctx context.Context, id string, plugin string, c connector.Config) (*connector.Instance, error) Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error) @@ -62,7 +62,7 @@ type ProcessorService interface { condition string, ) (*processor.Instance, error) MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error) - Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) + Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) Delete(ctx context.Context, id string) error } diff --git a/pkg/provisioning/mock/provisioning.go b/pkg/provisioning/mock/provisioning.go index 5454818af..49857ac0b 100644 --- a/pkg/provisioning/mock/provisioning.go +++ b/pkg/provisioning/mock/provisioning.go @@ -652,18 +652,18 @@ func (c *ConnectorServiceRemoveProcessorCall) DoAndReturn(f func(context.Context } // Update mocks base method. -func (m *ConnectorService) Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error) { +func (m *ConnectorService) Update(ctx context.Context, id, plugin string, c connector.Config) (*connector.Instance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, id, c) + ret := m.ctrl.Call(m, "Update", ctx, id, plugin, c) ret0, _ := ret[0].(*connector.Instance) ret1, _ := ret[1].(error) return ret0, ret1 } // Update indicates an expected call of Update. -func (mr *ConnectorServiceMockRecorder) Update(ctx, id, c any) *ConnectorServiceUpdateCall { +func (mr *ConnectorServiceMockRecorder) Update(ctx, id, plugin, c any) *ConnectorServiceUpdateCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ConnectorService)(nil).Update), ctx, id, c) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ConnectorService)(nil).Update), ctx, id, plugin, c) return &ConnectorServiceUpdateCall{Call: call} } @@ -679,13 +679,13 @@ func (c_2 *ConnectorServiceUpdateCall) Return(arg0 *connector.Instance, arg1 err } // Do rewrite *gomock.Call.Do -func (c_2 *ConnectorServiceUpdateCall) Do(f func(context.Context, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { +func (c_2 *ConnectorServiceUpdateCall) Do(f func(context.Context, string, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { c_2.Call = c_2.Call.Do(f) return c_2 } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c_2 *ConnectorServiceUpdateCall) DoAndReturn(f func(context.Context, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { +func (c_2 *ConnectorServiceUpdateCall) DoAndReturn(f func(context.Context, string, string, connector.Config) (*connector.Instance, error)) *ConnectorServiceUpdateCall { c_2.Call = c_2.Call.DoAndReturn(f) return c_2 } @@ -870,18 +870,18 @@ func (c *ProcessorServiceMakeRunnableProcessorCall) DoAndReturn(f func(context.C } // Update mocks base method. -func (m *ProcessorService) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) { +func (m *ProcessorService) Update(ctx context.Context, id, plugin string, cfg processor.Config) (*processor.Instance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, id, cfg) + ret := m.ctrl.Call(m, "Update", ctx, id, plugin, cfg) ret0, _ := ret[0].(*processor.Instance) ret1, _ := ret[1].(error) return ret0, ret1 } // Update indicates an expected call of Update. -func (mr *ProcessorServiceMockRecorder) Update(ctx, id, cfg any) *ProcessorServiceUpdateCall { +func (mr *ProcessorServiceMockRecorder) Update(ctx, id, plugin, cfg any) *ProcessorServiceUpdateCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ProcessorService)(nil).Update), ctx, id, cfg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ProcessorService)(nil).Update), ctx, id, plugin, cfg) return &ProcessorServiceUpdateCall{Call: call} } @@ -897,13 +897,13 @@ func (c *ProcessorServiceUpdateCall) Return(arg0 *processor.Instance, arg1 error } // Do rewrite *gomock.Call.Do -func (c *ProcessorServiceUpdateCall) Do(f func(context.Context, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { +func (c *ProcessorServiceUpdateCall) Do(f func(context.Context, string, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ProcessorServiceUpdateCall) DoAndReturn(f func(context.Context, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { +func (c *ProcessorServiceUpdateCall) DoAndReturn(f func(context.Context, string, string, processor.Config) (*processor.Instance, error)) *ProcessorServiceUpdateCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index b8536bd66..db9b753f5 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -171,9 +171,9 @@ func TestService_Init_Update(t *testing.T) { // update pipeline pipelineService.EXPECT().Update(anyCtx, p1.P1.ID, p1.P1.Config).Return(oldPipelineInstance, nil) pipelineService.EXPECT().UpdateDLQ(anyCtx, p1.P1.ID, p1.P1.DLQ) - connService.EXPECT().Update(anyCtx, p1.P1C1.ID, p1.P1C1.Config).Return(oldConnector1Instance, nil) - procService.EXPECT().Update(anyCtx, p1.P1C2P1.ID, p1.P1C2P1.Config) - procService.EXPECT().Update(anyCtx, p1.P1P1.ID, p1.P1P1.Config) + connService.EXPECT().Update(anyCtx, p1.P1C1.ID, p1.P1C1.Plugin, p1.P1C1.Config).Return(oldConnector1Instance, nil) + procService.EXPECT().Update(anyCtx, p1.P1C2P1.ID, p1.P1C2P1.Plugin, p1.P1C2P1.Config) + procService.EXPECT().Update(anyCtx, p1.P1P1.ID, p1.P1P1.Plugin, p1.P1P1.Config) // start pipeline lifecycleService.EXPECT().Start(anyCtx, p1.P1.ID) @@ -310,15 +310,15 @@ func TestService_Init_RollbackUpdate(t *testing.T) { // update pipeline pipelineService.EXPECT().Update(anyCtx, p1.P1.ID, p1.P1.Config).Return(oldPipelineInstance, nil) pipelineService.EXPECT().UpdateDLQ(anyCtx, p1.P1.ID, p1.P1.DLQ) - connService.EXPECT().Update(anyCtx, p1.P1C1.ID, p1.P1C1.Config).Return(oldConnector1Instance, nil) - procService.EXPECT().Update(anyCtx, p1.P1C2P1.ID, p1.P1C2P1.Config) + connService.EXPECT().Update(anyCtx, p1.P1C1.ID, p1.P1C1.Plugin, p1.P1C1.Config).Return(oldConnector1Instance, nil) + procService.EXPECT().Update(anyCtx, p1.P1C2P1.ID, p1.P1C2P1.Plugin, p1.P1C2P1.Config) wantErr := cerrors.New("err") - procService.EXPECT().Update(anyCtx, p1.P1P1.ID, p1.P1P1.Config).Return(nil, wantErr) // fails + procService.EXPECT().Update(anyCtx, p1.P1P1.ID, p1.P1P1.Plugin, p1.P1P1.Config).Return(nil, wantErr) // fails // rollback changes - procService.EXPECT().Update(anyCtx, oldPipelineProcessorInstance.ID, oldPipelineProcessorInstance.Config) - procService.EXPECT().Update(anyCtx, oldConnectorProcessorInstance.ID, oldConnectorProcessorInstance.Config) - connService.EXPECT().Update(anyCtx, oldConnector1Instance.ID, oldConnector1Instance.Config).Return(oldConnector1Instance, nil) + procService.EXPECT().Update(anyCtx, oldPipelineProcessorInstance.ID, oldPipelineProcessorInstance.Plugin, oldPipelineProcessorInstance.Config) + procService.EXPECT().Update(anyCtx, oldConnectorProcessorInstance.ID, oldPipelineProcessorInstance.Plugin, oldConnectorProcessorInstance.Config) + connService.EXPECT().Update(anyCtx, oldConnector1Instance.ID, oldConnector1Instance.Plugin, oldConnector1Instance.Config).Return(oldConnector1Instance, nil) pipelineService.EXPECT().Update(anyCtx, oldPipelineInstance.ID, oldPipelineInstance.Config).Return(oldPipelineInstance, nil) pipelineService.EXPECT().UpdateDLQ(anyCtx, oldPipelineInstance.ID, oldPipelineInstance.DLQ) diff --git a/pkg/web/api/connector_v1.go b/pkg/web/api/connector_v1.go index 971c3074a..2a4dde881 100644 --- a/pkg/web/api/connector_v1.go +++ b/pkg/web/api/connector_v1.go @@ -39,7 +39,7 @@ type ConnectorOrchestrator interface { List(ctx context.Context) map[string]*connector.Instance Get(ctx context.Context, id string) (*connector.Instance, error) Delete(ctx context.Context, id string) error - Update(ctx context.Context, id string, config connector.Config) (*connector.Instance, error) + Update(ctx context.Context, id string, plugin string, config connector.Config) (*connector.Instance, error) Validate(ctx context.Context, t connector.Type, plugin string, config connector.Config) error Inspect(ctx context.Context, id string) (*inspector.Session, error) } @@ -163,7 +163,7 @@ func (c *ConnectorAPIv1) UpdateConnector( return nil, cerrors.ErrEmptyID } - updated, err := c.connectorOrchestrator.Update(ctx, req.Id, fromproto.ConnectorConfig(req.Config)) + updated, err := c.connectorOrchestrator.Update(ctx, req.Id, req.Plugin, fromproto.ConnectorConfig(req.Config)) if err != nil { return nil, status.ConnectorError(cerrors.Errorf("failed to update connector: %w", err)) } diff --git a/pkg/web/api/connector_v1_test.go b/pkg/web/api/connector_v1_test.go index 6e87aae61..3a4a54db8 100644 --- a/pkg/web/api/connector_v1_test.go +++ b/pkg/web/api/connector_v1_test.go @@ -401,13 +401,14 @@ func TestConnectorAPIv1_UpdateConnector(t *testing.T) { before := newTestSource() after := newTestSource() after.ID = before.ID + after.Plugin = "test-plugin" after.Config = connector.Config{ Name: "A source connector", Settings: map[string]string{"path": "path/to"}, } after.State = connector.SourceState{Position: []byte("irrelevant")} - csMock.EXPECT().Update(ctx, before.ID, after.Config).Return(after, nil).Times(1) + csMock.EXPECT().Update(ctx, before.ID, after.Plugin, after.Config).Return(after, nil).Times(1) now := time.Now() want := &apiv1.UpdateConnectorResponse{Connector: &apiv1.Connector{ @@ -432,6 +433,7 @@ func TestConnectorAPIv1_UpdateConnector(t *testing.T) { &apiv1.UpdateConnectorRequest{ Id: want.Connector.Id, Config: want.Connector.Config, + Plugin: want.Connector.Plugin, }, ) is.NoErr(err) diff --git a/pkg/web/api/mock/connector.go b/pkg/web/api/mock/connector.go index 114cb206c..a8e18b688 100644 --- a/pkg/web/api/mock/connector.go +++ b/pkg/web/api/mock/connector.go @@ -236,18 +236,18 @@ func (c *ConnectorOrchestratorListCall) DoAndReturn(f func(context.Context) map[ } // Update mocks base method. -func (m *ConnectorOrchestrator) Update(ctx context.Context, id string, config connector.Config) (*connector.Instance, error) { +func (m *ConnectorOrchestrator) Update(ctx context.Context, id, plugin string, config connector.Config) (*connector.Instance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, id, config) + ret := m.ctrl.Call(m, "Update", ctx, id, plugin, config) ret0, _ := ret[0].(*connector.Instance) ret1, _ := ret[1].(error) return ret0, ret1 } // Update indicates an expected call of Update. -func (mr *ConnectorOrchestratorMockRecorder) Update(ctx, id, config any) *ConnectorOrchestratorUpdateCall { +func (mr *ConnectorOrchestratorMockRecorder) Update(ctx, id, plugin, config any) *ConnectorOrchestratorUpdateCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ConnectorOrchestrator)(nil).Update), ctx, id, config) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ConnectorOrchestrator)(nil).Update), ctx, id, plugin, config) return &ConnectorOrchestratorUpdateCall{Call: call} } @@ -263,13 +263,13 @@ func (c *ConnectorOrchestratorUpdateCall) Return(arg0 *connector.Instance, arg1 } // Do rewrite *gomock.Call.Do -func (c *ConnectorOrchestratorUpdateCall) Do(f func(context.Context, string, connector.Config) (*connector.Instance, error)) *ConnectorOrchestratorUpdateCall { +func (c *ConnectorOrchestratorUpdateCall) Do(f func(context.Context, string, string, connector.Config) (*connector.Instance, error)) *ConnectorOrchestratorUpdateCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ConnectorOrchestratorUpdateCall) DoAndReturn(f func(context.Context, string, connector.Config) (*connector.Instance, error)) *ConnectorOrchestratorUpdateCall { +func (c *ConnectorOrchestratorUpdateCall) DoAndReturn(f func(context.Context, string, string, connector.Config) (*connector.Instance, error)) *ConnectorOrchestratorUpdateCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/web/api/mock/processor.go b/pkg/web/api/mock/processor.go index 40d77fb4c..7e1fa50c1 100644 --- a/pkg/web/api/mock/processor.go +++ b/pkg/web/api/mock/processor.go @@ -275,18 +275,18 @@ func (c *ProcessorOrchestratorListCall) DoAndReturn(f func(context.Context) map[ } // Update mocks base method. -func (m *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) { +func (m *ProcessorOrchestrator) Update(ctx context.Context, id, plugin string, cfg processor.Config) (*processor.Instance, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, id, cfg) + ret := m.ctrl.Call(m, "Update", ctx, id, plugin, cfg) ret0, _ := ret[0].(*processor.Instance) ret1, _ := ret[1].(error) return ret0, ret1 } // Update indicates an expected call of Update. -func (mr *ProcessorOrchestratorMockRecorder) Update(ctx, id, cfg any) *ProcessorOrchestratorUpdateCall { +func (mr *ProcessorOrchestratorMockRecorder) Update(ctx, id, plugin, cfg any) *ProcessorOrchestratorUpdateCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ProcessorOrchestrator)(nil).Update), ctx, id, cfg) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*ProcessorOrchestrator)(nil).Update), ctx, id, plugin, cfg) return &ProcessorOrchestratorUpdateCall{Call: call} } @@ -302,13 +302,13 @@ func (c *ProcessorOrchestratorUpdateCall) Return(arg0 *processor.Instance, arg1 } // Do rewrite *gomock.Call.Do -func (c *ProcessorOrchestratorUpdateCall) Do(f func(context.Context, string, processor.Config) (*processor.Instance, error)) *ProcessorOrchestratorUpdateCall { +func (c *ProcessorOrchestratorUpdateCall) Do(f func(context.Context, string, string, processor.Config) (*processor.Instance, error)) *ProcessorOrchestratorUpdateCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ProcessorOrchestratorUpdateCall) DoAndReturn(f func(context.Context, string, processor.Config) (*processor.Instance, error)) *ProcessorOrchestratorUpdateCall { +func (c *ProcessorOrchestratorUpdateCall) DoAndReturn(f func(context.Context, string, string, processor.Config) (*processor.Instance, error)) *ProcessorOrchestratorUpdateCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/pkg/web/api/processor_v1.go b/pkg/web/api/processor_v1.go index 80c086b60..6a1985723 100644 --- a/pkg/web/api/processor_v1.go +++ b/pkg/web/api/processor_v1.go @@ -44,7 +44,7 @@ type ProcessorOrchestrator interface { // Create will make a new processor. Create(ctx context.Context, procType string, parent processor.Parent, cfg processor.Config, condition string) (*processor.Instance, error) // Update will update a processor's config. - Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) + Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) // Delete removes a processor Delete(ctx context.Context, id string) error // InspectIn starts an inspector session for the records coming into the processor with given ID. @@ -218,7 +218,7 @@ func (p *ProcessorAPIv1) UpdateProcessor( return nil, cerrors.ErrEmptyID } - updated, err := p.processorOrchestrator.Update(ctx, req.Id, fromproto.ProcessorConfig(req.Config)) + updated, err := p.processorOrchestrator.Update(ctx, req.Id, req.Plugin, fromproto.ProcessorConfig(req.Config)) if err != nil { return nil, status.ProcessorError(cerrors.Errorf("failed to update processor: %w", err)) } diff --git a/pkg/web/api/processor_v1_test.go b/pkg/web/api/processor_v1_test.go index c25fac2d7..bddcfbc6d 100644 --- a/pkg/web/api/processor_v1_test.go +++ b/pkg/web/api/processor_v1_test.go @@ -372,7 +372,7 @@ func TestProcessorAPIv1_UpdateProcessor(t *testing.T) { UpdatedAt: now, CreatedAt: now, } - psMock.EXPECT().Update(ctx, pr.ID, config).Return(pr, nil).Times(1) + psMock.EXPECT().Update(ctx, pr.ID, pr.Plugin, config).Return(pr, nil).Times(1) want := &apiv1.UpdateProcessorResponse{Processor: &apiv1.Processor{ Id: pr.ID, @@ -392,6 +392,7 @@ func TestProcessorAPIv1_UpdateProcessor(t *testing.T) { ctx, &apiv1.UpdateProcessorRequest{ Id: want.Processor.Id, + Plugin: want.Processor.Plugin, Config: want.Processor.Config, }, ) diff --git a/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json b/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json index cc23fbab6..22ac03e4f 100644 --- a/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json +++ b/pkg/web/openapi/swagger-ui/api/v1/api.swagger.json @@ -1528,6 +1528,9 @@ "properties": { "config": { "$ref": "#/definitions/v1ConnectorConfig" + }, + "plugin": { + "type": "string" } } }, @@ -1647,6 +1650,9 @@ "properties": { "config": { "$ref": "#/definitions/v1ProcessorConfig" + }, + "plugin": { + "type": "string" } } }, diff --git a/proto/api/v1/api.pb.go b/proto/api/v1/api.pb.go index 4caaac7f4..60a188b06 100644 --- a/proto/api/v1/api.pb.go +++ b/proto/api/v1/api.pb.go @@ -2525,6 +2525,7 @@ type UpdateConnectorRequest struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Config *Connector_Config `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"` + Plugin string `protobuf:"bytes,3,opt,name=plugin,proto3" json:"plugin,omitempty"` } func (x *UpdateConnectorRequest) Reset() { @@ -2573,6 +2574,13 @@ func (x *UpdateConnectorRequest) GetConfig() *Connector_Config { return nil } +func (x *UpdateConnectorRequest) GetPlugin() string { + if x != nil { + return x.Plugin + } + return "" +} + type UpdateConnectorResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -3311,6 +3319,7 @@ type UpdateProcessorRequest struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Config *Processor_Config `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"` + Plugin string `protobuf:"bytes,3,opt,name=plugin,proto3" json:"plugin,omitempty"` } func (x *UpdateProcessorRequest) Reset() { @@ -3359,6 +3368,13 @@ func (x *UpdateProcessorRequest) GetConfig() *Processor_Config { return nil } +func (x *UpdateProcessorRequest) GetPlugin() string { + if x != nil { + return x.Plugin + } + return "" +} + type UpdateProcessorResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -4844,85 +4860,88 @@ var file_api_v1_api_proto_rawDesc = []byte{ 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x6f, 0x72, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x5a, 0x0a, + 0x6f, 0x72, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x72, 0x0a, 0x16, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x30, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x4a, 0x0a, 0x17, 0x55, 0x70, 0x64, - 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, - 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, - 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x28, 0x0a, 0x16, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, - 0x19, 0x0a, 0x17, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x31, 0x0a, 0x1b, 0x4c, 0x69, - 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x6c, 0x75, 0x67, 0x69, - 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x5f, 0x0a, - 0x1c, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x6c, - 0x75, 0x67, 0x69, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, - 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, - 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, - 0x72, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x53, 0x70, 0x65, 0x63, 0x69, 0x66, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x22, 0x36, - 0x0a, 0x15, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x65, 0x6e, - 0x74, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x72, - 0x65, 0x6e, 0x74, 0x49, 0x64, 0x73, 0x22, 0x4b, 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, - 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x31, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, 0x18, 0x01, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, - 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, - 0x6f, 0x72, 0x73, 0x22, 0x2b, 0x0a, 0x19, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x50, 0x72, - 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x49, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6c, 0x75, + 0x67, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, + 0x6e, 0x22, 0x4a, 0x0a, 0x17, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x6f, 0x72, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x28, 0x0a, + 0x16, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x19, 0x0a, 0x17, 0x44, 0x65, 0x6c, 0x65, 0x74, + 0x65, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x31, 0x0a, 0x1b, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x5f, 0x0a, 0x1c, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x07, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, + 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x53, + 0x70, 0x65, 0x63, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x70, + 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x73, 0x22, 0x36, 0x0a, 0x15, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, + 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x09, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x73, 0x22, 0x4b, + 0x0a, 0x16, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x63, + 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, + 0x0a, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x73, 0x22, 0x2b, 0x0a, 0x19, 0x49, + 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x49, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x48, 0x0a, 0x1a, 0x49, 0x6e, 0x73, 0x70, + 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x49, 0x6e, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x22, 0x2c, 0x0a, 0x1a, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, + 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x22, 0x48, 0x0a, 0x1a, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, - 0x73, 0x73, 0x6f, 0x72, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, - 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x2c, 0x0a, 0x1a, 0x49, 0x6e, - 0x73, 0x70, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x4f, 0x75, - 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x49, 0x0a, 0x1b, 0x49, 0x6e, 0x73, 0x70, - 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x4f, 0x75, 0x74, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, - 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x22, 0xca, 0x01, 0x0a, 0x16, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, - 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, - 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x30, 0x0a, 0x06, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, - 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2e, 0x50, 0x61, 0x72, 0x65, 0x6e, 0x74, - 0x52, 0x06, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x12, 0x30, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, - 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, - 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, - 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6c, 0x75, 0x67, - 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, - 0x22, 0x4a, 0x0a, 0x17, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, - 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, - 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x22, 0x49, 0x0a, 0x1b, 0x49, 0x6e, 0x73, 0x70, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, + 0x73, 0x73, 0x6f, 0x72, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x63, 0x64, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0xca, 0x01, 0x0a, 0x16, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x42, 0x02, 0x18, 0x01, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x30, + 0x0a, 0x06, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, - 0x72, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x22, 0x25, 0x0a, 0x13, - 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x22, 0x47, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, - 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, - 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x72, 0x2e, 0x50, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, + 0x12, 0x30, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, + 0x73, 0x6f, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x22, 0x4a, 0x0a, 0x17, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, + 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, + 0x73, 0x73, 0x6f, 0x72, 0x22, 0x25, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, + 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x47, 0x0a, 0x14, 0x47, + 0x65, 0x74, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, + 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, + 0x73, 0x73, 0x6f, 0x72, 0x22, 0x72, 0x0a, 0x16, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x72, + 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x30, + 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, - 0x72, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x22, 0x5a, 0x0a, 0x16, - 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x30, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, - 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x4a, 0x0a, 0x17, 0x55, 0x70, 0x64, 0x61, + 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x22, 0x4a, 0x0a, 0x17, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, diff --git a/proto/api/v1/api.proto b/proto/api/v1/api.proto index 5361653dc..588a7c4d5 100644 --- a/proto/api/v1/api.proto +++ b/proto/api/v1/api.proto @@ -821,6 +821,7 @@ message GetConnectorResponse { message UpdateConnectorRequest { string id = 1; Connector.Config config = 2; + string plugin = 3; } message UpdateConnectorResponse { Connector connector = 1; @@ -1022,6 +1023,7 @@ message GetProcessorResponse { message UpdateProcessorRequest { string id = 1; Processor.Config config = 2; + string plugin = 3; } message UpdateProcessorResponse { Processor processor = 1; diff --git a/proto/api/v1/api.swagger.json b/proto/api/v1/api.swagger.json index cc23fbab6..22ac03e4f 100644 --- a/proto/api/v1/api.swagger.json +++ b/proto/api/v1/api.swagger.json @@ -1528,6 +1528,9 @@ "properties": { "config": { "$ref": "#/definitions/v1ConnectorConfig" + }, + "plugin": { + "type": "string" } } }, @@ -1647,6 +1650,9 @@ "properties": { "config": { "$ref": "#/definitions/v1ProcessorConfig" + }, + "plugin": { + "type": "string" } } }, diff --git a/proto/buf.yaml b/proto/buf.yaml index 882e3e5cc..82527f919 100644 --- a/proto/buf.yaml +++ b/proto/buf.yaml @@ -2,7 +2,7 @@ version: v1 name: buf.build/conduitio/conduit lint: use: - - DEFAULT + - STANDARD breaking: use: - FILE