diff --git a/libnntsc/parsers/amp.py b/libnntsc/parsers/amp.py index 097b77e..afb36aa 100644 --- a/libnntsc/parsers/amp.py +++ b/libnntsc/parsers/amp.py @@ -259,6 +259,13 @@ def process_data(self, channel, method, properties, body): self.pending = [] return + # some parsers need to update internal caches after confirming + # that data has been committed successfully + # TODO limit this to parsers that recently processed data + for parsers in self.parsers.values(): + for parser in parsers: + parser.post_commit() + # ack all data up to and including the most recent message channel.basic_ack(method.delivery_tag, True) diff --git a/libnntsc/parsers/amp_traceroute.py b/libnntsc/parsers/amp_traceroute.py index 8760d4c..b948d7d 100644 --- a/libnntsc/parsers/amp_traceroute.py +++ b/libnntsc/parsers/amp_traceroute.py @@ -50,6 +50,8 @@ def __init__(self, db): self.paths = {} self.aspaths = {} + self.pending_paths = {} + self.pending_aspaths = {} self.ipdatacolumns = [ {"name":"path_id", "type":"integer", "null":False}, @@ -355,7 +357,7 @@ def insert_ippath(self, stream, ts, result): return result['path_id'] = pathid - self.paths[keystr] = (pathid, int(time.time())) + self.pending_paths[keystr] = (pathid, int(time.time())) if result['aspath'] != None: keystr = "%s" % (stream) @@ -370,7 +372,7 @@ def insert_ippath(self, stream, ts, result): return result['aspath_id'] = pathid - self.aspaths[keystr] = (pathid, int(time.time())) + self.pending_aspaths[keystr] = (pathid, int(time.time())) # XXX Could almost just call parent insert_data here, except for the # line where we have to add an entry for "path" before exporting live @@ -430,7 +432,7 @@ def _update_as_stream(self, observed, streamid, datapoint): return aspath_id = pathid - self.aspaths[keystr] = (pathid, int(time.time())) + self.pending_aspaths[keystr] = (pathid, int(time.time())) if aspath_id not in observed[streamid]['paths']: observed[streamid]['paths'][aspath_id] = { \ @@ -516,6 +518,14 @@ def process_data(self, timestamp, data, source): self.nextpathflush = (now - (now % PATH_FLUSH_FREQ)) + \ PATH_FLUSH_FREQ + def post_commit(self): + # all pending paths are confirmed to have been committed, move them + # into the main cache + self.paths.update(self.pending_paths) + self.aspaths.update(self.pending_aspaths) + self.pending_paths.clear() + self.pending_aspaths.clear() + def _flush_unused_paths(self, now): toremove = [] for k, v in self.aspaths.iteritems(): diff --git a/libnntsc/parsers/common.py b/libnntsc/parsers/common.py index 49a2e01..a27bff1 100644 --- a/libnntsc/parsers/common.py +++ b/libnntsc/parsers/common.py @@ -154,6 +154,9 @@ def register(self): logger.log("Error was: %s" % (str(e))) raise + def post_commit(self): + pass + def _get_collection_id(self): if self.collectionid is None: try: