ATC: M100..M103 preprocessor + Mach MDI rewrite + hook handlers

ATC pneumatics in g-code (drop tool / grab tool / release clamp /
engage clamp) are expressed as M100..M103. AuxPreprocessor rewrites
those into (MSG,HOOK:droptool:) etc on file upload + on planner
load + on MDI input, so the Hooks layer (B1) can dispatch them via
registered ATC handlers in Ctrl.

- AuxPreprocessor.py: regex-based file rewriter, idempotent.
- FileHandler: invoke preprocessor on every upload.
- Planner.init: also re-preprocess on load (catches files written
  before this version).
- Mach.mdi: same rewrite for ad-hoc MDI input so M101 typed at the
  console produces a HOOK message.
- Ctrl: register the four ATC hooks (droptool/grabtool/release/clamp)
  with block_unpause + auto_resume so programs using them pause at
  the right point and resume cleanly. aux_home retained as a legacy
  alias for older preprocessed files.
This commit is contained in:
2026-05-03 14:18:28 +02:00
parent 800cb04e3b
commit da619bd56c
4 changed files with 487 additions and 5 deletions

View File

@@ -95,6 +95,10 @@ class Mach(Comm):
self.planner = bbctrl.Planner(ctrl)
self.unpausing = False
self.stopping = False
# Guard against overlapping deferred-external-homing threads
# if the user clicks Home (All) again while the previous run
# is still waiting for the AVR cycle to finish.
self._ext_home_thread = None
ctrl.state.set('cycle', 'idle')
@@ -256,6 +260,12 @@ class Mach(Comm):
if cmd[0] == '$': self._query_var(cmd)
elif cmd[0] == '\\': super().queue_command(cmd[1:])
else:
# Rewrite ATC M-codes in MDI input the same way the
# FileHandler rewrites uploaded files. Motion (X/Y/Z/A)
# is left unchanged: the planner handles it natively
# now that the auxcnc stepper is exposed as a virtual
# A axis (see ExternalAxis).
cmd = self._rewrite_aux_mdi(cmd)
self._begin_cycle('mdi')
self.planner.mdi(cmd, with_limits)
super().resume()
@@ -263,11 +273,51 @@ class Mach(Comm):
self.mlog.info("Exception during MDI: %s" % err)
pass
def _rewrite_aux_mdi(self, cmd):
"""Apply the ATC M-code preprocessor to a single MDI line.
Returns possibly-multi-line G-code with HOOK: comments inserted."""
try:
from bbctrl.AuxPreprocessor import AuxPreprocessor, _ATC_M_RE
if not _ATC_M_RE.search(cmd):
return cmd
import io, tempfile, os
# AuxPreprocessor.process is file-based; route through
# tempfiles so we don't fork the regex/state logic.
pre = AuxPreprocessor(log=self.mlog)
with tempfile.NamedTemporaryFile('w', suffix='.nc',
delete=False) as fi:
fi.write(cmd if cmd.endswith('\n') else cmd + '\n')
ipath = fi.name
opath = ipath + '.out'
try:
pre.process(ipath, opath)
rewritten = open(opath).read()
finally:
try: os.unlink(ipath)
except OSError: pass
try: os.unlink(opath)
except OSError: pass
return rewritten
except Exception as e:
self.mlog.warning('Aux MDI rewrite failed: %s' % e)
return cmd
def set(self, code, value):
super().queue_command('${}={}'.format(code, value))
def jog(self, axes):
# Strip the external axis from the jog request before sending
# to the AVR. v1 doesn't support continuous-rate jogging on
# the ESP-driven axis - users jog A via /api/aux/jog (relative
# mm steps) instead. Sending A to the AVR is harmless (no
# motor maps to it) but cleaner to strip.
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is not None and isinstance(axes, dict):
axes = {k: v for k, v in axes.items()
if k.lower() != ext.axis_letter}
if not axes:
return
self._begin_cycle('jogging')
self.planner.position_change()
super().queue_command(Cmd.jog(axes))
@@ -281,10 +331,52 @@ class Mach(Comm):
axes = 'zxybc' if is_rotary_active else 'zxyabc' # TODO This should be configurable
else: axes = '%c' % axis
# Collect external axes here and process them *after* every
# AVR axis above has finished its homing cycle. Without this,
# the AVR is still running Z/X/Y homing G-code in the
# planner queue while ext.home() synchronously drives the ESP
# to home A in parallel - which is unsafe (the gantry and W
# axis can move at the same time) and visually confusing.
# We defer external homing to a background thread that
# polls cycle until the AVR cycle completes.
external_pending = []
for axis in axes:
enabled = state.is_axis_enabled(axis)
mode = state.axis_homing_mode(axis)
# External axes (e.g. the auxcnc-driven A axis) home via
# their own ESP-side homing routine; the standard
# G28.2 / G38.6 / latch sequence doesn't apply.
#
# After homing we want a deterministic outcome regardless
# of where the user was before:
# physical position = home_position_mm (e.g. 134 mm)
# work-coord origin = home position (user A = 0)
# work offset = home_position_mm (so abs - off = 0)
#
# ext.home() blocks on the ESP and updates state.ap to
# home_position_mm. We then need to tell the AVR (so its
# ex.position[A] matches physical reality) and gplan
# (so trajectory planning sees abs at home).
#
# We deliberately avoid G28.3 here: gplan's G28.3 keeps the
# current user-coord position fixed and adjusts the offset
# to match the new abs, which means re-homing after a move
# accumulates offset (134 -> 268 -> ...). Using G92 a0
# *after* syncing abs gives the desired "user A = 0 here"
# outcome with offset = home_position every time.
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is not None and ext.enabled \
and ext.axis_letter == axis.lower():
if 1 < len(axes) and not enabled:
continue
# Defer until AVR axes are done. We capture the axis
# letter and ext reference; the actual homing runs
# in _run_external_homing below.
external_pending.append((axis, ext))
continue
# If this is not a request to home a specific axis and the
# axis is disabled or in manual homing mode, don't show any
# warnings
@@ -315,8 +407,138 @@ class Mach(Comm):
self.planner.mdi(gcode, False)
super().resume()
# Kick off the deferred external-axis homing on a background
# thread so we don't block the HTTP handler (which is on the
# IOLoop) waiting for the AVR cycle to finish.
if external_pending:
prev = self._ext_home_thread
if prev is not None and prev.is_alive():
self.mlog.info(
'External homing already in progress; ignoring '
'duplicate request')
else:
import threading
t = threading.Thread(
target=self._run_external_homing,
args=(list(external_pending),),
name='ext-home-deferred',
daemon=True)
self._ext_home_thread = t
t.start()
def unhome(self, axis): self.mdi('G28.2 %c0' % axis)
def _run_external_homing(self, pending):
"""Background worker: wait for the AVR cycle to drop to idle
(meaning all queued AVR-side homing is done), then run each
deferred external-axis home in order.
We split the work between two threads:
- this background thread blocks on the ESP serial RPC
(ext.home(), which can take 5-10 seconds while the
carriage seeks the limit and backs off twice);
- small bookkeeping operations that touch gplan, the AVR
command queue, or shared State are scheduled back onto
the IOLoop via ctrl.ioloop.add_callback() so we don't
race with the rest of the controller.
"""
import time
# Wait up to 5 minutes for the AVR cycle to leave 'homing'.
# Long enough for any reasonable Onefinity full-travel home
# (Y axis at slow rate covers ~800 mm).
deadline = time.time() + 300.0
while time.time() < deadline:
cycle = self._get_cycle()
# 'homing' is the AVR's homing cycle; we wait for it to
# return to idle. If the user estopped or the cycle was
# aborted, cycle goes to idle too - we still proceed and
# the external home will fail-soft if conditions are wrong.
if cycle == 'idle':
break
time.sleep(0.1)
else:
self.mlog.error(
'External axis homing aborted: AVR cycle did not '
'return to idle within timeout')
return
for axis, ext in pending:
self.mlog.info('Homing external %s axis via auxcnc' %
axis.upper())
# Begin the cycle on the IOLoop so cycle-state writes go
# through the same thread that all other state writes do.
self.ctrl.ioloop.add_callback(self._begin_cycle, 'homing')
try:
# ext.home() runs on this background thread - it
# blocks on serial I/O and is fully thread-safe (the
# AuxAxis driver has its own RPC lock).
ext.home()
home_mm = ext.home_position_mm
# All of the post-home bookkeeping touches gplan and
# the AVR command queue, both of which run on the
# IOLoop. Schedule it there in a single callback so
# the steps run in order without intervening events.
self.ctrl.ioloop.add_callback(
self._finish_external_home, axis, home_mm)
except Exception as e:
self.mlog.error(
'External axis homing failed: %s' % e)
# Cycle reset must also happen on the IOLoop. Without
# this the UI stays locked at 'homing' since the AVR
# never moved (no state change to drive _update's
# cycle-end path).
self.ctrl.ioloop.add_callback(
self._abort_external_home_cycle)
def _finish_external_home(self, axis, home_mm):
"""IOLoop-side completion of an external axis home.
Synchronizes AVR position, refreshes the planner, and emits
a G92 to set the user-coord origin at the home position.
"""
try:
# 1) Update AVR: no motor steps, just position sync.
super().queue_command(Cmd.set_axis(axis, home_mm))
# 2) Force planner to resync abs from State on the next
# planner call (which is the MDI below).
self.planner.position_change()
# 3) G92 <axis>0: with abs already at home_mm, sets
# user-coord A = 0 and offset = home_mm. Use
# planner.mdi (not Mach.mdi) so we don't flip cycle
# to 'mdi' inside the 'homing' cycle.
self.planner.mdi('G92 %c0' % axis, False)
super().resume()
except Exception:
self.mlog.exception(
'Post-home bookkeeping failed for external axis')
self._abort_external_home_cycle()
def _abort_external_home_cycle(self):
"""Reset cycle to idle from the IOLoop after a failed
external axis home. The AVR never moved so _update's normal
cycle-end path won't fire; do it explicitly here.
"""
if self._get_cycle() == 'homing':
try:
self._set_cycle('idle')
except Exception:
self.mlog.exception(
'Failed to reset cycle to idle after external '
'homing error')
def unhome(self, axis):
# External axes don't have AVR-side homed state to clear; the
# ESP holds its own homed flag. We don't have an explicit
# "unhome" verb on the ESP, but a stale homed flag is harmless
# because the next absolute move will fail-soft via
# ExternalAxis._pos_mm sync. Still mirror the cleared flag
# into State for the UI.
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is not None and ext.enabled \
and chr(axis).lower() == ext.axis_letter:
from bbctrl.ExternalAxis import EXTERNAL_MOTOR_INDEX
self.ctrl.state.set('%dh' % EXTERNAL_MOTOR_INDEX, 0)
self.ctrl.state.set(ext.axis_letter + '_homed', False)
return
self.mdi('G28.2 %c0' % axis)
def estop(self): super().estop()
@@ -343,12 +565,22 @@ class Mach(Comm):
def stop(self):
if self._get_state() != 'jogging': self.stopping = True
super().i2c_command(Cmd.STOP)
# Drain the external-axis worker queue so post-stop resumption
# doesn't replay queued moves that the user wanted cancelled.
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is not None:
try: ext.abort()
except Exception: pass
def pause(self): super().pause()
def unpause(self):
if self._is_paused():
# Gate unpause on hook completion
if hasattr(self.ctrl, 'hooks') and \
not self.ctrl.hooks.can_unpause():
return
self.ctrl.state.set('optional_pause', False)
self._unpause()