Skip to content

Commit

Permalink
do not use a sandbox for Run (#1886)
Browse files Browse the repository at this point in the history
Co-authored-by: Haris Osmanagić <[email protected]>
  • Loading branch information
lovromazgon and hariso authored Oct 9, 2024
1 parent 292193e commit b429365
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/plugin/connector/builtin/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (d *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.De

d.logger.Debug(ctx).Msg("calling Run")
go func() {
err := runSandboxNoResp(d.impl.Run, d.withLogger(ctx), stream, d.logger)
err := d.impl.Run(d.withLogger(ctx), stream)
if err != nil {
if !inmemStream.Close(err) {
d.logger.Err(ctx, err).Msg("stream already stopped")
Expand Down
13 changes: 0 additions & 13 deletions pkg/plugin/connector/builtin/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,3 @@ func returnResponse(ctx context.Context, res any, err error, c chan<- any, logge
c <- err
}
}

func runSandboxNoResp[REQ any](
f func(context.Context, REQ) error,
ctx context.Context,
req REQ,
logger log.CtxLogger,
) error {
_, err := runSandbox(func(ctx context.Context, req REQ) (any, error) {
err := f(ctx, req)
return nil, err
}, ctx, req, logger)
return err
}
10 changes: 0 additions & 10 deletions pkg/plugin/connector/builtin/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,3 @@ func TestRunSandbox(t *testing.T) {
})
}
}

func TestRunSandboxNoResp(t *testing.T) {
ctx := context.Background()
is := is.New(t)
logger := log.New(zerolog.New(zerolog.NewTestWriter(t)))

wantErr := cerrors.New("test error")
gotErr := runSandboxNoResp(func(context.Context, any) error { panic(wantErr) }, ctx, nil, logger)
is.Equal(gotErr, wantErr)
}
2 changes: 1 addition & 1 deletion pkg/plugin/connector/builtin/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *sourcePluginAdapter) Run(ctx context.Context, stream pconnector.SourceR

s.logger.Debug(ctx).Msg("calling Run")
go func() {
err := runSandboxNoResp(s.impl.Run, s.withLogger(ctx), stream, s.logger)
err := s.impl.Run(s.withLogger(ctx), stream)
if err != nil {
if !inmemStream.Close(err) {
s.logger.Err(ctx, err).Msg("stream already stopped")
Expand Down

0 comments on commit b429365

Please sign in to comment.