diff --git a/docs/api/packages/slice.md b/docs/api/packages/slice.md index e81b5311..714c42f4 100644 --- a/docs/api/packages/slice.md +++ b/docs/api/packages/slice.md @@ -70,6 +70,7 @@ import ( - [Merge](#Merge) - [Reverse](#Reverse) - [Reducedeprecated](#Reduce) +- [ReduceConcurrent](#ReduceConcurrent) - [ReduceBy](#ReduceBy) - [ReduceRight](#ReduceRight) - [Replace](#Replace) @@ -1578,15 +1579,15 @@ import ( func main() { nums := []int{1, 2, 3, 4, 5, 6} - + result := slice.MapConcurrent(nums, func(_, n int) int { return n * n }, 4) - fmt.Println(result) + fmt.Println(result) - // Output: - // [1 4 9 16 25 36] + // Output: + // [1 4 9 16 25 36] } ``` @@ -1759,6 +1760,38 @@ func main() { } ``` +### ReduceConcurrent + +

对切片元素执行并发reduce操作。

+ +函数签名: + +```go +func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T +``` + +示例:[运行]() + +```go +import ( + "fmt" + "github.com/duke-git/lancet/v2/slice" +) + +func main() { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + + result := slice.ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, 1) + + fmt.Println(result) + + // Output: + // 55 +} +``` + ### ReduceBy

对切片元素执行reduce操作。

diff --git a/docs/en/api/packages/slice.md b/docs/en/api/packages/slice.md index b739fd4c..9fdda7a2 100644 --- a/docs/en/api/packages/slice.md +++ b/docs/en/api/packages/slice.md @@ -70,6 +70,7 @@ import ( - [Merge](#Merge) - [Reverse](#Reverse) - [Reducedeprecated](#Reduce) +- [ReduceConcurrent](#ReduceConcurrent) - [ReduceBy](#ReduceBy) - [ReduceRight](#ReduceRight) - [Replace](#Replace) @@ -1754,6 +1755,39 @@ func main() { } ``` +### ReduceConcurrent + +

Reduces the slice to a single value by applying the reducer function to each item in the slice concurrently.

+ +Signature: + +```go +func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T +``` + +Example:[运行]() + +```go +import ( + "fmt" + "github.com/duke-git/lancet/v2/slice" +) + +func main() { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + + result := slice.ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, 1) + + fmt.Println(result) + + // Output: + // 55 +} +``` + + ### ReduceBy

Produces a value from slice by accumulating the result of each element as passed through the reducer function.

diff --git a/slice/slice_concurrent.go b/slice/slice_concurrent.go index 7ff47889..e5d40ffb 100644 --- a/slice/slice_concurrent.go +++ b/slice/slice_concurrent.go @@ -76,6 +76,50 @@ func MapConcurrent[T any, U any](slice []T, iteratee func(index int, item T) U, return result } +// ReduceConcurrent reduces the slice to a single value by applying the reducer function to each item in the slice concurrently. +// Play: todo +func ReduceConcurrent[T any](slice []T, initial T, reducer func(index int, item T, agg T) T, numThreads int) T { + if numThreads <= 0 { + numThreads = 1 + } + + var wg sync.WaitGroup + var mu sync.Mutex + + sliceLen := len(slice) + chunkSize := (sliceLen + numThreads - 1) / numThreads + results := make([]T, numThreads) + + for i := 0; i < numThreads; i++ { + start := i * chunkSize + end := start + chunkSize + if end > sliceLen { + end = sliceLen + } + + wg.Add(1) + go func(i, start, end int) { + defer wg.Done() + tempResult := initial + for j := start; j < end; j++ { + tempResult = reducer(j, slice[j], tempResult) + } + mu.Lock() + results[i] = tempResult + mu.Unlock() + }(i, start, end) + } + + wg.Wait() + + result := initial + for i, r := range results { + result = reducer(i, result, r) + } + + return result +} + // FilterConcurrent applies the provided filter function `predicate` to each element of the input slice concurrently. // Play: todo func FilterConcurrent[T any](slice []T, predicate func(index int, item T) bool, numThreads int) []T { diff --git a/slice/slice_example_test.go b/slice/slice_example_test.go index c9bde20a..b20a4b47 100644 --- a/slice/slice_example_test.go +++ b/slice/slice_example_test.go @@ -527,6 +527,18 @@ func ExampleReduce() { // 6 } +func ExampleReduceConcurrent() { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, 1) + + fmt.Println(result) + + // Output: + // 55 +} + func ExampleReduceBy() { result1 := ReduceBy([]int{1, 2, 3, 4}, 0, func(_ int, item int, agg int) int { return agg + item diff --git a/slice/slice_test.go b/slice/slice_test.go index 21b00c2f..4fd9da2d 100644 --- a/slice/slice_test.go +++ b/slice/slice_test.go @@ -587,6 +587,44 @@ func TestReduce(t *testing.T) { } } +func TestReduceConcurrent(t *testing.T) { + t.Parallel() + + assert := internal.NewAssert(t, "TestReduceConcurrent") + + t.Run("basic", func(t *testing.T) { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, 4) + assert.Equal(55, result) + }) + + t.Run("empty slice", func(t *testing.T) { + nums := []int{} + result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, 4) + assert.Equal(0, result) + }) + + t.Run("single thread", func(t *testing.T) { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, 1) + assert.Equal(55, result) + }) + + t.Run("negative threads", func(t *testing.T) { + nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + result := ReduceConcurrent(nums, 0, func(_ int, item, agg int) int { + return agg + item + }, -1) + assert.Equal(55, result) + }) +} + func TestReduceBy(t *testing.T) { t.Parallel()