Skip to content

Commit

Permalink
chg: [importers obj_type] importers queues: add feeder source + object global ID
Browse files Browse the repository at this point in the history
  • Loading branch information
Terrtia committed Oct 5, 2023
1 parent daf9f6f commit eae57fb
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 40 deletions.
4 changes: 2 additions & 2 deletions bin/crawlers/Crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ def save_capture_response(self, parent_id, entries):
print(item_id)
gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html'])
# send item to Global
relay_message = f'crawler {item_id} {gzip64encoded}'
relay_message = f'crawler item::{item_id} {gzip64encoded}'
self.add_message_to_queue(relay_message, 'Importers')

# Tag
msg = f'infoleak:submission="crawler";{item_id}'
msg = f'infoleak:submission="crawler";{item_id}' # TODO FIXME
self.add_message_to_queue(msg, 'Tags')

crawlers.create_item_metadata(item_id, last_url, parent_id)
Expand Down
10 changes: 5 additions & 5 deletions bin/importer/FeederImporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ def importer(self, json_data):
feeder_name = feeder.get_name()
print(f'importing: {feeder_name} feeder')

item_id = feeder.get_item_id() # TODO replace me with object global id
obj = feeder.get_obj() # TODO replace by a list of objects to import ????
# process meta
if feeder.get_json_meta():
feeder.process_meta()

if feeder_name == 'telegram':
return item_id # TODO support UI dashboard
else:
if obj.type == 'item': # object save on disk as file (Items)
gzip64_content = feeder.get_gzip64_content()
return f'{feeder_name} {item_id} {gzip64_content}'
return f'{feeder_name} {obj.get_global_id()} {gzip64_content}'
else: # Messages save on DB
return f'{feeder_name} {obj.get_global_id()}'


class FeederModuleImporter(AbstractModule):
Expand Down
5 changes: 3 additions & 2 deletions bin/importer/FileImporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from importer.abstract_importer import AbstractImporter
# from modules.abstract_module import AbstractModule
from lib import ail_logger
from lib.ail_queues import AILQueue
# from lib.ail_queues import AILQueue
from lib import ail_files # TODO RENAME ME

logging.config.dictConfig(ail_logger.get_config(name='modules'))
Expand All @@ -41,9 +41,10 @@ def importer(self, path):
gzipped = False
if mimetype == 'application/gzip':
gzipped = True
elif not ail_files.is_text(mimetype):
elif not ail_files.is_text(mimetype): # # # #
return None

# TODO handle multiple objects
message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import')
if message:
self.add_message_to_queue(message=message)
Expand Down
1 change: 1 addition & 0 deletions bin/importer/PystemonImporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def importer(self):
else:
gzipped = False

# TODO handle multiple objects
return self.create_message(item_id, content, gzipped=gzipped, source='pystemon')

except IOError as e:
Expand Down
18 changes: 15 additions & 3 deletions bin/importer/ZMQImporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,15 @@ def __init__(self):
super().__init__()

config_loader = ConfigLoader()
self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")

addresses = config_loader.get_config_str('ZMQ_Global', 'address')
addresses = addresses.split(',')
channel = config_loader.get_config_str('ZMQ_Global', 'channel')
self.zmq_importer = ZMQImporters()
for address in addresses:
self.zmq_importer.add(address.strip(), channel)

# TODO MESSAGE SOURCE - UI
def get_message(self):
for message in self.zmq_importer.importer():
# remove channel from message
Expand All @@ -72,8 +73,19 @@ def get_message(self):
def compute(self, messages):
    """Relay raw ZMQ messages to the Importers queue.

    Each message is expected as ``<obj_id> <gzip64 content>``, where the
    object id MAY carry a feeder-name prefix: ``<feeder_name>>><obj_id>``.
    Messages are re-emitted as ``<feeder_name> item::<obj_id> <gzip64 content>``.
    """
    for message in messages:
        message = message.decode()

        obj_id, gzip64encoded = message.split(' ', 1)  # TODO ADD LOGS
        splitted = obj_id.split('>>', 1)
        # BUG FIX: original compared the *list* to 2 (`splitted == 2`,
        # always False), so the feeder-name prefix was never parsed and
        # every message fell back to the default feeder name.
        if len(splitted) == 2:
            feeder_name, obj_id = splitted
        else:
            feeder_name = self.default_feeder_name

        # f'{source} item::{obj_id} {content}'
        relay_message = f'{feeder_name} item::{obj_id} {gzip64encoded}'

        # BUG FIX: debug print emitted the literal text "feeder_name"
        # instead of the variable's value.
        print(f'{feeder_name} item::{obj_id}')
        self.add_message_to_queue(message=relay_message)


if __name__ == '__main__':
Expand Down
4 changes: 3 additions & 1 deletion bin/importer/abstract_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,7 @@ def create_message(self, obj_id, content, b64=False, gzipped=False, source=None)
source = self.name
self.logger.info(f'{source} {obj_id}')
# self.logger.debug(f'{source} {obj_id} {content}')
return f'{source} {obj_id} {content}'

# TODO handle multiple objects
return f'{source} item::{obj_id} {content}'

22 changes: 16 additions & 6 deletions bin/importer/feeders/Default.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@
"""
import os
import datetime
import sys
import uuid

sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib.objects import ail_objects

class DefaultFeeder:
"""Default Feeder"""

def __init__(self, json_data):
    # Raw JSON payload received from the feeder.
    self.json_data = json_data
    # NOTE(review): item_id appears superseded by obj (get_obj caches the
    # resolved object on self.obj) — confirm whether item_id is still read.
    self.item_id = None
    # AIL object resolved from its global id; set by get_obj().
    self.obj = None
    # Feeder name; subclasses set this (e.g. 'telegram').
    self.name = None

def get_name(self):
Expand Down Expand Up @@ -52,14 +59,17 @@ def get_gzip64_content(self):
return self.json_data.get('data')

## OVERWRITE ME ##
def get_obj(self):
    """Define and return the feeder's object (default: an Item).

    Builds an item id ``<feeder_name>/YYYY/MM/DD/<uuid>.gz``, resolves it
    through its global id ``item::<item_id>`` and caches the resulting
    object on ``self.obj``.
    Feeders that produce other object types must overwrite this method.
    """
    date = datetime.date.today().strftime("%Y/%m/%d")
    obj_id = os.path.join(self.get_name(), date, str(uuid.uuid4()))
    # Items are stored gzipped on disk, hence the .gz suffix.
    obj_id = f'{obj_id}.gz'
    obj_id = f'item::{obj_id}'
    self.obj = ail_objects.get_obj_from_global_id(obj_id)
    return self.obj

## OVERWRITE ME ##
def process_meta(self):
Expand Down
14 changes: 8 additions & 6 deletions bin/importer/feeders/Telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
##################################
from importer.feeders.Default import DefaultFeeder
from lib.ConfigLoader import ConfigLoader
from lib.objects import ail_objects
from lib.objects.Chats import Chat
from lib.objects import Messages
from lib.objects import UsersAccount
Expand All @@ -25,6 +26,7 @@
import base64
import io
import gzip

def gunzip_bytes_obj(bytes_obj):
gunzipped_bytes_obj = None
try:
Expand All @@ -45,8 +47,7 @@ def __init__(self, json_data):
super().__init__(json_data)
self.name = 'telegram'

# define item id
def get_item_id(self): # TODO rename self.item_id
def get_obj(self): # TODO handle others objects -> images, pdf, ...
# Get message date
timestamp = self.json_data['meta']['date']['timestamp'] # TODO CREATE DEFAULT TIMESTAMP
# if self.json_data['meta'].get('date'):
Expand All @@ -56,8 +57,10 @@ def get_item_id(self): # TODO rename self.item_id
# date = datetime.date.today().strftime("%Y/%m/%d")
chat_id = str(self.json_data['meta']['chat']['id'])
message_id = str(self.json_data['meta']['id'])
self.item_id = Messages.create_obj_id('telegram', chat_id, message_id, timestamp) # TODO rename self.item_id
return self.item_id
obj_id = Messages.create_obj_id('telegram', chat_id, message_id, timestamp)
obj_id = f'message:telegram:{obj_id}'
self.obj = ail_objects.get_obj_from_global_id(obj_id)
return self.obj

def process_meta(self):
"""
Expand All @@ -81,7 +84,7 @@ def process_meta(self):
translation = None
decoded = base64.standard_b64decode(self.json_data['data'])
content = gunzip_bytes_obj(decoded)
message = Messages.create(self.item_id, content, translation=translation)
message = Messages.create(self.obj.id, content, translation=translation)

if meta.get('chat'):
chat = Chat(meta['chat']['id'], 'telegram')
Expand Down Expand Up @@ -131,5 +134,4 @@ def process_meta(self):
# TODO reply threads ????
# message edit ????


return None
25 changes: 11 additions & 14 deletions bin/modules/Mixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,19 @@ def computeNone(self):
def compute(self, message):
self.refresh_stats()
splitted = message.split()
# Old Feeder name "feeder>>item_id gzip64encoded"
if len(splitted) == 2:
item_id, gzip64encoded = splitted
try:
feeder_name, item_id = item_id.split('>>')
feeder_name.replace(" ", "")
if 'import_dir' in feeder_name:
feeder_name = feeder_name.split('/')[1]
except ValueError:
feeder_name = self.default_feeder_name
# Feeder name in message: "feeder item_id gzip64encoded"
elif len(splitted) == 3:
feeder_name, item_id, gzip64encoded = splitted
# message -> # feeder_name - object - content
# or # message -> # feeder_name - object

# feeder_name - object
if len(splitted) == 2: # feeder_name - object (content already saved)
feeder_name, obj_id = splitted

# Feeder name in message: "feeder obj_id gzip64encoded"
elif len(splitted) == 3: # gzip64encoded content
feeder_name, obj_id, gzip64encoded = splitted
else:
print('Invalid message: not processed')
self.logger.debug(f'Invalid Item: {splitted[0]} not processed')
self.logger.debug(f'Invalid Item: {splitted[0]} not processed') # TODO
return None

# remove absolute path
Expand Down
2 changes: 1 addition & 1 deletion bin/modules/SubmitPaste.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def create_paste(self, uuid, paste_content, ltags, ltagsgalaxies, name, source=N
self.redis_logger.debug(f"relative path {rel_item_path}")

# send paste to Global module
relay_message = f"submitted {rel_item_path} {gzip64encoded}"
relay_message = f"submitted item::{rel_item_path} {gzip64encoded}"
self.add_message_to_queue(message=relay_message)

# add tags
Expand Down

0 comments on commit eae57fb

Please sign in to comment.