-
Notifications
You must be signed in to change notification settings - Fork 0
/
vertex_spec.rb
237 lines (220 loc) · 8.4 KB
/
vertex_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# frozen_string_literal: true
require 'timeout'
module ExcADG
# rubocop:disable Metrics/BlockLength
# rubocop:disable Style/BlockDelimiters
describe Vertex do
# The broker singleton is started once before the group and torn down once after it.
before(:all) { Broker.instance.start }
after(:all) { Broker.instance.teardown }
context 'base features' do
  # Builds two echo vertices once for the whole context: one with a generated
  # name, one with an explicit name. Blocks until the broker's data store has
  # a key for each of them, then reads back the vertex stored in their data.
  before(:all) {
    @unnamed = Vertex.new(payload: Payload::Example::Echo.new(args: :a))
    @named = Vertex.new(payload: Payload::Example::Echo.new(args: :b), name: :custom_name)
    @vertices = [@unnamed, @named]
    until @vertices.all? { |vertex| Broker.instance.data_store.key? vertex }
      sleep 0.1
    end
    @unnamed_from_data = @unnamed.data.vertex
    @named_from_data = @named.data.vertex
  }

  it 'has correct name' do
    # a vertex without an explicit name is expected to be called v<number>
    expect(@unnamed.name).to eq :"v#{@unnamed.number}"
    expect(@named.name).to eq :custom_name
  end

  it 'equals to its ractor' do
    [@unnamed, @unnamed_from_data].each { |vertex| expect(vertex).to be_a Vertex }
    expect(@unnamed).to eq @unnamed_from_data
  end

  it 'are not equal to each other' do
    # inequality is checked in both directions
    expect(@unnamed).not_to eq(@named)
    expect(@named).not_to eq(@unnamed)
  end

  it 'should bookkeep all vertices' do
    @vertices.each { |vertex| expect(Broker.instance.data_store.key?(vertex)).to eq true }
  end

  it 'subtracts from list' do
    remaining = @vertices - [@unnamed]
    expect(remaining).to eq [@named]
  end
end
context 'with correct payload' do
  # A fresh vertex with a default echo payload is created for every example.
  subject { Vertex.new(payload: Payload::Example::Echo.new) }

  it 'should have correct info' do
    expect(subject.info.count).to eq 4
    expect(%i[terminated running blocking]).to include(subject.status)
    expect(subject.number).to be_a Numeric
  end

  it 'keeps returning data once finished' do
    until subject.state.eql?(:done)
      sleep 0.5
    end
    expect(subject.data.state).to eq :done
    # the payload data is read twice on purpose: it must stay available after the first read
    2.times { expect(subject.data.data).to eq :ping }
  end
end
context 'with timeout set' do
  # Shared checks for timeout handling.
  # The including group must define `subject` as the payload to run; the
  # examples expect that payload to outlast a 0.1 timeout.
  shared_examples 'timing out vertex' do
    # Polls the vertex state every 0.1 s until it becomes :failed.
    # @param vertex [Vertex] vertex that is expected to fail
    # @raise [RuntimeError] if the vertex reaches :done instead of failing
    def wait_until_failed(vertex)
      loop {
        sleep 0.1
        raise "unexpected: #{vertex.state}" if vertex.state.eql? :done
        break if vertex.state.eql? :failed
      }
    end

    it 'times out with plain timeout' do
      v = Vertex.new timeout: 0.1, payload: subject
      wait_until_failed v
      expect(v.data.data).to be_a RTimeout::TimedOutError
    end

    it 'times out with global timeout only' do
      v = Vertex.new timeout: VTimeout.new(global: 0.1), payload: subject
      wait_until_failed v
      expect(v.data.data).to be_a RTimeout::TimedOutError
    end

    it 'times out with payload timeout only' do
      v = Vertex.new timeout: VTimeout.new(payload: 0.1), payload: subject
      wait_until_failed v
      expect(v.data.data).to be_a RTimeout::TimedOutError
    end

    it 'times out with deps timeout only' do
      # the slow payload runs in the dependency; the vertex under test only has a quick echo payload
      long_dep = Vertex.new payload: subject
      v = Vertex.new timeout: VTimeout.new(deps: 0.1), payload: Payload::Example::Echo.new, deps: [long_dep]
      wait_until_failed v
      expect(v.data.data).to be_a RTimeout::TimedOutError
    end
  end

  # NOTE(review): Sleepy with args: 1 presumably sleeps for about a second - confirm against the payload
  it_behaves_like('timing out vertex') {
    subject { Payload::Example::Sleepy.new(args: 1) }
  }
  # TODO: find a command that hangs on I/O
  # it_behaves_like('timing out vertex') {
  #   subject { Payload::Wrapper::Bin.new args: 'ncat -l 12345' }
  # }
end
context 'with faulty payload' do
  # Vertex whose payload is the example faulty one.
  subject { Vertex.new(payload: Payload::Example::Faulty.new) }

  it 'reaches failed state' do
    # poll every 0.1 s; :failed ends the wait, :done is treated as an error
    loop do
      sleep 0.1
      break if subject.state.eql?(:failed)
      raise 'it should not reach :done' if subject.state.eql?(:done)
    end
    expect(subject.data.data).to be_a(StandardError)
  end
end
context 'with dependencies' do
  # Helper vertices are plain `let`s: only the vertex under test is the subject.
  let(:dep1) { Vertex.new(payload: Payload::Example::Sleepy.new) }
  let(:dep2) { Vertex.new(payload: Payload::Example::Sleepy.new) }
  # a failing vertex that is NOT listed among the subject's dependencies
  let(:stranger) { Vertex.new(payload: Payload::Example::Faulty.new) }
  # dep1 is passed as a Vertex object, dep2 by its generated name (v<number>)
  subject { Vertex.new payload: Payload::Example::Sleepy.new, deps: [dep1, :"v#{dep2.number}"] }

  it 'does not construct with incorrect dependency type' do
    expect { Vertex.new payload: Payload::Example::Echo.new, deps: [dep1, 'some'] }.to raise_error StandardError
  end

  it 'waits for dependencies' do
    sleep 0.1 until subject.state.eql? :new
    # wait for both dependencies to finish and for the unrelated vertex to fail
    sleep 0.1 until [dep1, dep2].all? { |d| d.state.eql? :done } && stranger.state.eql?(:failed)
    expect(%i[new ready]).to include subject.state
    # the subject must still complete although the unrelated vertex has failed
    sleep 0.1 until subject.state.eql?(:done)
    expect(stranger.data.state).to eq :failed
  end
end
context 'with failing dependency' do
  # Helper vertices are plain `let`s: only the vertex under test is the subject.
  let(:dep1) { Vertex.new(payload: Payload::Example::Sleepy.new) }
  let(:dep2) { Vertex.new(payload: Payload::Example::Faulty.new) }
  # vertex under test: depends on one regular and one faulty vertex
  subject(:subj) { Vertex.new payload: Payload::Example::Echo.new, deps: [dep1, dep2] }

  it 'fails itself' do
    sleep 0.1 until subj.state.eql?(:failed)
    expect(dep2.data.state).to eq :failed
  end

  context 'with failing grand-dependency' do
    # one level further down: depends on the vertex that fails because of dep2
    subject(:grandchild) { Vertex.new payload: Payload::Example::Echo.new, deps: [subj] }

    it 'fails too (in cascade)' do
      sleep 0.1 until grandchild.state.eql?(:failed)
      expect(subj.data.state).to eq :failed
    end
  end
end
context 'with deps data processing' do
  # Helper vertices are plain `let`s: only the vertex under test is the subject.
  let(:dep1) { Vertex.new(payload: Payload::Example::Echo.new) }
  let(:dep2) { Vertex.new(payload: Payload::Example::Echo.new) }
  # vertex under test: a receiver payload fed by two echo dependencies
  subject { Vertex.new payload: Payload::Example::Receiver.new, deps: [dep1, dep2] }

  it 'finishes successfully' do
    # poll every 0.5 s; :done ends the wait, :failed aborts the example
    loop {
      break if subject.state.eql? :done
      raise if subject.state.eql? :failed

      sleep 0.5
    }
  end
end
context 'with conditional vertex' do
  # Helper vertices are plain `let`s: only the vertex under test is the subject.
  let(:dep1) { Vertex.new(payload: Payload::Example::Echo.new(args: :trigger)) }
  let(:dep2) { Vertex.new(payload: Payload::Example::Echo.new(args: :trigger)) }
  # vertex under test: a condition payload whose dependencies all echo :trigger
  subject { Vertex.new payload: Payload::Example::Condition.new, deps: [dep1, dep2] }

  it 'triggers another vertex' do
    loop {
      sleep 0.5
      break if subject.state.eql? :done
      raise if subject.state.eql? :failed
    }
    # the condition vertex is expected to hand back a new vertex as its data
    expect(subject.data.data).to be_a Vertex
  end

  context 'with threadkill dep' do
    # extra dependency that echoes something other than :trigger
    let(:dep3) { Vertex.new(payload: Payload::Example::Echo.new(args: :not_trigger)) }
    subject { Vertex.new payload: Payload::Example::Condition.new, deps: [dep1, dep2, dep3] }

    it 'does not trigger another vertex' do
      loop {
        sleep 0.5
        break if subject.state.eql? :done
        raise if subject.state.eql? :failed
      }
      expect(subject.data.data).to eq nil
    end
  end
end
context 'with loop vertex' do
  # Helper vertex: echoes a three-element array that the looper depends on.
  let(:source) { Vertex.new payload: Payload::Example::Echo.new(args: [3, 2, 1]) }
  # vertex under test
  subject(:looper) { Vertex.new payload: Payload::Example::Loop.new, deps: [source] }

  it 'produces 3 new vertices' do
    # pause between polls so the waiting loop does not spin a CPU core
    loop {
      break if looper.state.eql? :done
      raise 'looper failed' if looper.state.eql? :failed

      sleep 0.1
    }
    children = looper.data.data
    expect(children).to be_a Array
    expect(children.size).to eq 3
    loop {
      # take one snapshot per iteration so both checks see the same states
      states = children.map(&:state)
      break if states.all? :done
      raise 'one of children failed' if states.any? :failed

      sleep 0.1
    }
  end
end
context 'with many vertices', :perf do
  # Helper: 32 vertices with the benchmark payload, used as dependencies of the subject.
  let(:array) {
    Array.new(32) { Vertex.new payload: Payload::Example::Benchmark.new }
  }
  # vertex under test: an echo vertex that depends on all benchmark vertices
  subject { Vertex.new payload: Payload::Example::Echo.new, deps: array }

  it 'should finish in timeout' do
    Timeout.timeout(90) { # timeout is speculative
      loop {
        sleep 2
        # data may still be nil while the vertex has not reported anything, hence the safe navigation
        raise 'payload failed' if subject.data&.failed?
        break if subject.data&.done?
      }
    }
  end
end
end
# rubocop:enable Style/BlockDelimiters
# rubocop:enable Metrics/BlockLength
end