Parallel processors (#744)
* spike out parallel processor

* simplify by spawning workers in advance

* ParallelNode

* add test case for a single failing worker

* refactor parallel node

* add extensive tests for parallel worker and coordinator

* cleanup and docs

* use pool for done channels

* validate processor workers in pipeline config file

* add processor workers validation in service

* use temp folder in runtime test

* use is instead of assert in runtime test

* clarify Message.StatusError usage in parallel node

* move comment

* refactor + comments
lovromazgon authored Mar 15, 2023
1 parent 37312e5 commit e0e600c
Showing 25 changed files with 1,962 additions and 814 deletions.
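
In short, the pipeline wraps a processor in the new stream.ParallelNode whenever its configuration asks for more than one worker; otherwise it keeps building a plain ProcessorNode. A rough sketch of the knob that triggers this, using the fields referenced in the lifecycle.go changes below (the ID and type values are invented for illustration):

// Hypothetical processor instance; Config.Workers is the new knob,
// the concrete values are made up.
proc := &processor.Instance{
	ID:   "proc-1",
	Type: "my-processor",
	Config: processor.Config{
		Workers: 4, // values > 1 make buildProcessorNodes create a stream.ParallelNode
	},
}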
21 changes: 9 additions & 12 deletions pkg/conduit/runtime_test.go
@@ -16,35 +16,32 @@ package conduit_test

import (
"context"
- "os"
"testing"
"time"

"github.com/conduitio/conduit/pkg/conduit"
- "github.com/conduitio/conduit/pkg/foundation/assert"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
+ "github.com/matryer/is"
)

- // path where tests store their data during runs.
- const testingDBPath = "./testing.app.db"
const delay = 500 * time.Millisecond

func TestRuntime(t *testing.T) {
+ is := is.New(t)

var cfg conduit.Config
cfg.DB.Type = "badger"
- cfg.DB.Badger.Path = testingDBPath
+ cfg.DB.Badger.Path = t.TempDir() + "/testing.app.db"
cfg.GRPC.Address = ":0"
cfg.HTTP.Address = ":0"
cfg.Log.Level = "info"
cfg.Log.Format = "cli"
cfg.Pipelines.Path = "./pipelines"

- e, err := conduit.NewRuntime(cfg)
- t.Cleanup(func() {
- os.RemoveAll(testingDBPath)
- })
- assert.Ok(t, err)
- assert.NotNil(t, e)
+ r, err := conduit.NewRuntime(cfg)
+ is.NoErr(err)
+ is.True(r != nil)

// set a cancel on a trigger to kill the context after THRESHOLD duration.
ctx, cancel := context.WithCancel(context.TODO())
@@ -55,6 +52,6 @@ func TestRuntime(t *testing.T) {

// wait on Run and assert that the context was canceled and no other error
// occurred.
- err = e.Run(ctx)
- assert.True(t, cerrors.Is(err, context.Canceled), "expected error to be context.Cancelled")
+ err = r.Run(ctx)
+ is.True(cerrors.Is(err, context.Canceled)) // expected error to be context.Cancelled
}
19 changes: 10 additions & 9 deletions pkg/foundation/log/fields.go
@@ -15,15 +15,16 @@
package log

const (
- ComponentField = "component"
- ConnectorIDField = "connector_id"
- DurationField = "duration"
- MessageIDField = "message_id"
- NodeIDField = "node_id"
- PipelineIDField = "pipeline_id"
- RecordPositionField = "record_position"
- RequestIDField = "request_id"
- ServerAddressField = "address"
+ ComponentField = "component"
+ ConnectorIDField = "connector_id"
+ DurationField = "duration"
+ MessageIDField = "message_id"
+ NodeIDField = "node_id"
+ ParallelWorkerIDField = "parallel_worker_id"
+ PipelineIDField = "pipeline_id"
+ RecordPositionField = "record_position"
+ RequestIDField = "request_id"
+ ServerAddressField = "address"

GRPCMethodField = "grpc_method"
GRPCStatusCodeField = "grpc_status_code"
10 changes: 5 additions & 5 deletions pkg/foundation/semaphore/semaphore.go
@@ -31,7 +31,7 @@ type Simple struct {
// Ticket reserves a place in the queue and can be used to acquire access to a
// Lock.
type Ticket struct {
- Lock
+ l Lock
// ready is the same channel as Lock.next of the previous lock. Once the
// previous lock is released, the value i will be sent into ready, signaling
// this ticket that it can acquire a Lock.
@@ -64,13 +64,13 @@ func (s *Simple) Enqueue() Ticket {
})

t := Ticket{
- Lock: Lock{
+ l: Lock{
i: s.last.i + 1,
next: s.chanPool.Get().(chan int),
},
ready: s.last.next,
}
- s.last = t.Lock
+ s.last = t.l
return t
}

@@ -80,15 +80,15 @@ func (s *Simple) Enqueue() Ticket {
// Acquire is safe for concurrent use.
func (s *Simple) Acquire(t Ticket) Lock {
i := <-t.ready
- if t.i != i {
+ if t.l.i != i {
// Multiple reasons this can happen. A ticket other than this one could
// have been successfully released twice. The other possibility is that
// this ticket is trying to be acquired after it was released. Both
// cases are invalid.
s.panic()
}
s.chanPool.Put(t.ready)
- return t.Lock // return only the lock part of the ticket
+ return t.l // return only the lock part of the ticket
}

// Release releases the lock and allows the next ticket in line to acquire a
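
For reference, a rough usage sketch of this semaphore: Enqueue hands out tickets in order, Acquire blocks until every earlier ticket has released its lock, and Release unblocks the next ticket in line. The zero value being ready to use and the exact Release signature are assumptions, they are not shown in this diff.

// Hypothetical usage of pkg/foundation/semaphore.
var sem semaphore.Simple

t1 := sem.Enqueue() // reserve the first place in line
t2 := sem.Enqueue() // reserve the second place in line

go func() {
	l := sem.Acquire(t2) // blocks until the lock of t1 is released
	defer sem.Release(l)
	// ... runs second ...
}()

l := sem.Acquire(t1) // first in line, returns as soon as no earlier lock is held
// ... runs first ...
sem.Release(l) // lets t2 proceed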
41 changes: 35 additions & 6 deletions pkg/pipeline/lifecycle.go
@@ -17,6 +17,7 @@ package pipeline
import (
"context"
"fmt"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -288,21 +289,49 @@ func (s *Service) buildProcessorNodes(
return nil, cerrors.Errorf("could not fetch processor: %w", err)
}

- node := stream.ProcessorNode{
- Name: proc.ID,
- Processor: proc.Processor,
- ProcessorTimer: measure.ProcessorExecutionDurationTimer.WithValues(pl.Config.Name, proc.Type),
+ var node stream.PubSubNode
+ if proc.Config.Workers > 1 {
+ node = s.buildParallelProcessorNode(pl, proc)
+ } else {
+ node = s.buildProcessorNode(pl, proc)
}

node.Sub(prev.Pub())
- prev = &node
+ prev = node

- nodes = append(nodes, &node)
+ nodes = append(nodes, node)
}

last.Sub(prev.Pub())
return nodes, nil
}

+ func (s *Service) buildParallelProcessorNode(
+ pl *Instance,
+ proc *processor.Instance,
+ ) *stream.ParallelNode {
+ return &stream.ParallelNode{
+ Name: proc.ID + "-parallel",
+ NewNode: func(i int) stream.PubSubNode {
+ n := s.buildProcessorNode(pl, proc)
+ n.Name = n.Name + "-" + strconv.Itoa(i) // add suffix to name
+ return n
+ },
+ Workers: proc.Config.Workers,
+ }
+ }
+
+ func (s *Service) buildProcessorNode(
+ pl *Instance,
+ proc *processor.Instance,
+ ) *stream.ProcessorNode {
+ return &stream.ProcessorNode{
+ Name: proc.ID,
+ Processor: proc.Processor,
+ ProcessorTimer: measure.ProcessorExecutionDurationTimer.WithValues(pl.Config.Name, proc.Type),
+ }
+ }

func (s *Service) buildSourceNodes(
ctx context.Context,
connFetcher ConnectorFetcher,
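
Put together, a processor configured with two workers ends up in the stream roughly like this. This is a simplified sketch: ProcessorTimer is omitted, and prev and next stand for the neighbouring pipeline nodes.

// Sketch of the node produced for a processor with Config.Workers = 2.
parallel := &stream.ParallelNode{
	Name:    proc.ID + "-parallel",
	Workers: 2,
	NewNode: func(i int) stream.PubSubNode {
		// every worker gets its own copy of the processor node,
		// named e.g. "proc-1-0" and "proc-1-1"
		return &stream.ProcessorNode{
			Name:      proc.ID + "-" + strconv.Itoa(i),
			Processor: proc.Processor,
		}
	},
}
parallel.Sub(prev.Pub()) // consume messages from the previous node
next.Sub(parallel.Pub()) // the next node consumes the parallel node's output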
8 changes: 6 additions & 2 deletions pkg/pipeline/stream/base.go
@@ -348,13 +348,17 @@ func (n *nodeBase) Send(
if msg.Ctx == nil {
msg.Ctx = ctxutil.ContextWithMessageID(ctx, msg.ID())
}
+ // copy context into a local variable, we shouldn't access it anymore after
+ // we send the message to out, this prevents race conditions in case the
+ // field gets changed
+ msgCtx := msg.Ctx

select {
case <-ctx.Done():
- logger.Debug(msg.Ctx).Msg("context closed while sending message")
+ logger.Debug(msgCtx).Msg("context closed while sending message")
return ctx.Err()
case out <- msg:
- logger.Trace(msg.Ctx).Msg("sent message to outgoing channel")
+ logger.Trace(msgCtx).Msg("sent message to outgoing channel")
}
return nil
}
33 changes: 26 additions & 7 deletions pkg/pipeline/stream/logger.go
@@ -21,20 +21,39 @@ import (
)

// SetLogger figures out if the node needs a logger, sets static metadata in the
- // logger and supplies it to the node.
- func SetLogger(n Node, logger log.CtxLogger) {
+ // logger and supplies it to the node. Behavior can be customized by supplying
+ // custom options.
+ func SetLogger(n Node, logger log.CtxLogger, options ...func(log.CtxLogger, Node) log.CtxLogger) {
ln, ok := n.(LoggingNode)
if !ok {
return
}

- nt := reflect.TypeOf(ln)
- for nt.Kind() == reflect.Ptr {
- nt = nt.Elem()
+ if len(options) == 0 {
+ // default options
+ logger = LoggerWithNodeID(logger, n)
+ logger = LoggerWithComponent(logger, n)
+ } else {
+ for _, c := range options {
+ logger = c(logger, n)
+ }
+ }
+ ln.SetLogger(logger)
+ }

- logger = logger.WithComponent(nt.Name())
+ // LoggerWithNodeID creates a logger with the node ID field.
+ func LoggerWithNodeID(logger log.CtxLogger, n Node) log.CtxLogger {
+ logger.Logger = logger.With().Str(log.NodeIDField, n.ID()).Logger()
+ return logger
+ }

- ln.SetLogger(logger)
+ // LoggerWithComponent creates a logger with the component set to the node name.
+ func LoggerWithComponent(logger log.CtxLogger, v Node) log.CtxLogger {
+ t := reflect.TypeOf(v)
+ for t.Kind() == reflect.Ptr {
+ t = t.Elem()
+ }

+ logger = logger.WithComponent(t.Name())
+ return logger
}
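
A hedged example of the new options parameter: a call site inside the stream package could combine the defaults with an extra field, for instance tagging a worker's logger with ParallelWorkerIDField. The closure and the workerNode/workerID variables are illustrative, they are not taken from this commit.

// Hypothetical call site inside package stream.
workerID := 3
SetLogger(workerNode, logger,
	LoggerWithNodeID,
	LoggerWithComponent,
	func(l log.CtxLogger, n Node) log.CtxLogger {
		l.Logger = l.With().Int(log.ParallelWorkerIDField, workerID).Logger()
		return l
	},
)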
17 changes: 16 additions & 1 deletion pkg/pipeline/stream/message.go
@@ -44,7 +44,9 @@ var (
ErrUnexpectedMessageStatus = cerrors.New("unexpected message status")
)

- // Message represents a single message flowing through a pipeline.
+ // Message represents a single message flowing through a pipeline. Only a single
+ // node is allowed to hold a message and access its fields at a specific point
+ // in time, otherwise we could introduce race conditions.
type Message struct {
// Ctx is the context in which the record was fetched. It should be used for
// any function calls when processing the message. If the context is done
@@ -272,6 +274,19 @@ func (m *Message) Status() MessageStatus {
}
}

+ // StatusError returns the error that was returned when the message was acked or
+ // nacked. If the message was successfully acked/nacked or it is still open the
+ // method returns nil.
+ func (m *Message) StatusError() error {
+ switch m.Status() {
+ case MessageStatusAcked:
+ return m.Ack()
+ case MessageStatusNacked:
+ return m.Nack(nil, "")
+ }
+ return nil
+ }

// OpenMessagesTracker allows you to track messages until they reach the end of
// the pipeline.
type OpenMessagesTracker sync.WaitGroup
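
A short sketch of the intended call pattern for StatusError, for example in a coordinator that inspects a message a worker has already handled; the surrounding function is hypothetical.

// Hypothetical check after a worker reports the message as done.
if err := msg.StatusError(); err != nil {
	// the message was nacked (or acking/nacking failed); stop forwarding
	// messages and surface the error to the coordinator instead
	return err
}
// nil means the message was acked successfully or is still open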
(the remaining 18 changed files are not shown here)
