Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use status 404 in more places when appropriate #5480

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion supervisor/api/addons.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
APIAddonNotInstalled,
APIError,
APIForbidden,
APINotFound,
PwnedError,
PwnedSecret,
)
Expand Down Expand Up @@ -161,7 +162,7 @@ def get_addon_for_request(self, request: web.Request) -> Addon:

addon = self.sys_addons.get(addon_slug)
if not addon:
raise APIError(f"Addon {addon_slug} does not exist")
raise APINotFound(f"Addon {addon_slug} does not exist")
if not isinstance(addon, Addon) or not addon.is_installed:
raise APIAddonNotInstalled("Addon is not installed")

Expand Down
4 changes: 2 additions & 2 deletions supervisor/api/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
AddonState,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
from ..exceptions import APIForbidden, APINotFound
from .utils import api_process, api_validate, require_home_assistant

_LOGGER: logging.Logger = logging.getLogger(__name__)
Expand All @@ -36,7 +36,7 @@ def _extract_message(self, request):
"""Extract discovery message from URL."""
message = self.sys_discovery.get(request.match_info.get("uuid"))
if not message:
raise APIError("Discovery message not found")
raise APINotFound("Discovery message not found")
return message

@api_process
Expand Down
4 changes: 4 additions & 0 deletions supervisor/api/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ATTR_VERSION,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APINotFound
from .utils import api_process, api_validate

_LOGGER: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,6 +59,9 @@ async def create_registry(self, request: web.Request):
async def remove_registry(self, request: web.Request):
"""Delete a docker registry."""
hostname = request.match_info.get(ATTR_HOSTNAME)
if hostname not in self.sys_docker.config.registries:
raise APINotFound(f"Hostname {hostname} does not exist in registries")

del self.sys_docker.config.registries[hostname]
self.sys_docker.config.save_data()

Expand Down
13 changes: 10 additions & 3 deletions supervisor/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import voluptuous as vol

from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..exceptions import APIError, APINotFound, JobNotFound
from ..jobs import SupervisorJob
from ..jobs.const import ATTR_IGNORE_CONDITIONS, JobCondition
from .const import ATTR_JOBS
Expand All @@ -23,6 +23,13 @@
class APIJobs(CoreSysAttributes):
"""Handle RESTful API for OS functions."""

def _extract_job(self, request: web.Request) -> SupervisorJob:
"""Extract job from request or raise."""
try:
return self.sys_jobs.get_job(request.match_info.get("uuid"))
except JobNotFound:
raise APINotFound("Job does not exist") from None

def _list_jobs(self, start: SupervisorJob | None = None) -> list[dict[str, Any]]:
"""Return current job tree."""
jobs_by_parent: dict[str | None, list[SupervisorJob]] = {}
Expand Down Expand Up @@ -86,13 +93,13 @@ async def reset(self, request: web.Request) -> None:
@api_process
async def job_info(self, request: web.Request) -> dict[str, Any]:
"""Get details of a job by ID."""
job = self.sys_jobs.get_job(request.match_info.get("uuid"))
job = self._extract_job(request)
return self._list_jobs(job)[0]

@api_process
async def remove_job(self, request: web.Request) -> None:
"""Remove a completed job."""
job = self.sys_jobs.get_job(request.match_info.get("uuid"))
job = self._extract_job(request)

if not job.done:
raise APIError(f"Job {job.uuid} is not done!")
Expand Down
27 changes: 16 additions & 11 deletions supervisor/api/mounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from ..const import ATTR_NAME, ATTR_STATE
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..exceptions import APIError, APINotFound
from ..mounts.const import ATTR_DEFAULT_BACKUP_MOUNT, MountUsage
from ..mounts.mount import Mount
from ..mounts.validate import SCHEMA_MOUNT_CONFIG
Expand All @@ -24,6 +24,13 @@
class APIMounts(CoreSysAttributes):
"""Handle REST API for mounting options."""

def _extract_mount(self, request: web.Request) -> Mount:
"""Extract mount from request or raise."""
name = request.match_info.get("mount")
if name not in self.sys_mounts:
raise APINotFound(f"No mount exists with name {name}")
return self.sys_mounts.get(name)

@api_process
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return MountManager info."""
Expand Down Expand Up @@ -85,15 +92,13 @@ async def create_mount(self, request: web.Request) -> None:
@api_process
async def update_mount(self, request: web.Request) -> None:
"""Update an existing mount in supervisor."""
name = request.match_info.get("mount")
current = self._extract_mount(request)
name_schema = vol.Schema(
{vol.Optional(ATTR_NAME, default=name): name}, extra=vol.ALLOW_EXTRA
{vol.Optional(ATTR_NAME, default=current.name): current.name},
extra=vol.ALLOW_EXTRA,
)
body = await api_validate(vol.All(name_schema, SCHEMA_MOUNT_CONFIG), request)

if name not in self.sys_mounts:
raise APIError(f"No mount exists with name {name}")

mount = Mount.from_dict(self.coresys, body)
await self.sys_mounts.create_mount(mount)

Expand All @@ -110,8 +115,8 @@ async def update_mount(self, request: web.Request) -> None:
@api_process
async def delete_mount(self, request: web.Request) -> None:
"""Delete an existing mount in supervisor."""
name = request.match_info.get("mount")
mount = await self.sys_mounts.remove_mount(name)
current = self._extract_mount(request)
mount = await self.sys_mounts.remove_mount(current.name)

# If it was a backup mount, reload backups
if mount.usage == MountUsage.BACKUP:
Expand All @@ -122,9 +127,9 @@ async def delete_mount(self, request: web.Request) -> None:
@api_process
async def reload_mount(self, request: web.Request) -> None:
"""Reload an existing mount in supervisor."""
name = request.match_info.get("mount")
await self.sys_mounts.reload_mount(name)
mount = self._extract_mount(request)
await self.sys_mounts.reload_mount(mount.name)

# If it's a backup mount, reload backups
if self.sys_mounts.get(name).usage == MountUsage.BACKUP:
if mount.usage == MountUsage.BACKUP:
self.sys_create_task(self.sys_backups.reload())
4 changes: 2 additions & 2 deletions supervisor/api/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
DOCKER_NETWORK_MASK,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, HostNetworkNotFound
from ..exceptions import APIError, APINotFound, HostNetworkNotFound
from ..host.configuration import (
AccessPoint,
Interface,
Expand Down Expand Up @@ -167,7 +167,7 @@ def _get_interface(self, name: str) -> Interface:
except HostNetworkNotFound:
pass

raise APIError(f"Interface {name} does not exist") from None
raise APINotFound(f"Interface {name} does not exist") from None

@api_process
async def info(self, request: web.Request) -> dict[str, Any]:
Expand Down
82 changes: 41 additions & 41 deletions supervisor/api/resolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
ATTR_UNSUPPORTED,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, ResolutionNotFound
from ..resolution.data import Suggestion
from ..exceptions import APINotFound, ResolutionNotFound
from ..resolution.checks.base import CheckBase
from ..resolution.data import Issue, Suggestion
from .utils import api_process, api_validate

SCHEMA_CHECK_OPTIONS = vol.Schema({vol.Optional(ATTR_ENABLED): bool})
Expand All @@ -29,6 +30,29 @@
class APIResoulution(CoreSysAttributes):
"""Handle REST API for resoulution."""

def _extract_issue(self, request: web.Request) -> Issue:
"""Extract issue from request or raise."""
try:
return self.sys_resolution.get_issue(request.match_info.get("issue"))
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid issue") from None

def _extract_suggestion(self, request: web.Request) -> Suggestion:
"""Extract suggestion from request or raise."""
try:
return self.sys_resolution.get_suggestion(
request.match_info.get("suggestion")
)
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid suggestion") from None

def _extract_check(self, request: web.Request) -> CheckBase:
"""Extract check from request or raise."""
try:
return self.sys_resolution.check.get(request.match_info.get("check"))
except ResolutionNotFound:
raise APINotFound("The supplied check slug is not available") from None

def _generate_suggestion_information(self, suggestion: Suggestion):
"""Generate suggestion information for response."""
resp = attr.asdict(suggestion)
Expand Down Expand Up @@ -61,47 +85,31 @@ async def info(self, request: web.Request) -> dict[str, Any]:
@api_process
async def apply_suggestion(self, request: web.Request) -> None:
"""Apply suggestion."""
try:
suggestion = self.sys_resolution.get_suggestion(
request.match_info.get("suggestion")
)
await self.sys_resolution.apply_suggestion(suggestion)
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid suggestion") from None
suggestion = self._extract_suggestion(request)
await self.sys_resolution.apply_suggestion(suggestion)

@api_process
async def dismiss_suggestion(self, request: web.Request) -> None:
"""Dismiss suggestion."""
try:
suggestion = self.sys_resolution.get_suggestion(
request.match_info.get("suggestion")
)
self.sys_resolution.dismiss_suggestion(suggestion)
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid suggestion") from None
suggestion = self._extract_suggestion(request)
self.sys_resolution.dismiss_suggestion(suggestion)

@api_process
async def suggestions_for_issue(self, request: web.Request) -> dict[str, Any]:
"""Return suggestions that fix an issue."""
try:
issue = self.sys_resolution.get_issue(request.match_info.get("issue"))
return {
ATTR_SUGGESTIONS: [
self._generate_suggestion_information(suggestion)
for suggestion in self.sys_resolution.suggestions_for_issue(issue)
]
}
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid issue") from None
issue = self._extract_issue(request)
return {
ATTR_SUGGESTIONS: [
self._generate_suggestion_information(suggestion)
for suggestion in self.sys_resolution.suggestions_for_issue(issue)
]
}

@api_process
async def dismiss_issue(self, request: web.Request) -> None:
"""Dismiss issue."""
try:
issue = self.sys_resolution.get_issue(request.match_info.get("issue"))
self.sys_resolution.dismiss_issue(issue)
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid issue") from None
issue = self._extract_issue(request)
self.sys_resolution.dismiss_issue(issue)

@api_process
def healthcheck(self, request: web.Request) -> Awaitable[None]:
Expand All @@ -112,11 +120,7 @@ def healthcheck(self, request: web.Request) -> Awaitable[None]:
async def options_check(self, request: web.Request) -> None:
"""Set options for check."""
body = await api_validate(SCHEMA_CHECK_OPTIONS, request)

try:
check = self.sys_resolution.check.get(request.match_info.get("check"))
except ResolutionNotFound:
raise APIError("The supplied check slug is not available") from None
check = self._extract_check(request)

# Apply options
if ATTR_ENABLED in body:
Expand All @@ -127,9 +131,5 @@ async def options_check(self, request: web.Request) -> None:
@api_process
async def run_check(self, request: web.Request) -> None:
"""Execute a backend check."""
try:
check = self.sys_resolution.check.get(request.match_info.get("check"))
except ResolutionNotFound:
raise APIError("The supplied check slug is not available") from None

check = self._extract_check(request)
await check()
4 changes: 2 additions & 2 deletions supervisor/api/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
REQUEST_FROM,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
from ..exceptions import APIError, APIForbidden, APINotFound
from .utils import api_process, api_validate


Expand All @@ -20,7 +20,7 @@ def _extract_service(self, request):
"""Return service, throw an exception if it doesn't exist."""
service = self.sys_services.get(request.match_info.get("service"))
if not service:
raise APIError("Service does not exist")
raise APINotFound("Service does not exist")

return service

Expand Down
35 changes: 17 additions & 18 deletions supervisor/api/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
REQUEST_FROM,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
from ..exceptions import APIError, APIForbidden, APINotFound
from ..store.addon import AddonStore
from ..store.repository import Repository
from ..store.validate import validate_repository
Expand All @@ -74,31 +74,30 @@ class APIStore(CoreSysAttributes):
def _extract_addon(self, request: web.Request, installed=False) -> AnyAddon:
"""Return add-on, throw an exception it it doesn't exist."""
addon_slug: str = request.match_info.get("addon")
addon_version: str = request.match_info.get("version", "latest")

if installed:
addon = self.sys_addons.local.get(addon_slug)
if addon is None or not addon.is_installed:
raise APIError(f"Addon {addon_slug} is not installed")
else:
addon = self.sys_addons.store.get(addon_slug)

if not addon:
raise APIError(
f"Addon {addon_slug} with version {addon_version} does not exist in the store"
)

if not (addon := self.sys_addons.get(addon_slug)):
raise APINotFound(f"Addon {addon_slug} does not exist")

if installed and not addon.is_installed:
raise APIError(f"Addon {addon_slug} is not installed")

if not installed and addon.is_installed:
if not addon.addon_store:
raise APINotFound(f"Addon {addon_slug} does not exist in the store")
return addon.addon_store

return addon

def _extract_repository(self, request: web.Request) -> Repository:
"""Return repository, throw an exception it it doesn't exist."""
repository_slug: str = request.match_info.get("repository")

repository = self.sys_store.get(repository_slug)
if not repository:
raise APIError(f"Repository {repository_slug} does not exist in the store")
if repository_slug not in self.sys_store.repositories:
raise APINotFound(
f"Repository {repository_slug} does not exist in the store"
)

return repository
return self.sys_store.get(repository_slug)

def _generate_addon_information(
self, addon: AddonStore, extended: bool = False
Expand Down
Loading
Loading