################################################################################
#                                                                              #
#  Lightweight phase tracing for bbctrl restart / boot timing.                 #
#                                                                              #
#  Anchored at module import time. All timestamps are seconds since the        #
#  process anchor (monotonic). A wall-clock anchor is captured once so the     #
#  timeline can be aligned with journalctl / systemd-analyze.                  #
#                                                                              #
#  Set BBCTRL_TRACE=0 in the environment to disable all marks (no-op).         #
#                                                                              #
#  Exposed by /api/diag/timing as JSON.                                        #
#                                                                              #
################################################################################

"""Bbctrl restart / startup tracing.

Usage:

    import bbctrl.Trace as T
    T.mark('proc.start')
    with T.span('ctrl.avr.init'):
        ...

The timeline is also dumped on demand via /api/diag/timing.
"""

import os
import time
import json
import threading


# Tracing is on unless BBCTRL_TRACE is exactly '0'.
_ENABLED = os.environ.get('BBCTRL_TRACE', '1') != '0'

# Process anchors, captured once at import time.
_t0_monotonic = time.monotonic()
_t0_wall = time.time()

_lock = threading.Lock()   # guards _events
_events = []               # list of dicts: {t, name, fields}
_ui_timing = None          # last timeline POSTed by the browser


def _current_uptime():
    """Return seconds since kernel boot read from /proc/uptime.

    Returns None when the file is missing, unreadable or malformed (for
    example on a platform without /proc).
    """
    try:
        with open('/proc/uptime') as f:
            return float(f.read().split()[0])

    except (OSError, ValueError, IndexError):
        return None


def _read_kernel_anchors():
    """Return (btime_wall, uptime_at_anchor) so we can express bbctrl
    events in seconds since kernel boot.

    btime_wall: wall-clock epoch seconds when the kernel booted
                (from /proc/stat 'btime').
    uptime_at_anchor: monotonic offset (seconds since kernel boot) at the
                moment Trace was imported. Equivalent to
                (Trace anchor) - btime in wall time, but read directly
                from /proc/uptime so it isn't sensitive to wall-clock
                skew.

    Either element is None when the corresponding /proc file cannot be
    read or parsed.
    """
    btime = None

    try:
        with open('/proc/stat') as f:
            for line in f:
                if line.startswith('btime '):
                    btime = int(line.split()[1])
                    break

    except (OSError, ValueError, IndexError):
        pass

    return btime, _current_uptime()


_btime_wall, _uptime_at_anchor = _read_kernel_anchors()


def now():
    """Return seconds elapsed since the process anchor (monotonic)."""
    return time.monotonic() - _t0_monotonic


def mark(name, **fields):
    """Record a single named event at the current monotonic time.

    name:   event name, e.g. 'proc.start'.
    fields: optional key/value annotations stored under the event's
            'fields' key and echoed in the log line.

    Does nothing when tracing is disabled.
    """
    if not _ENABLED:
        return

    t = now()
    ev = {'t': round(t, 4), 'name': name}
    if fields:
        ev['fields'] = fields

    with _lock:
        _events.append(ev)

    # Also surface in the regular log stream so journalctl shows it.
    # Deliberately best-effort: tracing must never break the caller, even
    # if stdout is closed or a field value has a failing __str__.
    try:
        extras = ''
        if fields:
            extras = ' ' + ' '.join(
                '%s=%s' % (k, v) for k, v in fields.items())
        print('TRACE +%.3fs %s%s' % (t, name, extras), flush=True)

    except Exception:
        pass


class span:
    """Context manager that emits <name>.start / <name>.end with duration.

    The '.end' event carries the span's fields plus 'dur_ms' (integer
    milliseconds) and, when the body raised, 'error' (the exception class
    name). Exceptions are never suppressed.
    """

    def __init__(self, name, **fields):
        self.name = name
        self.fields = fields
        self._t = None   # monotonic start time; None until entered


    def __enter__(self):
        if _ENABLED:
            self._t = time.monotonic()
            mark(self.name + '.start', **self.fields)

        return self


    def __exit__(self, exc_type, exc, tb):
        if _ENABLED and self._t is not None:
            dur_ms = int((time.monotonic() - self._t) * 1000)

            # Copy so the '.start' event's fields dict is not mutated.
            extra = dict(self.fields)
            extra['dur_ms'] = dur_ms
            if exc_type is not None:
                extra['error'] = exc_type.__name__

            mark(self.name + '.end', **extra)

        return False   # propagate any exception from the body


def set_ui_timing(data):
    """Store the most recent timeline reported by the browser UI."""
    global _ui_timing
    _ui_timing = data


def timeline():
    """Return a snapshot of the trace as a dict.

    Contains the recorded events, the process anchors, the pid, the last
    UI-reported timing and the kernel-boot anchors. The event list is a
    shallow copy taken under the lock.
    """
    with _lock:
        events = list(_events)

    return {
        'enabled': _ENABLED,
        't0_wall': _t0_wall,
        't0_iso': time.strftime('%Y-%m-%dT%H:%M:%S',
                                time.localtime(_t0_wall)),
        'now': now(),
        'pid': os.getpid(),
        'events': events,
        'ui': _ui_timing,
        # Kernel-boot anchors so the timeline can be expressed in
        # "seconds since power on".
        'btime_wall': _btime_wall,
        'uptime_at_anchor': _uptime_at_anchor,
        'uptime_now': _current_uptime(),
    }


def dump(path):
    """Write the current timeline to *path* as indented JSON.

    Best-effort: any failure is swallowed so tracing never breaks the
    caller.
    """
    try:
        # Serialize before opening the file so a serialization failure
        # cannot leave a truncated file behind. default=str keeps the dump
        # usable when a trace field holds a non-JSON value.
        text = json.dumps(timeline(), indent=2, default=str)

        with open(path, 'w') as f:
            f.write(text)

    except Exception:
        pass


# Sd_notify helper -------------------------------------------------------------
#
# Allows bbctrl to tell systemd "I am ready" / "current status is X" so
# `systemctl status bbctrl` and `systemd-analyze critical-chain` reflect the
# actual application state instead of just exec start.

def sd_notify(state):
    """Send a status line to systemd. Safe no-op when not under systemd.

    state: notification string, e.g. 'READY=1' or 'STATUS=...'.

    Errors are swallowed so a notification problem never affects the
    application.
    """
    addr = os.environ.get('NOTIFY_SOCKET')
    if not addr:
        return

    try:
        import socket

        # Abstract socket if it starts with '@'
        target = '\0' + addr[1:] if addr.startswith('@') else addr

        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
            sock.sendto(state.encode('utf-8'), target)

    except Exception:
        pass


# Mark module-import time so even importing bbctrl shows up.
mark('trace.import')