From 99392e192f81878d12ca3e97616a839f633ed143 Mon Sep 17 00:00:00 2001 From: ffranr Date: Thu, 22 Aug 2024 13:01:59 +0100 Subject: [PATCH] fn: add ParSliceErrorCollect Add ParSliceErrorCollect function to handle errors without halting parallel processing. ParSliceErrorCollect is similar to ParSlice but allows processing to continue even when errors occur. Instead of halting on the first error, it collects all errors for later handling, providing more robust parallel execution. --- fn/concurrency.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/fn/concurrency.go b/fn/concurrency.go index 790f7514a..f6f55c494 100644 --- a/fn/concurrency.go +++ b/fn/concurrency.go @@ -2,7 +2,9 @@ package fn import ( "context" + "fmt" "runtime" + "sync" "golang.org/x/sync/errgroup" ) @@ -32,3 +34,43 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error { return errGroup.Wait() } + +// ParSliceErrorCollect can be used to execute a function on each element of a +// slice in parallel. This function is fully blocking and will wait for all +// goroutines to finish. Error will be collected and returned as a map of slice +// element index to error. Active goroutines limited with number of CPU. +func ParSliceErrorCollect[V any](ctx context.Context, s []V, + f ErrFunc[V]) (map[int]error, error) { + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(runtime.NumCPU()) + + var instanceErrorsMutex sync.Mutex + instanceErrors := make(map[int]error, len(s)) + + for idx, v := range s { + v := v + errGroup.Go(func() error { + err := f(ctx, v) + if err != nil { + instanceErrorsMutex.Lock() + instanceErrors[idx] = err + instanceErrorsMutex.Unlock() + } + + // Avoid returning an error here, as that would cancel + // the errGroup and terminate all slice element + // processing instances. Instead, collect the error and + // return it later. + return nil + }) + } + + err := errGroup.Wait() + if err != nil { + return nil, fmt.Errorf("failed to wait on error group in "+ + "ParSliceErrorCollect: %w", err) + } + + return instanceErrors, nil +}