From cce4e17b492559d7afceccf63811dc543c7bb0c6 Mon Sep 17 00:00:00 2001 From: David Carley Date: Thu, 1 Sep 2022 00:09:11 +0000 Subject: [PATCH] Gamepad support improvements. -Added support for the XBox 360 and XBox One controllers. - Both sets of buttons/triggers control "lock" on all controllers. - Improved logging output, especially in debug mode. - Filter out "noisy" controller events that happen inside the "deadband" - Error handling (and logging) when parsing a bad "gamepads.json" --- src/py/bbctrl/Jog.py | 203 ++++++++++++++++++++++++++++--------------- 1 file changed, 131 insertions(+), 72 deletions(-) diff --git a/src/py/bbctrl/Jog.py b/src/py/bbctrl/Jog.py index 1b2aff3..0516931 100644 --- a/src/py/bbctrl/Jog.py +++ b/src/py/bbctrl/Jog.py @@ -1,6 +1,6 @@ from bbctrl.Ctrl import Ctrl from bbctrl.Log import Logger -from evdev.ecodes import EV_ABS, EV_KEY +from evdev.ecodes import EV, EV_ABS, EV_KEY import errno import evdev import functools @@ -19,7 +19,8 @@ gamepadConfigs = { "sign-x": 1, "sign-y": -1, "sign-z": -1, - "deadband": 0.15 + "deadband": 0.15, + "debug": False, }, "9E2B3A63": { "description": "Logitech 710, X mode", @@ -34,6 +35,8 @@ gamepadConfigs = { "EV_ABS:4": "axis-z", "EV_KEY:310": "lock-y", "EV_KEY:311": "lock-x", + "EV_ABS:2": "lock-y", + "EV_ABS:5": "lock-x", }, "B98EF4EC": { "description": "Logitech 710, D mode", @@ -48,6 +51,8 @@ gamepadConfigs = { "EV_ABS:5": "axis-z", "EV_KEY:308": "lock-y", "EV_KEY:309": "lock-x", + "EV_KEY:310": "lock-y", + "EV_KEY:311": "lock-x", }, "268256FD": { "description": "EasySMX ESM-9013, top lights mode", @@ -62,6 +67,8 @@ gamepadConfigs = { "EV_ABS:4": "axis-z", "EV_KEY:310": "lock-y", "EV_KEY:311": "lock-x", + "EV_ABS:2": "lock-y", + "EV_ABS:5": "lock-x", }, "23CEC0CB": { "description": "EasySMX ESM-9013, left lights mode", @@ -76,6 +83,8 @@ gamepadConfigs = { "EV_ABS:5": "axis-z", "EV_KEY:308": "lock-y", "EV_KEY:309": "lock-x", + "EV_KEY:310": "lock-y", + "EV_KEY:311": "lock-x", }, "370DCB72": { "description": "EasySMX ESM-9013, bottom lights mode", @@ -90,6 +99,8 @@ gamepadConfigs = { "EV_ABS:5": "axis-z", "EV_KEY:310": "lock-y", "EV_KEY:311": "lock-x", + "EV_KEY:312": "lock-y", + "EV_KEY:313": "lock-x", }, "0BD0841F": { "description": "Sony Playstation 4 Dual-Shock Controller", @@ -104,6 +115,40 @@ gamepadConfigs = { "EV_ABS:5": "axis-z", "EV_KEY:308": "lock-y", "EV_KEY:309": "lock-x", + "EV_KEY:310": "lock-y", + "EV_KEY:311": "lock-x", + }, + "06656EBD": { + "description": "XBox One Controller", + "EV_KEY:308": "speed-4", + "EV_KEY:305": "speed-3", + "EV_KEY:304": "speed-2", + "EV_KEY:307": "speed-1", + "EV_ABS:0": "axis-x", + "EV_ABS:16": "axis-x", + "EV_ABS:1": "axis-y", + "EV_ABS:17": "axis-y", + "EV_ABS:4": "axis-z", + "EV_KEY:310": "lock-y", + "EV_KEY:311": "lock-x", + "EV_ABS:2": "lock-y", + "EV_ABS:5": "lock-x", + }, + "BFF99E89": { + "description": "XBox 360 Controller", + "EV_KEY:308": "speed-4", + "EV_KEY:305": "speed-3", + "EV_KEY:304": "speed-2", + "EV_KEY:307": "speed-1", + "EV_ABS:0": "axis-x", + "EV_ABS:16": "axis-x", + "EV_ABS:1": "axis-y", + "EV_ABS:17": "axis-y", + "EV_ABS:4": "axis-z", + "EV_KEY:310": "lock-y", + "EV_KEY:311": "lock-x", + "EV_ABS:2": "lock-y", + "EV_ABS:5": "lock-x", } } @@ -122,7 +167,7 @@ def get_udev_prop(device: pyudev.Device, propertyName: str): return None -def to_sorted_json(value): +def sorted_json(value): return json.dumps(value, sort_keys=True) @@ -145,8 +190,29 @@ def processCapabilities(capabilities): return result +# A forward declaration, so Command can reference it class Gamepad(object): - _logRecord = set() + pass + + +class Command(object): + + def __init__(self, id: str, event: evdev.InputEvent, value: int, + gamepad: Gamepad): + self.id = id + self.event = event + self.value = value + self.gamepad = gamepad + + def __str__(self): + return "Command({}={}, Event(type={}, code={}, value={}))".format( + self.id, self.value, EV[self.event.type], self.event.code, + self.event.value) + + +class Gamepad(object): + _logOnceRecord = set() + _eventValuesByCode = {} def __init__(self, log: Logger, _evdev: evdev.InputDevice, _udev: pyudev.Device): @@ -168,7 +234,7 @@ class Gamepad(object): for key in _udev.properties} } - json = to_sorted_json(self._details["evdev"]) + json = sorted_json(self._details["evdev"]) self.hash = hashlib.sha256(json.encode()).hexdigest()[-8:].upper() self.config = { @@ -189,6 +255,31 @@ class Gamepad(object): def devicePath(self): return self._evdev.path + def getCommandFromEvent(self, event: evdev.InputEvent) -> Command: + if event.type not in [EV_ABS, EV_KEY]: + return + + value = self.scaleAndClampValue(event) + + lastValue = self._eventValuesByCode.get(event.code) + if value == lastValue: + return + + self._eventValuesByCode[event.code] = value + + eventSignature = "{}:{}".format(EV[event.type], event.code) + commandId = self.config.get(eventSignature) + + if not commandId: + self.logOnce("Unmapped event: {}".format(eventSignature)) + return + + command = Command(commandId, event, value, self) + + self.logDebug(command) + + return command + def scaleAndClampValue(self, event: evdev.InputEvent): if event.type != EV_ABS: return event.value @@ -214,14 +305,16 @@ class Gamepad(object): # e.g. if value == deadband, the new value will be zero delta = value - deadband range = 1 - deadband - return (delta * sign) / range + value = (delta * sign) / range + + return round(value, 3) def log(self, msg): self._log.info("{}: {}".format(self.hash, msg)) def logOnce(self, msg): - if self.config.get("debug") or msg not in self._logRecord: - self._logRecord.add(msg) + if self.config.get("debug") or msg not in self._logOnceRecord: + self._logOnceRecord.add(msg) self.log(msg) def logDebug(self, msg): @@ -229,7 +322,7 @@ class Gamepad(object): self.log(msg) def __str__(self) -> str: - return to_sorted_json({ + return sorted_json({ "devicePath": self.devicePath, "bustype": self._evdev.info.bustype, "details": self._details, @@ -237,18 +330,6 @@ class Gamepad(object): }) -class Command(object): - - def __init__(self, id: str, value: int, gamepad: Gamepad): - self.id = id - self.value = value - self.gamepad = gamepad - - def __str__(self): - return "Command(id='{}', value={}, gamepad='{}')".format( - self.id, self.value, self.gamepad.hash) - - class Jog(object): gamepads = {} # type: dict[typing.Union[int, str], Gamepad] lock = {"x": False, "y": False} @@ -267,11 +348,15 @@ class Jog(object): self._updateJogging() def _loadUserGamepadConfigs(self): - path = self.ctrl.get_path('gamepads.json') - if os.path.exists(path): - with open(path, 'r') as f: - global userGamepadConfigs - userGamepadConfigs = json.load(f) + try: + path = self.ctrl.get_path('gamepads.json') + if os.path.exists(path): + with open(path, 'r') as f: + global userGamepadConfigs + userGamepadConfigs = json.load(f) + except: + self.log.info(traceback.format_exc()) + self.log.info("Failed to read 'gamepads.json'") def _startMonitoring(self): self.udev_context = pyudev.Context() @@ -291,7 +376,7 @@ class Jog(object): if inputJoystick != 1: self.log.info("Ignoring non-gamepad device: {}".format( - to_sorted_json( + sorted_json( {key: udev.properties[key] for key in udev.properties}))) continue @@ -320,7 +405,7 @@ class Jog(object): self.log, evdev.InputDevice(devicePath), pyudev.Devices.from_device_file(self.udev_context, devicePath)) - self.log.info("Found gamepad: {}".format(str(gamepad))) + self.log.info("Found gamepad: {}".format(gamepad)) self.gamepads[gamepad.fd] = self.gamepads[devicePath] = gamepad @@ -332,8 +417,7 @@ class Jog(object): if not gamepad: return - self.log.info("Device removed: {}, {}".format(gamepad.hash, - devicePath)) + gamepad.log("Gamepad removed: {}".format(devicePath)) self.ioloop.remove_handler(gamepad.fd) del self.gamepads[gamepad.devicePath] @@ -342,14 +426,13 @@ class Jog(object): def _gamepadHandler(self, fd, events): gamepad = self.gamepads.get(fd) if not gamepad: - self.log.info("_gamepad_handler: Unknown gamepad? {}".format(fd)) + self.log.info("Unknown gamepad? {}".format(fd)) return try: for event in gamepad.read(): - command = self._getCommandFromEvent(gamepad, event) - if command: - self._processCommand(command) + command = gamepad.getCommandFromEvent(event) + self._processCommand(command) except BlockingIOError: pass except OSError as error: @@ -360,40 +443,22 @@ class Jog(object): except Exception as error: gamepad.log(traceback.format_exc()) - def _getCommandFromEvent(self, gamepad: Gamepad, - event: evdev.InputEvent) -> Command: - if event.type not in [EV_ABS, EV_KEY]: - return - - eventSignature = "{}:{}".format(evdev.ecodes.EV[event.type], - event.code) - commandId = gamepad.config.get(eventSignature) - - if not commandId: - gamepad.logOnce("Unmapped event: {}:{}".format( - gamepad.hash, eventSignature)) - return - - gamepad.logDebug("Got event: {}".format(str(event))) - - return Command(commandId, gamepad.scaleAndClampValue(event), gamepad) - def _processCommand(self, command: Command): + if not command: + return + processor = self.commandProcessors.get(command.id) if not processor: - command.gamepad.log("Unrecognized command: {}".format(command.id)) + command.gamepad.log("Bad command: {}".format(command)) return - command.gamepad.logDebug("Processing command: {}".format(str(command))) - processor(self, command) def _processSpeedCommand(self, command: Command): match = re.match(r"^speed-(\d)$", command.id) speed = int(match.group(1)) if match else 0 if speed not in [1, 2, 3, 4]: - command.gamepad.log("Unrecognized speed command: {}".format( - str(command))) + command.gamepad.log("Bad speed command: {}".format(command)) self.changed = self.changed or self.speed != speed self.speed = speed @@ -402,8 +467,7 @@ class Jog(object): match = re.match(r"^axis-(.)$", command.id) axis = match.group(1) if match else "" if axis not in ["x", "y", "z"]: - command.gamepad.log("Unrecognized axis command: {}".format( - str(command))) + command.gamepad.log("Bad axis command: {}".format(command)) sign = command.gamepad.config.get("sign-{}".format(axis), 1) oldValue = self.axes[axis] @@ -412,25 +476,20 @@ class Jog(object): self.axes[axis] = 0 if locked else command.value * sign self.changed = self.changed or oldValue != self.axes[axis] - command.gamepad.logDebug("_processAxisCommand: {}".format( - json.dumps({ - "command.value": command.value, - "axis": axis, - "oldValue": oldValue, - "value": self.axes[axis], - "sign": sign, - "locked": locked, - "changed": self.changed - }))) + command.gamepad.logDebug( + "{}(value={}, oldValue={}, sign={}, locked={})".format( + command.id, self.axes[axis], oldValue, sign, locked)) def _processLockCommand(self, command: Command): match = re.match(r"^lock-(.)$", command.id) axis = match.group(1) if match else "" if axis not in ["x", "y"]: - command.gamepad.log("Unrecognized lock command: {}".format( - str(command))) + command.gamepad.log("Bad lock command: {}".format(command)) - self.lock[axis] = bool(command.value) + if command.event.type == EV_ABS: + self.lock[axis] = bool(command.value > -0.9) + else: + self.lock[axis] = bool(command.value) def _processDisabled(self, command: Command): pass