Skip to content

Commit

Permalink
Merge pull request #30 from zachariahpifer/add-type-annotations
Browse files Browse the repository at this point in the history
Add type annotations
  • Loading branch information
FoamyGuy authored May 1, 2023
2 parents e981244 + 78126af commit a6d96e8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
71 changes: 40 additions & 31 deletions adafruit_rockblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@
https://github.com/adafruit/circuitpython/releases
"""
from __future__ import annotations

try:
from typing import Tuple, Union, Optional
from busio import UART
from serial import Serial
except ImportError:
pass

import time
import struct
Expand All @@ -36,13 +43,13 @@
class RockBlock:
"""Driver for RockBLOCK Iridium satellite modem."""

def __init__(self, uart, baudrate=19200):
def __init__(self, uart: Union[UART, Serial], baudrate: int = 19200) -> None:
self._uart = uart
self._uart.baudrate = baudrate
self._buf_out = None
self.reset()

def _uart_xfer(self, cmd):
def _uart_xfer(self, cmd: str) -> Tuple[bytes, ...]:
"""Send AT command and return response as tuple of lines read."""
self._uart.reset_input_buffer()
self._uart.write(str.encode("AT" + cmd + "\r"))
Expand All @@ -58,22 +65,22 @@ def _uart_xfer(self, cmd):

return tuple(resp)

def reset(self):
def reset(self) -> None:
"""Perform a software reset."""
self._uart_xfer("&F0") # factory defaults
self._uart_xfer("&K0") # flow control off

def _transfer_buffer(self):
def _transfer_buffer(self) -> None:
"""Copy out buffer to in buffer to simulate receiving a message."""
self._uart_xfer("+SBDTC")

@property
def data_out(self):
def data_out(self) -> Optional[bytes]:
"""The binary data in the outbound buffer."""
return self._buf_out

@data_out.setter
def data_out(self, buf):
def data_out(self, buf: bytes) -> None:
if buf is None:
# clear the buffer
resp = self._uart_xfer("+SBDD0")
Expand All @@ -100,7 +107,7 @@ def data_out(self, buf):
self._buf_out = buf

@property
def text_out(self):
def text_out(self) -> Optional[str]:
"""The text in the outbound buffer."""
text = None
# TODO: add better check for non-text in buffer
Expand All @@ -112,15 +119,15 @@ def text_out(self):
return text

@text_out.setter
def text_out(self, text):
def text_out(self, text: str) -> None:
if not isinstance(text, str):
raise ValueError("Only strings allowed.")
if len(text) > 120:
raise ValueError("Text size limited to 120 bytes.")
self.data_out = str.encode(text)

@property
def data_in(self):
def data_in(self) -> Optional[bytes]:
"""The binary data in the inbound buffer."""
data = None
if self.status[2] == 1:
Expand All @@ -130,7 +137,7 @@ def data_in(self):
return data

@data_in.setter
def data_in(self, buf):
def data_in(self, buf: bytes) -> None:
if buf is not None:
raise ValueError("Can only set in buffer to None to clear.")
resp = self._uart_xfer("+SBDD1")
Expand All @@ -139,7 +146,7 @@ def data_in(self, buf):
raise RuntimeError("Error clearing buffer.")

@property
def text_in(self):
def text_in(self) -> Optional[str]:
"""The text in the inbound buffer."""
text = None
if self.status[2] == 1:
Expand All @@ -151,10 +158,10 @@ def text_in(self):
return text

@text_in.setter
def text_in(self, text):
def text_in(self, text: bytes) -> None:
self.data_in = text

def satellite_transfer(self, location=None):
def satellite_transfer(self, location: str = None) -> Tuple[Optional[int], ...]:
"""Initiate a Short Burst Data transfer with satellites."""
status = (None,) * 6
if location:
Expand All @@ -170,7 +177,7 @@ def satellite_transfer(self, location=None):
return tuple(status)

@property
def status(self):
def status(self) -> Tuple[Optional[int], ...]:
"""Return tuple of Short Burst Data status."""
resp = self._uart_xfer("+SBDSX")
if resp[-1].strip().decode() == "OK":
Expand All @@ -179,27 +186,27 @@ def status(self):
return (None,) * 6

@property
def model(self):
def model(self) -> Optional[str]:
"""Return modem model."""
resp = self._uart_xfer("+GMM")
if resp[-1].strip().decode() == "OK":
return resp[1].strip().decode()
return None

@property
def serial_number(self):
def serial_number(self) -> Optional[str]:
"""Modem's serial number, also known as the modem's IMEI.
Returns
string
str | None
"""
resp = self._uart_xfer("+CGSN")
if resp[-1].strip().decode() == "OK":
return resp[1].strip().decode()
return None

@property
def signal_quality(self):
def signal_quality(self) -> Optional[int]:
"""Signal Quality also known as the Received Signal Strength Indicator (RSSI).
Values returned are 0 to 5, where 0 is no signal (0 bars) and 5 is strong signal (5 bars).
Expand All @@ -217,7 +224,7 @@ def signal_quality(self):
return None

@property
def revision(self):
def revision(self) -> Tuple[Optional[str], ...]:
"""Modem's internal component firmware revisions.
For example: Call Processor Version, Modem DSP Version, DBB Version (ASIC),
Expand All @@ -237,7 +244,7 @@ def revision(self):
return (None,) * 7

@property
def ring_alert(self):
def ring_alert(self) -> Optional[bool]:
"""The current ring indication mode.
False means Ring Alerts are disabled, and True means Ring Alerts are enabled.
Expand All @@ -255,7 +262,7 @@ def ring_alert(self):
return None

@ring_alert.setter
def ring_alert(self, value):
def ring_alert(self, value: Union[int, bool]) -> Optional[bool]:
if value in (True, False):
resp = self._uart_xfer("+SBDMTA=" + str(int(value)))
if resp[-1].strip().decode() == "OK":
Expand All @@ -266,7 +273,7 @@ def ring_alert(self, value):
)

@property
def ring_indication(self):
def ring_indication(self) -> Tuple[Optional[str], ...]:
"""The ring indication status.
Returns the reason for the most recent assertion of the Ring Indicate signal.
Expand Down Expand Up @@ -294,7 +301,9 @@ def ring_indication(self):
return (None,) * 2

@property
def geolocation(self):
def geolocation(
self,
) -> Union[Tuple[int, int, int, time.struct_time], Tuple[None, None, None, None]]:
"""Most recent geolocation of the modem as measured by the Iridium constellation
including a timestamp of when geolocation measurement was made.
Expand All @@ -313,20 +322,20 @@ def geolocation(self):
This geolocation coordinate system is known as ECEF (acronym earth-centered, earth-fixed),
also known as ECR (initialism for earth-centered rotational)
<timestamp> is a time_struct
<timestamp> is a time.struct_time
The timestamp is assigned by the modem when the geolocation grid code received from
the network is stored to the modem's internal memory.
The timestamp used by the modem is Iridium system time, which is a running count of
90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent
Iridium epoch).
The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form.
We convert the modem's timestamp and return it as a time_struct.
We convert the modem's timestamp and return it as a time.struct_time.
The system time value is always expressed in UTC time.
Returns a tuple:
(int, int, int, time_struct)
(int, int, int, time.struct_time)
"""
resp = self._uart_xfer("-MSGEO")
if resp[-1].strip().decode() == "OK":
Expand Down Expand Up @@ -358,7 +367,7 @@ def geolocation(self):
return (None,) * 4

@property
def system_time(self):
def system_time(self) -> Optional[time.struct_time]:
"""Current date and time as given by the Iridium network.
The system time is available and valid only after the ISU has registered with
Expand All @@ -371,12 +380,12 @@ def system_time(self):
90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent
Iridium epoch).
The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form.
We convert the modem's timestamp and return it as a time_struct.
We convert the modem's timestamp and return it as a time.struct_time.
The system time value is always expressed in UTC time.
Returns:
time_struct
time.struct_time
"""
resp = self._uart_xfer("-MSSTM")
if resp[-1].strip().decode() == "OK":
Expand Down Expand Up @@ -405,7 +414,7 @@ def system_time(self):
return None

@property
def energy_monitor(self):
def energy_monitor(self) -> Optional[int]:
"""The current accumulated energy usage estimate in microamp hours.
Returns an estimate of the charge taken from the +5V supply to the modem,
Expand Down Expand Up @@ -436,7 +445,7 @@ def energy_monitor(self):
return None

@energy_monitor.setter
def energy_monitor(self, value):
def energy_monitor(self, value: int) -> Optional[int]:
if 0 <= value <= 67108863: # 0 to 2^26 - 1
resp = self._uart_xfer("+GEMON=" + str(value))
if resp[-1].strip().decode() == "OK":
Expand Down
2 changes: 2 additions & 0 deletions optional_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# SPDX-FileCopyrightText: 2022 Alec Delaney, for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense

pyserial

0 comments on commit a6d96e8

Please sign in to comment.