From 4ee14d4ce304d2ae224ec3a13ba70a3c0862d25a Mon Sep 17 00:00:00 2001 From: Ferran Llamas Date: Wed, 3 Jan 2024 10:51:56 +0100 Subject: [PATCH] Introspect endpoint - standalone (#1688) --- nucliadb/nucliadb/standalone/api_router.py | 18 +- nucliadb/nucliadb/standalone/introspect.py | 200 ++++++++++++++++++ .../tests/integration/test_introspect.py | 83 ++++++++ nucliadb/nucliadb/tests/fixtures.py | 27 ++- nucliadb/requirements.txt | 2 + 5 files changed, 327 insertions(+), 3 deletions(-) create mode 100644 nucliadb/nucliadb/standalone/introspect.py create mode 100644 nucliadb/nucliadb/standalone/tests/integration/test_introspect.py diff --git a/nucliadb/nucliadb/standalone/api_router.py b/nucliadb/nucliadb/standalone/api_router.py index df7e2fbc10..151908f943 100644 --- a/nucliadb/nucliadb/standalone/api_router.py +++ b/nucliadb/nucliadb/standalone/api_router.py @@ -17,19 +17,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # +import datetime import logging import time import orjson from fastapi import Request -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from fastapi.routing import APIRouter from fastapi_versioning import version from jwcrypto import jwe, jwk # type: ignore from nucliadb.common.cluster import manager from nucliadb.common.http_clients.processing import ProcessingHTTPClient -from nucliadb.standalone import versions +from nucliadb.standalone import introspect, versions from nucliadb_models.resource import NucliaDBRoles from nucliadb_utils.authentication import requires from nucliadb_utils.settings import nuclia_settings @@ -137,3 +138,16 @@ async def versions_endpoint(request: Request) -> JSONResponse: for package in versions.WatchedPackages } ) + + +@standalone_api_router.get("/introspect") +def introspect_endpoint(request: Request) -> StreamingResponse: + introspect_id = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + return StreamingResponse( + content=introspect.stream_tar(request.app), + status_code=200, + headers={ + "Content-Disposition": f"attachment; filename=introspect_{introspect_id}.tar.gz" + }, + media_type="application/octet-stream", + ) diff --git a/nucliadb/nucliadb/standalone/introspect.py b/nucliadb/nucliadb/standalone/introspect.py new file mode 100644 index 0000000000..72b431cf32 --- /dev/null +++ b/nucliadb/nucliadb/standalone/introspect.py @@ -0,0 +1,200 @@ +# Copyright (C) 2021 Bosutech XXI S.L. +# +# nucliadb is offered under the AGPL v3.0 and as commercial software. +# For commercial licensing, contact us at info@nuclia.com. +# +# AGPL: +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# + +import asyncio +import os +import platform +import sys +import tarfile +import tempfile +from collections.abc import AsyncGenerator +from typing import Optional + +import pkg_resources +import psutil +from fastapi import FastAPI +from pydantic import BaseModel + +from nucliadb.common.cluster import manager as cluster_manager +from nucliadb.standalone.settings import Settings +from nucliadb_telemetry.settings import LogSettings + +MB = 1024 * 1024 +CHUNK_SIZE = 2 * MB +SYSTEM_INFO_TEMPLATE = """System info +=========== + +Python +------ + - Version: {python_version} + +Operative system +---------------- + - Name: {os_name} + - Release: {os_release} + - Version: {os_version} + - Machine: {os_machine} + - File System Encoding: {os_file_system_encoding} + +CPU information +--------------- + - Number of CPUs: {cpu_count} + +Memory information +------------------ + - Total: {memory_total:.2f} MB + - Available: {memory_available:.2f} MB + - Used: {memory_used:.2f} MB + - Used %: {memory_used_percent:.2f}% +""" + + +class NodeInfo(BaseModel): + id: str + address: str + shard_count: int + primary_id: Optional[str] + + +class ClusterInfo(BaseModel): + nodes: list[NodeInfo] + + +async def stream_tar(app: FastAPI) -> AsyncGenerator[bytes, None]: + with tempfile.TemporaryDirectory() as temp_dir: + tar_file = os.path.join(temp_dir, "introspect.tar.gz") + with tarfile.open(tar_file, mode="w:gz") as tar: + await add_system_info(temp_dir, tar) + await add_dependencies(temp_dir, tar) + await add_cluster_info(temp_dir, tar) + settings: Settings = app.settings.copy() # type: ignore + await add_settings(temp_dir, tar, settings) + if settings.log_output_type == "file": + await add_logs(tar) + + async for chunk in stream_out_tar(tar_file): + yield chunk + + +async def stream_out_tar(tar_file: str) -> AsyncGenerator[bytes, None]: + loop = asyncio.get_event_loop() + with open(tar_file, "rb") as f: + chunk = await loop.run_in_executor(None, f.read, CHUNK_SIZE) + while chunk: + yield chunk + chunk = await loop.run_in_executor(None, f.read, CHUNK_SIZE) + + +async def add_system_info(temp_dir: str, tar: tarfile.TarFile): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _add_system_info_to_tar, temp_dir, tar) + + +def _add_system_info_to_tar(temp_dir: str, tar: tarfile.TarFile): + system_info_file = os.path.join(temp_dir, "system_info.txt") + with open(system_info_file, "w") as f: + memory = psutil.virtual_memory() + f.write( + SYSTEM_INFO_TEMPLATE.format( + python_version=sys.version, + os_name=os.uname().sysname, + os_release=platform.release(), + os_version=platform.version(), + os_machine=platform.machine(), + os_file_system_encoding=os.sys.getfilesystemencoding(), # type: ignore + cpu_count=psutil.cpu_count(), + memory_total=memory.total / MB, + memory_available=memory.available / MB, + memory_used=memory.used / MB, + memory_used_percent=memory.percent, + ) + ) + tar.add(system_info_file, arcname="system_info.txt") + + +async def add_dependencies(temp_dir: str, tar: tarfile.TarFile): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _add_dependencies_to_tar, temp_dir, tar) + + +def _add_dependencies_to_tar(temp_dir: str, tar: tarfile.TarFile): + dependendies_file = os.path.join(temp_dir, "dependencies.txt") + with open(dependendies_file, "w") as f: + installed_packages = [pkg for pkg in pkg_resources.working_set] + lines = [] + for pkg in sorted(installed_packages, key=lambda p: p.key): + lines.append(f"{pkg.key}=={pkg.version}\n") + f.writelines(lines) + tar.add(dependendies_file, arcname="dependencies.txt") + + +async def add_cluster_info(temp_dir: str, tar: tarfile.TarFile): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _add_cluster_info_to_tar, temp_dir, tar) + + +def _add_cluster_info_to_tar(temp_dir: str, tar: tarfile.TarFile): + cluster_info = ClusterInfo( + nodes=[ + NodeInfo( + id=node.id, + address=node.address, + shard_count=node.shard_count, + primary_id=node.primary_id, + ) + for node in cluster_manager.get_index_nodes() + ] + ) + cluster_info_file = os.path.join(temp_dir, "cluster_info.txt") + with open(cluster_info_file, "w") as f: + f.write(cluster_info.json(indent=4)) + tar.add(cluster_info_file, arcname="cluster_info.txt") + + +async def add_settings(temp_dir: str, tar: tarfile.TarFile, settings: Settings): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _add_settings_to_tar, temp_dir, tar, settings) + + +def _add_settings_to_tar(temp_dir: str, tar: tarfile.TarFile, settings: Settings): + # Remove sensitive data from settings + settings.nua_api_key = None + settings.jwk_key = None + settings.gcs_base64_creds = None + settings.s3_client_secret = None + settings_file = os.path.join(temp_dir, "settings.json") + with open(settings_file, "w") as f: + f.write(settings.json(indent=4)) + tar.add(settings_file, arcname="settings.json") + + +async def add_logs(tar): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _add_logs_to_tar, tar) + + +def _add_logs_to_tar(tar: tarfile.TarFile): + log_settings = LogSettings() + access_log = os.path.realpath(log_settings.access_log) + tar.add(access_log, arcname="logs/access.log") + error_log = os.path.realpath(log_settings.error_log) + tar.add(error_log, arcname="logs/error.log") + info_log = os.path.realpath(log_settings.info_log) + tar.add(info_log, arcname="logs/info.log") diff --git a/nucliadb/nucliadb/standalone/tests/integration/test_introspect.py b/nucliadb/nucliadb/standalone/tests/integration/test_introspect.py new file mode 100644 index 0000000000..7592397af1 --- /dev/null +++ b/nucliadb/nucliadb/standalone/tests/integration/test_introspect.py @@ -0,0 +1,83 @@ +# Copyright (C) 2021 Bosutech XXI S.L. +# +# nucliadb is offered under the AGPL v3.0 and as commercial software. +# For commercial licensing, contact us at info@nuclia.com. +# +# AGPL: +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# + +import os +import tarfile +import tempfile + +import pytest + +from nucliadb.standalone.introspect import ClusterInfo +from nucliadb.standalone.settings import Settings + + +@pytest.mark.asyncio +async def test_introspect_endpoint(nucliadb_manager) -> None: + # Generate some traffic to have some logs + await nucliadb_manager.post("/not/found") + await nucliadb_manager.delete("/kb/foobar") + + resp = await nucliadb_manager.get("/introspect", timeout=600) + assert resp.status_code == 200 + + with tempfile.TemporaryDirectory() as root_dir: + # Save the tar to a file + introspect_tar_file = os.path.join(root_dir, "introspect.tar.gz") + with open(introspect_tar_file, "wb") as f: + f.write(resp.content) + + # Extract the tar + extracted_tar = os.path.join(root_dir, "introspect") + with tarfile.open(introspect_tar_file, "r:gz") as tar: + tar.extractall(extracted_tar) + + # Check system info + assert os.path.exists(os.path.join(extracted_tar, "system_info.txt")) + + # Check dependencies + assert os.path.exists(os.path.join(extracted_tar, "dependencies.txt")) + with open(os.path.join(extracted_tar, "dependencies.txt")) as f: + dependencies = f.read() + assert "nucliadb" in dependencies + assert "nucliadb-models" in dependencies + + # Check cluster info + assert os.path.exists(os.path.join(extracted_tar, "cluster_info.txt")) + cluster_info = ClusterInfo.parse_file( + os.path.join(extracted_tar, "cluster_info.txt") + ) + assert len(cluster_info.nodes) > 0 + + # Check settings + assert os.path.exists(os.path.join(extracted_tar, "settings.json")) + introspect_settings = Settings.parse_file( + os.path.join(extracted_tar, "settings.json") + ) + # Check that sensitive data is not included + assert introspect_settings.nua_api_key is None + assert introspect_settings.jwk_key is None + assert introspect_settings.gcs_base64_creds is None + assert introspect_settings.s3_client_secret is None + + # Check logs + assert os.path.exists(os.path.join(extracted_tar, "logs/info.log")) + assert os.path.exists(os.path.join(extracted_tar, "logs/error.log")) + assert os.path.exists(os.path.join(extracted_tar, "logs/access.log")) + assert os.path.getsize(os.path.join(extracted_tar, "logs/access.log")) > 0 diff --git a/nucliadb/nucliadb/tests/fixtures.py b/nucliadb/nucliadb/tests/fixtures.py index 6d0065d8f3..bdaa50ade0 100644 --- a/nucliadb/nucliadb/tests/fixtures.py +++ b/nucliadb/nucliadb/tests/fixtures.py @@ -50,6 +50,13 @@ from nucliadb.standalone.settings import Settings from nucliadb.tests.utils import inject_message from nucliadb.writer import API_PREFIX +from nucliadb_telemetry.logs import setup_logging +from nucliadb_telemetry.settings import ( + LogFormatType, + LogLevel, + LogOutputType, + LogSettings, +) from nucliadb_utils.storages.settings import settings as storage_settings from nucliadb_utils.store import MAIN from nucliadb_utils.tests import free_port @@ -124,19 +131,37 @@ async def nucliadb(dummy_processing, analytics_disabled, driver_settings, tmpdir # we need to force DATA_PATH updates to run every test on the proper # temporary directory data_path = f"{tmpdir}/node" + local_files = f"{tmpdir}/blob" os.environ["DATA_PATH"] = data_path + settings = Settings( file_backend="local", - local_files=f"{tmpdir}/blob", + local_files=local_files, data_path=data_path, http_port=free_port(), ingest_grpc_port=free_port(), train_grpc_port=free_port(), standalone_node_port=free_port(), + log_format_type=LogFormatType.PLAIN, + log_output_type=LogOutputType.FILE, **driver_settings.dict(), ) config_nucliadb(settings) + + # Make sure tests don't write logs outside of the tmpdir + os.environ["ERROR_LOG"] = f"{tmpdir}/logs/error.log" + os.environ["ACCESS_LOG"] = f"{tmpdir}/logs/access.log" + os.environ["INFO_LOG"] = f"{tmpdir}/logs/info.log" + + setup_logging( + settings=LogSettings( + log_output_type=LogOutputType.FILE, + log_format_type=LogFormatType.PLAIN, + debug=False, + log_level=LogLevel.WARNING, + ) + ) server = await run_async_nucliadb(settings) yield settings diff --git a/nucliadb/requirements.txt b/nucliadb/requirements.txt index 68fcdb052a..767915e732 100644 --- a/nucliadb/requirements.txt +++ b/nucliadb/requirements.txt @@ -6,6 +6,8 @@ aiohttp>=3.9.0 lru-dict>=1.1.7 backoff aiofiles>=0.8.0 +psutil==5.9.7 +types-psutil==5.9.5.17 types-aiofiles>=0.8.3 protobuf==4.22.3 types-protobuf>=3.19.20,<4.0