Files
onefinity-firmware/src/py/bbctrl/Planner.py
Henrik Muehe 8e6e72a8b9 Z-A coupling: auto-coordinate A on jogs and MDI
Match the file-preprocessor behaviour for live operator input. When a
Z-down jog or MDI line would push (A-Z) above the safe band, append
the matching A delta to the same line so the planner runs Z and A
together. Same direction-aware refusal: only error when the operator
explicitly asks A to move *up* (delta > 0) past the bound, or when
the required A would violate A's soft minimum.

Implementation:
  * ExternalAxis.coordinate_mdi rewrites a multi-line MDI burst,
    tracking G90/G91 modal across lines (jogs always emit
    M70/G91/G0/M72; standard MDI defaults to G90). Z and A targets
    are computed in machine coords using offset_z and offset_a so
    the work-coord A token we emit is consistent with the operator's
    frame.
  * The 'A0' the jog UI emits for axes that aren't moving is treated
    as 'no A intent' (G91 delta of zero) and freely overridden.
  * Hooked into Mach.mdi after the existing ATC rewrite. On
    ExternalAxisError the burst is dropped with a user message; the
    planner check downstream still fires as defense in depth.
  * Planner.__encode also catches ExternalAxisError now (vs
    bricking on uncaught) - logs to the operator messages list and
    halts the cycle cleanly so subsequent jogs work.
  * check_coupling itself is now improvement-aware: only refuses
    moves that worsen an existing violation. Pure XY jogs and
    Z-up/A-down recovery moves pass even when (A-Z) is currently
    above the bound.

Tested locally with synthetic MDI: small Z jog within band, Z jog
across the boundary (auto-injects A delta), G90 MDI G0 Z-50
(appends A106), explicit A-lift while Z deep (refuses), pure XY
jog (unchanged), G91 A-down (unchanged), G90 G0 A0 with
offset_a=134 (refuses as lift to home).
2026-05-03 14:47:44 +02:00

575 lines
22 KiB
Python

################################################################################
# #
# This file is part of the Buildbotics firmware. #
# #
# Copyright (c) 2015 - 2018, Buildbotics LLC #
# All rights reserved. #
# #
# This file ("the software") is free software: you can redistribute it #
# and/or modify it under the terms of the GNU General Public License, #
# version 2 as published by the Free Software Foundation. You should #
# have received a copy of the GNU General Public License, version 2 #
# along with the software. If not, see <http://www.gnu.org/licenses/>. #
# #
# The software is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
# Lesser General Public License for more details. #
# #
# You should have received a copy of the GNU Lesser General Public #
# License along with the software. If not, see #
# <http://www.gnu.org/licenses/>. #
# #
# For information regarding this software email: #
# "Joseph Coffland" <joseph@buildbotics.com> #
# #
################################################################################
import json
import math
import re
import time
from collections import deque
# camotics.gplan is heavy (loads a C++ extension that pulls in libstdc++,
# boost::python, etc.). Defer it: bbctrl can listen on HTTP and serve
# the UI without ever touching the planner. Lazy-load the first time
# Planner.init() runs, which is when the user actually queues motion.
gplan = None
def _load_gplan():
global gplan
if gplan is None:
try:
import bbctrl.Trace as _T
with _T.span('imports.camotics_gplan'):
import camotics.gplan as _gplan # pylint: disable=no-name-in-module,import-error
except Exception:
import camotics.gplan as _gplan # pylint: disable=no-name-in-module,import-error
gplan = _gplan
return gplan
import bbctrl.Cmd as Cmd
from bbctrl.CommandQueue import CommandQueue
reLogLine = re.compile(
r'^(?P<level>[A-Z])[0-9 ]:'
r'((?P<file>[^:]+):)?'
r'((?P<line>\d+):)?'
r'((?P<column>\d+):)?'
r'(?P<msg>.*)$')
def log_floats(o):
if isinstance(o, float): return round(o, 2)
if isinstance(o, dict): return {k: log_floats(v) for k, v in o.items()}
if isinstance(o, (list, tuple)): return [log_floats(x) for x in o]
return o
def log_json(o): return json.dumps(log_floats(o))
class Planner():
def __init__(self, ctrl):
self.ctrl = ctrl
self.log = ctrl.log.get('Planner')
self.cmdq = CommandQueue(ctrl)
self.planner = None
self._position_dirty = False
self.where = ''
ctrl.state.add_listener(self._update)
self.reset(stop = False)
self._report_time()
def is_busy(self): return self.is_running() or self.cmdq.is_active()
def is_running(self): return self.planner.is_running()
def position_change(self): self._position_dirty = True
def _sync_position(self, force = False):
if not force and not self._position_dirty: return
self._position_dirty = False
self.planner.set_position(self.ctrl.state.get_position())
def get_config(self, mdi, with_limits):
state = self.ctrl.state
config = self.ctrl.config
is_pwm = config.get('tool-type') == 'PWM Spindle'
deviation = config.get('max-deviation')
cfg = {
# NOTE Must get current units not configured default units
'default-units': 'METRIC' if state.get('metric') else 'IMPERIAL',
'max-vel': state.get_axis_vector('vm', 1000),
'max-accel': state.get_axis_vector('am', 1000000),
'max-jerk': state.get_axis_vector('jm', 1000000),
'rapid-auto-off': config.get('rapid-auto-off') and is_pwm,
'max-blend-error': deviation,
'max-merge-error': deviation,
'max-arc-error': deviation / 10,
'junction-accel': config.get('junction-accel'),
}
# We place an upper limit of 1000 km/min^3 on jerk for MDI movements
if mdi:
for axis in 'xyzabc':
if axis in cfg['max-jerk']:
cfg['max-jerk'][axis] = min(1000 * 1000000, cfg['max-jerk'][axis])
if with_limits:
minLimit = state.get_soft_limit_vector('tn', -math.inf)
maxLimit = state.get_soft_limit_vector('tm', math.inf)
# If max <= min then no limit
for axis in 'xyzabc':
if maxLimit[axis] <= minLimit[axis]:
minLimit[axis], maxLimit[axis] = -math.inf, math.inf
cfg['min-soft-limit'] = minLimit
cfg['max-soft-limit'] = maxLimit
if not mdi:
program_start = config.get('program-start')
if program_start: cfg['program-start'] = program_start
overrides = {}
tool_change = config.get('tool-change')
if tool_change: overrides['M6'] = tool_change
program_end = config.get('program-end')
if program_end: overrides['M2'] = program_end
if overrides: cfg['overrides'] = overrides
self.log.info('Config:' + log_json(cfg))
return cfg
def _update(self, update):
if 'id' in update:
id = update['id']
self.planner.set_active(id) # Release planner commands
self.cmdq.release(id) # Synchronize planner variables
def _get_var_cb(self, name, units):
value = 0
if len(name) and name[0] == '_':
value = self.ctrl.state.get(name[1:], 0)
try:
float(value)
if units == 'IMPERIAL': value /= 25.4 # Assume metric
except ValueError: value = 0
self.log.info('Get: %s=%s (units=%s)' % (name, value, units))
return value
def _log_cb(self, line):
line = line.strip()
m = reLogLine.match(line)
if not m: return
level = m.group('level')
msg = m.group('msg')
filename = m.group('file')
line = m.group('line')
column = m.group('column')
where = ':'.join(filter(None.__ne__, [filename, line, column]))
if line is not None: line = int(line)
if column is not None: column = int(column)
if level == 'I': self.log.info (msg, where = where)
elif level == 'D': self.log.debug (msg, where = where)
elif level == 'W': self.log.warning (msg, where = where)
elif level == 'E': self.log.error (msg, where = where)
else: self.log.error('Could not parse planner log line: ' + line)
def _add_message(self, text):
line = self.ctrl.state.get('line', 0)
if 0 <= line: where = '%s:%d' % (self.where, line)
else: where = self.where
# HOOK:<event>:<data> messages are an internal IPC channel
# between the gcode preprocessor and Hooks; bypass the user
# message list so they don't surface as popups, and dispatch
# the hook directly. Routing through state.messages would
# only deliver it after the 0.25s state-change debounce, by
# which point we'd have to keep it visible to ensure Hooks
# could see it.
hooks = getattr(self.ctrl, 'hooks', None)
if hooks is not None and hooks.dispatch_hook_message(text):
self.log.info('HOOK msg: %s' % text, where = where)
return
self.ctrl.state.add_message(text)
self.log.message(text, where = where)
def _enqueue_set_cmd(self, id, name, value):
self.log.info('set(#%d, %s, %s)', id, name, value)
self.cmdq.enqueue(id, self.ctrl.state.set, name, value)
def _report_time(self):
state = self.ctrl.state.get('xx', '')
if state in ('STOPPING', 'RUNNING') and self.move_start:
delta = time.time() - self.move_start
if self.move_time < delta: delta = self.move_time
plan_time = self.current_plan_time + delta
self.ctrl.state.set('plan_time', round(plan_time))
elif state != 'HOLDING': self.ctrl.state.set('plan_time', 0)
self.ctrl.ioloop.call_later(1, self._report_time)
def _plan_time_restart(self):
self.plan_time = self.ctrl.state.get('plan_time', 0)
def _update_time(self, plan_time, move_time):
self.current_plan_time = plan_time
self.move_time = move_time
self.move_start = time.time()
def _enqueue_line_time(self, block):
if block.get('first', False) or block.get('seeking', False): return
# Sum move times
move_time = sum(block['times']) / 1000 # To seconds
self.cmdq.enqueue(block['id'], self._update_time, self.plan_time,
move_time)
self.plan_time += move_time
def _enqueue_dwell_time(self, block):
self.cmdq.enqueue(block['id'], self._update_time, self.plan_time,
block['seconds'])
self.plan_time += block['seconds']
def __encode(self, block):
type, id = block['type'], block['id']
if type != 'set': self.log.info('Cmd:' + log_json(block))
if type == 'line':
# Z-A coupling check: every line block that touches Z (or
# A) is validated against the projected (A,Z) machine
# pair. The ExternalAxis check is improvement-aware: it
# only refuses moves that worsen an existing violation
# or push a healthy state into one. So pure-XY jogs and
# recovery moves (Z up, A down) are not rejected even
# when (A-Z) is currently above the bound.
ext_check = getattr(self.ctrl, 'ext_axis', None)
if ext_check is not None:
from bbctrl.ExternalAxis import ExternalAxisError
target = block.get('target') or {}
z_target = target.get('z')
if z_target is None: z_target = target.get('Z')
a_letter = ext_check.axis_letter
a_target = target.get(a_letter)
if a_target is None:
a_target = target.get(a_letter.upper())
if z_target is not None or a_target is not None:
try:
ext_check.check_coupling(
target_a_machine=a_target,
target_z_machine=z_target)
except ExternalAxisError as e:
# Convert the raw error into a clean abort:
# surface the message to the operator, stop
# the cycle, and skip this block. Returning
# None drops the block from the AVR queue;
# mach.stop() halts further planner output
# so the rest of an offending program can't
# leak through. The planner stays usable
# for new MDI / jog commands.
self.log.warning('Z-A coupling refused: %s' % e)
try:
self.ctrl.state.add_message(
'Z-A coupling refused move: ' + str(e))
except Exception: pass
try:
self.ctrl.mach.stop()
except Exception: pass
return None
ext = self._external_axis_for_line(block)
if ext is not None:
# Side effect: enqueue the ESP move on the external-
# axis worker. The AVR still receives the full target
# (including A) so ex.position[A] tracks gplan; no
# motor steps for A because no motor maps to it.
self._dispatch_external_line(block, ext)
self._enqueue_line_time(block)
return Cmd.line(block['target'], block['exit-vel'],
block['max-accel'], block['max-jerk'],
block['times'], block.get('speeds', []))
if type == 'set':
name, value = block['name'], block['value']
if name == 'message':
self.cmdq.enqueue(id, self._add_message, value)
if name in ['line', 'tool']: self._enqueue_set_cmd(id, name, value)
if name == 'speed':
self._enqueue_set_cmd(id, name, value)
return Cmd.speed(value)
if len(name) and name[0] == '_':
# Don't queue axis positions, can be triggered by new position
if len(name) != 2 or name[1] not in 'xyzabc':
self._enqueue_set_cmd(id, name[1:], value)
if name == '_feed': # Must come after _enqueue_set_cmd() above
return Cmd.set_sync('if', 1 / value if value else 0)
if name[0:1] == '_' and name[1:2] in 'xyzabc':
if name[2:] == '_home': return Cmd.set_axis(name[1], value)
if name[2:] == '_homed':
motor = self.ctrl.state.find_motor(name[1])
# Synthetic external motor (index 4) doesn't exist
# on the AVR; mirror the homed flag in State only.
from bbctrl.ExternalAxis import EXTERNAL_MOTOR_INDEX
if motor is not None and motor < EXTERNAL_MOTOR_INDEX:
return Cmd.set_sync('%dh' % motor, value)
if motor == EXTERNAL_MOTOR_INDEX:
# Update synthetic motor flag and the<axis>_homed
# projection consumed by the DRO.
self.cmdq.enqueue(
id, self.ctrl.state.set,
'%dh' % EXTERNAL_MOTOR_INDEX, value)
return
if type == 'input':
# TODO handle timeout
self.planner.synchronize(0) # TODO Fix this
return Cmd.input(block['port'], block['mode'], block['timeout'])
if type == 'output':
return Cmd.output(block['port'], int(float(block['value'])))
if type == 'dwell':
self._enqueue_dwell_time(block)
return Cmd.dwell(block['seconds'])
if type == 'pause': return Cmd.pause(block['pause-type'])
if type == 'seek':
sw = self.ctrl.state.get_switch_id(block['switch'])
return Cmd.seek(sw, block['active'], block['error'])
if type == 'end': return '' # Sends id
raise Exception('Unknown planner command "%s"' % type)
def _encode(self, block):
cmd = self.__encode(block)
if cmd is not None:
self.cmdq.enqueue(block['id'], None)
return Cmd.set_sync('id', block['id']) + '\n' + cmd
def reset_times(self):
self.move_start = 0
self.move_time = 0
self.plan_time = 0
self.current_plan_time = 0
def close(self):
# Release planner callbacks
if self.planner is not None:
self.planner.set_resolver(None)
self.planner.set_logger(None)
# ----------------------------------------------- external-axis routing
#
# When an axis is exposed to gplan via a synthetic motor (no AVR
# channel), we need to fork its motion off to the ESP at line
# encode time and let the rest of the line proceed to the AVR.
# The split is done here rather than in gplan because gplan
# treats all six axes uniformly and just emits target dicts; we
# don't want to teach it about the ESP.
def _external_axis_for_line(self, block):
"""Return the ExternalAxis instance for whichever axis in
block['target'] is external, or None."""
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is None or not ext.enabled:
return None
target = block.get('target') or {}
if ext.axis_letter in target or ext.axis_letter.upper() in target:
return ext
return None
def _dispatch_external_line(self, block, ext):
"""Side-effect: enqueue the ESP move on the external-axis
worker thread (non-blocking). Returns the block (possibly
unchanged) for the AVR.
We do NOT strip the external axis target from the AVR line.
The AVR's exec_move_to_target updates ex.position[axis] for
every axis in the target dict regardless of motor mapping,
and reports it back via the `p` indexed var. Leaving A in
the target keeps state.ap in sync with gplan's idea of A
(otherwise the AVR's stale ex.position[A] would clobber
ExternalAxis's state.ap=N update on the next status report).
The AVR doesn't step any motor for the external axis (no
motor maps to it) - so leaving A in the target is
physically a no-op for the steppers, while keeping the
host-side state coherent.
We pass the full S-curve parameters to the ESP so its move
duration matches the AVR's exactly. The ESP runs the same
7-segment jerk-limited trajectory the AVR would have run
if A had been a real motor."""
target = block.get('target') or {}
# Read the external target (case-insensitive) without modifying
# the dict so the AVR still sees A.
ext_mm = target.get(ext.axis_letter)
if ext_mm is None:
ext_mm = target.get(ext.axis_letter.upper())
try:
ext.enqueue_line(
ext_mm,
block.get('max-accel', 0.0),
block.get('max-jerk', 0.0),
block.get('entry-vel', 0.0),
block.get('exit-vel', 0.0),
block.get('times', [0]*7),
)
except Exception as e:
self.log.error('External axis enqueue failed: %s' % e)
raise
return block
def reset(self, *args, **kwargs):
stop = kwargs.get('stop', True)
if stop:
self.ctrl.mach.stop()
self.planner = _load_gplan().Planner()
self.planner.set_resolver(self._get_var_cb)
# TODO logger is global and will not work correctly in demo mode
self.planner.set_logger(self._log_cb, 1, 'LinePlanner:3')
self._position_dirty = True
self.cmdq.clear()
self.reset_times()
# Drain the external-axis worker queue and force the next
# move to re-sync position from the ESP (since State.reset
# below will zero <axis>p which makes ext._pos_mm stale).
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is not None:
try: ext.abort()
except Exception: pass
try: ext._pos_mm = None
except Exception: pass
resetState = kwargs.get('resetState', True)
if resetState:
self.ctrl.state.reset()
def mdi(self, cmd, with_limits = True):
self.where = '<mdi>'
self.log.info('MDI:' + cmd)
self._sync_position()
self.planner.load_string(cmd, self.get_config(True, with_limits))
self.reset_times()
def load(self, path):
self.where = path
path = self.ctrl.get_path('upload', path)
self.log.info('GCode:' + path)
# Rewrite ATC M-codes (M100..M103) before gplan sees them.
# preprocess_file is a no-op when no rewriting is needed and
# idempotent when run twice on the same file, so this is
# safe on every load. W tokens are no longer rewritten - the
# auxcnc stepper is now exposed as a virtual A axis and gcode
# should use A directly.
try:
from bbctrl.AuxPreprocessor import preprocess_file
ext = getattr(self.ctrl, 'ext_axis', None)
coupling = (ext.coupling_for_preprocessor()
if ext is not None else None)
if preprocess_file(path, log=self.log, coupling=coupling):
self.log.info('Rewrote (ATC / Z-A coupling) in %s' % path)
except Exception:
self.log.exception('Aux preprocess at load failed; '
'attempting to load file unchanged')
self._sync_position()
self.planner.load(path, self.get_config(False, True))
self.reset_times()
def stop(self):
try:
self.planner.stop()
self.cmdq.clear()
except:
self.log.exception('Internal error: Planner stop')
self.reset()
def restart(self):
try:
id = self.ctrl.state.get('id')
position = self.ctrl.state.get_position()
self.log.info('Planner restart: %d %s' % (id, log_json(position)))
self.cmdq.clear()
self.cmdq.release(id)
self._plan_time_restart()
self.planner.restart(id, position)
except:
self.log.exception('Internal error: Planner restart')
self.stop()
def next(self):
try:
while self.planner.has_more():
cmd = self.planner.next()
cmd = self._encode(cmd)
if cmd is not None: return cmd
except RuntimeError as e:
# Pass on the planner message
self.log.error(str(e))
self.stop()
except:
self.log.exception('Internal error: Planner next')
self.stop()