diff --git a/deployability/deps/build/lib/workflow/README.md b/deployability/deps/build/lib/workflow/README.md deleted file mode 100755 index 74793771b3..0000000000 --- a/deployability/deps/build/lib/workflow/README.md +++ /dev/null @@ -1,98 +0,0 @@ -# Workflow Processor - -The Workflow Processor is a tool for executing tasks defined in a YAML-based workflow file. It supports parallel execution of tasks with dependency management. - -## Table of Contents - -- [Workflow Processor](#workflow-processor) - - [Table of Contents](#table-of-contents) - - [Getting Started](#getting-started) - - [Prerequisites](#prerequisites) - - [Installation](#installation) - - [Usage](#usage) - - [Command Line Arguments](#command-line-arguments) - - [Workflow File](#workflow-file) - - [Logging](#logging) - - [Examples](#examples) - - [Basic Execution](#basic-execution) - - [Parallel Execution](#parallel-execution) - - [Dry Run](#dry-run) - - [License](#license) - -## Getting Started - -### Prerequisites - -Before using the Workflow Processor, make sure you have the following prerequisites installed: - -- Python 3.9 - -### Installation - -1. Clone the repository: - - ```bash - git clone https://github.com/wazuh/wazuh-qa.git - ``` - -2. Navigate to the project directory: - - ```bash - cd wazuh-qa/poc-tests/scripts/qa-workflow-engine - ``` - -3. Install the required dependencies: - - ```bash - pip install -r requirements.txt - ``` - -Now, you're ready to use the QA Workflow Engine. - -## Usage - -### Command Line Arguments - -Run the workflow processor using the following command: - -```bash -python main.py workflow_file.yml --threads 4 --dry-run --log-format json --log-level INFO -``` - -- `workflow_file.yml`: Path to the YAML-based workflow file. -- `--threads`: Number of threads to use for parallel execution (default is 1). -- `--dry-run`: Display the plan without executing tasks. -- `--log-format`: Log format (`plain` or `json`, default is `plain`). -- `--log-level`: Log level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, or `CRITICAL`, default is `INFO`). - -### Workflow File - -The workflow file is written in YAML format. It defines tasks, dependencies, and other configurations. See the provided examples in the `examples/` directory for reference. - -### Logging - -The workflow processor logs messages to the console. You can configure the log format (`plain` or `json`) and log level using command line arguments. - -## Examples - -### Basic Execution - -```bash -python main.py examples/basic_workflow.yml -``` - -### Parallel Execution - -```bash -python main.py examples/parallel_workflow.yml --threads 4 -``` - -### Dry Run - -```bash -python main.py examples/dry_run_workflow.yml --dry-run -``` - -## License - -WAZUH Copyright (C) 2015 Wazuh Inc. (License GPLv2) diff --git a/deployability/deps/build/lib/workflow/__init__.py b/deployability/deps/build/lib/workflow/__init__.py deleted file mode 100755 index 5627726b79..0000000000 --- a/deployability/deps/build/lib/workflow/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .workflow_processor import WorkflowProcessor - diff --git a/deployability/deps/build/lib/workflow/examples/dtt1-agents-poc.yaml b/deployability/deps/build/lib/workflow/examples/dtt1-agents-poc.yaml deleted file mode 100755 index 644b2e0e6e..0000000000 --- a/deployability/deps/build/lib/workflow/examples/dtt1-agents-poc.yaml +++ /dev/null @@ -1,167 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . 
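In addition to the `main.py` CLI documented in the README above, the engine can be driven programmatically. A minimal sketch, assuming the package layout in this diff; the constructor signature matches `WorkflowProcessor` as defined in `workflow_processor.py` further down, and the workflow path is illustrative:

```python
from workflow import WorkflowProcessor  # re-exported by workflow/__init__.py

# Arguments mirror the CLI flags: threads controls parallelism,
# dry_run only prints the execution plan without running tasks.
processor = WorkflowProcessor(
    workflow_file='examples/basic_workflow.yml',  # illustrative path
    dry_run=False,
    threads=4,
    log_level='INFO',
)
processor.run()
```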
-# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 -version: 0.1 -description: This workflow is used to test agents deployment por DDT1 PoC -variables: - agents-os: - - linux-ubuntu-22.04-amd64 - manager-os: linux-ubuntu-22.04-amd64 - infra-provider: vagrant - working-dir: /tmp/dtt1-poc - -tasks: - # Generic agent test task - - task: "run-agent-tests-{agent}" - description: "Run tests uninstall for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/test.py - - inventory: "{working-dir}/agent-{agent}/inventory.yaml" - - dependencies: - - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - agent: "{working-dir}/agent-{agent}/inventory.yaml" - - tests: "install,register,stop" - - component: "agent" - - wazuh-version: "4.7.1" - - wazuh-revision: "40709" - depends-on: - - "provision-install-{agent}" - - "provision-manager" - foreach: - - variable: agents-os - as: agent - - # Generic agent test task - - task: "run-agent-tests-uninstall-{agent}" - description: "Run tests uninstall for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/test.py - - inventory: "{working-dir}/agent-{agent}/inventory.yaml" - - dependency: "{working-dir}/manager-{manager-os}/inventory.yaml" - - tests: "uninstall" - - component: "agent" - - wazuh-version: "4.7.1" - - wazuh-revision: "40709" - depends-on: - - "run-agent-tests-{agent}" - - "provision-uninstall-{agent}" - foreach: - - variable: agents-os - as: agent - - # Unique manager provision task - - task: "provision-manager" - description: "Provision the manager." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/provision.py - - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - install: - - component: wazuh-manager - type: package - depends-on: - - "allocate-manager" - - # Unique manager allocate task - - task: "allocate-manager" - description: "Allocate resources for the manager." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/allocation.py - - action: create - - provider: "{infra-provider}" - - size: large - - composite-name: "{manager-os}" - - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" - - track-output: "{working-dir}/manager-{manager-os}/track.yaml" - #cleanup: - # this: process - # with: - # path: python3 - # args: - # - /home/akim/Desktop/wazuh-qa/deployability/launchers/allocation.py - # - action: delete - # - track-output: "{working-dir}/manager-{manager-os}/track.yaml" - - # Generic agent provision task - - task: "provision-install-{agent}" - description: "Provision resources for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/provision.py - - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" - - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - install: - - component: wazuh-agent - type: package - - component: curl - depends-on: - - "allocate-{agent}" - - "provision-manager" - foreach: - - variable: agents-os - as: agent - - # Generic agent provision task - - task: "provision-uninstall-{agent}" - description: "Provision resources for the {agent} agent." 
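The mixed string and mapping entries under `args` in the tasks above are flattened into command-line flags by `ProcessTask.execute`, defined later in this diff. Roughly, and eliding the engine's error handling:

```python
# Rough reconstruction of the flattening in ProcessTask.execute:
# plain strings pass through, one-key mappings become --key=value,
# and list values expand into one --key=item flag per item.
def flatten_args(args: list) -> list[str]:
    flags = []
    for arg in args:
        if isinstance(arg, str):
            flags.append(arg)
        elif isinstance(arg, dict):
            key, value = next(iter(arg.items()))
            if isinstance(value, list):
                flags.extend(f"--{key}={item}" for item in value)
            else:
                flags.append(f"--{key}={value}")
    return flags

print(flatten_args(['test.py', {'tests': 'uninstall'}, {'component': 'agent'}]))
# -> ['test.py', '--tests=uninstall', '--component=agent']
```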
- do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/provision.py - - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" - - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - uninstall: - - component: wazuh-agent - type: package - depends-on: - - "provision-install-{agent}" - foreach: - - variable: agents-os - as: agent - - # Generic agent allocate task - - task: "allocate-{agent}" - description: "Allocate resources for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/allocation.py - - action: create - - provider: "{infra-provider}" - - size: small - - composite-name: "{agent}" - - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" - - track-output: "{working-dir}/agent-{agent}/track.yaml" - #cleanup: - # this: process - # with: - # path: python3 - # args: - # - allocation.py - # - action: delete - # - track-output: "{working-dir}/agent-{agent}/track.yaml" - foreach: - - variable: agents-os - as: agent \ No newline at end of file diff --git a/deployability/deps/build/lib/workflow/examples/dtt1-agents.yaml b/deployability/deps/build/lib/workflow/examples/dtt1-agents.yaml deleted file mode 100755 index e8a827282d..0000000000 --- a/deployability/deps/build/lib/workflow/examples/dtt1-agents.yaml +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . -# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 -version: 0.1 -description: This workflow is used to test agents deployment. -variables: - agents-os: - - linux-redhat-7-amd64 - - linux-redhat-8-amd64 - - linux-redhat-9-amd64 - - linux-centos-7-amd64 - - linux-centos-8-amd64 - - linux-debian-10-amd64 - - linux-debian-11-amd64 - - linux-debian-12-amd64 - - linux-ubuntu-18.04-amd64 - - linux-ubuntu-20.04-amd64 - - linux-ubuntu-22.04-amd64 - - linux-fedora-37-amd64 - - linux-fedora-38-amd64 - - linux-suse-15-amd64 - - linux-opensuse-15-amd64 - - linux-oracle-9-amd64 - - linux-amazon-2-amd64 - - linux-amazon-2023-amd64 - - windows-10-amd64 - - windows-11-amd64 - - windows-server2012-amd64 - - windows-server2016-amd64 - - windows-server2019-amd64 - - windows-server2022-amd64 - - macos-13.3-amd64 - - macos-14.2-amd64 - manager-os: linux-amazon-2023-amd64 - -tasks: - # Generic agent test task - - task: "test-agent-{agent}" - description: "Run tests for the {agent} agent." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running tests for {agent}" - depends-on: - - "provision-agent-{agent}" - foreach: - - variable: agents-os - as: agent - - # Unique manager provision task - - task: "provision-manager-{manager-os}" - description: "Provision the manager." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running provision for manager" - depends-on: - - "allocate-manager-{manager-os}" - - # Unique manager allocate task - - task: "allocate-manager-{manager-os}" - description: "Allocate resources for the manager." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running allocate for manager" - cleanup: - this: process - with: - path: /bin/echo - args: - - -n - - "Running cleanup for manager" - - # Generic agent provision task - - task: "provision-agent-{agent}" - description: "Provision resources for the {agent} agent." 
- do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running provision for {agent}" - depends-on: - - "allocate-agent-{agent}" - - "provision-manager-{manager-os}" - foreach: - - variable: agents-os - as: agent - - # Generic agent allocate task - - task: "allocate-agent-{agent}" - description: "Allocate resources for the {agent} agent." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running allocate for {agent}" - cleanup: - this: process - with: - path: /bin/echo - args: - - -n - - "Running cleanup for allocate for {agent}" - foreach: - - variable: agents-os - as: agent \ No newline at end of file diff --git a/deployability/deps/build/lib/workflow/examples/dtt1-managers.yaml b/deployability/deps/build/lib/workflow/examples/dtt1-managers.yaml deleted file mode 100755 index 503cd115a3..0000000000 --- a/deployability/deps/build/lib/workflow/examples/dtt1-managers.yaml +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . -# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 -version: 0.1 -description: This workflow is used to test managers deployment. Two agents per manager are deployed. -variables: - agents-os: - - linux-debian-12-amd64 - - linux-ubuntu-22.04-amd64 - managers-os: - - linux-redhat-7-amd64 - - linux-redhat-8-amd64 - - linux-redhat-9-amd64 - - linux-centos-7-amd64 - - linux-centos-8-amd64 - - linux-debian-10-amd64 - - linux-debian-11-amd64 - - linux-debian-12-amd64 - - linux-ubuntu-18.04-amd64 - - linux-ubuntu-20.04-amd64 - - linux-ubuntu-22.04-amd64 - - linux-fedora-37-amd64 - - linux-fedora-38-amd64 - - linux-suse-15-amd64 - - linux-opensuse-15-amd64 - - linux-oracle-9-amd64 - - linux-amazon-2-amd64 - - linux-amazon-2023-amd64 -tasks: - # Generic manager test task - - task: "test-{manager}-{agent}" - do: - this: process - with: - path: /bin/echo - args: - - Executing tests for {manager} manager with {agent} agent. - depends-on: - - "provision-{manager}-manager" - - "provision-{agent}-agent-for-{manager}-manager" - foreach: - - variable: managers-os - as: manager - - variable: agents-os - as: agent - - # --------- Provision -------------- - # Generic manager provision task - - task: "provision-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing provision for {manager} as a manager. - depends-on: - - "allocate-{manager}-manager" - foreach: - - variable: managers-os - as: manager - - # Generic agent provision task - - task: "provision-{agent}-agent-for-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing provision for {agent} as an agent. - depends-on: - - "allocate-{agent}-agent-for-{manager}-manager" - foreach: - - variable: managers-os - as: manager - - variable: agents-os - as: agent - - # --------- Allocate -------------- - # Generic manager allocate task - - task: "allocate-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing allocation for {manager} as a manager. - foreach: - - variable: managers-os - as: manager - - # Generic agent allocate task - - task: "allocate-{agent}-agent-for-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing allocation for {agent} as an agent. 
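Each two-variable `foreach` in this managers workflow multiplies out into one concrete task per manager/agent pair before anything runs. A sketch of that expansion, mirroring `WorkflowFile.__expand_task` later in this diff (variable values shortened for brevity):

```python
from itertools import product

variables = {
    'managers-os': ['linux-redhat-9-amd64', 'linux-debian-12-amd64'],
    'agents-os': ['linux-ubuntu-22.04-amd64'],
}
foreach = [{'variable': 'managers-os', 'as': 'manager'},
           {'variable': 'agents-os', 'as': 'agent'}]

names = [entry['variable'] for entry in foreach]
aliases = [entry['as'] for entry in foreach]
for combination in product(*(variables[name] for name in names)):
    mapping = {**variables, **dict(zip(aliases, combination))}
    print('test-{manager}-{agent}'.format_map(mapping))
# -> test-linux-redhat-9-amd64-linux-ubuntu-22.04-amd64
# -> test-linux-debian-12-amd64-linux-ubuntu-22.04-amd64
```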
- foreach: - - variable: managers-os - as: manager - - variable: agents-os - as: agent diff --git a/deployability/deps/build/lib/workflow/models.py b/deployability/deps/build/lib/workflow/models.py deleted file mode 100644 index b6bf2b7fea..0000000000 --- a/deployability/deps/build/lib/workflow/models.py +++ /dev/null @@ -1,11 +0,0 @@ -from pathlib import Path -from typing import Literal -from pydantic import BaseModel - - -class InputPayload(BaseModel): - workflow_file: str | Path - threads: int = 1 - dry_run: bool = False - log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO' - schema_file: str | Path | None = None diff --git a/deployability/deps/build/lib/workflow/schema_validator.py b/deployability/deps/build/lib/workflow/schema_validator.py deleted file mode 100755 index effe2d2925..0000000000 --- a/deployability/deps/build/lib/workflow/schema_validator.py +++ /dev/null @@ -1,77 +0,0 @@ -import jsonschema -import json - -from jsonschema.exceptions import ValidationError -from pathlib import Path -from ruamel.yaml import YAML -from workflow.utils import logger - -class SchemaValidator: - """ - A SchemaValidator class that validates a YAML file against a JSON schema. - - Attributes: - schema_data (dict): The schema data. - yaml_data (dict): The YAML data. - """ - - def __init__(self, schema: Path | str, to_validate: Path | str): - """ - Initializes the SchemaValidator object. - - Args: - schema (Path, str): The path to the schema file. - to_validate (Path, str): The path to the YAML file to validate. - """ - schema_data: str = None - yaml_data: str = None - - self.logger = logger(Path(__file__).stem).get_logger() - with open(schema, 'r') as schema_file: - self.logger.debug(f"Loading schema file: {schema}") - schema_data = json.load(schema_file) - - with open(to_validate, 'r') as file: - self.logger.debug(f"Loading yaml file: {to_validate}") - yaml = YAML(typ='safe', pure=True) - yaml_data = yaml.load(file) - - self.schema_data = schema_data - self.yaml_data = yaml_data - - def preprocess_data(self) -> None: - """ - Preprocess the YAML data to be validated. - - Raises: - ValidationError: If the YAML data is not valid. - """ - for task in self.yaml_data.get('tasks', []): - do_with = task.get('do', {}).get('with', {}) - this_value = task.get('do', {}).get('this', '') - - if this_value == 'process': - if 'path' not in do_with or 'args' not in do_with: - raise ValidationError(f"Missing required properties in 'with' for task: {task}") - - do_with = task.get('cleanup', {}).get('with', {}) - this_value = task.get('cleanup', {}).get('this', '') - - if this_value == 'process': - if 'path' not in do_with or 'args' not in do_with: - raise ValidationError(f"Missing required properties in 'with' for task: {task}") - - def validateSchema(self) -> None: - """ - Validate the Workflow schema - - Raises: - ValidationError: If the YAML data is not valid. - Exception: If an unexpected error occurs. 
- """ - try: - jsonschema.validate(self.yaml_data, self.schema_data) - except ValidationError as e: - self.logger.error(f"Schema validation error: {e}") - except Exception as e: - self.logger.error(f"Unexpected error at schema validation: {e}") diff --git a/deployability/deps/build/lib/workflow/schemas/schema_v1.json b/deployability/deps/build/lib/workflow/schemas/schema_v1.json deleted file mode 100644 index 81f8e7c587..0000000000 --- a/deployability/deps/build/lib/workflow/schemas/schema_v1.json +++ /dev/null @@ -1,118 +0,0 @@ -{ - "type": "object", - "properties": { - "name": {"type": "string"}, - "description": {"type": "string"}, - "version": {"type": "number"}, - "tasks": { - "type": "array", - "items": { - "type": "object", - "properties": { - "task": {"type": "string"}, - "do": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "with": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "args": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"}, - {"type": "object"} - ] - } - }, - "path": {"type": "string"} - } - } - }, - "required": ["this"] - }, - "cleanup": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "with": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "args": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"}, - {"type": "object"} - ] - } - }, - "path": {"type": "string"} - } - } - }, - "required": ["this"] - }, - "depends-on": { - "type": "array", - "items": {"type": "string"} - }, - "foreach": { - "type": "array", - "items": { - "type": "object", - "properties": { - "variable": {"type": "string"}, - "as": {"type": "string"}, - "foreach": { - "type": "array", - "items": { - "type": "object", - "properties": { - "variable": {"type": "string"}, - "as": {"type": "string"} - } - } - } - }, - "required": ["variable", "as"] - } - } - }, - "required": ["task", "do"] - - }, - "minItems": 1 - }, - "variables": { - "type": "object", - "properties": { - "agent-os": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"} - ] - } - }, - "managers-os": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"} - ] - } - } - } - } - }, - "required": ["tasks", "variables", "version"], - "additionalProperties": false - } \ No newline at end of file diff --git a/deployability/deps/build/lib/workflow/task.py b/deployability/deps/build/lib/workflow/task.py deleted file mode 100755 index 28181b5972..0000000000 --- a/deployability/deps/build/lib/workflow/task.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . -# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 - -import subprocess -import random -import time - -from abc import ABC, abstractmethod -from workflow.utils import logger - - -class Task(ABC): - """Abstract base class for tasks.""" - - @abstractmethod - def execute(self) -> None: - """Execute the task.""" - pass - - -class ProcessTask(Task): - """Task for executing a process.""" - - def __init__(self, task_name: str, task_parameters: dict): - """ - Initialize ProcessTask. - - Args: - task_name (str): Name of the task. - task_parameters (dict): Parameters for the task. - logger (logging.Logger): Logger instance. 
- """ - self.task_name = task_name - self.task_parameters = task_parameters - self.logger = logger - - def execute(self) -> None: - """Execute the process task.""" - - task_args = [] - for arg in self.task_parameters['args']: - if isinstance(arg, str): - task_args.append(arg) - elif isinstance(arg, dict): - key, value = list(arg.items())[0] - if isinstance(value, list): - for argvalue in value: - print(f"argvalue {argvalue}") - task_args.extend([f"--{key}={argvalue}" for argvalue in value]) - else: - task_args.append(f"--{key}={value}") - print(f"task_args {task_args}") - result = None - try: - result = subprocess.run( - [self.task_parameters['path']] + task_args, - check=True, - capture_output=True, - text=True, - ) - - logger.info(str(result.stdout)) - logger.info("%s: %s", "Finish task: ", self.task_name, extra={'tag': self.task_name}) - - - if result.returncode != 0: - raise subprocess.CalledProcessError(returncode=result.returncode, cmd=result.args, output=result.stdout) - except subprocess.CalledProcessError as e: - raise Exception(f"Error executing process task {e.stderr}") - -class DummyTask(Task): - def __init__(self, task_name, task_parameters): - self.task_name = task_name - self.task_parameters = task_parameters - - def execute(self): - message = self.task_parameters.get('message', 'No message provided') - logger.info("%s: %s", message, self.task_name, extra={'tag': self.task_name}) - - -class DummyRandomTask(Task): - def __init__(self, task_name, task_parameters): - self.task_name = task_name - self.task_parameters = task_parameters - - def execute(self): - time_interval = self.task_parameters.get('time-seconds', [1, 5]) - sleep_time = random.uniform(time_interval[0], time_interval[1]) - - message = self.task_parameters.get('message', 'No message provided') - logger.info("%s: %s (Sleeping for %.2f seconds)", message, self.task_name, sleep_time, extra={'tag': self.task_name}) - - time.sleep(sleep_time) - - -TASKS_HANDLERS = { - 'process': ProcessTask, - 'dummy': DummyTask, - 'dummy-random': DummyRandomTask, -} diff --git a/deployability/deps/build/lib/workflow/utils.py b/deployability/deps/build/lib/workflow/utils.py deleted file mode 100644 index 3d64f331a8..0000000000 --- a/deployability/deps/build/lib/workflow/utils.py +++ /dev/null @@ -1,6 +0,0 @@ -import logging - -# The logging module will use the generic logger anyway -# that is because the logging module is a singleton. -# So, we can just use it this way here, and it will work. -logger = (lambda: logging.getLogger("workflow_engine"))() diff --git a/deployability/deps/build/lib/workflow/workflow_processor.py b/deployability/deps/build/lib/workflow/workflow_processor.py deleted file mode 100755 index 50e2bf428b..0000000000 --- a/deployability/deps/build/lib/workflow/workflow_processor.py +++ /dev/null @@ -1,357 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . 
-# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 - -import concurrent.futures -import graphlib -import json -import time -import yaml - -from pathlib import Path -from itertools import product - -from workflow.schema_validator import SchemaValidator -from workflow.task import * -from workflow.utils import logger - - - -class WorkflowFile: - """Class for loading and processing a workflow file.""" - schema_path = Path(__file__).parent / 'schemas' / 'schema_v1.json' - - def __init__(self, workflow_file_path: Path | str, schema_path: Path | str = None) -> None: - self.schema_path = schema_path or self.schema_path - self.__validate_schema(workflow_file_path) - self.workflow_raw_data = self.__load_workflow(workflow_file_path) - self.task_collection = self.__process_workflow() - self.__static_workflow_validation() - - def __validate_schema(self, workflow_file: Path | str) -> None: - """ - Validate the workflow file against the schema. - - Args: - workflow_file (Path | str): Path to the workflow file. - """ - validator = SchemaValidator(self.schema_path, workflow_file) - validator.preprocess_data() - validator.validateSchema() - - def __load_workflow(self, file_path: str) -> dict: - """ - Load the workflow data from a file. - - Args: - file_path (str): Path to the workflow file. - - Returns: - dict: Workflow data. - """ - with open(file_path, 'r', encoding='utf-8') as file: - return yaml.safe_load(file) - - def __process_workflow(self): - """Process the workflow and return a list of tasks.""" - task_collection = [] - variables = self.workflow_raw_data.get('variables', {}) - for task in self.workflow_raw_data.get('tasks', []): - task_collection.extend(self.__expand_task(task, variables)) - return task_collection - - def __replace_placeholders(self, element: str, values: dict, parent_key: str = None): - """ - Recursively replace placeholders in a dictionary or list. - - Args: - element (Any): The element to process. - values (dict): The values to replace placeholders. - parent_key (str): The parent key for nested replacements. - - Returns: - Any: The processed element. - """ - if isinstance(element, dict): - return {key: self.__replace_placeholders(value, values, key) for key, value in element.items()} - if isinstance(element, list): - return [self.__replace_placeholders(sub_element, values, parent_key) for sub_element in element] - if isinstance(element, str): - return element.format_map(values) - return element - - def __expand_task(self, task: dict, variables: dict): - """ - Expand a task with variable values. - - Args: - task (dict): The task to expand. - variables (dict): Variable values. - - Returns: - List[dict]: List of expanded tasks. 
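Placeholder substitution in `__replace_placeholders` above bottoms out in `str.format_map`, which also accepts the hyphenated keys used throughout the example workflows:

```python
values = {'agent': 'linux-ubuntu-22.04-amd64', 'working-dir': '/tmp/dtt1-poc'}
template = '{working-dir}/agent-{agent}/inventory.yaml'
print(template.format_map(values))
# -> /tmp/dtt1-poc/agent-linux-ubuntu-22.04-amd64/inventory.yaml
```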
- """ - expanded_tasks = [] - - if 'foreach' in task: - loop_variables = task.get('foreach', [{}]) - - variable_names = [loop_variable_data.get('variable') for loop_variable_data in loop_variables] - as_identifiers = [loop_variable_data.get('as') for loop_variable_data in loop_variables] - - variable_values = [variables.get(name, []) for name in variable_names] - - for combination in product(*variable_values): - variables_with_items = {**variables, **dict(zip(as_identifiers, combination))} - expanded_tasks.append(self.__replace_placeholders(task, variables_with_items)) - else: - expanded_tasks.append(self.__replace_placeholders(task, variables)) - - return expanded_tasks - - def __static_workflow_validation(self): - """Validate the workflow against static criteria.""" - def check_duplicated_tasks(self): - """Validate task name duplication.""" - task_name_counts = {task['task']: 0 for task in self.task_collection} - - for task in self.task_collection: - task_name_counts[task['task']] += 1 - - duplicates = [name for name, count in task_name_counts.items() if count > 1] - - if duplicates: - raise ValueError(f"Duplicated task names: {', '.join(duplicates)}") - - def check_not_existing_tasks(self): - """Validate task existance.""" - task_names = {task['task'] for task in self.task_collection} - - for dependencies in [task.get('depends-on', []) for task in self.task_collection]: - non_existing_dependencies = [dependency for dependency in dependencies if dependency not in task_names] - if non_existing_dependencies: - raise ValueError(f"Tasks do not exist: {', '.join(non_existing_dependencies)}") - - validations = [check_duplicated_tasks, check_not_existing_tasks] - for validation in validations: - validation(self) - - -class DAG(): - """Class for creating a dependency graph.""" - def __init__(self, task_collection: list, reverse: bool = False): - self.task_collection = task_collection - self.reverse = reverse - self.dag, self.dependency_tree = self.__build_dag() - self.to_be_canceled = set() - self.finished_tasks_status = { - 'failed': set(), - 'canceled': set(), - 'successful': set(), - } - self.execution_plan = self.__create_execution_plan(self.dependency_tree) - self.dag.prepare() - - def is_active(self) -> bool: - """Check if the DAG is active.""" - return self.dag.is_active() - - def get_available_tasks(self) -> list: - """Get the available tasks.""" - return self.dag.get_ready() - - def get_execution_plan(self) -> dict: - """Get the execution plan.""" - return self.execution_plan - - def set_status(self, task_name: str, status: str): - """Set the status of a task.""" - self.finished_tasks_status[status].add(task_name) - self.dag.done(task_name) - - def should_be_canceled(self, task_name: str) -> bool: - """Check if a task should be canceled.""" - return task_name in self.to_be_canceled - - def __build_dag(self): - """Build a dependency graph for the tasks.""" - dependency_dict = {} - dag = graphlib.TopologicalSorter() - - for task in self.task_collection: - task_name = task['task'] - dependencies = task.get('depends-on', []) - - if self.reverse: - for dependency in dependencies: - dag.add(dependency, task_name) - else: - dag.add(task_name, *dependencies) - - dependency_dict[task_name] = dependencies - - return dag, dependency_dict - - def cancel_dependant_tasks(self, task_name, cancel_policy) -> None: - """Cancel all tasks that depend on a failed task.""" - def get_all_task_set(tasks): - task_set = set() - - for task, sub_tasks in tasks.items(): - task_set.add(task) - 
task_set.update(get_all_task_set(sub_tasks)) - - return task_set - - if cancel_policy == 'continue': - return - - not_cancelled_tasks = self.finished_tasks_status['failed'].union(self.finished_tasks_status['successful']) - for root_task, sub_tasks in self.execution_plan.items(): - task_set = get_all_task_set({root_task: sub_tasks}) - if cancel_policy == 'abort-all': - self.to_be_canceled.update(task_set) - elif cancel_policy == 'abort-related-flows': - if task_name in task_set: - self.to_be_canceled.update(task_set - not_cancelled_tasks) - else: - raise ValueError(f"Unknown cancel policy '{cancel_policy}'.") - - def __create_execution_plan(self, dependency_dict: dict) -> dict: - - execution_plan = {} - - def get_root_tasks(dependency_dict: dict) -> set: - """Get root tasks from the dependency dictionary.""" - all_tasks = set(dependency_dict.keys()) - dependent_tasks = set(dep for dependents in dependency_dict.values() for dep in dependents) - return all_tasks - dependent_tasks - - def get_subtask_plan(task_name: str, dependency_dict: dict, level: int = 0) -> dict: - """Create the execution plan recursively as a dictionary.""" - if task_name not in dependency_dict: - return {task_name: {}} - - dependencies = dependency_dict[task_name] - plan = {task_name: {}} - - for dependency in dependencies: - sub_plan = get_subtask_plan(dependency, dependency_dict, level + 1) - plan[task_name].update(sub_plan) - - return plan - - root_tasks = get_root_tasks(dependency_dict) - for root_task in root_tasks: - execution_plan.update(get_subtask_plan(root_task, dependency_dict)) - - return execution_plan - - -class WorkflowProcessor: - """Class for processing a workflow.""" - - def __init__(self, workflow_file: str, dry_run: bool, threads: int, log_level: str = 'INFO', schema_file: Path | str = None): - """ - Initialize WorkflowProcessor. - - Args: - workflow_file (str): Path to the workflow file (YAML format). - dry_run (bool): Display the plan without executing tasks. - threads (int): Number of threads to use for parallel execution. - log_level (str): Log level. - schema_file (Path | str): Path to the schema file (YAML format). - """ - logger.setLevel(log_level) - # Initialize the instance variables. 
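Underneath, `DAG` drives `graphlib.TopologicalSorter` from the standard library (available since Python 3.9, matching the README's prerequisite). A toy run with hypothetical task names, showing the ready/done protocol the processor loops over:

```python
import graphlib

sorter = graphlib.TopologicalSorter()
sorter.add('provision-agent', 'allocate-agent')  # provision depends on allocate
sorter.add('test-agent', 'provision-agent')      # test depends on provision
sorter.prepare()

while sorter.is_active():
    for task in sorter.get_ready():
        print('running', task)
        sorter.done(task)
# running allocate-agent / provision-agent / test-agent, in dependency order
```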
- self.task_collection = WorkflowFile(workflow_file, schema_file).task_collection - self.dry_run = dry_run - self.threads = threads - - def execute_task(self, dag: DAG, task: dict, action) -> None: - """Execute a task.""" - task_name = task['task'] - if dag.should_be_canceled(task_name): - logger.warning("[%s] Skipping task due to dependency failure.", task_name) - dag.set_status(task_name, 'canceled') - else: - try: - task_object = self.create_task_object(task, action) - - logger.info("[%s] Starting task.", task_name) - start_time = time.time() - task_object.execute() - logger.info("[%s] Finished task in %.2f seconds.", task_name, time.time() - start_time) - dag.set_status(task_name, 'successful') - except Exception as e: - # Pass the tag to the tag_formatter function if it exists - logger.error("[%s] Task failed with error: %s.", task_name, e) - dag.set_status(task_name, 'failed') - dag.cancel_dependant_tasks(task_name, task.get('on-error', 'abort-related-flows')) - # Handle the exception or re-raise if necessary - raise - - def create_task_object(self, task: dict, action) -> Task: - """Create and return a Task object based on task type.""" - task_type = task[action]['this'] - - task_handler = TASKS_HANDLERS.get(task_type) - - if task_handler is not None: - return task_handler(task['task'], task[action]['with']) - - raise ValueError(f"Unknown task type '{task_type}'.") - - def execute_tasks_parallel(self) -> None: - """Execute tasks in parallel.""" - if not self.dry_run: - logger.info("Executing tasks in parallel.") - dag = DAG(self.task_collection) - # Execute tasks based on the DAG - with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor: - futures = {} - while True: - if not dag.is_active(): - break - for task_name in dag.get_available_tasks(): - task = next(t for t in self.task_collection if t['task'] == task_name) - future = executor.submit(self.execute_task, dag, task, 'do') - futures[task_name] = future - concurrent.futures.wait(futures.values()) - - # Now execute cleanup tasks based on the reverse DAG - reverse_dag = DAG(self.task_collection, reverse=True) - - logger.info("Executing cleanup tasks.") - with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor: - reverse_futures = {} - - while True: - if not reverse_dag.is_active(): - break - for task_name in reverse_dag.get_available_tasks(): - task = next(t for t in self.task_collection if t['task'] == task_name) - if 'cleanup' in task: - future = executor.submit(self.execute_task, reverse_dag, task, 'cleanup') - reverse_futures[task_name] = future - else: - reverse_dag.set_status(task_name, 'successful') - concurrent.futures.wait(reverse_futures.values()) - - else: - dag = DAG(self.task_collection) - logger.info("Execution plan: %s", json.dumps(dag.get_execution_plan(), indent=2)) - - def run(self) -> None: - """Main entry point.""" - self.execute_tasks_parallel() - - def abort_execution(self, executor: concurrent.futures.ThreadPoolExecutor, futures: dict) -> None: - """Abort the execution of tasks.""" - for future in concurrent.futures.as_completed(futures.values()): - try: - _ = future.result() - except Exception as e: - logger.error("Error in aborted task: %s", e) - - executor.shutdown(wait=False) diff --git a/deployability/deps/build/lib/workflow_engine/README.md b/deployability/deps/build/lib/workflow_engine/README.md deleted file mode 100755 index 74793771b3..0000000000 --- a/deployability/deps/build/lib/workflow_engine/README.md +++ /dev/null @@ -1,98 +0,0 @@ -# Workflow 
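Failure handling in `execute_task` above is governed by each task's optional `on-error` key, defaulting to `abort-related-flows`. A small sketch of how the three policies accepted by `DAG.cancel_dependant_tasks` differ, using a hypothetical three-task collection:

```python
from workflow.workflow_processor import DAG

# B depends on A; C is an unrelated flow.
tasks = [
    {'task': 'A'},
    {'task': 'B', 'depends-on': ['A']},
    {'task': 'C'},
]
dag = DAG(tasks)
dag.get_available_tasks()     # A and C become ready
dag.set_status('A', 'failed')

# 'continue'            -> cancel nothing
# 'abort-related-flows' -> cancel only tasks in flows containing A (here: B)
# 'abort-all'           -> cancel every task in the plan
dag.cancel_dependant_tasks('A', 'abort-related-flows')
print(dag.should_be_canceled('B'), dag.should_be_canceled('C'))  # True False
```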
Processor - -The Workflow Processor is a tool for executing tasks defined in a YAML-based workflow file. It supports parallel execution of tasks with dependency management. - -## Table of Contents - -- [Workflow Processor](#workflow-processor) - - [Table of Contents](#table-of-contents) - - [Getting Started](#getting-started) - - [Prerequisites](#prerequisites) - - [Installation](#installation) - - [Usage](#usage) - - [Command Line Arguments](#command-line-arguments) - - [Workflow File](#workflow-file) - - [Logging](#logging) - - [Examples](#examples) - - [Basic Execution](#basic-execution) - - [Parallel Execution](#parallel-execution) - - [Dry Run](#dry-run) - - [License](#license) - -## Getting Started - -### Prerequisites - -Before using the Workflow Processor, make sure you have the following prerequisites installed: - -- Python 3.9 - -### Installation - -1. Clone the repository: - - ```bash - git clone https://github.com/wazuh/wazuh-qa.git - ``` - -2. Navigate to the project directory: - - ```bash - cd wazuh-qa/poc-tests/scripts/qa-workflow-engine - ``` - -3. Install the required dependencies: - - ```bash - pip install -r requirements.txt - ``` - -Now, you're ready to use the QA Workflow Engine. - -## Usage - -### Command Line Arguments - -Run the workflow processor using the following command: - -```bash -python main.py workflow_file.yml --threads 4 --dry-run --log-format json --log-level INFO -``` - -- `workflow_file.yml`: Path to the YAML-based workflow file. -- `--threads`: Number of threads to use for parallel execution (default is 1). -- `--dry-run`: Display the plan without executing tasks. -- `--log-format`: Log format (`plain` or `json`, default is `plain`). -- `--log-level`: Log level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, or `CRITICAL`, default is `INFO`). - -### Workflow File - -The workflow file is written in YAML format. It defines tasks, dependencies, and other configurations. See the provided examples in the `examples/` directory for reference. - -### Logging - -The workflow processor logs messages to the console. You can configure the log format (`plain` or `json`) and log level using command line arguments. - -## Examples - -### Basic Execution - -```bash -python main.py examples/basic_workflow.yml -``` - -### Parallel Execution - -```bash -python main.py examples/parallel_workflow.yml --threads 4 -``` - -### Dry Run - -```bash -python main.py examples/dry_run_workflow.yml --dry-run -``` - -## License - -WAZUH Copyright (C) 2015 Wazuh Inc. (License GPLv2) diff --git a/deployability/deps/build/lib/workflow_engine/__init__.py b/deployability/deps/build/lib/workflow_engine/__init__.py deleted file mode 100755 index 5627726b79..0000000000 --- a/deployability/deps/build/lib/workflow_engine/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .workflow_processor import WorkflowProcessor - diff --git a/deployability/deps/build/lib/workflow_engine/examples/dtt1-agents-poc.yaml b/deployability/deps/build/lib/workflow_engine/examples/dtt1-agents-poc.yaml deleted file mode 100755 index 644b2e0e6e..0000000000 --- a/deployability/deps/build/lib/workflow_engine/examples/dtt1-agents-poc.yaml +++ /dev/null @@ -1,167 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . 
-# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 -version: 0.1 -description: This workflow is used to test agents deployment por DDT1 PoC -variables: - agents-os: - - linux-ubuntu-22.04-amd64 - manager-os: linux-ubuntu-22.04-amd64 - infra-provider: vagrant - working-dir: /tmp/dtt1-poc - -tasks: - # Generic agent test task - - task: "run-agent-tests-{agent}" - description: "Run tests uninstall for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/test.py - - inventory: "{working-dir}/agent-{agent}/inventory.yaml" - - dependencies: - - manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - agent: "{working-dir}/agent-{agent}/inventory.yaml" - - tests: "install,register,stop" - - component: "agent" - - wazuh-version: "4.7.1" - - wazuh-revision: "40709" - depends-on: - - "provision-install-{agent}" - - "provision-manager" - foreach: - - variable: agents-os - as: agent - - # Generic agent test task - - task: "run-agent-tests-uninstall-{agent}" - description: "Run tests uninstall for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/test.py - - inventory: "{working-dir}/agent-{agent}/inventory.yaml" - - dependency: "{working-dir}/manager-{manager-os}/inventory.yaml" - - tests: "uninstall" - - component: "agent" - - wazuh-version: "4.7.1" - - wazuh-revision: "40709" - depends-on: - - "run-agent-tests-{agent}" - - "provision-uninstall-{agent}" - foreach: - - variable: agents-os - as: agent - - # Unique manager provision task - - task: "provision-manager" - description: "Provision the manager." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/provision.py - - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - install: - - component: wazuh-manager - type: package - depends-on: - - "allocate-manager" - - # Unique manager allocate task - - task: "allocate-manager" - description: "Allocate resources for the manager." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/allocation.py - - action: create - - provider: "{infra-provider}" - - size: large - - composite-name: "{manager-os}" - - inventory-output: "{working-dir}/manager-{manager-os}/inventory.yaml" - - track-output: "{working-dir}/manager-{manager-os}/track.yaml" - #cleanup: - # this: process - # with: - # path: python3 - # args: - # - /home/akim/Desktop/wazuh-qa/deployability/launchers/allocation.py - # - action: delete - # - track-output: "{working-dir}/manager-{manager-os}/track.yaml" - - # Generic agent provision task - - task: "provision-install-{agent}" - description: "Provision resources for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/provision.py - - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" - - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - install: - - component: wazuh-agent - type: package - - component: curl - depends-on: - - "allocate-{agent}" - - "provision-manager" - foreach: - - variable: agents-os - as: agent - - # Generic agent provision task - - task: "provision-uninstall-{agent}" - description: "Provision resources for the {agent} agent." 
- do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/provision.py - - inventory-agent: "{working-dir}/agent-{agent}/inventory.yaml" - - inventory-manager: "{working-dir}/manager-{manager-os}/inventory.yaml" - - uninstall: - - component: wazuh-agent - type: package - depends-on: - - "provision-install-{agent}" - foreach: - - variable: agents-os - as: agent - - # Generic agent allocate task - - task: "allocate-{agent}" - description: "Allocate resources for the {agent} agent." - do: - this: process - with: - path: python3 - args: - - /home/akim/Desktop/wazuh-qa/deployability/launchers/allocation.py - - action: create - - provider: "{infra-provider}" - - size: small - - composite-name: "{agent}" - - inventory-output: "{working-dir}/agent-{agent}/inventory.yaml" - - track-output: "{working-dir}/agent-{agent}/track.yaml" - #cleanup: - # this: process - # with: - # path: python3 - # args: - # - allocation.py - # - action: delete - # - track-output: "{working-dir}/agent-{agent}/track.yaml" - foreach: - - variable: agents-os - as: agent \ No newline at end of file diff --git a/deployability/deps/build/lib/workflow_engine/examples/dtt1-agents.yaml b/deployability/deps/build/lib/workflow_engine/examples/dtt1-agents.yaml deleted file mode 100755 index e8a827282d..0000000000 --- a/deployability/deps/build/lib/workflow_engine/examples/dtt1-agents.yaml +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . -# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 -version: 0.1 -description: This workflow is used to test agents deployment. -variables: - agents-os: - - linux-redhat-7-amd64 - - linux-redhat-8-amd64 - - linux-redhat-9-amd64 - - linux-centos-7-amd64 - - linux-centos-8-amd64 - - linux-debian-10-amd64 - - linux-debian-11-amd64 - - linux-debian-12-amd64 - - linux-ubuntu-18.04-amd64 - - linux-ubuntu-20.04-amd64 - - linux-ubuntu-22.04-amd64 - - linux-fedora-37-amd64 - - linux-fedora-38-amd64 - - linux-suse-15-amd64 - - linux-opensuse-15-amd64 - - linux-oracle-9-amd64 - - linux-amazon-2-amd64 - - linux-amazon-2023-amd64 - - windows-10-amd64 - - windows-11-amd64 - - windows-server2012-amd64 - - windows-server2016-amd64 - - windows-server2019-amd64 - - windows-server2022-amd64 - - macos-13.3-amd64 - - macos-14.2-amd64 - manager-os: linux-amazon-2023-amd64 - -tasks: - # Generic agent test task - - task: "test-agent-{agent}" - description: "Run tests for the {agent} agent." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running tests for {agent}" - depends-on: - - "provision-agent-{agent}" - foreach: - - variable: agents-os - as: agent - - # Unique manager provision task - - task: "provision-manager-{manager-os}" - description: "Provision the manager." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running provision for manager" - depends-on: - - "allocate-manager-{manager-os}" - - # Unique manager allocate task - - task: "allocate-manager-{manager-os}" - description: "Allocate resources for the manager." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running allocate for manager" - cleanup: - this: process - with: - path: /bin/echo - args: - - -n - - "Running cleanup for manager" - - # Generic agent provision task - - task: "provision-agent-{agent}" - description: "Provision resources for the {agent} agent." 
- do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running provision for {agent}" - depends-on: - - "allocate-agent-{agent}" - - "provision-manager-{manager-os}" - foreach: - - variable: agents-os - as: agent - - # Generic agent allocate task - - task: "allocate-agent-{agent}" - description: "Allocate resources for the {agent} agent." - do: - this: process - with: - path: /bin/echo - args: - - -n - - "Running allocate for {agent}" - cleanup: - this: process - with: - path: /bin/echo - args: - - -n - - "Running cleanup for allocate for {agent}" - foreach: - - variable: agents-os - as: agent \ No newline at end of file diff --git a/deployability/deps/build/lib/workflow_engine/examples/dtt1-managers.yaml b/deployability/deps/build/lib/workflow_engine/examples/dtt1-managers.yaml deleted file mode 100755 index 503cd115a3..0000000000 --- a/deployability/deps/build/lib/workflow_engine/examples/dtt1-managers.yaml +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . -# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 -version: 0.1 -description: This workflow is used to test managers deployment. Two agents per manager are deployed. -variables: - agents-os: - - linux-debian-12-amd64 - - linux-ubuntu-22.04-amd64 - managers-os: - - linux-redhat-7-amd64 - - linux-redhat-8-amd64 - - linux-redhat-9-amd64 - - linux-centos-7-amd64 - - linux-centos-8-amd64 - - linux-debian-10-amd64 - - linux-debian-11-amd64 - - linux-debian-12-amd64 - - linux-ubuntu-18.04-amd64 - - linux-ubuntu-20.04-amd64 - - linux-ubuntu-22.04-amd64 - - linux-fedora-37-amd64 - - linux-fedora-38-amd64 - - linux-suse-15-amd64 - - linux-opensuse-15-amd64 - - linux-oracle-9-amd64 - - linux-amazon-2-amd64 - - linux-amazon-2023-amd64 -tasks: - # Generic manager test task - - task: "test-{manager}-{agent}" - do: - this: process - with: - path: /bin/echo - args: - - Executing tests for {manager} manager with {agent} agent. - depends-on: - - "provision-{manager}-manager" - - "provision-{agent}-agent-for-{manager}-manager" - foreach: - - variable: managers-os - as: manager - - variable: agents-os - as: agent - - # --------- Provision -------------- - # Generic manager provision task - - task: "provision-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing provision for {manager} as a manager. - depends-on: - - "allocate-{manager}-manager" - foreach: - - variable: managers-os - as: manager - - # Generic agent provision task - - task: "provision-{agent}-agent-for-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing provision for {agent} as an agent. - depends-on: - - "allocate-{agent}-agent-for-{manager}-manager" - foreach: - - variable: managers-os - as: manager - - variable: agents-os - as: agent - - # --------- Allocate -------------- - # Generic manager allocate task - - task: "allocate-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing allocation for {manager} as a manager. - foreach: - - variable: managers-os - as: manager - - # Generic agent allocate task - - task: "allocate-{agent}-agent-for-{manager}-manager" - do: - this: process - with: - path: /bin/echo - args: - - Executing allocation for {agent} as an agent. 
- foreach: - - variable: managers-os - as: manager - - variable: agents-os - as: agent diff --git a/deployability/deps/build/lib/workflow_engine/models.py b/deployability/deps/build/lib/workflow_engine/models.py deleted file mode 100644 index b6bf2b7fea..0000000000 --- a/deployability/deps/build/lib/workflow_engine/models.py +++ /dev/null @@ -1,11 +0,0 @@ -from pathlib import Path -from typing import Literal -from pydantic import BaseModel - - -class InputPayload(BaseModel): - workflow_file: str | Path - threads: int = 1 - dry_run: bool = False - log_level: Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] = 'INFO' - schema_file: str | Path | None = None diff --git a/deployability/deps/build/lib/workflow_engine/schema_validator.py b/deployability/deps/build/lib/workflow_engine/schema_validator.py deleted file mode 100755 index effe2d2925..0000000000 --- a/deployability/deps/build/lib/workflow_engine/schema_validator.py +++ /dev/null @@ -1,77 +0,0 @@ -import jsonschema -import json - -from jsonschema.exceptions import ValidationError -from pathlib import Path -from ruamel.yaml import YAML -from workflow.utils import logger - -class SchemaValidator: - """ - A SchemaValidator class that validates a YAML file against a JSON schema. - - Attributes: - schema_data (dict): The schema data. - yaml_data (dict): The YAML data. - """ - - def __init__(self, schema: Path | str, to_validate: Path | str): - """ - Initializes the SchemaValidator object. - - Args: - schema (Path, str): The path to the schema file. - to_validate (Path, str): The path to the YAML file to validate. - """ - schema_data: str = None - yaml_data: str = None - - self.logger = logger(Path(__file__).stem).get_logger() - with open(schema, 'r') as schema_file: - self.logger.debug(f"Loading schema file: {schema}") - schema_data = json.load(schema_file) - - with open(to_validate, 'r') as file: - self.logger.debug(f"Loading yaml file: {to_validate}") - yaml = YAML(typ='safe', pure=True) - yaml_data = yaml.load(file) - - self.schema_data = schema_data - self.yaml_data = yaml_data - - def preprocess_data(self) -> None: - """ - Preprocess the YAML data to be validated. - - Raises: - ValidationError: If the YAML data is not valid. - """ - for task in self.yaml_data.get('tasks', []): - do_with = task.get('do', {}).get('with', {}) - this_value = task.get('do', {}).get('this', '') - - if this_value == 'process': - if 'path' not in do_with or 'args' not in do_with: - raise ValidationError(f"Missing required properties in 'with' for task: {task}") - - do_with = task.get('cleanup', {}).get('with', {}) - this_value = task.get('cleanup', {}).get('this', '') - - if this_value == 'process': - if 'path' not in do_with or 'args' not in do_with: - raise ValidationError(f"Missing required properties in 'with' for task: {task}") - - def validateSchema(self) -> None: - """ - Validate the Workflow schema - - Raises: - ValidationError: If the YAML data is not valid. - Exception: If an unexpected error occurs. 
- """ - try: - jsonschema.validate(self.yaml_data, self.schema_data) - except ValidationError as e: - self.logger.error(f"Schema validation error: {e}") - except Exception as e: - self.logger.error(f"Unexpected error at schema validation: {e}") diff --git a/deployability/deps/build/lib/workflow_engine/schemas/schema_v1.json b/deployability/deps/build/lib/workflow_engine/schemas/schema_v1.json deleted file mode 100644 index 81f8e7c587..0000000000 --- a/deployability/deps/build/lib/workflow_engine/schemas/schema_v1.json +++ /dev/null @@ -1,118 +0,0 @@ -{ - "type": "object", - "properties": { - "name": {"type": "string"}, - "description": {"type": "string"}, - "version": {"type": "number"}, - "tasks": { - "type": "array", - "items": { - "type": "object", - "properties": { - "task": {"type": "string"}, - "do": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "with": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "args": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"}, - {"type": "object"} - ] - } - }, - "path": {"type": "string"} - } - } - }, - "required": ["this"] - }, - "cleanup": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "with": { - "type": "object", - "properties": { - "this": {"type": "string"}, - "args": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"}, - {"type": "object"} - ] - } - }, - "path": {"type": "string"} - } - } - }, - "required": ["this"] - }, - "depends-on": { - "type": "array", - "items": {"type": "string"} - }, - "foreach": { - "type": "array", - "items": { - "type": "object", - "properties": { - "variable": {"type": "string"}, - "as": {"type": "string"}, - "foreach": { - "type": "array", - "items": { - "type": "object", - "properties": { - "variable": {"type": "string"}, - "as": {"type": "string"} - } - } - } - }, - "required": ["variable", "as"] - } - } - }, - "required": ["task", "do"] - - }, - "minItems": 1 - }, - "variables": { - "type": "object", - "properties": { - "agent-os": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"} - ] - } - }, - "managers-os": { - "type": "array", - "items": { - "oneOf": [ - {"type": "string"}, - {"type": "array"} - ] - } - } - } - } - }, - "required": ["tasks", "variables", "version"], - "additionalProperties": false - } \ No newline at end of file diff --git a/deployability/deps/build/lib/workflow_engine/task.py b/deployability/deps/build/lib/workflow_engine/task.py deleted file mode 100755 index 28181b5972..0000000000 --- a/deployability/deps/build/lib/workflow_engine/task.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . -# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2 - -import subprocess -import random -import time - -from abc import ABC, abstractmethod -from workflow.utils import logger - - -class Task(ABC): - """Abstract base class for tasks.""" - - @abstractmethod - def execute(self) -> None: - """Execute the task.""" - pass - - -class ProcessTask(Task): - """Task for executing a process.""" - - def __init__(self, task_name: str, task_parameters: dict): - """ - Initialize ProcessTask. - - Args: - task_name (str): Name of the task. - task_parameters (dict): Parameters for the task. - logger (logging.Logger): Logger instance. 
- """ - self.task_name = task_name - self.task_parameters = task_parameters - self.logger = logger - - def execute(self) -> None: - """Execute the process task.""" - - task_args = [] - for arg in self.task_parameters['args']: - if isinstance(arg, str): - task_args.append(arg) - elif isinstance(arg, dict): - key, value = list(arg.items())[0] - if isinstance(value, list): - for argvalue in value: - print(f"argvalue {argvalue}") - task_args.extend([f"--{key}={argvalue}" for argvalue in value]) - else: - task_args.append(f"--{key}={value}") - print(f"task_args {task_args}") - result = None - try: - result = subprocess.run( - [self.task_parameters['path']] + task_args, - check=True, - capture_output=True, - text=True, - ) - - logger.info(str(result.stdout)) - logger.info("%s: %s", "Finish task: ", self.task_name, extra={'tag': self.task_name}) - - - if result.returncode != 0: - raise subprocess.CalledProcessError(returncode=result.returncode, cmd=result.args, output=result.stdout) - except subprocess.CalledProcessError as e: - raise Exception(f"Error executing process task {e.stderr}") - -class DummyTask(Task): - def __init__(self, task_name, task_parameters): - self.task_name = task_name - self.task_parameters = task_parameters - - def execute(self): - message = self.task_parameters.get('message', 'No message provided') - logger.info("%s: %s", message, self.task_name, extra={'tag': self.task_name}) - - -class DummyRandomTask(Task): - def __init__(self, task_name, task_parameters): - self.task_name = task_name - self.task_parameters = task_parameters - - def execute(self): - time_interval = self.task_parameters.get('time-seconds', [1, 5]) - sleep_time = random.uniform(time_interval[0], time_interval[1]) - - message = self.task_parameters.get('message', 'No message provided') - logger.info("%s: %s (Sleeping for %.2f seconds)", message, self.task_name, sleep_time, extra={'tag': self.task_name}) - - time.sleep(sleep_time) - - -TASKS_HANDLERS = { - 'process': ProcessTask, - 'dummy': DummyTask, - 'dummy-random': DummyRandomTask, -} diff --git a/deployability/deps/build/lib/workflow_engine/utils.py b/deployability/deps/build/lib/workflow_engine/utils.py deleted file mode 100644 index 3d64f331a8..0000000000 --- a/deployability/deps/build/lib/workflow_engine/utils.py +++ /dev/null @@ -1,6 +0,0 @@ -import logging - -# The logging module will use the generic logger anyway -# that is because the logging module is a singleton. -# So, we can just use it this way here, and it will work. -logger = (lambda: logging.getLogger("workflow_engine"))() diff --git a/deployability/deps/build/lib/workflow_engine/workflow_processor.py b/deployability/deps/build/lib/workflow_engine/workflow_processor.py deleted file mode 100755 index 50e2bf428b..0000000000 --- a/deployability/deps/build/lib/workflow_engine/workflow_processor.py +++ /dev/null @@ -1,357 +0,0 @@ -# Copyright (C) 2015, Wazuh Inc. -# Created by Wazuh, Inc. . 
-# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
-
-import concurrent.futures
-import graphlib
-import json
-import time
-import yaml
-
-from pathlib import Path
-from itertools import product
-
-from workflow.schema_validator import SchemaValidator
-from workflow.task import *
-from workflow.utils import logger
-
-
-class WorkflowFile:
-    """Class for loading and processing a workflow file."""
-    schema_path = Path(__file__).parent / 'schemas' / 'schema_v1.json'
-
-    def __init__(self, workflow_file_path: Path | str, schema_path: Path | str = None) -> None:
-        self.schema_path = schema_path or self.schema_path
-        self.__validate_schema(workflow_file_path)
-        self.workflow_raw_data = self.__load_workflow(workflow_file_path)
-        self.task_collection = self.__process_workflow()
-        self.__static_workflow_validation()
-
-    def __validate_schema(self, workflow_file: Path | str) -> None:
-        """
-        Validate the workflow file against the schema.
-
-        Args:
-            workflow_file (Path | str): Path to the workflow file.
-        """
-        validator = SchemaValidator(self.schema_path, workflow_file)
-        validator.preprocess_data()
-        validator.validateSchema()
-
-    def __load_workflow(self, file_path: str) -> dict:
-        """
-        Load the workflow data from a file.
-
-        Args:
-            file_path (str): Path to the workflow file.
-
-        Returns:
-            dict: Workflow data.
-        """
-        with open(file_path, 'r', encoding='utf-8') as file:
-            return yaml.safe_load(file)
-
-    def __process_workflow(self):
-        """Process the workflow and return a list of tasks."""
-        task_collection = []
-        variables = self.workflow_raw_data.get('variables', {})
-        for task in self.workflow_raw_data.get('tasks', []):
-            task_collection.extend(self.__expand_task(task, variables))
-        return task_collection
-
-    def __replace_placeholders(self, element: str, values: dict, parent_key: str = None):
-        """
-        Recursively replace placeholders in a dictionary or list.
-
-        Args:
-            element (Any): The element to process.
-            values (dict): The values to replace placeholders.
-            parent_key (str): The parent key for nested replacements.
-
-        Returns:
-            Any: The processed element.
-        """
-        if isinstance(element, dict):
-            return {key: self.__replace_placeholders(value, values, key) for key, value in element.items()}
-        if isinstance(element, list):
-            return [self.__replace_placeholders(sub_element, values, parent_key) for sub_element in element]
-        if isinstance(element, str):
-            return element.format_map(values)
-        return element
-
-    def __expand_task(self, task: dict, variables: dict):
-        """
-        Expand a task with variable values.
-
-        Args:
-            task (dict): The task to expand.
-            variables (dict): Variable values.
-
-        Returns:
-            List[dict]: List of expanded tasks.
- """ - expanded_tasks = [] - - if 'foreach' in task: - loop_variables = task.get('foreach', [{}]) - - variable_names = [loop_variable_data.get('variable') for loop_variable_data in loop_variables] - as_identifiers = [loop_variable_data.get('as') for loop_variable_data in loop_variables] - - variable_values = [variables.get(name, []) for name in variable_names] - - for combination in product(*variable_values): - variables_with_items = {**variables, **dict(zip(as_identifiers, combination))} - expanded_tasks.append(self.__replace_placeholders(task, variables_with_items)) - else: - expanded_tasks.append(self.__replace_placeholders(task, variables)) - - return expanded_tasks - - def __static_workflow_validation(self): - """Validate the workflow against static criteria.""" - def check_duplicated_tasks(self): - """Validate task name duplication.""" - task_name_counts = {task['task']: 0 for task in self.task_collection} - - for task in self.task_collection: - task_name_counts[task['task']] += 1 - - duplicates = [name for name, count in task_name_counts.items() if count > 1] - - if duplicates: - raise ValueError(f"Duplicated task names: {', '.join(duplicates)}") - - def check_not_existing_tasks(self): - """Validate task existance.""" - task_names = {task['task'] for task in self.task_collection} - - for dependencies in [task.get('depends-on', []) for task in self.task_collection]: - non_existing_dependencies = [dependency for dependency in dependencies if dependency not in task_names] - if non_existing_dependencies: - raise ValueError(f"Tasks do not exist: {', '.join(non_existing_dependencies)}") - - validations = [check_duplicated_tasks, check_not_existing_tasks] - for validation in validations: - validation(self) - - -class DAG(): - """Class for creating a dependency graph.""" - def __init__(self, task_collection: list, reverse: bool = False): - self.task_collection = task_collection - self.reverse = reverse - self.dag, self.dependency_tree = self.__build_dag() - self.to_be_canceled = set() - self.finished_tasks_status = { - 'failed': set(), - 'canceled': set(), - 'successful': set(), - } - self.execution_plan = self.__create_execution_plan(self.dependency_tree) - self.dag.prepare() - - def is_active(self) -> bool: - """Check if the DAG is active.""" - return self.dag.is_active() - - def get_available_tasks(self) -> list: - """Get the available tasks.""" - return self.dag.get_ready() - - def get_execution_plan(self) -> dict: - """Get the execution plan.""" - return self.execution_plan - - def set_status(self, task_name: str, status: str): - """Set the status of a task.""" - self.finished_tasks_status[status].add(task_name) - self.dag.done(task_name) - - def should_be_canceled(self, task_name: str) -> bool: - """Check if a task should be canceled.""" - return task_name in self.to_be_canceled - - def __build_dag(self): - """Build a dependency graph for the tasks.""" - dependency_dict = {} - dag = graphlib.TopologicalSorter() - - for task in self.task_collection: - task_name = task['task'] - dependencies = task.get('depends-on', []) - - if self.reverse: - for dependency in dependencies: - dag.add(dependency, task_name) - else: - dag.add(task_name, *dependencies) - - dependency_dict[task_name] = dependencies - - return dag, dependency_dict - - def cancel_dependant_tasks(self, task_name, cancel_policy) -> None: - """Cancel all tasks that depend on a failed task.""" - def get_all_task_set(tasks): - task_set = set() - - for task, sub_tasks in tasks.items(): - task_set.add(task) - 
-                task_set.update(get_all_task_set(sub_tasks))
-
-            return task_set
-
-        if cancel_policy == 'continue':
-            return
-
-        not_cancelled_tasks = self.finished_tasks_status['failed'].union(self.finished_tasks_status['successful'])
-        for root_task, sub_tasks in self.execution_plan.items():
-            task_set = get_all_task_set({root_task: sub_tasks})
-            if cancel_policy == 'abort-all':
-                self.to_be_canceled.update(task_set)
-            elif cancel_policy == 'abort-related-flows':
-                if task_name in task_set:
-                    self.to_be_canceled.update(task_set - not_cancelled_tasks)
-            else:
-                raise ValueError(f"Unknown cancel policy '{cancel_policy}'.")
-
-    def __create_execution_plan(self, dependency_dict: dict) -> dict:
-        execution_plan = {}
-
-        def get_root_tasks(dependency_dict: dict) -> set:
-            """Get root tasks from the dependency dictionary."""
-            all_tasks = set(dependency_dict.keys())
-            dependent_tasks = set(dep for dependents in dependency_dict.values() for dep in dependents)
-            return all_tasks - dependent_tasks
-
-        def get_subtask_plan(task_name: str, dependency_dict: dict, level: int = 0) -> dict:
-            """Create the execution plan recursively as a dictionary."""
-            if task_name not in dependency_dict:
-                return {task_name: {}}
-
-            dependencies = dependency_dict[task_name]
-            plan = {task_name: {}}
-
-            for dependency in dependencies:
-                sub_plan = get_subtask_plan(dependency, dependency_dict, level + 1)
-                plan[task_name].update(sub_plan)
-
-            return plan
-
-        root_tasks = get_root_tasks(dependency_dict)
-        for root_task in root_tasks:
-            execution_plan.update(get_subtask_plan(root_task, dependency_dict))
-
-        return execution_plan
-
-
-class WorkflowProcessor:
-    """Class for processing a workflow."""
-
-    def __init__(self, workflow_file: str, dry_run: bool, threads: int, log_level: str = 'INFO', schema_file: Path | str = None):
-        """
-        Initialize WorkflowProcessor.
-
-        Args:
-            workflow_file (str): Path to the workflow file (YAML format).
-            dry_run (bool): Display the plan without executing tasks.
-            threads (int): Number of threads to use for parallel execution.
-            log_level (str): Log level.
-            schema_file (Path | str): Path to the schema file (JSON format).
-        """
-        logger.setLevel(log_level)
-        # Initialize the instance variables.
-        self.task_collection = WorkflowFile(workflow_file, schema_file).task_collection
-        self.dry_run = dry_run
-        self.threads = threads
-
-    def execute_task(self, dag: DAG, task: dict, action) -> None:
-        """Execute a task."""
-        task_name = task['task']
-        if dag.should_be_canceled(task_name):
-            logger.warning("[%s] Skipping task due to dependency failure.", task_name)
-            dag.set_status(task_name, 'canceled')
-        else:
-            try:
-                task_object = self.create_task_object(task, action)
-
-                logger.info("[%s] Starting task.", task_name)
-                start_time = time.time()
-                task_object.execute()
-                logger.info("[%s] Finished task in %.2f seconds.", task_name, time.time() - start_time)
-                dag.set_status(task_name, 'successful')
-            except Exception as e:
-                logger.error("[%s] Task failed with error: %s.", task_name, e)
-                dag.set_status(task_name, 'failed')
-                dag.cancel_dependant_tasks(task_name, task.get('on-error', 'abort-related-flows'))
-                # Re-raise so the caller sees the failure.
-                raise
-
-    def create_task_object(self, task: dict, action) -> Task:
-        """Create and return a Task object based on task type."""
-        task_type = task[action]['this']
-
-        task_handler = TASKS_HANDLERS.get(task_type)
-
-        if task_handler is not None:
-            return task_handler(task['task'], task[action]['with'])
-
-        raise ValueError(f"Unknown task type '{task_type}'.")
-
-    def execute_tasks_parallel(self) -> None:
-        """Execute tasks in parallel."""
-        if not self.dry_run:
-            logger.info("Executing tasks in parallel.")
-            dag = DAG(self.task_collection)
-            # Execute tasks based on the DAG.
-            with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
-                futures = {}
-                while dag.is_active():
-                    for task_name in dag.get_available_tasks():
-                        task = next(t for t in self.task_collection if t['task'] == task_name)
-                        future = executor.submit(self.execute_task, dag, task, 'do')
-                        futures[task_name] = future
-                    concurrent.futures.wait(futures.values())
-
-            # Now execute cleanup tasks based on the reverse DAG.
-            reverse_dag = DAG(self.task_collection, reverse=True)
-
-            logger.info("Executing cleanup tasks.")
-            with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
-                reverse_futures = {}
-
-                while reverse_dag.is_active():
-                    for task_name in reverse_dag.get_available_tasks():
-                        task = next(t for t in self.task_collection if t['task'] == task_name)
-                        if 'cleanup' in task:
-                            future = executor.submit(self.execute_task, reverse_dag, task, 'cleanup')
-                            reverse_futures[task_name] = future
-                        else:
-                            reverse_dag.set_status(task_name, 'successful')
-                    concurrent.futures.wait(reverse_futures.values())
-
-        else:
-            dag = DAG(self.task_collection)
-            logger.info("Execution plan: %s", json.dumps(dag.get_execution_plan(), indent=2))
-
-    def run(self) -> None:
-        """Main entry point."""
-        self.execute_tasks_parallel()
-
-    def abort_execution(self, executor: concurrent.futures.ThreadPoolExecutor, futures: dict) -> None:
-        """Abort the execution of tasks."""
-        for future in concurrent.futures.as_completed(futures.values()):
-            try:
-                _ = future.result()
-            except Exception as e:
-                logger.error("Error in aborted task: %s", e)
-
-        executor.shutdown(wait=False)
diff --git a/deployability/deps/workflow.egg-info/PKG-INFO b/deployability/deps/workflow.egg-info/PKG-INFO
deleted file mode 100644
index 676acd52e7..0000000000
--- a/deployability/deps/workflow.egg-info/PKG-INFO
+++ /dev/null
@@ -1,8 +0,0 @@
-Metadata-Version: 2.1
-Name: workflow
-Version: 1.0
-Summary: Wazuh testing utilities to help programmers automate deployment tests
-Home-page: https://github.com/wazuh
-Author: Wazuh
-Author-email: hello@wazuh.com
-License: GPLv2
diff --git a/deployability/deps/workflow.egg-info/SOURCES.txt b/deployability/deps/workflow.egg-info/SOURCES.txt
deleted file mode 100644
index 3455838868..0000000000
--- a/deployability/deps/workflow.egg-info/SOURCES.txt
+++ /dev/null
@@ -1,24 +0,0 @@
-setup.py
-../modules/workflow_engine/README.md
-../modules/workflow_engine/__init__.py
-../modules/workflow_engine/models.py
-../modules/workflow_engine/schema_validator.py
-../modules/workflow_engine/task.py
-../modules/workflow_engine/utils.py
-../modules/workflow_engine/workflow_processor.py
-../modules/workflow_engine/__pycache__/__init__.cpython-310.pyc
-../modules/workflow_engine/__pycache__/models.cpython-310.pyc
-../modules/workflow_engine/__pycache__/schema_validator.cpython-310.pyc
-../modules/workflow_engine/__pycache__/task.cpython-310.pyc
-../modules/workflow_engine/__pycache__/utils.cpython-310.pyc
-../modules/workflow_engine/__pycache__/workflow_processor.cpython-310.pyc
-../modules/workflow_engine/examples/dtt1-agents-poc.yaml
-../modules/workflow_engine/examples/dtt1-agents.yaml
-../modules/workflow_engine/examples/dtt1-managers.yaml
-../modules/workflow_engine/schemas/schema_v1.json
-workflow.egg-info/PKG-INFO
-workflow.egg-info/SOURCES.txt
-workflow.egg-info/dependency_links.txt
-workflow.egg-info/entry_points.txt
-workflow.egg-info/not-zip-safe
-workflow.egg-info/top_level.txt
\ No newline at end of file
diff --git a/deployability/deps/workflow.egg-info/dependency_links.txt b/deployability/deps/workflow.egg-info/dependency_links.txt
deleted file mode 100644
index 8b13789179..0000000000
--- a/deployability/deps/workflow.egg-info/dependency_links.txt
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/deployability/deps/workflow.egg-info/entry_points.txt b/deployability/deps/workflow.egg-info/entry_points.txt
deleted file mode 100644
index 156b170f3d..0000000000
--- a/deployability/deps/workflow.egg-info/entry_points.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-[console_scripts]
-models = workflow.models:main
-schema_validator = workflow.schema_validator:main
-task = workflow.task:main
-utils = workflow.utils:main
-workflow_processor = workflow.workflow_processor:main
diff --git a/deployability/deps/workflow.egg-info/not-zip-safe b/deployability/deps/workflow.egg-info/not-zip-safe
deleted file mode 100644
index 8b13789179..0000000000
--- a/deployability/deps/workflow.egg-info/not-zip-safe
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/deployability/deps/workflow.egg-info/top_level.txt b/deployability/deps/workflow.egg-info/top_level.txt
deleted file mode 100644
index 16d139c485..0000000000
--- a/deployability/deps/workflow.egg-info/top_level.txt
+++ /dev/null
@@ -1 +0,0 @@
-workflow_engine