diff --git a/lib/pimoroni_yukon/__init__.py b/lib/pimoroni_yukon/__init__.py index 9b1134f..0497cf5 100644 --- a/lib/pimoroni_yukon/__init__.py +++ b/lib/pimoroni_yukon/__init__.py @@ -116,7 +116,7 @@ class Yukon: DEFAULT_VOLTAGE_LIMIT = 17.2 VOLTAGE_LOWER_LIMIT = 4.8 - VOLTAGE_ZERO_LEVEL = 0.05 + VOLTAGE_ZERO_LEVEL = 0.2 VOLTAGE_SHORT_LEVEL = 0.5 DEFAULT_CURRENT_LIMIT = 20 DEFAULT_TEMPERATURE_LIMIT = 80 @@ -493,7 +493,7 @@ def enable_main_output(self): raise UnderVoltageError("[Yukon] No input voltage detected! Make sure power is being provided to the XT-30 (yellow) connector") if voltage_in < self.VOLTAGE_LOWER_LIMIT: - raise UnderVoltageError(f"[Yukon] Input voltage below minimum operating level of {self.VOLTAGE_LOWER_LIMIT}V! Aborting enable output") + raise UnderVoltageError(f"[Yukon] Input voltage of {voltage_in}V below minimum operating level of {self.VOLTAGE_LOWER_LIMIT}V! Aborting enable output") start = time.ticks_us() diff --git a/lib/pimoroni_yukon/protocols/lx_servos.py b/lib/pimoroni_yukon/protocols/lx_servos.py index f29a116..7cc3f63 100644 --- a/lib/pimoroni_yukon/protocols/lx_servos.py +++ b/lib/pimoroni_yukon/protocols/lx_servos.py @@ -50,6 +50,13 @@ def checksum(buffer): + # [From the LX protocol datasheet] + # The calculation method is as follows: + # + # Checksum = ~(ID + Length + Cmd + Prm1 + ... PrmN) + # + # If the numbers in the brackets are calculated and exceeded 255, + # then take the lowest one byte, "~" means negation. checksum = 0 length = buffer[FRAME_LENGTH_INDEX] last = length + 2 @@ -60,7 +67,10 @@ def checksum(buffer): def send(id, uart, duplexer, command, fmt="", *data): + # Create a buffer of the correct length buffer = bytearray(FRAME_HEADER_LENGTH + command.length) + + # Populate the buffer with the required header values and command data struct.pack_into(" 0: - rxBuf = uart.read(1)[0] - # print(hex(rxBuf), end=", ") - - if not frameStarted: - if rxBuf == FRAME_HEADER: - frameCount += 1 - if frameCount == 2: - frameCount = 0 - frameStarted = True - dataCount = 1 + rx_byte = uart.read(1)[0] # Read the next byte + # print(hex(rx_byte), end=", ") + + # Are we outside of the frame? + if not frame_started: + # Only look for header bytes + if rx_byte == FRAME_HEADER: + header_count += 1 + + # Have to header bytes been received? + if header_count == 2: + header_count = 0 + frame_started = True + data_count = 1 else: - frameStarted = False - dataCount = 0 - frameCount = 0 - - if frameStarted: - recvBuf[dataCount] = rxBuf - if dataCount == 3: - dataLength = recvBuf[dataCount] - if dataLength < 3 or dataCount > 7: - dataLength = 2 - frameStarted = False - dataCount += 1 - if dataCount == dataLength + 3: - if checksum(recvBuf) == recvBuf[dataCount - 1]: - # print("Check SUM OK!!", end="\n\n") - frameStarted = False - return recvBuf[5:5 + dataLength - 3] - time.sleep_us(100) + frame_started = False + data_count = 0 + header_count = 0 + + # Are we inside the frame? + if frame_started: + rx_buffer[data_count] = rx_byte # Add the byte to the buffer + + # Extract the frame length from the data, exiting if it contradicts + if data_count == 3: + data_length = rx_buffer[data_count] + if data_length < 3 or data_count > 7: + data_length = 2 + frame_started = False + + data_count += 1 + + # Have we reached the expected end of the received data? + if data_count == data_length + 3: + # Does the checksum we calculate match what was received? + if checksum(rx_buffer) == rx_buffer[data_count - 1]: + # print("Checksum OK!!", end="\n\n") + frame_started = False + return rx_buffer[5:5 + data_length - 3] + + time.sleep_us(100) # Sleep enough time for another byte to have been received return None def receive(id, uart, duplexer, timeout, fmt=""): - wait_for_send(uart) + wait_for_send(uart) # Ensure that all data has been sent + duplexer.receive_on_data() # Switch to receive mode - try: - duplexer.receive_on_data() - wait_for_receive(id, uart, duplexer, timeout) + # Calculate how long to wait for data + ms = int(1000.0 * timeout + 0.5) + end_ms = ticks_add(ticks_ms(), ms) - returned_buffer = handle_receive(uart) - if returned_buffer is not None: - if len(fmt) == 1: - ret = struct.unpack("<" + fmt, returned_buffer)[0] - else: - ret = struct.unpack("<" + fmt, returned_buffer) - else: - ret = None - finally: - duplexer.send_on_data() + # Wait for data to start coming in + while uart.any() == 0: + remaining_ms = ticks_diff(end_ms, ticks_ms()) + + # Has the timeout been reached? + if remaining_ms <= 0: + duplexer.send_on_data() # Switch back to send mode before throwing the error + raise TimeoutError(f"Serial servo #{id} did not reply within the expected time") + + # Handle the received data + data_buffer = handle_receive(uart) + + # Was valid data received? + data = None + if data_buffer is not None: + data = struct.unpack("<" + fmt, data_buffer) + + # If there is only one piece of data expected, extract it + if len(fmt) == 1: + data = data[0] - return ret + duplexer.send_on_data() # Switch back to send mode + return data class LXServo: @@ -482,7 +505,7 @@ def __send(self, command, fmt="", *data): send(self.__id, self.__uart, self.__duplexer, command, fmt, *data) def __receive(self, fmt=""): - receive(self.__id, self.__uart, self.__duplexer, self.__timeout, fmt) + return receive(self.__id, self.__uart, self.__duplexer, self.__timeout, fmt) def __message_header(self): return f"[Servo{self.__id}] "