AuxAxis: ESP32-driven external stepper (auxcnc)

bbctrl.AuxAxis manages a stepper driven by an auxcnc-style ESP32
over /dev/ttyUSB0 (or whichever serial port). Persistent config in
aux.json; UI talks to it via /api/aux/* endpoints.

- AuxAxis: serial framing, position tracking, soft-limit enforcement,
  homing state machine, ATC pneumatic control (M100..M103 wrappers).
- Ctrl: instantiate self.aux alongside the other subsystems and
  close it during shutdown.
- Web: handlers for /api/aux/{config,status,home,abort,jog,move,set-zero}.
This commit is contained in:
2026-05-03 14:16:54 +02:00
parent 80a00978b7
commit d797f1d4fc
4 changed files with 752 additions and 0 deletions

675
src/py/bbctrl/AuxAxis.py Normal file
View File

@@ -0,0 +1,675 @@
################################################################################
#
# AuxAxis - W-axis serial driver for the auxcnc ESP32 controller
#
# Owns /dev/ttyUSB0 (or whatever serial.port is configured to). Provides
# blocking RPCs for use from a hook thread. Maintains:
#
# - aux_present : True if serial is open and we've seen a boot banner
# - aux_homed : True if we've successfully run HOME since last reset
# - aux_pos : current logical position in mm (from ESP step counter
# * (1 / steps_per_mm * dir_sign))
#
# Real-time decisions (limit switch monitoring, step pulse generation) live
# on the ESP. The host is responsible for units, soft limits, and tracking
# whether we've ever boot-cycled the ESP since last home.
#
################################################################################
import os
import json
import time
import threading
import traceback
try:
import serial
except ImportError:
serial = None
# Default config; overridden by ./aux.json or ctrl.config.
DEFAULTS = {
'enabled': False,
'port': '/dev/ttyUSB0',
'baud': 115200,
'steps_per_mm': 80.0, # logical steps per mm of axis travel
'dir_sign': 1, # +1 or -1: maps logical+ to motor+ steps
# Logical axis letter exposed to gplan. The auxcnc ESP stepper
# is presented to the planner as this axis (default 'a' = standard
# 4th axis). gcode uses A for moves; the host ExternalAxis layer
# forks A motion to the ESP transparently.
'axis_letter': 'a',
'min_mm': 0.0, # soft limit min (mm), exposed as 4tn
'max_mm': 100.0, # soft limit max (mm), exposed as 4tm
# Per-axis kinematic limits used to populate the planner's config.
# Units match the bbctrl/onefinity per-motor convention so the
# values are directly comparable to motors 0-3:
# max_velocity_m_per_min m/min (planner sees * 1000 = mm/min)
# max_accel_km_per_min2 km/min2 (planner sees * 1e6 = mm/min2)
# max_jerk_km_per_min3 km/min3 (planner sees * 1e6 = mm/min3)
'max_velocity_m_per_min': 6.0,
'max_accel_km_per_min2': 100.0,
'max_jerk_km_per_min3': 500.0,
# Informational only - rate caps that actually clamp the move
# are on the ESP via step_max_sps below.
'max_feed_mm_min': 600.0,
'home_dir': '-', # which direction is "toward limit" (host's view)
'home_position_mm': 0.0, # mm value to assign at home
# ESP-side homing rates (steps/sec). Pushed via HOMECFG on connect.
# Speeds tuned for a typical 25 steps/mm aux drive (so 1 step =
# 0.04 mm). With the limit-aware ESP firmware these values give
# a brisk seek (100 mm/s), enough backoff to clear the switch
# hysteresis (16 mm), and a slow re-engage (10 mm/s) that's
# accurate without being painfully slow on a longer axis.
'home_fast_sps': 2500, # ≈ 100 mm/s @ 25 steps/mm
'home_slow_sps': 250, # ≈ 10 mm/s
'home_backoff_steps': 400, # ≈ 16 mm
'home_maxtravel_steps': 200000,
'step_max_sps': 4000, # ≈ 160 mm/s normal-move cap
'step_accel_sps2': 12000,
'step_start_sps': 200,
'limit_low': True,
}
class AuxAxisError(Exception):
pass
class AuxAxis(object):
def __init__(self, ctrl):
self.ctrl = ctrl
self.log = ctrl.log.get('AuxAxis')
self._cfg = dict(DEFAULTS)
self._load_config()
self._sp = None
self._sp_lock = threading.Lock() # serial write/RPC serialization
self._rx_lock = threading.Lock() # read-line buffer access
self._reader_thread = None
self._stop = threading.Event()
# Pending replies waiting for a [topic] line. Single-slot since we
# serialize RPCs via _sp_lock.
self._pending_topics = []
self._pending_replies = []
self._pending_cv = threading.Condition()
# Async lines that aren't replies (e.g. logs) are simply logged.
self._present = False
self._homed = False
self._pos_steps = 0 # ESP step counter mirror
# Publish initial state
self._publish_state()
if not self._cfg['enabled']:
self.log.info('Aux axis disabled in config')
return
if serial is None:
self.log.error('pyserial not available; aux axis disabled')
return
self._open()
# ------------------------------------------------------------------ config
def _config_path(self):
return self.ctrl.get_path(filename='aux.json')
# Legacy aux.json fields that have been renamed for clarity.
# Loaded values are migrated up on every load/save so existing
# installs keep working without operator intervention.
_LEGACY_FIELD_MAP = {
'min_w': 'min_mm',
'max_w': 'max_mm',
}
def _migrate_legacy_fields(self, cfg):
"""In-place rename of legacy keys in `cfg` (dict). Returns
True if anything was migrated, so callers can decide whether
to persist the upgraded form.
"""
migrated = False
for old, new in self._LEGACY_FIELD_MAP.items():
if old in cfg:
if new not in cfg:
cfg[new] = cfg[old]
del cfg[old]
migrated = True
return migrated
def _load_config(self):
path = self._config_path()
if os.path.exists(path):
try:
with open(path) as f:
user = json.load(f)
migrated = self._migrate_legacy_fields(user)
# Be permissive; ignore unknown keys.
for k, v in user.items():
if k in self._cfg:
self._cfg[k] = v
self.log.info('Loaded aux config from %s' % path)
if migrated:
# Persist the upgraded form so future restarts
# see the new field names directly.
try:
self.save_config(self._cfg)
self.log.info(
'Migrated aux.json legacy fields '
'(min_w/max_w -> min_mm/max_mm)')
except Exception:
self.log.warning(
'Could not persist aux.json migration')
except Exception:
self.log.error('Failed to read aux.json: %s'
% traceback.format_exc())
def save_config(self, cfg):
merged = dict(DEFAULTS)
# Accept legacy keys from callers that may still send the
# old names (older UI bundles, hand-edited POSTs).
cfg = dict(cfg)
self._migrate_legacy_fields(cfg)
for k, v in cfg.items():
if k in DEFAULTS:
merged[k] = v
path = self._config_path()
with open(path, 'w') as f:
json.dump(merged, f, indent=2)
self._cfg = merged
self.log.info('Saved aux config')
# Push the relevant pieces to the ESP if connected.
if self._present:
try:
self._push_homecfg()
except Exception as e:
self.log.warning('Could not push HOMECFG after save: %s' % e)
def get_config(self):
return dict(self._cfg)
# ------------------------------------------------------------------ public
@property
def enabled(self):
return bool(self._cfg.get('enabled', False))
@property
def present(self):
return self._present
@property
def homed(self):
return self._homed
@property
def position_mm(self):
return self._steps_to_mm(self._pos_steps)
def set_state_observer(self, fn):
"""Register a callback invoked after every _publish_state.
Used by ExternalAxis to mirror the homed flag into State."""
self._state_observer = fn
def home(self):
"""Run the homing cycle on the ESP. Blocks until done. Raises on
failure. Updates aux_homed and aux_pos.
The ESP's home_zero is pre-loaded via HOMECFG so when the cycle
completes the step counter already corresponds to home_position_mm.
That way the homed-state survives a bbctrl restart correctly
(we don't need a post-home WPOS write, which would clear HOMED)."""
self._require_present()
# Make sure home_zero on the ESP matches our current
# home_position_mm in case the user just edited config.
self._push_homecfg()
line = self._rpc('HOME', topic='home', timeout=120.0)
# line is the body after '[home] '. Only terminal lines use
# the [home] topic now (done / failed); progress is [home_log].
if line.startswith('done'):
self._pos_steps = self._parse_kv_int(line, 'pos', 0)
self._homed = True
self._publish_state()
return
# failure
reason = line.split('reason=', 1)[1] if 'reason=' in line else line
raise AuxAxisError('Homing failed: %s' % reason)
def move_abs_mm(self, target_mm):
"""Move to absolute logical W position (mm). Blocks until done."""
self._require_present()
self._check_limits(target_mm)
target_steps = self._mm_to_steps(target_mm)
delta = target_steps - self._pos_steps
if delta == 0:
return
self._do_steps(delta)
def move_rel_mm(self, delta_mm):
"""Move by delta mm relative to current position. Blocks until done."""
self._require_present()
target_mm = self.position_mm + delta_mm
self._check_limits(target_mm)
target_steps = self._mm_to_steps(target_mm)
delta = target_steps - self._pos_steps
if delta == 0:
return
self._do_steps(delta)
def set_position_mm(self, mm):
"""Set current W to <mm> without moving (G92-style for W)."""
self._require_present()
steps = self._mm_to_steps(mm)
self._rpc('WPOS %d' % steps, topic='ok', timeout=2.0)
self._pos_steps = steps
# WPOS clears homed on the ESP; mirror it.
self._homed = False
self._publish_state()
def jog_steps(self, steps):
"""Raw step move bypassing mm conversion and soft limits.
Used by manual jog UI when axis isn't homed yet."""
self._require_present()
if steps == 0:
return
self._do_steps(int(steps), ignore_limits=True)
def abort(self):
"""Cancel any running ESP motion immediately."""
if not self._present:
return
try:
# Don't take the RPC lock; ABORT must be able to interrupt.
self._send_raw('ABORT')
except Exception as e:
self.log.warning('ABORT send failed: %s' % e)
# ---------------------------------------------------------- ATC commands
#
# The auxcnc firmware drives an AMB 1050 FME-W DI tool changer via
# three pneumatic valves on relays 1-3. The ESP runs the timed
# sequences itself; the host just kicks them off and waits for the
# terminal reply.
def atc_droptool(self, timeout=30.0):
"""Eject the current tool. Opens the collet (V1), oscillates the
ejector (V2), then re-clamps with a bleed cycle. Blocks until
the ESP reports done. Raises on failure."""
self._require_present()
line = self._rpc('DROPTOOL', topic='droptool', timeout=timeout)
if line.startswith('done'):
return
reason = line.split('reason=', 1)[1] if 'reason=' in line else line
raise AuxAxisError('DROPTOOL failed: %s' % reason)
def atc_grabtool(self, timeout=30.0):
"""Pick up a tool that's already been seated by the operator.
Opens V1 (releases the collet), waits for the operator to insert
the holder, then re-clamps with a bleed cycle. Blocks."""
self._require_present()
line = self._rpc('GRABTOOL', topic='grabtool', timeout=timeout)
if line.startswith('done'):
return
reason = line.split('reason=', 1)[1] if 'reason=' in line else line
raise AuxAxisError('GRABTOOL failed: %s' % reason)
def atc_release(self, timeout=5.0):
"""Manually open the collet (release-only, no clamp). Use
atc_clamp() afterwards once the new holder is in place."""
self._require_present()
line = self._rpc('RELEASE', topic='release', timeout=timeout)
if line.startswith('done'):
return
reason = line.split('reason=', 1)[1] if 'reason=' in line else line
raise AuxAxisError('RELEASE failed: %s' % reason)
def atc_clamp(self, timeout=10.0):
"""Manually clamp the collet (run a full bleed cycle). Pairs
with atc_release() for two-step manual tool changes."""
self._require_present()
line = self._rpc('CLAMP', topic='clamp', timeout=timeout)
if line.startswith('done'):
return
reason = line.split('reason=', 1)[1] if 'reason=' in line else line
raise AuxAxisError('CLAMP failed: %s' % reason)
def close(self):
self._stop.set()
try:
if self._sp is not None:
self._sp.close()
except Exception:
pass
# ------------------------------------------------------------------ guts
def _require_present(self):
if not self.enabled:
raise AuxAxisError('Aux axis disabled')
if not self._present:
raise AuxAxisError('Aux axis not connected')
def _check_limits(self, target_mm):
lo = float(self._cfg['min_mm'])
hi = float(self._cfg['max_mm'])
if hi <= lo:
return # no limits
if target_mm < lo - 1e-6 or target_mm > hi + 1e-6:
raise AuxAxisError(
'W=%.3f out of soft limits [%.3f, %.3f]' % (target_mm, lo, hi))
def _mm_to_steps(self, mm):
spm = float(self._cfg['steps_per_mm'])
sign = 1 if int(self._cfg.get('dir_sign', 1)) >= 0 else -1
return int(round(mm * spm * sign))
def _steps_to_mm(self, steps):
spm = float(self._cfg['steps_per_mm']) or 1.0
sign = 1 if int(self._cfg.get('dir_sign', 1)) >= 0 else -1
return (steps / spm) * sign
def _do_steps(self, signed_count, ignore_limits=False):
max_rate = int(self._cfg['step_max_sps'])
accel = int(self._cfg['step_accel_sps2'])
safe_flag = 0 if ignore_limits else 1
cmd = 'STEPS %d maxrate=%d accel=%d safe=%d' % (
signed_count, max_rate, accel, safe_flag)
line = self._rpc(cmd, topic='step', timeout=300.0)
# line: "done count=N pos=P limit=L" or "aborted count=N pos=P [reason=...]"
if line.startswith('done'):
self._pos_steps = self._parse_kv_int(line, 'pos', self._pos_steps)
self._publish_state()
return
# aborted
self._pos_steps = self._parse_kv_int(line, 'pos', self._pos_steps)
self._publish_state()
reason = self._parse_kv_str(line, 'reason')
if reason == 'limit':
self._homed = False
raise AuxAxisError('W move aborted by limit switch')
raise AuxAxisError('W move aborted: %s' % line)
def _do_line(self, signed_steps, length_mm,
max_accel_mm_min2, max_jerk_mm_min3,
entry_vel_mm_min, exit_vel_mm_min,
times_min, ignore_limits=False, timeout=300.0):
"""Run a 7-segment jerk-limited S-curve on the ESP that mirrors
gplan/buildbotics' planner output exactly.
Parameters are in the same units the AVR/gplan use:
- length_mm: absolute travel in mm (>= 0)
- max_accel: mm/min^2
- max_jerk: mm/min^3
- entry/exit_vel: mm/min
- times_min: 7-tuple of section durations in minutes
ignore_limits sets safe=0 on the ESP - used for jog/move
endpoints that may run before homing.
Blocks until the ESP reports done or aborted. Updates the
position mirror and re-publishes state on every reply.
"""
if signed_steps == 0 or length_mm <= 0:
return
if not any(times_min):
raise AuxAxisError('LINE rejected: all section times are zero')
# Build the LINE command. Float formatting matches the AVR's
# printf precision (6 sig figs) - that's well above what the
# ESP needs given it integrates into a few thousand 4 ms
# segments per move.
parts = [
'LINE',
'steps=%d' % int(signed_steps),
'length=%.6f' % float(length_mm),
'max_accel=%.6f' % float(max_accel_mm_min2),
'max_jerk=%.6f' % float(max_jerk_mm_min3),
'entry_vel=%.6f' % float(entry_vel_mm_min),
'exit_vel=%.6f' % float(exit_vel_mm_min),
]
for i, t in enumerate(times_min):
if t and t > 0:
parts.append('t%d=%.9f' % (i, float(t)))
if ignore_limits:
parts.append('safe=0')
cmd = ' '.join(parts)
line = self._rpc(cmd, topic='line', timeout=timeout)
# line: "done pos=P emitted=N" or "aborted pos=P emitted=N reason=..."
if line.startswith('done'):
self._pos_steps = self._parse_kv_int(line, 'pos', self._pos_steps)
self._publish_state()
return
# aborted
self._pos_steps = self._parse_kv_int(line, 'pos', self._pos_steps)
self._publish_state()
reason = self._parse_kv_str(line, 'reason')
if reason == 'limit':
self._homed = False
raise AuxAxisError('W move aborted by limit switch')
raise AuxAxisError('W move aborted: %s' % line)
# ------------------------------------------------------------ serial I/O
def _open(self):
port = self._cfg['port']
baud = int(self._cfg['baud'])
try:
self._sp = serial.Serial(port, baud, timeout=0.2)
except Exception as e:
self.log.error('Could not open %s: %s' % (port, e))
self._sp = None
return
self.log.info('Opened %s @ %d' % (port, baud))
self._reader_thread = threading.Thread(
target=self._reader_loop, name='AuxAxis-rx', daemon=True)
self._reader_thread.start()
# Give the ESP a moment to settle, then push HOMECFG and query state.
# This runs in a background thread to avoid blocking startup.
threading.Thread(target=self._on_connect, daemon=True).start()
def _on_connect(self):
time.sleep(0.5)
try:
self._push_homecfg()
self._refresh_state()
except Exception as e:
self.log.warning('Aux post-connect setup failed: %s' % e)
def _push_homecfg(self):
c = self._cfg
zero_steps = self._mm_to_steps(c['home_position_mm'])
cmd = ('HOMECFG dir=%s fast=%d slow=%d backoff=%d maxtravel=%d '
'zero=%d accel=%d step_max=%d step_start=%d limit_low=%d') % (
c['home_dir'],
int(c['home_fast_sps']),
int(c['home_slow_sps']),
int(c['home_backoff_steps']),
int(c['home_maxtravel_steps']),
int(zero_steps),
int(c['step_accel_sps2']),
int(c['step_max_sps']),
int(c['step_start_sps']),
1 if c['limit_low'] else 0,
)
self._rpc(cmd, topic='homecfg', timeout=3.0)
def _refresh_state(self):
try:
r = self._rpc('WPOS?', topic='wpos', timeout=2.0)
self._pos_steps = int(r.strip())
except Exception:
pass
# Force the host to start unhomed regardless of what the ESP
# remembers from a prior session. The ESP's homed flag survives
# bbctrl restarts (since the ESP itself wasn't power-cycled),
# but the host's planner offsets and DRO position get reset to
# zero on bbctrl boot. Trusting the ESP's homed flag would mean
# the user thinks A is homed at the wrong work-coord origin
# (offset_a=0 but ESP physically at home_position_mm). Sending
# UNHOME forces the user to re-home explicitly, which sets up
# the offset and gplan state correctly via the homing path in
# Mach.home.
try:
self._rpc('UNHOME', topic='ok', timeout=2.0)
self._homed = False
except Exception:
# Fall back to whatever HOMED? says - but treat any
# missing UNHOME support as "trust ESP's flag" so we
# don't break older firmware.
try:
r = self._rpc('HOMED?', topic='homed', timeout=2.0)
self._homed = (r.strip() == '1')
except Exception:
pass
self._publish_state()
def _reader_loop(self):
buf = b''
while not self._stop.is_set():
sp = self._sp
if sp is None:
time.sleep(0.5)
continue
try:
chunk = sp.read(256)
except Exception as e:
self.log.warning('Aux serial read error: %s' % e)
time.sleep(0.5)
continue
if not chunk:
continue
buf += chunk
while True:
nl = buf.find(b'\n')
if nl < 0:
break
line = buf[:nl].rstrip(b'\r').decode('utf-8', errors='replace')
buf = buf[nl+1:]
self._on_line(line)
def _on_line(self, line):
if not line:
return
# Boot banner -> reset homed flag.
if line.startswith('[boot]'):
self.log.warning('Aux ESP booted: %s' % line)
self._homed = False
self._present = True
self._publish_state()
self.ctrl.state.add_message(
'Auxiliary axis controller restarted - re-home before use')
return
# Topic dispatch: "[topic] body..."
if line.startswith('[') and ']' in line:
rb = line.index(']')
topic = line[1:rb]
body = line[rb+1:].lstrip()
# Mark present on first known topic.
if not self._present:
self._present = True
self._publish_state()
# Match against the head of the pending queue.
with self._pending_cv:
if (self._pending_topics
and topic in self._pending_topics[0]):
# Pop and deliver
self._pending_topics.pop(0)
self._pending_replies.append(body)
self._pending_cv.notify_all()
return
# Async informational line; just log.
self.log.info('aux: %s' % line)
else:
self.log.info('aux: %s' % line)
def _send_raw(self, cmd):
sp = self._sp
if sp is None:
raise AuxAxisError('Serial not open')
if not cmd.endswith('\n'):
cmd = cmd + '\n'
sp.write(cmd.encode('utf-8'))
sp.flush()
def _rpc(self, cmd, topic, timeout=5.0):
"""Send `cmd`, wait for a reply line whose topic is in `topic`.
topic may be a single string or a tuple/list of acceptable topics
(e.g. ('home', 'err'))."""
if isinstance(topic, str):
topics = (topic, 'err')
else:
topics = tuple(topic) + ('err',)
with self._sp_lock:
with self._pending_cv:
self._pending_topics.append(topics)
self._pending_replies = [] # reset
self.log.info('aux >> %s' % cmd.strip())
self._send_raw(cmd)
deadline = time.time() + timeout
with self._pending_cv:
while not self._pending_replies:
remaining = deadline - time.time()
if remaining <= 0:
# Drop the pending slot so we don't capture a
# late reply meant for the next caller.
try:
self._pending_topics.remove(topics)
except ValueError:
pass
raise AuxAxisError(
'Timeout waiting for %s reply to "%s"'
% (topics, cmd.strip()))
self._pending_cv.wait(timeout=remaining)
reply = self._pending_replies.pop(0)
self.log.info('aux << %s' % reply)
if reply.startswith('err') or reply.startswith('error'):
raise AuxAxisError('ESP error: %s' % reply)
return reply
@staticmethod
def _parse_kv_int(line, key, default=0):
# Parse "key=N" (signed integer) out of a line.
for tok in line.split():
if tok.startswith(key + '='):
try:
return int(tok.split('=', 1)[1])
except ValueError:
return default
return default
@staticmethod
def _parse_kv_str(line, key, default=''):
for tok in line.split():
if tok.startswith(key + '='):
return tok.split('=', 1)[1]
return default
# ------------------------------------------------------------ state push
def _publish_state(self):
st = self.ctrl.state
try:
st.set('aux_present', bool(self._present))
st.set('aux_homed', bool(self._homed))
st.set('aux_pos', round(self.position_mm, 4))
st.set('aux_enabled', bool(self.enabled))
except Exception:
# During very early startup, state may not be ready.
pass
# Notify the external-axis layer so it can mirror state
# (e.g. homed flag) into the synthetic motor vars.
observer = getattr(self, '_state_observer', None)
if observer is not None:
try:
observer()
except Exception:
pass

View File

@@ -73,6 +73,8 @@ class Ctrl(object):
self.pwr = bbctrl.Pwr(self)
with Trace.span('ctrl.hooks'):
self.hooks = bbctrl.Hooks(self)
with Trace.span('ctrl.aux'):
self.aux = bbctrl.AuxAxis(self)
with Trace.span('ctrl.mach.connect'):
self.mach.connect()
@@ -130,6 +132,8 @@ class Ctrl(object):
def close(self):
try: self.aux.close()
except Exception: pass
self.log.get('Ctrl').info('Closing %s' % self.id)
self.ioloop.close()
self.avr.close()

View File

@@ -787,6 +787,70 @@ class HooksFireHandler(bbctrl.APIHandler):
self.get_ctrl().hooks._fire(event, data)
# ----- W axis (auxcnc) endpoints --------------------------------------------
class AuxConfigGetHandler(bbctrl.APIHandler):
def get(self):
self.write_json(self.get_ctrl().aux.get_config())
class AuxConfigSaveHandler(bbctrl.APIHandler):
def put_ok(self):
self.get_ctrl().aux.save_config(self.json or {})
class AuxStatusHandler(bbctrl.APIHandler):
def get(self):
aux = self.get_ctrl().aux
self.write_json({
'enabled': aux.enabled,
'present': aux.present,
'homed': aux.homed,
'pos_mm': aux.position_mm,
})
class AuxHomeHandler(bbctrl.APIHandler):
def put_ok(self):
self.get_ctrl().aux.home()
class AuxAbortHandler(bbctrl.APIHandler):
def put_ok(self):
self.get_ctrl().aux.abort()
class AuxJogHandler(bbctrl.APIHandler):
"""Body: {"mm": 1.5} for relative-mm move,
{"steps": 200} for raw step move (bypasses soft limits)."""
def put_ok(self):
body = self.json or {}
aux = self.get_ctrl().aux
if 'mm' in body:
aux.move_rel_mm(float(body['mm']))
elif 'steps' in body:
aux.jog_steps(int(body['steps']))
else:
raise HTTPError(400, 'mm or steps required')
class AuxMoveHandler(bbctrl.APIHandler):
"""Body: {"mm": 12.5} absolute move in mm."""
def put_ok(self):
body = self.json or {}
if 'mm' not in body:
raise HTTPError(400, 'mm required')
self.get_ctrl().aux.move_abs_mm(float(body['mm']))
class AuxSetZeroHandler(bbctrl.APIHandler):
"""Body: {"mm": 0} set current position to <mm>."""
def put_ok(self):
body = self.json or {}
mm = float(body.get('mm', 0.0))
self.get_ctrl().aux.set_position_mm(mm)
class RemoteDiagnosticsHandler(bbctrl.APIHandler):
def get(self):
@@ -1016,6 +1080,14 @@ class Web(tornado.web.Application):
(r'/api/hooks/save', HooksSaveHandler),
(r'/api/hooks/status', HooksStatusHandler),
(r'/api/hooks/fire/([\w-]+)', HooksFireHandler),
(r'/api/aux/config', AuxConfigGetHandler),
(r'/api/aux/config/save', AuxConfigSaveHandler),
(r'/api/aux/status', AuxStatusHandler),
(r'/api/aux/home', AuxHomeHandler),
(r'/api/aux/abort', AuxAbortHandler),
(r'/api/aux/jog', AuxJogHandler),
(r'/api/aux/move', AuxMoveHandler),
(r'/api/aux/set-zero', AuxSetZeroHandler),
(r'/(.*)', StaticFileHandler,
{'path': bbctrl.get_resource('http/'),
'default_filename': 'index.html'}),

View File

@@ -67,6 +67,7 @@ from bbctrl.AVREmu import AVREmu
from bbctrl.IOLoop import IOLoop
from bbctrl.MonitorTemp import MonitorTemp
from bbctrl.Hooks import Hooks
from bbctrl.AuxAxis import AuxAxis
import bbctrl.Cmd as Cmd
import bbctrl.v4l2 as v4l2
import bbctrl.Log as log