Support persisting the drain result to disk (#11)
mrproliu authored Aug 6, 2024
1 parent 4aa9c59 commit a4b9220
Showing 9 changed files with 148 additions and 25 deletions.
46 changes: 46 additions & 0 deletions models/Configuration.md
@@ -0,0 +1,46 @@
## Configuration

All configuration of URI Drain is done through the `uri_drain.ini` file. [Here is a specific demo](../servers/simple/uri_drain.ini).

### Snapshot

A snapshot serializes and stores the analysis results currently held in the system.
At present, snapshots can only be saved to the file system.

| Name | Type(Unit) | Default | Description |
|---------------------------|-------------|---------|-------------------------------------------------------------------------------------------|
| file_path                 | string      | /tmp/   | The directory in which to save snapshots; persistence is disabled when the value is empty.  |
| snapshot_interval_minutes | int(minute) | 10 | The interval to save the snapshot. |
| compress_state            | bool        | True    | Whether to compress the snapshot with zlib and encode it in base64.                         |

### Masking

When variable parts of a URI are detected, masking determines how they are rendered in the aggregated pattern.

By default, detected variable content is replaced with `{var}`.

| Name | Type(Unit) | Default | Description |
|-------------|------------|---------|-----------------------------------|
| mask_prefix | string | { | The prefix to mask the parameter. |
| mask_suffix | string | } | The suffix to mask the parameter. |

### Drain

Drain is the core algorithm of URI Drain.

| Name | Type(Unit) | Default | Description |
|------------------|------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sim_th | float | 0.4 | The similarity threshold to decide if a new sequence should be merged into an existing cluster. |
| depth | int | 4 | Max depth levels of pattern. Minimum is 2. |
| max_children | int | 100 | Max number of children of an internal node. |
| max_clusters     | int        | 1024    | Max number of tracked clusters. When this number is reached, the model starts replacing old clusters with new ones according to the LRU policy.                         |
| extra_delimiters | string | / | The extra delimiters to split the sequence. |

### Profiling

Profiling collects and reports runtime performance metrics of the algorithm.

| Name | Type(Unit) | Default | Description |
|------------|-------------|---------|---------------------------------------------------|
| enabled    | bool        | False   | Whether to enable profiling.                       |
| report_sec | int(second) | 30 | The interval to report the profiling information. |
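
For reference, a complete `uri_drain.ini` putting the documented defaults together might look like the following. Section names follow the demo file linked above where visible; the `[MASKING]` and `[DRAIN]` headers are assumptions based on the upstream Drain3 layout.

```ini
[SNAPSHOT]
# Persistence is disabled when file_path is empty.
file_path = /tmp/
snapshot_interval_minutes = 10
compress_state = True

[MASKING]
mask_prefix = {
mask_suffix = }

[DRAIN]
sim_th = 0.4
depth = 4
max_children = 100
max_clusters = 1024
extra_delimiters = ["/"]

[PROFILING]
enabled = False
report_sec = 30
```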
2 changes: 2 additions & 0 deletions models/README.md
@@ -6,6 +6,8 @@ algorithm.
The original paper of Drain can be found [here](https://jiemingzhu.github.io/pub/pjhe_icws2017.pdf)
and [here](https://arxiv.org/pdf/1806.04356.pdf).

For configuration details, please refer to the [Configuration Documentation](./Configuration.md).

#### Upstream Drain3 version

- Currently
35 changes: 34 additions & 1 deletion models/uri_drain/persistence_handler.py
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: MIT

import base64
import os
from abc import ABC, abstractmethod
from pathlib import Path


class PersistenceHandler(ABC):
@@ -12,3 +14,34 @@ def save_state(self, state):
    @abstractmethod
    def load_state(self):
        pass


class ServiceFilePersistenceHandler(PersistenceHandler):

    def __init__(self, base_dir, service):
        self.file_path = os.path.join(base_dir, 'services', base64.b64encode(service.encode('utf-8')).decode('utf-8'))
        path = Path(self.file_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=True)

    def save_state(self, state):
        with open(self.file_path, 'wb') as file:
            file.write(state)

    def load_state(self):
        with open(self.file_path, 'rb') as file:
            return file.read()


class ServicePersistentLoader:

    def __init__(self, base_dir):
        self.file_path = os.path.join(base_dir, 'services')

    def load_services(self):
        services = []
        if os.path.isdir(self.file_path):
            for entry in os.listdir(self.file_path):
                if os.path.isfile(os.path.join(self.file_path, entry)):
                    services.append(base64.b64decode(entry.encode('utf-8')).decode('utf-8'))
        return services
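
To see how the two classes above fit together, here is a small round-trip sketch (the temporary directory and service name are arbitrary examples):

```python
import tempfile

from models.uri_drain.persistence_handler import (
    ServiceFilePersistenceHandler,
    ServicePersistentLoader,
)

base_dir = tempfile.mkdtemp()

# Each service gets its own snapshot file under <base_dir>/services/;
# the file name is the base64-encoded service name, which keeps special
# characters in service names file-system safe.
handler = ServiceFilePersistenceHandler(base_dir, 'my-service')
handler.save_state(b'serialized drain state')

# The loader recovers the original service names by decoding file names.
loader = ServicePersistentLoader(base_dir)
print(loader.load_services())  # ['my-service']
print(handler.load_state())    # b'serialized drain state'
```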
22 changes: 18 additions & 4 deletions models/uri_drain/template_miner.py
@@ -5,17 +5,19 @@
import re
import time
import zlib
from collections import defaultdict
from typing import Optional, List, NamedTuple

import jsonpickle
from cachetools import LRUCache, cachedmethod

from models.uri_drain.uri_drain import Drain, LogCluster
# from drain3.jaccard_drain import JaccardDrain # MODIFIED:: NOT USED AT ALL
from models.uri_drain.masking import LogMasker
from models.uri_drain.persistence_handler import PersistenceHandler
from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler
from models.uri_drain.persistence_handler import PersistenceHandler, ServicePersistentLoader, \
    ServiceFilePersistenceHandler
from models.uri_drain.template_miner_config import TemplateMinerConfig
from models.uri_drain.uri_drain import Drain, LogCluster
from models.utils.simple_profiler import SimpleProfiler, NullProfiler, Profiler

logger = logging.getLogger(__name__)

@@ -24,6 +26,18 @@
ExtractedParameter = NamedTuple("ExtractedParameter", [("value", str), ("mask_name", str)])


def load_existing_miners(config: TemplateMinerConfig = None):
    miners = defaultdict(TemplateMiner)
    if config is None or config.snapshot_file_dir is None:
        return miners
    existing_services = ServicePersistentLoader(config.snapshot_file_dir).load_services()
    if len(existing_services) > 0:
        print(f'Detected {len(existing_services)} services from disk')
    for service in existing_services:
        miners[service] = TemplateMiner(ServiceFilePersistenceHandler(config.snapshot_file_dir, service), config)
    return miners


class TemplateMiner:

    def __init__(self,
@@ -87,7 +101,7 @@ def load_state(self):
logger.info("Checking for saved state")

state = self.persistence_handler.load_state()
if state is None:
if state is None or state == b'':
logger.info("Saved state not found")
return

4 changes: 4 additions & 0 deletions models/uri_drain/template_miner_config.py
@@ -17,6 +17,7 @@ def __init__(self):
        self.profiling_report_sec = 60
        self.snapshot_interval_minutes = 5
        self.snapshot_compress_state = True
        self.snapshot_file_dir = None
        self.drain_extra_delimiters = []
        self.drain_sim_th = 0.4
        self.drain_depth = 4
@@ -50,6 +51,9 @@ def load(self, config_filename: str):
                                                       fallback=self.snapshot_interval_minutes)
        self.snapshot_compress_state = parser.getboolean(section_snapshot, 'compress_state',
                                                         fallback=self.snapshot_compress_state)
        file_path = parser.get(section_snapshot, 'file_path', fallback=None)
        if file_path:
            self.snapshot_file_dir = file_path

        drain_extra_delimiters_str = parser.get(section_drain, 'extra_delimiters',
                                                fallback=str(self.drain_extra_delimiters))
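
A quick sketch of how the new snapshot option is picked up (the config path is a hypothetical example; adjust it to your layout):

```python
from models.uri_drain.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.load('servers/simple/uri_drain.ini')  # hypothetical path

# snapshot_file_dir stays None unless [SNAPSHOT] file_path is set;
# a None value disables snapshot persistence downstream.
if config.snapshot_file_dir:
    print(f'Snapshots are written under {config.snapshot_file_dir}')
else:
    print('Snapshot persistence is disabled')
```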
5 changes: 5 additions & 0 deletions models/uri_drain/uri_drain.py
@@ -135,6 +135,11 @@ def __init__(self,
    def clusters(self):
        return self.id_to_cluster.values()

    @property
    def cluster_patterns(self):
        sorted_drain_clusters = sorted(self.clusters, key=lambda it: it.size, reverse=True)
        return [cluster.get_template() for cluster in sorted_drain_clusters]

    @staticmethod
    def has_numbers(s):
        return any(char.isdigit() for char in s)
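
A short sketch of the new `cluster_patterns` property in use; the printed output is illustrative, since the actual templates depend on the similarity settings:

```python
from models.uri_drain.template_miner import TemplateMiner
from models.uri_drain.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()   # defaults; no persistence handler
miner = TemplateMiner(None, config)
for uri in ['/api/users/1', '/api/users/2', '/health']:
    miner.add_log_message(uri)

# Templates come back sorted by cluster size, so the pattern covering
# the most URIs is first, e.g. ['/api/users/{var}', '/health'].
print(miner.drain.cluster_patterns)
```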
21 changes: 18 additions & 3 deletions servers/simple/run.py
@@ -12,10 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
from os.path import dirname

from servers.simple.worker import run_worker
from servers.simple.server import run_server
from models.uri_drain.template_miner import load_existing_miners
from models.uri_drain.template_miner_config import TemplateMinerConfig
from servers.simple.results_manager import ProxyURIDrainResultsManager, URIDrainResults
from servers.simple.server import run_server
from servers.simple.worker import run_worker


def run():
@@ -25,12 +29,23 @@ def run():
    manager = ProxyURIDrainResultsManager()
    manager.start()

    # Load config
    config = TemplateMinerConfig()
    config_file = os.path.join(dirname(__file__), "uri_drain.ini")
    print(f'Searching for config file at {config_file}')
    config.load(config_filename=config_file)  # change to config injection from env or other

    # SET DEBUG HERE! < TODO CONFIG FILE
    shared_results_object = manager.URIDrainResults(debug=False)  # noqa
    uri_main_queue = multiprocessing.Queue()

    # Load existing miners and their clusters
    miners = load_existing_miners(config)
    for service in miners:
        shared_results_object.set_dict_field(service=service, value=miners[service].drain.cluster_patterns)

    producer_process = multiprocessing.Process(target=run_server, args=(uri_main_queue, shared_results_object))
    consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object))
    consumer_process = multiprocessing.Process(target=run_worker, args=(uri_main_queue, shared_results_object, config, miners))

    producer_process.start()
    consumer_process.start()
3 changes: 2 additions & 1 deletion servers/simple/uri_drain.ini
@@ -13,6 +13,7 @@
# limitations under the License.

[SNAPSHOT]
file_path = /tmp/
snapshot_interval_minutes = 10
compress_state = True

@@ -36,4 +37,4 @@ extra_delimiters = ["/"]

[PROFILING]
enabled = False
report_sec = 30
report_sec = 30
35 changes: 19 additions & 16 deletions servers/simple/worker.py
@@ -11,23 +11,29 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import time
import queue
import os
import time
from collections import defaultdict
from os.path import dirname

from models.uri_drain.template_miner_config import TemplateMinerConfig
from models.uri_drain.persistence_handler import ServiceFilePersistenceHandler
from models.uri_drain.template_miner import TemplateMiner


def run_worker(uri_main_queue, shared_results_object):
    config = TemplateMinerConfig()
    config_file = os.path.join(dirname(__file__), "uri_drain.ini")
    print(f'Searching for config file at {config_file}')
    config.load(config_filename=config_file)  # change to config injection from env or other
    drain_instances = defaultdict(functools.partial(TemplateMiner, None, config))  # URIDrain instances


def create_defaultdict_with_key(factory):
    # A defaultdict variant whose __missing__ passes the missing key to the
    # factory, unlike collections.defaultdict, whose factory takes no arguments.
    class CustomDefaultDict(defaultdict):
        def __missing__(self, key):
            value = factory(key)
            self[key] = value
            return value

    return CustomDefaultDict()


def run_worker(uri_main_queue, shared_results_object, config, existing_miners):
    # One URIDrain (TemplateMiner) instance per service; miners created on
    # demand get a file persistence handler when persistence is enabled.
    drain_instances = create_defaultdict_with_key(
        lambda key: TemplateMiner(
            ServiceFilePersistenceHandler(config.snapshot_file_dir, key)
            if config.snapshot_file_dir else None,
            config))
    for service in existing_miners:
        drain_instances[service] = existing_miners[service]

    counter = 0
    while True:
@@ -44,11 +50,8 @@ def run_worker(uri_main_queue, shared_results_object):
            for uri in uris:
                drain_instances[service].add_log_message(uri)
            print(f'Processed {len(uris)} uris in {time.time() - start_time} seconds')
            drain_clusters = drain_instances[service].drain.clusters
            sorted_drain_clusters = sorted(drain_clusters, key=lambda it: it.size, reverse=True)

            drain_clusters_templates = [cluster.get_template() for cluster in sorted_drain_clusters]
            shared_results_object.set_dict_field(service=service, value=drain_clusters_templates)  # TODO add version
            patterns = drain_instances[service].drain.cluster_patterns
            shared_results_object.set_dict_field(service=service, value=patterns)  # TODO add version
            # increment here
            counter += 1
            print('-================-')
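
Unlike `collections.defaultdict`, whose `default_factory` takes no arguments, the helper above passes the missing key to the factory, which is what lets each service get a miner wired to its own snapshot file. A tiny usage sketch (assuming the repository root is on `sys.path`):

```python
from servers.simple.worker import create_defaultdict_with_key

instances = create_defaultdict_with_key(lambda key: f'miner-for-{key}')
print(instances['serviceA'])    # 'miner-for-serviceA', created on first access
print('serviceA' in instances)  # True: __missing__ cached the value
```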
