Skip to content

Commit

Permalink
Add typing for builtin (#1450)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jc2k authored Jul 7, 2024
1 parent f9452bf commit 5d3afab
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 88 deletions.
204 changes: 117 additions & 87 deletions music_assistant/server/providers/builtin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
ProviderMapping,
Radio,
Track,
UniqueList,
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import DB_SCHEMA_VERSION, MASS_LOGO, VARIOUS_ARTISTS_FANART
Expand Down Expand Up @@ -162,40 +163,46 @@ def supported_features(self) -> tuple[ProviderFeature, ...]:
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
parsed_item = await self.parse_item(prov_track_id)
assert isinstance(parsed_item, Track)
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
# always prefer the stored info, such as the name
parsed_item.name = stored_item["name"]
if image_url := stored_item.get("image_url"):
parsed_item.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
provider=self.instance_id,
remotely_accessible=image_url.startswith("http"),
)
]
parsed_item.metadata.images = UniqueList(
[
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
provider=self.instance_id,
remotely_accessible=image_url.startswith("http"),
)
]
)
return parsed_item

async def get_radio(self, prov_radio_id: str) -> Radio:
"""Get full radio details by id."""
parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
assert isinstance(parsed_item, Radio)
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
# always prefer the stored info, such as the name
parsed_item.name = stored_item["name"]
if image_url := stored_item.get("image_url"):
parsed_item.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
provider=self.instance_id,
remotely_accessible=image_url.startswith("http"),
)
]
parsed_item.metadata.images = UniqueList(
[
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
provider=self.instance_id,
remotely_accessible=image_url.startswith("http"),
)
]
)
return parsed_item

async def get_artist(self, prov_artist_id: str) -> Track:
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
artist = prov_artist_id
# this is here for compatibility reasons only
Expand Down Expand Up @@ -231,9 +238,9 @@ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
owner="Music Assistant",
is_editable=False,
metadata=MediaItemMetadata(
images=[DEFAULT_THUMB]
images=UniqueList([DEFAULT_THUMB])
if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
else [DEFAULT_THUMB, DEFAULT_FANART],
else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]),
cache_checksum=str(int(time.time())),
),
)
Expand All @@ -258,14 +265,16 @@ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
)
playlist.metadata.cache_checksum = f"{DB_SCHEMA_VERSION}.{stored_item.get('last_updated')}"
if image_url := stored_item.get("image_url"):
playlist.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
provider=self.instance_id,
remotely_accessible=image_url.startswith("http"),
)
]
playlist.metadata.images = UniqueList(
[
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
provider=self.instance_id,
remotely_accessible=image_url.startswith("http"),
)
]
)
return playlist

async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
Expand Down Expand Up @@ -373,6 +382,7 @@ async def get_playlist_tracks(
track = await media_controller.get_provider_item(
item_id, provider_instance_id_or_domain
)
assert isinstance(track, Track)
track.position = offset + index
result.append(track)
except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
Expand All @@ -392,8 +402,9 @@ async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[
# mark last_updated on playlist object
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
stored_item["last_updated"] = int(time.time())
self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
if stored_item:
stored_item["last_updated"] = int(time.time())
self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)

async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
Expand All @@ -408,10 +419,11 @@ async def remove_playlist_tracks(
# mark last_updated on playlist object
stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
stored_item["last_updated"] = int(time.time())
self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
if stored_item:
stored_item["last_updated"] = int(time.time())
self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)

async def create_playlist(self, name: str) -> Playlist: # type: ignore[return]
async def create_playlist(self, name: str) -> Playlist:
"""Create a new playlist on provider with given name."""
item_id = shortuuid.random(8)
stored_item = StoredItem(item_id=item_id, name=name)
Expand Down Expand Up @@ -442,6 +454,7 @@ async def parse_item(
),
)
}
media_item: Track | Radio
if is_radio or force_radio:
# treat as radio
media_item = Radio(
Expand All @@ -459,19 +472,23 @@ async def parse_item(
provider=self.domain,
name=media_info.title or url,
duration=int(media_info.duration or 0),
artists=[await self.get_artist(artist) for artist in media_info.artists],
artists=UniqueList(
[await self.get_artist(artist) for artist in media_info.artists]
),
provider_mappings=provider_mappings,
)

if media_info.has_cover_image:
media_item.metadata.images = [
MediaItemImage(
type=ImageType.THUMB,
path=url,
provider=self.instance_id,
remotely_accessible=False,
)
]
media_item.metadata.images = UniqueList(
[
MediaItemImage(
type=ImageType.THUMB,
path=url,
provider=self.instance_id,
remotely_accessible=False,
)
]
)
return media_item

async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
Expand Down Expand Up @@ -507,59 +524,72 @@ async def get_stream_details(self, item_id: str) -> StreamDetails:
can_seek=not is_radio,
)

async def _get_builtin_playlist_tracks(
self, builtin_playlist_id: str
) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given builtin playlist id."""
async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
result: list[Track] = []
if builtin_playlist_id == ALL_FAVORITE_TRACKS:
res = await self.mass.music.tracks.library_items(
favorite=True, limit=250000, order_by="random"
res = await self.mass.music.tracks.library_items(
favorite=True, limit=250000, order_by="random"
)
for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
return result

async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
result: list[Track] = []
res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
return result

async def _get_builtin_playlist_random_album(self) -> list[Track]:
result: list[Track] = []
for random_album in await self.mass.music.albums.library_items(
limit=1, order_by="random_fast"
):
tracks = await self.mass.music.albums.tracks(
random_album.item_id, random_album.provider
)
for idx, track in enumerate(tracks, 1):
track.position = idx
result.append(track)
return result

async def _get_builtin_playlist_random_artist(self) -> list[Track]:
result: list[Track] = []
for random_artist in await self.mass.music.artists.library_items(
limit=1, order_by="random_fast"
):
tracks = await self.mass.music.artists.tracks(
random_artist.item_id, random_artist.provider
)
for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
return result
if builtin_playlist_id == RANDOM_TRACKS:
res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
return result
if builtin_playlist_id == RANDOM_ALBUM:
for random_album in await self.mass.music.albums.library_items(
limit=1, order_by="random_fast"
):
# use the function specified in the queue controller as that
# already handles unwrapping an album by user preference
tracks = await self.mass.music.albums.tracks(
random_album.item_id, random_album.provider
)
for idx, track in enumerate(tracks, 1):
track.position = idx
result.append(track)
return result
if builtin_playlist_id == RANDOM_ARTIST:
for random_artist in await self.mass.music.artists.library_items(
limit=1, order_by="random_fast"
):
# use the function specified in the queue controller as that
# already handles unwrapping an artist by user preference
tracks = await self.mass.music.artists.tracks(
random_artist.item_id, random_artist.provider
)
for idx, track in enumerate(tracks, 1):
track.position = idx
result.append(track)
return result
if builtin_playlist_id == RECENTLY_PLAYED:
tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
for idx, track in enumerate(tracks, 1):
track.position = idx
result.append(track)
return result
return result

async def _get_builtin_playlist_recently_played(self) -> list[Track]:
result: list[Track] = []
recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
for idx, track in enumerate(recent_tracks, 1):
assert isinstance(track, Track)
track.position = idx
result.append(track)
return result

async def _get_builtin_playlist_tracks(self, builtin_playlist_id: str) -> list[Track]:
"""Get all playlist tracks for given builtin playlist id."""
try:
return await {
ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks,
RANDOM_TRACKS: self._get_builtin_playlist_random_tracks,
RANDOM_ALBUM: self._get_builtin_playlist_random_album,
RANDOM_ARTIST: self._get_builtin_playlist_random_artist,
RECENTLY_PLAYED: self._get_builtin_playlist_recently_played,
}[builtin_playlist_id]()
except KeyError:
raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")

async def _read_playlist_file_items(
self, playlist_id: str, offset: int = 0, limit: int = 100000
) -> list[str]:
Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
packages=tests,music_assistant.client,music_assistant.common,music_assistant.server.providers.jellyfin,music_assistant.server.providers.radiobrowser
packages=tests,music_assistant.client,music_assistant.common,music_assistant.server.providers.builtin,music_assistant.server.providers.jellyfin,music_assistant.server.providers.radiobrowser

0 comments on commit 5d3afab

Please sign in to comment.