-
Notifications
You must be signed in to change notification settings - Fork 456
/
Copy pathtoxic.go
155 lines (134 loc) · 3.68 KB
/
toxic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package toxics
import (
"math/rand"
"reflect"
"sync"
"github.com/Shopify/toxiproxy/v2/stream"
)
// A Toxic is something that can be attatched to a link to modify the way
// data can be passed through (for example, by adding latency)
//
// Toxic
// v
// Client <-> ToxicStub <-> Upstream
//
// Toxic's work in a pipeline fashion, and can be chained together
// with channels. The toxic itself only defines the settings and
// Pipe() function definition, and uses the ToxicStub struct to store
// per-connection information. This allows the same toxic to be used
// for multiple connections.
type Toxic interface {
// Defines how packets flow through a ToxicStub.
// Pipe() blocks until the link is closed or interrupted.
Pipe(*ToxicStub)
}
type CleanupToxic interface {
// Cleanup is called before a toxic is removed.
Cleanup(*ToxicStub)
}
type BufferedToxic interface {
// Defines the size of buffer this toxic should use
GetBufferSize() int
}
// Stateful toxics store a per-connection state object on the ToxicStub.
// The state is created once when the toxic is added and persists until the
// toxic is removed or the connection is closed.
type StatefulToxic interface {
// Creates a new object to store toxic state in
NewState() interface{}
}
type ToxicWrapper struct {
Toxic `json:"attributes"`
Name string `json:"name"`
Type string `json:"type"`
Stream string `json:"stream"`
Toxicity float32 `json:"toxicity"`
Direction stream.Direction `json:"-"`
Index int `json:"-"`
BufferSize int `json:"-"`
}
type ToxicStub struct {
Input <-chan *stream.StreamChunk
Output chan<- *stream.StreamChunk
State interface{}
Interrupt chan struct{}
running chan struct{}
closed chan struct{}
}
func NewToxicStub(input <-chan *stream.StreamChunk, output chan<- *stream.StreamChunk) *ToxicStub {
return &ToxicStub{
Interrupt: make(chan struct{}),
closed: make(chan struct{}),
Input: input,
Output: output,
}
}
// Begin running a toxic on this stub, can be interrupted.
// Runs a noop toxic randomly depending on toxicity.
func (s *ToxicStub) Run(toxic *ToxicWrapper) {
s.running = make(chan struct{})
defer close(s.running)
//#nosec
if rand.Float32() < toxic.Toxicity {
toxic.Pipe(s)
} else {
new(NoopToxic).Pipe(s)
}
}
// Interrupt the flow of data so that the toxic controlling the stub can be replaced.
// Returns true if the stream was successfully interrupted, or false if the stream is closed.
func (s *ToxicStub) InterruptToxic() bool {
select {
case <-s.closed:
return false
case s.Interrupt <- struct{}{}:
<-s.running // Wait for the running toxic to exit
return true
}
}
func (s *ToxicStub) Closed() bool {
select {
case <-s.closed:
return true
default:
return false
}
}
func (s *ToxicStub) Close() {
if !s.Closed() {
close(s.closed)
close(s.Output)
}
}
var (
ToxicRegistry map[string]Toxic
registryMutex sync.RWMutex
)
func Register(typeName string, toxic Toxic) {
registryMutex.Lock()
defer registryMutex.Unlock()
if ToxicRegistry == nil {
ToxicRegistry = make(map[string]Toxic)
}
ToxicRegistry[typeName] = toxic
}
func New(wrapper *ToxicWrapper) Toxic {
registryMutex.RLock()
defer registryMutex.RUnlock()
orig, ok := ToxicRegistry[wrapper.Type]
if !ok {
return nil
}
wrapper.Toxic = reflect.New(reflect.TypeOf(orig).Elem()).Interface().(Toxic)
if buffered, ok := wrapper.Toxic.(BufferedToxic); ok {
wrapper.BufferSize = buffered.GetBufferSize()
} else {
wrapper.BufferSize = 0
}
return wrapper.Toxic
}
func Count() int {
registryMutex.RLock()
defer registryMutex.RUnlock()
return len(ToxicRegistry)
}