
Commit

Fine-tuned todos a bit + migrated flops_utils into the main repo
Still need to update the flops_utils dependency links
Malyuk-A committed Oct 4, 2024
1 parent 5c507ef commit 4072970
Showing 23 changed files with 540 additions and 77 deletions.
62 changes: 0 additions & 62 deletions README.md
@@ -3,65 +3,3 @@
The documentation is a work in progress.

Please take a look at the [releases](https://github.com/oakestra/addon-FLOps/releases).

<!-- # FLOps Management
<img src="https://github.com/oakestra/plugin-FLOps-management/assets/65814168/c8ba7bce-634e-46a3-a62c-5c152b7f4829" alt="Description of image" width="50%" height="auto">
## FLOps Manager
The manager is responsible for:
- Serving the FLOps API (Entrypoint for FLOps processes/projects)
- Requesting/Managing FLOps components/services e.g. by delegating calls to Oakestra.
- Interacting with the FLOps DB for storing persistent data.
- Is connected to the FLOps MQTT Broker and can handle incoming messages.
- E.g. if the FL Image Builder succeeds, it starts spawning the FL Aggregator and FL Learners.
- If the Builder fails, it logs the specific error and handles the FLOps Project termination.
## FLOps MQTT Broker
A [Mosquitto MQTT Broker](https://mosquitto.org/) that enables communication between the FLOps components deployed on Workers and the Manager.
## FLOps Image Registry
A [CNCF Distribution Registry](https://distribution.github.io/distribution/) that the FLOps Image Builder pushes its images to and the FL Learners pull them from.
## FLOps DB
- Stores persistent data relevant to FLOps via MongoDB.
- All connected FLOps components that are part of the same FLOps project need to be able to identify and recall each other if need be.
- E.g. Once the FLOps Image Builder finishes building, it calls the FLOps Manager to continue with the next FLOps Project steps, including the undeployment of the current Builder service and the spawning of the Aggregator and Learners. To do so, the Manager needs the FLOps ProjectID to retrieve the necessary details of the Builder and to initiate the upcoming components. These details (mostly IDs) are stored in the FLOps DB.
## MLflow MLOps
TODO/WIP
#############
TODO update readme -->
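For illustration, the Manager flow described above (react to a Builder result via MQTT, look up the project in the FLOps DB, then trigger the next steps) could look roughly like the following sketch. This is not the actual FLOps Manager code: the collection name, payload shape, DB address, and helper functions are assumptions.

```python
import json
import logging

from pymongo import MongoClient

logger = logging.getLogger(__name__)
# Assumed DB address and database name for illustration only.
flops_db = MongoClient("mongodb://flops-db:27017")["flops"]


def undeploy_service(service_id: str) -> None:
    """Placeholder for a delegated Oakestra undeployment call."""
    logger.info("Undeploying service %s", service_id)


def deploy_aggregator_and_learners(project: dict) -> None:
    """Placeholder for spawning the FL Aggregator and FL Learners."""
    logger.info("Spawning aggregator/learners for project %s", project["_id"])


def handle_builder_result(payload: bytes) -> None:
    # Payload shape is an assumption, e.g. {"project_id": "...", "status": "success"}.
    message = json.loads(payload)
    project = flops_db["projects"].find_one({"_id": message["project_id"]})
    if message["status"] == "success":
        undeploy_service(project["builder_service_id"])
        deploy_aggregator_and_learners(project)
    else:
        logger.error("Builder failed for project %s: %s", project["_id"], message.get("error"))
```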

<!-- # FLOps Project Components
<img src="https://github.com/oakestra/plugin-FLOps-project-components/assets/65814168/40cccf6c-25fc-437d-bae4-7f799b7f326c" alt="Description of image" width="50%" height="auto">
A FLOps Project is a logical unit that groups together all related FLOps components that are necessary to fulfill a concrete user FLOps request.
A Project enables its components to find and communicate with each other.
E.g. a user wants to run FLOps on their provided ML repo. All subsequent processes triggered by this initial request are part of one FLOps project.
Another call will lead to a different FLOps project.
This supports multiple FLOps projects and components running at the same time - i.e. serving multiple user FL requests simultaneously.<br/>
## FL (Learner) Image Builder
- Is instantiated if the requested FL Learner Image is not found in the FLOps Image Registry.
- Builds the FL Learner Image to be able to create FL Learners (also called FL Clients) and pushes the resulting image to the FLOps image registry.
- This FL Learner image is based on the user-provided ML repo/code which gets wrapped into an FL-compatible image including all necessary dependencies for proper training.
## FLOps UI
- Enables the user/developer to easily inspect the current progress of their FLOps processes/project.
- Depending on the use case, a non-developer user only sees the UI and none of the other components, which abstracts away technical details and complexity.
- Every FLOps component has the capability to send a message to the UI (via internal Oakestra service networking), including the FLOps manager (which uses MQTT, because the manager is not deployed as a service).
## FLOps Aggregator
- The FL Server that aggregates the Learner updates.
## FL Learner
- The FL Client that trains the model and sends its updates to the aggregator. -->
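The Aggregator/Learner pair described above corresponds to a standard Flower server/client setup. The following is a minimal, generic Flower client sketch (not the actual FLOps Learner); the placeholder "model" and the aggregator address are assumptions.

```python
import flwr as fl
import numpy as np


class MinimalLearner(fl.client.NumPyClient):
    def __init__(self):
        self.weights = [np.zeros(3)]  # placeholder "model" parameters

    def get_parameters(self, config):
        return self.weights

    def fit(self, parameters, config):
        self.weights = parameters  # a real learner would train on local data here
        return self.weights, 1, {}

    def evaluate(self, parameters, config):
        return 0.0, 1, {}  # loss, number of examples, metrics


if __name__ == "__main__":
    # The aggregator address is a placeholder.
    fl.client.start_numpy_client(server_address="aggregator:8080", client=MinimalLearner())
```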
5 changes: 5 additions & 0 deletions flops_manager_package/Dockerfile
@@ -36,5 +36,10 @@ ENV FLOPS_MQTT_BROKER_URL=flops_mqtt
ENV SYSTEM_MANAGER_PORT=10000

RUN poetry add git+https://github.com/Malyuk-A/tmp_flops_utils.git@main
#RUN poetry add git+https://github.com/oakestra/addon-FLOps.git@main#subdirectory=utils_library/flops_utils
# !!!!!!!!!!!!!! TODO test if this is in fact working as expected; if so, continue with migrating further places where flops_utils is used
# Do not forget the CLI
# Once this is confirmed to be working, put the legacy project in private mode,
# and once that is also working, maybe even delete the legacy project

CMD ["poetry", "run", "python", "main.py"]
2 changes: 0 additions & 2 deletions flops_manager_package/flops_manager/classes/apps/project.py
@@ -32,8 +32,6 @@ class _TrainingConfiguration(BaseModel):
)
training_cycles: int = Field(
default=1,
# TODO check naming with ML/FL naming conventions
# Maybe "period, round, cycle" are already coined and mean something different.
description="""
(Only applicable for the 'hierarchical' mode.)
The number of training & evaluation rounds performed between
4 changes: 0 additions & 4 deletions flops_manager_package/flops_manager/utils/sla/components.py
@@ -44,14 +44,10 @@ def to_json_dict(self) -> dict:

class LatencyConstraint(SlaConstraint):
type = "latency"
# TODO
pass


class GeoConstraint(SlaConstraint):
type = "geo"
# TODO
pass


class AddonConstraint(SlaConstraint):
@@ -61,7 +61,10 @@ def handle_aggregator(

try:
model_manager = model_manager or get_model_manager()
if not aggregator_context.track_locally and aggregator_context.should_use_mlflow:
if (
not aggregator_context.track_locally
and aggregator_context.should_use_mlflow
):
mlflow.set_tracking_uri(aggregator_context.mlflow_tracking_server_url)
if aggregator_context.should_use_mlflow:
# NOTE: A MLflow experiment consists of multiple runs.
@@ -73,7 +76,9 @@ def handle_aggregator(
aggregator_context=aggregator_context,
model_manager=model_manager,
mlflow_experiment_id=(
mlflow_experiment.experiment_id if aggregator_context.should_use_mlflow else None
mlflow_experiment.experiment_id
if aggregator_context.should_use_mlflow
else None
),
# NOTE: The Flower Strategy lacks the notion of the number of expected training rounds.
requested_total_number_of_training_rounds=aggregator_context.training_iterations,
@@ -82,8 +87,11 @@ def handle_aggregator(
min_evaluate_clients=aggregator_context.min_evaluate_clients,
# NOTE: Instead of waiting for a random Learner to provide the init params.
# We let the Aggregator set the init params.
# TODO: add paper reference - that shows that
# setting init params can give a massive boost/benefit.
#
# Paper - 2022 - Nguyen et al.
# "Where to Begin?
# On the Impact of Pre-Training and Initialization in Federated Learning"
# This paper proves that setting init params can give a massive boost/benefit.
# (We are not doing anything fancy here yet - but this can be easily enhanced.)
#
# This also enables us to chain together multiple training cycles,
@@ -98,6 +106,8 @@ def handle_aggregator(
# More info: https://discuss.flower.ai/t/how-do-i-start-from-a-pre-trained-model/73/2
# TODO: Add a check in the builder to verify that the user provided code
# (the get_params() method) can be properly transformed into Flower Parameters.
# I.e. verify that the user provided code fits our FLOps requirements
# (structural, methods, etc.)
initial_parameters=fl.common.ndarrays_to_parameters(
model_manager.get_model_parameters() # type: ignore
),
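For context, letting the Aggregator provide the initial parameters (as the hunk above does via model_manager.get_model_parameters()) looks roughly like the following generic Flower sketch. FedAvg stands in for the custom FLOps strategy, and the parameter arrays and client counts are placeholders.

```python
import flwr as fl
import numpy as np

# Placeholder initial parameters; in FLOps these come from model_manager.get_model_parameters().
initial_ndarrays = [np.zeros((10, 10)), np.zeros(10)]

strategy = fl.server.strategy.FedAvg(
    min_fit_clients=2,
    min_evaluate_clients=2,
    min_available_clients=2,
    # Providing initial parameters means the server does not have to ask a random
    # client for them before the first round.
    initial_parameters=fl.common.ndarrays_to_parameters(initial_ndarrays),
)

fl.server.start_server(
    server_address="0.0.0.0:8080",
    config=fl.server.ServerConfig(num_rounds=3),
    strategy=strategy,
)
```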
4 changes: 3 additions & 1 deletion image_builder_package/fl_image_builder/image_management.py
@@ -20,7 +20,9 @@ def build_image(
should_notify_observer: bool = False,
build_cmd_addition: str = "",
) -> None:
image_name_with_tag = image_name_with_tag or build_directory or context.get_image_name()
image_name_with_tag = (
image_name_with_tag or build_directory or context.get_image_name()
)
cwd = pathlib.Path.cwd()
# Important: Be very careful how and where you run buildah.
# If you run buildah incorrectly it can easily kill your host system.
4 changes: 3 additions & 1 deletion image_builder_package/requirements.txt
@@ -1,6 +1,8 @@
requests ~=2.28.1
gitpython ~= 3.1.42
# NOTE: TODO add comment if things work out about how skinny works and the full-fledged thing needs a ton of breaking dependencies, etc.
# NOTE: There exists a more light-weight option for mlflow.
# However, it comes with many missing and breaking dependencies, etc.
# Might be worth investigating as FUTURE WORK.
#mlflow-skinny ~= 2.12.1
mlflow ~= 2.12.1
git+https://github.com/Malyuk-A/tmp_flops_utils.git@main
2 changes: 0 additions & 2 deletions project_observer_package/README.md
@@ -1,5 +1,4 @@
# addon-FLOps-Project-Observer
TODO

The FLOps UI image is used for observing the FLOps processes/project.
It is intended to inform the user/dev about the successful or failed FLOps processes like FL Learner image builder progress, etc.
@@ -8,7 +7,6 @@ This image comes with two communication "channels":
- A Python-socket-based one that is intended for Oakestra-service-internal communication. It uses the Oakestra Network and can be reached e.g. by using this service's RR IP.
- An MQTT one that subscribes to the FLOps Manager MQTT broker. This one is necessary to be able to receive messages from the FLOps Manager, which is not deployed as an Oakestra service.

TODO: The image can be found in the Oakestra image registry: `TODO`

## Input Params Explanation
Input arguments for the python module ```python main.py ...```
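A rough, generic sketch of the two observer channels described in the README hunk above (not the actual observer code); the port, topic, broker hostname, and the paho-mqtt 1.x-style callback API are assumptions.

```python
import socket
import threading

import paho.mqtt.client as mqtt  # assuming the paho-mqtt 1.x callback API


def serve_oakestra_channel(host: str = "0.0.0.0", port: int = 11027) -> None:
    # Plain TCP socket reachable via the service's Oakestra (e.g. RR) IP; port is arbitrary here.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((host, port))
        server.listen()
        while True:
            conn, _ = server.accept()
            with conn:
                print("oakestra channel:", conn.recv(4096).decode())


def serve_mqtt_channel(broker: str = "flops_mqtt", topic: str = "flops/ui") -> None:
    # Topic name is an assumption; the broker hostname matches the Dockerfile env var above.
    client = mqtt.Client()
    client.on_message = lambda _c, _u, msg: print("mqtt channel:", msg.payload.decode())
    client.connect(broker, 1883)
    client.subscribe(topic)
    client.loop_forever()


if __name__ == "__main__":
    threading.Thread(target=serve_oakestra_channel, daemon=True).start()
    serve_mqtt_channel()
```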
2 changes: 2 additions & 0 deletions utils_library/.gitignore
@@ -0,0 +1,2 @@
__pycache__
dist
1 change: 0 additions & 1 deletion utils_library/README.md
@@ -1,2 +1 @@
# TODO
move the contents from the tmp_flops_utils repo
18 changes: 18 additions & 0 deletions utils_library/flops_utils/env_vars.py
@@ -0,0 +1,18 @@
import os
import sys

from flops_utils.logging import logger

DOCKER_HOST_IP_LINUX = "172.17.0.1"

_ERROR_MESSAGE = (
"Terminating. Make sure to set the environment variables first. Missing: "
)


def get_env_var(name: str, default: str = "") -> str:
    env_var = os.environ.get(name) or default
    # An unset variable without a provided default resolves to an empty string.
    if not env_var:
        logger.fatal(f"{_ERROR_MESSAGE}'{name}'")
        sys.exit(1)
    return env_var
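A hypothetical usage example for the helper above (FLOPS_PROJECT_ID is an illustrative variable name, not necessarily one FLOps uses):

```python
from flops_utils.env_vars import DOCKER_HOST_IP_LINUX, get_env_var

# Falls back to the Docker host IP if the variable is unset.
mqtt_url = get_env_var("FLOPS_MQTT_BROKER_URL", default=DOCKER_HOST_IP_LINUX)

# No default: logs a fatal message and terminates if the variable is missing.
project_id = get_env_var("FLOPS_PROJECT_ID")
```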
51 changes: 51 additions & 0 deletions utils_library/flops_utils/logging.py
@@ -0,0 +1,51 @@
# Reference:
# https://alexandra-zaharia.github.io/posts/make-your-own-custom-color-formatter-with-python-logging/

import logging


class CustomFormatter(logging.Formatter):
"""Logging colored formatter, adapted from https://stackoverflow.com/a/56944256/3638629"""

grey = "\x1b[38;21m"
yellow = "\x1b[38;5;226m"
red = "\x1b[38;5;196m"
bold_red = "\x1b[31;1m"
blue = "\x1b[38;5;33m"
light_blue = "\x1b[38;5;45m"
reset = "\x1b[0m"

def __init__(self, fmt, with_color: bool = False):
super().__init__()
self.fmt = fmt
base_fmt = self.fmt + self.reset

if with_color:
self.FORMATS = {
logging.DEBUG: self.grey + base_fmt,
logging.INFO: self.light_blue + base_fmt,
logging.WARNING: self.yellow + base_fmt,
logging.ERROR: self.red + base_fmt,
logging.CRITICAL: self.bold_red + base_fmt,
}
else:
            # Keys must be the numeric log levels, since format() looks them up via record.levelno.
            self.FORMATS = {level: base_fmt for level in logging._levelToName}

def format(self, record):
log_fmt = self.FORMATS.get(record.levelno) # type: ignore
formatter = logging.Formatter(log_fmt)
return formatter.format(record)


def configure_logger(name, level, with_color=False):
logger = logging.getLogger(name)
logger.setLevel(level)
stream_handler = logging.StreamHandler()
formatter = CustomFormatter("%(message)s", with_color=with_color)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
return logger


logger = configure_logger("logger", logging.DEBUG)
colorful_logger = configure_logger("colorful_logger", logging.DEBUG, with_color=True)
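Typical usage of the two preconfigured loggers defined above might look like:

```python
from flops_utils.logging import colorful_logger, logger

logger.info("plain, uncolored message")
colorful_logger.warning("colored message (yellow)")
colorful_logger.error("colored message (red)")
```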
28 changes: 28 additions & 0 deletions utils_library/flops_utils/ml_model_flavor_proxy.py
@@ -0,0 +1,28 @@
# Used to abstract away concrete uses of MLflow model flavors.
# Based on the provided flavor from the FLOps SLA the specific MLflow model flavor will be used.

import os
import sys

from flops_utils.logging import logger
from flops_utils.types import MLModelFlavor


def get_ml_model_flavor():
    # NOTE: Constructing the enum raises a ValueError for unset/unsupported values,
    # which would otherwise bypass the fallback match case below.
    try:
        flavor = MLModelFlavor(os.environ.get("ML_MODEL_FLAVOR"))
    except ValueError:
        logger.exception("Provided MLModelFlavor is not supported yet.")
        sys.exit(1)

    match flavor:
        case MLModelFlavor.KERAS:
            import mlflow.keras  # type: ignore

            return mlflow.keras
        case MLModelFlavor.PYTORCH:
            import mlflow.pytorch  # type: ignore

            return mlflow.pytorch

        case MLModelFlavor.SKLEARN:
            import mlflow.sklearn  # type: ignore

            return mlflow.sklearn
        case _:
            logger.error("Provided MLModelFlavor is not supported yet.")
            sys.exit(1)
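A sketch of how this proxy might be used to stay flavor-agnostic; the environment value and the commented-out call are illustrative assumptions:

```python
import os

from flops_utils.ml_model_flavor_proxy import get_ml_model_flavor

# Placeholder value; in FLOps this variable is set by the deployment, and the exact
# enum values are defined in flops_utils.types.MLModelFlavor.
os.environ["ML_MODEL_FLAVOR"] = "sklearn"

ml_model_flavor = get_ml_model_flavor()
# The supported flavor modules expose the same MLflow-style API (log_model, load_model, ...),
# so callers need no flavor-specific branches, e.g.:
# ml_model_flavor.log_model(model, artifact_path="model")
```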
@@ -0,0 +1 @@
from .flops_learner_files_proxy import load_dataset
@@ -0,0 +1,28 @@
# Note: This proxy is used to provide ML repo developers/users with stub FLOps Learner components.
# E.g. The ML repo developer does not have access to any data of the worker nodes yet.
# This data will be fetched by the Learner's data_loading from the Data Manager Sidecar.
# This data_loading is part of the Learner image and should be abstracted away from the ML repo.
# To be able to include the data_loading methods in the ML repo code these mocks are provided.
# These mocks will be replaced with the real implementation during the FLOps image build process.

import sys

import datasets # type: ignore

from flops_utils.logging import logger


def load_dataset() -> datasets.Dataset:
"""Loads the data from the co-located ml-data-server from the learner node.
Returns a single dataset that encompasses all matching found data from the server.
This dataset is in "Arrow" format.
"""

try:
from data_loading import load_data_from_ml_data_server # type: ignore

return load_data_from_ml_data_server()
except ImportError:
logger.exception("The data_loading file was not found.")
sys.exit(1)
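In a user's ML repo, the stub above might be consumed like this (illustrative only; the exact import path and the data columns are assumptions):

```python
# The subpackage path is an assumption; the __init__ above re-exports load_dataset.
from flops_utils.ml_repo_building_blocks import load_dataset

dataset = load_dataset()  # Arrow-backed datasets.Dataset at runtime
dataframe = dataset.to_pandas()  # column names depend on the user's data
print(dataframe.head())
```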
18 changes: 18 additions & 0 deletions utils_library/flops_utils/ml_repo_files_proxy.py
@@ -0,0 +1,18 @@
# Note: This proxy is used to handle ml repo files
# which get injected during the image build.
# I.e. these files are not yet present.
# Additionally, it handles exceptions and helps linters, etc. to work normally.

import sys

from flops_utils.logging import logger


def get_model_manager():
try:
from model_manager import ModelManager # type: ignore

return ModelManager()
except ImportError:
logger.exception("A ML repo file was not found.")
sys.exit(1)
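A hypothetical usage sketch; get_model_parameters() is the call the aggregator code above relies on, while the rest of the ModelManager interface is defined by the injected ML repo files:

```python
from flops_utils.ml_repo_files_proxy import get_model_manager

# Resolves to the ModelManager injected during the FLOps image build.
model_manager = get_model_manager()
initial_parameters = model_manager.get_model_parameters()
```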
