Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance Proof Delivery Resilience with ParSliceErrCollect in ChainPorter #1100

Merged
merged 3 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions fn/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package fn

import (
"context"
"fmt"
"runtime"
"sync"

"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -32,3 +34,48 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {

return errGroup.Wait()
}

// ParSliceErrCollect can be used to execute a function on each element of a
ffranr marked this conversation as resolved.
Show resolved Hide resolved
// slice in parallel. This function is fully blocking and will wait for all
// goroutines to finish (subject to context cancellation/timeout). Any errors
// will be collected and returned as a map of slice element index to error.
// Active goroutines limited with number of CPU.
func ParSliceErrCollect[V any](ctx context.Context, s []V,
f ErrFunc[V]) (map[int]error, error) {

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.SetLimit(runtime.NumCPU())

var instanceErrorsMutex sync.Mutex
instanceErrors := make(map[int]error, len(s))
jharveyb marked this conversation as resolved.
Show resolved Hide resolved

for idx := range s {
errGroup.Go(func() error {
err := f(ctx, s[idx])
if err != nil {
instanceErrorsMutex.Lock()
instanceErrors[idx] = err
instanceErrorsMutex.Unlock()
}

// Avoid returning an error here, as that would cancel
// the errGroup and terminate all slice element
// processing instances. Instead, collect the error and
// return it later.
return nil
})
}

// Now we will wait/block for all goroutines to finish.
//
// The goroutines that are executing in parallel should not return an
// error, but the Wait call may return an error if the context is
// canceled or timed out.
err := errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to wait on error group in "+
ffranr marked this conversation as resolved.
Show resolved Hide resolved
"ParSliceErrorCollect: %w", err)
}

return instanceErrors, nil
}
22 changes: 21 additions & 1 deletion fn/func.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package fn

import "fmt"
import (
"fmt"

"github.com/lightningnetwork/lnd/fn"
)

// Reducer represents a function that takes an accumulator and the value, then
// returns a new accumulator.
Expand Down Expand Up @@ -263,3 +267,19 @@ func Last[T any](xs []*T, pred func(*T) bool) (*T, error) {

return matches[len(matches)-1], nil
}

// KV is a generic struct that holds a key-value pair.
type KV[K any, V any] struct {
Key K
Value V
}

// PeekMap non-deterministically selects and returns a single key-value pair
// from the given map.
func PeekMap[K comparable, V any](m map[K]V) fn.Option[KV[K, V]] {
for k, v := range m {
return fn.Some(KV[K, V]{Key: k, Value: v})
}

return fn.None[KV[K, V]]()
}
34 changes: 33 additions & 1 deletion tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,11 +870,43 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {

// If we have a non-interactive proof, then we'll launch several
// goroutines to deliver the proof(s) to the receiver(s).
err := fn.ParSlice(ctx, pkg.OutboundPkg.Outputs, deliver)
instanceErrors, err := fn.ParSliceErrCollect(
ctx, pkg.OutboundPkg.Outputs, deliver,
)
if err != nil {
return fmt.Errorf("error delivering proof(s): %w", err)
}

// If there were any errors during the proof delivery process, we'll
// log them all here.
for idx := range instanceErrors {
output := pkg.OutboundPkg.Outputs[idx]
instanceErr := instanceErrors[idx]

scriptPubKey := output.ScriptKey.PubKey.SerializeCompressed()
anchorOutpoint := output.Anchor.OutPoint.String()
courierAddr := string(output.ProofCourierAddr)

log.Errorf("Error delivering transfer output proof "+
"(anchor_outpoint=%s, script_pub_key=%v, "+
"position=%d, proof_courier_addr=%s, "+
"proof_delivery_status=%v): %v",
anchorOutpoint, scriptPubKey, output.Position,
courierAddr, output.ProofDeliveryComplete,
instanceErr)
}

// Return the first error encountered during the proof delivery process,
// if any.
var firstErr error
fn.PeekMap(instanceErrors).WhenSome(func(kv fn.KV[int, error]) {
firstErr = err
})

if firstErr != nil {
return firstErr
}

// At this point, the transfer is fully finalised and successful:
// - The anchoring transaction has been confirmed on-chain.
// - The proof(s) have been delivered to the receiver(s).
Expand Down
Loading