Skip to content

Commit

Permalink
Fixes to LXServo based on testing
Browse files Browse the repository at this point in the history
  • Loading branch information
ZodiusInfuser committed Nov 8, 2023
1 parent 714a1dc commit 3664565
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 69 deletions.
4 changes: 2 additions & 2 deletions lib/pimoroni_yukon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class Yukon:

DEFAULT_VOLTAGE_LIMIT = 17.2
VOLTAGE_LOWER_LIMIT = 4.8
VOLTAGE_ZERO_LEVEL = 0.05
VOLTAGE_ZERO_LEVEL = 0.2
VOLTAGE_SHORT_LEVEL = 0.5
DEFAULT_CURRENT_LIMIT = 20
DEFAULT_TEMPERATURE_LIMIT = 80
Expand Down Expand Up @@ -493,7 +493,7 @@ def enable_main_output(self):
raise UnderVoltageError("[Yukon] No input voltage detected! Make sure power is being provided to the XT-30 (yellow) connector")

if voltage_in < self.VOLTAGE_LOWER_LIMIT:
raise UnderVoltageError(f"[Yukon] Input voltage below minimum operating level of {self.VOLTAGE_LOWER_LIMIT}V! Aborting enable output")
raise UnderVoltageError(f"[Yukon] Input voltage of {voltage_in}V below minimum operating level of {self.VOLTAGE_LOWER_LIMIT}V! Aborting enable output")

start = time.ticks_us()

Expand Down
157 changes: 90 additions & 67 deletions lib/pimoroni_yukon/protocols/lx_servos.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@


def checksum(buffer):
# [From the LX protocol datasheet]
# The calculation method is as follows:
#
# Checksum = ~(ID + Length + Cmd + Prm1 + ... PrmN)
#
# If the numbers in the brackets are calculated and exceeded 255,
# then take the lowest one byte, "~" means negation.
checksum = 0
length = buffer[FRAME_LENGTH_INDEX]
last = length + 2
Expand All @@ -60,27 +67,29 @@ def checksum(buffer):


def send(id, uart, duplexer, command, fmt="", *data):
# Create a buffer of the correct length
buffer = bytearray(FRAME_HEADER_LENGTH + command.length)

# Populate the buffer with the required header values and command data
struct.pack_into("<BBBBB" + fmt + "B", buffer, 0, # fmt, buffer, offset
FRAME_HEADER,
FRAME_HEADER,
id,
command.length,
command.value,
*data,
0)
0) # where the checksum will go

# Calculate the checksum and update the last byte of the buffer
buffer[-1] = checksum(buffer)

# Switch to sending data
duplexer.send_on_data()
duplexer.send_on_data() # Switch to sending data

# Clear out the receive buffer since we are now in send mode
while uart.any():
uart.read()

# Write out the previously generated buffer
uart.write(buffer)
uart.write(buffer) # Write out the buffer


def wait_for_send(uart):
Expand All @@ -89,81 +98,95 @@ def wait_for_send(uart):
pass

# Wait a short time to let the final bits finish transmitting
time.sleep_us(1000000 // BAUD_RATE)


def wait_for_receive(id, uart, duplexer, timeout):
ms = 1000.0 * timeout + 0.5
end_ms = ticks_add(ticks_ms(), int(ms))

while uart.any() == 0:
remaining_ms = ticks_diff(end_ms, ticks_ms())
if remaining_ms <= 0:
duplexer.send_on_data()
raise TimeoutError(f"Serial servo #{id} did not reply within the expected time")
time.sleep_us(1500000 // BAUD_RATE)


def handle_receive(uart):
frameStarted = False
frameCount = 0
dataCount = 0
dataLength = 2
rxBuf = 0
recvBuf = bytearray(32)
# Variables for handling received bytes
frame_started = False
header_count = 0
data_count = 0
data_length = 2
rx_byte = 0
rx_buffer = bytearray(32)

while uart.any() > 0:
rxBuf = uart.read(1)[0]
# print(hex(rxBuf), end=", ")

if not frameStarted:
if rxBuf == FRAME_HEADER:
frameCount += 1
if frameCount == 2:
frameCount = 0
frameStarted = True
dataCount = 1
rx_byte = uart.read(1)[0] # Read the next byte
# print(hex(rx_byte), end=", ")

# Are we outside of the frame?
if not frame_started:
# Only look for header bytes
if rx_byte == FRAME_HEADER:
header_count += 1

# Have to header bytes been received?
if header_count == 2:
header_count = 0
frame_started = True
data_count = 1
else:
frameStarted = False
dataCount = 0
frameCount = 0

if frameStarted:
recvBuf[dataCount] = rxBuf
if dataCount == 3:
dataLength = recvBuf[dataCount]
if dataLength < 3 or dataCount > 7:
dataLength = 2
frameStarted = False
dataCount += 1
if dataCount == dataLength + 3:
if checksum(recvBuf) == recvBuf[dataCount - 1]:
# print("Check SUM OK!!", end="\n\n")
frameStarted = False
return recvBuf[5:5 + dataLength - 3]
time.sleep_us(100)
frame_started = False
data_count = 0
header_count = 0

# Are we inside the frame?
if frame_started:
rx_buffer[data_count] = rx_byte # Add the byte to the buffer

# Extract the frame length from the data, exiting if it contradicts
if data_count == 3:
data_length = rx_buffer[data_count]
if data_length < 3 or data_count > 7:
data_length = 2
frame_started = False

data_count += 1

# Have we reached the expected end of the received data?
if data_count == data_length + 3:
# Does the checksum we calculate match what was received?
if checksum(rx_buffer) == rx_buffer[data_count - 1]:
# print("Checksum OK!!", end="\n\n")
frame_started = False
return rx_buffer[5:5 + data_length - 3]

time.sleep_us(100) # Sleep enough time for another byte to have been received

return None


def receive(id, uart, duplexer, timeout, fmt=""):
wait_for_send(uart)
wait_for_send(uart) # Ensure that all data has been sent
duplexer.receive_on_data() # Switch to receive mode

try:
duplexer.receive_on_data()
wait_for_receive(id, uart, duplexer, timeout)
# Calculate how long to wait for data
ms = int(1000.0 * timeout + 0.5)
end_ms = ticks_add(ticks_ms(), ms)

returned_buffer = handle_receive(uart)
if returned_buffer is not None:
if len(fmt) == 1:
ret = struct.unpack("<" + fmt, returned_buffer)[0]
else:
ret = struct.unpack("<" + fmt, returned_buffer)
else:
ret = None
finally:
duplexer.send_on_data()
# Wait for data to start coming in
while uart.any() == 0:
remaining_ms = ticks_diff(end_ms, ticks_ms())

# Has the timeout been reached?
if remaining_ms <= 0:
duplexer.send_on_data() # Switch back to send mode before throwing the error
raise TimeoutError(f"Serial servo #{id} did not reply within the expected time")

# Handle the received data
data_buffer = handle_receive(uart)

# Was valid data received?
data = None
if data_buffer is not None:
data = struct.unpack("<" + fmt, data_buffer)

# If there is only one piece of data expected, extract it
if len(fmt) == 1:
data = data[0]

return ret
duplexer.send_on_data() # Switch back to send mode
return data


class LXServo:
Expand Down Expand Up @@ -482,7 +505,7 @@ def __send(self, command, fmt="", *data):
send(self.__id, self.__uart, self.__duplexer, command, fmt, *data)

def __receive(self, fmt=""):
receive(self.__id, self.__uart, self.__duplexer, self.__timeout, fmt)
return receive(self.__id, self.__uart, self.__duplexer, self.__timeout, fmt)

def __message_header(self):
return f"[Servo{self.__id}] "
Expand Down

0 comments on commit 3664565

Please sign in to comment.