Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/asyncio #1828

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 139 additions & 70 deletions bCNC/Sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# Email: [email protected]
# Date: 17-Jun-2015

import asyncio
import functools
import glob
import os
import re
Expand Down Expand Up @@ -67,7 +69,6 @@
NOT_CONNECTED: "OrangeRed",
}


# =============================================================================
# bCNC Sender class
# =============================================================================
Expand All @@ -81,6 +82,8 @@ class Sender:
MSG_ERROR = 4 # error message or exception
MSG_RUNEND = 5 # run ended
MSG_CLEAR = 6 # clear buffer
SERIAL_IO_THREAD_NAME = 'serialIO'
MAIN_THREAD_NAME = 'main'

def __init__(self):
# Global variables
Expand All @@ -98,6 +101,8 @@ def __init__(self):
self.log = Queue() # Log queue returned from GRBL
self.queue = Queue() # Command queue to be send to GRBL
self.pendant = Queue() # Command queue to be executed from Pendant
self.context = threading.local()
self.context.name = self.MAIN_THREAD_NAME
self.serial = None
self.thread = None

Expand All @@ -121,6 +126,12 @@ def __init__(self):

self._onStart = ""
self._onStop = ""

self.loop = asyncio.new_event_loop()
self.resetCondition = None
self.bufferSyncEvent = None
self.resetLock =None
self.idleFunction = None

# ----------------------------------------------------------------------
def controllerLoad(self):
Expand Down Expand Up @@ -515,7 +526,7 @@ def open(self, device, baudrate):
self.mcontrol.initController()
self._gcount = 0
self._alarm = True
self.thread = threading.Thread(target=self.serialIO)
self.thread = threading.Thread(target=self.serialIOWrapper)
self.thread.start()
return True

Expand All @@ -530,8 +541,9 @@ def close(self):
except Exception:
pass
self._runLines = 0

self.thread = None
time.sleep(1)

try:
self.serial.close()
except Exception:
Expand Down Expand Up @@ -565,7 +577,9 @@ def hardReset(self):
self.mcontrol.hardReset()

def softReset(self, clearAlarm=True):
self.mcontrol.softReset(clearAlarm)
if not self.loop.is_running():
return
self.scheduleCoroutine(self.mcontrol.softReset(clearAlarm))

def unlock(self, clearAlarm=True):
self.mcontrol.unlock(clearAlarm)
Expand Down Expand Up @@ -619,7 +633,9 @@ def pause(self, event=None):
self.mcontrol.pause(event)

def purgeController(self):
self.mcontrol.purgeController()
if not self.loop.is_running():
return
self.scheduleCoroutine(self.mcontrol.purgeController())

def g28Command(self):
self.sendGCode("G28.1") # FIXME: ???
Expand Down Expand Up @@ -689,16 +705,16 @@ def stopRun(self, event=None):
# So we can purge the controller for the next job
# See https://github.com/vlachoudis/bCNC/issues/1035
# ----------------------------------------------------------------------
def jobDone(self):
async def jobDone(self):
print(f"Job done. Purging the controller. (Running: {self.running})")
self.purgeController()
asyncio.create_task(self.mcontrol.purgeController())

# ----------------------------------------------------------------------
# This is called everytime that motion controller changes the state
# YOU SHOULD PASS ONLY REAL HW STATE TO THIS, NOT BCNC STATE
# Right now the primary idea of this is to detect when job stopped running
# ----------------------------------------------------------------------
def controllerStateChange(self, state):
async def controllerStateChange(self, state):
print(
f"Controller state changed to: {state} (Running: {self.running})")
if state in ("Idle"):
Expand All @@ -709,82 +725,116 @@ def controllerStateChange(self, state):
and self.running is False
and state in ("Idle")):
self.cleanAfter = False
self.jobDone()
await self.jobDone()

# ----------------------------------------------------------------------
# thread performing I/O on serial line
# ----------------------------------------------------------------------
def serialIO(self):
def scheduleCoroutine(self, coro):
if self.context.name in self.SERIAL_IO_THREAD_NAME:
asyncio.create_task(coro)
else:
task = asyncio.run_coroutine_threadsafe(coro, self.loop)
while not task.done():
if self.idleFunction:
self.idleFunction()
else:
time.sleep(0.1)

def serialIOWrapper(self):
self.context.name = self.SERIAL_IO_THREAD_NAME
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.serialIO())
for task in asyncio.all_tasks(self.loop):
task.cancel()

async def triggerBufferSync(self):
if self.bufferSyncEvent:
self.bufferSyncEvent.set()

async def serialIO(self):
# wait for commands to complete (status change to Idle)
self.sio_wait = False
self.sio_status = False # waiting for status <...> report
cline = [] # length of pipeline commands
sline = [] # pipeline commands
tosend = None # next string to send
tr = tg = time.time() # last time a ? or $G was send to grbl

self.context.cline = [] # length of pipeline commands
self.context.sline = [] # pipeline commands
self.context.tosend = None # next string to send
self.context.tr = self.context.tg = time.time() # last time a ? or $G was send to grbl
self.resetCondition = asyncio.Condition()
self.resetLock = asyncio.Lock()
while self.thread:
await self.writeSerialI0()
if not self.thread:
return
await self.readSerialI0()
await asyncio.sleep(0.01)

self.resetCondition = None
self.resetLock = None

async def writeSerialI0(self):
try:
t = time.time()
# refresh machine position?
if t - tr > SERIAL_POLL:
if t - self.context.tr > SERIAL_POLL:
self.mcontrol.viewStatusReport()
tr = t
self.context.tr = t

# If Override change, attach feed
if CNC.vars["_OvChanged"]:
self.mcontrol.overrideSet()

# Fetch new command to send if...
if (
tosend is None
self.context.tosend is None
and not self.sio_wait
and not self._pause
and self.queue.qsize() > 0
):
try:
tosend = self.queue.get_nowait()
if isinstance(tosend, tuple):
self.context.tosend = self.queue.get_nowait()
if isinstance(self.context.tosend, tuple):
# wait to empty the grbl buffer and status is Idle
if tosend[0] == WAIT:
if self.context.tosend[0] == WAIT:
# Don't count WAIT until we are idle!
self.sio_wait = True
elif tosend[0] == MSG:
elif self.context.tosend[0] == MSG:
# Count executed commands as well
self._gcount += 1
if tosend[1] is not None:
if self.context.tosend[1] is not None:
# show our message on machine status
self._msg = tosend[1]
elif tosend[0] == UPDATE:
self._msg = self.context.tosend[1]
elif self.context.tosend[0] == UPDATE:
# Count executed commands as well
self._gcount += 1
self._update = tosend[1]
self._update = self.context.tosend[1]
else:
# Count executed commands as well
self._gcount += 1
tosend = None
self.context.tosend = None

elif not isinstance(tosend, str):
elif not isinstance(self.context.tosend, str):
try:
tosend = self.gcode.evaluate(tosend, self)
if isinstance(tosend, str):
tosend += "\n"
self.context.tosend = self.gcode.evaluate(self.context.tosend, self)
if isinstance(self.context.tosend, str):
self.context.tosend += "\n"
else:
# Count executed commands as well
self._gcount += 1
except Exception:
for s in str(sys.exc_info()[1]).splitlines():
self.log.put((Sender.MSG_ERROR, s))
self._gcount += 1
tosend = None
self.context.tosend = None
except Empty:
break
return

if tosend is not None:
# All modification in tosend should be
if self.context.tosend is not None:
# All modification in self.context.tosend should be
# done before adding it to cline

# Keep track of last feed
pat = FEEDPAT.match(tosend)
pat = FEEDPAT.match(self.context.tosend)
if pat is not None:
self._lastFeed = pat.group(2)

Expand All @@ -799,16 +849,16 @@ def serialIO(self):
if (
pat is None
and self._newFeed != 0
and not tosend.startswith("$")
and not self.context.tosend.startswith("$")
):
tosend = f"f{self._newFeed:g}{tosend}"
self.context.tosend = f"f{self._newFeed:g}{self.context.tosend}"

# Apply override Feed
if CNC.vars["_OvFeed"] != 100 and self._newFeed != 0:
pat = FEEDPAT.match(tosend)
pat = FEEDPAT.match(self.context.tosend)
if pat is not None:
try:
tosend = "{}f{:g}{}\n".format(
self.context.tosend = "{}f{:g}{}\n".format(
pat.group(1),
self._newFeed,
pat.group(3),
Expand All @@ -817,49 +867,68 @@ def serialIO(self):
pass

# Bookkeeping of the buffers
sline.append(tosend)
cline.append(len(tosend))

# Anything to receive?
if self.serial.inWaiting() or tosend is None:
try:
line = str(self.serial.readline().decode("ascii", "ignore")).strip()
except Exception:
self.log.put((Sender.MSG_RECEIVE, str(sys.exc_info()[1])))
self.emptyQueue()
self.close()
return

if not line:
pass
elif self.mcontrol.parseLine(line, cline, sline):
pass
else:
self.log.put((Sender.MSG_RECEIVE, line))
self.context.sline.append(self.context.tosend)
self.context.cline.append(len(self.context.tosend))

# Received external message to stop
if self._stop:
self.emptyQueue()
tosend = None
self.context.tosend = None
self.bufferSyncEvent = asyncio.Event()
self.log.put((Sender.MSG_CLEAR, ""))
await self.bufferSyncEvent.wait()
self.bufferSyncEvent = None
# WARNING if runLines==maxint then it means we are
# still preparing/sending lines from from bCNC.run(),
# so don't stop
if self._runLines != sys.maxsize:
self._stop = False
async with self.resetCondition:
self.resetCondition.notify_all()

if tosend is not None and sum(cline) < RX_BUFFER_SIZE:
self._sumcline = sum(cline)
if self.context.tosend is not None and sum(self.context.cline) < RX_BUFFER_SIZE:
self._sumcline = sum(self.context.cline)
if self.mcontrol.gcode_case > 0:
tosend = tosend.upper()
self.context.tosend = self.context.tosend.upper()
if self.mcontrol.gcode_case < 0:
tosend = tosend.lower()

self.serial_write(tosend)
self.context.tosend = self.context.tosend.lower()
self.serial_write(self.context.tosend)

self.log.put((Sender.MSG_BUFFER, tosend))
self.log.put((Sender.MSG_BUFFER, self.context.tosend))

tosend = None
if not self.running and t - tg > G_POLL:
self.context.tosend = None
if not self.running and t - self.context.tg > G_POLL:
self.mcontrol.viewState()
tg = t
self.context.tg = t

except serial.SerialException:
self.log.put((Sender.MSG_RECEIVE, str(sys.exc_info()[1])))
self.emptyQueue()
if self.thread:
self.event_generate("<<Connect>>")
return
except Exception:
return


async def readSerialI0(self):
# Anything to receive?
try:
if self.serial.inWaiting() or self.context.tosend is None:
line = str(self.serial.readline().decode("ascii", "ignore")).strip()

if not line:
pass
elif await self.mcontrol.parseLine(line, self.context.cline, self.context.sline):
pass
else:
self.log.put((Sender.MSG_RECEIVE, line))
except serial.SerialException:
self.log.put((Sender.MSG_RECEIVE, str(sys.exc_info()[1])))
self.emptyQueue()
if self.thread:
self.event_generate("<<Connect>>")
return
except Exception:
return
Loading