Skip to content

Commit

Permalink
fn: add ParSliceErrorCollect
Browse files Browse the repository at this point in the history
Add ParSliceErrorCollect function to handle errors without halting
parallel processing. ParSliceErrorCollect is similar to ParSlice but
allows processing to continue even when errors occur. Instead of halting
on the first error, it collects all errors for later handling, providing
more robust parallel execution.
  • Loading branch information
ffranr committed Aug 22, 2024
1 parent e893dee commit 99392e1
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions fn/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package fn

import (
"context"
"fmt"
"runtime"
"sync"

"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -32,3 +34,43 @@ func ParSlice[V any](ctx context.Context, s []V, f ErrFunc[V]) error {

return errGroup.Wait()
}

// ParSliceErrorCollect can be used to execute a function on each element of a
// slice in parallel. This function is fully blocking and will wait for all
// goroutines to finish. Error will be collected and returned as a map of slice
// element index to error. Active goroutines limited with number of CPU.
func ParSliceErrorCollect[V any](ctx context.Context, s []V,
f ErrFunc[V]) (map[int]error, error) {

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.SetLimit(runtime.NumCPU())

var instanceErrorsMutex sync.Mutex
instanceErrors := make(map[int]error, len(s))

for idx, v := range s {
v := v
errGroup.Go(func() error {
err := f(ctx, v)
if err != nil {
instanceErrorsMutex.Lock()
instanceErrors[idx] = err
instanceErrorsMutex.Unlock()
}

// Avoid returning an error here, as that would cancel
// the errGroup and terminate all slice element
// processing instances. Instead, collect the error and
// return it later.
return nil
})
}

err := errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to wait on error group in "+
"ParSliceErrorCollect: %w", err)
}

return instanceErrors, nil
}

0 comments on commit 99392e1

Please sign in to comment.