
Removed multiprocessing.Queue in favor of zeroMQ #893 (#2657)
joachimmetz committed Jun 28, 2019
1 parent f96f7e0 commit 1fd8f2a
Showing 15 changed files with 72 additions and 433 deletions.
1 change: 0 additions & 1 deletion plaso/cli/extraction_tool.py
@@ -67,7 +67,6 @@ def __init__(self, input_reader=None, output_writer=None):
self._storage_format = definitions.STORAGE_FORMAT_SQLITE
self._temporary_directory = None
self._text_prepend = None
- self._use_zeromq = True
self._yara_rules_string = None

def _CreateProcessingConfiguration(self, knowledge_base):
1 change: 0 additions & 1 deletion plaso/cli/helpers/__init__.py
@@ -34,7 +34,6 @@
from plaso.cli.helpers import xlsx_output
from plaso.cli.helpers import yara_rules
from plaso.cli.helpers import workers
- from plaso.cli.helpers import zeromq

# These modules do not register CLI helpers, but contain super classes used by
# CLI helpers in other modules.
56 changes: 0 additions & 56 deletions plaso/cli/helpers/zeromq.py

This file was deleted.

3 changes: 1 addition & 2 deletions plaso/cli/log2timeline_tool.py
@@ -403,8 +403,7 @@ def ExtractEventsFromSources(self):
if single_process_mode:
extraction_engine = single_process_engine.SingleProcessEngine()
else:
- extraction_engine = multi_process_engine.TaskMultiProcessEngine(
- use_zeromq=self._use_zeromq)
+ extraction_engine = multi_process_engine.TaskMultiProcessEngine()

# If the source is a directory or a storage media image
# run pre-processing.
7 changes: 2 additions & 5 deletions plaso/cli/psort_tool.py
@@ -91,7 +91,6 @@ def __init__(self, input_reader=None, output_writer=None):
self._temporary_directory = None
self._time_slice = None
self._use_time_slicer = False
- self._use_zeromq = True
self._worker_memory_limit = None

self.list_analysis_plugins = False
@@ -531,8 +530,7 @@ def ProcessStorage(self):
session, self._storage_file_path))

# TODO: add single processing support.
- analysis_engine = psort.PsortMultiProcessEngine(
- use_zeromq=self._use_zeromq)
+ analysis_engine = psort.PsortMultiProcessEngine()

analysis_engine.AnalyzeEvents(
self._knowledge_base, storage_writer, self._data_location,
@@ -552,8 +550,7 @@ def ProcessStorage(self):
self._storage_file_path))

# TODO: add single processing support.
- analysis_engine = psort.PsortMultiProcessEngine(
- use_zeromq=self._use_zeromq)
+ analysis_engine = psort.PsortMultiProcessEngine()

analysis_engine.ExportEvents(
self._knowledge_base, storage_reader, self._output_module,
6 changes: 2 additions & 4 deletions plaso/cli/psteal_tool.py
@@ -210,8 +210,7 @@ def AnalyzeEvents(self):
self._storage_file_path))

# TODO: add single processing support.
- analysis_engine = psort.PsortMultiProcessEngine(
- use_zeromq=self._use_zeromq)
+ analysis_engine = psort.PsortMultiProcessEngine()

analysis_engine.ExportEvents(
self._knowledge_base, storage_reader, self._output_module,
@@ -297,8 +296,7 @@ def ExtractEventsFromSources(self):
if single_process_mode:
extraction_engine = single_process_engine.SingleProcessEngine()
else:
- extraction_engine = multi_process_engine.TaskMultiProcessEngine(
- use_zeromq=self._use_zeromq)
+ extraction_engine = multi_process_engine.TaskMultiProcessEngine()

# If the source is a directory or a storage media image
# run pre-processing.
130 changes: 0 additions & 130 deletions plaso/multi_processing/multi_process_queue.py

This file was deleted.

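Note: the deleted plaso/multi_processing/multi_process_queue.py provided MultiProcessingQueue, a wrapper around Python's multiprocessing.Queue (still visible as the fallback removed from psort.py below). As the comment removed further down notes, multiprocessing.Queue is very sensitive to blocking on get() and put(), so callers had to stick to non-blocking access and drain queues before shutdown. A minimal standard-library sketch of that pattern, with illustrative names only (not plaso's API):

import multiprocessing
import queue


def DrainQueue(task_queue):
  """Empties a multiprocessing.Queue without blocking on get()."""
  try:
    while True:
      task_queue.get_nowait()
  except queue.Empty:
    pass


if __name__ == '__main__':
  task_queue = multiprocessing.Queue(maxsize=2)

  # Non-blocking puts: queue.Full is raised instead of stalling the producer
  # once the queue reaches its maximum size.
  for item in ('event1', 'event2', 'event3'):
    try:
      task_queue.put_nowait(item)
    except queue.Full:
      break

  # Non-blocking drain: queue.Empty ends the loop instead of blocking the
  # consumer when nothing is left.
  DrainQueue(task_queue)

With ZeroMQ queues the explicit emptying step is no longer needed, which is why the "if not self._use_zeromq" block in _StopAnalysisProcesses below is removed outright.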
49 changes: 12 additions & 37 deletions plaso/multi_processing/psort.py
@@ -18,7 +18,6 @@
from plaso.multi_processing import analysis_process
from plaso.multi_processing import engine as multi_process_engine
from plaso.multi_processing import logger
- from plaso.multi_processing import multi_process_queue
from plaso.storage import event_tag_index
from plaso.storage import time_range as storage_time_range

@@ -193,13 +192,8 @@ class PsortMultiProcessEngine(multi_process_engine.MultiProcessEngine):

_QUEUE_TIMEOUT = 10 * 60

- def __init__(self, use_zeromq=True):
- """Initializes an engine object.
-
- Args:
-   use_zeromq (Optional[bool]): True if ZeroMQ should be used for queuing
-       instead of Python's multiprocessing queue.
- """
+ def __init__(self):
+ """Initializes a psort multi-processing engine."""
super(PsortMultiProcessEngine, self).__init__()
self._analysis_plugins = {}
self._completed_analysis_processes = set()
@@ -231,7 +225,6 @@ def __init__(self, use_zeromq=True):
self._serializers_profiler = None
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_callback = None
- self._use_zeromq = use_zeromq
self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT

def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
@@ -678,19 +671,10 @@ def _StopAnalysisProcesses(self, abort=False):
logger.debug('Stopping analysis processes.')
self._StopMonitoringProcesses()

- # Note that multiprocessing.Queue is very sensitive regarding
- # blocking on either a get or a put. So we try to prevent using
- # any blocking behavior.

if abort:
# Signal all the processes to abort.
self._AbortTerminate()

- if not self._use_zeromq:
- logger.debug('Emptying queues.')
- for event_queue in self._event_queues.values():
- event_queue.Empty()

# Wake the processes to make sure that they are not blocking
# waiting for the queue new items.
for event_queue in self._event_queues.values():
@@ -817,28 +801,19 @@ def _StartWorkerProcess(self, process_name, storage_writer):
logger.error('Missing analysis plugin: {0:s}'.format(process_name))
return None

- if self._use_zeromq:
- queue_name = '{0:s} output event queue'.format(process_name)
- output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
- name=queue_name, timeout_seconds=self._QUEUE_TIMEOUT)
- # Open the queue so it can bind to a random port, and we can get the
- # port number to use in the input queue.
- output_event_queue.Open()
-
- else:
- output_event_queue = multi_process_queue.MultiProcessingQueue(
- timeout=self._QUEUE_TIMEOUT)
+ queue_name = '{0:s} output event queue'.format(process_name)
+ output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
+ name=queue_name, timeout_seconds=self._QUEUE_TIMEOUT)
+ # Open the queue so it can bind to a random port, and we can get the
+ # port number to use in the input queue.
+ output_event_queue.Open()

self._event_queues[process_name] = output_event_queue

- if self._use_zeromq:
- queue_name = '{0:s} input event queue'.format(process_name)
- input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
- name=queue_name, delay_open=True, port=output_event_queue.port,
- timeout_seconds=self._QUEUE_TIMEOUT)
-
- else:
- input_event_queue = output_event_queue
+ queue_name = '{0:s} input event queue'.format(process_name)
+ input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
+ name=queue_name, delay_open=True, port=output_event_queue.port,
+ timeout_seconds=self._QUEUE_TIMEOUT)

process = analysis_process.AnalysisProcess(
input_event_queue, storage_writer, self._knowledge_base,
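Note: with the multiprocessing.Queue fallback gone, _StartWorkerProcess always wires the engine to each analysis worker through a ZeroMQ PUSH/PULL pair: the engine binds a push queue to a random port and the worker connects a pull queue to that port. A rough pyzmq sketch of the same bind-to-random-port / connect handshake follows; it is illustrative only (not plaso's zeromq_queue API) and assumes pyzmq is installed.

import zmq

context = zmq.Context()

# Engine side: a PUSH socket bound to a random local TCP port, analogous to
# ZeroMQPushBindQueue.Open() binding first so the port number is known.
push_socket = context.socket(zmq.PUSH)
port = push_socket.bind_to_random_port('tcp://127.0.0.1')

# Worker side: a PULL socket connecting to that port, analogous to
# ZeroMQPullConnectQueue(delay_open=True, port=...).
pull_socket = context.socket(zmq.PULL)
pull_socket.setsockopt(zmq.RCVTIMEO, 10 * 60 * 1000)  # 10 minute receive timeout.
pull_socket.connect('tcp://127.0.0.1:{0:d}'.format(port))

push_socket.send_pyobj({'event': 'example'})
print(pull_socket.recv_pyobj())

pull_socket.close()
push_socket.close()
context.term()

Binding before connecting matters because the port is only known after the output queue is opened, which is why output_event_queue.Open() runs before the input queue is constructed with port=output_event_queue.port.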
Diffs for the remaining changed files are not shown here.
