Jog: drive A-axis hold-to-jog through ESP JOG / JOGSTOP

Previous attempts (small STEPS chunks per 250 ms tick, then a
single-big-STEPS plus ABORT-on-release) both gave a jerky ride: the
chunked path produced staccato accel/decel ladders, and the
ABORT-on-release path raced with task creation, frequently letting
STEPS run to its full target.

The auxcnc ESP gained a continuous-rate JOG / JOGSTOP pair (see
auxcnc commit 'Add JOG / JOGSTOP for smooth hold-to-jog'). On
press we issue JOG dir=+/- maxrate=... accel=... and the ESP ramps
up and cruises until JOGSTOP, which triggers a controlled decel.

AuxAxis additions:
  - jog_start(direction, max_rate_sps=, accel_sps2=, ignore_limits=)
    sends JOG and waits only for the immediate '[jog] started' ack.
  - jog_stop() sends JOGSTOP fire-and-forget (no RPC lock so it can
    interrupt anything in flight, mirroring abort()).
  - _on_line picks up async '[jog] done|aborted ...' lines and
    resyncs _pos_steps so subsequent moves compute the correct
    delta.

Jog.py:
  - On Xbox 360 pad RB (BTN_TR) -> A+ press, RT (ABS_RZ) -> A-
    press; release -> JOGSTOP. Speed buttons (X/A/B/Y) scale max
    rate by 1/128, 1/32, 1/4, 1.0x.
  - safe=0 only when A is unhomed; otherwise the ESP enforces
    limit-toward-home_dir abort.
  - On release, schedules a 200 ms-deferred ext_axis._pos_mm
    refresh so any subsequent gplan-driven A motion sees the new
    position.

Verified end-to-end on hardware: smooth ramp-cruise-ramp on
press/release, no overshoot on quick taps, soft limits respected
when homed.
This commit is contained in:
2026-05-03 16:55:48 +02:00
parent 99b5af56cc
commit b63e5bb55a
2 changed files with 194 additions and 25 deletions

View File

@@ -303,6 +303,69 @@ class AuxAxis(object):
return
self._do_steps(int(steps), ignore_limits=True)
# ----------------------------------------------- continuous-rate jog
#
# Hold-to-jog support for the gamepad pendant. JOG / JOGSTOP on
# the ESP give a smooth ramp-up, cruise-until-released, ramp-down
# profile - much better than streaming small STEPS chunks.
#
# `jog_start` returns immediately after the ESP acknowledges with
# `[jog] started ...`. The terminal `[jog] done count=<n>
# pos=<p>` arrives later; our reader picks it up and resyncs
# _pos_steps via the same path as STEPS.
def jog_start(self, direction, max_rate_sps=None,
accel_sps2=None, ignore_limits=False):
"""Begin a continuous-rate jog. `direction` is +1 or -1.
Returns once the ESP has accepted the JOG command."""
self._require_present()
if direction not in (-1, +1):
raise AuxAxisError('jog_start direction must be +/-1')
sign = '+' if direction > 0 else '-'
rate = (int(max_rate_sps) if max_rate_sps is not None
else int(self._cfg['step_max_sps']))
accel = (int(accel_sps2) if accel_sps2 is not None
else int(self._cfg['step_accel_sps2']))
if rate < 1: rate = 1
if accel < 1: accel = 1
# Track the in-flight JOG so the reader can deliver the
# terminal [jog] done line back to us. We use a dedicated
# background thread so jog_start can return as soon as the
# `[jog] started` ack lands -- the terminal line may arrive
# seconds later (after JOGSTOP).
cmd = 'JOG dir=%s maxrate=%d accel=%d safe=%d' % (
sign, rate, accel, 0 if ignore_limits else 1)
# Capture both the immediate ack AND the eventual terminal
# line in a single _rpc call would block; instead fire the
# ack-only RPC here and let _on_line handle the terminal
# `[jog] done` async (it falls through to the info log path,
# but we hook _on_line to update _pos_steps).
line = self._rpc(cmd, topic='jog', timeout=2.0)
if line.startswith('error'):
raise AuxAxisError('JOG rejected: %s' % line)
if not line.startswith('started'):
# Could be "done count=0 pos=..." if a near-instant abort
# raced; treat as completed.
self._pos_steps = self._parse_kv_int(
line, 'pos', self._pos_steps)
self._publish_state()
# else: cruising, terminal [jog] reply will arrive later.
def jog_stop(self):
"""Request the running JOG to ramp down to a stop. Returns
immediately; the terminal `[jog] done` arrives async and is
picked up by `_on_line` to resync _pos_steps.
Like abort(), this does NOT take the RPC lock - JOGSTOP is
the on-release path of a hold-to-jog UI and must not block
on whatever else is in flight."""
if not self._present:
return
try:
self.log.info('aux >> JOGSTOP')
self._send_raw('JOGSTOP')
except Exception as e:
self.log.warning('JOGSTOP send failed: %s' % e)
def abort(self):
"""Cancel any running ESP motion immediately."""
if not self._present:
@@ -615,7 +678,22 @@ class AuxAxis(object):
self._pending_replies.append(body)
self._pending_cv.notify_all()
return
# Async informational line; just log.
# Async informational line.
#
# The terminal [jog] done|aborted line for a continuous
# JOG arrives long after the JOG _rpc returned (the JOG
# _rpc only waits for the immediate `[jog] started`
# ack). Use this async path to keep _pos_steps in sync
# so subsequent moves compute the correct delta.
if topic == 'jog' and ('pos=' in body):
try:
self._pos_steps = self._parse_kv_int(
body, 'pos', self._pos_steps)
if 'reason=limit' in body:
self._homed = False
self._publish_state()
except Exception:
pass
self.log.info('aux: %s' % line)
else:
self.log.info('aux: %s' % line)

View File

@@ -25,6 +25,8 @@
# #
################################################################################
import threading
import inevent
from inevent.Constants import *
@@ -51,18 +53,23 @@ class Jog(inevent.JogHandler):
"dir": [1, -1, -1, 1],
"arrows": [ABS_HAT0X, ABS_HAT0Y],
"speed": [0x133, 0x130, 0x131, 0x134],
"lock": [0x136], # L1 = horiz-lock; R1/R2 now A axis
# Right back triggers drive the A axis while held:
# BTN_TR (0x137, upper-right) -> A+
# BTN_TR2 (0x139, lower-right) -> A-
"a_pos": 0x137,
"a_neg": 0x139,
"lock": [0x136], # L1 = horiz-lock; RB/RT now A axis
# Right back controls drive the A axis while held.
# Verified on Xbox 360 pad (Vendor=045e Product=028e):
# RB (upper-right bumper) -> BTN_TR (0x137) digital -> A+
# RT (lower-right trigger) -> ABS_RZ analog 0..255 -> A-
# Some pads expose RT as BTN_TR2 (0x139) instead -- that
# works too via a_neg_btn.
"a_pos_btn": 0x137,
"a_neg_btn": 0x139,
"a_neg_abs": ABS_RZ,
"a_abs_thresh": 32, # 0..255 trigger press threshold
}
}
super().__init__(config)
self.a_button = 0 # -1, 0, +1 from R1/R2 hold state
self.a_button = 0 # -1, 0, +1 from RB / RT hold state
self.v = [0.0] * 4
self.lastV = self.v
self.callback()
@@ -70,26 +77,111 @@ class Jog(inevent.JogHandler):
self.processor = inevent.InEvent(ctrl.ioloop, self, types = ['js'])
# -------- A-axis (external, ESP-driven) hold-to-jog ---------------
#
# The Mach jog path only knows about AVR axes; the A axis is
# handled by ExternalAxis on the auxcnc ESP, which has a proper
# JOG / JOGSTOP protocol added for hold-to-jog: ramp up on press,
# cruise while held, ramp down on release.
#
# Speed buttons (X/A/B/Y) scale the cruise rate (1/128, 1/32,
# 1/4, 1.0x of the configured step_max_sps).
def _a_speed_scale(self):
if self.speed == 1: return 1.0 / 128.0
if self.speed == 2: return 1.0 / 32.0
if self.speed == 3: return 1.0 / 4.0
return 1.0
def _a_stop(self):
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is None or not ext.enabled:
return
try:
ext.aux.jog_stop()
except Exception as e:
self.log.warning('A-axis jog_stop failed: %s', e)
def _a_start(self, direction):
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is None or not ext.enabled or direction == 0:
return
scale = self._a_speed_scale()
try:
aux = ext.aux
max_rate = max(1, int(int(aux._cfg['step_max_sps']) * scale))
accel = int(aux._cfg['step_accel_sps2'])
# ignore_limits=True (safe=0): pendant jog is allowed
# before homing, matching the rest of the manual-jog API.
# When the axis IS homed, the ESP still aborts on a
# limit-toward hit because it tracks home_dir separately
# from `safe` in our updated firmware (see jogTask).
ignore = not bool(aux._homed)
aux.jog_start(direction,
max_rate_sps=max_rate,
accel_sps2=accel,
ignore_limits=ignore)
except Exception as e:
self.log.warning('A-axis jog_start failed: %s', e)
def _a_apply(self, new_dir, old_dir):
if new_dir == old_dir:
return
# On any state change we stop the current jog and (if the
# new direction is non-zero) start a fresh one. JOG / JOGSTOP
# are non-blocking on the host side.
if old_dir != 0:
self._a_stop()
if new_dir != 0:
self._a_start(new_dir)
def _a_resync_pos(self):
"""Pull the ESP step counter back into ExternalAxis after a
JOG ends, so subsequent gplan-driven A motion computes the
right delta. Called opportunistically on state changes; the
AuxAxis reader also updates _pos_steps from the terminal
[jog] done line."""
ext = getattr(self.ctrl, 'ext_axis', None)
if ext is None or not ext.enabled:
return
try:
ext._pos_mm = ext.aux.position_mm
self.ctrl.state.set(ext.axis_letter + 'p', ext._pos_mm)
except Exception:
pass
def event(self, event, state, dev_name):
cfg = self.get_config(dev_name)
a_changed = False
old = self.a_button
if event.type == EV_KEY:
old = self.a_button
if event.code == cfg.get('a_pos'):
self.a_button = 1 if event.value else (
0 if self.a_button == 1 else self.a_button)
elif event.code == cfg.get('a_neg'):
self.a_button = -1 if event.value else (
0 if self.a_button == -1 else self.a_button)
if self.a_button != old: a_changed = True
if event.code == cfg.get('a_pos_btn'):
if event.value: self.a_button = 1
elif self.a_button == 1: self.a_button = 0
elif event.code == cfg.get('a_neg_btn'):
if event.value: self.a_button = -1
elif self.a_button == -1: self.a_button = 0
elif event.type == EV_ABS:
thresh = cfg.get('a_abs_thresh', 32)
if event.code == cfg.get('a_neg_abs'):
if event.value >= thresh: self.a_button = -1
elif self.a_button == -1: self.a_button = 0
if self.a_button != old:
self.log.info('A-axis trigger -> %s', self.a_button)
self._a_apply(self.a_button, old)
# On every release pull a fresh position mirror in case
# the user does a gplan-driven A move next. The terminal
# [jog] done line itself already updates aux._pos_steps;
# this propagates that into ExternalAxis._pos_mm.
if self.a_button == 0:
# Wait briefly so the [jog] done line has time to
# arrive before we read aux.position_mm.
self.ctrl.ioloop.call_later(0.2, self._a_resync_pos)
super().event(event, state, dev_name)
# JogHandler.event() only fires changed() when stick axes move.
# Force a recompute when R1/R2 hold state flips.
if a_changed: self.changed()
def up(self): self.ctrl.lcd.page_up()
def down(self): self.ctrl.lcd.page_down()
@@ -117,8 +209,7 @@ class Jog(inevent.JogHandler):
if self.speed == 2: scale = 1.0 / 32.0
if self.speed == 3: scale = 1.0 / 4.0
# axes[3] is left untouched by RB/RT -- the A axis is the
# ESP-driven external axis on this branch and is jogged via
# discrete relative moves through ExternalAxis (see _a_pump).
self.v = [x * scale for x in self.axes]
# R1/R2 buttons override the A axis (axes[3]) while held.
if self.a_button:
self.v[3] = self.a_button * scale