################################################################################ # # AuxAxis - W-axis serial driver for the auxcnc ESP32 controller # # Owns /dev/ttyUSB0 (or whatever serial.port is configured to). Provides # blocking RPCs for use from a hook thread. Maintains: # # - aux_present : True if serial is open and we've seen a boot banner # - aux_homed : True if we've successfully run HOME since last reset # - aux_pos : current logical position in mm (from ESP step counter # * (1 / steps_per_mm * dir_sign)) # # Real-time decisions (limit switch monitoring, step pulse generation) live # on the ESP. The host is responsible for units, soft limits, and tracking # whether we've ever boot-cycled the ESP since last home. # ################################################################################ import os import json import time import threading import traceback try: import serial except ImportError: serial = None # Default config; overridden by ./aux.json or ctrl.config. DEFAULTS = { 'enabled': False, 'port': '/dev/ttyUSB0', 'baud': 115200, 'steps_per_mm': 80.0, # logical steps per mm of axis travel 'dir_sign': 1, # +1 or -1: maps logical+ to motor+ steps # Logical axis letter exposed to gplan. The auxcnc ESP stepper # is presented to the planner as this axis (default 'a' = standard # 4th axis). gcode uses A for moves; the host ExternalAxis layer # forks A motion to the ESP transparently. 'axis_letter': 'a', 'min_w': 0.0, # soft limit min (mm), exposed as 4tn 'max_w': 100.0, # soft limit max (mm), exposed as 4tm # Per-axis kinematic limits used to populate the planner's config. # Units match the bbctrl/onefinity per-motor convention so the # values are directly comparable to motors 0-3: # max_velocity_m_per_min m/min (planner sees * 1000 = mm/min) # max_accel_km_per_min2 km/min2 (planner sees * 1e6 = mm/min2) # max_jerk_km_per_min3 km/min3 (planner sees * 1e6 = mm/min3) 'max_velocity_m_per_min': 6.0, 'max_accel_km_per_min2': 100.0, 'max_jerk_km_per_min3': 500.0, # Informational only - rate caps that actually clamp the move # are on the ESP via step_max_sps below. 'max_feed_mm_min': 600.0, 'home_dir': '-', # which direction is "toward limit" (host's view) 'home_position_mm': 0.0, # mm value to assign at home # ESP-side homing rates (steps/sec). Pushed via HOMECFG on connect. # Speeds tuned for a typical 25 steps/mm aux drive (so 1 step = # 0.04 mm). With the limit-aware ESP firmware these values give # a brisk seek (100 mm/s), enough backoff to clear the switch # hysteresis (16 mm), and a slow re-engage (10 mm/s) that's # accurate without being painfully slow on a longer axis. 'home_fast_sps': 2500, # ≈ 100 mm/s @ 25 steps/mm 'home_slow_sps': 250, # ≈ 10 mm/s 'home_backoff_steps': 400, # ≈ 16 mm 'home_maxtravel_steps': 200000, 'step_max_sps': 4000, # ≈ 160 mm/s normal-move cap 'step_accel_sps2': 12000, 'step_start_sps': 200, 'limit_low': True, } class AuxAxisError(Exception): pass class AuxAxis(object): def __init__(self, ctrl): self.ctrl = ctrl self.log = ctrl.log.get('AuxAxis') self._cfg = dict(DEFAULTS) self._load_config() self._sp = None self._sp_lock = threading.Lock() # serial write/RPC serialization self._rx_lock = threading.Lock() # read-line buffer access self._reader_thread = None self._stop = threading.Event() # Pending replies waiting for a [topic] line. Single-slot since we # serialize RPCs via _sp_lock. self._pending_topics = [] self._pending_replies = [] self._pending_cv = threading.Condition() # Async lines that aren't replies (e.g. logs) are simply logged. self._present = False self._homed = False self._pos_steps = 0 # ESP step counter mirror # Publish initial state self._publish_state() if not self._cfg['enabled']: self.log.info('Aux axis disabled in config') return if serial is None: self.log.error('pyserial not available; aux axis disabled') return self._open() # ------------------------------------------------------------------ config def _config_path(self): return self.ctrl.get_path(filename='aux.json') def _load_config(self): path = self._config_path() if os.path.exists(path): try: with open(path) as f: user = json.load(f) # Be permissive; ignore unknown keys. for k, v in user.items(): if k in self._cfg: self._cfg[k] = v self.log.info('Loaded aux config from %s' % path) except Exception: self.log.error('Failed to read aux.json: %s' % traceback.format_exc()) def save_config(self, cfg): merged = dict(DEFAULTS) for k, v in cfg.items(): if k in DEFAULTS: merged[k] = v path = self._config_path() with open(path, 'w') as f: json.dump(merged, f, indent=2) self._cfg = merged self.log.info('Saved aux config') # Push the relevant pieces to the ESP if connected. if self._present: try: self._push_homecfg() except Exception as e: self.log.warning('Could not push HOMECFG after save: %s' % e) def get_config(self): return dict(self._cfg) # ------------------------------------------------------------------ public @property def enabled(self): return bool(self._cfg.get('enabled', False)) @property def present(self): return self._present @property def homed(self): return self._homed @property def position_mm(self): return self._steps_to_mm(self._pos_steps) def set_state_observer(self, fn): """Register a callback invoked after every _publish_state. Used by ExternalAxis to mirror the homed flag into State.""" self._state_observer = fn def home(self): """Run the homing cycle on the ESP. Blocks until done. Raises on failure. Updates aux_homed and aux_pos. The ESP's home_zero is pre-loaded via HOMECFG so when the cycle completes the step counter already corresponds to home_position_mm. That way the homed-state survives a bbctrl restart correctly (we don't need a post-home WPOS write, which would clear HOMED).""" self._require_present() # Make sure home_zero on the ESP matches our current # home_position_mm in case the user just edited config. self._push_homecfg() line = self._rpc('HOME', topic='home', timeout=120.0) # line is the body after '[home] '. Only terminal lines use # the [home] topic now (done / failed); progress is [home_log]. if line.startswith('done'): self._pos_steps = self._parse_kv_int(line, 'pos', 0) self._homed = True self._publish_state() return # failure reason = line.split('reason=', 1)[1] if 'reason=' in line else line raise AuxAxisError('Homing failed: %s' % reason) def move_abs_mm(self, target_mm): """Move to absolute logical W position (mm). Blocks until done.""" self._require_present() self._check_limits(target_mm) target_steps = self._mm_to_steps(target_mm) delta = target_steps - self._pos_steps if delta == 0: return self._do_steps(delta) def move_rel_mm(self, delta_mm): """Move by delta mm relative to current position. Blocks until done.""" self._require_present() target_mm = self.position_mm + delta_mm self._check_limits(target_mm) target_steps = self._mm_to_steps(target_mm) delta = target_steps - self._pos_steps if delta == 0: return self._do_steps(delta) def set_position_mm(self, mm): """Set current W to without moving (G92-style for W).""" self._require_present() steps = self._mm_to_steps(mm) self._rpc('WPOS %d' % steps, topic='ok', timeout=2.0) self._pos_steps = steps # WPOS clears homed on the ESP; mirror it. self._homed = False self._publish_state() def jog_steps(self, steps): """Raw step move bypassing mm conversion and soft limits. Used by manual jog UI when axis isn't homed yet.""" self._require_present() if steps == 0: return self._do_steps(int(steps), ignore_limits=True) def abort(self): """Cancel any running ESP motion immediately.""" if not self._present: return try: # Don't take the RPC lock; ABORT must be able to interrupt. self._send_raw('ABORT') except Exception as e: self.log.warning('ABORT send failed: %s' % e) # ---------------------------------------------------------- ATC commands # # The auxcnc firmware drives an AMB 1050 FME-W DI tool changer via # three pneumatic valves on relays 1-3. The ESP runs the timed # sequences itself; the host just kicks them off and waits for the # terminal reply. def atc_droptool(self, timeout=30.0): """Eject the current tool. Opens the collet (V1), oscillates the ejector (V2), then re-clamps with a bleed cycle. Blocks until the ESP reports done. Raises on failure.""" self._require_present() line = self._rpc('DROPTOOL', topic='droptool', timeout=timeout) if line.startswith('done'): return reason = line.split('reason=', 1)[1] if 'reason=' in line else line raise AuxAxisError('DROPTOOL failed: %s' % reason) def atc_grabtool(self, timeout=30.0): """Pick up a tool that's already been seated by the operator. Opens V1 (releases the collet), waits for the operator to insert the holder, then re-clamps with a bleed cycle. Blocks.""" self._require_present() line = self._rpc('GRABTOOL', topic='grabtool', timeout=timeout) if line.startswith('done'): return reason = line.split('reason=', 1)[1] if 'reason=' in line else line raise AuxAxisError('GRABTOOL failed: %s' % reason) def atc_release(self, timeout=5.0): """Manually open the collet (release-only, no clamp). Use atc_clamp() afterwards once the new holder is in place.""" self._require_present() line = self._rpc('RELEASE', topic='release', timeout=timeout) if line.startswith('done'): return reason = line.split('reason=', 1)[1] if 'reason=' in line else line raise AuxAxisError('RELEASE failed: %s' % reason) def atc_clamp(self, timeout=10.0): """Manually clamp the collet (run a full bleed cycle). Pairs with atc_release() for two-step manual tool changes.""" self._require_present() line = self._rpc('CLAMP', topic='clamp', timeout=timeout) if line.startswith('done'): return reason = line.split('reason=', 1)[1] if 'reason=' in line else line raise AuxAxisError('CLAMP failed: %s' % reason) def close(self): self._stop.set() try: if self._sp is not None: self._sp.close() except Exception: pass # ------------------------------------------------------------------ guts def _require_present(self): if not self.enabled: raise AuxAxisError('Aux axis disabled') if not self._present: raise AuxAxisError('Aux axis not connected') def _check_limits(self, target_mm): lo = float(self._cfg['min_w']) hi = float(self._cfg['max_w']) if hi <= lo: return # no limits if target_mm < lo - 1e-6 or target_mm > hi + 1e-6: raise AuxAxisError( 'W=%.3f out of soft limits [%.3f, %.3f]' % (target_mm, lo, hi)) def _mm_to_steps(self, mm): spm = float(self._cfg['steps_per_mm']) sign = 1 if int(self._cfg.get('dir_sign', 1)) >= 0 else -1 return int(round(mm * spm * sign)) def _steps_to_mm(self, steps): spm = float(self._cfg['steps_per_mm']) or 1.0 sign = 1 if int(self._cfg.get('dir_sign', 1)) >= 0 else -1 return (steps / spm) * sign def _do_steps(self, signed_count, ignore_limits=False): max_rate = int(self._cfg['step_max_sps']) accel = int(self._cfg['step_accel_sps2']) safe_flag = 0 if ignore_limits else 1 cmd = 'STEPS %d maxrate=%d accel=%d safe=%d' % ( signed_count, max_rate, accel, safe_flag) line = self._rpc(cmd, topic='step', timeout=300.0) # line: "done count=N pos=P limit=L" or "aborted count=N pos=P [reason=...]" if line.startswith('done'): self._pos_steps = self._parse_kv_int(line, 'pos', self._pos_steps) self._publish_state() return # aborted self._pos_steps = self._parse_kv_int(line, 'pos', self._pos_steps) self._publish_state() reason = self._parse_kv_str(line, 'reason') if reason == 'limit': self._homed = False raise AuxAxisError('W move aborted by limit switch') raise AuxAxisError('W move aborted: %s' % line) # ------------------------------------------------------------ serial I/O def _open(self): port = self._cfg['port'] baud = int(self._cfg['baud']) try: self._sp = serial.Serial(port, baud, timeout=0.2) except Exception as e: self.log.error('Could not open %s: %s' % (port, e)) self._sp = None return self.log.info('Opened %s @ %d' % (port, baud)) self._reader_thread = threading.Thread( target=self._reader_loop, name='AuxAxis-rx', daemon=True) self._reader_thread.start() # Give the ESP a moment to settle, then push HOMECFG and query state. # This runs in a background thread to avoid blocking startup. threading.Thread(target=self._on_connect, daemon=True).start() def _on_connect(self): time.sleep(0.5) try: self._push_homecfg() self._refresh_state() except Exception as e: self.log.warning('Aux post-connect setup failed: %s' % e) def _push_homecfg(self): c = self._cfg zero_steps = self._mm_to_steps(c['home_position_mm']) cmd = ('HOMECFG dir=%s fast=%d slow=%d backoff=%d maxtravel=%d ' 'zero=%d accel=%d step_max=%d step_start=%d limit_low=%d') % ( c['home_dir'], int(c['home_fast_sps']), int(c['home_slow_sps']), int(c['home_backoff_steps']), int(c['home_maxtravel_steps']), int(zero_steps), int(c['step_accel_sps2']), int(c['step_max_sps']), int(c['step_start_sps']), 1 if c['limit_low'] else 0, ) self._rpc(cmd, topic='homecfg', timeout=3.0) def _refresh_state(self): try: r = self._rpc('WPOS?', topic='wpos', timeout=2.0) self._pos_steps = int(r.strip()) except Exception: pass # Force the host to start unhomed regardless of what the ESP # remembers from a prior session. The ESP's homed flag survives # bbctrl restarts (since the ESP itself wasn't power-cycled), # but the host's planner offsets and DRO position get reset to # zero on bbctrl boot. Trusting the ESP's homed flag would mean # the user thinks A is homed at the wrong work-coord origin # (offset_a=0 but ESP physically at home_position_mm). Sending # UNHOME forces the user to re-home explicitly, which sets up # the offset and gplan state correctly via the homing path in # Mach.home. try: self._rpc('UNHOME', topic='ok', timeout=2.0) self._homed = False except Exception: # Fall back to whatever HOMED? says - but treat any # missing UNHOME support as "trust ESP's flag" so we # don't break older firmware. try: r = self._rpc('HOMED?', topic='homed', timeout=2.0) self._homed = (r.strip() == '1') except Exception: pass self._publish_state() def _reader_loop(self): buf = b'' while not self._stop.is_set(): sp = self._sp if sp is None: time.sleep(0.5) continue try: chunk = sp.read(256) except Exception as e: self.log.warning('Aux serial read error: %s' % e) time.sleep(0.5) continue if not chunk: continue buf += chunk while True: nl = buf.find(b'\n') if nl < 0: break line = buf[:nl].rstrip(b'\r').decode('utf-8', errors='replace') buf = buf[nl+1:] self._on_line(line) def _on_line(self, line): if not line: return # Boot banner -> reset homed flag. if line.startswith('[boot]'): self.log.warning('Aux ESP booted: %s' % line) self._homed = False self._present = True self._publish_state() self.ctrl.state.add_message( 'W axis controller restarted - re-home before use') return # Topic dispatch: "[topic] body..." if line.startswith('[') and ']' in line: rb = line.index(']') topic = line[1:rb] body = line[rb+1:].lstrip() # Mark present on first known topic. if not self._present: self._present = True self._publish_state() # Match against the head of the pending queue. with self._pending_cv: if (self._pending_topics and topic in self._pending_topics[0]): # Pop and deliver self._pending_topics.pop(0) self._pending_replies.append(body) self._pending_cv.notify_all() return # Async informational line; just log. self.log.info('aux: %s' % line) else: self.log.info('aux: %s' % line) def _send_raw(self, cmd): sp = self._sp if sp is None: raise AuxAxisError('Serial not open') if not cmd.endswith('\n'): cmd = cmd + '\n' sp.write(cmd.encode('utf-8')) sp.flush() def _rpc(self, cmd, topic, timeout=5.0): """Send `cmd`, wait for a reply line whose topic is in `topic`. topic may be a single string or a tuple/list of acceptable topics (e.g. ('home', 'err')).""" if isinstance(topic, str): topics = (topic, 'err') else: topics = tuple(topic) + ('err',) with self._sp_lock: with self._pending_cv: self._pending_topics.append(topics) self._pending_replies = [] # reset self.log.info('aux >> %s' % cmd.strip()) self._send_raw(cmd) deadline = time.time() + timeout with self._pending_cv: while not self._pending_replies: remaining = deadline - time.time() if remaining <= 0: # Drop the pending slot so we don't capture a # late reply meant for the next caller. try: self._pending_topics.remove(topics) except ValueError: pass raise AuxAxisError( 'Timeout waiting for %s reply to "%s"' % (topics, cmd.strip())) self._pending_cv.wait(timeout=remaining) reply = self._pending_replies.pop(0) self.log.info('aux << %s' % reply) if reply.startswith('err') or reply.startswith('error'): raise AuxAxisError('ESP error: %s' % reply) return reply @staticmethod def _parse_kv_int(line, key, default=0): # Parse "key=N" (signed integer) out of a line. for tok in line.split(): if tok.startswith(key + '='): try: return int(tok.split('=', 1)[1]) except ValueError: return default return default @staticmethod def _parse_kv_str(line, key, default=''): for tok in line.split(): if tok.startswith(key + '='): return tok.split('=', 1)[1] return default # ------------------------------------------------------------ state push def _publish_state(self): st = self.ctrl.state try: st.set('aux_present', bool(self._present)) st.set('aux_homed', bool(self._homed)) st.set('aux_pos', round(self.position_mm, 4)) st.set('aux_enabled', bool(self.enabled)) except Exception: # During very early startup, state may not be ready. pass # Notify the external-axis layer so it can mirror state # (e.g. homed flag) into the synthetic motor vars. observer = getattr(self, '_state_observer', None) if observer is not None: try: observer() except Exception: pass