-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline_test.go
98 lines (92 loc) · 2.28 KB
/
pipeline_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package parallel
import (
"context"
"testing"
)
func genRange(start, end, step int) TapFunc[int] {
val := start
return func(context.Context) (int, error) {
v := val
if v >= end {
return v, ErrEOD
}
val += step
return v, nil
}
}
func TestJoin(t *testing.T) {
var (
start = 0
end = 10000
step = 1
)
ctx := context.Background()
tap1 := NewTap(genRange(start, end, step))
p1_mapper1 := NewMapper(5, func(ctx context.Context, in int) (int, error) {
return in * in, nil
})
p1_reducer1 := NewReducer(3, func(ctx context.Context, in int, out []int) ([]int, bool, error) {
var flush bool
out = append(out, in)
if len(out) >= 7 {
flush = true
}
return out, flush, nil
})
p1_spreader1 := NewSpreader(3, func(ctx context.Context, in []int) ([]int, error) {
return in, nil
})
p1_reducer2 := NewReducer(1, func(ctx context.Context, in int, out int) (int, bool, error) {
return in + out, false, nil
})
tap1.Join(p1_mapper1)
p1_mapper1.Join(p1_reducer1)
p1_reducer1.Join(p1_spreader1)
p1_spreader1.Join(p1_reducer2)
tap1.Run(ctx)
result1 := <-p1_reducer2.Out()
var expected1 int
for v := start; v < end; v += step {
expected1 += v * v
}
if result1.Value() != expected1 {
t.Errorf("%v expected but got %v", expected1, result1.Value())
}
}
func TestCompose(t *testing.T) {
var (
start = 0
end = 10000
step = 1
)
ctx := context.Background()
tap1 := NewTap(genRange(start, end, step))
p1_mapper1 := NewMapper(5, func(ctx context.Context, in int) (int, error) {
return in * in, nil
})
p1_reducer1 := NewReducer(3, func(ctx context.Context, in int, out []int) ([]int, bool, error) {
var flush bool
out = append(out, in)
if len(out) >= 7 {
flush = true
}
return out, flush, nil
})
p1_spreader1 := NewSpreader(3, func(ctx context.Context, in []int) ([]int, error) {
return in, nil
})
p1_reducer2 := NewReducer(1, func(ctx context.Context, in int, out int) (int, bool, error) {
return in + out, false, nil
})
p1 := Compose(Compose(Compose(p1_mapper1, p1_reducer1), p1_spreader1), p1_reducer2)
tap1.Join(p1)
tap1.Run(ctx)
result1 := <-p1_reducer2.Out()
var expected1 int
for v := start; v < end; v += step {
expected1 += v * v
}
if result1.Value() != expected1 {
t.Errorf("%v expected but got %v", expected1, result1.Value())
}
}