diff --git a/custom_components/xmltv_epg/helper.py b/custom_components/xmltv_epg/helper.py new file mode 100644 index 0000000..e327543 --- /dev/null +++ b/custom_components/xmltv_epg/helper.py @@ -0,0 +1,46 @@ +"""tvxml_epg helper functions.""" + + +def normalize_for_entity_id(s: str) -> str: + """Normalize a string for usage in an entity_id. + + Example: + - s = 'DE: WDR (Münster)' + => "de_wdr_muenster" + + :param s: The string to normalize. + :return: The normalized string. + + """ + + # lower case + s = s.lower() + + # special replacement rules + replacements = { + # replace umlauts with their respective digraphs + "ä": "ae", + "ö": "oe", + "ü": "ue", + "ß": "ss", + # '+' is replaced with ' plus '. + "+": " plus ", + } + for umlaut, replacement in replacements.items(): + s = s.replace(umlaut, replacement) + + # replace "delimiting characters" and spaces with underscores + for c in " -.:": + s = s.replace(c, "_") + + # remove all non-alphanumeric characters except underscores + s = "".join(c if c.isalnum() or c == "_" else "" for c in s) + + # trim underscores from start and end + s = s.strip("_") + + # remove all occurrences of multiple underscores + while "__" in s: + s = s.replace("__", "_") + + return s diff --git a/custom_components/xmltv_epg/sensor.py b/custom_components/xmltv_epg/sensor.py index a7ec2f7..1f81ebe 100644 --- a/custom_components/xmltv_epg/sensor.py +++ b/custom_components/xmltv_epg/sensor.py @@ -13,6 +13,7 @@ from .const import DOMAIN, LOGGER from .coordinator import XMLTVDataUpdateCoordinator from .entity import XMLTVEntity +from .helper import normalize_for_entity_id from .model import TVChannel, TVGuide @@ -40,6 +41,30 @@ async def async_setup_entry(hass, entry, async_add_devices): class XMLTVChannelSensor(XMLTVEntity, SensorEntity): """XMLTV Channel Program Sensor class.""" + @staticmethod + def get_normalized_identification( + channel: TVChannel, is_next: bool + ) -> tuple[str, str]: + """Return normalized identification information for a sensor for the given channel and upcoming status. + + The identification information consists of the sensor entity_id and the translation_key. + For the entity_id, the channel id is normalized and cleaned up to form a valid entity_id. + + Example: + - channel_id = 'DE: My Channel 1' + - is_next = True + => ('program_upcoming', 'sensor.de_my_channel_1_program_upcoming') + + :param channel: The TV channel. + :param is_next: The upcoming status. + :return: (translation_key, entity_id) tuple. + + """ + translation_key = f"program_{'upcoming' if is_next else 'current'}" + entity_id = f"sensor.{normalize_for_entity_id(channel.id)}_{translation_key}" + + return translation_key, entity_id + def __init__( self, coordinator: XMLTVDataUpdateCoordinator, @@ -49,18 +74,17 @@ def __init__( """Initialize the sensor class.""" super().__init__(coordinator) - key = f"program_{'upcoming' if is_next else 'current'}" - channel_id_clean = ( - channel.id.replace(" ", "_").replace("-", "_").replace(":", "").lower() + translation_key, entity_id = self.get_normalized_identification( + channel, is_next ) - self.entity_id = f"sensor.{channel_id_clean}_{key}" + self.entity_id = entity_id self._attr_unique_id = str(uuid.uuid5(uuid.NAMESPACE_X500, self.entity_id)) self._attr_has_entity_name = True self.entity_description = SensorEntityDescription( - key=key, - translation_key=key, + key=translation_key, + translation_key=translation_key, icon="mdi:format-quote-close", ) @@ -130,18 +154,40 @@ def native_value(self) -> str: class XMLTVStatusSensor(XMLTVEntity, SensorEntity): """XMLTV Coordinator Status Sensor class.""" + @staticmethod + def get_normalized_identification(guide: TVGuide) -> tuple[str, str]: + """Return normalized identification information for a sensor for the given guide. + + The identification information consists of the sensor entity_id and the translation_key. + For the entity_id, the guide's generator_name is normalized and cleaned up to form a valid entity_id. + + Example: + - generator_name = 'TVXML.ORG' + => ('last_update', 'sensor.tvxml_org_last_update') + + :param guide: The TV guide. + :return: (translation_key, entity_id) tuple. + + """ + translation_key = "last_update" + entity_id = ( + f"sensor.{normalize_for_entity_id(guide.generator_name)}_{translation_key}" + ) + + return translation_key, entity_id + def __init__(self, coordinator: XMLTVDataUpdateCoordinator, guide: TVGuide) -> None: """Initialize the sensor class.""" super().__init__(coordinator) - key = "last_update" - self.entity_id = f"sensor.{guide.generator_name}_{key}" + translation_key, entity_id = self.get_normalized_identification(guide) + self.entity_id = entity_id self._attr_unique_id = str(uuid.uuid5(uuid.NAMESPACE_X500, self.entity_id)) self._attr_has_entity_name = True self.entity_description = SensorEntityDescription( - key=key, - translation_key=key, + key=translation_key, + translation_key=translation_key, device_class=SensorDeviceClass.TIMESTAMP, ) diff --git a/test/test_helper.py b/test/test_helper.py new file mode 100644 index 0000000..4acc5d7 --- /dev/null +++ b/test/test_helper.py @@ -0,0 +1,30 @@ +"""Test cases for the helper module.""" + +from custom_components.xmltv_epg.helper import normalize_for_entity_id + + +def test_normalize_for_entity_id(): + """Test the normalize_for_entity_id function.""" + + assert normalize_for_entity_id("WDR") == "wdr" + assert normalize_for_entity_id("DE: My Channel 1") == "de_my_channel_1" + assert normalize_for_entity_id("DE: WDR (Münster)") == "de_wdr_muenster" + + # test umlauts + assert normalize_for_entity_id("ÄÖÜß") == "aeoeuess" + + # test special characters + assert ( + normalize_for_entity_id('Special Characters: !"§$%&/()=?') + == "special_characters" + ) + + # test leading and trailing spaces + assert ( + normalize_for_entity_id(" Leading and Trailing Spaces ") + == "leading_and_trailing_spaces" + ) + + # test special replacement rules + # required because both "Sport1" and "Sport1+" are channels that exists in the wild... + assert normalize_for_entity_id("A+B-C") == "a_plus_b_c" diff --git a/test/test_sensor.py b/test/test_sensor.py index c7664e1..8fc8df1 100644 --- a/test/test_sensor.py +++ b/test/test_sensor.py @@ -13,6 +13,8 @@ OPT_ENABLE_UPCOMING_SENSOR, OPT_PROGRAM_LOOKAHEAD, ) +from custom_components.xmltv_epg.model import TVChannel, TVGuide +from custom_components.xmltv_epg.sensor import XMLTVChannelSensor, XMLTVStatusSensor from .const import MOCK_NOW, MOCK_TV_GUIDE_NAME, MOCK_TV_GUIDE_URL @@ -166,3 +168,39 @@ async def test_last_update_sensor_attributes( assert state.attributes["generator_name"] == MOCK_TV_GUIDE_NAME assert state.attributes["generator_url"] == MOCK_TV_GUIDE_URL + + +def test_sensor_entity_ids(): + """Test sensor entity ids match the expected values.""" + + # status sensor + translation_key, entity_id = XMLTVStatusSensor.get_normalized_identification( + TVGuide("TVXML.ORG") + ) + + assert translation_key == "last_update" + assert entity_id == "sensor.tvxml_org_last_update" + + # program sensor, current + translation_key, entity_id = XMLTVChannelSensor.get_normalized_identification( + TVChannel("CH 1", "Channel 1"), False + ) + + assert translation_key == "program_current" + assert entity_id == "sensor.ch_1_program_current" + + # program sensor, upcoming + translation_key, entity_id = XMLTVChannelSensor.get_normalized_identification( + TVChannel("CH 1", "Channel 1"), True + ) + + assert translation_key == "program_upcoming" + assert entity_id == "sensor.ch_1_program_upcoming" + + # program sensor, with special characters and umlauts + translation_key, entity_id = XMLTVChannelSensor.get_normalized_identification( + TVChannel("DE: WDR (Münster)", "WDR (Münster)"), False + ) + + assert translation_key == "program_current" + assert entity_id == "sensor.de_wdr_muenster_program_current"