Skip to content

Commit

Permalink
Merge branch 'main' into lovro/inspector-optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Oct 24, 2023
2 parents 1c0387f + 3145202 commit 0bf427a
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 60 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ require (
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
github.com/dop251/goja_nodejs v0.0.0-20230602164024-804a84515562
github.com/gammazero/deque v0.2.1
github.com/goccy/go-json v0.10.2
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/hamba/avro/v2 v2.16.0
github.com/hamba/avro/v2 v2.17.1
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.5.2
github.com/jackc/pgx/v5 v5.4.3
github.com/jinzhu/copier v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a
github.com/matryer/is v1.4.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyL
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down Expand Up @@ -352,8 +354,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8hLWZ5SPcfI+3IDNk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk=
github.com/hamba/avro/v2 v2.16.0 h1:0XhyP65Hs8iMLtdSR0v7ZrwRjsbIZdvr7KzYgmx1Mbo=
github.com/hamba/avro/v2 v2.16.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hamba/avro/v2 v2.17.1 h1:pbhsKtZD4peH+v3xdzrOwLUaQnKAN9DPqSbaEGLh6qw=
github.com/hamba/avro/v2 v2.17.1/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.5.2 h1:aWv8eimFqWlsEiMrYZdPYl+FdHaBJSN4AWwGWfT1G2Y=
Expand Down Expand Up @@ -435,8 +437,6 @@ github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0
github.com/jdx/go-netrc v1.0.0 h1:QbLMLyCZGj0NA8glAhxUpf1zDg6cxnWgMBbjq40W0gQ=
github.com/jdx/go-netrc v1.0.0/go.mod h1:Gh9eFQJnoTNIRHXl2j5bJXA1u84hQWJWgGh569zF3v8=
github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
5 changes: 5 additions & 0 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type Config struct {

PluginDispenserFactories map[string]builtin.DispenserFactory
ProcessorBuilderRegistry *processor.BuilderRegistry

dev struct {
cpuprofile string
memprofile string
}
}

func DefaultConfig() Config {
Expand Down
22 changes: 21 additions & 1 deletion pkg/conduit/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/peterbourgon/ff/v3"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (e *Entrypoint) Serve(cfg Config) {
// config struct.
func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
// TODO extract flags from config struct rather than defining flags manually
flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
flags := flag.NewFlagSet("conduit", flag.ExitOnError)

flags.StringVar(&cfg.DB.Type, "db.type", cfg.DB.Type, "database type; accepts badger,postgres,inmemory")
flags.StringVar(&cfg.DB.Badger.Path, "db.badger.path", cfg.DB.Badger.Path, "path to badger DB")
Expand All @@ -93,6 +94,25 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
flags.StringVar(&cfg.Pipelines.Path, "pipelines.path", cfg.Pipelines.Path, "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file")
flags.BoolVar(&cfg.Pipelines.ExitOnError, "pipelines.exit-on-error", cfg.Pipelines.ExitOnError, "exit Conduit if a pipeline experiences an error while running")

// NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help'
showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file")

// show user or dev flags
flags.Usage = func() {
tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError)
flags.VisitAll(func(f *flag.Flag) {
if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp {
return // hide flag from output
}
// reset value to its default, to ensure default is shown correctly
_ = f.Value.Set(f.DefValue)
tmpFlags.Var(f.Value, f.Name, f.Usage)
})
tmpFlags.Usage()
}

return flags
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"context"
"net"
"net/http"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"

Expand Down Expand Up @@ -199,6 +202,33 @@ func newServices(
// one of the services experiences a fatal error.
func (r *Runtime) Run(ctx context.Context) (err error) {
t, ctx := tomb.WithContext(ctx)

if r.Config.dev.cpuprofile != "" {
f, err := os.Create(r.Config.dev.cpuprofile)
if err != nil {
return cerrors.Errorf("could not create CPU profile: %w", err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
return cerrors.Errorf("could not start CPU profile: %w", err)
}
defer pprof.StopCPUProfile()
}
if r.Config.dev.memprofile != "" {
defer func() {
f, err := os.Create(r.Config.dev.memprofile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create memory profile")
return
}
defer f.Close()
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
r.logger.Err(ctx, err).Msg("could not write memory profile")
}
}()
}

defer func() {
if err != nil {
// This means run failed, we kill the tomb to stop any goroutines
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

const (
DefaultPersisterDelayThreshold = time.Second
DefaultPersisterBundleCountThreshold = 100
DefaultPersisterBundleCountThreshold = 10000
)

// Persister is responsible for persisting connectors and their state when
Expand Down
27 changes: 22 additions & 5 deletions pkg/connector/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ package connector

import (
"context"
"encoding/json"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/goccy/go-json"
)

const (
Expand Down Expand Up @@ -148,13 +148,30 @@ func (s *Store) PrepareSet(id string, instance *Instance) (func(context.Context)
return nil, cerrors.Errorf("can't store connector: %w", cerrors.ErrEmptyID)
}

raw, err := s.encode(instance)
if err != nil {
return nil, err
icopy := Instance{
ID: instance.ID,
Type: instance.Type,
Config: Config{
Name: instance.Config.Name,
Settings: instance.Config.Settings,
},
PipelineID: instance.PipelineID,
Plugin: instance.Plugin,
ProcessorIDs: instance.ProcessorIDs,
ProvisionedBy: instance.ProvisionedBy,
State: instance.State,
CreatedAt: instance.CreatedAt,
UpdatedAt: instance.UpdatedAt,
LastActiveConfig: instance.LastActiveConfig,
}
key := s.addKeyPrefix(id)

return func(ctx context.Context) error {
raw, err := s.encode(&icopy)
if err != nil {
return err
}
key := s.addKeyPrefix(id)

err = s.db.Set(ctx, key, raw)
if err != nil {
return cerrors.Errorf("failed to store connector with ID %q: %w", id, err)
Expand Down
56 changes: 34 additions & 22 deletions pkg/pipeline/stream/fanin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package stream

import (
"context"
"reflect"
)

type FaninNode struct {
Expand Down Expand Up @@ -49,31 +48,14 @@ func (n *FaninNode) Run(ctx context.Context) error {
n.running = false
}()

cases := make([]reflect.SelectCase, len(n.in)+1)
cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
for i, ch := range n.in {
cases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
trigger := n.trigger(ctx)

for {
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
if !ok {
if chosen == 0 {
// context is done
return ctx.Err()
}
// one of the in channels is closed, remove it from select case
cases = append(cases[:chosen], cases[chosen+1:]...)
if len(cases) == 1 {
// only context is left, we're done
return nil
}
continue
msg, err := trigger()
if err != nil || msg == nil {
return err
}

msg := value.Interface().(*Message)

select {
case <-ctx.Done():
return msg.Nack(ctx.Err(), n.ID())
Expand All @@ -82,6 +64,36 @@ func (n *FaninNode) Run(ctx context.Context) error {
}
}

func (n *FaninNode) trigger(ctx context.Context) func() (*Message, error) {
in := make([]<-chan *Message, len(n.in))
copy(in, n.in)

f := n.chooseSelectFunc(ctx, in)

return func() (*Message, error) {
for {
chosen, msg, ok := f()
// ok will be true if the channel has not been closed.
if !ok {
if chosen == 0 {
// context is done
return nil, ctx.Err()
}
// one of the in channels is closed, remove it from select case
in = append(in[:chosen-1], in[chosen:]...)
if len(in) == 0 {
// only context is left, we're done
return nil, nil
}

f = n.chooseSelectFunc(ctx, in)
continue // keep selecting with new select func
}
return msg, nil
}
}
}

func (n *FaninNode) Sub(in <-chan *Message) {
n.in = append(n.in, in)
}
Expand Down
Loading

0 comments on commit 0bf427a

Please sign in to comment.