Skip to content

Commit

Permalink
Merge pull request #2 from dgkanatsios/293_294
Browse files Browse the repository at this point in the history
293 294
  • Loading branch information
dgkanatsios authored Jun 27, 2022
2 parents 1f766d7 + 1685d17 commit 45e025f
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 62 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/sflow_a10/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ The way that the plugin works is that it parses incoming counter records from A1

# XML file containing counter definitions, according to A10 specification
a10_definitions_file = "/path/to/xml_file.xml"

# if true, metrics with zero values will not be sent to the output
# this is to lighten the load on the metrics database backend
ignore_zero_values = true
```

### Metrics
Expand Down
48 changes: 45 additions & 3 deletions plugins/inputs/sflow_a10/metricencoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sflow_a10

import (
"fmt"
"strconv"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -23,21 +24,62 @@ func makeMetricsForCounters(p *V5Format, d *PacketDecoder) ([]telegraf.Metric, e
continue
}

// this is for packets tagged 293 and 294
// as per A10, each packet that contains counter block tagged 293 or 294 is just a single sample
if !sample.SampleCounterData.NeedsIpAndPort() {
if len(sample.SampleCounterData.CounterRecords) != 1 {
d.Log.Error(" SampleCounterData.CounterRecords with false NeedsIpPort has length != 1")
continue
}

counterRecord := sample.SampleCounterData.CounterRecords[0]
if counterRecord.CounterData == nil {
d.debug(fmt.Sprintf(" nil CounterData tag is %x for sourceID %x", counterRecord.CounterFormat&4095, sample.SampleCounterData.SourceID))
continue
}
counterFields := counterRecord.CounterData.GetFields()
counterTags := map[string]string{"agent_address": p.AgentAddress.String()}

// hardcoded stuff for tag 294
// tag 294 contains Ethernet counters *and* interface index/speed/type
// we need to add the latter as tags
if counterRecord.IsEthernetCounters {
counterTags["ifindex"] = strconv.FormatUint(counterFields["ifindex"].(uint64), 10)
delete(counterFields, "ifindex")
delete(counterFields, "ifspeed")
delete(counterFields, "iftype")
d.debug(fmt.Sprintf(" Ethernet counters, %v, %v", counterTags, counterFields))
}

if len(counterFields) > 0 {
m, err := metric.New("sflow_a10", counterTags, counterFields, now)
if err != nil {
d.debug(fmt.Sprintf(" error sending new metric to telegraf %s", err))
return nil, err
}

d.debug(fmt.Sprintf(" sending 293 or 294 metric to telegraf %s", m))
metrics = append(metrics, m)
}

return metrics, nil
}

key := createMapKey(sample.SampleCounterData.SourceID, p.AgentAddress.String())

ipValue, ipExists := d.IPMap.Get(key)
portValue, portExists := d.PortMap.Get(key)

if !ipExists || !portExists {
d.debug(fmt.Sprintf(" sourceID %x and key %v does not exist in DimensionsPerSourceIDMap", sample.SampleCounterData.SourceID, key))
d.debug(fmt.Sprintf(" sourceID %x and key %v does not exist in IPMap or PortMap", sample.SampleCounterData.SourceID, key))
continue
}

ipDimensions := ipValue.([]IPDimension)
portDimensions := portValue.(*PortDimension)

if err := validate(ipDimensions, portDimensions); err != nil {
//d.debug(fmt.Sprintf(" error in DimensionsPerSourceIDMap.Validate, error is %s, map value is %v whereas counter source ID is %x and key is %v", err, dimensions, sample.SampleCounterData.SourceID, key))
//d.debug(fmt.Sprintf(" error in Validate, error is %s, map value is %v whereas counter source ID is %x and key is %v", err, dimensions, sample.SampleCounterData.SourceID, key))
continue
}

Expand Down Expand Up @@ -86,7 +128,7 @@ func appendCommonTags(p *V5Format, counterDefinedTags map[string]string) error {
return nil
}

// validate returns true if all fields of the DimensionsPerSourceID struct are valid
// validate returns true if IP and Port Dimensions are valid
func validate(ipDimensions []IPDimension, portDimensions *PortDimension) error {
if portDimensions == nil {
return fmt.Errorf("PortDimension is nil")
Expand Down
52 changes: 24 additions & 28 deletions plugins/inputs/sflow_a10/packetdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ import (
)

type PacketDecoder struct {
onPacket func(p *V5Format)
Log telegraf.Logger
CounterBlocks map[uint32]CounterBlock
onPacket func(p *V5Format)
Log telegraf.Logger
CounterBlocks map[uint32]CounterBlock
IgnoreZeroValues bool

IPMap *hm.HashMap
PortMap *hm.HashMap
}

func NewDecoder() *PacketDecoder {
return &PacketDecoder{
IPMap: &hm.HashMap{},
PortMap: &hm.HashMap{},
CounterBlocks: make(map[uint32]CounterBlock),
IPMap: &hm.HashMap{},
PortMap: &hm.HashMap{},
CounterBlocks: make(map[uint32]CounterBlock),
IgnoreZeroValues: true,
}
}

Expand Down Expand Up @@ -181,61 +183,48 @@ func (d *PacketDecoder) decodeCounterRecords(r io.Reader, sourceID uint32, agent

mr := binaryio.MinReader(r, int64(counterDataLen))

tag := cr.CounterFormat & 0xFFF // the least significant 12 bits, sflow_version_5.txt line: 1410
tagCF := cr.CounterFormat & 0xFFF // the least significant 12 bits, sflow_version_5.txt line: 1410
tag := uint32(tagCF)

key := createMapKey(sourceID, agentAddress)

if uint32(tag) == 260 { // hex 104 - contains port information
if tag == 260 { // hex 104 - contains port information
portDimensions, err := d.decode260(r)
if err != nil {
return recs, err
}

key := createMapKey(sourceID, agentAddress)
_, ok := d.PortMap.Get(key)

if !ok {
d.PortMap.Set(key, portDimensions)
}

// d.debug(fmt.Sprintf(" got 260 - before assigning portdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
// if val.PortDimensions == nil {
// val.PortDimensions = portDimensions
// }
// d.debug(fmt.Sprintf(" got 260 - assigning portdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
continue
} else if uint32(tag) == 271 { // hex 10F - contains IPv4 information
} else if tag == 271 { // hex 10F - contains IPv4 information
ipDimensions, err := d.decode271(r)
if err != nil {
return recs, err
}

key := createMapKey(sourceID, agentAddress)
_, ok := d.IPMap.Get(key)
if !ok {
d.IPMap.Set(key, ipDimensions)
}

// d.debug(fmt.Sprintf(" got 271 - before assigning ipdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
// if val.IPDimensions == nil {
// val.IPDimensions = ipDimensions // TODO: append in a set instead of overwriting
// }
// d.debug(fmt.Sprintf(" got 271 - assigning ipdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
continue
} else if uint32(tag) == 272 { // hex 110 - contains IPv6 information
} else if tag == 272 { // hex 110 - contains IPv6 information
ipDimensions, err := d.decode272(r)
if err != nil {
return recs, err
}

key := createMapKey(sourceID, agentAddress)
_, ok := d.IPMap.Get(key)
if !ok {
d.IPMap.Set(key, ipDimensions)
}

// d.debug(fmt.Sprintf(" got 272 - before assigning portdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
// if val.IPDimensions == nil {
// val.IPDimensions = ipDimensions
// }
// d.debug(fmt.Sprintf(" got 272 - assigning portdimensions fo2r sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
continue
}

Expand All @@ -245,6 +234,13 @@ func (d *PacketDecoder) decodeCounterRecords(r io.Reader, sourceID uint32, agent
continue
}

// as per A10, each packet of either counter block 293 or 294 is one sample of 293 or 294
// plus, we are not getting any IP and PORT information
cr.NeedsIpAndPort = tag != 293 && tag != 294

cr.IsEthernetCounters = tag == 294

d.debug(fmt.Sprintf(" tag %x for sourceID %x needs ip and and port: %t", tag, sourceID, cr.NeedsIpAndPort))
d.debug(fmt.Sprintf(" tag %x for sourceID %x found on xml file list. Gonna decode counter record", tag, sourceID))

err := d.decodeCounterRecord(mr, cr, uint32(tag), sourceID)
Expand Down Expand Up @@ -305,7 +301,7 @@ func (d *PacketDecoder) decodeCounterRecord(r io.Reader, cr *CounterRecord, tag
continue
}

if counterValue != uint64(0) { // no point in retugotrning 0 value for metric
if counterValue != uint64(0) || (counterValue == uint64(0) && !d.IgnoreZeroValues) {
//d.debug(fmt.Sprintf(" getting non-zero counter %s with value hex %x %#v %T for sourceID %x", counter.FieldName, counterValue, counterValue, counterValue, sourceID))
cr.CounterData.CounterFields[counter.FieldName] = counterValue
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/sflow_a10/packetdecoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDecodeCounterSample(t *testing.T) {
SequenceNumber: uint32(5),
SourceID: uint32(278808),
CounterRecords: []CounterRecord{
CounterRecord{
{
CounterFormat: CounterFormatType(217),
CounterData: &CounterData{
CounterFields: map[string]interface{}{
Expand All @@ -100,6 +100,7 @@ func TestDecodeCounterSample(t *testing.T) {
"testCounter2": uint64(29),
},
},
NeedsIpAndPort: true,
},
},
},
Expand Down
108 changes: 107 additions & 1 deletion plugins/inputs/sflow_a10/sflow_3_2_t2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8334,7 +8334,6 @@
<ctr:fieldName>Max Health check time(ms)</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>

<ctr:counterBlock>
<ctr:mapVersion>v2</ctr:mapVersion>
<ctr:tag>601</ctr:tag>
Expand Down Expand Up @@ -8362,4 +8361,111 @@
<ctr:fieldName>ZAPR Event Log</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>
<ctr:counterBlock>
<ctr:mapVersion>v2</ctr:mapVersion>
<ctr:tag>293</ctr:tag>
<ctr:ctrBlkType>Fixed</ctr:ctrBlkType>
<ctr:ctrBlkSz>6</ctr:ctrBlkSz>
<ctr:counter>
<ctr:offset>0</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>bw_limit_drop</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>1</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>bw_limit_ignored</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>2</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>egr_pps_limit_drop</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>3</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ing_pps_limit_drop</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>4</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>pps_limit_ignored</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>5</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>license_expire_drop</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>
<ctr:counterBlock>
<ctr:mapVersion>v2</ctr:mapVersion>
<ctr:tag>294</ctr:tag>
<ctr:ctrBlkType>Fixed</ctr:ctrBlkType>
<ctr:ctrBlkSz>13</ctr:ctrBlkSz>
<ctr:counter>
<ctr:offset>0</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifIndex</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>1</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifType</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>2</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifSpeed</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>3</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifDirection</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>4</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifStatus</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>5</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifcrcerr</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>6</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifframeerr</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>7</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifrunts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>8</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifgianterr</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>9</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifInTcpPkts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>10</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifInUdpPkts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>11</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifOutTcpPkts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>12</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifOutUdpPkts</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>
</ctr:allctrblocks>
7 changes: 7 additions & 0 deletions plugins/inputs/sflow_a10/sflow_a10.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const sampleConfig = `
# XML file containing counter definitions, according to A10 specification
a10_definitions_file = "/path/to/xml_file.xml"
# if true, metrics with zero values will not be sent to the output
# this is to lighten the load on the metrics database backend
ignore_zero_values = true
`

const (
Expand All @@ -45,6 +49,7 @@ type SFlow_A10 struct {
ServiceAddress string `toml:"service_address"`
ReadBufferSize internal.Size `toml:"read_buffer_size"`
A10DefinitionsFile string `toml:"a10_definitions_file"`
IgnoreZeroValues bool `toml:"ignore_zero_values"`

sync.Mutex

Expand Down Expand Up @@ -115,6 +120,7 @@ func (s *SFlow_A10) initInternal(xmlData []byte) error {
return err
}
s.decoder.CounterBlocks = counterBlocks
s.decoder.IgnoreZeroValues = s.IgnoreZeroValues

return nil
}
Expand Down Expand Up @@ -158,6 +164,7 @@ func (s *SFlow_A10) Start(acc telegraf.Accumulator) error {
return err
}

// start the UDP server
conn, err := listenUDP(u.Scheme, u.Host)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 45e025f

Please sign in to comment.