Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow a connector's or processor's plugin name to be updated #1938

Merged
merged 9 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,17 @@ func (s *Service) Delete(ctx context.Context, id string, dispenserFetcher Plugin
}

// Update updates the connector config.
func (s *Service) Update(ctx context.Context, id string, data Config) (*Instance, error) {
func (s *Service) Update(ctx context.Context, id string, plugin string, data Config) (*Instance, error) {
conn, err := s.Get(ctx, id)
if err != nil {
return nil, err
}

if conn.Plugin != plugin {
s.logger.Warn(ctx).Msgf("connector plugin changing from %v to %v, "+
"this may lead to unexpected behavior and configuration issues.", conn.Plugin, plugin)
}
conn.Plugin = plugin
conn.Config = data
conn.UpdatedAt = time.Now().UTC()

Expand Down
6 changes: 4 additions & 2 deletions pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func TestService_UpdateSuccess(t *testing.T) {
Name: "changed-name",
Settings: map[string]string{"foo": "bar"},
}
wantPlugin := "changed-plugin"

conn, err := service.Create(
ctx,
Expand All @@ -530,10 +531,11 @@ func TestService_UpdateSuccess(t *testing.T) {
is.NoErr(err)

beforeUpdate := time.Now()
got, err := service.Update(ctx, conn.ID, want)
got, err := service.Update(ctx, conn.ID, wantPlugin, want)
is.NoErr(err)

is.Equal(got.Config, want)
is.Equal(got.Plugin, wantPlugin)
is.True(!got.UpdatedAt.Before(beforeUpdate))
}

Expand All @@ -545,7 +547,7 @@ func TestService_UpdateInstanceNotFound(t *testing.T) {

service := NewService(logger, db, nil)
// update connector that does not exist
got, err := service.Update(ctx, uuid.NewString(), Config{})
got, err := service.Update(ctx, uuid.NewString(), "foo-plugin", Config{})
is.True(err != nil)
is.True(cerrors.Is(err, ErrInstanceNotFound))
is.Equal(got, nil)
Expand Down
6 changes: 3 additions & 3 deletions pkg/orchestrator/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error {
return nil
}

func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config connector.Config) (*connector.Instance, error) {
func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, plugin string, config connector.Config) (*connector.Instance, error) {
var r rollback.R
defer r.MustExecute()
txn, ctx, err := c.db.NewTransaction(ctx, true)
Expand Down Expand Up @@ -181,12 +181,12 @@ func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config co
}

oldConfig := conn.Config
conn, err = c.connectors.Update(ctx, id, config)
conn, err = c.connectors.Update(ctx, id, plugin, config)
if err != nil {
return nil, err
}
r.Append(func() error {
_, err = c.connectors.Update(ctx, id, oldConfig)
_, err = c.connectors.Update(ctx, id, conn.Plugin, oldConfig)
return err
})
err = txn.Commit()
Expand Down
12 changes: 6 additions & 6 deletions pkg/orchestrator/connectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,11 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
ValidateSourceConfig(gomock.Any(), conn.Plugin, newConfig.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, newConfig).
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, newConfig).
Return(want, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, conn.ID, newConfig)
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, newConfig)
is.NoErr(err)
is.Equal(got, want)
}
Expand All @@ -507,7 +507,7 @@ func TestConnectorOrchestrator_Update_ConnectorNotExist(t *testing.T) {
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, id, connector.Config{})
got, err := orc.Connectors.Update(ctx, id, "test-plugin", connector.Config{})
is.True(got == nil)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr))
Expand Down Expand Up @@ -537,7 +537,7 @@ func TestConnectorOrchestrator_Update_PipelineRunning(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{})
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{})
is.True(got == nil)
is.True(err != nil)
is.Equal(pipeline.ErrPipelineRunning, err)
Expand Down Expand Up @@ -571,11 +571,11 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
ValidateDestinationConfig(gomock.Any(), conn.Plugin, conn.Config.Settings).
Return(nil)
consMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, connector.Config{}).
Update(gomock.AssignableToTypeOf(ctxType), conn.ID, conn.Plugin, connector.Config{}).
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Connectors.Update(ctx, conn.ID, connector.Config{})
got, err := orc.Connectors.Update(ctx, conn.ID, conn.Plugin, connector.Config{})
is.True(got == nil)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr))
Expand Down
24 changes: 12 additions & 12 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type ConnectorService interface {
Get(ctx context.Context, id string) (*connector.Instance, error)
Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error)
Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error
Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error)
Update(ctx context.Context, id string, plugin string, c connector.Config) (*connector.Instance, error)

AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
Expand All @@ -108,7 +108,7 @@ type ProcessorService interface {
Get(ctx context.Context, id string) (*processor.Instance, error)
Create(ctx context.Context, id string, plugin string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error)
MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error)
Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error)
Delete(ctx context.Context, id string) error
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/orchestrator/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *ProcessorOrchestrator) Get(ctx context.Context, id string) (*processor.
return p.processors.Get(ctx, id)
}

func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error) {
func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, plugin string, cfg processor.Config) (*processor.Instance, error) {
var r rollback.R
defer r.MustExecute()

Expand All @@ -157,6 +157,7 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce
return nil, cerrors.Errorf("processor %q cannot be updated: %w", proc.ID, ErrImmutableProvisionedByConfig)
}
// provisioned by API
oldPlugin := proc.Plugin
oldConfig := proc.Config

pl, err := p.getProcessorsPipeline(ctx, proc.Parent)
Expand All @@ -168,12 +169,12 @@ func (p *ProcessorOrchestrator) Update(ctx context.Context, id string, cfg proce
return nil, pipeline.ErrPipelineRunning
}

proc, err = p.processors.Update(ctx, id, cfg)
proc, err = p.processors.Update(ctx, id, plugin, cfg)
if err != nil {
return nil, err
}
r.Append(func() error {
_, err = p.processors.Update(ctx, proc.ID, oldConfig)
_, err = p.processors.Update(ctx, proc.ID, oldPlugin, oldConfig)
return err
})

Expand Down
16 changes: 8 additions & 8 deletions pkg/orchestrator/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_Success(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
procsMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config).
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config).
Return(want, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.NoErr(err)
is.Equal(want, got)
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorNotExist(t *testing.T)
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match")
is.True(got == nil)
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_PipelineRunning(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.True(err != nil)
is.Equal(pipeline.ErrPipelineRunning, err)
is.True(got == nil)
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestProcessorOrchestrator_UpdateOnPipeline_ProcessorProvisionedByConfig(t *
Return(before, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.Equal(got, nil)
is.True(err != nil)
is.True(cerrors.Is(err, ErrImmutableProvisionedByConfig)) // expected ErrImmutableProvisionedByConfig
Expand Down Expand Up @@ -550,11 +550,11 @@ func TestProcessorOrchestrator_UpdateOnPipeline_UpdateFail(t *testing.T) {
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Return(pl, nil)
procsMock.EXPECT().
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Config).
Update(gomock.AssignableToTypeOf(ctxType), want.ID, want.Plugin, want.Config).
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, before.ID, newConfig)
got, err := orc.Processors.Update(ctx, before.ID, before.Plugin, newConfig)
is.True(err != nil)
is.Equal(wantErr, err)
is.True(got == nil)
Expand Down Expand Up @@ -586,7 +586,7 @@ func TestProcessorOrchestrator_UpdateOnConnector_ConnectorNotExist(t *testing.T)
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, connPluginMock, procPluginMock, lifecycleMock)
got, err := orc.Processors.Update(ctx, want.ID, processor.Config{})
got, err := orc.Processors.Update(ctx, want.ID, want.Plugin, processor.Config{})
is.True(err != nil)
is.True(cerrors.Is(err, wantErr)) // errors did not match
is.True(got == nil)
Expand Down
11 changes: 10 additions & 1 deletion pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,25 @@ func (s *Service) Create(
}

// Update will update a processor instance config.
func (s *Service) Update(ctx context.Context, id string, cfg Config) (*Instance, error) {
func (s *Service) Update(ctx context.Context, id string, plugin string, cfg Config) (*Instance, error) {
instance, err := s.Get(ctx, id)
if err != nil {
return nil, err
}
if plugin == "" {
return nil, cerrors.Errorf("could not update processor instance (ID: %s): plugin name is empty", id)
}

if instance.running {
return nil, cerrors.Errorf("could not update processor instance (ID: %s): %w", id, ErrProcessorRunning)
}

if instance.Plugin != plugin {
s.logger.Warn(ctx).Msgf("processor plugin changing from %v to %v, "+
"this may lead to unexpected behavior and configuration issues.", instance.Plugin, plugin)
}

instance.Plugin = plugin
instance.Config = cfg
instance.UpdatedAt = time.Now()

Expand Down
Loading