-
Notifications
You must be signed in to change notification settings - Fork 109
/
flow.go
332 lines (304 loc) · 8.7 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
// Package flow processes gotocol context information to collect and export request flows across the system
package flow
import (
"encoding/json"
"fmt"
"log"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/adrianco/spigo/tooling/archaius"
"github.com/adrianco/spigo/tooling/collect"
"github.com/adrianco/spigo/tooling/dhcp"
"github.com/adrianco/spigo/tooling/gotocol"
"github.com/adrianco/spigo/tooling/graphneo4j"
"github.com/go-kit/kit/metrics/generic"
)
// Values for zipkin span direction
type Values int
// Zipkin annotation tags
const (
CS Values = iota // client send
SR // server receive
SS // server send
CR // client receive
Unknown // something went wrong
)
// pretty printer for Values
func (v Values) String() string {
switch v {
case CS:
return "cs"
case SR:
return "sr"
case SS:
return "ss"
case CR:
return "cr"
default:
return "unknown"
}
}
// flowmap is a map by traceid of slices of pointers to spannotations
type flowmaptype map[gotocol.TraceContextType][]*spannotype
// Annotation information for each step in the span
type spannotype struct {
Ctx string `json:"ctx"` // Context as string
Host string `json:"host"` // host name
Imp string `json:"imposition"` // protocol request type
Intent string `json:"intention"` // request body
Timestamp int64 `json:"ts"` // unix nanotimestamp
Value string `json:"value"` // direction of span
}
// ByCtx sortable spans
type ByCtx []*spannotype
func (a ByCtx) Len() int { return len(a) }
func (a ByCtx) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByCtx) Less(i, j int) bool { // sort by span first then time
if a[i].Ctx == a[j].Ctx {
return a[i].Timestamp < a[j].Timestamp
}
return a[i].Ctx < a[j].Ctx
}
var flowmap flowmaptype
var flowlock sync.Mutex // lock changes to the maps
// file to log flow data to
var file *os.File
var collector *KafkaCollector
// Common Annotation code
func annotate(msg gotocol.Message, name string, t time.Time, resp, others Values) *spannotype {
if flowmap == nil {
flowmap = make(flowmaptype, archaius.Conf.Population)
}
if flowmap[msg.Ctx.Trace] == nil {
flowmap[msg.Ctx.Trace] = make([]*spannotype, 0, 2) // reserve space for at least 2 annotations in a span
}
annotation := new(spannotype)
annotation.Host = name
annotation.Ctx = msg.Ctx.String()
annotation.Imp = msg.Imposition.String()
annotation.Intent = msg.Intention
annotation.Timestamp = t.UnixNano()
if msg.Imposition == gotocol.GetResponse {
annotation.Value = resp.String()
} else {
annotation.Value = others.String()
}
return annotation
}
// AnnotateReceive service activity when receiving a message
func AnnotateReceive(msg gotocol.Message, name string, received time.Time) {
if !archaius.Conf.Collect {
return
}
flowlock.Lock()
flowmap[msg.Ctx.Trace] = append(flowmap[msg.Ctx.Trace], annotate(msg, name, received, CR, SR))
flowlock.Unlock()
if graphneo4j.Enabled {
trace := flowmap[msg.Ctx.Trace]
if len(trace) >= 2 {
graphneo4j.WriteFlow(strings.Replace(trace[len(trace)-2].Host, "-", "_", -1), strings.Replace(trace[len(trace)-1].Host, "-", "_", -1), trace[1].Imp, trace[1].Timestamp, msg.Ctx.Trace)
}
}
return
}
// AnnotateSend service sends on a flow
func AnnotateSend(msg gotocol.Message, name string) {
if !archaius.Conf.Collect {
return
}
flowlock.Lock()
flowmap[msg.Ctx.Trace] = append(flowmap[msg.Ctx.Trace], annotate(msg, name, msg.Sent, SS, CS))
flowlock.Unlock()
return
}
// End a flow, flushing output and freeing the request id to keep the map smaller
func End(msg gotocol.Message, resphist, servhist, rthist *generic.Histogram) {
if !archaius.Conf.Collect {
return
}
var cs, sr, ss, cr int64
// find the annotations for the client send time and the client receive time
for _, a := range flowmap[msg.Ctx.Trace] {
if a.Value == CS.String() {
cs = a.Timestamp
}
if a.Value == SR.String() {
sr = a.Timestamp
}
if a.Value == SS.String() {
ss = a.Timestamp
}
if a.Value == CR.String() {
cr = a.Timestamp
}
}
if cs > 0 && cr > 0 { // response time measured at the client
collect.Measure(resphist, time.Unix(0, cr).Sub(time.Unix(0, cs)))
}
if ss > 0 && sr > 0 { // service time measured at the server
collect.Measure(servhist, time.Unix(0, ss).Sub(time.Unix(0, sr)))
}
if cs > 0 && cr > 0 && ss > 0 && sr > 0 { // network time sum of each direction
collect.Measure(rthist, time.Unix(0, sr).Sub(time.Unix(0, cs))+time.Unix(0, cr).Sub(time.Unix(0, ss)))
}
}
// Shutdown the flow mapping system and flush remaining flows
func Shutdown() {
if !archaius.Conf.Collect {
return
}
// Try to add Kafka collector if configured to do so
if len(archaius.Conf.Kafka) > 0 {
log.Printf("Flushing flows to Kafka Collector %#v\n", archaius.Conf.Kafka)
var err error
collector, err = NewKafkaCollector(archaius.Conf.Kafka)
if err != nil {
log.Printf("Unable to start Kafka Collector: %#v\n", err)
} else {
defer collector.Close()
}
}
flowlock.Lock()
defer flowlock.Unlock()
f, err := os.Create("json_metrics/" + archaius.Conf.Arch + "_flow.json")
if err != nil {
log.Fatal(err)
} else {
file = f
}
log.Printf("Flushing flows to %v\n", file.Name())
file.WriteString("[\n")
comma := false
for c, f := range flowmap {
if comma {
file.WriteString(",\n")
} else {
comma = true
}
Flush(c, f)
}
file.WriteString("\n]\n")
file.Close()
}
/* example: Zipkin format is an array of these
{
"traceId": "5e27c67030932221",
"name": "GET",
"id": "38357d8f309b379d",
"parentId": "5e27c67030932221",
"annotations": [
{
"endpoint": {
"serviceName": "zipkin-query",
"ipv4": "127.0.0.1"
},
"timestamp": 1444780030334000,
"value": "cs"
},
{
"endpoint": {
"serviceName": "zipkin-query",
"ipv4": "172.17.0.84"
},
"timestamp": 1444780030643000,
"value": "sr"
},
{
"endpoint": {
"serviceName": "zipkin-query",
"ipv4": "127.0.0.1"
},
"timestamp": 1444780031521000,
"value": "cr"
},
{
"endpoint": {
"serviceName": "zipkin-query",
"ipv4": "172.17.0.84"
},
"timestamp": 1444780031689000,
"value": "ss"
}
]
},
*/
// endpoint for zipkin
type zipkinendpoint struct {
Servicename string `json:"serviceName"`
Ipv4 string `json:"ipv4"`
Port int `json:"port"`
}
// annotation for zipkin
type zipkinannotation struct {
Endpoint zipkinendpoint `json:"endpoint"`
Timestamp int64 `json:"timestamp"`
Value string `json:"value"`
}
// trace for zipkin
type zipkinspan struct {
Traceid string `json:"traceId"`
Name string `json:"name"`
Id string `json:"id"`
ParentId string `json:"parentId,omitempty"`
Annotations []zipkinannotation `json:"annotations"`
}
// WriteZip stores zipkin as json
func WriteZip(zip zipkinspan) {
j, err := json.Marshal([]*zipkinspan{&zip})
if err != nil {
log.Fatal(err)
}
if collector != nil {
collector.Collect(j)
}
file.Write(j[1 : len(j)-1])
}
// Flush the spans for a request in zipkin format
func Flush(t gotocol.TraceContextType, trace []*spannotype) {
var zip zipkinspan
var ctx string
n := -1
sort.Sort(ByCtx(trace))
for _, a := range trace {
//fmt.Println(*a)
if ctx != a.Ctx { // new span
if ctx != "" { // not the first
WriteZip(zip)
file.WriteString(",\n")
zip.Annotations = nil
}
n++
zip.Traceid = fmt.Sprintf("%016d", t) // pad id's to 16 characters to keep zipkin happy
zip.Name = a.Imp
s := strings.SplitAfter(a.Ctx, "s") // tXpYsZ -> [tXpYs, Z]
p := strings.TrimSuffix(strings.SplitAfter(s[0], "p")[1], "s") // tXpYs -> [tXp, Ys] -> Ys -> Y
zip.Id = "000000000000000"[0:(16-len(s[1]))] + s[1] // pad id's to 16 characters to keep zipkin happy
if p != "0" {
zip.ParentId = "000000000000000"[0:(16-len(p))] + p // pad id's to 16 characters to keep zipkin happy
}
ctx = a.Ctx
}
var ann zipkinannotation
ann.Endpoint.Servicename = a.Host
ann.Endpoint.Ipv4 = dhcp.Lookup(a.Host)
ann.Endpoint.Port = 8080
ann.Timestamp = a.Timestamp / 1000 // convert from UnixNano to Microseconds
ann.Value = a.Value
zip.Annotations = append(zip.Annotations, ann)
}
WriteZip(zip)
}
// Instrument common code for requests
func Instrument(msg gotocol.Message, name string, hist *generic.Histogram) {
received := time.Now()
collect.Measure(hist, received.Sub(msg.Sent))
if archaius.Conf.Msglog {
log.Printf("%v: %v\n", name, msg)
}
if msg.Ctx != gotocol.NilContext {
AnnotateReceive(msg, name, received) // store the annotation for this request
}
}