Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

293 294 #2

Merged
merged 5 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions plugins/inputs/sflow_a10/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ The way that the plugin works is that it parses incoming counter records from A1

# XML file containing counter definitions, according to A10 specification
a10_definitions_file = "/path/to/xml_file.xml"

# if true, metrics with zero values will not be sent to the output
# this is to lighten the load on the metrics database backend
ignore_zero_values = true
```

### Metrics
Expand Down
48 changes: 45 additions & 3 deletions plugins/inputs/sflow_a10/metricencoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sflow_a10

import (
"fmt"
"strconv"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -23,21 +24,62 @@ func makeMetricsForCounters(p *V5Format, d *PacketDecoder) ([]telegraf.Metric, e
continue
}

// this is for packets tagged 293 and 294
// as per A10, each packet that contains counter block tagged 293 or 294 is just a single sample
if !sample.SampleCounterData.NeedsIpAndPort() {
if len(sample.SampleCounterData.CounterRecords) != 1 {
d.Log.Error(" SampleCounterData.CounterRecords with false NeedsIpPort has length != 1")
continue
}

counterRecord := sample.SampleCounterData.CounterRecords[0]
if counterRecord.CounterData == nil {
d.debug(fmt.Sprintf(" nil CounterData tag is %x for sourceID %x", counterRecord.CounterFormat&4095, sample.SampleCounterData.SourceID))
continue
}
counterFields := counterRecord.CounterData.GetFields()
counterTags := map[string]string{"agent_address": p.AgentAddress.String()}

// hardcoded stuff for tag 294
// tag 294 contains Ethernet counters *and* interface index/speed/type
// we need to add the latter as tags
if counterRecord.IsEthernetCounters {
counterTags["ifindex"] = strconv.FormatUint(counterFields["ifindex"].(uint64), 10)
delete(counterFields, "ifindex")
delete(counterFields, "ifspeed")
delete(counterFields, "iftype")
d.debug(fmt.Sprintf(" Ethernet counters, %v, %v", counterTags, counterFields))
}

if len(counterFields) > 0 {
m, err := metric.New("sflow_a10", counterTags, counterFields, now)
if err != nil {
d.debug(fmt.Sprintf(" error sending new metric to telegraf %s", err))
return nil, err
}

d.debug(fmt.Sprintf(" sending 293 or 294 metric to telegraf %s", m))
metrics = append(metrics, m)
}

return metrics, nil
}

key := createMapKey(sample.SampleCounterData.SourceID, p.AgentAddress.String())

ipValue, ipExists := d.IPMap.Get(key)
portValue, portExists := d.PortMap.Get(key)

if !ipExists || !portExists {
d.debug(fmt.Sprintf(" sourceID %x and key %v does not exist in DimensionsPerSourceIDMap", sample.SampleCounterData.SourceID, key))
d.debug(fmt.Sprintf(" sourceID %x and key %v does not exist in IPMap or PortMap", sample.SampleCounterData.SourceID, key))
continue
}

ipDimensions := ipValue.([]IPDimension)
portDimensions := portValue.(*PortDimension)

if err := validate(ipDimensions, portDimensions); err != nil {
//d.debug(fmt.Sprintf(" error in DimensionsPerSourceIDMap.Validate, error is %s, map value is %v whereas counter source ID is %x and key is %v", err, dimensions, sample.SampleCounterData.SourceID, key))
//d.debug(fmt.Sprintf(" error in Validate, error is %s, map value is %v whereas counter source ID is %x and key is %v", err, dimensions, sample.SampleCounterData.SourceID, key))
continue
}

Expand Down Expand Up @@ -86,7 +128,7 @@ func appendCommonTags(p *V5Format, counterDefinedTags map[string]string) error {
return nil
}

// validate returns true if all fields of the DimensionsPerSourceID struct are valid
// validate returns true if IP and Port Dimensions are valid
func validate(ipDimensions []IPDimension, portDimensions *PortDimension) error {
if portDimensions == nil {
return fmt.Errorf("PortDimension is nil")
Expand Down
52 changes: 24 additions & 28 deletions plugins/inputs/sflow_a10/packetdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ import (
)

type PacketDecoder struct {
onPacket func(p *V5Format)
Log telegraf.Logger
CounterBlocks map[uint32]CounterBlock
onPacket func(p *V5Format)
Log telegraf.Logger
CounterBlocks map[uint32]CounterBlock
IgnoreZeroValues bool

IPMap *hm.HashMap
PortMap *hm.HashMap
}

func NewDecoder() *PacketDecoder {
return &PacketDecoder{
IPMap: &hm.HashMap{},
PortMap: &hm.HashMap{},
CounterBlocks: make(map[uint32]CounterBlock),
IPMap: &hm.HashMap{},
PortMap: &hm.HashMap{},
CounterBlocks: make(map[uint32]CounterBlock),
IgnoreZeroValues: true,
}
}

Expand Down Expand Up @@ -181,61 +183,48 @@ func (d *PacketDecoder) decodeCounterRecords(r io.Reader, sourceID uint32, agent

mr := binaryio.MinReader(r, int64(counterDataLen))

tag := cr.CounterFormat & 0xFFF // the least significant 12 bits, sflow_version_5.txt line: 1410
tagCF := cr.CounterFormat & 0xFFF // the least significant 12 bits, sflow_version_5.txt line: 1410
tag := uint32(tagCF)

key := createMapKey(sourceID, agentAddress)

if uint32(tag) == 260 { // hex 104 - contains port information
if tag == 260 { // hex 104 - contains port information
portDimensions, err := d.decode260(r)
if err != nil {
return recs, err
}

key := createMapKey(sourceID, agentAddress)
_, ok := d.PortMap.Get(key)

if !ok {
d.PortMap.Set(key, portDimensions)
}

// d.debug(fmt.Sprintf(" got 260 - before assigning portdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
// if val.PortDimensions == nil {
// val.PortDimensions = portDimensions
// }
// d.debug(fmt.Sprintf(" got 260 - assigning portdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
continue
} else if uint32(tag) == 271 { // hex 10F - contains IPv4 information
} else if tag == 271 { // hex 10F - contains IPv4 information
ipDimensions, err := d.decode271(r)
if err != nil {
return recs, err
}

key := createMapKey(sourceID, agentAddress)
_, ok := d.IPMap.Get(key)
if !ok {
d.IPMap.Set(key, ipDimensions)
}

// d.debug(fmt.Sprintf(" got 271 - before assigning ipdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
// if val.IPDimensions == nil {
// val.IPDimensions = ipDimensions // TODO: append in a set instead of overwriting
// }
// d.debug(fmt.Sprintf(" got 271 - assigning ipdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
continue
} else if uint32(tag) == 272 { // hex 110 - contains IPv6 information
} else if tag == 272 { // hex 110 - contains IPv6 information
ipDimensions, err := d.decode272(r)
if err != nil {
return recs, err
}

key := createMapKey(sourceID, agentAddress)
_, ok := d.IPMap.Get(key)
if !ok {
d.IPMap.Set(key, ipDimensions)
}

// d.debug(fmt.Sprintf(" got 272 - before assigning portdimensions for sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
// if val.IPDimensions == nil {
// val.IPDimensions = ipDimensions
// }
// d.debug(fmt.Sprintf(" got 272 - assigning portdimensions fo2r sourceID %x and agentAddress %v, now it's %v", sourceID, agentAddress, val))
continue
}

Expand All @@ -245,6 +234,13 @@ func (d *PacketDecoder) decodeCounterRecords(r io.Reader, sourceID uint32, agent
continue
}

// as per A10, each packet of either counter block 293 or 294 is one sample of 293 or 294
// plus, we are not getting any IP and PORT information
cr.NeedsIpAndPort = tag != 293 && tag != 294

cr.IsEthernetCounters = tag == 294

d.debug(fmt.Sprintf(" tag %x for sourceID %x needs ip and and port: %t", tag, sourceID, cr.NeedsIpAndPort))
d.debug(fmt.Sprintf(" tag %x for sourceID %x found on xml file list. Gonna decode counter record", tag, sourceID))

err := d.decodeCounterRecord(mr, cr, uint32(tag), sourceID)
Expand Down Expand Up @@ -305,7 +301,7 @@ func (d *PacketDecoder) decodeCounterRecord(r io.Reader, cr *CounterRecord, tag
continue
}

if counterValue != uint64(0) { // no point in retugotrning 0 value for metric
if counterValue != uint64(0) || (counterValue == uint64(0) && !d.IgnoreZeroValues) {
//d.debug(fmt.Sprintf(" getting non-zero counter %s with value hex %x %#v %T for sourceID %x", counter.FieldName, counterValue, counterValue, counterValue, sourceID))
cr.CounterData.CounterFields[counter.FieldName] = counterValue
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/inputs/sflow_a10/packetdecoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDecodeCounterSample(t *testing.T) {
SequenceNumber: uint32(5),
SourceID: uint32(278808),
CounterRecords: []CounterRecord{
CounterRecord{
{
CounterFormat: CounterFormatType(217),
CounterData: &CounterData{
CounterFields: map[string]interface{}{
Expand All @@ -100,6 +100,7 @@ func TestDecodeCounterSample(t *testing.T) {
"testCounter2": uint64(29),
},
},
NeedsIpAndPort: true,
},
},
},
Expand Down
108 changes: 107 additions & 1 deletion plugins/inputs/sflow_a10/sflow_3_2_t2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8334,7 +8334,6 @@
<ctr:fieldName>Max Health check time(ms)</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>

<ctr:counterBlock>
<ctr:mapVersion>v2</ctr:mapVersion>
<ctr:tag>601</ctr:tag>
Expand Down Expand Up @@ -8362,4 +8361,111 @@
<ctr:fieldName>ZAPR Event Log</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>
<ctr:counterBlock>
<ctr:mapVersion>v2</ctr:mapVersion>
<ctr:tag>293</ctr:tag>
<ctr:ctrBlkType>Fixed</ctr:ctrBlkType>
<ctr:ctrBlkSz>6</ctr:ctrBlkSz>
<ctr:counter>
<ctr:offset>0</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>bw_limit_drop</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>1</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>bw_limit_ignored</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>2</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>egr_pps_limit_drop</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>3</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ing_pps_limit_drop</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>4</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>pps_limit_ignored</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>5</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>license_expire_drop</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>
<ctr:counterBlock>
<ctr:mapVersion>v2</ctr:mapVersion>
<ctr:tag>294</ctr:tag>
<ctr:ctrBlkType>Fixed</ctr:ctrBlkType>
<ctr:ctrBlkSz>13</ctr:ctrBlkSz>
<ctr:counter>
<ctr:offset>0</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifIndex</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>1</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifType</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>2</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifSpeed</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>3</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifDirection</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>4</ctr:offset>
<ctr:dtype>u32</ctr:dtype>
<ctr:fieldName>ifStatus</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>5</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifcrcerr</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>6</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifframeerr</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>7</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifrunts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>8</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifgianterr</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>9</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifInTcpPkts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>10</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifInUdpPkts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>11</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifOutTcpPkts</ctr:fieldName>
</ctr:counter>
<ctr:counter>
<ctr:offset>12</ctr:offset>
<ctr:dtype>u64</ctr:dtype>
<ctr:fieldName>ifOutUdpPkts</ctr:fieldName>
</ctr:counter>
</ctr:counterBlock>
</ctr:allctrblocks>
7 changes: 7 additions & 0 deletions plugins/inputs/sflow_a10/sflow_a10.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const sampleConfig = `

# XML file containing counter definitions, according to A10 specification
a10_definitions_file = "/path/to/xml_file.xml"

# if true, metrics with zero values will not be sent to the output
# this is to lighten the load on the metrics database backend
ignore_zero_values = true
`

const (
Expand All @@ -45,6 +49,7 @@ type SFlow_A10 struct {
ServiceAddress string `toml:"service_address"`
ReadBufferSize internal.Size `toml:"read_buffer_size"`
A10DefinitionsFile string `toml:"a10_definitions_file"`
IgnoreZeroValues bool `toml:"ignore_zero_values"`

sync.Mutex

Expand Down Expand Up @@ -115,6 +120,7 @@ func (s *SFlow_A10) initInternal(xmlData []byte) error {
return err
}
s.decoder.CounterBlocks = counterBlocks
s.decoder.IgnoreZeroValues = s.IgnoreZeroValues

return nil
}
Expand Down Expand Up @@ -158,6 +164,7 @@ func (s *SFlow_A10) Start(acc telegraf.Accumulator) error {
return err
}

// start the UDP server
conn, err := listenUDP(u.Scheme, u.Host)
if err != nil {
return err
Expand Down
Loading