Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grace period for diff #70

Merged
merged 4 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ vet:
test:
go test -v github.com/bgp/stayrtr/lib
go test -v github.com/bgp/stayrtr/prefixfile
go test -v github.com/bgp/stayrtr/cmd/rtrmon
go test -v github.com/bgp/stayrtr/cmd/stayrtr

.PHONY: prepare
prepare:
Expand Down
120 changes: 78 additions & 42 deletions cmd/rtrmon/rtrmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (

UserAgent = flag.String("useragent", fmt.Sprintf("StayRTR-%v (+https://github.com/bgp/stayrtr)", AppVersion), "User-Agent header")
DisableConditionalRequests = flag.Bool("disable.conditional.requests", false, "Disable conditional requests (using If-None-Match/If-Modified-Since headers)")
GracePeriod = flag.Duration("grace.period", time.Minute*20, "Grace period during which objects removed from a source are not considered for the diff")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just skimming really and no expert in Golang either.
Is there a way to set this GracePeriod via configuration file?


PrimaryHost = flag.String("primary.host", "tcp://rtr.rpki.cloudflare.com:8282", "primary server")
PrimaryValidateCert = flag.Bool("primary.tls.validate", true, "Validate TLS")
Expand Down Expand Up @@ -106,6 +107,13 @@ var (
},
[]string{"lhs_url", "rhs_url", "visibility_seconds"},
)
VRPInGracePeriod = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rpki_grace_period_vrps",
Help: "Number of unique VRPS in grace period by url.",
},
[]string{"url"},
)
RTRState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "rtr_state",
Expand Down Expand Up @@ -147,6 +155,7 @@ var (
func init() {
prometheus.MustRegister(VRPCount)
prometheus.MustRegister(VRPDifferenceForDuration)
prometheus.MustRegister(VRPInGracePeriod)
prometheus.MustRegister(RTRState)
prometheus.MustRegister(RTRSerial)
prometheus.MustRegister(RTRSession)
Expand Down Expand Up @@ -222,9 +231,9 @@ type Client struct {
lastUpdate time.Time

compLock *sync.RWMutex
vrps map[string]*VRPJsonSimple
vrps VRPMap
compRtrLock *sync.RWMutex
vrpsRtr map[string]*VRPJsonSimple
vrpsRtr VRPMap

unlock chan bool
ch chan int
Expand All @@ -238,9 +247,9 @@ type Client struct {
func NewClient() *Client {
return &Client{
compLock: &sync.RWMutex{},
vrps: make(map[string]*VRPJsonSimple),
vrps: make(VRPMap),
compRtrLock: &sync.RWMutex{},
vrpsRtr: make(map[string]*VRPJsonSimple),
vrpsRtr: make(VRPMap),
}
}

Expand Down Expand Up @@ -285,7 +294,7 @@ func (c *Client) Start(id int, ch chan int) {
serverKeyHash := ssh.FingerprintSHA256(key)
if c.ValidateSSH {
if serverKeyHash != fmt.Sprintf("SHA256:%v", c.SSHServerKey) {
return errors.New(fmt.Sprintf("Server key hash %v is different than expected key hash SHA256:%v", serverKeyHash, c.SSHServerKey))
return fmt.Errorf("server key hash %v is different than expected key hash SHA256:%v", serverKeyHash, c.SSHServerKey)
}
}
log.Infof("%d: Connected to server %v via ssh. Fingerprint: %v", id, remote.String(), serverKeyHash)
Expand Down Expand Up @@ -334,42 +343,13 @@ func (c *Client) Start(id int, ch chan int) {
continue
}

c.lastUpdate = time.Now().UTC()
tCurrentUpdate := time.Now().UTC().Unix()

tmpVrpMap := make(map[string]*VRPJsonSimple)
for _, vrp := range decoded.Data {
asn, err := vrp.GetASN2()
if err != nil {
log.Errorf("%d: exploration error for %v asn: %v", id, vrp, err)
continue
}
prefix, err := vrp.GetPrefix2()
if err != nil {
log.Errorf("%d: exploration error for %v prefix: %v", id, vrp, err)
continue
}

maxlen := vrp.GetMaxLen()
key := fmt.Sprintf("%s-%d-%d", prefix.String(), maxlen, asn)

firstSeen := tCurrentUpdate
currentEntry, ok := c.vrps[key]
if ok {
firstSeen = currentEntry.FirstSeen
}
tCurrentUpdate := time.Now().UTC()
updatedVrpMap, inGracePeriod := BuildNewVrpMap(log.WithField("client", c.id), c.vrps, decoded.Data, tCurrentUpdate)
VRPInGracePeriod.With(prometheus.Labels{"url": c.Path}).Set(float64(inGracePeriod))

vrpSimple := VRPJsonSimple{
Prefix: prefix.String(),
ASN: asn,
Length: uint8(maxlen),
FirstSeen: firstSeen,
}
tmpVrpMap[key] = &vrpSimple
}
c.compLock.Lock()
c.vrps = tmpVrpMap
c.lastUpdate = time.Now().UTC()
c.vrps = updatedVrpMap
c.lastUpdate = tCurrentUpdate
c.compLock.Unlock()
if ch != nil {
ch <- id
Expand All @@ -380,6 +360,60 @@ func (c *Client) Start(id int, ch chan int) {

}

// Build the new vrpMap
// The result:
// * contains all the VRPs in newVRPs
// * keeps the firstSeen value for VRPs already in the old map
// * keeps elements around for GracePeriod after they are not in the input.
func BuildNewVrpMap(log *log.Entry, currentVrps VRPMap, newVrps []prefixfile.VRPJson, now time.Time) (VRPMap, int) {
tCurrentUpdate := now.Unix()
res := make(VRPMap)

for _, vrp := range newVrps {
asn, err := vrp.GetASN2()
if err != nil {
log.Errorf("exploration error for %v asn: %v", vrp, err)
continue
}
prefix, err := vrp.GetPrefix2()
if err != nil {
log.Errorf("exploration error for %v prefix: %v", vrp, err)
continue
}

maxlen := vrp.GetMaxLen()
key := fmt.Sprintf("%s-%d-%d", prefix.String(), maxlen, asn)

firstSeen := tCurrentUpdate
currentEntry, ok := currentVrps[key]
if ok {
firstSeen = currentEntry.FirstSeen
}

res[key] = &VRPJsonSimple{
Prefix: prefix.String(),
ASN: asn,
Length: uint8(maxlen),
FirstSeen: firstSeen,
LastSeen: tCurrentUpdate,
}
}

// Copy objects that are within the grace period to the new map
gracePeriodEnds := tCurrentUpdate - int64(GracePeriod.Seconds())
inGracePeriod := 0
for k, entry := range currentVrps {
if _, ok := res[k]; !ok { // no longer present
if (*entry).LastSeen >= gracePeriodEnds {
res[k] = entry
inGracePeriod++
}
}
}

return res, inGracePeriod
}

func (c *Client) HandlePDU(cs *rtr.ClientSession, pdu rtr.PDU) {
switch pdu := pdu.(type) {
case *rtr.PDUIPv4Prefix:
Expand Down Expand Up @@ -423,7 +457,7 @@ func (c *Client) HandlePDU(cs *rtr.ClientSession, pdu rtr.PDU) {

c.compRtrLock.Lock()
c.serial = pdu.SerialNumber
tmpVrpMap := make(map[string]*VRPJsonSimple, len(c.vrpsRtr))
tmpVrpMap := make(VRPMap, len(c.vrpsRtr))
for key, vrp := range c.vrpsRtr {
tmpVrpMap[key] = vrp
}
Expand Down Expand Up @@ -504,7 +538,7 @@ func (c *Client) continuousRTR(cs *rtr.ClientSession) {
}
}

func (c *Client) GetData() (map[string]*VRPJsonSimple, *diffMetadata) {
func (c *Client) GetData() (VRPMap, *diffMetadata) {
c.compLock.RLock()
defer c.compLock.RUnlock()
vrps := c.vrps
Expand Down Expand Up @@ -563,7 +597,7 @@ func countFirstSeenOnOrBefore(vrps []*VRPJsonSimple, thresholdTimestamp int64) f
return float64(count)
}

func Diff(a, b map[string]*VRPJsonSimple) []*VRPJsonSimple {
func Diff(a, b VRPMap) []*VRPJsonSimple {
onlyInA := make([]*VRPJsonSimple, 0)
for key, vrp := range a {
if _, ok := b[key]; !ok {
Expand All @@ -590,7 +624,9 @@ type VRPJsonSimple struct {
Length uint8 `json:"max-length"`
Prefix string `json:"prefix"`
FirstSeen int64 `json:"first-seen"`
LastSeen int64 `json:"last-seen"`
}
type VRPMap map[string]*VRPJsonSimple

type diffExport struct {
MetadataPrimary *diffMetadata `json:"metadata-primary"`
Expand Down
144 changes: 144 additions & 0 deletions cmd/rtrmon/rtrmon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"testing"
"time"

log "github.com/sirupsen/logrus"

"github.com/bgp/stayrtr/prefixfile"
)

func TestBuildNewVrpMap_expiry(t *testing.T) {
stuff := testData()
now := time.Now()
log := log.WithField("client", "TestBuildNewVrpMap_expiry")

res, inGracePeriod := BuildNewVrpMap(log, make(VRPMap), stuff, now)
if inGracePeriod != 0 {
t.Errorf("Initial build does not have objects in grace period")
}

_, inGracePeriodPreserved := BuildNewVrpMap(log, res, []prefixfile.VRPJson{}, now.Add(time.Minute*10))
if inGracePeriodPreserved != len(res) {
t.Errorf("All objects are in grace period")
}

// Objects are kept in grace period
// 1s before grace period ends
t1 := now.Add(*GracePeriod).Add(-time.Second * 1)
res, inGracePeriod = BuildNewVrpMap(log, res, []prefixfile.VRPJson{}, t1)

assertLastSeenMatchesTimeCount(t, res, t1, 0)
assertLastSeenMatchesTimeCount(t, res, now, len(stuff))
if inGracePeriod != len(stuff) {
t.Errorf("All objects should be in grace period. Expected: %d, actual: %d", len(stuff), inGracePeriod)
}

// 1s after grace period ends, they are removed
res, inGracePeriod = BuildNewVrpMap(log, res, []prefixfile.VRPJson{}, now.Add(*GracePeriod).Add(time.Second*1))
if len(res) != 0 {
t.Errorf("Expected no objects to be left after grace period, actual: %d", len(res))
}
if inGracePeriod != 0 {
t.Errorf("Expected 0 objects in grace period, actual: %d", inGracePeriod)
}
}

func TestBuildNewVrpMap_firsSeen_lastSeen(t *testing.T) {
t0 := time.Now()
log := log.WithField("client", "TestBuildNewVrpMap_firstSeen_lastSeen")
stuff := testData()

var res, _ = BuildNewVrpMap(log, make(VRPMap), stuff, t0)

// All have firstSeen + lastSeen equal to t0
assertFirstSeenMatchesTimeCount(t, res, t0, len(stuff))
assertLastSeenMatchesTimeCount(t, res, t0, len(stuff))

// Supply same data again later
t1 := t0.Add(time.Minute * 10)
res, _ = BuildNewVrpMap(log, res, stuff, t1)

// FirstSeen is constant, LastSeen gets updated
assertFirstSeenMatchesTimeCount(t, res, t0, len(stuff))
assertLastSeenMatchesTimeCount(t, res, t1, len(stuff))

// Supply one new VRP, expect one at new time, others at old time
otherStuff := []prefixfile.VRPJson{
{
Prefix: "2001:DB8::/32",
Length: 48,
ASN: 65536,
TA: "testrir",
},
}
t2 := t1.Add(time.Minute * 10)
res, _ = BuildNewVrpMap(log, res, otherStuff, t2)

// LastSeen gets updated just for the new item
assertFirstSeenMatchesTimeCount(t, res, t0, len(stuff))
assertLastSeenMatchesTimeCount(t, res, t1, len(stuff))

assertFirstSeenMatchesTimeCount(t, res, t2, len(otherStuff))
assertLastSeenMatchesTimeCount(t, res, t2, len(otherStuff))
}

func assertFirstSeenMatchesTimeCount(t *testing.T, vrps VRPMap, pit time.Time, expected int) {
actual := countMatches(vrps, func(vrp *VRPJsonSimple) bool { return vrp.FirstSeen == pit.Unix() })
if actual != expected {
t.Errorf("Expected %d VRPs to have FirstSeen of %v, actual: %d", expected, pit, actual)
}
}

func assertLastSeenMatchesTimeCount(t *testing.T, vrps VRPMap, pit time.Time, expected int) {
actual := countMatches(vrps, func(vrp *VRPJsonSimple) bool { return vrp.LastSeen == pit.Unix() })
if actual != expected {
t.Errorf("Expected %d VRPs to have LastSeen of %v, actual: %d", expected, pit, actual)
}
}

type extractor func(object *VRPJsonSimple) bool

func countMatches(vrps VRPMap, e extractor) int {
matches := 0
for _, entry := range vrps {
if e(entry) {
matches++
}
}

return matches
}

func testData() []prefixfile.VRPJson {
var stuff []prefixfile.VRPJson
stuff = append(stuff,
prefixfile.VRPJson{
Prefix: "192.168.0.0/24",
Length: 24,
ASN: 65537,
TA: "testrir",
},
prefixfile.VRPJson{
Prefix: "192.168.0.0/24",
Length: 24,
ASN: 65536,
TA: "testrir",
},
prefixfile.VRPJson{
Prefix: "2001:db8::/32",
Length: 33,
ASN: "AS64496",
TA: "testrir",
},
prefixfile.VRPJson{
Prefix: "192.168.1.0/24",
Length: 25,
ASN: 64497,
TA: "testrir",
},
)

return stuff
}