diff --git a/music_assistant/server/providers/builtin/__init__.py b/music_assistant/server/providers/builtin/__init__.py index 0da09eaf3..f3b55e3f0 100644 --- a/music_assistant/server/providers/builtin/__init__.py +++ b/music_assistant/server/providers/builtin/__init__.py @@ -36,6 +36,7 @@ ProviderMapping, Radio, Track, + UniqueList, ) from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.constants import DB_SCHEMA_VERSION, MASS_LOGO, VARIOUS_ARTISTS_FANART @@ -162,40 +163,46 @@ def supported_features(self) -> tuple[ProviderFeature, ...]: async def get_track(self, prov_track_id: str) -> Track: """Get full track details by id.""" parsed_item = await self.parse_item(prov_track_id) + assert isinstance(parsed_item, Track) stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, []) if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None): # always prefer the stored info, such as the name parsed_item.name = stored_item["name"] if image_url := stored_item.get("image_url"): - parsed_item.metadata.images = [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.instance_id, - remotely_accessible=image_url.startswith("http"), - ) - ] + parsed_item.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=self.instance_id, + remotely_accessible=image_url.startswith("http"), + ) + ] + ) return parsed_item async def get_radio(self, prov_radio_id: str) -> Radio: """Get full radio details by id.""" parsed_item = await self.parse_item(prov_radio_id, force_radio=True) + assert isinstance(parsed_item, Radio) stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, []) if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None): # always prefer the stored info, such as the name parsed_item.name = stored_item["name"] if image_url := stored_item.get("image_url"): - parsed_item.metadata.images = [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.instance_id, - remotely_accessible=image_url.startswith("http"), - ) - ] + parsed_item.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=self.instance_id, + remotely_accessible=image_url.startswith("http"), + ) + ] + ) return parsed_item - async def get_artist(self, prov_artist_id: str) -> Track: + async def get_artist(self, prov_artist_id: str) -> Artist: """Get full artist details by id.""" artist = prov_artist_id # this is here for compatibility reasons only @@ -231,9 +238,9 @@ async def get_playlist(self, prov_playlist_id: str) -> Playlist: owner="Music Assistant", is_editable=False, metadata=MediaItemMetadata( - images=[DEFAULT_THUMB] + images=UniqueList([DEFAULT_THUMB]) if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS - else [DEFAULT_THUMB, DEFAULT_FANART], + else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]), cache_checksum=str(int(time.time())), ), ) @@ -258,14 +265,16 @@ async def get_playlist(self, prov_playlist_id: str) -> Playlist: ) playlist.metadata.cache_checksum = f"{DB_SCHEMA_VERSION}.{stored_item.get('last_updated')}" if image_url := stored_item.get("image_url"): - playlist.metadata.images = [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.instance_id, - remotely_accessible=image_url.startswith("http"), - ) - ] + playlist.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=self.instance_id, + remotely_accessible=image_url.startswith("http"), + ) + ] + ) return playlist async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType: @@ -373,6 +382,7 @@ async def get_playlist_tracks( track = await media_controller.get_provider_item( item_id, provider_instance_id_or_domain ) + assert isinstance(track, Track) track.position = offset + index result.append(track) except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err: @@ -392,8 +402,9 @@ async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[ # mark last_updated on playlist object stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, []) stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None) - stored_item["last_updated"] = int(time.time()) - self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items) + if stored_item: + stored_item["last_updated"] = int(time.time()) + self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items) async def remove_playlist_tracks( self, prov_playlist_id: str, positions_to_remove: tuple[int, ...] @@ -408,10 +419,11 @@ async def remove_playlist_tracks( # mark last_updated on playlist object stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, []) stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None) - stored_item["last_updated"] = int(time.time()) - self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items) + if stored_item: + stored_item["last_updated"] = int(time.time()) + self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items) - async def create_playlist(self, name: str) -> Playlist: # type: ignore[return] + async def create_playlist(self, name: str) -> Playlist: """Create a new playlist on provider with given name.""" item_id = shortuuid.random(8) stored_item = StoredItem(item_id=item_id, name=name) @@ -442,6 +454,7 @@ async def parse_item( ), ) } + media_item: Track | Radio if is_radio or force_radio: # treat as radio media_item = Radio( @@ -459,19 +472,23 @@ async def parse_item( provider=self.domain, name=media_info.title or url, duration=int(media_info.duration or 0), - artists=[await self.get_artist(artist) for artist in media_info.artists], + artists=UniqueList( + [await self.get_artist(artist) for artist in media_info.artists] + ), provider_mappings=provider_mappings, ) if media_info.has_cover_image: - media_item.metadata.images = [ - MediaItemImage( - type=ImageType.THUMB, - path=url, - provider=self.instance_id, - remotely_accessible=False, - ) - ] + media_item.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=url, + provider=self.instance_id, + remotely_accessible=False, + ) + ] + ) return media_item async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags: @@ -507,59 +524,72 @@ async def get_stream_details(self, item_id: str) -> StreamDetails: can_seek=not is_radio, ) - async def _get_builtin_playlist_tracks( - self, builtin_playlist_id: str - ) -> AsyncGenerator[Track, None]: - """Get all playlist tracks for given builtin playlist id.""" + async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]: result: list[Track] = [] - if builtin_playlist_id == ALL_FAVORITE_TRACKS: - res = await self.mass.music.tracks.library_items( - favorite=True, limit=250000, order_by="random" + res = await self.mass.music.tracks.library_items( + favorite=True, limit=250000, order_by="random" + ) + for idx, item in enumerate(res, 1): + item.position = idx + result.append(item) + return result + + async def _get_builtin_playlist_random_tracks(self) -> list[Track]: + result: list[Track] = [] + res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast") + for idx, item in enumerate(res, 1): + item.position = idx + result.append(item) + return result + + async def _get_builtin_playlist_random_album(self) -> list[Track]: + result: list[Track] = [] + for random_album in await self.mass.music.albums.library_items( + limit=1, order_by="random_fast" + ): + tracks = await self.mass.music.albums.tracks( + random_album.item_id, random_album.provider + ) + for idx, track in enumerate(tracks, 1): + track.position = idx + result.append(track) + return result + + async def _get_builtin_playlist_random_artist(self) -> list[Track]: + result: list[Track] = [] + for random_artist in await self.mass.music.artists.library_items( + limit=1, order_by="random_fast" + ): + tracks = await self.mass.music.artists.tracks( + random_artist.item_id, random_artist.provider ) - for idx, item in enumerate(res, 1): - item.position = idx - result.append(item) - return result - if builtin_playlist_id == RANDOM_TRACKS: - res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast") - for idx, item in enumerate(res, 1): - item.position = idx - result.append(item) - return result - if builtin_playlist_id == RANDOM_ALBUM: - for random_album in await self.mass.music.albums.library_items( - limit=1, order_by="random_fast" - ): - # use the function specified in the queue controller as that - # already handles unwrapping an album by user preference - tracks = await self.mass.music.albums.tracks( - random_album.item_id, random_album.provider - ) - for idx, track in enumerate(tracks, 1): - track.position = idx - result.append(track) - return result - if builtin_playlist_id == RANDOM_ARTIST: - for random_artist in await self.mass.music.artists.library_items( - limit=1, order_by="random_fast" - ): - # use the function specified in the queue controller as that - # already handles unwrapping an artist by user preference - tracks = await self.mass.music.artists.tracks( - random_artist.item_id, random_artist.provider - ) - for idx, track in enumerate(tracks, 1): - track.position = idx - result.append(track) - return result - if builtin_playlist_id == RECENTLY_PLAYED: - tracks = await self.mass.music.recently_played(100, [MediaType.TRACK]) for idx, track in enumerate(tracks, 1): track.position = idx result.append(track) - return result return result + async def _get_builtin_playlist_recently_played(self) -> list[Track]: + result: list[Track] = [] + recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK]) + for idx, track in enumerate(recent_tracks, 1): + assert isinstance(track, Track) + track.position = idx + result.append(track) + return result + + async def _get_builtin_playlist_tracks(self, builtin_playlist_id: str) -> list[Track]: + """Get all playlist tracks for given builtin playlist id.""" + try: + return await { + ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks, + RANDOM_TRACKS: self._get_builtin_playlist_random_tracks, + RANDOM_ALBUM: self._get_builtin_playlist_random_album, + RANDOM_ARTIST: self._get_builtin_playlist_random_artist, + RECENTLY_PLAYED: self._get_builtin_playlist_recently_played, + }[builtin_playlist_id]() + except KeyError: + raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}") + async def _read_playlist_file_items( self, playlist_id: str, offset: int = 0, limit: int = 100000 ) -> list[str]: diff --git a/mypy.ini b/mypy.ini index 63d2e5089..2b11bae03 100644 --- a/mypy.ini +++ b/mypy.ini @@ -21,4 +21,4 @@ disallow_untyped_decorators = true disallow_untyped_defs = true warn_return_any = true warn_unreachable = true -packages=tests,music_assistant.client,music_assistant.common,music_assistant.server.providers.jellyfin,music_assistant.server.providers.radiobrowser +packages=tests,music_assistant.client,music_assistant.common,music_assistant.server.providers.builtin,music_assistant.server.providers.jellyfin,music_assistant.server.providers.radiobrowser