Skip to content

Commit

Permalink
Merge branch 'main' into lovro/connector-persister-optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Oct 24, 2023
2 parents 1fca110 + 3111797 commit b8eee67
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 54 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ require (
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/hamba/avro/v2 v2.16.0
github.com/hamba/avro/v2 v2.17.1
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.5.2
github.com/jackc/pgx/v5 v5.4.3
github.com/jinzhu/copier v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a
github.com/matryer/is v1.4.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8hLWZ5SPcfI+3IDNk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk=
github.com/hamba/avro/v2 v2.16.0 h1:0XhyP65Hs8iMLtdSR0v7ZrwRjsbIZdvr7KzYgmx1Mbo=
github.com/hamba/avro/v2 v2.16.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hamba/avro/v2 v2.17.1 h1:pbhsKtZD4peH+v3xdzrOwLUaQnKAN9DPqSbaEGLh6qw=
github.com/hamba/avro/v2 v2.17.1/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.5.2 h1:aWv8eimFqWlsEiMrYZdPYl+FdHaBJSN4AWwGWfT1G2Y=
Expand Down Expand Up @@ -437,8 +437,6 @@ github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0
github.com/jdx/go-netrc v1.0.0 h1:QbLMLyCZGj0NA8glAhxUpf1zDg6cxnWgMBbjq40W0gQ=
github.com/jdx/go-netrc v1.0.0/go.mod h1:Gh9eFQJnoTNIRHXl2j5bJXA1u84hQWJWgGh569zF3v8=
github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
5 changes: 5 additions & 0 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type Config struct {

PluginDispenserFactories map[string]builtin.DispenserFactory
ProcessorBuilderRegistry *processor.BuilderRegistry

dev struct {
cpuprofile string
memprofile string
}
}

func DefaultConfig() Config {
Expand Down
22 changes: 21 additions & 1 deletion pkg/conduit/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/peterbourgon/ff/v3"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (e *Entrypoint) Serve(cfg Config) {
// config struct.
func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
// TODO extract flags from config struct rather than defining flags manually
flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
flags := flag.NewFlagSet("conduit", flag.ExitOnError)

flags.StringVar(&cfg.DB.Type, "db.type", cfg.DB.Type, "database type; accepts badger,postgres,inmemory")
flags.StringVar(&cfg.DB.Badger.Path, "db.badger.path", cfg.DB.Badger.Path, "path to badger DB")
Expand All @@ -93,6 +94,25 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
flags.StringVar(&cfg.Pipelines.Path, "pipelines.path", cfg.Pipelines.Path, "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file")
flags.BoolVar(&cfg.Pipelines.ExitOnError, "pipelines.exit-on-error", cfg.Pipelines.ExitOnError, "exit Conduit if a pipeline experiences an error while running")

// NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help'
showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file")

// show user or dev flags
flags.Usage = func() {
tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError)
flags.VisitAll(func(f *flag.Flag) {
if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp {
return // hide flag from output
}
// reset value to its default, to ensure default is shown correctly
_ = f.Value.Set(f.DefValue)
tmpFlags.Var(f.Value, f.Name, f.Usage)
})
tmpFlags.Usage()
}

return flags
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"context"
"net"
"net/http"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"

Expand Down Expand Up @@ -199,6 +202,33 @@ func newServices(
// one of the services experiences a fatal error.
func (r *Runtime) Run(ctx context.Context) (err error) {
t, ctx := tomb.WithContext(ctx)

if r.Config.dev.cpuprofile != "" {
f, err := os.Create(r.Config.dev.cpuprofile)
if err != nil {
return cerrors.Errorf("could not create CPU profile: %w", err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
return cerrors.Errorf("could not start CPU profile: %w", err)
}
defer pprof.StopCPUProfile()
}
if r.Config.dev.memprofile != "" {
defer func() {
f, err := os.Create(r.Config.dev.memprofile)
if err != nil {
r.logger.Err(ctx, err).Msg("could not create memory profile")
return
}
defer f.Close()
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
r.logger.Err(ctx, err).Msg("could not write memory profile")
}
}()
}

defer func() {
if err != nil {
// This means run failed, we kill the tomb to stop any goroutines
Expand Down
56 changes: 34 additions & 22 deletions pkg/pipeline/stream/fanin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package stream

import (
"context"
"reflect"
)

type FaninNode struct {
Expand Down Expand Up @@ -49,31 +48,14 @@ func (n *FaninNode) Run(ctx context.Context) error {
n.running = false
}()

cases := make([]reflect.SelectCase, len(n.in)+1)
cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
for i, ch := range n.in {
cases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
trigger := n.trigger(ctx)

for {
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
if !ok {
if chosen == 0 {
// context is done
return ctx.Err()
}
// one of the in channels is closed, remove it from select case
cases = append(cases[:chosen], cases[chosen+1:]...)
if len(cases) == 1 {
// only context is left, we're done
return nil
}
continue
msg, err := trigger()
if err != nil || msg == nil {
return err
}

msg := value.Interface().(*Message)

select {
case <-ctx.Done():
return msg.Nack(ctx.Err(), n.ID())
Expand All @@ -82,6 +64,36 @@ func (n *FaninNode) Run(ctx context.Context) error {
}
}

func (n *FaninNode) trigger(ctx context.Context) func() (*Message, error) {
in := make([]<-chan *Message, len(n.in))
copy(in, n.in)

f := n.chooseSelectFunc(ctx, in)

return func() (*Message, error) {
for {
chosen, msg, ok := f()
// ok will be true if the channel has not been closed.
if !ok {
if chosen == 0 {
// context is done
return nil, ctx.Err()
}
// one of the in channels is closed, remove it from select case
in = append(in[:chosen-1], in[chosen:]...)
if len(in) == 0 {
// only context is left, we're done
return nil, nil
}

f = n.chooseSelectFunc(ctx, in)
continue // keep selecting with new select func
}
return msg, nil
}
}
}

func (n *FaninNode) Sub(in <-chan *Message) {
n.in = append(n.in, in)
}
Expand Down
168 changes: 168 additions & 0 deletions pkg/pipeline/stream/fanin_select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stream

import (
"context"
"reflect"
)

func (n *FaninNode) chooseSelectFunc(ctx context.Context, in []<-chan *Message) func() (int, *Message, bool) {
switch len(in) {
case 1:
return func() (int, *Message, bool) { return n.select1(ctx, in[0]) }
case 2:
return func() (int, *Message, bool) { return n.select2(ctx, in[0], in[1]) }
case 3:
return func() (int, *Message, bool) { return n.select3(ctx, in[0], in[1], in[2]) }
case 4:
return func() (int, *Message, bool) { return n.select4(ctx, in[0], in[1], in[2], in[3]) }
case 5:
return func() (int, *Message, bool) { return n.select5(ctx, in[0], in[1], in[2], in[3], in[4]) }
case 6:
return func() (int, *Message, bool) { return n.select6(ctx, in[0], in[1], in[2], in[3], in[4], in[5]) }
default:
// use reflection for more channels
cases := make([]reflect.SelectCase, len(in)+1)
cases[0] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
for i, ch := range in {
cases[i+1] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
return func() (int, *Message, bool) {
chosen, value, ok := reflect.Select(cases)
if !ok { // a channel was closed
return chosen, nil, ok
}
return chosen, value.Interface().(*Message), ok
}
}
}

func (*FaninNode) select1(
ctx context.Context,
c1 <-chan *Message,
) (int, *Message, bool) {
select {
case <-ctx.Done():
return 0, nil, false
case val, ok := <-c1:
return 1, val, ok
}
}

func (*FaninNode) select2(
ctx context.Context,
c1 <-chan *Message,
c2 <-chan *Message,
) (int, *Message, bool) {
select {
case <-ctx.Done():
return 0, nil, false
case val, ok := <-c1:
return 1, val, ok
case val, ok := <-c2:
return 2, val, ok
}
}

func (*FaninNode) select3(
ctx context.Context,
c1 <-chan *Message,
c2 <-chan *Message,
c3 <-chan *Message,
) (int, *Message, bool) {
select {
case <-ctx.Done():
return 0, nil, false
case val, ok := <-c1:
return 1, val, ok
case val, ok := <-c2:
return 2, val, ok
case val, ok := <-c3:
return 3, val, ok
}
}

func (*FaninNode) select4(
ctx context.Context,
c1 <-chan *Message,
c2 <-chan *Message,
c3 <-chan *Message,
c4 <-chan *Message,
) (int, *Message, bool) {
select {
case <-ctx.Done():
return 0, nil, false
case val, ok := <-c1:
return 1, val, ok
case val, ok := <-c2:
return 2, val, ok
case val, ok := <-c3:
return 3, val, ok
case val, ok := <-c4:
return 4, val, ok
}
}

func (*FaninNode) select5(
ctx context.Context,
c1 <-chan *Message,
c2 <-chan *Message,
c3 <-chan *Message,
c4 <-chan *Message,
c5 <-chan *Message,
) (int, *Message, bool) {
select {
case <-ctx.Done():
return 0, nil, false
case val, ok := <-c1:
return 1, val, ok
case val, ok := <-c2:
return 2, val, ok
case val, ok := <-c3:
return 3, val, ok
case val, ok := <-c4:
return 4, val, ok
case val, ok := <-c5:
return 5, val, ok
}
}

func (*FaninNode) select6(
ctx context.Context,
c1 <-chan *Message,
c2 <-chan *Message,
c3 <-chan *Message,
c4 <-chan *Message,
c5 <-chan *Message,
c6 <-chan *Message,
) (int, *Message, bool) {
select {
case <-ctx.Done():
return 0, nil, false
case val, ok := <-c1:
return 1, val, ok
case val, ok := <-c2:
return 2, val, ok
case val, ok := <-c3:
return 3, val, ok
case val, ok := <-c4:
return 4, val, ok
case val, ok := <-c5:
return 5, val, ok
case val, ok := <-c6:
return 6, val, ok
}
}
Loading

0 comments on commit b8eee67

Please sign in to comment.