Files
onefinity-firmware/src/py/bbctrl/AuxPreprocessor.py
Claude c7cf9483b3 Add W axis integration via auxcnc ESP32 over /dev/ttyUSB0
Rather than rebuild gplan + the AVR firmware to add a true 7th axis,
we treat W as a synchronous out-of-band axis that moves between G-code
blocks. The pipeline:

  upload -> AuxPreprocessor rewrites W tokens into (MSG,HOOK:aux:N)
  comments -> planner sees only XYZ + messages -> Hooks fires the
  registered internal handler -> AuxAxis sends STEPS/HOME over serial
  to the ESP and blocks the planner until done.

New files:
  src/py/bbctrl/AuxAxis.py       serial worker + RPC layer
  src/py/bbctrl/AuxPreprocessor.py  G-code rewriter
  docs/AUX_W_AXIS.md             design + ops notes

Changed:
  Hooks.py        register_internal(); fix the (MSG,HOOK:...) listener
                  to read the 'messages' state list (was broken before)
  Ctrl.py         instantiate AuxAxis, register aux/aux_rel/aux_home/
                  aux_setzero hooks
  FileHandler.py  rewrite uploads in place when they use W
  Mach.py         rewrite W tokens in MDI input the same way
  Web.py          REST endpoints under /api/aux/*

The ESP firmware in ../auxcnc was extended in lockstep: HOME, HOMECFG
(NVS-persisted), WPOS, HOMED?, LIMIT?, abortable STEPS with
limit-aware abort, trapezoidal ramps, deterministic [topic] reply
tokens, [boot] banner.

Real-time decisions (limit switch, step pulses) live on the ESP. The
host owns mm units, soft limits, and aux_homed bookkeeping. ESP
reboot mid-job clears aux_homed and surfaces a message; per design
manual jogs are still allowed without homing.
2026-04-30 16:51:24 +02:00

238 lines
9.0 KiB
Python

################################################################################
#
# AuxPreprocessor - rewrite W-axis G-code into hook calls
#
# The bbctrl planner only understands xyzabc. We expose a virtual W axis by
# rewriting the G-code file *before* it is fed to gplan, replacing each W
# move with a (MSG,HOOK:aux:...) line that the host's hook handler turns
# into a STEPS or HOME command on the ESP.
#
# Rules:
# - Mixed-axis blocks (W together with XYZABC) are split into two
# sequential blocks. By default the W move runs first; configurable.
# - G90/G91/G20/G21 modal state is tracked so we can convert relative-W
# and inch-W into the absolute mm value the hook handler expects.
# - G28 W0 / G28.2 W0 -> HOOK:aux_home
# - G92 Wx -> HOOK:aux_setzero:<mm>
# - G53 + W not specially handled (W only knows machine coords)
# - Lines inside parentheses or after `;` are passed through.
#
# The preprocessor is intentionally conservative: anything it doesn't
# understand involving W is left alone with a warning, so motion lands in
# gplan which will complain loudly rather than silently misbehaving.
#
################################################################################
import os
import re
import shutil
import tempfile
# Match a word like "W12.5" or "W-3" or "w0". Also matches inside the same
# line as XYZ words. We pull W out specifically.
_W_TOKEN_RE = re.compile(r'(?<![A-Za-z_0-9])[Ww]\s*([-+]?\d*\.?\d+)')
# Detect any axis-bearing word (so we can tell mixed-axis lines apart).
_AXIS_WORD_RE = re.compile(r'(?<![A-Za-z_0-9])[XYZABCxyzabc]\s*[-+]?\d*\.?\d+')
# Strip line comments so we don't get fooled by "(W axis)".
_PAREN_COMMENT_RE = re.compile(r'\([^)]*\)')
# Modal G-code groups we care about.
_MODAL_RE = re.compile(r'(?<![A-Za-z_0-9])[Gg]\s*0*(\d+(?:\.\d+)?)')
class AuxPreprocessorError(Exception):
pass
class AuxPreprocessor(object):
def __init__(self, log=None, w_first=True):
self.log = log
# If True, on a mixed-axis line (e.g. G1 X10 W5), emit the W move
# first, then the XYZ move. Set False to invert.
self.w_first = w_first
def _info(self, msg):
if self.log:
self.log.info(msg)
def _warn(self, msg):
if self.log:
self.log.warning(msg)
# ------------------------------------------------------------------ scan
@staticmethod
def file_uses_w(path):
"""Quick check: does this file contain any W-axis word? Used to skip
preprocessing entirely for files that don't care about W."""
try:
with open(path, 'r', encoding='utf-8', errors='replace') as f:
for line in f:
code = _PAREN_COMMENT_RE.sub('', line)
code = code.split(';', 1)[0]
if _W_TOKEN_RE.search(code):
return True
except Exception:
pass
return False
# ------------------------------------------------------------------ core
def _strip_w(self, line):
"""Return (line_without_w, w_value_str_or_None). Only first W kept."""
m = _W_TOKEN_RE.search(line)
if m is None:
return line, None
# Remove just the matched W<num> token, preserving surrounding spaces.
rewritten = line[:m.start()] + line[m.end():]
return rewritten, m.group(1)
def _has_other_axis(self, code_no_w):
return _AXIS_WORD_RE.search(code_no_w) is not None
def _detect_modals(self, code, modal):
"""Update modal dict in-place from G-codes on this line."""
for mm in _MODAL_RE.finditer(code):
try:
g = float(mm.group(1))
except ValueError:
continue
if g == 90: modal['abs'] = True
elif g == 91: modal['abs'] = False
elif g == 20: modal['inch'] = True
elif g == 21: modal['inch'] = False
# G28 / G28.2 / G92 are detected case-by-case below.
@staticmethod
def _is_g28_like(code):
# Match G28 or G28.2 (homing).
return bool(re.search(r'(?<![A-Za-z_0-9])[Gg]\s*0*28(?:\.2)?(?![\w.])', code))
@staticmethod
def _is_g92(code):
return bool(re.search(r'(?<![A-Za-z_0-9])[Gg]\s*0*92(?![\w.])', code))
# ------------------------------------------------------------------ run
def process(self, src_path, dst_path):
"""Read src_path, write rewritten G-code to dst_path. Returns True
if any rewrite happened."""
modal = {'abs': True, 'inch': False} # G90 G21 are common defaults
rewrote_any = False
with open(src_path, 'r', encoding='utf-8', errors='replace') as fin, \
open(dst_path, 'w', encoding='utf-8') as fout:
for raw in fin:
line = raw.rstrip('\n')
# Comment-only or blank lines pass through verbatim.
code = _PAREN_COMMENT_RE.sub('', line)
code = code.split(';', 1)[0]
if not code.strip():
fout.write(raw)
continue
# Update modal from G-codes on this line first (so absolute
# vs incremental matches what the planner sees for XYZ).
self._detect_modals(code, modal)
if not _W_TOKEN_RE.search(code):
fout.write(raw)
continue
rewrote_any = True
# G28[.2] W... -> aux_home (W value is ignored except as
# a flag that W is being homed).
if self._is_g28_like(code):
code_no_w, _ = self._strip_w(line)
fout.write('(MSG,HOOK:aux_home:)\n')
# Only keep the residual line if other axes were also
# present (e.g. G28.2 X0 Y0 W0 still homes X+Y). A bare
# "G28" without axis args means "home all" in gcode
# which we explicitly DON'T want to trigger from a
# W-only home command.
rest_code = _PAREN_COMMENT_RE.sub('', code_no_w)
rest_code = rest_code.split(';', 1)[0]
if self._has_other_axis(rest_code):
fout.write(code_no_w + '\n')
continue
# G92 W... -> set W zero (or other value) without motion.
if self._is_g92(code):
line_no_w, w_val = self._strip_w(line)
target_mm = self._w_to_mm(w_val, modal, set_pos=True)
fout.write('(MSG,HOOK:aux_setzero:%g)\n' % target_mm)
rest_code = _PAREN_COMMENT_RE.sub('', line_no_w)
rest_code = rest_code.split(';', 1)[0]
if self._has_other_axis(rest_code):
fout.write(line_no_w + '\n')
continue
# Plain motion: G0/G1 etc with W word.
line_no_w, w_val = self._strip_w(line)
target_mm = self._w_to_mm(w_val, modal, set_pos=False)
# Distinguish absolute vs relative: encode both, the hook
# handler will pick the right operation.
if modal['abs']:
hook_line = '(MSG,HOOK:aux:%g)' % target_mm
else:
hook_line = '(MSG,HOOK:aux_rel:%g)' % target_mm
rest_code = _PAREN_COMMENT_RE.sub('', line_no_w)
rest_code = rest_code.split(';', 1)[0]
has_xyz = self._has_other_axis(rest_code)
if not has_xyz:
# Pure W move; drop the (now-empty) original line.
fout.write(hook_line + '\n')
continue
# Mixed-axis: split. Default order is W first.
if self.w_first:
fout.write(hook_line + '\n')
fout.write(line_no_w + '\n')
else:
fout.write(line_no_w + '\n')
fout.write(hook_line + '\n')
return rewrote_any
# ------------------------------------------------------------ unit conv
def _w_to_mm(self, w_str, modal, set_pos):
try:
v = float(w_str)
except (TypeError, ValueError):
raise AuxPreprocessorError('Invalid W value: %r' % w_str)
if modal['inch']:
v *= 25.4
return v
def preprocess_file(src_path, log=None, w_first=True):
"""Convenience: rewrite src_path in place if it uses W.
Returns True if the file was rewritten."""
if not AuxPreprocessor.file_uses_w(src_path):
return False
pre = AuxPreprocessor(log=log, w_first=w_first)
fd, tmp = tempfile.mkstemp(prefix='auxpre_', suffix='.nc',
dir=os.path.dirname(src_path) or None)
os.close(fd)
try:
rewrote = pre.process(src_path, tmp)
if rewrote:
shutil.move(tmp, src_path)
return True
os.unlink(tmp)
return False
except Exception:
try:
os.unlink(tmp)
except OSError:
pass
raise