Skip to content

Commit

Permalink
Plug more gaps in forced shutdown cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jul 13, 2023
1 parent 245912a commit fbb32e9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 15 deletions.
10 changes: 8 additions & 2 deletions internal/bundle/tracing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ func (t *tracedInput) loop() {
defer close(t.tChan)
readChan := t.wrapped.TransactionChan()
for {
tran, open := <-readChan
if !open {
var tran message.Transaction
var open bool
select {
case tran, open = <-readChan:
if !open {
return
}
case <-t.shutSig.CloseNowChan():
return
}
if t.e.IsEnabled() {
Expand Down
1 change: 1 addition & 0 deletions internal/bundle/tracing/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (t *tracedOutput) Connected() bool {

func (t *tracedOutput) TriggerCloseNow() {
t.wrapped.TriggerCloseNow()
t.shutSig.CloseNow()
}

func (t *tracedOutput) WaitForClose(ctx context.Context) error {
Expand Down
28 changes: 20 additions & 8 deletions internal/impl/pure/input_broker_fan_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/internal/shutdown"
)

type fanInInputBroker struct {
Expand All @@ -17,7 +18,7 @@ type fanInInputBroker struct {
remainingMap map[int]struct{}
remainingMapMut sync.Mutex

closedChan chan struct{}
shutSig *shutdown.Signaller
}

func newFanInInputBroker(inputs []input.Streamed) (*fanInInputBroker, error) {
Expand All @@ -31,8 +32,8 @@ func newFanInInputBroker(inputs []input.Streamed) (*fanInInputBroker, error) {
inputClosedChan: make(chan int),
remainingMap: make(map[int]struct{}),

closables: []input.Streamed{},
closedChan: make(chan struct{}),
closables: []input.Streamed{},
shutSig: shutdown.NewSignaller(),
}

for n, input := range inputs {
Expand All @@ -48,11 +49,21 @@ func newFanInInputBroker(inputs []input.Streamed) (*fanInInputBroker, error) {
i.inputClosedChan <- index
}()
for {
in, open := <-inputs[index].TransactionChan()
if !open {
var in message.Transaction
var open bool
select {
case in, open = <-inputs[index].TransactionChan():
if !open {
return
}
case <-i.shutSig.CloseNowChan():
return
}
select {
case i.transactions <- in:
case <-i.shutSig.CloseNowChan():
return
}
i.transactions <- in
}
}(n)
}
Expand Down Expand Up @@ -85,7 +96,7 @@ func (i *fanInInputBroker) loop() {
defer func() {
close(i.inputClosedChan)
close(i.transactions)
close(i.closedChan)
i.shutSig.ShutdownComplete()
}()

for {
Expand All @@ -112,11 +123,12 @@ func (i *fanInInputBroker) TriggerCloseNow() {
for _, closable := range i.closables {
closable.TriggerCloseNow()
}
i.shutSig.CloseNow()
}

func (i *fanInInputBroker) WaitForClose(ctx context.Context) error {
select {
case <-i.closedChan:
case <-i.shutSig.HasClosedChan():
case <-ctx.Done():
return ctx.Err()
}
Expand Down
1 change: 1 addition & 0 deletions internal/pipeline/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (p *Pool) TriggerCloseNow() {
for _, w := range p.workers {
w.TriggerCloseNow()
}
p.shutSig.CloseNow()
}

// WaitForClose blocks until the component has closed down or the context is
Expand Down
22 changes: 17 additions & 5 deletions internal/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ func (p *Processor) dispatchMessages(ctx context.Context, msgs []message.Batch,

pending := msgs
for len(pending) > 0 {
wg := sync.WaitGroup{}
wg.Add(len(pending))
doneChan := make(chan struct{}, len(pending))

var newPending []message.Batch
var newPendingMut sync.Mutex
Expand All @@ -107,18 +106,31 @@ func (p *Processor) dispatchMessages(ctx context.Context, msgs []message.Batch,
newPending = append(newPending, b)
newPendingMut.Unlock()
}
wg.Done()
select {
case doneChan <- struct{}{}:
default:
}
return nil
})

select {
case p.messagesOut <- transac:
case <-ctx.Done():
wg.Done()
select {
case doneChan <- struct{}{}:
default:
}
return
}
}

for i := 0; i < len(pending); i++ {
select {
case <-doneChan:
case <-ctx.Done():
return
}
}
wg.Wait()

if pending = newPending; len(pending) > 0 && !throt.Retry() {
return
Expand Down

0 comments on commit fbb32e9

Please sign in to comment.