Skip to content

Commit

Permalink
Don't add paths to the cache until they are committed.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendonj committed Mar 27, 2019
1 parent f567e6c commit 4de1469
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
7 changes: 7 additions & 0 deletions libnntsc/parsers/amp.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ def process_data(self, channel, method, properties, body):
self.pending = []
return

# some parsers need to update internal caches after confirming
# that data has been committed successfully
# TODO limit this to parsers that recently processed data
for parsers in self.parsers.values():
for parser in parsers:
parser.post_commit()

# ack all data up to and including the most recent message
channel.basic_ack(method.delivery_tag, True)

Expand Down
16 changes: 13 additions & 3 deletions libnntsc/parsers/amp_traceroute.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self, db):

self.paths = {}
self.aspaths = {}
self.pending_paths = {}
self.pending_aspaths = {}

self.ipdatacolumns = [
{"name":"path_id", "type":"integer", "null":False},
Expand Down Expand Up @@ -355,7 +357,7 @@ def insert_ippath(self, stream, ts, result):
return

result['path_id'] = pathid
self.paths[keystr] = (pathid, int(time.time()))
self.pending_paths[keystr] = (pathid, int(time.time()))

if result['aspath'] != None:
keystr = "%s" % (stream)
Expand All @@ -370,7 +372,7 @@ def insert_ippath(self, stream, ts, result):
return

result['aspath_id'] = pathid
self.aspaths[keystr] = (pathid, int(time.time()))
self.pending_aspaths[keystr] = (pathid, int(time.time()))

# XXX Could almost just call parent insert_data here, except for the
# line where we have to add an entry for "path" before exporting live
Expand Down Expand Up @@ -430,7 +432,7 @@ def _update_as_stream(self, observed, streamid, datapoint):
return

aspath_id = pathid
self.aspaths[keystr] = (pathid, int(time.time()))
self.pending_aspaths[keystr] = (pathid, int(time.time()))

if aspath_id not in observed[streamid]['paths']:
observed[streamid]['paths'][aspath_id] = { \
Expand Down Expand Up @@ -516,6 +518,14 @@ def process_data(self, timestamp, data, source):
self.nextpathflush = (now - (now % PATH_FLUSH_FREQ)) + \
PATH_FLUSH_FREQ

def post_commit(self):
# all pending paths are confirmed to have been committed, move them
# into the main cache
self.paths.update(self.pending_paths)
self.aspaths.update(self.pending_aspaths)
self.pending_paths.clear()
self.pending_aspaths.clear()

def _flush_unused_paths(self, now):
toremove = []
for k, v in self.aspaths.iteritems():
Expand Down
3 changes: 3 additions & 0 deletions libnntsc/parsers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ def register(self):
logger.log("Error was: %s" % (str(e)))
raise

def post_commit(self):
pass

def _get_collection_id(self):
if self.collectionid is None:
try:
Expand Down

0 comments on commit 4de1469

Please sign in to comment.