Add hooks system for external triggers during G-code execution

- New Hooks module (src/py/bbctrl/Hooks.py) that watches controller state
  and fires webhooks or scripts on events:
  - tool-change (M6), program-start, program-end, pause, estop,
    homing-start, homing-end, custom (via MSG comments)
- API endpoints:
  - GET /api/hooks - get current hook config
  - PUT /api/hooks/save - save hook config
  - PUT /api/hooks/fire/<event> - manually fire a hook (for testing)
- Hook config stored in hooks.json with two types:
  - webhook: HTTP POST/PUT to external URL with JSON context
  - script: run local command with env vars (HOOK_OLD_TOOL, etc.)
- Fix tornado.web.asynchronous deprecation in Camera.py
- Wired into Ctrl initialization and state listener system
This commit is contained in:
2026-04-20 17:43:02 +02:00
parent 44b85bad5a
commit 7f8fd23615
7 changed files with 300 additions and 2 deletions

268
src/py/bbctrl/Hooks.py Normal file
View File

@@ -0,0 +1,268 @@
################################################################################
#
# Hooks - External event triggers during G-code execution
#
# Watches planner state changes and fires webhooks / runs scripts when
# specific events occur (tool change, program start/end, pause, etc.)
#
# Configuration is loaded from hooks.json in the controller directory:
#
# {
# "tool-change": {
# "type": "webhook",
# "url": "http://toolchanger.local/api/change",
# "method": "POST",
# "timeout": 30,
# "wait": true
# },
# "program-start": {
# "type": "script",
# "command": "/usr/local/bin/dust-collector on"
# },
# "program-end": {
# "type": "webhook",
# "url": "http://homeassistant.local:8123/api/services/switch/turn_off",
# "method": "POST",
# "headers": {"Authorization": "Bearer TOKEN"},
# "body": {"entity_id": "switch.dust_collector"}
# },
# "pause": {
# "type": "script",
# "command": "/usr/local/bin/notify 'CNC paused'"
# }
# }
#
################################################################################
import os
import json
import subprocess
import traceback
from urllib.request import Request, urlopen
from urllib.error import URLError
# Events that can be hooked
HOOK_EVENTS = [
'tool-change', # M6 - tool change requested
'program-start', # Program begins running
'program-end', # M2/M30 - program ends
'pause', # M0/M1 - program pause
'probe-start', # Probe cycle begins
'estop', # Emergency stop triggered
'homing-start', # Homing cycle begins
'homing-end', # Homing cycle completes
'custom', # Triggered by (MSG,HOOK:name:data) comments
]
class Hooks:
def __init__(self, ctrl):
self.ctrl = ctrl
self.log = ctrl.log.get('Hooks')
self.hooks = {}
# Track state for edge detection — must be set before add_listener
# because add_listener fires immediately with current state
self._last_cycle = ctrl.state.get('cycle', 'idle')
self._last_state = ctrl.state.get('xx', '')
self._last_tool = ctrl.state.get('tool', 0)
self._last_pause_reason = ctrl.state.get('pr', '')
self._initialized = False
self._load_config()
# Listen for state changes
ctrl.state.add_listener(self._on_state_change)
self._initialized = True
def _get_config_path(self):
return self.ctrl.get_path(filename='hooks.json')
def _load_config(self):
path = self._get_config_path()
if os.path.exists(path):
try:
with open(path) as f:
self.hooks = json.load(f)
self.log.info('Loaded %d hook(s) from %s' %
(len(self.hooks), path))
except Exception:
self.log.error('Failed to load hooks.json: %s' %
traceback.format_exc())
else:
self.log.info('No hooks.json found, hooks disabled')
def save_config(self, config):
"""Save hook configuration (called from API)."""
path = self._get_config_path()
with open(path, 'w') as f:
json.dump(config, f, indent=2)
self.hooks = config
self.log.info('Saved %d hook(s)' % len(config))
def get_config(self):
return self.hooks
def _on_state_change(self, update):
"""Called on every state update from the controller."""
if not self._initialized:
return
state = self.ctrl.state
# Detect tool change (tool number changed)
if 'tool' in update:
new_tool = update['tool']
if new_tool != self._last_tool:
self._fire('tool-change', {
'old_tool': self._last_tool,
'new_tool': new_tool,
})
self._last_tool = new_tool
# Detect cycle changes
if 'cycle' in update:
new_cycle = update['cycle']
if new_cycle != self._last_cycle:
if new_cycle == 'running' and self._last_cycle == 'idle':
self._fire('program-start', {})
elif new_cycle == 'idle' and self._last_cycle == 'running':
self._fire('program-end', {})
elif new_cycle == 'homing':
self._fire('homing-start', {})
elif self._last_cycle == 'homing' and new_cycle == 'idle':
self._fire('homing-end', {})
self._last_cycle = new_cycle
# Detect state changes
if 'xc' in update or 'xx' in update:
new_state = state.get('xx', '')
if new_state != self._last_state:
if new_state == 'ESTOPPED':
self._fire('estop', {})
self._last_state = new_state
# Detect pause
if 'pr' in update:
pr = update['pr']
if pr and pr != self._last_pause_reason:
self._fire('pause', {'reason': pr})
self._last_pause_reason = pr
# Detect custom hook messages: (MSG,HOOK:event_name:data)
if 'message' in update:
msg = update['message']
if isinstance(msg, str) and msg.startswith('HOOK:'):
parts = msg[5:].split(':', 1)
event = parts[0]
data = parts[1] if len(parts) > 1 else ''
self._fire('custom', {
'event': event,
'data': data,
}, custom_name=event)
def _fire(self, event, context, custom_name=None):
"""Fire a hook event."""
# Look up by event name, or by custom name for custom events
hook = self.hooks.get(event)
if custom_name and not hook:
hook = self.hooks.get(custom_name)
if not hook:
return
self.log.info('Hook firing: %s %s' % (event, json.dumps(context)))
# Add standard context
state = self.ctrl.state
context.update({
'event': event,
'position': state.get_position() if hasattr(state, 'get_position') else {},
'state': state.get('xx', ''),
'cycle': state.get('cycle', 'idle'),
})
hook_type = hook.get('type', 'webhook')
try:
if hook_type == 'webhook':
self._fire_webhook(hook, context)
elif hook_type == 'script':
self._fire_script(hook, context)
else:
self.log.error('Unknown hook type: %s' % hook_type)
except Exception:
self.log.error('Hook %s failed: %s' % (event, traceback.format_exc()))
def _fire_webhook(self, hook, context):
"""Fire a webhook HTTP request."""
url = hook.get('url')
if not url:
self.log.error('Webhook missing url')
return
method = hook.get('method', 'POST').upper()
timeout = hook.get('timeout', 10)
headers = hook.get('headers', {})
body = hook.get('body', {})
# Merge context into body
body['_context'] = context
data = json.dumps(body).encode('utf-8')
headers['Content-Type'] = 'application/json'
req = Request(url, data=data, headers=headers, method=method)
self.log.info('Webhook %s %s' % (method, url))
try:
resp = urlopen(req, timeout=timeout)
self.log.info('Webhook response: %d' % resp.status)
except URLError as e:
self.log.error('Webhook failed: %s' % e)
except Exception as e:
self.log.error('Webhook error: %s' % e)
def _fire_script(self, hook, context):
"""Fire a local script/command."""
command = hook.get('command')
if not command:
self.log.error('Script hook missing command')
return
timeout = hook.get('timeout', 30)
wait = hook.get('wait', True)
# Pass context as environment variables
env = os.environ.copy()
env['HOOK_EVENT'] = context.get('event', '')
env['HOOK_STATE'] = context.get('state', '')
env['HOOK_CYCLE'] = context.get('cycle', '')
env['HOOK_DATA'] = json.dumps(context)
if 'old_tool' in context:
env['HOOK_OLD_TOOL'] = str(context['old_tool'])
if 'new_tool' in context:
env['HOOK_NEW_TOOL'] = str(context['new_tool'])
self.log.info('Script: %s (wait=%s)' % (command, wait))
try:
if wait:
result = subprocess.run(
command, shell=True, env=env,
timeout=timeout,
capture_output=True, text=True
)
if result.returncode != 0:
self.log.error('Script failed (%d): %s' %
(result.returncode, result.stderr))
else:
if result.stdout.strip():
self.log.info('Script output: %s' % result.stdout.strip())
else:
subprocess.Popen(command, shell=True, env=env)
except subprocess.TimeoutExpired:
self.log.error('Script timed out after %ds' % timeout)
except Exception as e:
self.log.error('Script error: %s' % e)