diff --git a/.github/workflows/code-quality.yml b/.github/workflows/code-quality.yml index f9aa648..07ca0fc 100644 --- a/.github/workflows/code-quality.yml +++ b/.github/workflows/code-quality.yml @@ -6,8 +6,8 @@ jobs: runs-on: ubuntu-latest name: Checks steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 with: python-version: 3.x - run: pip install --upgrade pip diff --git a/.pylintrc b/.pylintrc index 1d4507b..f069d46 100644 --- a/.pylintrc +++ b/.pylintrc @@ -415,14 +415,15 @@ disable=raw-checker-failed, suppressed-message, useless-suppression, deprecated-pragma, - use-symbolic-message-instead, + use-symbolic-message-instead, import-error, trailing-whitespace, too-many-instance-attributes, missing-module-docstring, missing-class-docstring, missing-function-docstring, - logging-fstring-interpolation + logging-fstring-interpolation, + duplicate-code, # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/custom_components/berlin_transport/__init__.py b/custom_components/berlin_transport/__init__.py index a9af694..56335ae 100644 --- a/custom_components/berlin_transport/__init__.py +++ b/custom_components/berlin_transport/__init__.py @@ -11,10 +11,10 @@ PLATFORMS = [Platform.SENSOR] + async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up a config entry.""" - await hass.config_entries.async_forward_entry_setups( - entry, PLATFORMS) + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) entry.async_on_unload(entry.add_update_listener(config_entry_update_listener)) return True @@ -26,8 +26,10 @@ async def config_entry_update_listener(hass: HomeAssistant, entry: ConfigEntry) async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" - return await hass.config_entries.async_unload_platforms( - entry, PLATFORMS) + return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + -def setup(hass: HomeAssistant, config: ConfigType) -> bool: # pylint: disable=unused-argument +def setup( + hass: HomeAssistant, config: ConfigType # pylint: disable=unused-argument +) -> bool: return True diff --git a/custom_components/berlin_transport/config_flow.py b/custom_components/berlin_transport/config_flow.py index 2c6da98..f135645 100644 --- a/custom_components/berlin_transport/config_flow.py +++ b/custom_components/berlin_transport/config_flow.py @@ -12,21 +12,29 @@ import homeassistant.helpers.config_validation as cv from homeassistant.helpers import selector -from .const import API_ENDPOINT, API_MAX_RESULTS, CONF_DEPARTURES_STOP_ID,CONF_DEPARTURES_NAME, CONF_DEPARTURES_DIRECTION, CONF_DEPARTURES_DURATION, CONF_DEPARTURES_WALKING_TIME, CONF_SHOW_API_LINE_COLORS, DOMAIN +from .const import ( + API_ENDPOINT, + API_MAX_RESULTS, + CONF_DEPARTURES_STOP_ID, + CONF_DEPARTURES_NAME, + CONF_DEPARTURES_DIRECTION, + CONF_DEPARTURES_DURATION, + CONF_DEPARTURES_WALKING_TIME, + CONF_SHOW_API_LINE_COLORS, + DOMAIN, # noqa +) from .sensor import TRANSPORT_TYPES_SCHEMA _LOGGER = logging.getLogger(__name__) -CONF_SEARCH="search" -CONF_FOUND_STOPS="found_stops" -CONF_SELECTED_STOP="selected_stop" +CONF_SEARCH = "search" +CONF_FOUND_STOPS = "found_stops" +CONF_SELECTED_STOP = "selected_stop" DATA_SCHEMA = vol.Schema( { - #vol.Required(CONF_DEPARTURES_STOP_ID): cv.positive_int, - #vol.Required(CONF_DEPARTURES_NAME): cv.string, vol.Optional(CONF_DEPARTURES_DIRECTION): cv.positive_int, vol.Optional(CONF_DEPARTURES_DURATION): cv.positive_int, vol.Optional(CONF_DEPARTURES_WALKING_TIME, default=1): cv.positive_int, @@ -44,16 +52,17 @@ # IPv6 is broken, see: https://github.com/public-transport/transport.rest/issues/20 requests.packages.urllib3.util.connection.HAS_IPV6 = False -def get_stop_id(name) -> Optional[list[str]]: + +def get_stop_id(name) -> Optional[list[dict[str, Any]]]: try: response = requests.get( url=f"{API_ENDPOINT}/locations", params={ - "query": name, - "results": API_MAX_RESULTS, - }, - timeout=30, - ) + "query": name, + "results": API_MAX_RESULTS, + }, + timeout=30, + ) response.raise_for_status() except requests.exceptions.HTTPError as ex: _LOGGER.warning(f"API error: {ex}") @@ -72,22 +81,33 @@ def get_stop_id(name) -> Optional[list[str]]: return [] # convert api data into objects - return [{CONF_DEPARTURES_NAME: stop["name"], CONF_DEPARTURES_STOP_ID: stop["id"]} for stop in stops if stop["type"] == "stop"] + return [ + {CONF_DEPARTURES_NAME: stop["name"], CONF_DEPARTURES_STOP_ID: stop["id"]} + for stop in stops + if stop["type"] == "stop" + ] + def list_stops(stops) -> Optional[vol.Schema]: """Provides a drop down list of stops""" schema = vol.Schema( - { - vol.Required(CONF_SELECTED_STOP, default=False):selector.SelectSelector( - selector.SelectSelectorConfig(options=["{} [{}]".format(stop[CONF_DEPARTURES_NAME],stop[CONF_DEPARTURES_STOP_ID]) for stop in stops], - mode=selector.SelectSelectorMode.DROPDOWN) - ) - }) + { + vol.Required(CONF_SELECTED_STOP, default=False): selector.SelectSelector( + selector.SelectSelectorConfig( + options=[ + f"{stop[CONF_DEPARTURES_NAME]} [{stop[CONF_DEPARTURES_STOP_ID]}]" + for stop in stops + ], + mode=selector.SelectSelectorMode.DROPDOWN, + ) + ) + } + ) - return schema + return schema -class TransportConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): +class TransportConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): VERSION = 1 CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL @@ -96,45 +116,66 @@ def __init__(self) -> None: """Init the ConfigFlow.""" self.data: dict[str, Any] = {} - async def async_step_user(self, user_input: dict[str, Any] | None = None) -> FlowResult: + async def async_step_user( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: """Handle the initial step.""" if user_input is None: return self.async_show_form( - step_id="user", - data_schema=NAME_SCHEMA, - errors={}, + step_id="user", + data_schema=NAME_SCHEMA, + errors={}, + ) + self.data[CONF_FOUND_STOPS] = await self.hass.async_add_executor_job( + get_stop_id, user_input[CONF_SEARCH] ) - self.data[CONF_FOUND_STOPS] = await self.hass.async_add_executor_job(get_stop_id, user_input[CONF_SEARCH]) - _LOGGER.debug(f"OK: found stops for {user_input[CONF_SEARCH]}: {self.data[CONF_FOUND_STOPS]}") + _LOGGER.debug( + f"OK: found stops for {user_input[CONF_SEARCH]}: {self.data[CONF_FOUND_STOPS]}" + ) return await self.async_step_stop() - async def async_step_stop(self, user_input: dict[str, Any] | None = None) -> FlowResult: + async def async_step_stop( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: """Handle the initial step.""" if user_input is None: return self.async_show_form( - step_id="stop", - data_schema=list_stops(self.data[CONF_FOUND_STOPS]), - errors={}, + step_id="stop", + data_schema=list_stops(self.data[CONF_FOUND_STOPS]), + errors={}, + ) + + selected_stop = next( + (stop[CONF_DEPARTURES_NAME], stop[CONF_DEPARTURES_STOP_ID]) + for stop in self.data[CONF_FOUND_STOPS] + if user_input[CONF_SELECTED_STOP] + == f"{stop[CONF_DEPARTURES_NAME]} [{stop[CONF_DEPARTURES_STOP_ID]}]" ) - - selected_stop = next((stop[CONF_DEPARTURES_NAME], stop[CONF_DEPARTURES_STOP_ID]) for stop in self.data[CONF_FOUND_STOPS] if user_input[CONF_SELECTED_STOP] == "{} [{}]".format(stop[CONF_DEPARTURES_NAME],stop[CONF_DEPARTURES_STOP_ID])) - self.data[CONF_DEPARTURES_NAME], self.data[CONF_DEPARTURES_STOP_ID] = selected_stop[0], selected_stop[1] - _LOGGER.debug(f"OK: selected stop {self.data[CONF_DEPARTURES_NAME]} [{self.data[CONF_DEPARTURES_STOP_ID]}]") + ( + self.data[CONF_DEPARTURES_NAME], + self.data[CONF_DEPARTURES_STOP_ID], + ) = selected_stop + _LOGGER.debug(f"OK: selected stop {selected_stop[0]} [{selected_stop[1]}]") return await self.async_step_details() - async def async_step_details(self, user_input: dict[str, Any] | None = None) -> FlowResult: + async def async_step_details( + self, user_input: dict[str, Any] | None = None + ) -> FlowResult: """Handle the details.""" if user_input is None: return self.async_show_form( - step_id="details", - data_schema=DATA_SCHEMA, - errors={}, - ) + step_id="details", + data_schema=DATA_SCHEMA, + errors={}, + ) - data=user_input - data[CONF_DEPARTURES_STOP_ID]=self.data[CONF_DEPARTURES_STOP_ID] - data[CONF_DEPARTURES_NAME]=self.data[CONF_DEPARTURES_NAME] - return self.async_create_entry(title="{} [{}]".format(data[CONF_DEPARTURES_NAME],data[CONF_DEPARTURES_STOP_ID]), data=data) + data = user_input + data[CONF_DEPARTURES_STOP_ID] = self.data[CONF_DEPARTURES_STOP_ID] + data[CONF_DEPARTURES_NAME] = self.data[CONF_DEPARTURES_NAME] + return self.async_create_entry( + title=f"{data[CONF_DEPARTURES_NAME]} [{data[CONF_DEPARTURES_STOP_ID]}]", + data=data, + ) diff --git a/custom_components/berlin_transport/departure.py b/custom_components/berlin_transport/departure.py index 5d8fd3a..bf95294 100644 --- a/custom_components/berlin_transport/departure.py +++ b/custom_components/berlin_transport/departure.py @@ -25,7 +25,9 @@ class Departure: def from_dict(cls, source): line_type = source.get("line", {}).get("product") line_visuals = TRANSPORT_TYPE_VISUALS.get(line_type) or {} - timestamp=datetime.fromisoformat(source.get("when") or source.get("plannedWhen")) + timestamp = datetime.fromisoformat( + source.get("when") or source.get("plannedWhen") + ) return cls( trip_id=source["tripId"], line_name=source.get("line", {}).get("name"), @@ -43,7 +45,7 @@ def from_dict(cls, source): cancelled=source.get("cancelled", False), ) - def to_dict(self, show_api_line_colors:bool): + def to_dict(self, show_api_line_colors: bool): color = self.fallback_color if show_api_line_colors and self.bg_color is not None: color = self.bg_color diff --git a/custom_components/berlin_transport/sensor.py b/custom_components/berlin_transport/sensor.py index 828a0eb..ca99086 100644 --- a/custom_components/berlin_transport/sensor.py +++ b/custom_components/berlin_transport/sensor.py @@ -67,6 +67,7 @@ # IPv6 is broken, see: https://github.com/public-transport/transport.rest/issues/20 requests.packages.urllib3.util.connection.HAS_IPV6 = False + async def async_setup_platform( hass: HomeAssistant, config: ConfigType, @@ -78,10 +79,11 @@ async def async_setup_platform( for departure in config[CONF_DEPARTURES]: async_add_entities([TransportSensor(hass, departure)]) + async def async_setup_entry( hass: HomeAssistant, config_entry: ConfigEntry, - async_add_entities: AddEntitiesCallback + async_add_entities: AddEntitiesCallback, ) -> None: async_add_entities([TransportSensor(hass, config_entry.data)]) @@ -126,7 +128,8 @@ def state(self) -> str: def extra_state_attributes(self): return { "departures": [ - departure.to_dict(self.show_api_line_colors) for departure in self.departures or [] + departure.to_dict(self.show_api_line_colors) + for departure in self.departures or [] ] } @@ -172,7 +175,9 @@ def fetch_departures(self) -> Optional[list[Departure]]: return [] # convert api data into objects - unsorted = [Departure.from_dict(departure) for departure in departures.get("departures")] + unsorted = [ + Departure.from_dict(departure) for departure in departures.get("departures") + ] return sorted(unsorted, key=lambda d: d.timestamp) def next_departure(self):