Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream Inspector: Close inspector when connector/processor is deleted #882

Merged
merged 17 commits into from
Feb 24, 2023
4 changes: 4 additions & 0 deletions pkg/connector/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,7 @@ func (i *Instance) Connector(ctx context.Context, dispenserFetcher PluginDispens
return nil, ErrInvalidConnectorType
}
}

func (i *Instance) Close() {
i.inspector.Close()
}
1 change: 1 addition & 0 deletions pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (s *Service) Delete(ctx context.Context, id string) error {
return cerrors.Errorf("could not delete connector instance %v from store: %w", id, err)
}
delete(s.connectors, id)
instance.Close()
measure.ConnectorsGauge.WithValues(strings.ToLower(instance.Type.String())).Dec()

return nil
Expand Down
20 changes: 18 additions & 2 deletions pkg/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,20 @@ type Session struct {
id string
logger log.CtxLogger
onClose func()
once *sync.Once
}

func (s *Session) close() {
s.onClose()
close(s.C)
// close() can be called multiple times on a session. One example is:
// There's an active inspector session on a component (processor or connector),
// during which the component is deleted.
// The session channel will be closed, which terminate the API request fetching
// record from this session.
// However, the API request termination also closes the session.
s.once.Do(func() {
s.onClose()
close(s.C)
})
}

// send a record to the session's channel.
Expand Down Expand Up @@ -114,6 +123,7 @@ func (i *Inspector) NewSession(ctx context.Context) *Session {
onClose: func() {
i.remove(id)
},
once: &sync.Once{},
}
go func() {
<-ctx.Done()
Expand All @@ -134,6 +144,12 @@ func (i *Inspector) NewSession(ctx context.Context) *Session {
return s
}

func (i *Inspector) Close() {
for _, s := range i.sessions {
s.close()
}
Comment on lines +148 to +150
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are strict, we should be locking i.lock before we loop through i.sessions, but then s.close() also tries to lock it and we have a deadlock 😕 For correctness' sake we might want to lock first, then copy i.sessions into a separate map/slice, then unlock and then loop through the copied map/slice to close the sessions. It still doesn't guarantee that a new session won't be created while this is happening.

I know, it's an edge case, but I'm trying to say that something feels off regarding this design 😕

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't too happy with these locks from the beginning, and even less now. The thing is that the lock handles concurrent access to the sessions' map, but what we actually need to do is to make sure that the inspector will refuse to create a new session ever again after it's been closed.

IMHO, the edge case is rare enough that working more on this design doesn't return much, especially after we move inspector.Close() after connector/processor deletion. At that point, starting a new session won't be possible at all (the API will complain that the processor/connector doesn't exist).

Copy link
Member

@lovromazgon lovromazgon Feb 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, if we delete the entity first and then close the inspector, there can't be any more requests to create new sessions. Still, it doesn't feel right to have a component that can behave unexpectedly if it's used the wrong way and we rely on using it correctly elsewhere 😕 Don't worry, I won't block the PR because I feel bad 😅 I think we can do better though.

I thought about the locking and especially the optimization TODO in Send. I gave it a go and spiked out a slightly different approach to managing sessions in the inspector using a channel (see 2796796). The main difference would be that we need an upper bound on how many sessions we can create (I think it would be smart to have that anyway). We still can't get away without a lock, it's needed because we want to close the session channel in a different goroutine than the one that's writing to that channel, without a lock we risk a panic. The nice thing about this approach is that we don't need to lock anything to check for open sessions (using a map we don't get around that), so if there are no open sessions the performance is 2x as fast (and that's IMO the case we need to optimize for).

There are some TODOs in that spike that would need to be addressed if we wanted to go down that route. Let me know what you think and if we should pursue this in a separate PR.

(BTW: I'll keep this comment unresolved so we can discuss it, but you can merge the PR anyway)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, it doesn't feel right to have a component that can behave unexpectedly if it's used the wrong way and we rely on using it correctly elsewhere confused

I definitely agree with that and my point was that it reduces the changes of something which is an edge case already.

I gave it a go and spiked out a slightly different approach to managing sessions in the inspector using a channel (see 2796796).

Really nice! 👍 It solves some of the annoyances we currently have. I was initially thinking about something like a slice or a channel to store the sessions, but didn't go with it because I wanted the sessions to be removed and not stay around after they are not needed anymore.

The nice thing about this approach is that we don't need to lock anything to check for open sessions

When you say "to check for open sessions" which part do you refer to?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about this part where we return early if there are no open sessions. If we did the same thing with a map we would need to acquire a lock first.

}

// remove a session with given ID from this Inspector.
func (i *Inspector) remove(id string) {
i.lock.Lock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
)
}

func TestInspector_Close(t *testing.T) {
is := is.New(t)

underTest := New(log.Nop(), 10)
s := underTest.NewSession(context.Background())

underTest.Close()
_, ok := <-s.C
is.True(!ok)
}

func TestInspector_Send_SessionCtxCanceled(t *testing.T) {
is := is.New(t)

Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ type Interface interface {
// Process runs the processor function on a record.
Process(ctx context.Context, record record.Record) (record.Record, error)

// InspectIn starts an inspection session for input records for this processor.
InspectIn(ctx context.Context) *inspector.Session

// InspectOut starts an inspection session for output records for this processor.
InspectOut(ctx context.Context) *inspector.Session

// Close closes this processor and releases any resources
// which may have been used by it.
Close()
}

// Instance represents a processor instance.
Expand Down
12 changes: 12 additions & 0 deletions pkg/processor/mock/processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/processor/procbuiltin/func_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,8 @@ func (f FuncWrapper) InspectIn(ctx context.Context) *inspector.Session {
func (f FuncWrapper) InspectOut(ctx context.Context) *inspector.Session {
return f.outInsp.NewSession(ctx)
}

func (f FuncWrapper) Close() {
f.inInsp.Close()
f.outInsp.Close()
}
24 changes: 24 additions & 0 deletions pkg/processor/procbuiltin/func_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,27 @@ func TestFuncWrapper_InspectOut_ProcessingFailed(t *testing.T) {
_, _, err := cchan.ChanOut[record.Record](session.C).RecvTimeout(ctx, 100*time.Millisecond)
is.True(cerrors.Is(err, context.DeadlineExceeded))
}

func TestFuncWrapper_Close(t *testing.T) {
ctx := context.Background()

is := is.New(t)

underTest := NewFuncWrapper(func(_ context.Context, in record.Record) (record.Record, error) {
return record.Record{}, nil
})

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
underTest.Close()

// incoming records session should be closed
_, got, err := cchan.ChanOut[record.Record](in.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)

// outgoing records session should be closed
_, got, err = cchan.ChanOut[record.Record](out.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)
}
5 changes: 5 additions & 0 deletions pkg/processor/procjs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (p *Processor) InspectOut(ctx context.Context) *inspector.Session {
return p.outInsp.NewSession(ctx)
}

func (p *Processor) Close() {
p.inInsp.Close()
p.outInsp.Close()
}

func (p *Processor) toJSRecord(r record.Record) goja.Value {
convertData := func(d record.Data) interface{} {
switch v := d.(type) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/processor/procjs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,34 @@ func TestJSProcessor_Inspect(t *testing.T) {
is.True(got)
is.Equal(recOut, inspOut)
}

func TestJSProcessor_Close(t *testing.T) {
is := is.New(t)
ctx := context.Background()
src := `
function process(record) {
record.Key = new RawData();
record.Key.Raw = "foobar";
return record;
}`
underTest, err := New(src, zerolog.Nop())
is.NoErr(err) // expected no error when creating the JS processor

in := underTest.InspectIn(ctx)
out := underTest.InspectOut(ctx)
underTest.Close()

// incoming records session should be closed
_, got, err := cchan.ChanOut[record.Record](in.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)

// outgoing records session should be closed
_, got, err = cchan.ChanOut[record.Record](out.C).RecvTimeout(ctx, 100*time.Millisecond)
is.NoErr(err)
is.True(!got)
}

func TestJSProcessor_JavaScriptException(t *testing.T) {
is := is.New(t)

Expand Down
3 changes: 2 additions & 1 deletion pkg/processor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Service) List(_ context.Context) map[string]*Instance {
}

// Get will return a single processor or an error.
func (s *Service) Get(ctx context.Context, id string) (*Instance, error) {
func (s *Service) Get(_ context.Context, id string) (*Instance, error) {
ins, ok := s.instances[id]
if !ok {
return nil, cerrors.Errorf("%w (ID: %s)", ErrInstanceNotFound, id)
Expand Down Expand Up @@ -167,6 +167,7 @@ func (s *Service) Delete(ctx context.Context, id string) error {
return cerrors.Errorf("could not delete processor instance from store: %w", err)
}
delete(s.instances, id)
instance.Processor.Close()
measure.ProcessorsGauge.WithValues(instance.Type).Dec()

return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func TestService_Delete_Success(t *testing.T) {

procType := "processor-type"
p := mock.NewProcessor(ctrl)
p.EXPECT().Close()

registry := newTestBuilderRegistry(t, map[string]processor.Interface{procType: p})
service := processor.NewService(log.Nop(), db, registry)
Expand Down