Skip to content

Commit

Permalink
Merge pull request #9 from ties/feature/track-long-term-diffs
Browse files Browse the repository at this point in the history
Track objects that differ for a longer period separately
  • Loading branch information
job authored Jul 21, 2021
2 parents f856087 + 71a7648 commit 8547db1
Showing 1 changed file with 104 additions and 12 deletions.
116 changes: 104 additions & 12 deletions cmd/rtrmon/rtrmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"net/url"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -34,6 +36,8 @@ const (
METHOD_KEY
)

type thresholds []int64

var (
version = ""
buildinfos = ""
Expand Down Expand Up @@ -90,6 +94,13 @@ var (
},
[]string{"server", "url", "type"},
)
VRPDifferenceForDuration = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "vrp_diff",
Help: "Number of VRPS in [lhs_url] that are not in [rhs_url] that were first seen [visibility_seconds] ago in lhs.",
},
[]string{"lhs_url", "rhs_url", "visibility_seconds"},
)
RTRState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rtr_state",
Expand Down Expand Up @@ -124,14 +135,50 @@ var (
1: "primary",
2: "secondary",
}

visibilityThresholds = thresholds{0, 56, 256, 596, 851, 1024, 1706, 3411}
)

func init() {
prometheus.MustRegister(VRPCount)
prometheus.MustRegister(VRPDifferenceForDuration)
prometheus.MustRegister(RTRState)
prometheus.MustRegister(RTRSerial)
prometheus.MustRegister(RTRSession)
prometheus.MustRegister(LastUpdate)

flag.Var(&visibilityThresholds, "visibility.thresholds", "comma-separated list of visibility thresholds to override the default")
}

// String formats an array of thresholds as a comma separated string.
func (t *thresholds) String() string {
res := []byte("")
for idx, tr := range *t {
res = strconv.AppendInt(res, tr, 10)
if idx < len(*t)-1 {
res = append(res, ","...)
}
}
return string(res)
}

func (t *thresholds) Set(value string) error {
// Setting overrides current values
if len(*t) > 0 {
*t = thresholds{}
}

for _, tr := range strings.Split(value, ",") {
threshold, err := strconv.ParseInt(tr, 10, 32)

if err != nil {
return err
}

*t = append(*t, threshold)
}

return nil
}

func decodeJSON(data []byte) (*prefixfile.VRPList, error) {
Expand Down Expand Up @@ -279,6 +326,7 @@ func (c *Client) Start(id int, ch chan int) {
}

c.lastUpdate = time.Now().UTC()
tCurrentUpdate := time.Now().UTC().Unix()

tmpVrpMap := make(map[string]*VRPJsonSimple)
for _, vrp := range decoded.Data {
Expand All @@ -296,10 +344,17 @@ func (c *Client) Start(id int, ch chan int) {
maxlen := vrp.GetMaxLen()
key := fmt.Sprintf("%s-%d-%d", prefix.String(), maxlen, asn)

firstSeen := tCurrentUpdate
currentEntry, ok := c.vrps[key]
if ok {
firstSeen = currentEntry.FirstSeen
}

vrpSimple := VRPJsonSimple{
Prefix: prefix.String(),
ASN: asn,
Length: uint8(maxlen),
Prefix: prefix.String(),
ASN: asn,
Length: uint8(maxlen),
FirstSeen: firstSeen,
}
tmpVrpMap[key] = &vrpSimple
}
Expand All @@ -320,9 +375,10 @@ func (c *Client) HandlePDU(cs *rtr.ClientSession, pdu rtr.PDU) {
switch pdu := pdu.(type) {
case *rtr.PDUIPv4Prefix:
vrp := VRPJsonSimple{
Prefix: pdu.Prefix.String(),
ASN: pdu.ASN,
Length: pdu.MaxLen,
Prefix: pdu.Prefix.String(),
ASN: pdu.ASN,
Length: pdu.MaxLen,
FirstSeen: time.Now().Unix(),
}

key := fmt.Sprintf("%s-%d-%d", pdu.Prefix.String(), pdu.MaxLen, pdu.ASN)
Expand All @@ -337,9 +393,10 @@ func (c *Client) HandlePDU(cs *rtr.ClientSession, pdu rtr.PDU) {
c.compRtrLock.Unlock()
case *rtr.PDUIPv6Prefix:
vrp := VRPJsonSimple{
Prefix: pdu.Prefix.String(),
ASN: pdu.ASN,
Length: pdu.MaxLen,
Prefix: pdu.Prefix.String(),
ASN: pdu.ASN,
Length: pdu.MaxLen,
FirstSeen: time.Now().Unix(),
}

key := fmt.Sprintf("%s-%d-%d", pdu.Prefix.String(), pdu.MaxLen, pdu.ASN)
Expand Down Expand Up @@ -486,6 +543,18 @@ func NewComparator(c1, c2 *Client) *Comparator {
}
}

func countFirstSeenOnOrBefore(vrps []*VRPJsonSimple, thresholdTimestamp int64) float64 {
count := 0

for _, vrp := range vrps {
if vrp.FirstSeen <= thresholdTimestamp {
count++
}
}

return float64(count)
}

func Diff(a, b map[string]*VRPJsonSimple) []*VRPJsonSimple {
onlyInA := make([]*VRPJsonSimple, 0)
for key, vrp := range a {
Expand All @@ -509,9 +578,10 @@ type diffMetadata struct {
}

type VRPJsonSimple struct {
ASN uint32 `json:"asn"`
Length uint8 `json:"max-length"`
Prefix string `json:"prefix"`
ASN uint32 `json:"asn"`
Length uint8 `json:"max-length"`
Prefix string `json:"prefix"`
FirstSeen int64 `json:"first-seen"`
}

type diffExport struct {
Expand Down Expand Up @@ -546,6 +616,7 @@ func (c *Comparator) ServeDiff(wr http.ResponseWriter, req *http.Request) {
func (c *Comparator) Compare() {
var donePrimary, doneSecondary bool
var stop bool
startedAt := time.Now().Unix()
for !stop {
select {
case <-c.q:
Expand Down Expand Up @@ -595,6 +666,27 @@ func (c *Comparator) Compare() {
"type": "diff",
}).Set(float64(len(onlyIn2)))

for _, visibleFor := range visibilityThresholds {
thresholdTimestamp := time.Now().Unix() - visibleFor
// Prevent differences with value 0 appearing if the process has not
// been running long enough for them to exist.
if thresholdTimestamp >= startedAt {
VRPDifferenceForDuration.With(
prometheus.Labels{
"lhs_url": md1.URL,
"rhs_url": md2.URL,
"visibility_seconds": strconv.FormatInt(visibleFor, 10),
}).Set(countFirstSeenOnOrBefore(onlyIn1, thresholdTimestamp))

VRPDifferenceForDuration.With(
prometheus.Labels{
"lhs_url": md2.URL,
"rhs_url": md1.URL,
"visibility_seconds": strconv.FormatInt(visibleFor, 10),
}).Set(countFirstSeenOnOrBefore(onlyIn2, thresholdTimestamp))
}
}

RTRSerial.With(
prometheus.Labels{
"server": "primary",
Expand Down

0 comments on commit 8547db1

Please sign in to comment.