Skip to content

Commit

Permalink
Introspect endpoint - standalone (#1688)
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran authored Jan 3, 2024
1 parent 43ea441 commit 4ee14d4
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 3 deletions.
18 changes: 16 additions & 2 deletions nucliadb/nucliadb/standalone/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
import logging
import time

import orjson
from fastapi import Request
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.routing import APIRouter
from fastapi_versioning import version
from jwcrypto import jwe, jwk # type: ignore

from nucliadb.common.cluster import manager
from nucliadb.common.http_clients.processing import ProcessingHTTPClient
from nucliadb.standalone import versions
from nucliadb.standalone import introspect, versions
from nucliadb_models.resource import NucliaDBRoles
from nucliadb_utils.authentication import requires
from nucliadb_utils.settings import nuclia_settings
Expand Down Expand Up @@ -137,3 +138,16 @@ async def versions_endpoint(request: Request) -> JSONResponse:
for package in versions.WatchedPackages
}
)


@standalone_api_router.get("/introspect")
def introspect_endpoint(request: Request) -> StreamingResponse:
introspect_id = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
return StreamingResponse(
content=introspect.stream_tar(request.app),
status_code=200,
headers={
"Content-Disposition": f"attachment; filename=introspect_{introspect_id}.tar.gz"
},
media_type="application/octet-stream",
)
200 changes: 200 additions & 0 deletions nucliadb/nucliadb/standalone/introspect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import asyncio
import os
import platform
import sys
import tarfile
import tempfile
from collections.abc import AsyncGenerator
from typing import Optional

import pkg_resources
import psutil
from fastapi import FastAPI
from pydantic import BaseModel

from nucliadb.common.cluster import manager as cluster_manager
from nucliadb.standalone.settings import Settings
from nucliadb_telemetry.settings import LogSettings

MB = 1024 * 1024
CHUNK_SIZE = 2 * MB
SYSTEM_INFO_TEMPLATE = """System info
===========
Python
------
- Version: {python_version}
Operative system
----------------
- Name: {os_name}
- Release: {os_release}
- Version: {os_version}
- Machine: {os_machine}
- File System Encoding: {os_file_system_encoding}
CPU information
---------------
- Number of CPUs: {cpu_count}
Memory information
------------------
- Total: {memory_total:.2f} MB
- Available: {memory_available:.2f} MB
- Used: {memory_used:.2f} MB
- Used %: {memory_used_percent:.2f}%
"""


class NodeInfo(BaseModel):
id: str
address: str
shard_count: int
primary_id: Optional[str]


class ClusterInfo(BaseModel):
nodes: list[NodeInfo]


async def stream_tar(app: FastAPI) -> AsyncGenerator[bytes, None]:
with tempfile.TemporaryDirectory() as temp_dir:
tar_file = os.path.join(temp_dir, "introspect.tar.gz")
with tarfile.open(tar_file, mode="w:gz") as tar:
await add_system_info(temp_dir, tar)
await add_dependencies(temp_dir, tar)
await add_cluster_info(temp_dir, tar)
settings: Settings = app.settings.copy() # type: ignore
await add_settings(temp_dir, tar, settings)
if settings.log_output_type == "file":
await add_logs(tar)

async for chunk in stream_out_tar(tar_file):
yield chunk


async def stream_out_tar(tar_file: str) -> AsyncGenerator[bytes, None]:
loop = asyncio.get_event_loop()
with open(tar_file, "rb") as f:
chunk = await loop.run_in_executor(None, f.read, CHUNK_SIZE)
while chunk:
yield chunk
chunk = await loop.run_in_executor(None, f.read, CHUNK_SIZE)


async def add_system_info(temp_dir: str, tar: tarfile.TarFile):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _add_system_info_to_tar, temp_dir, tar)


def _add_system_info_to_tar(temp_dir: str, tar: tarfile.TarFile):
system_info_file = os.path.join(temp_dir, "system_info.txt")
with open(system_info_file, "w") as f:
memory = psutil.virtual_memory()
f.write(
SYSTEM_INFO_TEMPLATE.format(
python_version=sys.version,
os_name=os.uname().sysname,
os_release=platform.release(),
os_version=platform.version(),
os_machine=platform.machine(),
os_file_system_encoding=os.sys.getfilesystemencoding(), # type: ignore
cpu_count=psutil.cpu_count(),
memory_total=memory.total / MB,
memory_available=memory.available / MB,
memory_used=memory.used / MB,
memory_used_percent=memory.percent,
)
)
tar.add(system_info_file, arcname="system_info.txt")


async def add_dependencies(temp_dir: str, tar: tarfile.TarFile):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _add_dependencies_to_tar, temp_dir, tar)


def _add_dependencies_to_tar(temp_dir: str, tar: tarfile.TarFile):
dependendies_file = os.path.join(temp_dir, "dependencies.txt")
with open(dependendies_file, "w") as f:
installed_packages = [pkg for pkg in pkg_resources.working_set]
lines = []
for pkg in sorted(installed_packages, key=lambda p: p.key):
lines.append(f"{pkg.key}=={pkg.version}\n")
f.writelines(lines)
tar.add(dependendies_file, arcname="dependencies.txt")


async def add_cluster_info(temp_dir: str, tar: tarfile.TarFile):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _add_cluster_info_to_tar, temp_dir, tar)


def _add_cluster_info_to_tar(temp_dir: str, tar: tarfile.TarFile):
cluster_info = ClusterInfo(
nodes=[
NodeInfo(
id=node.id,
address=node.address,
shard_count=node.shard_count,
primary_id=node.primary_id,
)
for node in cluster_manager.get_index_nodes()
]
)
cluster_info_file = os.path.join(temp_dir, "cluster_info.txt")
with open(cluster_info_file, "w") as f:
f.write(cluster_info.json(indent=4))
tar.add(cluster_info_file, arcname="cluster_info.txt")


async def add_settings(temp_dir: str, tar: tarfile.TarFile, settings: Settings):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _add_settings_to_tar, temp_dir, tar, settings)


def _add_settings_to_tar(temp_dir: str, tar: tarfile.TarFile, settings: Settings):
# Remove sensitive data from settings
settings.nua_api_key = None
settings.jwk_key = None
settings.gcs_base64_creds = None
settings.s3_client_secret = None
settings_file = os.path.join(temp_dir, "settings.json")
with open(settings_file, "w") as f:
f.write(settings.json(indent=4))
tar.add(settings_file, arcname="settings.json")


async def add_logs(tar):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _add_logs_to_tar, tar)


def _add_logs_to_tar(tar: tarfile.TarFile):
log_settings = LogSettings()
access_log = os.path.realpath(log_settings.access_log)
tar.add(access_log, arcname="logs/access.log")
error_log = os.path.realpath(log_settings.error_log)
tar.add(error_log, arcname="logs/error.log")
info_log = os.path.realpath(log_settings.info_log)
tar.add(info_log, arcname="logs/info.log")
83 changes: 83 additions & 0 deletions nucliadb/nucliadb/standalone/tests/integration/test_introspect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import os
import tarfile
import tempfile

import pytest

from nucliadb.standalone.introspect import ClusterInfo
from nucliadb.standalone.settings import Settings


@pytest.mark.asyncio
async def test_introspect_endpoint(nucliadb_manager) -> None:
# Generate some traffic to have some logs
await nucliadb_manager.post("/not/found")
await nucliadb_manager.delete("/kb/foobar")

resp = await nucliadb_manager.get("/introspect", timeout=600)
assert resp.status_code == 200

with tempfile.TemporaryDirectory() as root_dir:
# Save the tar to a file
introspect_tar_file = os.path.join(root_dir, "introspect.tar.gz")
with open(introspect_tar_file, "wb") as f:
f.write(resp.content)

# Extract the tar
extracted_tar = os.path.join(root_dir, "introspect")
with tarfile.open(introspect_tar_file, "r:gz") as tar:
tar.extractall(extracted_tar)

# Check system info
assert os.path.exists(os.path.join(extracted_tar, "system_info.txt"))

# Check dependencies
assert os.path.exists(os.path.join(extracted_tar, "dependencies.txt"))
with open(os.path.join(extracted_tar, "dependencies.txt")) as f:
dependencies = f.read()
assert "nucliadb" in dependencies
assert "nucliadb-models" in dependencies

# Check cluster info
assert os.path.exists(os.path.join(extracted_tar, "cluster_info.txt"))
cluster_info = ClusterInfo.parse_file(
os.path.join(extracted_tar, "cluster_info.txt")
)
assert len(cluster_info.nodes) > 0

# Check settings
assert os.path.exists(os.path.join(extracted_tar, "settings.json"))
introspect_settings = Settings.parse_file(
os.path.join(extracted_tar, "settings.json")
)
# Check that sensitive data is not included
assert introspect_settings.nua_api_key is None
assert introspect_settings.jwk_key is None
assert introspect_settings.gcs_base64_creds is None
assert introspect_settings.s3_client_secret is None

# Check logs
assert os.path.exists(os.path.join(extracted_tar, "logs/info.log"))
assert os.path.exists(os.path.join(extracted_tar, "logs/error.log"))
assert os.path.exists(os.path.join(extracted_tar, "logs/access.log"))
assert os.path.getsize(os.path.join(extracted_tar, "logs/access.log")) > 0
27 changes: 26 additions & 1 deletion nucliadb/nucliadb/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
from nucliadb.standalone.settings import Settings
from nucliadb.tests.utils import inject_message
from nucliadb.writer import API_PREFIX
from nucliadb_telemetry.logs import setup_logging
from nucliadb_telemetry.settings import (
LogFormatType,
LogLevel,
LogOutputType,
LogSettings,
)
from nucliadb_utils.storages.settings import settings as storage_settings
from nucliadb_utils.store import MAIN
from nucliadb_utils.tests import free_port
Expand Down Expand Up @@ -124,19 +131,37 @@ async def nucliadb(dummy_processing, analytics_disabled, driver_settings, tmpdir
# we need to force DATA_PATH updates to run every test on the proper
# temporary directory
data_path = f"{tmpdir}/node"
local_files = f"{tmpdir}/blob"
os.environ["DATA_PATH"] = data_path

settings = Settings(
file_backend="local",
local_files=f"{tmpdir}/blob",
local_files=local_files,
data_path=data_path,
http_port=free_port(),
ingest_grpc_port=free_port(),
train_grpc_port=free_port(),
standalone_node_port=free_port(),
log_format_type=LogFormatType.PLAIN,
log_output_type=LogOutputType.FILE,
**driver_settings.dict(),
)

config_nucliadb(settings)

# Make sure tests don't write logs outside of the tmpdir
os.environ["ERROR_LOG"] = f"{tmpdir}/logs/error.log"
os.environ["ACCESS_LOG"] = f"{tmpdir}/logs/access.log"
os.environ["INFO_LOG"] = f"{tmpdir}/logs/info.log"

setup_logging(
settings=LogSettings(
log_output_type=LogOutputType.FILE,
log_format_type=LogFormatType.PLAIN,
debug=False,
log_level=LogLevel.WARNING,
)
)
server = await run_async_nucliadb(settings)

yield settings
Expand Down
2 changes: 2 additions & 0 deletions nucliadb/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ aiohttp>=3.9.0
lru-dict>=1.1.7
backoff
aiofiles>=0.8.0
psutil==5.9.7
types-psutil==5.9.5.17
types-aiofiles>=0.8.3
protobuf==4.22.3
types-protobuf>=3.19.20,<4.0
Expand Down

0 comments on commit 4ee14d4

Please sign in to comment.