-
Notifications
You must be signed in to change notification settings - Fork 155
/
flow.go
2119 lines (1933 loc) · 70.3 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2017 Intel Corporation.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package flow provides functionality for constructing packet processing graph
// Preparations of construction:
// All construction should be between SystemInit and SystemStart functions.
// User command line options should be added as flags before SystemInit option - it will
// parse them as well as internal library options.
// Packet processing graph construction:
// NFF-GO library provides nine so-called Flow Functions for packet processing graph
// construction. They operate on the term "flow"; however, it is just an abstraction for
// connecting them — nothing beyond this. These nine flow functions are:
// Receive, Generate - for adding packets to graph
// Send, Stop - for removing packets from graph
// Handle - for handling packets inside graph
// Separate, Split, Count, Merge for combining flows inside graph
// All these functions can be added to the graph by "Set" functions like
// SetReceiver, SetSplitter, etc.
// Flow functions Generate, Handle, Separate and Split use user defined functions
// for processing. These functions receive each packet from the flow (or a newly
// allocated packet in Generate). Function types of user defined functions are
// also defined in this file.
// Package flow is the main package of NFF-GO library and should be always imported by
// user application.
package flow
import (
	"net"
	"os"
	"os/signal"
	"runtime"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/intel-go/nff-go/asm"
	"github.com/intel-go/nff-go/common"
	"github.com/intel-go/nff-go/internal/low"
	"github.com/intel-go/nff-go/packet"
	"github.com/intel-go/nff-go/types"
)
// Package-level state shared by graph construction and the scheduler.
var openFlowsNumber = uint32(0) // count of flows opened but not yet terminated; must be 0 before start
var createdPorts []port         // per-DPDK-port construction state, sized in SystemInit
var portPair map[types.IPv4Address](*port)
var schedState *scheduler // the single scheduler instance created in SystemInit
var vEach [10][vBurstSize]uint8 // vEach[i] is a vector filled with the constant i (initialized in SystemInit)
var ioDevices map[string]interface{} // device name -> OS socket (int) or low.XDPSocket
// Timer bundles a time.Ticker with a user handler and the per-clone
// contexts it is invoked with. Managed by flow package internals;
// exact firing logic lives elsewhere in this file.
type Timer struct {
	t        *time.Ticker       // underlying ticker driving the timer
	handler  func(UserContext)  // user callback invoked on each tick
	contexts []UserContext      // one context per registered instance
	checks   []*bool            // presumably per-instance liveness flags — confirm at usage sites
}
// processSegment groups the state of one fused chain of in-segment flow
// functions (handle/separate/split/partition) executed as a single FF.
type processSegment struct {
	out      []low.Rings   // output rings; makeSlice appends one entry per segment exit
	contexts []UserContext // user contexts registered for this segment's functions
	stype    uint8         // segment type flag passed to the scheduler via segmentParameters
}
// Flow is an abstraction for connecting flow functions with each other.
// Flow shouldn't be understood in any way beyond this.
type Flow struct {
	current       low.Rings       // rings this flow currently points at
	segment       *processSegment // enclosing segment, nil when the flow is not inside one
	previous      **Func          // slot of the preceding segment node — see segment construction code
	inIndexNumber int32           // degree of input parallelism (port InIndex / RSS instances)
}
// partitionCtx is the per-clone state of a partitioner: it alternates
// between emitting N packets to one output and M packets to the other.
type partitionCtx struct {
	currentAnswer       uint8  // output index currently selected
	currentCompare      uint64 // packet count at which the answer flips next
	currentPacketNumber uint64 // packets seen so far in the current cycle
	N                   uint64 // length of the first phase of the cycle
	M                   uint64 // length of the second phase of the cycle
}
// Copy returns a fresh partition context for a new clone: N and M are
// carried over, counters start at zero, and the first flip point is N.
func (c partitionCtx) Copy() interface{} {
	clone := &partitionCtx{N: c.N, M: c.M}
	clone.currentCompare = clone.N
	return clone
}
// Delete satisfies the UserContext interface; a partition context owns no
// resources, so there is nothing to release.
func (c partitionCtx) Delete() {
}
// Func is one node of a processing segment. Exactly one family of user
// callbacks (handle/separate/split) is set depending on the node kind,
// together with the internal scalar and vector drivers that invoke it.
type Func struct {
	sHandleFunction   HandleFunction   // scalar user callback for handler nodes
	sSeparateFunction SeparateFunction // scalar user callback for separator nodes
	sSplitFunction    SplitFunction    // scalar user callback for splitter nodes
	sFunc func(*packet.Packet, *Func, UserContext) uint // scalar driver (partition/separate/split/handle)
	vHandleFunction   VectorHandleFunction   // vector user callback for handler nodes
	vSeparateFunction VectorSeparateFunction // vector user callback for separator nodes
	vSplitFunction    VectorSplitFunction    // vector user callback for splitter nodes
	vFunc func([]*packet.Packet, *[vBurstSize]bool, *[vBurstSize]uint8, *Func, UserContext) // vector driver
	next [](*Func) // successor nodes, one per output of this node
	bufIndex uint  // index into processSegment.out for terminal (slice) nodes
	contextIndex int   // index of this node's UserContext within the segment
	followingNumber uint8 // number of outputs (0 for terminal nodes)
}
// GenerateFunction is a function type for user defined function which generates packets.
// Function receives preallocated packet where user should add
// its size and content.
type GenerateFunction func(*packet.Packet, UserContext)

// VectorGenerateFunction is a function type like GenerateFunction for vector generating.
type VectorGenerateFunction func([]*packet.Packet, UserContext)

// HandleFunction is a function type for user defined function which handles packets.
// Function receives a packet from flow. User should parse it
// and make necessary changes. It is prohibited to free the packet in this
// function.
type HandleFunction func(*packet.Packet, UserContext)

// VectorHandleFunction is a function type like HandleFunction for vector handling.
type VectorHandleFunction func([]*packet.Packet, *[vBurstSize]bool, UserContext)

// SeparateFunction is a function type for user defined function which separates packets
// based on some rule for two flows. Function receives a packet from flow.
// User should parse it and decide whether this packet should remain in
// this flow - return true, or should be sent to new added flow - return false.
type SeparateFunction func(*packet.Packet, UserContext) bool

// VectorSeparateFunction is a function type like SeparateFunction for vector separation.
type VectorSeparateFunction func([]*packet.Packet, *[vBurstSize]bool, *[vBurstSize]bool, UserContext)

// SplitFunction is a function type for user defined function which splits packets
// based on some rule for multiple flows. Function receives a packet from
// flow. User should parse it and decide in which output flows this packet
// should be sent. Return number of flow shouldn't exceed target number
// which was put to SetSplitter function. Also it is assumed that "0"
// output flow is used for dropping packets - "Stop" function should be
// set after "Split" function in it.
type SplitFunction func(*packet.Packet, UserContext) uint

// VectorSplitFunction is a function type like SplitFunction for vector splitting.
type VectorSplitFunction func([]*packet.Packet, *[vBurstSize]bool, *[vBurstSize]uint8, UserContext)
// Kni is a high level struct of KNI device. The device itself is stored
// in C memory in low.c and is defined by its port which is equal to port
// in this structure.
type Kni struct {
	portId uint16 // DPDK port number the KNI device is bound to
}
// receiveParameters is the argument bundle for the RSS receive flow function.
type receiveParameters struct {
	out    low.Rings // rings received packets are pushed to
	port   *low.Port // DPDK port to receive from
	status []int32   // one slot per receive instance (see recvNotUsed/recvNotDone/recvDone)
	stats  common.RXTXStats
}
// addReceiver registers an RSS receive flow function for the given DPDK
// port, writing received packets into out.
func addReceiver(portId uint16, out low.Rings, inIndexNumber int32) {
	par := new(receiveParameters)
	par.port = low.GetPort(portId)
	par.out = out
	par.status = make([]int32, maxRecv)
	// BUG FIX: string(portId) converts the integer to the rune with that
	// code point (e.g. string(uint16(65)) == "A"), not its decimal form;
	// use strconv.Itoa so the FF name is readable and unique per port.
	schedState.addFF("receiverPort"+strconv.Itoa(int(portId)), nil, recvRSS, nil, par, nil, receiveRSS, inIndexNumber, &par.stats)
}
// receiveOSParameters is the argument bundle for the Linux RAW-socket receiver.
type receiveOSParameters struct {
	out    low.Rings // rings received packets are pushed to
	socket int       // RAW socket descriptor created by low.InitDevice
	stats  common.RXTXStats
}
// addOSReceiver registers a flow function that reads packets from a Linux
// RAW socket and pushes them into out.
func addOSReceiver(socket int, out low.Rings) {
	par := &receiveOSParameters{socket: socket, out: out}
	schedState.addFF("OS receiver", nil, recvOS, nil, par, nil, sendReceiveKNI, 0, &par.stats)
}
// receiveXDPParameters is the argument bundle for the AF_XDP receiver.
type receiveXDPParameters struct {
	out    low.Rings     // rings received packets are pushed to
	socket low.XDPSocket // AF_XDP socket created by low.InitXDP
	stats  common.RXTXStats
}
// addXDPReceiver registers a flow function that reads packets from an
// AF_XDP socket and pushes them into out.
func addXDPReceiver(socket low.XDPSocket, out low.Rings) {
	par := &receiveXDPParameters{socket: socket, out: out}
	schedState.addFF("AF_XDP receiver", nil, recvXDP, nil, par, nil, sendReceiveKNI, 0, &par.stats)
}
// KNIParameters is the argument bundle for the KNI send/receive flow function.
type KNIParameters struct {
	in        low.Rings // rings with packets to forward to the KNI device (send side)
	out       low.Rings // rings that receive packets read from the KNI device
	port      *low.Port // underlying DPDK port of the KNI device
	recv      bool      // whether this FF receives from the device
	send      bool      // whether this FF sends to the device
	linuxCore bool      // run on the core Linux assigned to the KNI device (comboKNI mode)
	stats     common.RXTXStats
}
// addKNI registers a KNI processing flow function. When core is true the
// FF is scheduled in comboKNI mode so send/receive/device can share the
// core Linux assigned to the KNI device.
func addKNI(portId uint16, recv bool, out low.Rings, send bool, in low.Rings, inIndexNumber int32, name string, core bool) {
	par := &KNIParameters{
		port:      low.GetPort(portId),
		in:        in,
		out:       out,
		recv:      recv,
		send:      send,
		linuxCore: core,
	}
	ffKind := sendReceiveKNI
	if core {
		ffKind = comboKNI
	}
	schedState.addFF(name, nil, processKNI, nil, par, nil, ffKind, inIndexNumber, &par.stats)
}
// generateParameters is the argument bundle for (fast) generator flow functions.
type generateParameters struct {
	out                    low.Rings              // rings generated packets are pushed to
	generateFunction       GenerateFunction       // scalar user generator
	vectorGenerateFunction VectorGenerateFunction // vector user generator (fast generator only)
	mempool                *low.Mempool           // dedicated mempool (fast generator only)
	targetChannel          chan uint64            // channel for changing target speed at runtime
	targetSpeed            float64                // requested packets-per-second rate
	stats                  common.RXTXStats
}
// addGenerator registers an unthrottled generator flow function driven by
// the user's GenerateFunction.
func addGenerator(out low.Rings, generateFunction GenerateFunction, context UserContext) {
	par := &generateParameters{out: out, generateFunction: generateFunction}
	userCtx := []UserContext{context}
	schedState.addFF("generator", nil, nil, pGenerate, par, &userCtx, generate, 0, &par.stats)
}
// addFastGenerator registers a rate-controlled generator flow function.
// It returns a channel through which the target speed can be changed at
// runtime, or an error when the requested speed is below the scheduler's
// minimal planning unit.
func addFastGenerator(out low.Rings, generateFunction GenerateFunction,
	vectorGenerateFunction VectorGenerateFunction, targetSpeed uint64, context UserContext) (chan uint64, error) {
	speed := float64(targetSpeed)
	// One burstSize of packets per schedTime is the smallest unit the
	// scheduler can plan; a slower rate cannot be generated.
	if speed/(1000 /*milliseconds*/ /float64(schedTime)) < float64(burstSize) {
		return nil, common.WrapWithNFError(nil, "Target speed per schedTime should be more than burstSize", common.BadArgument)
	}
	par := &generateParameters{
		out:                    out,
		generateFunction:       generateFunction,
		mempool:                low.CreateMempool("fast generate"),
		vectorGenerateFunction: vectorGenerateFunction,
		targetSpeed:            speed,
		targetChannel:          make(chan uint64, 1),
	}
	userCtx := []UserContext{context}
	schedState.addFF("fast generator", nil, nil, pFastGenerate, par, &userCtx, fastGenerate, 0, &par.stats)
	return par.targetChannel, nil
}
// sendParameters is the argument bundle for a per-thread port sender.
type sendParameters struct {
	in                 low.Rings // rings to drain packets from
	port               uint16    // DPDK port number to transmit on
	unrestrictedClones bool      // copied from scheduler configuration
	stats              common.RXTXStats
	sendThreadIndex    int // index of this sender among sendCPUCoresPerPort threads
}
// addSender registers sendCPUCoresPerPort sender flow functions for the
// given port, one per send thread.
func addSender(port uint16, in low.Rings, inIndexNumber int32) {
	for threadIndex := 0; threadIndex < sendCPUCoresPerPort; threadIndex++ {
		par := new(sendParameters)
		par.port = port
		par.in = in
		par.unrestrictedClones = schedState.unrestrictedClones
		par.sendThreadIndex = threadIndex
		// BUG FIX: string(port) / string(iii) converted the integers to
		// single runes (e.g. string(65) == "A"), not decimal numbers,
		// producing garbled FF names; use strconv.Itoa instead.
		schedState.addFF("senderPort"+strconv.Itoa(int(port))+"Thread"+strconv.Itoa(threadIndex),
			nil, send, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats)
	}
}
// sendOSParameters is the argument bundle for the Linux RAW-socket sender.
type sendOSParameters struct {
	in     low.Rings // rings to drain packets from
	socket int       // RAW socket descriptor created by low.InitDevice
	stats  common.RXTXStats
}
// addSenderOS registers a flow function that writes packets from in to a
// Linux RAW socket.
func addSenderOS(socket int, in low.Rings, inIndexNumber int32) {
	par := &sendOSParameters{socket: socket, in: in}
	schedState.addFF("sender OS", nil, sendOS, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats)
}
// sendXDPParameters is the argument bundle for the AF_XDP sender.
type sendXDPParameters struct {
	in     low.Rings     // rings to drain packets from
	socket low.XDPSocket // AF_XDP socket created by low.InitXDP
	stats  common.RXTXStats
}
// addSenderXDP registers a flow function that writes packets from in to
// an AF_XDP socket.
func addSenderXDP(socket low.XDPSocket, in low.Rings, inIndexNumber int32) {
	par := &sendXDPParameters{socket: socket, in: in}
	schedState.addFF("AF_XDP sender", nil, sendXDP, nil, par, nil, sendReceiveKNI, inIndexNumber, &par.stats)
}
// copyParameters is the argument bundle for the packet-duplicating flow function.
type copyParameters struct {
	in      low.Rings    // input rings
	out     low.Rings    // rings receiving the original packets
	outCopy low.Rings    // rings receiving the duplicates
	mempool *low.Mempool // mempool used to allocate the copies
}
// addCopier registers a flow function that forwards each packet to out
// and places a duplicate into outCopy.
func addCopier(in low.Rings, out low.Rings, outCopy low.Rings, inIndexNumber int32) {
	par := &copyParameters{
		in:      in,
		out:     out,
		outCopy: outCopy,
		mempool: low.CreateMempool("copy"),
	}
	schedState.addFF("copy", nil, nil, pcopy, par, nil, segmentCopy, inIndexNumber, nil)
}
// makePartitioner builds a two-output segment node driven by the internal
// partition/vPartition drivers.
// NOTE(review): N and M are not stored on the node here; presumably the
// partition context carrying them is created by the caller — confirm.
func makePartitioner(N uint64, M uint64) *Func {
	return &Func{
		sFunc:           partition,
		vFunc:           vPartition,
		next:            make([]*Func, 2),
		followingNumber: 2,
	}
}
// makeSeparator builds a two-output segment node that routes packets with
// the user's separate callbacks (scalar and vector forms).
func makeSeparator(separateFunction SeparateFunction, vectorSeparateFunction VectorSeparateFunction) *Func {
	return &Func{
		sSeparateFunction: separateFunction,
		vSeparateFunction: vectorSeparateFunction,
		sFunc:             separate,
		vFunc:             vSeparate,
		next:              make([]*Func, 2),
		followingNumber:   2,
	}
}
// makeSplitter builds an n-output segment node that routes packets with
// the user's split callbacks (scalar and vector forms).
func makeSplitter(splitFunction SplitFunction, vectorSplitFunction VectorSplitFunction, n uint8) *Func {
	return &Func{
		sSplitFunction:  splitFunction,
		vSplitFunction:  vectorSplitFunction,
		sFunc:           split,
		vFunc:           vSplit,
		next:            make([]*Func, n),
		followingNumber: n,
	}
}
// makeHandler builds a single-output segment node that passes every packet
// through the user's handle callbacks (scalar and vector forms).
func makeHandler(handleFunction HandleFunction, vectorHandleFunction VectorHandleFunction) *Func {
	return &Func{
		sHandleFunction: handleFunction,
		vHandleFunction: vectorHandleFunction,
		sFunc:           handle,
		vFunc:           vHandle,
		next:            make([]*Func, 1),
		followingNumber: 1,
	}
}
// writeParameters is the argument bundle for the pcap file writer.
type writeParameters struct {
	in       low.Rings // rings to drain packets from
	filename string    // target file path
	stats    common.RXTXStats
}
// addWriter registers a flow function that writes packets from in to the
// given file.
func addWriter(filename string, in low.Rings, inIndexNumber int32) {
	par := &writeParameters{in: in, filename: filename}
	schedState.addFF("write", write, nil, nil, par, nil, readWrite, inIndexNumber, &par.stats)
}
// readParameters is the argument bundle for the pcap file reader.
type readParameters struct {
	out      low.Rings // rings read packets are pushed to
	filename string    // source file path
	repcount int32     // number of times to read the file; -1 means infinite loop
	stats    common.RXTXStats
}
// addReader registers a flow function that reads packets from the given
// file into out, repeating repcount times (-1 for infinite).
func addReader(filename string, out low.Rings, repcount int32) {
	par := &readParameters{
		out:      out,
		filename: filename,
		repcount: repcount,
	}
	schedState.addFF("read", read, nil, nil, par, nil, readWrite, 0, &par.stats)
}
// makeSlice builds a terminal segment node that copies packets into the
// given output rings; the rings are appended to the segment's output list
// and addressed by bufIndex.
func makeSlice(out low.Rings, segment *processSegment) *Func {
	segment.out = append(segment.out, out)
	return &Func{
		sFunc:           constructSlice,
		vFunc:           vConstructSlice,
		bufIndex:        uint(len(segment.out) - 1),
		followingNumber: 0,
	}
}
// segmentParameters is the argument bundle for a fused processing segment.
type segmentParameters struct {
	in        low.Rings      // input rings of the segment
	out       *([]low.Rings) // pointer to the segment's growing output-rings list
	firstFunc *Func          // entry node of the segment's function chain
	stype     *uint8         // pointer to the segment type flag
}
// addSegment registers a fused processing segment starting at first and
// returns the segment descriptor so further nodes can be attached to it.
func addSegment(in low.Rings, first *Func, inIndexNumber int32) *processSegment {
	segment := &processSegment{
		out:      make([]low.Rings, 0),
		contexts: make([]UserContext, 0),
	}
	par := &segmentParameters{
		in:        in,
		firstFunc: first,
		out:       &segment.out,
		stype:     &segment.stype,
	}
	schedState.addFF("segment", nil, nil, segmentProcess, par, &segment.contexts, segmentCopy, inIndexNumber, nil)
	return segment
}
// HWCapability enumerates hardware offloading capabilities that can be
// queried with CheckHWCapability and toggled with SetUseHWCapability.
type HWCapability int

const (
	HWTXChecksumCapability HWCapability = iota // NIC computes IPv4/UDP/TCP checksums on transmit
	HWRXPacketsTimestamp                       // NIC timestamps received packets
)

// Receive status values; presumably stored in receiveParameters.status
// slots by the RSS receive loop — confirm at usage sites.
const (
	recvNotUsed int32 = iota
	recvNotDone
	recvDone
)
// CheckHWCapability returns array of booleans for every requested
// port. An element of this array is set to true if hardware
// offloading capability is supported on corresponding port, otherwise
// it is set to false.
func CheckHWCapability(capa HWCapability, ports []uint16) []bool {
	result := make([]bool, len(ports))
	for i, portID := range ports {
		switch capa {
		case HWTXChecksumCapability:
			result[i] = low.CheckHWTXChecksumCapability(portID)
		case HWRXPacketsTimestamp:
			result[i] = low.CheckHWRXPacketsTimestamp(portID)
		default:
			result[i] = false
		}
	}
	return result
}
// SetUseHWCapability enables or disables using a hardware offloading
// capability. Only HWTXChecksumCapability currently has an effect.
func SetUseHWCapability(capa HWCapability, use bool) {
	if capa == HWTXChecksumCapability {
		packet.SetHWTXChecksumFlag(use)
	}
}
// Size of operations with internal ring buffers and NIC receive/send.
// Can be changed for debug and test purposes for scalar examples, not recommended.
// At i40e drivers burstSize should be >= 4
// http://mails.dpdk.org/archives/dev/2016-December/052554.html
const burstSize = 32

// Size of all vectors in system. Can't be changed due to asm stickiness.
// Using vector functions with vBurstSize != burstSize is undefined behaviour.
const vBurstSize = 32

// reportMbits toggles Mbit-based reporting (used elsewhere in this file).
const reportMbits = false

var sizeMultiplier uint // ring capacity in burstSize groups; set in SystemInit (default 64)
var schedTime uint      // scheduler interval in milliseconds; set in SystemInit
var hwtxchecksum, hwrxpacketstimestamp, setSIGINTHandler bool
var maxRecv int // maximum simultaneous receives; set in SystemInit (default 2)
var sendCPUCoresPerPort, tXQueuesNumberPerPort int
// port collects per-DPDK-port state gathered during graph construction.
type port struct {
	wasRequested bool // has user requested any send/receive operations at this port
	willReceive  bool // will this port receive packets
	willKNI      bool // will this port has assigned KNI device
	KNICoreIndex int  // scheduler core index returned when the KNI device is freed
	port         uint16
	MAC          types.MACAddress
	InIndex      int32 // input parallelism, capped by the port's RSS queue count
	sendRings    low.Rings
}
// Config is a struct with all parameters, which user can pass to NFF-GO library
type Config struct {
	// Specifies cores which will be available for scheduler to place
	// flow functions and their clones.
	CPUList string
	// If true, scheduler is disabled entirely. Default value is false.
	DisableScheduler bool
	// If true, scheduler does not stop any previously cloned flow
	// function threads. Default value is false.
	PersistentClones bool
	// If true, Stop routine gets a dedicated CPU core instead of
	// running together with scheduler. Default value is false.
	StopOnDedicatedCore bool
	// Calculate IPv4, UDP and TCP checksums in hardware. This flag
	// slows down general TX processing, so it should be enabled if
	// the application intends to modify packets often, and therefore
	// needs to recalculate their checksums. If the application doesn't
	// modify many packets, it may choose to calculate checksums in SW
	// and leave this flag off. Default value is false.
	HWTXChecksum bool
	// Specifies number of mbufs in mempool per port. Default value is
	// 8191.
	MbufNumber uint
	// Specifies number of mbufs in per-CPU core cache in
	// mempool. Default value is 250.
	MbufCacheSize uint
	// Number of burstSize groups in all rings. This should be power
	// of 2. Default value is 256.
	RingSize uint
	// Time between scheduler actions in milliseconds. Default value is
	// 1500.
	ScaleTime uint
	// Time in milliseconds for scheduler to check changing of flow
	// function behaviour. Default value is 10000.
	CheckTime uint
	// Time in milliseconds for scheduler to display statistics.
	// Default value is 1000.
	DebugTime uint
	// Specifies logging type. Default value is common.No |
	// common.Initialization | common.Debug.
	LogType common.LogType
	// Command line arguments to pass to DPDK initialization.
	DPDKArgs []string
	// Is user going to use KNI
	NeedKNI bool
	// Maximum simultaneous receives that should handle all
	// input at your network card
	MaxRecv int
	// Limits parallel instances: 1 for one instance, 1000 lets the
	// RSS count determine the number of instances.
	MaxInIndex int32
	// Scheduler should clone functions even if it can lead to reordering.
	// This option should be switched off for all high level reassembling like TCP or HTTP
	RestrictedCloning bool
	// If application uses EncapsulateHead or DecapsulateHead functions, L2 pointers
	// must be reinitialized on every receive or generation of a packet. This can be
	// removed if EncapsulateHead and DecapsulateHead are not in use.
	NoPacketHeadChange bool
	// HTTP server address to use for serving statistics and
	// telemetry. Server provides different types of statistics which
	// can be controlled by statistics flags. File format is
	// JSON. Registered roots return statistics for all framework
	// graph nodes or accept an optional argument /ID where ID is port
	// number for send and receive nodes.
	//
	// Following are possible statistics requests:
	//
	// /rxtxstats for protocol statistics gathered on all send and
	// receive or /rxtxstats/name for individual send/receiver node.
	//
	// /telemetry for all nodes names and their counters which include
	// received, sent, processed, lost and dropped packets. Using
	// /telemetry/name returns information about individual node.
	//
	// If no string is specified, no HTTP server is spawned.
	StatsHTTPAddress *net.TCPAddr
	// Enables possibility of IP reassembly via chaining packets
	ChainedReassembly bool
	// Enables possibility of handling jumbo frames via chaining packets
	ChainedJumbo bool
	// Enables possibility of handling jumbo frames via making huge packets.
	// Will require a big amount of memory.
	MemoryJumbo bool
	// Enables hardware assisted timestamps in packet mbufs. These
	// timestamps can be accessed with GetPacketTimestamp function.
	HWRXPacketsTimestamp bool
	// Disable setting custom handler for SIGINT in
	// SystemStartScheduler. When handler is enabled
	// SystemStartScheduler waits for SIGINT notification and calls
	// SystemStop after it. It is enabled by default.
	NoSetSIGINTHandler bool
	// Number of CPU cores to be occupied by Send routines. It is
	// necessary to set TXQueuesNumberPerPort to a reasonably big
	// number which can be divided by SendCPUCoresPerPort.
	SendCPUCoresPerPort int
	// Number of transmit queues to use on network card. By default it
	// is minimum of NIC supported TX queues number and 2. If this
	// value is specified and NIC doesn't support this number of TX
	// queues, initialization fails.
	TXQueuesNumberPerPort int
	// Controls scheduler interval in milliseconds. Default value is
	// 500. Lower values allow faster reaction to changing traffic but
	// increase scheduling overhead.
	SchedulerInterval uint
}
// SystemInit is initialization of system. This function should be always called before graph construction.
// It validates and applies the user Config (nil is treated as a zero
// Config), initializes DPDK (EAL), discovers ports and creates the
// scheduler.
func SystemInit(args *Config) error {
	if args == nil {
		args = &Config{}
	}
	CPUCoresNumber := runtime.NumCPU()
	var cpus []int
	var err error
	if args.CPUList != "" {
		if cpus, err = common.HandleCPUList(args.CPUList, CPUCoresNumber); err != nil {
			return err
		}
	} else {
		cpus = common.GetDefaultCPUs(CPUCoresNumber)
	}
	tXQueuesNumberPerPort = args.TXQueuesNumberPerPort
	if tXQueuesNumberPerPort == 0 {
		tXQueuesNumberPerPort = 2
	}
	sendCPUCoresPerPort = args.SendCPUCoresPerPort
	if sendCPUCoresPerPort == 0 {
		sendCPUCoresPerPort = 1
	}
	// BUG FIX: this divisibility check was nested inside the
	// "sendCPUCoresPerPort == 0" branch above, so it only ever ran for
	// the trivially valid default of 1 and never validated a
	// user-supplied SendCPUCoresPerPort. It must run unconditionally.
	if tXQueuesNumberPerPort%sendCPUCoresPerPort != 0 {
		return common.WrapWithNFError(nil, "TXQueuesNumberPerPort should be divisible by SendCPUCoresPerPort",
			common.BadArgument)
	}
	schedulerOff := args.DisableScheduler
	schedulerOffRemove := args.PersistentClones
	stopDedicatedCore := args.StopOnDedicatedCore
	hwtxchecksum = args.HWTXChecksum
	hwrxpacketstimestamp = args.HWRXPacketsTimestamp
	unrestrictedClones := !args.RestrictedCloning
	mbufNumber := uint(8191)
	if args.MbufNumber != 0 {
		mbufNumber = args.MbufNumber
	}
	mbufCacheSize := uint(250)
	if args.MbufCacheSize != 0 {
		mbufCacheSize = args.MbufCacheSize
	}
	sizeMultiplier = 64
	if args.RingSize != 0 {
		sizeMultiplier = args.RingSize
	}
	if args.SchedulerInterval != 0 {
		schedTime = args.SchedulerInterval
	} else {
		schedTime = 500
	}
	// ScaleTime, when set, overrides SchedulerInterval (applied last).
	if args.ScaleTime != 0 {
		schedTime = args.ScaleTime
	}
	checkTime := uint(10000)
	if args.CheckTime != 0 {
		checkTime = args.CheckTime
	}
	debugTime := uint(1000)
	if args.DebugTime != 0 {
		debugTime = args.DebugTime
	}
	if debugTime < schedTime {
		return common.WrapWithNFError(nil, "debugTime should be larger or equal to schedTime", common.Fail)
	}
	needKNI := 0
	if args.NeedKNI {
		needKNI = 1
	}
	logType := common.No | common.Initialization | common.Debug
	if args.LogType != 0 {
		logType = args.LogType
	}
	common.SetLogType(logType)
	maxRecv = 2
	if args.MaxRecv != 0 {
		maxRecv = args.MaxRecv
	}
	maxInIndex := int32(16)
	if schedulerOff {
		maxInIndex = 1
	}
	if args.MaxInIndex != 0 {
		if args.MaxInIndex > 1 && args.MaxInIndex%2 != 0 {
			return common.WrapWithNFError(nil, "MaxInIndex should be 1 or even", common.Fail)
		}
		maxInIndex = args.MaxInIndex
	}
	NoPacketHeadChange := args.NoPacketHeadChange
	needChainedReassembly := args.ChainedReassembly
	needChainedJumbo := args.ChainedJumbo
	needMemoryJumbo := args.MemoryJumbo
	argc, argv := low.InitDPDKArguments(args.DPDKArgs)
	// We want to add new clone if input ring is approximately 80% full
	maxPacketsToClone := uint32(sizeMultiplier * burstSize / 5 * 4)
	// TODO all low level initialization here! Now everything is default.
	// Init eal
	common.LogTitle(common.Initialization, "------------***-------- Initializing DPDK --------***------------")
	if err := low.InitDPDK(argc, argv, burstSize, mbufNumber, mbufCacheSize, needKNI,
		NoPacketHeadChange, needChainedReassembly, needChainedJumbo, needMemoryJumbo); err != nil {
		return err
	}
	// Init Ports
	createdPorts = make([]port, low.GetPortsNumber())
	for i := range createdPorts {
		createdPorts[i].port = uint16(i)
		// Cap each port's input parallelism by its supported RSS count.
		if maxInIndex > low.CheckPortRSS(createdPorts[i].port) {
			createdPorts[i].InIndex = low.CheckPortRSS(createdPorts[i].port)
		} else {
			createdPorts[i].InIndex = maxInIndex
		}
	}
	portPair = make(map[types.IPv4Address](*port))
	ioDevices = make(map[string]interface{})
	// Init scheduler
	common.LogTitle(common.Initialization, "------------***------ Initializing scheduler -----***------------")
	StopRing := low.CreateRings(burstSize*sizeMultiplier, maxInIndex /* Maximum possible rings */)
	common.LogDebug(common.Initialization, "Scheduler can use cores:", cpus)
	schedState = newScheduler(cpus, schedulerOff, schedulerOffRemove, stopDedicatedCore, StopRing, checkTime, debugTime, maxPacketsToClone, maxRecv, unrestrictedClones)
	// Set HW offloading flag in packet package
	packet.SetHWTXChecksumFlag(hwtxchecksum)
	// Initialize telemetry web server
	if countersEnabledInFramework {
		if args.StatsHTTPAddress != nil {
			if err = initCounters(args.StatsHTTPAddress); err != nil {
				return err
			}
		}
	}
	// Init packet processing: vEach[i] is a burst-wide vector of the constant i.
	for i := 0; i < 10; i++ {
		for j := 0; j < burstSize; j++ {
			vEach[i][j] = uint8(i)
		}
	}
	setSIGINTHandler = !args.NoSetSIGINTHandler
	return nil
}
// SystemInitPortsAndMemory performs all initialization necessary to
// create and send new packets before scheduler may be started.
// It fails if any flow was left open, then creates every requested DPDK
// port, records its MAC address and sets up the slow-path mempool.
func SystemInitPortsAndMemory() error {
	// All flows must be terminated (sent/stopped/written) before start.
	if openFlowsNumber != 0 {
		return common.WrapWithNFError(nil, "Some flows are left open at the end of configuration!", common.OpenedFlowAtTheEnd)
	}
	common.LogTitle(common.Initialization, "------------***---------- Creating ports ---------***------------")
	for i := range createdPorts {
		if createdPorts[i].wasRequested {
			if err := low.CreatePort(createdPorts[i].port, createdPorts[i].willReceive,
				true, hwtxchecksum, hwrxpacketstimestamp, createdPorts[i].InIndex, tXQueuesNumberPerPort); err != nil {
				return err
			}
		}
		// MAC is read for every port, requested or not.
		createdPorts[i].MAC = GetPortMACAddress(createdPorts[i].port)
		common.LogDebug(common.Initialization, "Port", createdPorts[i].port, "MAC address:", createdPorts[i].MAC.String())
	}
	common.LogTitle(common.Initialization, "------------***------ Starting FlowFunctions -----***------------")
	// Init low performance mempool
	packet.SetNonPerfMempool(low.CreateMempool("slow operations"))
	return nil
}
// SystemStartScheduler starts scheduler packet processing. Function
// does not return.
// With the SIGINT handler enabled (default), scheduling runs in a
// goroutine while this function blocks waiting for an interrupt and then
// calls SystemStop; otherwise scheduling runs on the calling goroutine.
func SystemStartScheduler() error {
	if err := schedState.systemStart(); err != nil {
		return common.WrapWithNFError(err, "scheduler start failed", common.Fail)
	}
	common.LogTitle(common.Initialization, "------------***---------- NFF-GO Started ---------***------------")
	if setSIGINTHandler {
		signalChan := make(chan os.Signal, 1)
		signal.Notify(signalChan, os.Interrupt)
		// Scheduling loop runs concurrently; this goroutine blocks on SIGINT.
		go func() {
			schedState.schedule(schedTime)
		}()
		<-signalChan
		common.LogTitle(common.Debug, "Received an interrupt, stopping everything")
		// NOTE(review): SystemStop's error is discarded here — confirm.
		SystemStop()
	} else {
		schedState.schedule(schedTime)
	}
	return nil
}
// SystemStart starts system - begin packet receiving and packet sending.
// This functions should be always called after flow graph construction.
// Function can panic during execution.
func SystemStart() error {
	if err := SystemInitPortsAndMemory(); err != nil {
		return err
	}
	return SystemStartScheduler()
}
// SystemStop stops the system. All Flow functions plus resource releasing
// Doesn't cleanup DPDK
// Ports are stopped and their construction flags reset; KNI devices are
// freed and their reserved cores returned to the scheduler.
func SystemStop() error {
	// TODO we should release rings here
	schedState.systemStop()
	for i := range createdPorts {
		if createdPorts[i].wasRequested {
			low.StopPort(createdPorts[i].port)
			// Reset construction state so the port can be requested again.
			createdPorts[i].wasRequested = false
			createdPorts[i].willReceive = false
			createdPorts[i].sendRings = nil
		}
		if createdPorts[i].willKNI {
			err := low.FreeKNI(createdPorts[i].port)
			if err != nil {
				return err
			}
			// Give the KNI core back to the scheduler pool.
			schedState.setCoreByIndex(createdPorts[i].KNICoreIndex)
			createdPorts[i].willKNI = false
		}
	}
	low.FreeMempools()
	return nil
}
// SystemReset stops whole framework plus cleanup DPDK
// TODO DPDK cleanup is now incomplete at DPDK side
// It isn't able to re-init after it and also is
// under deprecated pragma. Need to fix after DPDK changes.
func SystemReset() {
	// NOTE(review): SystemStop's error is discarded here — confirm
	// callers don't need it before changing the signature.
	SystemStop()
	low.StopDPDK()
}
// SetSenderFile adds write function to flow graph.
// Gets flow which packets will be written to file and
// target file name. Closes the input flow.
func SetSenderFile(IN *Flow, filename string) error {
	err := checkFlow(IN)
	if err != nil {
		return err
	}
	addWriter(filename, finishFlow(IN), IN.inIndexNumber)
	return nil
}
// SetReceiverFile adds read function to flow graph.
// Gets name of pcap formatted file and number of reads. If repcount = -1,
// file is read infinitely in circle.
// Returns new opened flow with read packets.
func SetReceiverFile(filename string, repcount int32) (OUT *Flow) {
	readRings := low.CreateRings(burstSize*sizeMultiplier, 1)
	addReader(filename, readRings, repcount)
	return newFlow(readRings, 1)
}
// SetReceiver adds receive function to flow graph.
// Gets port number from which packets will be received.
// Receive queue will be added to port automatically.
// Returns new opened flow with received packets
func SetReceiver(portId uint16) (OUT *Flow, err error) {
	if portId >= uint16(len(createdPorts)) {
		return nil, common.WrapWithNFError(nil, "Requested receive port exceeds number of ports which can be used by DPDK (bind to DPDK).", common.ReqTooManyPorts)
	}
	p := &createdPorts[portId]
	// Only one receiver per port is allowed.
	if p.willReceive {
		return nil, common.WrapWithNFError(nil, "Requested receive port was already set to receive. Two receives from one port are prohibited.", common.MultipleReceivePort)
	}
	p.wasRequested = true
	p.willReceive = true
	recvRings := low.CreateRings(burstSize*sizeMultiplier, p.InIndex)
	addReceiver(portId, recvRings, p.InIndex)
	return newFlow(recvRings, p.InIndex), nil
}
// SetReceiverOS adds function receive from Linux interface to flow graph.
// Gets name of device, will return error if can't initialize socket.
// Creates RAW socket, returns new opened flow with received packets.
func SetReceiverOS(device string) (*Flow, error) {
	var socketID int
	if existing, ok := ioDevices[device]; ok {
		// Reuse the socket already opened for this device.
		socketID = existing.(int)
	} else {
		socketID = low.InitDevice(device)
		if socketID == -1 {
			return nil, common.WrapWithNFError(nil, "Can't initialize socket", common.BadSocket)
		}
		ioDevices[device] = socketID
	}
	osRings := low.CreateRings(burstSize*sizeMultiplier, 1)
	addOSReceiver(socketID, osRings)
	return newFlow(osRings, 1), nil
}
// SetSenderOS adds function send from flow graph to Linux interface.
// Gets name of device, will return error if can't initialize socket.
// Creates RAW socket, sends packets, closes input flow.
func SetSenderOS(IN *Flow, device string) error {
	if err := checkFlow(IN); err != nil {
		return err
	}
	var socketID int
	if existing, ok := ioDevices[device]; ok {
		// Reuse the socket already opened for this device.
		socketID = existing.(int)
	} else {
		socketID = low.InitDevice(device)
		if socketID == -1 {
			return common.WrapWithNFError(nil, "Can't initialize socket", common.BadSocket)
		}
		ioDevices[device] = socketID
	}
	addSenderOS(socketID, finishFlow(IN), IN.inIndexNumber)
	return nil
}
// SetReceiverXDP adds function receive from Linux AF_XDP to flow graph.
// Gets name of device and queue number, will return error if can't initialize socket.
// Creates AF_XDP socket, returns new opened flow with received packets.
func SetReceiverXDP(device string, queue int) (*Flow, error) {
	if _, ok := ioDevices[device]; ok {
		// BUG FIX: error message read "in forbidden now"; corrected to
		// "is forbidden now" (callers match the BadSocket code, not text).
		return nil, common.WrapWithNFError(nil, "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device is forbidden now", common.BadSocket)
	}
	socketID := low.InitXDP(device, queue)
	if socketID == nil {
		return nil, common.WrapWithNFError(nil, "Can't initialize AF_XDP socket", common.BadSocket)
	}
	ioDevices[device] = socketID
	rings := low.CreateRings(burstSize*sizeMultiplier, 1)
	addXDPReceiver(socketID, rings)
	return newFlow(rings, 1), nil
}
// SetSenderXDP adds function send from flow graph to Linux AF_XDP interface.
// Gets name of device, will return error if can't initialize socket.
// Creates AF_XDP socket, sends packets, closes input flow.
func SetSenderXDP(IN *Flow, device string) error {
	if err := checkFlow(IN); err != nil {
		return err
	}
	if _, ok := ioDevices[device]; ok {
		// BUG FIX: error message read "in forbidden now"; corrected to
		// "is forbidden now" (callers match the BadSocket code, not text).
		return common.WrapWithNFError(nil, "Device shouldn't have any sockets before AF_XDP. AF_XDP Send and Receive for one device is forbidden now", common.BadSocket)
	}
	socketID := low.InitXDP(device, 0)
	if socketID == nil {
		return common.WrapWithNFError(nil, "Can't initialize AF_XDP socket", common.BadSocket)
	}
	ioDevices[device] = socketID
	addSenderXDP(socketID, finishFlow(IN), IN.inIndexNumber)
	return nil
}
// SetReceiverKNI adds function receive from KNI to flow graph.
// Gets KNI device from which packets will be received.
// Receive queue will be added to port automatically.
// Returns new opened flow with received packets
func SetReceiverKNI(kni *Kni) (OUT *Flow) {
	kniRings := low.CreateRings(burstSize*sizeMultiplier, 1)
	addKNI(kni.portId, true, kniRings, false, nil, 1, "receiver KNI", false)
	return newFlow(kniRings, 1)
}
// SetSenderReceiverKNI adds function send/receive from KNI.
// Gets KNI device from which packets will be received and flow to send.
// Returns new opened flow with received packets
// If linuxCore parameter is true function will use core that was assigned
// to KNI device in Linux. So all send/receive/device can use one core
func SetSenderReceiverKNI(IN *Flow, kni *Kni, linuxCore bool) (OUT *Flow, err error) {
if err := checkFlow(IN); err != nil {
return nil, err
}