From 0c4be45b0499a8e8163bbd5a487b8aba6e6ceb4d Mon Sep 17 00:00:00 2001 From: Karan Shah Date: Fri, 20 Dec 2024 21:07:20 +0530 Subject: [PATCH 1/4] Update README.md (#1225) * Fix taskrunner links Signed-off-by: MasterSkepticista * Tidy up readme Signed-off-by: MasterSkepticista * Tiny changes Signed-off-by: MasterSkepticista * More changes Signed-off-by: MasterSkepticista --------- Signed-off-by: MasterSkepticista --- README.md | 101 ++++++++++------------- docs/about/features_index/taskrunner.rst | 15 ++-- 2 files changed, 48 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index c6e9e53855..5ed611dd72 100644 --- a/README.md +++ b/README.md @@ -1,98 +1,83 @@
- +
- -[![PyPI - Python Version](https://img.shields.io/badge/python-3.9%20%7C%203.10%20%7C%203.11-blue)](https://pypi.org/project/openfl/) +[![PyPI version](https://img.shields.io/pypi/v/openfl)](https://pypi.org/project/openfl/) +[![Downloads](https://pepy.tech/badge/openfl)](https://pepy.tech/project/openfl) +[![DockerHub](https://img.shields.io/docker/pulls/intel/openfl.svg)](https://hub.docker.com/r/intel/openfl) [![Ubuntu CI status](https://github.com/securefederatedai/openfl/actions/workflows/ubuntu.yml/badge.svg)](https://github.com/securefederatedai/openfl/actions/workflows/ubuntu.yml) [![Windows CI status](https://github.com/securefederatedai/openfl/actions/workflows/windows.yml/badge.svg)](https://github.com/securefederatedai/openfl/actions/workflows/windows.yml) [![Documentation Status](https://readthedocs.org/projects/openfl/badge/?version=latest)](https://openfl.readthedocs.io/en/latest/?badge=latest) -[![Downloads](https://pepy.tech/badge/openfl)](https://pepy.tech/project/openfl) -[![DockerHub](https://img.shields.io/docker/pulls/intel/openfl.svg)](https://hub.docker.com/r/intel/openfl) -[![PyPI version](https://img.shields.io/pypi/v/openfl)](https://pypi.org/project/openfl/) [](https://join.slack.com/t/openfl/shared_invite/zt-ovzbohvn-T5fApk05~YS_iZhjJ5yaTw) -[![License](https://img.shields.io/badge/License-Apache%202.0-brightgreen.svg)](https://opensource.org/licenses/Apache-2.0) -[![Citation](https://img.shields.io/badge/cite-citation-brightgreen)](https://arxiv.org/abs/2105.06413) [![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/6599/badge)](https://bestpractices.coreinfrastructure.org/projects/6599) Coverity Scan Build Status -[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/openfl/blob/develop/openfl-tutorials/Federated_Pytorch_MNIST_Tutorial.ipynb) - -Open Federated Learning (OpenFL) is a Python 3 framework for Federated Learning. OpenFL is designed to be a _flexible_, _extensible_ and _easily learnable_ tool for data scientists. OpenFL is hosted by The Linux Foundation, aims to be community-driven, and welcomes contributions back to the project. -Looking for the Open Flash Library project also referred to as OpenFL? Find it [here](https://github.com/openfl/openfl)! - -## Installation +[**Overview**](#overview) +| [**Features**](#features) +| [**Installation**](#installation) +| [**Changelog**](https://openfl.readthedocs.io/en/latest/releases.html) +| [**Documentation**](https://openfl.readthedocs.io/en/latest/) -You can simply install OpenFL from PyPI: +OpenFL is a Python framework for Federated Learning. It enables organizations to train and validate machine learning models on sensitive data. It increases privacy by allowing collaborative model training or validation across local private datasets without ever sharing that data with a central server. OpenFL is hosted by The Linux Foundation. -``` -$ pip install openfl -``` -For more installation options check out the [online documentation](https://openfl.readthedocs.io/en/latest/get_started/installation.html). +## Overview -## Getting Started +Federated Learning is a distributed machine learning approach that enables collaborative training and evaluation of models without sharing sensitive data such as, personal information, patient records, financial data, or classified information. The minimum data movement needed across a Federated Training experiment, is solely the model parameters and their updates. This is in contrast to a Centralized Learning regime, where all data needs to be moved to a central server or a datacenter for massively parallel training. -OpenFL supports two APIs to set up a Federated Learning experiment: +![Federated Learning](https://openfl.readthedocs.io/en/latest/_images/ct_vs_fl.png) -- [Task Runner API](https://openfl.readthedocs.io/en/latest/about/features_index/taskrunner.html): -Define an experiment and distribute it manually. All participants can verify model code and [FL plan](https://openfl.readthedocs.io/en/latest/about/features_index/taskrunner.html#federated-learning-plan-fl-plan-settings) prior to execution. The federation is terminated when the experiment is finished. This API is meant for enterprise-grade FL experiments, including support for mTLS-based communication channels and TEE-ready nodes (based on Intel® SGX). Follow the [Quick Start Guide](https://openfl.readthedocs.io/en/latest/get_started/quickstart.html#quick-start) for launching your first FL experiment locally. Then, refer to the [TaskRunner API Tutorial](https://github.com/securefederatedai/openfl/tree/develop/openfl-workspace/torch_cnn_mnist/README.md) for customizing the example workspace to your specific needs.
+OpenFL builds on a collaboration between Intel and the Bakas lab at the University of Pennsylvania (UPenn) to develop the [Federated Tumor Segmentation (FeTS)](https://www.fets.ai/) platform (grant award number: U01-CA242871). -- [Workflow API](https://openfl.readthedocs.io/en/latest/about/features_index/workflowinterface.html) ([*experimental*](https://openfl.readthedocs.io/en/latest/developer_guide/experimental_features.html)): -Create complex experiments that extend beyond traditional horizontal federated learning. This API enables an experiment to be simulated locally, then seamlessly scaled to a federated setting. See the [experimental tutorials](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/workflow/) to learn how to coordinate [aggregator validation after collaborator model training](https://github.com/securefederatedai/openfl/tree/develop/openfl-tutorials/experimental/workflow/102_Aggregator_Validation.ipynb), [perform global differentially private federated learning](https://github.com/psfoley/openfl/tree/experimental-workflow-interface/openfl-tutorials/experimental/workflow/Global_DP), measure the amount of private information embedded in a model after collaborator training with [privacy meter](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/workflow/Privacy_Meter/readme.md), or [add a watermark to a federated model](https://github.com/securefederatedai/openfl/blob/develop/openfl-tutorials/experimental/workflow/301_MNIST_Watermarking.ipynb). +The grant for FeTS was awarded from the [Informatics Technology for Cancer Research (ITCR)](https://itcr.cancer.gov/) program of the National Cancer Institute (NCI) of the National Institutes of Health (NIH), to Dr. Spyridon Bakas (Principal Investigator) when he was affiliated with the [Center for Biomedical Image Computing and Analytics (CBICA)](https://www.cbica.upenn.edu/) at UPenn and now heading the [Division of Computational Pathology at Indiana University (IU)](https://medicine.iu.edu/pathology/research/computational-pathology). -## Requirements +FeTS is a real-world medical federated learning platform with international collaborators. The original OpenFederatedLearning project and OpenFL are designed to serve as the backend for the FeTS platform, and OpenFL developers and researchers continue to work very closely with IU on the FeTS project. An example is the [FeTS-AI/Front-End](https://github.com/FETS-AI/Front-End), which integrates the group’s medical AI expertise with OpenFL framework to create a federated learning solution for medical imaging. -OpenFL supports popular NumPy-based ML frameworks like TensorFlow, PyTorch and Jax which should be installed separately.
-Users can extend the list of supported Machine Learning frameworks if needed. +Although initially developed for use in medical imaging, OpenFL designed to be agnostic to the use-case, the industry, and the machine learning framework. -## Project Overview -### What is Federated Learning +For more information, here is a list of relevant [publications](https://openfl.readthedocs.io/en/latest/about/blogs_publications.html). -[Federated learning](https://en.wikipedia.org/wiki/Federated_learning) is a distributed machine learning approach that enables collaboration on machine learning projects without having to share sensitive data, such as, patient records, financial data, or classified information. The minimum data movement needed across the federation is solely the model parameters and their updates. +## Installation -![Federated Learning](https://raw.githubusercontent.com/intel/openfl/develop/docs/images/diagram_fl_new.png) +Install via PyPI (latest stable release): +``` +pip install -U openfl +``` +For more installation options, checkout the [installation guide](https://openfl.readthedocs.io/en/latest/installation.html). -### Background -OpenFL builds on a collaboration between Intel and the Bakas lab at the University of Pennsylvania (UPenn) to develop the [Federated Tumor Segmentation (FeTS, www.fets.ai)](https://www.fets.ai/) platform (grant award number: U01-CA242871). +## Features -The grant for FeTS was awarded from the [Informatics Technology for Cancer Research (ITCR)](https://itcr.cancer.gov/) program of the National Cancer Institute (NCI) of the National Institutes of Health (NIH), to Dr Spyridon Bakas (Principal Investigator) when he was affiliated with the [Center for Biomedical Image Computing and Analytics (CBICA)](https://www.cbica.upenn.edu/) at UPenn and now heading up the [Division of Computational Pathology at Indiana University (IU)](https://medicine.iu.edu/pathology/research/computational-pathology). +### Ways to set up an FL experiment +OpenFL supports two ways to set up a Federated Learning experiment: -FeTS is a real-world medical federated learning platform with international collaborators. The original OpenFederatedLearning project and OpenFL are designed to serve as the backend for the FeTS platform, and OpenFL developers and researchers continue to work very closely with IU on the FeTS project. An example is the [FeTS-AI/Front-End](https://github.com/FETS-AI/Front-End), which integrates the group’s medical AI expertise with OpenFL framework to create a federated learning solution for medical imaging. +- [TaskRunner API](https://openfl.readthedocs.io/en/latest/about/features_index/taskrunner.html): This API uses short-lived components like the `Aggregator` and `Collaborator`, which terminate at the end of an FL experiment. TaskRunner supports mTLS-based secure communication channels, and TEE-based confidential computing environments. -Although initially developed for use in medical imaging, OpenFL designed to be agnostic to the use-case, the industry, and the machine learning framework. +- [Workflow API](https://openfl.readthedocs.io/en/latest/about/features_index/workflowinterface.html): This API allows for experiments beyond the traditional horizontal federated learning paradigm using a pythonic interface. It allows an experiment to be simulated locally, and then to be seamlessly scaled to a federated setting by switching from a local runtime to a distributed, federated runtime. + > **Note:** This is experimental capability. -You can find more details in the following articles: -- [Pati S, et al., 2022](https://www.nature.com/articles/s41467-022-33407-5) -- [Reina A, et al., 2021](https://arxiv.org/abs/2105.06413) -- [Sheller MJ, et al., 2020](https://www.nature.com/articles/s41598-020-69250-1) -- [Sheller MJ, et al., 2019](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6589345) -- [Yang Y, et al., 2019](https://arxiv.org/abs/1902.04885) -- [McMahan HB, et al., 2016](https://arxiv.org/abs/1602.05629) +### Framework Compatibility +OpenFL is backend-agnostic. It comes with support for popular NumPy-based ML frameworks like TensorFlow, PyTorch and Jax which should be installed separately. Users may extend the list of supported backends if needed. -### Supported Aggregation Algorithms -| Algorithm Name | Paper | PyTorch implementation | TensorFlow implementation | Other frameworks compatibility | +### Aggregation Algorithms +OpenFL supports popular aggregation algorithms out-of-the-box, with more algorithms coming soon. +| | Reference | PyTorch backend | TensorFlow backend | NumPy backend | | -------------- | ----- | :--------------------: | :-----------------------: | :----------------------------: | -| FedAvg | [McMahan et al., 2017](https://arxiv.org/pdf/1602.05629.pdf) | ✅ | ✅ | ✅ | -| FedProx | [Li et al., 2020](https://arxiv.org/pdf/1812.06127.pdf) | ✅ | ✅ | ❌ | -| FedOpt | [Reddi et al., 2020](https://arxiv.org/abs/2003.00295) | ✅ | ✅ | ✅ | -| FedCurv | [Shoham et al., 2019](https://arxiv.org/pdf/1910.07796.pdf) | ✅ | ❌ | ❌ | - -## Support -The OpenFL community is growing, and we invite you to be a part of it. Join the [Slack channel](https://join.slack.com/t/openfl/shared_invite/zt-ovzbohvn-T5fApk05~YS_iZhjJ5yaTw) to connect with fellow enthusiasts, share insights, and contribute to the future of federated learning. +| FedAvg | [McMahan et al., 2017](https://arxiv.org/pdf/1602.05629.pdf) | yes | yes | yes | +| FedOpt | [Reddi et al., 2020](https://arxiv.org/abs/2003.00295) | yes | yes | yes | +| FedProx | [Li et al., 2020](https://arxiv.org/pdf/1812.06127.pdf) | yes | yes | - | +| FedCurv | [Shoham et al., 2019](https://arxiv.org/pdf/1910.07796.pdf) | yes | - | - | -Consider subscribing to the OpenFL mail list openfl-announce@lists.lfaidata.foundation +## Contributing +We welcome contributions! Please refer to the [contributing guidelines](https://openfl.readthedocs.io/en/latest/contributing.html). -See you there! +The OpenFL community is expanding, and we encourage you to join us. Connect with other enthusiasts, share knowledge, and contribute to the advancement of federated learning by joining our [Slack channel](https://join.slack.com/t/openfl/shared_invite/zt-ovzbohvn-T5fApk05~YS_iZhjJ5yaTw). -We also always welcome questions, issue reports, and suggestions via: +Stay updated by subscribing to the OpenFL mailing list: [openfl-announce@lists.lfaidata.foundation](mailto:openfl-announce@lists.lfaidata.foundation). -* [GitHub Issues](https://github.com/securefederatedai/openfl/issues) -* [GitHub Discussions](https://github.com/securefederatedai/openfl/discussions) ## License This project is licensed under [Apache License Version 2.0](LICENSE). By contributing to the project, you agree to the license and copyright terms therein and release your contribution under these terms. diff --git a/docs/about/features_index/taskrunner.rst b/docs/about/features_index/taskrunner.rst index 37e050e5a3..66abcf6868 100644 --- a/docs/about/features_index/taskrunner.rst +++ b/docs/about/features_index/taskrunner.rst @@ -7,22 +7,17 @@ TaskRunner API ================ -Let's take a deeper dive into the Task Runner API. If you haven't already, we suggest checking out the :ref:`quick_start` for a primer on doing a simple experiment on a single node. - -The steps to transition from a local experiment to a distributed federation can be understood best with the following diagram. +This is a deep dive into the TaskRunner API. To gain familiarity with this API, we recommend going through the `quickstart <../../tutorials/taskrunner.html>`_ guide. Note that the quickstart guide is focused on simulating an experiment locally. The design choices of this API are best understood when transitioning from a local experiment to a distributed federation, which is how real-world federated learning experiments are conducted. .. figure:: ../../images/openfl_flow.png -.. centered:: Overview of a Task Runner experiment distributed across multiple nodes -: - -The Task Runner API uses short-lived components in a federation, which is terminated when the experiment is finished. The components are as follows: +The Task Runner API uses short-lived components in a federation, which are terminated once the experiment finishes. These components are: -- The *Collaborator* uses a local dataset to train a global model and the *Aggregator* receives model updates from *Collaborators* and aggregates them to create the new global model. -- The *Aggregator* is framework-agnostic, while the *Collaborator* can use any deep learning frameworks, such as `TensorFlow `_\* \ or `PyTorch `_\*\. +- The :code:`Collaborator` uses a local dataset to train a global model and the :code:`Aggregator` receives model updates from :code:`Collaborator` s and aggregates them to create the new global model. +- The :code:`Aggregator` is framework-agnostic, while the :code:`Collaborator` can use any deep learning frameworks, such as `TensorFlow `_\* \ or `PyTorch `_\*\. -For this workflow, you modify the federation workspace to your requirements by editing the Federated Learning plan (FL plan) along with the Python\*\ code that defines the model and the data loader. The FL plan is a `YAML `_ file that defines the collaborators, aggregator, connections, models, data, and any other parameters that describe the training. +For this workflow, one needs modify the federation workspace to their requirements by editing the Federated Learning plan (FL plan) along with the Python\*\ code that defines the model and the data loader. The FL plan is a `YAML `_ file that defines the collaborators, aggregator, connections, models, data, and any other parameters that describe the training. .. _plan_settings: From 4e021d2ca27de7e9c24e6f7b26c49539260c5303 Mon Sep 17 00:00:00 2001 From: Shailesh Tanwar <135304487+tanwarsh@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:09:25 +0530 Subject: [PATCH 2/4] Bump Python version to 3.10 - 3.12 (#1213) * update python version to 3.12 Signed-off-by: yes * dummy commit Signed-off-by: yes * dummy commit Signed-off-by: yes * update python version to 3.12 Signed-off-by: yes * dummy commit Signed-off-by: yes * dummy commit Signed-off-by: yes * removed files Signed-off-by: yes * reverted doc change Signed-off-by: yes * added missing requirements for Workflow Interface Tests Signed-off-by: yes * added tensorboard Signed-off-by: yes --------- Signed-off-by: yes --- .github/workflows/docker-bench-security.yml | 2 +- .github/workflows/double_ws_export.yml | 2 +- .github/workflows/experimental_workflow_tests.yml | 2 +- .github/workflows/hadolint.yml | 2 +- .github/workflows/lint.yml | 2 +- .github/workflows/pki.yml | 4 ++-- .github/workflows/pytest_coverage.yml | 2 +- .github/workflows/straggler-handling.yml | 2 +- .github/workflows/task_runner_basic_e2e.yml | 2 +- .github/workflows/taskrunner.yml | 2 +- .github/workflows/taskrunner_eden_pipeline.yml | 2 +- .github/workflows/tr_docker_gramine_direct.yml | 2 +- .github/workflows/tr_docker_native.yml | 2 +- .github/workflows/ubuntu.yml | 4 ++-- .github/workflows/windows.yml | 4 ++-- Jenkinsfile | 4 ++-- docs/about/features_index/taskrunner.rst | 2 +- docs/developer_guide/running_the_federation.notebook.rst | 2 +- docs/developer_guide/running_the_federation_with_gandlf.rst | 2 +- .../experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb | 2 +- setup.py | 4 ++-- .../requirements_experimental_localruntime_tests.txt | 4 ++++ 22 files changed, 30 insertions(+), 26 deletions(-) diff --git a/.github/workflows/docker-bench-security.yml b/.github/workflows/docker-bench-security.yml index 1d1b56b486..a64282a309 100644 --- a/.github/workflows/docker-bench-security.yml +++ b/.github/workflows/docker-bench-security.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/double_ws_export.yml b/.github/workflows/double_ws_export.yml index d4d6b0459c..a33aea7d64 100644 --- a/.github/workflows/double_ws_export.yml +++ b/.github/workflows/double_ws_export.yml @@ -26,7 +26,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/experimental_workflow_tests.yml b/.github/workflows/experimental_workflow_tests.yml index d99cec2e5f..84166ab362 100644 --- a/.github/workflows/experimental_workflow_tests.yml +++ b/.github/workflows/experimental_workflow_tests.yml @@ -25,7 +25,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/hadolint.yml b/.github/workflows/hadolint.yml index 6e90292158..a529491bdb 100644 --- a/.github/workflows/hadolint.yml +++ b/.github/workflows/hadolint.yml @@ -22,7 +22,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 13d53e885d..1556bad3b3 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -22,7 +22,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install linters run: | python -m pip install --upgrade pip diff --git a/.github/workflows/pki.yml b/.github/workflows/pki.yml index d8ecbecb03..704fbbfcea 100644 --- a/.github/workflows/pki.yml +++ b/.github/workflows/pki.yml @@ -26,7 +26,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -43,7 +43,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/pytest_coverage.yml b/.github/workflows/pytest_coverage.yml index 9371f74e13..18fa9f25fe 100644 --- a/.github/workflows/pytest_coverage.yml +++ b/.github/workflows/pytest_coverage.yml @@ -27,7 +27,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/straggler-handling.yml b/.github/workflows/straggler-handling.yml index 47b7f05709..4f2bdb7dd1 100644 --- a/.github/workflows/straggler-handling.yml +++ b/.github/workflows/straggler-handling.yml @@ -29,7 +29,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/task_runner_basic_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml index c75deb5eb3..b50eedd526 100644 --- a/.github/workflows/task_runner_basic_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -37,7 +37,7 @@ jobs: # There are open issues for some of the models, so excluding them for now: # model_name: [ "torch_cnn_mnist", "keras_cnn_mnist", "torch_cnn_histology" ] model_name: ["torch_cnn_mnist", "keras_cnn_mnist"] - python_version: ["3.9", "3.10", "3.11"] + python_version: ["3.10", "3.11", "3.12"] fail-fast: false # do not immediately fail if one of the combinations fail env: diff --git a/.github/workflows/taskrunner.yml b/.github/workflows/taskrunner.yml index 1e3a3ab6c2..59abb67251 100644 --- a/.github/workflows/taskrunner.yml +++ b/.github/workflows/taskrunner.yml @@ -21,7 +21,7 @@ jobs: strategy: matrix: os: ['ubuntu-latest', 'windows-latest'] - python-version: ["3.9", "3.10", "3.11"] + python-version: ["3.10", "3.11", "3.12"] runs-on: ${{ matrix.os }} timeout-minutes: 15 diff --git a/.github/workflows/taskrunner_eden_pipeline.yml b/.github/workflows/taskrunner_eden_pipeline.yml index 44de314a5f..b6d2426c46 100644 --- a/.github/workflows/taskrunner_eden_pipeline.yml +++ b/.github/workflows/taskrunner_eden_pipeline.yml @@ -25,7 +25,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/tr_docker_gramine_direct.yml b/.github/workflows/tr_docker_gramine_direct.yml index 855a059c98..d02526edb7 100644 --- a/.github/workflows/tr_docker_gramine_direct.yml +++ b/.github/workflows/tr_docker_gramine_direct.yml @@ -20,7 +20,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/tr_docker_native.yml b/.github/workflows/tr_docker_native.yml index 36dfcf9107..b3382553ae 100644 --- a/.github/workflows/tr_docker_native.yml +++ b/.github/workflows/tr_docker_native.yml @@ -20,7 +20,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index 4c3c99af43..c968e85f11 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -21,7 +21,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -43,7 +43,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 1e5b245cc1..341b93b7f1 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -20,7 +20,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -41,7 +41,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/Jenkinsfile b/Jenkinsfile index c1904d0453..73f919c844 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -81,7 +81,7 @@ pipeline { SNYK_ALLOW_LONG_PROJECT_NAME = true SNYK_USE_MULTI_PROC = true SNYK_DEBUG = true - SNYK_PYTHON_VERSION = '3.9' + SNYK_PYTHON_VERSION = '3.10' BANDIT_SOURCE_PATH = 'openfl/ openfl-workspace/ openfl-tutorials/' BANDIT_SEVERITY_LEVEL = 'high' @@ -114,7 +114,7 @@ pipeline { stage('Build Package') { agent { docker { - image 'python:3.9' + image 'python:3.10' } } steps { diff --git a/docs/about/features_index/taskrunner.rst b/docs/about/features_index/taskrunner.rst index 66abcf6868..2097c72f32 100644 --- a/docs/about/features_index/taskrunner.rst +++ b/docs/about/features_index/taskrunner.rst @@ -142,7 +142,7 @@ Bare Metal Approach STEP 1: Create a Workspace ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -1. Start a Python 3.9 (>=3.9, <3.12) virtual environment and confirm OpenFL is available. +1. Start a Python 3.10 (>=3.10, <3.13) virtual environment and confirm OpenFL is available. .. code-block:: shell diff --git a/docs/developer_guide/running_the_federation.notebook.rst b/docs/developer_guide/running_the_federation.notebook.rst index e15d3b91de..44e18e1380 100644 --- a/docs/developer_guide/running_the_federation.notebook.rst +++ b/docs/developer_guide/running_the_federation.notebook.rst @@ -17,7 +17,7 @@ You will start a Jupyter\* \ lab server and receive a URL you can use to access Start the Tutorials =================== -1. Start a Python\* \ 3.9 (>=3.9, <3.12) virtual environment and confirm OpenFL is available. +1. Start a Python\* \ 3.10 (>=3.10, <3.13) virtual environment and confirm OpenFL is available. .. code-block:: python diff --git a/docs/developer_guide/running_the_federation_with_gandlf.rst b/docs/developer_guide/running_the_federation_with_gandlf.rst index e2d2cd37f9..3e8dc7707f 100644 --- a/docs/developer_guide/running_the_federation_with_gandlf.rst +++ b/docs/developer_guide/running_the_federation_with_gandlf.rst @@ -113,7 +113,7 @@ However, continue with the following procedure for details in creating a federat STEP 1: Install GaNDLF prerequisites and Create a Workspace ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -1. Start a Python 3.9 (>=3.9, <3.12) virtual environment and confirm OpenFL is available. +1. Start a Python 3.10 (>=3.10, <3.13) virtual environment and confirm OpenFL is available. .. code-block:: python diff --git a/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb b/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb index 69dbd4159a..9ada0f38b7 100644 --- a/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb +++ b/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb @@ -51,7 +51,7 @@ "id": "4dbb89b6", "metadata": {}, "source": [ - "First we start by installing the necessary dependencies for the workflow interface. Note that this tuorial uses Keras 3, make sure you use python 3.9 or higher." + "First we start by installing the necessary dependencies for the workflow interface. Note that this tuorial uses Keras 3, make sure you use python 3.10 or higher." ] }, { diff --git a/setup.py b/setup.py index 8e2d1b1c68..9bad746c74 100644 --- a/setup.py +++ b/setup.py @@ -95,7 +95,7 @@ def run(self): 'protobuf>=4.22,<6.0.0', 'grpcio>=1.56.2,<1.66.0', ], - python_requires='>=3.9, <3.12', + python_requires='>=3.10, <3.13', project_urls={ 'Bug Tracker': 'https://github.com/securefederatedai/openfl/issues', 'Documentation': 'https://openfl.readthedocs.io/en/stable/', @@ -109,9 +109,9 @@ def run(self): 'Topic :: System :: Distributed Computing', 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], entry_points={'console_scripts': ['fx=openfl.interface.cli:entry']}, cmdclass={ diff --git a/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt b/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt index e487d1aeb6..daeb192878 100644 --- a/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt +++ b/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt @@ -2,4 +2,8 @@ dill==0.3.6 metaflow==2.7.15 ray==2.9.2 torch +tabulate==0.9.0 torchvision +nbformat==5.10.4 +nbdev==2.3.12 +tensorboard \ No newline at end of file From c280f10200193276fff8405c6f120f50b2a32854 Mon Sep 17 00:00:00 2001 From: Karan Shah Date: Tue, 24 Dec 2024 01:09:57 +0530 Subject: [PATCH 3/4] Introduce `callbacks` API (#1195) * Get rid of kwargs Signed-off-by: Shah, Karan * Use module-level logger Signed-off-by: Shah, Karan * Reduce keras verbosity Signed-off-by: Shah, Karan * Remove all log_metric and log_memory_usage traces; add callback hooks Signed-off-by: Shah, Karan * Add `openfl.callbacks` module Signed-off-by: Shah, Karan * Include round_num for task callbacks Signed-off-by: Shah, Karan * Add tensordb to callbacks Signed-off-by: Shah, Karan * No round_num on task callbacks Signed-off-by: Shah, Karan * Remove task boundary callbacks Signed-off-by: Shah, Karan * Remove tb/model_ckpt. Add memory_profiler Signed-off-by: Shah, Karan * Restore psutil and tbX Signed-off-by: Shah, Karan * Format code Signed-off-by: Shah, Karan * Define default callbacks Signed-off-by: Shah, Karan * Add write_logs for bwd compat Signed-off-by: Shah, Karan * Add log_metric_callback for bwd compat Signed-off-by: Shah, Karan * Migrate to module-level logger for collaborator Signed-off-by: Shah, Karan * Review comments Signed-off-by: Shah, Karan * Add metric_writer Signed-off-by: Shah, Karan * Add collaborator side metric logging Signed-off-by: Shah, Karan * Make log dirs on exp begin Signed-off-by: Shah, Karan * Do not print use_tls Signed-off-by: Shah, Karan * Assume reportable metric to be a scalar Signed-off-by: Shah, Karan * Add aggregator side callbacks Signed-off-by: Shah, Karan * do_task test returns mock dict Signed-off-by: Shah, Karan * Consistency changes Signed-off-by: Shah, Karan * Add documentation hooks Signed-off-by: Shah, Karan * Update docstring Signed-off-by: Shah, Karan * Update docs hook Signed-off-by: Shah, Karan * Remove all traces of log_metric_callback and write_metric Signed-off-by: Shah, Karan * Do on_round_begin if not time_to_quit Signed-off-by: Shah, Karan --------- Signed-off-by: Shah, Karan --- docs/openfl.callbacks.rst | 16 ++ docs/openfl.rst | 3 +- .../101_torch_cnn_mnist/plan/plan.yaml | 2 - .../101_torch_cnn_mnist/src/utils.py | 20 -- .../102_aggregator_validation/plan/plan.yaml | 2 - .../102_aggregator_validation/src/utils.py | 20 -- .../104_keras_mnist/plan/plan.yaml | 2 - .../104_keras_mnist/src/utils.py | 20 -- .../plan/plan.yaml | 2 - .../src/utils.py | 22 --- .../plan/plan.yaml | 2 - .../src/utils.py | 20 -- .../vertical_fl/plan/plan.yaml | 2 - .../vertical_fl/src/utils.py | 20 -- .../vertical_fl_two_party/plan/plan.yaml | 2 - .../vertical_fl_two_party/src/utils.py | 20 -- .../plan/plan.yaml | 2 - .../src/mnist_utils.py | 16 -- .../torch_cnn_mnist_fed_eval/plan/plan.yaml | 2 - .../src/mnist_utils.py | 16 -- .../plan/plan.yaml | 2 - .../src/mnist_utils.py | 16 -- .../torch_llm_horovod/plan/plan.yaml | 2 - .../torch_llm_horovod/src/emotion_utils.py | 16 -- .../workspace/plan/defaults/aggregator.yaml | 1 - openfl/callbacks/__init__.py | 7 + openfl/callbacks/callback.py | 56 ++++++ openfl/callbacks/callback_list.py | 95 ++++++++++ openfl/callbacks/lambda_callback.py | 38 ++++ openfl/callbacks/memory_profiler.py | 62 ++++++ openfl/callbacks/metric_writer.py | 69 +++++++ openfl/component/aggregator/aggregator.py | 179 ++++++++---------- openfl/component/collaborator/collaborator.py | 137 ++++++++------ openfl/federated/plan/plan.py | 15 +- openfl/transport/grpc/aggregator_server.py | 1 - openfl/utilities/logs.py | 82 -------- .../collaborator/test_collaborator.py | 3 +- 37 files changed, 510 insertions(+), 482 deletions(-) create mode 100644 docs/openfl.callbacks.rst delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py create mode 100644 openfl/callbacks/__init__.py create mode 100644 openfl/callbacks/callback.py create mode 100644 openfl/callbacks/callback_list.py create mode 100644 openfl/callbacks/lambda_callback.py create mode 100644 openfl/callbacks/memory_profiler.py create mode 100644 openfl/callbacks/metric_writer.py diff --git a/docs/openfl.callbacks.rst b/docs/openfl.callbacks.rst new file mode 100644 index 0000000000..e6ac82f65a --- /dev/null +++ b/docs/openfl.callbacks.rst @@ -0,0 +1,16 @@ +``openfl.callbacks`` module +=========================== + +.. currentmodule:: openfl.callbacks + +.. automodule:: openfl.callbacks + +.. autosummary:: + :toctree: _autosummary + :recursive: + + Callback + CallbackList + LambdaCallback + MetricWriter + MemoryProfiler diff --git a/docs/openfl.rst b/docs/openfl.rst index 9d053c2173..a4dd53dc5c 100644 --- a/docs/openfl.rst +++ b/docs/openfl.rst @@ -1,6 +1,6 @@ .. currentmodule:: openfl -Public API: ``openfl`` package +API Reference: ``openfl`` =========================== Subpackages @@ -10,6 +10,7 @@ Subpackages :maxdepth: 1 openfl.component + openfl.callbacks openfl.cryptography openfl.experimental openfl.databases diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml index b4a50d8c62..ab0440875d 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml index c42ce135c9..413c871ab2 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml index dca3c92bbf..e119a0b047 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py deleted file mode 100644 index 96fe885713..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from tensorflow.summary import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml index 0f37dc3a77..0f0ddcb802 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py deleted file mode 100644 index a3db4c1ecf..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -"""You may copy this file as the starting point of your own model.""" - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml index 7f3a47bb39..c2239a0a19 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml index 061afcc41d..da42ea4dc9 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.aggregator.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml index f30c5481ea..7819432251 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.aggregator.Aggregator settings : rounds_to_train : 10 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml index 96eb8a35f9..283a3dc97c 100644 --- a/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml @@ -9,8 +9,6 @@ aggregator : best_state_path : save/torch_cnn_mnist_best.pbuf last_state_path : save/torch_cnn_mnist_last.pbuf rounds_to_train : 10 - log_metric_callback : - template : src.mnist_utils.write_metric collaborator : diff --git a/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py b/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py index 16ee801b4d..a03e1e6da2 100644 --- a/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py +++ b/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py @@ -6,27 +6,11 @@ from logging import getLogger import numpy as np -from torch.utils.tensorboard import SummaryWriter from torchvision import datasets from torchvision import transforms logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) - def one_hot(labels, classes): """ diff --git a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml index 02db7dff3e..580ce79760 100644 --- a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml @@ -8,8 +8,6 @@ aggregator : init_state_path : save/torch_cnn_mnist_init.pbuf best_state_path : save/torch_cnn_mnist_best.pbuf last_state_path : save/torch_cnn_mnist_last.pbuf - log_metric_callback : - template : src.mnist_utils.write_metric collaborator : defaults : plan/defaults/collaborator.yaml diff --git a/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py b/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py index 1eccd2a95d..95fa35fa6f 100644 --- a/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py +++ b/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py @@ -6,27 +6,11 @@ from logging import getLogger import numpy as np -from torch.utils.tensorboard import SummaryWriter from torchvision import datasets from torchvision import transforms logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) - def one_hot(labels, classes): """ diff --git a/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml index a42b064e56..b2b12f047a 100644 --- a/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml @@ -9,8 +9,6 @@ aggregator : best_state_path : save/torch_cnn_mnist_best.pbuf last_state_path : save/torch_cnn_mnist_last.pbuf rounds_to_train : 6 - log_metric_callback : - template : src.mnist_utils.write_metric collaborator : diff --git a/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py b/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py index 16ee801b4d..a03e1e6da2 100644 --- a/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py +++ b/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py @@ -6,27 +6,11 @@ from logging import getLogger import numpy as np -from torch.utils.tensorboard import SummaryWriter from torchvision import datasets from torchvision import transforms logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) - def one_hot(labels, classes): """ diff --git a/openfl-workspace/torch_llm_horovod/plan/plan.yaml b/openfl-workspace/torch_llm_horovod/plan/plan.yaml index 037b27f437..86ae2579b8 100644 --- a/openfl-workspace/torch_llm_horovod/plan/plan.yaml +++ b/openfl-workspace/torch_llm_horovod/plan/plan.yaml @@ -9,8 +9,6 @@ aggregator : best_state_path : save/torch_llm_best.pbuf last_state_path : save/torch_llm_last.pbuf rounds_to_train : 5 - log_metric_callback : - template : src.emotion_utils.write_metric collaborator : diff --git a/openfl-workspace/torch_llm_horovod/src/emotion_utils.py b/openfl-workspace/torch_llm_horovod/src/emotion_utils.py index 8eeed70e8b..48fb330c72 100644 --- a/openfl-workspace/torch_llm_horovod/src/emotion_utils.py +++ b/openfl-workspace/torch_llm_horovod/src/emotion_utils.py @@ -6,26 +6,10 @@ from logging import getLogger from datasets import Dataset, load_dataset -from torch.utils.tensorboard import SummaryWriter from transformers import AutoTokenizer, DataCollatorWithPadding logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter("./logs/llm", flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f"{node_name}/{task_name}/{metric_name}", metric, round_number) - def get_emotion_dataset(tokenizer): dataset = load_dataset("dair-ai/emotion", cache_dir="dataset", revision="9ce6303") diff --git a/openfl-workspace/workspace/plan/defaults/aggregator.yaml b/openfl-workspace/workspace/plan/defaults/aggregator.yaml index 0bb76e099d..43d923b996 100644 --- a/openfl-workspace/workspace/plan/defaults/aggregator.yaml +++ b/openfl-workspace/workspace/plan/defaults/aggregator.yaml @@ -1,4 +1,3 @@ template : openfl.component.Aggregator settings : db_store_rounds : 2 - write_logs : true diff --git a/openfl/callbacks/__init__.py b/openfl/callbacks/__init__.py new file mode 100644 index 0000000000..8cd3f911ba --- /dev/null +++ b/openfl/callbacks/__init__.py @@ -0,0 +1,7 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from openfl.callbacks.callback import Callback +from openfl.callbacks.callback_list import CallbackList +from openfl.callbacks.lambda_callback import LambdaCallback +from openfl.callbacks.memory_profiler import MemoryProfiler +from openfl.callbacks.metric_writer import MetricWriter diff --git a/openfl/callbacks/callback.py b/openfl/callbacks/callback.py new file mode 100644 index 0000000000..133d64a7cf --- /dev/null +++ b/openfl/callbacks/callback.py @@ -0,0 +1,56 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +class Callback: + """Base class for callbacks. + + Callbacks can be used to perform actions at different stages of the + Federated Learning process. To create a custom callback, subclass + `openfl.callbacks.Callback` and implement the necessary methods. + + Callbacks can be triggered on the aggregator and collaborator side + for the following events: + * At the beginning of an experiment + * At the beginning of a round + * At the end of a round + * At the end of an experiment + + Attributes: + params: Additional parameters saved for use within the callback. + tensor_db: The `TensorDB` instance of the respective participant. + """ + + def __init__(self): + self.params = None + self.tensor_db = None + + def set_params(self, params): + self.params = params + + def set_tensor_db(self, tensor_db): + self.tensor_db = tensor_db + + def on_round_begin(self, round_num: int, logs=None): + """Callback function to be executed at the beginning of a round. + + Subclasses need to implement actions to be taken here. + """ + + def on_round_end(self, round_num: int, logs=None): + """Callback function to be executed at the end of a round. + + Subclasses need to implement actions to be taken here. + """ + + def on_experiment_begin(self, logs=None): + """Callback function to be executed at the beginning of an experiment. + + Subclasses need to implement actions to be taken here. + """ + + def on_experiment_end(self, logs=None): + """Callback function to be executed at the end of an experiment. + + Subclasses need to implement actions to be taken here. + """ diff --git a/openfl/callbacks/callback_list.py b/openfl/callbacks/callback_list.py new file mode 100644 index 0000000000..29661fff6a --- /dev/null +++ b/openfl/callbacks/callback_list.py @@ -0,0 +1,95 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from openfl.callbacks.callback import Callback +from openfl.callbacks.memory_profiler import MemoryProfiler +from openfl.callbacks.metric_writer import MetricWriter + + +class CallbackList(Callback): + """An ensemble of callbacks. + + This class allows multiple callbacks to be used together, by sequentially + calling each callback's respective methods. + + Attributes: + callbacks: A list of `openfl.callbacks.Callback` instances. + add_memory_profiler: If True, adds a `MemoryProfiler` callback to the list. + add_metric_writer: If True, adds a `MetricWriter` callback to the list. + tensor_db: Optional `TensorDB` instance of the respective participant. + If provided, callbacks can access TensorDB for various actions. + params: Additional parameters saved for use within the callbacks. + """ + + def __init__( + self, + callbacks: list, + add_memory_profiler=False, + add_metric_writer=False, + tensor_db=None, + **params, + ): + super().__init__() + self.callbacks = _flatten(callbacks) if callbacks else [] + + self._add_default_callbacks(add_memory_profiler, add_metric_writer) + + self.set_tensor_db(tensor_db) + self.set_params(params) + + def set_params(self, params): + self.params = params + if params: + for callback in self.callbacks: + callback.set_params(params) + + def set_tensor_db(self, tensor_db): + self.tensor_db = tensor_db + if tensor_db: + for callback in self.callbacks: + callback.set_tensor_db(tensor_db) + + def _add_default_callbacks(self, add_memory_profiler, add_metric_writer): + """Add default callbacks to callbacks list if not already present.""" + self._memory_profiler = None + self._metric_writer = None + + for cb in self.callbacks: + if isinstance(cb, MemoryProfiler): + self._memory_profiler = cb + if isinstance(cb, MetricWriter): + self._metric_writer = cb + + if add_memory_profiler and self._memory_profiler is None: + self._memory_profiler = MemoryProfiler() + self.callbacks.append(self._memory_profiler) + + if add_metric_writer and self._metric_writer is None: + self._metric_writer = MetricWriter() + self.callbacks.append(self._metric_writer) + + def on_round_begin(self, round_num: int, logs=None): + for callback in self.callbacks: + callback.on_round_begin(round_num, logs) + + def on_round_end(self, round_num: int, logs=None): + for callback in self.callbacks: + callback.on_round_end(round_num, logs) + + def on_experiment_begin(self, logs=None): + for callback in self.callbacks: + callback.on_experiment_begin(logs) + + def on_experiment_end(self, logs=None): + for callback in self.callbacks: + callback.on_experiment_end(logs) + + +def _flatten(l): + """Flatten a possibly-nested tree of lists.""" + if not isinstance(l, (list, tuple)): + return [l] + for elem in l: + if isinstance(elem, list): + yield from _flatten(elem) + else: + yield elem diff --git a/openfl/callbacks/lambda_callback.py b/openfl/callbacks/lambda_callback.py new file mode 100644 index 0000000000..d1b0542f83 --- /dev/null +++ b/openfl/callbacks/lambda_callback.py @@ -0,0 +1,38 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from openfl.callbacks.callback import Callback + + +class LambdaCallback(Callback): + """Custom on-the-fly callbacks. + + This callback can be constructed with functions that will be called + at the appropriate time during the life-cycle of a Federated Learning experiment. + Certain callbacks may expect positional arguments, for example: + + * on_round_begin: expects `round_num` as a positional argument. + * on_round_end: expects `round_num` as a positional argument. + + Args: + on_round_begin: called at the beginning of every round. + on_round_end: called at the end of every round. + on_experiment_begin: called at the beginning of an experiment. + on_experiment_end: called at the end of an experiment. + """ + + def __init__( + self, + on_round_begin=None, + on_round_end=None, + on_experiment_begin=None, + on_experiment_end=None, + ): + super().__init__() + if on_round_begin is not None: + self.on_round_begin = on_round_begin + if on_round_end is not None: + self.on_round_end = on_round_end + if on_experiment_begin is not None: + self.on_experiment_begin = on_experiment_begin + if on_experiment_end is not None: + self.on_experiment_end = on_experiment_end diff --git a/openfl/callbacks/memory_profiler.py b/openfl/callbacks/memory_profiler.py new file mode 100644 index 0000000000..71b6d18488 --- /dev/null +++ b/openfl/callbacks/memory_profiler.py @@ -0,0 +1,62 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import json +import logging +import os + +import psutil + +from openfl.callbacks.callback import Callback + +logger = logging.getLogger(__name__) + + +class MemoryProfiler(Callback): + """Profile memory usage of the current process at the end of each round. + + Attributes: + log_dir: If set, writes logs as lines of JSON. + """ + + def __init__(self, log_dir: str = "./logs/"): + super().__init__() + self.log_dir = None + if log_dir: + os.makedirs(log_dir, exist_ok=True) + self.log_dir = log_dir + + def on_round_end(self, round_num: int, logs=None): + origin = self.params["origin"] + + info = _get_memory_usage() + info["round_number"] = round_num + info["origin"] = origin + + logger.info(f"Round {round_num}: Memory usage: {info}") + if self.log_dir: + with open(os.path.join(self.log_dir, f"{origin}_memory_usage.json"), "a") as f: + f.write(json.dumps(info) + "\n") + + +def _get_memory_usage() -> dict: + process = psutil.Process(os.getpid()) + virtual_memory = psutil.virtual_memory() + swap_memory = psutil.swap_memory() + info = { + "process_memory": round(process.memory_info().rss / (1024**2), 2), + "virtual_memory/total": round(virtual_memory.total / (1024**2), 2), + "virtual_memory/available": round(virtual_memory.available / (1024**2), 2), + "virtual_memory/percent": virtual_memory.percent, + "virtual_memory/used": round(virtual_memory.used / (1024**2), 2), + "virtual_memory/free": round(virtual_memory.free / (1024**2), 2), + "virtual_memory/active": round(virtual_memory.active / (1024**2), 2), + "virtual_memory/inactive": round(virtual_memory.inactive / (1024**2), 2), + "virtual_memory/buffers": round(virtual_memory.buffers / (1024**2), 2), + "virtual_memory/cached": round(virtual_memory.cached / (1024**2), 2), + "virtual_memory/shared": round(virtual_memory.shared / (1024**2), 2), + "swap_memory/total": round(swap_memory.total / (1024**2), 2), + "swap_memory/used": round(swap_memory.used / (1024**2), 2), + "swap_memory/free": round(swap_memory.free / (1024**2), 2), + "swap_memory/percent": swap_memory.percent, + } + return info diff --git a/openfl/callbacks/metric_writer.py b/openfl/callbacks/metric_writer.py new file mode 100644 index 0000000000..fc9a2daa35 --- /dev/null +++ b/openfl/callbacks/metric_writer.py @@ -0,0 +1,69 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import json +import logging +import os + +from tensorboardX import SummaryWriter + +from openfl.callbacks.callback import Callback + +logger = logging.getLogger(__name__) + + +class MetricWriter(Callback): + """Log scalar metrics at the end of each round. + + Attributes: + log_dir: Path to write logs as lines of JSON. Defaults to `./logs`. + use_tensorboard: If True, writes scalar summaries to TensorBoard under `log_dir`. + """ + + def __init__(self, log_dir: str = "./logs/", use_tensorboard: bool = True): + super().__init__() + self.log_dir = log_dir + self.use_tensorboard = use_tensorboard + + self._log_file_handle = None + self._summary_writer = None + + def on_experiment_begin(self, logs=None): + """Open file handles for logging.""" + os.makedirs(self.log_dir, exist_ok=True) + + if not self._log_file_handle: + self._log_file_handle = open( + os.path.join(self.log_dir, self.params["origin"] + "_metrics.txt"), "a" + ) + + if self.use_tensorboard: + self._summary_writer = SummaryWriter( + os.path.join(self.log_dir, self.params["origin"] + "_tensorboard") + ) + + def on_round_end(self, round_num: int, logs=None): + """Log metrics. + + Args: + round_num: The current round number. + logs: A key-value pair of scalar metrics. + """ + logs = logs or {} + logger.info(f"Round {round_num}: Metrics: {logs}") + + self._log_file_handle.write(json.dumps(logs) + "\n") + self._log_file_handle.flush() + + if self._summary_writer: + for key, value in logs.items(): + self._summary_writer.add_scalar(key, value, round_num) + self._summary_writer.flush() + + def on_experiment_end(self, logs=None): + """Cleanup.""" + if self._log_file_handle: + self._log_file_handle.close() + self._log_file_handle = None + + if self._summary_writer: + self._summary_writer.close() diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 1e34aa7e92..eaac9fa6a0 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -4,18 +4,21 @@ """Aggregator module.""" +import logging import queue import time -from logging import getLogger from threading import Lock +from typing import List, Optional +import openfl.callbacks as callbacks_module from openfl.component.straggler_handling_functions import CutoffTimeBasedStragglerHandling from openfl.databases import TensorDB from openfl.interface.aggregation_functions import WeightedAverage from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import base_pb2, utils from openfl.utilities import TaskResultKey, TensorKey, change_tags -from openfl.utilities.logs import get_memory_usage, write_memory_usage_to_file, write_metric + +logger = logging.getLogger(__name__) class Aggregator: @@ -38,8 +41,7 @@ class Aggregator: tensor_db (TensorDB): Object for tensor database. db_store_rounds* (int): Rounds to store in TensorDB. logger: Object for logging. - write_logs (bool): Flag to enable log writing. - log_metric_callback: Callback for logging metrics. + write_logs (bool): Flag to enable metric writer callback. best_model_score (optional): Score of the best model. Defaults to None. metric_queue (queue.Queue): Queue for metrics. @@ -76,10 +78,10 @@ def __init__( single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, - write_logs=False, - log_memory_usage=False, - log_metric_callback=None, initial_tensor_dict=None, + log_memory_usage=False, + write_logs=False, + callbacks: Optional[List] = None, ): """Initializes the Aggregator. @@ -104,17 +106,13 @@ def __init__( NoCompressionPipeline. db_store_rounds (int, optional): Rounds to store in TensorDB. Defaults to 1. - write_logs (bool, optional): Whether to write logs. Defaults to - False. - log_metric_callback (optional): Callback for log metric. Defaults - to None. - **kwargs: Additional keyword arguments. + initial_tensor_dict (dict, optional): Initial tensor dictionary. + callbacks: List of callbacks to be used during the experiment. """ - self.logger = getLogger(__name__) self.round_number = 0 if single_col_cert_common_name: - self.logger.warning( + logger.warning( "You are running in single collaborator certificate mode. " "This mode is intended for development settings only and does not " "provide proper Public Key Infrastructure (PKI) security. " @@ -128,9 +126,7 @@ def __init__( ) self._end_of_round_check_done = [False] * rounds_to_train self.stragglers = [] - # Flag can be enabled to get memory usage details for ubuntu system - self.log_memory_usage = log_memory_usage - self.memory_details = [] + self.rounds_to_train = rounds_to_train # if the collaborator requests a delta, this value is set to true @@ -145,16 +141,6 @@ def __init__( # if it is set to 1 for the aggregator. self.db_store_rounds = db_store_rounds - # Gathered together logging-related objects - self.write_logs = write_logs - self.log_metric_callback = log_metric_callback - - if self.write_logs: - self.log_metric = write_metric - if self.log_metric_callback: - self.log_metric = log_metric_callback - self.logger.info("Using custom log metric: %s", self.log_metric) - self.best_model_score = None self.metric_queue = queue.Queue() @@ -165,6 +151,7 @@ def __init__( self.best_state_path = best_state_path self.last_state_path = last_state_path + # TODO: Remove. Used in deprecated interactive and native APIs self.best_tensor_dict: dict = {} self.last_tensor_dict: dict = {} @@ -195,6 +182,19 @@ def __init__( self.use_delta_updates = use_delta_updates + # Callbacks + self.callbacks = callbacks_module.CallbackList( + callbacks, + add_memory_profiler=log_memory_usage, + add_metric_writer=write_logs, + origin="aggregator", + ) + + # TODO: Aggregator has no concrete notion of round_begin. + # https://github.com/securefederatedai/openfl/pull/1195#discussion_r1879479537 + self.callbacks.on_experiment_begin() + self.callbacks.on_round_begin(self.round_number) + def _load_initial_tensors(self): """Load all of the tensors required to begin federated learning. @@ -209,9 +209,7 @@ def _load_initial_tensors(self): ) if round_number > self.round_number: - self.logger.info( - f"Starting training from round {round_number} of previously saved model" - ) + logger.info(f"Starting training from round {round_number} of previously saved model") self.round_number = round_number tensor_key_dict = { TensorKey(k, self.uuid, self.round_number, False, ("model",)): v @@ -219,7 +217,7 @@ def _load_initial_tensors(self): } # all initial model tensors are loaded here self.tensor_db.cache_tensor(tensor_key_dict) - self.logger.debug("This is the initial tensor_db: %s", self.tensor_db) + logger.debug("This is the initial tensor_db: %s", self.tensor_db) def _load_initial_tensors_from_dict(self, tensor_dict): """Load all of the tensors required to begin federated learning. @@ -236,7 +234,7 @@ def _load_initial_tensors_from_dict(self, tensor_dict): } # all initial model tensors are loaded here self.tensor_db.cache_tensor(tensor_key_dict) - self.logger.debug("This is the initial tensor_db: %s", self.tensor_db) + logger.debug("This is the initial tensor_db: %s", self.tensor_db) def _save_model(self, round_number, file_path): """Save the best or latest model. @@ -261,7 +259,7 @@ def _save_model(self, round_number, file_path): tk_name, _, _, _, _ = tk tensor_dict[tk_name] = self.tensor_db.get_tensor_from_cache(tk) if tensor_dict[tk_name] is None: - self.logger.info( + logger.info( "Cannot save model for round %s. Continuing...", round_number, ) @@ -343,13 +341,13 @@ def get_tasks(self, collaborator_name): sleep_time (int): Sleep time. time_to_quit (bool): Whether it's time to quit. """ - self.logger.debug( + logger.debug( f"Aggregator GetTasks function reached from collaborator {collaborator_name}..." ) # first, if it is time to quit, inform the collaborator if self._time_to_quit(): - self.logger.info( + logger.info( "Sending signal to collaborator %s to shutdown...", collaborator_name, ) @@ -402,7 +400,7 @@ def get_tasks(self, collaborator_name): return tasks, self.round_number, sleep_time, time_to_quit - self.logger.info( + logger.info( f"Sending tasks to collaborator {collaborator_name} for round {self.round_number}" ) sleep_time = 0 @@ -421,7 +419,7 @@ def _straggler_cutoff_time_elapsed(self) -> None: Returns: None """ - self.logger.warning( + logger.warning( f"Round number: {self.round_number} cutoff timer elapsed after " f"{self.straggler_handling_policy.straggler_cutoff_time}s. " f"Applying {self.straggler_handling_policy.__class__.__name__} policy." @@ -460,7 +458,7 @@ def get_aggregated_tensor( Raises: ValueError: if Aggregator does not have an aggregated tensor for {tensor_key}. """ - self.logger.debug( + logger.debug( f"Retrieving aggregated tensor {tensor_name},{round_number},{tags} " f"for collaborator {collaborator_name}" ) @@ -490,7 +488,7 @@ def get_aggregated_tensor( start_retrieving_time = time.time() while nparray is None: - self.logger.debug("Waiting for tensor_key %s", agg_tensor_key) + logger.debug("Waiting for tensor_key %s", agg_tensor_key) time.sleep(5) nparray = self.tensor_db.get_tensor_from_cache(agg_tensor_key) if (time.time() - start_retrieving_time) > 60: @@ -609,20 +607,20 @@ def send_local_task_results( None """ if self._time_to_quit() or collaborator_name in self.stragglers: - self.logger.warning( + logger.warning( f"STRAGGLER: Collaborator {collaborator_name} is reporting results " f"after task {task_name} has finished." ) return if self.round_number != round_number: - self.logger.warning( + logger.warning( f"Collaborator {collaborator_name} is reporting results" f" for the wrong round: {round_number}. Ignoring..." ) return - self.logger.info( + logger.info( f"Collaborator {collaborator_name} is sending task results " f"for {task_name}, round {round_number}" ) @@ -631,7 +629,7 @@ def send_local_task_results( # we mustn't have results already if self._collaborator_task_completed(collaborator_name, task_name, round_number): - self.logger.warning( + logger.warning( f"Aggregator already has task results from collaborator {collaborator_name}" f" for task {task_key}" ) @@ -663,15 +661,6 @@ def send_local_task_results( "metric_value": float(value), } self.metric_queue.put(metrics) - self.logger.metric("%s", str(metrics)) - if self.write_logs: - self.log_metric( - collaborator_name, - task_name, - tensor_key.tensor_name, - float(value), - round_number, - ) task_results.append(tensor_key) @@ -699,7 +688,7 @@ def _end_of_round_with_stragglers_check(self): if collab_name not in self.collaborators_done ] if len(self.stragglers) != 0: - self.logger.warning(f"Identified stragglers: {self.stragglers}") + logger.warning(f"Identified stragglers: {self.stragglers}") self._end_of_round_check() def _process_named_tensor(self, named_tensor, collaborator_name): @@ -788,7 +777,7 @@ def _process_named_tensor(self, named_tensor, collaborator_name): assert final_nparray is not None, f"Could not create tensorkey {final_tensor_key}" self.tensor_db.cache_tensor({final_tensor_key: final_nparray}) - self.logger.debug("Created TensorKey: %s", final_tensor_key) + logger.debug("Created TensorKey: %s", final_tensor_key) return final_tensor_key, final_nparray @@ -846,7 +835,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result # Apply delta (unless delta couldn't be created) if base_model_nparray is not None and self.use_delta_updates: - self.logger.debug("Applying delta for layer %s", decompressed_delta_tk[0]) + logger.debug("Applying delta for layer %s", decompressed_delta_tk[0]) new_model_tk, new_model_nparray = self.tensor_codec.apply_delta( decompressed_delta_tk, decompressed_delta_nparray, @@ -879,11 +868,14 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result # Finally, cache the updated model tensor self.tensor_db.cache_tensor({final_model_tk: new_model_nparray}) - def _compute_validation_related_task_metrics(self, task_name): + def _compute_validation_related_task_metrics(self, task_name) -> dict: """Compute all validation related metrics. Args: task_name (str): Task name. + + Returns: + A dictionary of reportable metrics. """ # By default, print out all of the metrics that the validation # task sent @@ -918,6 +910,7 @@ def _compute_validation_related_task_metrics(self, task_name): task_agg_function = self.assigner.get_aggregation_type_for_task(task_name) task_key = TaskResultKey(task_name, collaborators_for_task[0], self.round_number) + metrics = {} for tensor_key in self.collaborator_tasks_results[task_key]: tensor_name, origin, round_number, report, tags = tensor_key assert ( @@ -934,32 +927,26 @@ def _compute_validation_related_task_metrics(self, task_name): ) if report: - # Caution: This schema must be followed. It is also used in - # gRPC message streams for director/envoy. - metrics = { - "metric_origin": "aggregator", - "task_name": task_name, - "metric_name": tensor_key.tensor_name, - "metric_value": float(agg_results), - "round": round_number, - } - - self.metric_queue.put(metrics) - self.logger.metric("%s", metrics) - if self.write_logs: - self.log_metric( - "aggregator", - task_name, - tensor_key.tensor_name, - float(agg_results), - round_number, - ) + # Metric must be a scalar. + value = float(agg_results) + + # TODO: Deprecate `metric_queue` going forward. + self.metric_queue.put( + { + "metric_origin": "aggregator", + "task_name": task_name, + "metric_name": tensor_key.tensor_name, + "metric_value": value, + "round": round_number, + } + ) + metrics.update({f"aggregator/{task_name}/{tensor_key.tensor_name}": value}) # FIXME: Configurable logic for min/max criteria in saving best. if "validate_agg" in tags: # Compare the accuracy of the model, potentially save it if self.best_model_score is None or self.best_model_score < agg_results: - self.logger.metric( + logger.info( f"Round {round_number}: saved the best " f"model with score {agg_results:f}" ) @@ -968,6 +955,8 @@ def _compute_validation_related_task_metrics(self, task_name): if "trained" in tags: self._prepare_trained(tensor_name, origin, round_number, report, agg_results) + return metrics + def _end_of_round_check(self): """Check if the round complete. @@ -985,22 +974,18 @@ def _end_of_round_check(self): return # Compute all validation related metrics - all_tasks = self.assigner.get_all_tasks_for_round(self.round_number) - for task_name in all_tasks: - self._compute_validation_related_task_metrics(task_name) + logs = {} + for task_name in self.assigner.get_all_tasks_for_round(self.round_number): + logs.update(self._compute_validation_related_task_metrics(task_name)) - if self.log_memory_usage: - # This is the place to check the memory usage of the aggregator - memory_detail = get_memory_usage() - memory_detail["round_number"] = self.round_number - memory_detail["metric_origin"] = "aggregator" - self.memory_details.append(memory_detail) + # End of round callbacks. + self.callbacks.on_round_end(self.round_number, logs) # Once all of the task results have been processed self._end_of_round_check_done[self.round_number] = True # Save the latest model - self.logger.info("Saving round %s model...", self.round_number) + logger.info("Saving round %s model...", self.round_number) self._save_model(self.round_number, self.last_state_path) self.round_number += 1 @@ -1011,12 +996,11 @@ def _end_of_round_check(self): # TODO This needs to be fixed! if self._time_to_quit(): - if self.log_memory_usage: - self.logger.info(f"Publish memory usage: {self.memory_details}") - write_memory_usage_to_file(self.memory_details, "aggregator_memory_usage.json") - self.logger.info("Experiment Completed. Cleaning up...") + logger.info("Experiment Completed. Cleaning up...") else: - self.logger.info("Starting round %s...", self.round_number) + logger.info("Starting round %s...", self.round_number) + # https://github.com/securefederatedai/openfl/pull/1195#discussion_r1879479537 + self.callbacks.on_round_begin(self.round_number) # Cleaning tensor db self.tensor_db.clean_up(self.db_store_rounds) @@ -1036,7 +1020,7 @@ def _is_collaborator_done(self, collaborator_name: str, round_number: int) -> No None """ if self.round_number != round_number: - self.logger.warning( + logger.warning( f"Collaborator {collaborator_name} is reporting results" f" for the wrong round: {round_number}. Ignoring..." ) @@ -1056,7 +1040,7 @@ def _is_collaborator_done(self, collaborator_name: str, round_number: int) -> No # update collaborators_done if all_tasks_completed: self.collaborators_done.append(collaborator_name) - self.logger.info( + logger.info( f"Round {self.round_number}: Collaborators that have completed all tasks: " f"{self.collaborators_done}" ) @@ -1070,16 +1054,19 @@ def stop(self, failed_collaborator: str = None) -> None: Returns: None """ - self.logger.info("Force stopping the aggregator execution.") + logger.info("Force stopping the aggregator execution.") # We imitate quit_job_sent_to the failed collaborator # So the experiment set to a finished state if failed_collaborator: self.quit_job_sent_to.append(failed_collaborator) + # End of experiment callbacks. + self.callbacks.on_experiment_end() + # This code does not actually send `quit` tasks to collaborators, # it just mimics it by filling arrays. for collaborator_name in filter(lambda c: c != failed_collaborator, self.authorized_cols): - self.logger.info( + logger.info( "Sending signal to collaborator %s to shutdown...", collaborator_name, ) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 08f19b9d94..d4fd380998 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -4,16 +4,18 @@ """Collaborator module.""" +import logging from enum import Enum -from logging import getLogger from time import sleep -from typing import Tuple +from typing import List, Optional, Tuple +import openfl.callbacks as callbacks_module from openfl.databases import TensorDB from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import utils from openfl.utilities import TensorKey -from openfl.utilities.logs import get_memory_usage, write_memory_usage_to_file + +logger = logging.getLogger(__name__) class DevicePolicy(Enum): @@ -82,6 +84,8 @@ def __init__( compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, + write_logs=False, + callbacks: Optional[List] = None, ): """Initialize the Collaborator object. @@ -103,6 +107,7 @@ def __init__( Defaults to None. db_store_rounds (int, optional): The number of rounds to store in the database. Defaults to 1. + callbacks (list, optional): List of callbacks. Defaults to None. """ self.single_col_cert_common_name = None @@ -123,31 +128,34 @@ def __init__( self.delta_updates = delta_updates self.client = client - # Flag can be enabled to get memory usage details for ubuntu system - self.log_memory_usage = log_memory_usage - self.task_config = task_config - self.logger = getLogger(__name__) + self.task_config = task_config # RESET/CONTINUE_LOCAL/CONTINUE_GLOBAL if hasattr(OptTreatment, opt_treatment): self.opt_treatment = OptTreatment[opt_treatment] else: - self.logger.error("Unknown opt_treatment: %s.", opt_treatment.name) + logger.error("Unknown opt_treatment: %s.", opt_treatment.name) raise NotImplementedError(f"Unknown opt_treatment: {opt_treatment}.") if hasattr(DevicePolicy, device_assignment_policy): self.device_assignment_policy = DevicePolicy[device_assignment_policy] else: - self.logger.error( - "Unknown device_assignment_policy: " f"{device_assignment_policy.name}." - ) + logger.error("Unknown device_assignment_policy: " f"{device_assignment_policy.name}.") raise NotImplementedError( f"Unknown device_assignment_policy: {device_assignment_policy}." ) self.task_runner.set_optimizer_treatment(self.opt_treatment.name) + # Callbacks + self.callbacks = callbacks_module.CallbackList( + callbacks, + add_memory_profiler=log_memory_usage, + add_metric_writer=write_logs, + origin=self.collaborator_name, + ) + def set_available_devices(self, cuda: Tuple[str] = ()): """Set available CUDA devices. @@ -159,33 +167,36 @@ def set_available_devices(self, cuda: Tuple[str] = ()): def run(self): """Run the collaborator.""" - memory_details = [] + # Experiment begin + self.callbacks.on_experiment_begin() + while True: - tasks, round_number, sleep_time, time_to_quit = self.get_tasks() + tasks, round_num, sleep_time, time_to_quit = self.get_tasks() + if time_to_quit: break - elif sleep_time > 0: - sleep(sleep_time) # some sleep function - else: - self.logger.info("Received the following tasks: %s", tasks) - for task in tasks: - self.do_task(task, round_number) - # Cleaning tensor db - self.tensor_db.clean_up(self.db_store_rounds) - if self.log_memory_usage: - # This is the place to check the memory usage of the collaborator - memory_detail = get_memory_usage() - memory_detail["round_number"] = round_number - memory_detail["metric_origin"] = self.collaborator_name - memory_details.append(memory_detail) - if self.log_memory_usage: - self.logger.info(f"Publish memory usage: {memory_details}") - write_memory_usage_to_file( - memory_details, f"{self.collaborator_name}_memory_usage.json" - ) + if not tasks: + sleep(sleep_time) + continue + + # Round begin + logger.info("Received Tasks: %s", tasks) + self.callbacks.on_round_begin(round_num) + + # Run tasks + logs = {} + for task in tasks: + metrics = self.do_task(task, round_num) + logs.update(metrics) - self.logger.info("End of Federation reached. Exiting...") + # Round end + self.tensor_db.clean_up(self.db_store_rounds) + self.callbacks.on_round_end(round_num, logs) + + # Experiment end + self.callbacks.on_experiment_end() + logger.info("Received shutdown signal. Exiting...") def run_simulation(self): """Specific function for the simulation. @@ -196,15 +207,15 @@ def run_simulation(self): while True: tasks, round_number, sleep_time, time_to_quit = self.get_tasks() if time_to_quit: - self.logger.info("End of Federation reached. Exiting...") + logger.info("End of Federation reached. Exiting...") break elif sleep_time > 0: sleep(sleep_time) # some sleep function else: - self.logger.info("Received the following tasks: %s", tasks) + logger.info("Received the following tasks: %s", tasks) for task in tasks: self.do_task(task, round_number) - self.logger.info( + logger.info( f"All tasks completed on {self.collaborator_name} " f"for round {round_number}..." ) @@ -220,19 +231,22 @@ def get_tasks(self): time_to_quit (bool): bool value for quit. """ # logging wait time to analyze training process - self.logger.info("Waiting for tasks...") + logger.info("Waiting for tasks...") tasks, round_number, sleep_time, time_to_quit = self.client.get_tasks( self.collaborator_name ) return tasks, round_number, sleep_time, time_to_quit - def do_task(self, task, round_number): + def do_task(self, task, round_number) -> dict: """Perform the specified task. Args: task (list_of_str): List of tasks. round_number (int): Actual round number. + + Returns: + A dictionary of reportable metrics of the current collaborator for the task. """ # map this task to an actual function name and kwargs if hasattr(self.task_runner, "TASK_REGISTRY"): @@ -288,7 +302,7 @@ def do_task(self, task, round_number): # New interactive python API # New `Core` TaskRunner contains registry of tasks func = self.task_runner.TASK_REGISTRY[func_name] - self.logger.debug("Using Interactive Python API") + logger.debug("Using Interactive Python API") # So far 'kwargs' contained parameters read from the plan # those are parameters that the eperiment owner registered for @@ -306,7 +320,7 @@ def do_task(self, task, round_number): # TaskRunner subclassing API # Tasks are defined as methods of TaskRunner func = getattr(self.task_runner, func_name) - self.logger.debug("Using TaskRunner subclassing API") + logger.debug("Using TaskRunner subclassing API") global_output_tensor_dict, local_output_tensor_dict = func( col_name=self.collaborator_name, @@ -321,7 +335,8 @@ def do_task(self, task, round_number): # send the results for this tasks; delta and compression will occur in # this function - self.send_task_results(global_output_tensor_dict, round_number, task_name) + metrics = self.send_task_results(global_output_tensor_dict, round_number, task_name) + return metrics def get_numpy_dict_for_tensorkeys(self, tensor_keys): """Get tensor dictionary for specified tensorkey set. @@ -345,13 +360,13 @@ def get_data_for_tensorkey(self, tensor_key): """ # try to get from the store tensor_name, origin, round_number, report, tags = tensor_key - self.logger.debug("Attempting to retrieve tensor %s from local store", tensor_key) + logger.debug("Attempting to retrieve tensor %s from local store", tensor_key) nparray = self.tensor_db.get_tensor_from_cache(tensor_key) # if None and origin is our client, request it from the client if nparray is None: if origin == self.collaborator_name: - self.logger.info( + logger.info( f"Attempting to find locally stored {tensor_name} tensor from prior round..." ) prior_round = round_number - 1 @@ -360,16 +375,14 @@ def get_data_for_tensorkey(self, tensor_key): TensorKey(tensor_name, origin, prior_round, report, tags) ) if nparray is not None: - self.logger.debug( + logger.debug( f"Found tensor {tensor_name} in local TensorDB " f"for round {prior_round}" ) return nparray prior_round -= 1 - self.logger.info( - f"Cannot find any prior version of tensor {tensor_name} locally..." - ) - self.logger.debug( + logger.info(f"Cannot find any prior version of tensor {tensor_name} locally...") + logger.debug( "Unable to get tensor from local store..." "attempting to retrieve from client" ) # Determine whether there are additional compression related @@ -397,7 +410,7 @@ def get_data_for_tensorkey(self, tensor_key): ) self.tensor_db.cache_tensor({new_model_tk: nparray}) else: - self.logger.info( + logger.info( "Count not find previous model layer." "Fetching latest layer from aggregator" ) @@ -411,7 +424,7 @@ def get_data_for_tensorkey(self, tensor_key): tensor_key, require_lossless=True ) else: - self.logger.debug("Found tensor %s in local TensorDB", tensor_key) + logger.debug("Found tensor %s in local TensorDB", tensor_key) return nparray @@ -437,7 +450,7 @@ def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=Fal """ tensor_name, origin, round_number, report, tags = tensor_key - self.logger.debug("Requesting aggregated tensor %s", tensor_key) + logger.debug("Requesting aggregated tensor %s", tensor_key) tensor = self.client.get_aggregated_tensor( self.collaborator_name, tensor_name, @@ -456,13 +469,16 @@ def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=Fal return nparray - def send_task_results(self, tensor_dict, round_number, task_name): + def send_task_results(self, tensor_dict, round_number, task_name) -> dict: """Send task results to the aggregator. Args: tensor_dict (dict): Tensor dictionary. round_number (int): Actual round number. task_name (string): Task name. + + Returns: + A dictionary of reportable metrics of the current collaborator for the task. """ named_tensors = [self.nparray_to_named_tensor(k, v) for k, v in tensor_dict.items()] @@ -477,17 +493,16 @@ def send_task_results(self, tensor_dict, round_number, task_name): if "valid" in task_name: data_size = self.task_runner.get_valid_data_size() - self.logger.debug("%s data size = %s", task_name, data_size) + logger.debug("%s data size = %s", task_name, data_size) + metrics = {} for tensor in tensor_dict: tensor_name, origin, fl_round, report, tags = tensor if report: - self.logger.metric( - f"Round {round_number}, collaborator {self.collaborator_name} " - f"is sending metric for task {task_name}:" - f" {tensor_name}\t{tensor_dict[tensor]:f}" - ) + # Reportable metric must be a scalar + value = float(tensor_dict[tensor]) + metrics.update({f"{self.collaborator_name}/{task_name}/{tensor_name}": value}) self.client.send_local_task_results( self.collaborator_name, @@ -497,6 +512,8 @@ def send_task_results(self, tensor_dict, round_number, task_name): named_tensors, ) + return metrics + def nparray_to_named_tensor(self, tensor_key, nparray): """Construct the NamedTensor Protobuf. @@ -579,7 +596,7 @@ def named_tensor_to_nparray(self, named_tensor): named_tensor.report, tuple(named_tensor.tags), ) - tensor_name, origin, round_number, report, tags = tensor_key + *_, tags = tensor_key if "compressed" in tags: decompressed_tensor_key, decompressed_nparray = self.tensor_codec.decompress( tensor_key, @@ -594,7 +611,7 @@ def named_tensor_to_nparray(self, named_tensor): else: # There could be a case where the compression pipeline is bypassed # entirely - self.logger.warning("Bypassing tensor codec...") + logger.warning("Bypassing tensor codec...") decompressed_tensor_key = tensor_key decompressed_nparray = raw_bytes diff --git a/openfl/federated/plan/plan.py b/openfl/federated/plan/plan.py index 34c50a4d1e..69ff36c19c 100644 --- a/openfl/federated/plan/plan.py +++ b/openfl/federated/plan/plan.py @@ -391,18 +391,9 @@ def get_aggregator(self, tensor_dict=None): defaults[SETTINGS]["assigner"] = self.get_assigner() defaults[SETTINGS]["compression_pipeline"] = self.get_tensor_pipe() defaults[SETTINGS]["straggler_handling_policy"] = self.get_straggler_handling_policy() - log_metric_callback = defaults[SETTINGS].get("log_metric_callback") - - if log_metric_callback: - if isinstance(log_metric_callback, dict): - log_metric_callback = Plan.import_(**log_metric_callback) - elif not callable(log_metric_callback): - raise TypeError( - f"log_metric_callback should be callable object " - f"or be import from code part, get {log_metric_callback}" - ) - defaults[SETTINGS]["log_metric_callback"] = log_metric_callback + # TODO: Load callbacks from plan. + if self.aggregator_ is None: self.aggregator_ = Plan.build(**defaults, initial_tensor_dict=tensor_dict) @@ -577,6 +568,8 @@ def get_collaborator( defaults[SETTINGS]["aggregator_uuid"] = self.aggregator_uuid defaults[SETTINGS]["federation_uuid"] = self.federation_uuid + # TODO: Load callbacks from the plan. + if task_runner is not None: defaults[SETTINGS]["task_runner"] = task_runner else: diff --git a/openfl/transport/grpc/aggregator_server.py b/openfl/transport/grpc/aggregator_server.py index bfae10351b..12f658a0aa 100644 --- a/openfl/transport/grpc/aggregator_server.py +++ b/openfl/transport/grpc/aggregator_server.py @@ -70,7 +70,6 @@ def __init__( TLS connection. **kwargs: Additional keyword arguments. """ - print(f"{use_tls=}") self.aggregator = aggregator self.uri = f"[::]:{agg_port}" self.use_tls = use_tls diff --git a/openfl/utilities/logs.py b/openfl/utilities/logs.py index ce64b5f7fb..3798f2bc90 100644 --- a/openfl/utilities/logs.py +++ b/openfl/utilities/logs.py @@ -4,43 +4,10 @@ """Logs utilities.""" -import json import logging -import os -import psutil from rich.console import Console from rich.logging import RichHandler -from tensorboardX import SummaryWriter - -writer = None - - -def get_writer(): - """Create global writer object. - - This function creates a global `SummaryWriter` object for logging to - TensorBoard. - """ - global writer - if not writer: - writer = SummaryWriter("./logs/tensorboard", flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback. - - This function logs a metric to TensorBoard. - - Args: - node_name (str): The name of the node. - task_name (str): The name of the task. - metric_name (str): The name of the metric. - metric (float): The value of the metric. - round_number (int): The current round number. - """ - get_writer() - writer.add_scalar(f"{node_name}/{task_name}/{metric_name}", metric, round_number) def setup_loggers(log_level=logging.INFO): @@ -60,52 +27,3 @@ def setup_loggers(log_level=logging.INFO): formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s") handler.setFormatter(formatter) root.addHandler(handler) - - -def get_memory_usage() -> dict: - """Return memory usage details of the current process. - - Returns: - dict: A dictionary containing memory usage details. - """ - process = psutil.Process(os.getpid()) - virtual_memory = psutil.virtual_memory() - swap_memory = psutil.swap_memory() - memory_usage = { - "process_memory": round(process.memory_info().rss / (1024**2), 2), - "virtual_memory": { - "total": round(virtual_memory.total / (1024**2), 2), - "available": round(virtual_memory.available / (1024**2), 2), - "percent": virtual_memory.percent, - "used": round(virtual_memory.used / (1024**2), 2), - "free": round(virtual_memory.free / (1024**2), 2), - "active": round(virtual_memory.active / (1024**2), 2), - "inactive": round(virtual_memory.inactive / (1024**2), 2), - "buffers": round(virtual_memory.buffers / (1024**2), 2), - "cached": round(virtual_memory.cached / (1024**2), 2), - "shared": round(virtual_memory.shared / (1024**2), 2), - }, - "swap_memory": { - "total": round(swap_memory.total / (1024**2), 2), - "used": round(swap_memory.used / (1024**2), 2), - "free": round(swap_memory.free / (1024**2), 2), - "percent": swap_memory.percent, - }, - } - return memory_usage - - -def write_memory_usage_to_file(memory_usage_dict, file_name): - """ - Write memory usage details to a file. - - Args: - memory_usage_dict (dict): The memory usage details to write. - file_name (str): The name of the file to write to. - - Returns: - None - """ - file_path = os.path.join("logs", file_name) - with open(file_path, "w") as f: - json.dump(memory_usage_dict, f, indent=4) diff --git a/tests/openfl/component/collaborator/test_collaborator.py b/tests/openfl/component/collaborator/test_collaborator.py index a136fe2cba..88a60015c6 100644 --- a/tests/openfl/component/collaborator/test_collaborator.py +++ b/tests/openfl/component/collaborator/test_collaborator.py @@ -133,7 +133,6 @@ def test_send_task_results(collaborator_mock, tensor_key): data_size = -1 collaborator_mock.nparray_to_named_tensor = mock.Mock(return_value=None) collaborator_mock.client.send_local_task_results = mock.Mock() - collaborator_mock.logger.metric = mock.Mock() collaborator_mock.send_task_results(tensor_dict, round_number, task_name) collaborator_mock.client.send_local_task_results.assert_called_with( @@ -286,7 +285,7 @@ def test_run(collaborator_mock): collaborator_mock.get_tasks = mock.Mock() collaborator_mock.get_tasks.side_effect = [(['task'], round_number, 0, False), (['task'], round_number, 0, True)] - collaborator_mock.do_task = mock.Mock() + collaborator_mock.do_task = mock.Mock(return_value={'metric': 0.0}) collaborator_mock.run() collaborator_mock.do_task.assert_called_with('task', round_number) From 18cda3e062a8aacd0e904caf32a106a8858548ba Mon Sep 17 00:00:00 2001 From: Payal Chaurasiya Date: Fri, 27 Dec 2024 16:50:33 +0530 Subject: [PATCH 4/4] Replace xml.etree.ElementTree.parse with its defusedxml (#1230) * Replace xml.etree.ElementTree.parse with its defusedxml Signed-off-by: Chaurasiya, Payal * convert to json Signed-off-by: Chaurasiya, Payal * Fix memory logs and create pdf Signed-off-by: Chaurasiya, Payal --------- Signed-off-by: Chaurasiya, Payal --- test-requirements.txt | 3 + .../test_suites/memory_logs_tests.py | 27 ++++- tests/end_to_end/utils/generate_report.py | 101 ++++++++++++++++++ tests/end_to_end/utils/summary_helper.py | 4 +- 4 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 tests/end_to_end/utils/generate_report.py diff --git a/test-requirements.txt b/test-requirements.txt index bb2fc0421b..446d67e9af 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,3 +4,6 @@ paramiko pytest==8.3.4 pytest-asyncio==0.25.0 pytest-mock==3.14.0 +defusedxml==0.7.1 +matplotlib==3.10.0 +fpdf==1.7.2 \ No newline at end of file diff --git a/tests/end_to_end/test_suites/memory_logs_tests.py b/tests/end_to_end/test_suites/memory_logs_tests.py index b152cd0852..d0957a8336 100644 --- a/tests/end_to_end/test_suites/memory_logs_tests.py +++ b/tests/end_to_end/test_suites/memory_logs_tests.py @@ -9,6 +9,7 @@ from tests.end_to_end.utils.common_fixtures import fx_federation_tr, fx_federation_tr_dws import tests.end_to_end.utils.constants as constants from tests.end_to_end.utils import federation_helper as fed_helper, ssh_helper as ssh +from tests.end_to_end.utils.generate_report import generate_memory_report log = logging.getLogger(__name__) @@ -78,7 +79,9 @@ def _log_memory_usage(request, fed_obj): ), "Aggregator memory usage file is not available" # Log the aggregator memory usage details - memory_usage_dict = json.load(open(aggregator_memory_usage_file)) + memory_usage_dict = _convert_to_json(aggregator_memory_usage_file) + aggregator_path = os.path.join(fed_obj.workspace_path, "aggregator") + generate_memory_report(memory_usage_dict, aggregator_path) # check memory usage entries for each round assert ( @@ -98,10 +101,30 @@ def _log_memory_usage(request, fed_obj): collaborator_memory_usage_file ), f"Memory usage file for collaborator {collaborator.collaborator_name} is not available" - memory_usage_dict = json.load(open(collaborator_memory_usage_file)) + memory_usage_dict = _convert_to_json(collaborator_memory_usage_file) + collaborator_path = os.path.join(fed_obj.workspace_path, collaborator.name) + generate_memory_report(memory_usage_dict, collaborator_path) assert ( len(memory_usage_dict) == request.config.num_rounds ), f"Memory usage details are not available for all rounds for collaborator {collaborator.collaborator_name}" log.info("Memory usage details are available for all participants") + + +def _convert_to_json(file): + """ + Reads a file containing JSON objects, one per line, and converts them into a list of parsed JSON objects. + + Args: + file (str): The path to the file containing JSON objects. + + Returns: + list: A list of parsed JSON objects. + """ + with open(file, 'r') as infile: + json_objects = infile.readlines() + + # Parse each JSON object + parsed_json_objects = [json.loads(obj) for obj in json_objects] + return parsed_json_objects diff --git a/tests/end_to_end/utils/generate_report.py b/tests/end_to_end/utils/generate_report.py new file mode 100644 index 0000000000..879a103608 --- /dev/null +++ b/tests/end_to_end/utils/generate_report.py @@ -0,0 +1,101 @@ +import pandas as pd +import matplotlib.pyplot as plt +import numpy as np +from scipy.stats import linregress +from fpdf import FPDF + +class PDF(FPDF): + def header(self): + self.set_font('Arial', 'B', 14) + + def chapter_title(self, title): + self.add_page() + self.set_font('Arial', 'B', 14) # Set font to bold for title + self.cell(0, 10, title, 0, 1, 'L') + + def chapter_body(self, body): + self.set_font('Arial', '', 12) + self.multi_cell(0, 10, body) + +def generate_memory_report(memory_usage_dict, workspace_path): + """ + Generates a memory usage report from a CSV file. + + Parameters: + file_path (str): The path to the CSV file containing memory usage data. + + Returns: + None + """ + # Load data + data = pd.DataFrame(memory_usage_dict) + + # Plotting the chart + plt.figure(figsize=(10, 5)) + plt.plot(data['round_number'], data['virtual_memory/used'], marker='o') + plt.title('Memory Usage per Round') + plt.xlabel('round_number') + plt.ylabel('Virtual Memory Used (MB)') + plt.grid(True) + output_path = f"{workspace_path}/mem_usage_plot.png" + plt.savefig(output_path) + plt.close() + + # Calculate statistics + min_mem = round(data['virtual_memory/used'].min(), 2) + max_mem = round(data['virtual_memory/used'].max(), 2) + mean_mem = round(data['virtual_memory/used'].mean(), 2) + variance_mem = round(data['virtual_memory/used'].var(), 2) + std_dev_mem = round(data['virtual_memory/used'].std(), 2) + slope, _, _, _, _ = linregress(data.index, data['virtual_memory/used']) + slope = round(slope, 2) + stats_path = f"{workspace_path}/mem_stats.txt" + with open(stats_path, 'w') as file: + file.write(f"Minimum Memory Used: {min_mem} MB\n") + file.write(f"Maximum Memory Used: {max_mem} MB\n") + file.write(f"Mean Memory Used: {mean_mem} MB\n") + file.write(f"Variance: {variance_mem}\n") + file.write(f"Standard Deviation: {std_dev_mem}\n") + file.write(f"Slope: {slope}\n") + + # Generate PDF report + pdf = PDF() + add_introduction(pdf) + add_chart_analysis(pdf, output_path, data) + add_statistical_overview(pdf, stats_path) + add_conclusion(pdf, slope) + pdf_output_path = f"{workspace_path}/MemAnalysis.pdf" + pdf.output(pdf_output_path) + + print("Memory report generation completed. Report saved to:", pdf_output_path) + +def add_introduction(pdf): + pdf.chapter_title('Introduction') + intro_text = ("The purpose of this memory analysis is to identify memory usage trends and potential bottlenecks. " + "This analysis focuses on the relationship between round information and memory usage.") + pdf.chapter_body(intro_text) + +def add_chart_analysis(pdf, output_path, data): + pdf.chapter_title('Chart Analysis') + pdf.image(output_path, w=180) + diffs = data['virtual_memory/used'].diff().round(2) + significant_changes = diffs[diffs.abs() > 500] + for index, value in significant_changes.items(): + pdf.chapter_body(f"Significant memory change: {value} MB at Round {data['round_number'][index]}") + +def add_statistical_overview(pdf, stats_path): + pdf.chapter_title('Statistical Overview') + with open(stats_path, 'r') as file: + stats = file.read() + pdf.chapter_body(stats) + +def add_conclusion(pdf, slope): + pdf.chapter_title('Conclusion') + if slope > 0: + conclusion_text = "The upward slope in the graph indicates a trend of increasing memory usage over rounds." + else: + conclusion_text = "There is no continuous memory growth." + pdf.chapter_body(conclusion_text) + +# Uncomment the following line to run the function directly when this script is executed +# generate_memory_report('/home/sys_tpe_st_svc_acct/memory_leak/mem_info_aggr.csv') diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py index a832a281c7..25b29ad9fd 100644 --- a/tests/end_to_end/utils/summary_helper.py +++ b/tests/end_to_end/utils/summary_helper.py @@ -1,7 +1,7 @@ # Copyright 2020-2023 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -import xml.etree.ElementTree as ET +from defusedxml.ElementTree import parse as defused_parse from lxml import etree import os from pathlib import Path @@ -17,7 +17,7 @@ print(f"Results XML file not found at {result_xml}. Exiting...") exit(1) -tree = ET.parse(result_xml, parser=parser) +tree = defused_parse(result_xml, parser=parser) # Get the root element testsuites = tree.getroot()