Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Perf] Inspector refactoring and optimization #1248

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 59 additions & 64 deletions pkg/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package inspector
import (
"context"
"sync"
"sync/atomic"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
Expand All @@ -32,23 +33,9 @@ const DefaultBufferSize = 1000
type Session struct {
C chan record.Record

id string
logger log.CtxLogger
onClose func()
once *sync.Once
}

func (s *Session) close() {
// close() can be called multiple times on a session. One example is:
// There's an active inspector session on a component (processor or connector),
// during which the component is deleted.
// The session channel will be closed, which terminate the API request fetching
// record from this session.
// However, the API request termination also closes the session.
s.once.Do(func() {
s.onClose()
close(s.C)
})
id string
componentID string
logger log.CtxLogger
}

// send a record to the session's channel.
Expand All @@ -74,7 +61,12 @@ type Inspector struct {
// keys are sessions IDs.
sessions map[string]*Session
// guards access to sessions
lock sync.Mutex
lock sync.Mutex
// hasSessions is set to true when there are open sessions. This allows us
// to take a shortcut without acquiring the lock in the happy path, when
// there are no sessions.
hasSessions atomic.Bool

logger log.CtxLogger
bufferSize int
}
Expand All @@ -87,81 +79,84 @@ func New(logger log.CtxLogger, bufferSize int) *Inspector {
}
}

// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: uuid.NewString(),
componentID: componentID,
logger: i.logger.WithComponent("inspector.Session"),
}

i.add(s)
go func() {
<-ctx.Done()
i.remove(s.id)
}()

return s
}

// Send the given record to all registered sessions.
// The method does not wait for consumers to get the records.
func (i *Inspector) Send(ctx context.Context, r record.Record) {
// copy metadata, to prevent issues when concurrently accessing the metadata
var meta record.Metadata
if len(r.Metadata) != 0 {
meta = make(record.Metadata, len(r.Metadata))
for k, v := range r.Metadata {
meta[k] = v
}
// shortcut - we don't expect any sessions, so we check the atomic variable
// before acquiring an actual lock
if !i.hasSessions.Load() {
return
}

// todo optimize this, as we have locks for every record.
// clone record only once, the listeners aren't expected to manipulate the records
rClone := r.Clone()

// locks are needed to make sure the `sessions` slice
// is not modified as we're iterating over it
i.lock.Lock()
defer i.lock.Unlock()
for _, s := range i.sessions {
s.send(ctx, record.Record{
Position: r.Position,
Operation: r.Operation,
Metadata: meta,
Key: r.Key,
Payload: r.Payload,
})
s.send(ctx, rClone)
}
}

// NewSession creates a new session in given inspector.
// componentID is the ID of the component being inspected (connector or processor).
func (i *Inspector) NewSession(ctx context.Context, componentID string) *Session {
id := uuid.NewString()
s := &Session{
C: make(chan record.Record, i.bufferSize),
id: id,
logger: i.logger.WithComponent("inspector.Session"),
onClose: func() {
i.remove(id)
measure.InspectorsGauge.WithValues(componentID).Dec()
},
once: &sync.Once{},
func (i *Inspector) Close() {
for k := range i.sessions {
i.remove(k)
}
measure.InspectorsGauge.WithValues(componentID).Inc()

go func() {
<-ctx.Done()
s.logger.
Debug(context.Background()).
Msgf("context done: %v", ctx.Err())
s.close()
}()
}

// add a session with given ID to this Inspector.
func (i *Inspector) add(s *Session) {
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
i.lock.Lock()
defer i.lock.Unlock()

i.sessions[id] = s
i.sessions[s.id] = s
i.hasSessions.Store(true)
measure.InspectorsGauge.WithValues(s.componentID).Inc()

i.logger.
Info(context.Background()).
Str(log.InspectorSessionID, id).
Str(log.InspectorSessionID, s.id).
Msg("session created")
return s
}

func (i *Inspector) Close() {
for _, s := range i.sessions {
s.close()
}
}

// remove a session with given ID from this Inspector.
func (i *Inspector) remove(id string) {
i.lock.Lock()
defer i.lock.Unlock()

s, ok := i.sessions[id]
if !ok {
return // session already removed
}

close(s.C)
delete(i.sessions, id)
if len(i.sessions) == 0 {
i.hasSessions.Store(false)
}
measure.InspectorsGauge.WithValues(s.componentID).Dec()

i.logger.
Info(context.Background()).
Str(log.InspectorSessionID, id).
Expand Down
19 changes: 19 additions & 0 deletions pkg/inspector/inspector_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ import (
"github.com/conduitio/conduit/pkg/record"
)

func BenchmarkInspector_NoSession_Send(b *testing.B) {
ins := New(log.Nop(), 10)

for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
}
}

func BenchmarkInspector_SingleSession_Send(b *testing.B) {
ins := New(log.Nop(), 10)
ins.NewSession(context.Background(), "test-id")
Expand All @@ -30,3 +38,14 @@ func BenchmarkInspector_SingleSession_Send(b *testing.B) {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
}
}

func BenchmarkInspector_10Sessions_Send(b *testing.B) {
ins := New(log.Nop(), 10)
for i := 0; i < 10; i++ {
ins.NewSession(context.Background(), "test-id")
}

lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < b.N; i++ {
ins.Send(context.Background(), record.Record{Position: record.Position("test-pos")})
}
}
2 changes: 1 addition & 1 deletion pkg/inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestInspector_Send_SessionClosed(t *testing.T) {
underTest.Send(context.Background(), r)
assertGotRecord(is, s, r)

s.close()
underTest.remove(s.id)
underTest.Send(
context.Background(),
record.Record{
Expand Down
Loading