Skip to content

Commit

Permalink
Swap octoprint to use an external library (#46611)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfleming71 authored Mar 1, 2021
1 parent a8beae3 commit 09ea84d
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 332 deletions.
275 changes: 89 additions & 186 deletions homeassistant/components/octoprint/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""Support for monitoring OctoPrint 3D printers."""
from datetime import timedelta
import logging
import time

from aiohttp.hdrs import CONTENT_TYPE
import requests
from pyoctoprintapi import OctoprintClient, PrinterOffline
import voluptuous as vol

from homeassistant.components.discovery import SERVICE_OCTOPRINT
Expand All @@ -17,15 +16,15 @@
CONF_PORT,
CONF_SENSORS,
CONF_SSL,
CONTENT_TYPE_JSON,
PERCENTAGE,
TEMP_CELSIUS,
TIME_SECONDS,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import discovery
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import load_platform
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.util import slugify as util_slugify
import homeassistant.util.dt as dt_util

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,11 +52,10 @@ def ensure_valid_path(value):
return value


BINARY_SENSOR_TYPES = {
# API Endpoint, Group, Key, unit
"Printing": ["printer", "state", "printing", None],
"Printing Error": ["printer", "state", "error", None],
}
BINARY_SENSOR_TYPES = [
"Printing",
"Printing Error",
]

BINARY_SENSOR_SCHEMA = vol.Schema(
{
Expand All @@ -68,26 +66,13 @@ def ensure_valid_path(value):
}
)

SENSOR_TYPES = {
# API Endpoint, Group, Key, unit, icon
"Temperatures": ["printer", "temperature", "*", TEMP_CELSIUS],
"Current State": ["printer", "state", "text", None, "mdi:printer-3d"],
"Job Percentage": [
"job",
"progress",
"completion",
PERCENTAGE,
"mdi:file-percent",
],
"Time Remaining": [
"job",
"progress",
"printTimeLeft",
TIME_SECONDS,
"mdi:clock-end",
],
"Time Elapsed": ["job", "progress", "printTime", TIME_SECONDS, "mdi:clock-start"],
}
SENSOR_TYPES = [
"Temperatures",
"Current State",
"Job Percentage",
"Time Remaining",
"Time Elapsed",
]

SENSOR_SCHEMA = vol.Schema(
{
Expand Down Expand Up @@ -127,16 +112,16 @@ def ensure_valid_path(value):
)


def setup(hass, config):
async def async_setup(hass, config):
"""Set up the OctoPrint component."""
printers = hass.data[DOMAIN] = {}
success = False

def device_discovered(service, info):
async def device_discovered(service, info):
"""Get called when an Octoprint server has been discovered."""
_LOGGER.debug("Found an Octoprint server: %s", info)

discovery.listen(hass, SERVICE_OCTOPRINT, device_discovered)
discovery.async_listen(hass, SERVICE_OCTOPRINT, device_discovered)

if DOMAIN not in config:
# Skip the setup if there is no configuration present
Expand All @@ -147,169 +132,87 @@ def device_discovered(service, info):
protocol = "https" if printer[CONF_SSL] else "http"
base_url = (
f"{protocol}://{printer[CONF_HOST]}:{printer[CONF_PORT]}"
f"{printer[CONF_PATH]}api/"
f"{printer[CONF_PATH]}"
)
api_key = printer[CONF_API_KEY]
number_of_tools = printer[CONF_NUMBER_OF_TOOLS]
bed = printer[CONF_BED]
try:
octoprint_api = OctoPrintAPI(base_url, api_key, bed, number_of_tools)
printers[base_url] = octoprint_api
octoprint_api.get("printer")
octoprint_api.get("job")
except requests.exceptions.RequestException as conn_err:
_LOGGER.error("Error setting up OctoPrint API: %r", conn_err)
continue

session = async_get_clientsession(hass)
octoprint = OctoprintClient(
printer[CONF_HOST],
session,
printer[CONF_PORT],
printer[CONF_SSL],
printer[CONF_PATH],
)
octoprint.set_api_key(printer[CONF_API_KEY])
coordinator = OctoprintDataUpdateCoordinator(hass, octoprint, base_url, 30)
await coordinator.async_refresh()

printers[base_url] = coordinator

sensors = printer[CONF_SENSORS][CONF_MONITORED_CONDITIONS]
load_platform(
hass,
"sensor",
DOMAIN,
{"name": name, "base_url": base_url, "sensors": sensors},
config,
hass.async_create_task(
async_load_platform(
hass,
"sensor",
DOMAIN,
{
"name": name,
"base_url": base_url,
"sensors": sensors,
CONF_NUMBER_OF_TOOLS: printer[CONF_NUMBER_OF_TOOLS],
CONF_BED: printer[CONF_BED],
},
config,
)
)
b_sensors = printer[CONF_BINARY_SENSORS][CONF_MONITORED_CONDITIONS]
load_platform(
hass,
"binary_sensor",
DOMAIN,
{"name": name, "base_url": base_url, "sensors": b_sensors},
config,
hass.async_create_task(
async_load_platform(
hass,
"binary_sensor",
DOMAIN,
{"name": name, "base_url": base_url, "sensors": b_sensors},
config,
)
)
success = True

return success


class OctoPrintAPI:
"""Simple JSON wrapper for OctoPrint's API."""

def __init__(self, api_url, key, bed, number_of_tools):
"""Initialize OctoPrint API and set headers needed later."""
self.api_url = api_url
self.headers = {CONTENT_TYPE: CONTENT_TYPE_JSON, "X-Api-Key": key}
self.printer_last_reading = [{}, None]
self.job_last_reading = [{}, None]
self.job_available = False
self.printer_available = False
self.printer_error_logged = False
self.available = False
self.available_error_logged = False
self.job_error_logged = False
self.bed = bed
self.number_of_tools = number_of_tools
class OctoprintDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching Octoprint data."""

def get_tools(self):
"""Get the list of tools that temperature is monitored on."""
tools = []
if self.number_of_tools > 0:
for tool_number in range(0, self.number_of_tools):
tools.append(f"tool{tool_number!s}")
if self.bed:
tools.append("bed")
if not self.bed and self.number_of_tools == 0:
temps = self.printer_last_reading[0].get("temperature")
if temps is not None:
tools = temps.keys()
return tools
def __init__(
self,
hass: HomeAssistant,
octoprint: OctoprintClient,
device_id: str,
interval: int,
):
"""Initialize."""
super().__init__(
hass,
_LOGGER,
name=f"octoprint-{device_id}",
update_interval=timedelta(seconds=interval),
)
self._octoprint = octoprint
self.data = {"printer": None, "job": None, "last_read_time": None}

def get(self, endpoint):
"""Send a get request, and return the response as a dict."""
# Only query the API at most every 30 seconds
now = time.time()
if endpoint == "job":
last_time = self.job_last_reading[1]
if last_time is not None:
if now - last_time < 30.0:
return self.job_last_reading[0]
elif endpoint == "printer":
last_time = self.printer_last_reading[1]
if last_time is not None:
if now - last_time < 30.0:
return self.printer_last_reading[0]
async def _async_update_data(self):
"""Update data via API."""
printer = None
job = await self._octoprint.get_job_info()

url = self.api_url + endpoint
# If octoprint is on, but the printer is disconnected
# printer will return a 409, so continue using the last
# reading if there is one
try:
response = requests.get(url, headers=self.headers, timeout=9)
response.raise_for_status()
if endpoint == "job":
self.job_last_reading[0] = response.json()
self.job_last_reading[1] = time.time()
self.job_available = True
elif endpoint == "printer":
self.printer_last_reading[0] = response.json()
self.printer_last_reading[1] = time.time()
self.printer_available = True

self.available = self.printer_available and self.job_available
if self.available:
self.job_error_logged = False
self.printer_error_logged = False
self.available_error_logged = False

return response.json()

except requests.ConnectionError as exc_con:
log_string = "Failed to connect to Octoprint server. Error: %s" % exc_con

if not self.available_error_logged:
_LOGGER.error(log_string)
self.job_available = False
self.printer_available = False
self.available_error_logged = True

return None

except requests.HTTPError as ex_http:
status_code = ex_http.response.status_code

log_string = "Failed to update OctoPrint status. Error: %s" % ex_http
# Only log the first failure
if endpoint == "job":
log_string = f"Endpoint: job {log_string}"
if not self.job_error_logged:
_LOGGER.error(log_string)
self.job_error_logged = True
self.job_available = False
elif endpoint == "printer":
if (
status_code == 409
): # octoprint returns HTTP 409 when printer is not connected (and many other states)
self.printer_available = False
else:
log_string = f"Endpoint: printer {log_string}"
if not self.printer_error_logged:
_LOGGER.error(log_string)
self.printer_error_logged = True
self.printer_available = False

self.available = False

return None

def update(self, sensor_type, end_point, group, tool=None):
"""Return the value for sensor_type from the provided endpoint."""
response = self.get(end_point)
if response is not None:
return get_value_from_json(response, sensor_type, group, tool)

return response


def get_value_from_json(json_dict, sensor_type, group, tool):
"""Return the value for sensor_type from the JSON."""
if group not in json_dict:
return None

if sensor_type in json_dict[group]:
if sensor_type == "target" and json_dict[sensor_type] is None:
return 0

return json_dict[group][sensor_type]

if tool is not None:
if sensor_type in json_dict[group][tool]:
return json_dict[group][tool][sensor_type]
printer = await self._octoprint.get_printer_info()
except PrinterOffline:
_LOGGER.error("Unable to retrieve printer information: Printer offline")
if self.data and "printer" in self.data:
printer = self.data["printer"]

return None
return {"job": job, "printer": printer, "last_read_time": dt_util.utcnow()}
Loading

0 comments on commit 09ea84d

Please sign in to comment.