Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
beniroquai committed Jul 21, 2024
2 parents 3d791c8 + b186099 commit c1a7f28
Show file tree
Hide file tree
Showing 14 changed files with 501 additions and 113 deletions.
43 changes: 0 additions & 43 deletions conda/meta.yaml

This file was deleted.

74 changes: 74 additions & 0 deletions uc2rest/MockSerial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import random
from threading import Thread
import time

class MockSerial:
def __init__(self, port, baudrate, timeout=1):
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.is_open = False
self.data_buffer = []
self.thread = Thread(target=self._simulate_data)
self.thread.daemon = True
self.thread.start()
self.is_open = True
self.manufacturer = "UC2Mock"
self.BAUDRATES = -1

def flush(self):
pass

def isOpen(self):
return self.is_open

def open(self):
self.is_open = True

def close(self):
self.is_open = False

def readline(self, timeout=1):
if not self.is_open:
raise Exception("Device not connected")
if len(self.data_buffer) == 0:
return b''
data = self.data_buffer
self.data_buffer = self.data_buffer
time.sleep(.05)
return bytes(data)

def read(self, num_bytes):
if not self.is_open:
raise Exception("Device not connected")
if len(self.data_buffer) == 0:
return b''
data = self.data_buffer[:num_bytes]
self.data_buffer = self.data_buffer[num_bytes:]
return bytes(data)

def write(self, data):
if not self.is_open:
raise Exception("Device not connected")
pass # Do nothing, as it's a mock

def _simulate_data(self):
while self.is_open:
if random.random() < 0.2: # Simulate occasional data availability
self.data_buffer.extend([random.randint(0, 255) for _ in range(10)])
time.sleep(0.1)

def __enter__(self):
self.open()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()


if __name__ == '__main__':
with MockSerial("COM1", 9600) as ser:
while True:
data = ser.readline()
print(data)
time.sleep(1)
36 changes: 36 additions & 0 deletions uc2rest/TEST/TEST_ESP32_LEDarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#%%
import uc2rest
import numpy as np
import time

port = "unknown"
port = "/dev/cu.SLAB_USBtoUART"
ESP32 = uc2rest.UC2Client(serialport=port, baudrate=500000, DEBUG=True)
time.sleep(2)
#ESP32.serial.sendMessage('{"task":"/home_act", "home": {"steppers": [{"stepperid":1, "timeout": 20000, "speed": 15000, "direction":1, "endposrelease":3000}]}}')
# display random pattern
for i in range(10):
led_pattern = np.random.randint(0,55, (64,3))
ESP32.led.send_LEDMatrix_array(led_pattern=led_pattern, getReturn=True, timeout=4)

#
''' ################
LED
################'''
# test LED
ESP32.led.send_LEDMatrix_full(intensity=(255, 255, 255))
ESP32.led.send_LEDMatrix_full(intensity=(0, 0, 0))

# single LED
for iLED in range(5):
# timeout = 0 means no timeout => mResult will be rubish!
mResult = ESP32.led.send_LEDMatrix_single(indexled=iLED, intensity=(255, 255, 255))#, timeout=0.)
mResult = ESP32.led.send_LEDMatrix_single(indexled=iLED, intensity=(0, 0, 0))#, timeout=0.)

# display random pattern
for i in range(50):
led_pattern = np.random.randint(0,55, (48,3))
ESP32.led.send_LEDMatrix_array(led_pattern=led_pattern, getReturn=False)

ESP32.close()

53 changes: 21 additions & 32 deletions uc2rest/TEST/TEST_ESP32_Serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,30 @@
port = "unknown"
port = "/dev/cu.SLAB_USBtoUART"
#port = "COM3"
ESP32 = uc2rest.UC2Client(serialport=port, baudrate=500000, DEBUG=True)
print("start")
ESP32 = uc2rest.UC2Client(serialport=port, baudrate=115200, DEBUG=True)
#ESP32.serial.sendMessage('{"task":"/home_act", "home": {"steppers": [{"stepperid":1, "timeout": 20000, "speed": 15000, "direction":1, "endposrelease":3000}]}}')

#%% TEMPERATURE
ESP32.temperature.start_temperature_polling()
time.sleep(5)
mTemperature = ESP32.temperature.get_temperature()
print(mTemperature)

#

ESP32.home.home_x(speed =15000, direction = 1, endposrelease = 3000, timeout=20, isBlocking=True)
ESP32.home.home_x(speed =15000, direction = -1, endposrelease = 3000, timeout=20, isBlocking=True)


ESP32.motor.move_x(steps=10000, speed=10000, is_blocking=True)
#%% TEMPERATURE
if 0:
ESP32.temperature.start_temperature_polling()
time.sleep(5)
mTemperature = ESP32.temperature.get_temperature()
print(mTemperature)


#%%
''' ################
HOME
################'''
ESP32.home.home_y(speed =15000, direction = 1, endposrelease = 3000, timeout=2, isBlocking=True)
if 0:
ESP32.home.home_y(speed =15000, direction = 1, endposrelease = 3000, timeout=2, isBlocking=True)
ESP32.home.home_x(speed =15000, direction = 1, endposrelease = 3000, timeout=20, isBlocking=True)
ESP32.home.home_x(speed =15000, direction = -1, endposrelease = 3000, timeout=20, isBlocking=True)

heapSize = ESP32.state.getHeap()
print("Heap size: ", heapSize)

# setting debug output of the serial to true - all message will be printed
ESP32.serial.DEBUG=True
Expand All @@ -40,33 +41,20 @@
mState = ESP32.state.get_state()


''' ################
MODULES
################'''
#load modules from pyhton
mModules = ESP32.modules.get_default_modules()
assert mModules["home"] == 0 or mModules["home"] == 1, "Failed loading the default modules"
print(mModules) #{'led': True, 'motor': True, 'home': True, 'analogin': False, 'pid': False, 'laser': True, 'dac': False, 'analogout': False, 'digitalout': False, 'digitalin': True, 'scanner': False, 'joy': False}

# load modules from device
mModulesDevice = ESP32.modules.get_modules()
#assert mModulesDevice["home"] == 0 or mModulesDevice["home"] == 1, "Failed loading the modules from the device"
print(mModulesDevice) #{'led': True, 'motor': True, 'home': True, 'analogin': False, 'pid': False, 'laser': True, 'dac': False, 'analogout': False, 'digitalout': False, 'digitalin': True, 'scanner': False, 'joy': False}
mModules['home']=1 # activate home module
#%%


ESP32.motor.move_x(steps=10000, speed=10000, is_blocking=True)

# {"task":"/ledarr_act", "led":{"LEDArrMode":1, "led_array":[{"id":0, "r":255, "g":255, "b":255}]}}
mResult = ESP32.led.send_LEDMatrix_full(intensity=(255, 255, 255))
print("Heap size: ", ESP32.state.getHeap())
mResult = ESP32.led.send_LEDMatrix_full(intensity=(0, 0, 0), getReturn=False)

print("Heap size: ", ESP32.state.getHeap())

# check if we are connected
# see if it's the right device
mState = ESP32.state.get_state()
assert mState["identifier_name"] == "UC2_Feather", "Wrong device connected"
#assert mState["state"]["identifier_name"] == "UC2_Feather", "Wrong device connected"

#%%
# test Motor
Expand Down Expand Up @@ -112,11 +100,12 @@
################'''
# test LED
mResult = ESP32.led.send_LEDMatrix_full(intensity=(255, 255, 255))
assert mResult["success"] == 1, "Failed sending LED command"
assert mResult["idsuccess"] == 1, "Failed sending LED command"
time.sleep(0.5)
print("Heap size: ", ESP32.state.getHeap())
mResult = ESP32.led.send_LEDMatrix_full(intensity=(0, 0, 0))
assert mResult["success"] == 1, "Failed sending LED command"

print("Heap size: ", ESP32.state.getHeap())
# single LED
ESP32.setDebugging(False)
for iLED in range(5):
Expand Down
45 changes: 45 additions & 0 deletions uc2rest/TEST/TEST_PURE_SERIAL.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import serial
import threading
import time
import json

# Configuration parameters
port = "/dev/cu.wchusbserial110" # Adjust this to your device's serial port
baudrate = 500000

# Open the serial port
ser = serial.Serial(port, baudrate, timeout=1)

lock = threading.Lock()
# Function to handle incoming messages
def read_from_port(ser):
while True:
try:
with lock:
reading = ser.readline().decode('utf-8').rstrip()
if reading:
print(f"Received: {reading}")
except Exception as e:
print("Error reading from port")
break
# Start the thread for reading
thread = threading.Thread(target=read_from_port, args=(ser,))
thread.start()

iiter = 0
try:
print("Serial Terminal Running. Type 'exit' to quit.")
while True:
iiter+=1
# Get input from the user
input_data = {'task': '/ledarr_act', 'led': {'LEDArrMode': 0, 'led_array': [{'id': 0, 'r': 25, 'g': 42, 'b': 4}, {'id': 1, 'r': 52, 'g': 26, 'b': 11}, {'id': 2, 'r': 51, 'g': 10, 'b': 17}]}, 'qid': 1}
if input_data == 'exit':
break
# Send data if the user typed something
if input_data:
with lock:
ser.write(f"{json.dumps(input_data)}\n".encode('utf-8'))
print(f"Sent: {iiter}")
finally:
ser.close()
print("Serial Terminal Closed.")
Loading

0 comments on commit c1a7f28

Please sign in to comment.