chg: [objs processed] xxhash messages
Terrtia committed Sep 7, 2023
1 parent b459498 commit bb3dad2
Showing 3 changed files with 24 additions and 18 deletions.
36 changes: 18 additions & 18 deletions bin/lib/ail_queues.py
@@ -6,7 +6,7 @@
import datetime
import time

-from hashlib import sha256
+import xxhash

sys.path.append(os.environ['AIL_BIN'])
##################################
@@ -80,12 +80,12 @@ def get_message(self):
            # raise Exception(f'Error: queue {self.name}, no AIL object provided')
        else:
            obj_global_id, mess = row_mess
-            sha256_mess = sha256(message.encode()).hexdigest()
-            add_processed_obj(obj_global_id, sha256_mess, module=self.name)
-            return obj_global_id, sha256_mess, mess
+            m_hash = xxhash.xxh3_64_hexdigest(message)
+            add_processed_obj(obj_global_id, m_hash, module=self.name)
+            return obj_global_id, m_hash, mess

-    def end_message(self, obj_global_id, sha256_mess):
-        end_processed_obj(obj_global_id, sha256_mess, module=self.name)
+    def end_message(self, obj_global_id, m_hash):
+        end_processed_obj(obj_global_id, m_hash, module=self.name)

    def send_message(self, obj_global_id, message='', queue_name=None):
        if not self.subscribers_modules:
@@ -100,14 +100,14 @@ def send_message(self, obj_global_id, message='', queue_name=None):

        message = f'{obj_global_id};{message}'
        if obj_global_id != '::':
-            sha256_mess = sha256(message.encode()).hexdigest()
+            m_hash = xxhash.xxh3_64_hexdigest(message)
        else:
-            sha256_mess = None
+            m_hash = None

        # Add message to all modules
        for module_name in self.subscribers_modules[queue_name]:
-            if sha256_mess:
-                add_processed_obj(obj_global_id, sha256_mess, queue=module_name)
+            if m_hash:
+                add_processed_obj(obj_global_id, m_hash, queue=module_name)

            r_queues.rpush(f'queue:{module_name}:in', message)
            # stats
@@ -192,23 +192,23 @@ def get_processed_obj_queues(obj_global_id):
def get_processed_obj(obj_global_id):
    return {'modules': get_processed_obj_modules(obj_global_id), 'queues': get_processed_obj_queues(obj_global_id)}

-def add_processed_obj(obj_global_id, sha256_mess, module=None, queue=None):
+def add_processed_obj(obj_global_id, m_hash, module=None, queue=None):
    obj_type = obj_global_id.split(':', 1)[0]
    new_obj = r_obj_process.sadd(f'objs:process', obj_global_id)
    # first process:
    if new_obj:
        r_obj_process.zadd(f'objs:process:{obj_type}', {obj_global_id: int(time.time())})
    if queue:
-        r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{sha256_mess}': int(time.time())})
+        r_obj_process.zadd(f'obj:queues:{obj_global_id}', {f'{queue}:{m_hash}': int(time.time())})
    if module:
-        r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{sha256_mess}': int(time.time())})
-        r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{sha256_mess}')
+        r_obj_process.zadd(f'obj:modules:{obj_global_id}', {f'{module}:{m_hash}': int(time.time())})
+        r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{module}:{m_hash}')

-def end_processed_obj(obj_global_id, sha256_mess, module=None, queue=None):
+def end_processed_obj(obj_global_id, m_hash, module=None, queue=None):
    if queue:
-        r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{sha256_mess}')
+        r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{m_hash}')
    if module:
-        r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{sha256_mess}')
+        r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{m_hash}')

# TODO HANDLE QUEUE DELETE
# process completed
@@ -322,7 +322,7 @@ def save_queue_digraph():
if __name__ == '__main__':
    # clear_modules_queues_stats()
    # save_queue_digraph()
-    oobj_global_id = 'item::submitted/2023/06/22/submitted_f656119e-f2ea-49d7-9beb-fb97077f8fe5.gz'
+    oobj_global_id = 'item::submitted/2023/09/06/submitted_75fb9ff2-8c91-409d-8bd6-31769d73db8f.gz'
    while True:
        print(get_processed_obj(oobj_global_id))
        time.sleep(0.5)
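
For context, a standalone sketch (not part of the commit) of what the switch from sha256 to xxhash changes for the digests that key the processed-object entries above; the message value below is invented:

# before_after_hash.py - illustrative only; the message string is made up.
from hashlib import sha256

import xxhash  # python-xxhash, pinned as xxhash>3.1.0 in requirements.txt

message = 'item::submitted/2023/09/06/example.gz;some payload'

old_digest = sha256(message.encode()).hexdigest()   # 64 hex chars (256 bits)
new_digest = xxhash.xxh3_64_hexdigest(message)      # 16 hex chars (64 bits)

print(len(old_digest), old_digest)
print(len(new_digest), new_digest)

XXH3-64 is a fast non-cryptographic hash; since the digest is only used to identify a queued message inside the obj:queues / obj:modules sorted sets, a short 64-bit digest is presumably sufficient here.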
5 changes: 5 additions & 0 deletions configs/core.cfg.sample
@@ -154,6 +154,11 @@ host = localhost
port = 6381
db = 0

+[Redis_Process]
+host = localhost
+port = 6381
+db = 2

[Redis_Mixer_Cache]
host = localhost
port = 6381
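A minimal sketch of how the new [Redis_Process] section (db 2 on the existing Redis instance at port 6381) could be consumed. It uses the stdlib configparser and redis-py directly, whereas AIL resolves its settings through its own config loader, so the file path and calls below are assumptions:

# read_redis_process.py - illustrative stand-in for AIL's own config loader.
import configparser

import redis

config = configparser.ConfigParser()
config.read('configs/core.cfg')  # assumed to be derived from core.cfg.sample

# Open the db=2 connection that would back the objs:process / obj:queues keys.
r_obj_process = redis.Redis(
    host=config.get('Redis_Process', 'host'),
    port=config.getint('Redis_Process', 'port'),
    db=config.getint('Redis_Process', 'db'),
    decode_responses=True,
)

print(r_obj_process.ping())
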
1 change: 1 addition & 0 deletions requirements.txt
@@ -17,6 +17,7 @@ websockets>9.0
crcmod
mmh3>2.5
ssdeep>3.3
+xxhash>3.1.0

# ZMQ
zmq
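An illustrative sanity check that the installed environment satisfies the new pin and exposes the xxh3 helper used in ail_queues.py:

# check_xxhash.py - illustrative sanity check for the new dependency.
from importlib.metadata import version

import xxhash

print(version('xxhash'))                     # should satisfy xxhash>3.1.0
print(hasattr(xxhash, 'xxh3_64_hexdigest'))  # one-shot helper called in ail_queues.py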
