################################################################################ # # ExternalAxis - bridges a logical motorless axis to step generation on # the auxcnc ESP, so the Buildbotics planner can drive a stepper that # isn't on the AVR. # # Architecture # ------------ # The bbctrl planner (camotics gplan) handles parsing, units, modal # state, soft limits, accel ramping and S-curve timing for axes # X, Y, Z, A, B, C. The AVR has 4 motor channels (0-3) and only # generates step pulses for axes that have a motor mapped to them. # An axis with no mapped motor is fully accepted by the AVR - it # updates its internal `ex.position[axis]` and reports `p` to # the host, but no stepper turns. # # We exploit that: the W stepper is exposed to gplan as A, but no # AVR motor maps to A. The planner does all the gcode-level work # correctly (G90/G91, soft limits, accel, units, modal feed rate); # we intercept the resulting `Cmd.line` blocks in `Planner.__encode`, # strip A out, and forward the A delta to the auxcnc ESP as STEPS. # # To make gplan and State *believe* A is enabled we register a # synthetic motor (index 4) into State.vars, populated from # aux.json, with `4an=3` (axis A), `4me=1` (enabled), and the # usual velocity/accel/jerk/soft-limit vars. State.find_motor and # the snapshot projection are extended to walk index 4. Motor-4 # vars never leave the host (they're not in the AVR's schema) so # the AVR is undisturbed. # # v1 coupling: serialize. If a line has any A delta we wait for # the ESP to finish before letting subsequent commands flow. This # matches the behaviour of the previous hook-based approach (no # XYZ+A blending) but with all the planner's correctness guarantees. # # v2 could match ESP move duration to the gplan trapezoid time and # allow concurrent motion; out of scope for v1. # ################################################################################ import threading try: from queue import Queue except ImportError: from Queue import Queue # py2 just in case # Synthetic motor index used to expose the external axis to State. # The AVR has motors 0..3; we use 4 as a host-only sentinel. EXTERNAL_MOTOR_INDEX = 4 # Axis letters in their canonical order; 'a' is index 3. _AXIS_LETTERS = 'xyzabc' class ExternalAxisError(Exception): pass class ExternalAxis(object): """Bridge between Planner line blocks and AuxAxis serial RPCs. Owns no thread; runs RPC calls inline on whatever thread invokes execute_to_mm / home / abort. The Planner runs `__encode` on its own thread which is allowed to block on planner I/O, so blocking inside the interceptor is fine. Position tracking: gplan emits absolute targets in mm; the ESP counts steps relative to home_zero. We mirror the last commanded mm position so subsequent line blocks compute the correct delta. `_pos_mm` is also published as `p` so DRO updates.""" def __init__(self, ctrl, aux, axis_letter='a'): self.ctrl = ctrl self.aux = aux self.log = ctrl.log.get('ExternalAxis') self.axis_letter = (axis_letter or 'a').lower()[:1] if self.axis_letter not in _AXIS_LETTERS: raise ExternalAxisError( 'Invalid external axis letter: %r' % axis_letter) # Index in 'xyzabc' (0..5) self.axis_index = _AXIS_LETTERS.index(self.axis_letter) self._busy = threading.Event() # Last absolute mm we committed; None until first move / # homing event syncs us up. self._pos_mm = None # Single-slot worker queue: __encode posts (target_mm,) tuples # here; the worker thread runs the ESP RPC. Capacity is # intentionally bounded - if it fills it means motion is # outpacing the ESP and we should backpressure the planner. self._work_q = Queue(maxsize=64) self._stop = threading.Event() self._worker = threading.Thread( target=self._worker_loop, name='ExternalAxis-worker', daemon=True) self._worker.start() # Push synthetic motor vars into State so the planner sees # this axis as enabled with proper limits/velocity/accel. self._publish_synthetic_motor() # Also seed p so the DRO has something to render. self.ctrl.state.set(self.axis_letter + 'p', 0.0) # -------------------------------------------------------------- enabled @property def enabled(self): try: return bool(self.aux is not None and self.aux.enabled and self.aux.present) except Exception: return False # -------------------------------------------------------- configuration @property def steps_per_mm(self): try: return float(self.aux._cfg.get('steps_per_mm', 25.0)) except Exception: return 25.0 @property def dir_sign(self): try: v = int(self.aux._cfg.get('dir_sign', 1)) return -1 if v < 0 else 1 except Exception: return 1 @property def home_position_mm(self): try: return float(self.aux._cfg.get('home_position_mm', 0.0)) except Exception: return 0.0 # ------------------------------------------------------- soft limits def _soft_limits(self): """Return (min_mm, max_mm) in machine coords, or (None, None) if soft limits are disabled (max <= min).""" try: lo = float(self.aux._cfg.get('min_w', 0.0)) hi = float(self.aux._cfg.get('max_w', 0.0)) except Exception: return (None, None) if hi <= lo: return (None, None) return (lo, hi) def _check_soft_limit(self, target_abs_mm): """Raise ExternalAxisError if target_abs_mm is outside the configured soft limits. Skips the check when the axis isn't homed (matching the standard bbctrl convention that soft limits are gated by homing state) - that lets the user jog away from a stuck position before homing without false rejections. Called by both planner-driven motion (enqueue_target_mm) and UI motion (execute_to_mm), so this is the single source of truth regardless of which path triggered the move.""" # Honour the homing gate. try: homed = bool(self.aux._homed) except Exception: homed = False if not homed: return lo, hi = self._soft_limits() if lo is None: return # Use a tiny epsilon so floating-point round-trip targets # right at the boundary aren't rejected. eps = 1e-4 target = float(target_abs_mm) if target < lo - eps or target > hi + eps: raise ExternalAxisError( '%s axis target %.4f mm is outside soft limits ' '[%.3f, %.3f] mm' % ( self.axis_letter.upper(), target, lo, hi)) # ----------------------------------------------------------- conversion def mm_to_steps_delta(self, delta_mm): return int(round(float(delta_mm) * self.steps_per_mm * self.dir_sign)) def steps_to_mm(self, steps): return (float(steps) / self.steps_per_mm) * self.dir_sign # ---------------------------------------------------- synthetic motor def _publish_synthetic_motor(self): """Write motor-4 vars into State so find_motor('a') and get_axis_vector('vm') see A as a real axis. The AVR never sees these (motor index 4 is not in its var schema).""" cfg = self.aux._cfg if self.aux is not None else {} st = self.ctrl.state i = str(EXTERNAL_MOTOR_INDEX) # Axis assignment: 'an' is the 0-based axis index in xyzabc. st.set(i + 'an', self.axis_index) # Motor enabled. st.set(i + 'me', 1 if (self.aux and self.aux.enabled) else 0) # Homed flag - cleared until aux reports homed. try: homed = bool(self.aux._homed) except Exception: homed = False st.set(i + 'h', 1 if homed else 0) # Velocity / accel / jerk: the planner reads these via # state.get_axis_vector('', SCALE) which multiplies the # stored raw value by SCALE. The bbctrl convention (matching # what motors 0-3 store) is: # vm: stored in m/min, planner expects mm/min (scale 1000) # am: stored in km/min^2, planner expects mm/min^2 (scale 1e6) # jm: stored in km/min^3, planner expects mm/min^3 (scale 1e6) # Onefinity defaults for XY are vm=10, am=750, jm=1000. We # follow the same convention; aux.json exposes the values in # those user-facing units so they're directly comparable. st.set(i + 'vm', float(cfg.get('max_velocity_m_per_min', 6.0))) st.set(i + 'am', float(cfg.get('max_accel_km_per_min2', 100.0))) st.set(i + 'jm', float(cfg.get('max_jerk_km_per_min3', 500.0))) # Soft limits in machine units (mm). State.get_soft_limit_vector # returns these directly, no scaling. st.set(i + 'tn', float(cfg.get('min_w', 0.0))) st.set(i + 'tm', float(cfg.get('max_w', 0.0))) # home_position / home_travel are exposed as callbacks for # motors 0..3 (see State.__init__). Register the same lazy # callbacks for motor 4 so gplan's resolver lookup # (__home_position / __home_travel) returns the # right values for the external axis. st.set_callback( i + 'home_position', lambda name: self.home_position_mm) st.set_callback( i + 'home_travel', lambda name: float(self.aux._cfg.get('max_w', 0.0)) - self.home_position_mm) # Misc fields that other code paths might query. Defaults # mirror what the AVR pushes for motors 0-3. st.set(i + 'sa', 1.8) st.set(i + 'mi', 16) st.set(i + 'tr', 4.0) st.set(i + 'sp', 200) st.set(i + 'ic', 0.0) st.set(i + 'dc', 0.0) st.set(i + 'rv', False) st.set(i + 'tc', 1) st.set(i + 'lb', 5) st.set(i + 'ho', 0) st.set(i + 'os', 0) st.set(i + 'oa', False) st.set(i + 'lm', 8) st.set(i + 'lv', 0.1) st.set(i + 'sv', 1.688) st.set(i + 'tv', 1.997) st.set(i + 'lw', 2) # min-switch st.set(i + 'xw', 2) # max-switch st.set(i + 'ls', 0) st.set(i + 'xs', 0) st.set(i + 'df', 0) def refresh_homed(self): """Called when AuxAxis updates its homed flag. Mirrors into State so is_axis_homed('a') returns the right answer. Updates several places at once because different layers read the homed state via different keys: - synthetic motor flag: 4h (used by snapshot -> a_h) - axis-level flag: a_homed (used by State.is_axis_homed and gplan _a_homed resolver)""" try: homed = bool(self.aux._homed) except Exception: homed = False st = self.ctrl.state st.set(str(EXTERNAL_MOTOR_INDEX) + 'h', 1 if homed else 0) st.set(self.axis_letter + '_homed', bool(homed)) # ----------------------------------------------------------- line split def split_target(self, target): """Pop the external axis out of a target dict and return (target_without_ext, ext_mm_or_None). Both case variants accepted defensively.""" if not target: return target, None ax = self.axis_letter new_target = dict(target) ext_mm = new_target.pop(ax, None) if ext_mm is None: ext_mm = new_target.pop(ax.upper(), None) return new_target, ext_mm # -------------------------------------------------------- execution API def is_busy(self): return self._busy.is_set() def execute_to_mm(self, ext_mm): """Synchronously run an external move. Blocks until the ESP reports done. Used by the legacy /api/aux/move and /api/aux/jog endpoints which may want to wait. Most planner-driven motion goes through enqueue_target_mm instead, which is non-blocking. Soft limits are enforced here (not just in gplan) because the UI jog/move endpoints don't go through the planner. Updates state.p immediately on completion. For the planner-driven path that goes through enqueue_target_mm, the AVR's own ap reports drive state.p instead.""" if not self.enabled: raise ExternalAxisError( 'External axis %r not available (aux disabled or ' 'not connected)' % self.axis_letter) self._check_soft_limit(ext_mm) steps, abs_mm = self._compute_move(ext_mm) if steps == 0: self._pos_mm = abs_mm self.ctrl.state.set(self.axis_letter + 'p', self._pos_mm) return self._busy.set() try: self.aux._do_steps(steps, ignore_limits=True) self._pos_mm = abs_mm self.ctrl.state.set(self.axis_letter + 'p', self._pos_mm) finally: self._busy.clear() def enqueue_target_mm(self, ext_mm): """Legacy non-blocking variant: post a fixed-rate STEPS move to the worker queue. No longer used by Planner.__encode (which uses enqueue_line for full S-curve mirroring), but kept for UI jog endpoints that don't have planner timing data. Soft limits are enforced here (defense in depth on top of gplan).""" if not self.enabled: raise ExternalAxisError( 'External axis %r not available' % self.axis_letter) self._check_soft_limit(ext_mm) steps, abs_mm = self._compute_move(ext_mm) # Internal mirror only - drives subsequent delta computation. # state.p is left to the AVR's status reports. self._pos_mm = abs_mm if steps == 0: return self._work_q.put(('move', steps)) def enqueue_line(self, ext_mm, max_accel_mm_min2, max_jerk_mm_min3, entry_vel_mm_min, exit_vel_mm_min, times_ms): """Post a full S-curve LINE block to the ESP worker. Mirrors gplan's planned trajectory exactly (same 7-segment math, same unit system) so the ESP's move duration matches what the AVR would have produced for an A motor. Called by Planner.__encode for every line block that touches the external axis. Parameters: ext_mm: absolute target in mm (gplan target['a']) max_accel_mm_min2:from block['max-accel'] max_jerk_mm_min3: from block['max-jerk'] entry_vel_mm_min: from block['entry-vel'] (typically 0 for the first block, exit_vel of the prior block otherwise) exit_vel_mm_min: from block['exit-vel'] times_ms: 7-tuple of section durations in ms (block['times'] - the same units gplan uses) """ if not self.enabled: raise ExternalAxisError( 'External axis %r not available' % self.axis_letter) self._check_soft_limit(ext_mm) steps, abs_mm = self._compute_move(ext_mm) delta_mm = abs(abs_mm - (self._pos_mm if self._pos_mm is not None else 0.0)) # Update internal mirror; AVR drives state.p. self._pos_mm = abs_mm if steps == 0 or delta_mm <= 0: return # ms -> minutes (the unit gplan/AVR/ESP use internally for # SCurve math). times_min = tuple((t / 60000.0) if t else 0.0 for t in times_ms) self._work_q.put(('line', steps, delta_mm, float(max_accel_mm_min2), float(max_jerk_mm_min3), float(entry_vel_mm_min), float(exit_vel_mm_min), times_min)) def _compute_move(self, ext_mm): """Return (signed_steps, absolute_mm) for a target in mm. Caches first-time position from the ESP.""" if self._pos_mm is None: self._pos_mm = self._read_esp_position_mm() delta_mm = float(ext_mm) - self._pos_mm return self.mm_to_steps_delta(delta_mm), float(ext_mm) def _worker_loop(self): """Background thread that drains the work queue. RPCs to the ESP are slow (multi-second moves) and must not run on the ioloop thread. We serialize ESP commands here so multiple line-block enqueues for the external axis are processed in the order the planner emitted them.""" while not self._stop.is_set(): try: op = self._work_q.get(timeout=0.5) except Exception: continue if op is None: continue kind = op[0] try: self._busy.set() if kind == 'move': steps = op[1] self.aux._do_steps(steps, ignore_limits=True) elif kind == 'line': (_, steps, length_mm, max_accel, max_jerk, entry_vel, exit_vel, times_min) = op self.aux._do_line( steps, length_mm, max_accel, max_jerk, entry_vel, exit_vel, times_min, ignore_limits=True) elif kind == 'home': self.aux.home() # _pos_mm and DRO updated by the caller's enqueue. except Exception as e: self.log.error('External axis worker failed on %s: %s' % (kind, e)) finally: self._busy.clear() self._work_q.task_done() def wait_idle(self, timeout=None): """Block until the worker queue is empty. Used by callers that need post-motion state to be settled (e.g. homing, stop/abort handlers).""" try: # Queue.join blocks until task_done has been called for # every item put. It does not honour a timeout, so we # poll instead when one is requested. if timeout is None: self._work_q.join() return True import time deadline = time.time() + float(timeout) while time.time() < deadline: if self._work_q.unfinished_tasks == 0: return True time.sleep(0.05) return False except Exception: return False def close(self): self._stop.set() try: self._work_q.put(None, block=False) except Exception: pass def home(self): """Run the ESP homing cycle and sync our recorded position to the configured home_position_mm. Blocks; called from Mach.home (which already runs synchronously per axis).""" if not self.enabled: raise ExternalAxisError( 'External axis %r not available' % self.axis_letter) # Drain pending moves so we don't home into stale work. self.wait_idle(timeout=30.0) self._busy.set() try: self.aux.home() self._pos_mm = self.home_position_mm self.ctrl.state.set(self.axis_letter + 'p', self._pos_mm) self.refresh_homed() finally: self._busy.clear() def abort(self): """Cancel the ESP move and drop pending queued work. Caller (estop / stop handler) is responsible for the planner-side cleanup.""" try: if self.aux is not None: self.aux.abort() finally: self._busy.clear() # Drain any pending ops so resume after an abort doesn't # replay stale targets. try: while True: self._work_q.get_nowait() self._work_q.task_done() except Exception: pass # ------------------------------------------------------- ESP introspection def _read_esp_position_mm(self): """Convert AuxAxis._pos_steps mirror to mm. Falls back to 0.""" try: steps = int(self.aux._pos_steps) except Exception: steps = 0 return self.steps_to_mm(steps) # ---------------------------------------------------------- DRO update def sync_dro(self): """Push the current position to State as p so the DRO reflects what we believe gplan/ESP agreed on. Called after moves; also safe to call from external code.""" if self._pos_mm is None: return self.ctrl.state.set(self.axis_letter + 'p', self._pos_mm)