Gamepad support improvements.
-Added support for the XBox 360 and XBox One controllers. - Both sets of buttons/triggers control "lock" on all controllers. - Improved logging output, especially in debug mode. - Filter out "noisy" controller events that happen inside the "deadband" - Error handling (and logging) when parsing a bad "gamepads.json"
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
from bbctrl.Ctrl import Ctrl
|
||||
from bbctrl.Log import Logger
|
||||
from evdev.ecodes import EV_ABS, EV_KEY
|
||||
from evdev.ecodes import EV, EV_ABS, EV_KEY
|
||||
import errno
|
||||
import evdev
|
||||
import functools
|
||||
@@ -19,7 +19,8 @@ gamepadConfigs = {
|
||||
"sign-x": 1,
|
||||
"sign-y": -1,
|
||||
"sign-z": -1,
|
||||
"deadband": 0.15
|
||||
"deadband": 0.15,
|
||||
"debug": False,
|
||||
},
|
||||
"9E2B3A63": {
|
||||
"description": "Logitech 710, X mode",
|
||||
@@ -34,6 +35,8 @@ gamepadConfigs = {
|
||||
"EV_ABS:4": "axis-z",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
"EV_ABS:2": "lock-y",
|
||||
"EV_ABS:5": "lock-x",
|
||||
},
|
||||
"B98EF4EC": {
|
||||
"description": "Logitech 710, D mode",
|
||||
@@ -48,6 +51,8 @@ gamepadConfigs = {
|
||||
"EV_ABS:5": "axis-z",
|
||||
"EV_KEY:308": "lock-y",
|
||||
"EV_KEY:309": "lock-x",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
},
|
||||
"268256FD": {
|
||||
"description": "EasySMX ESM-9013, top lights mode",
|
||||
@@ -62,6 +67,8 @@ gamepadConfigs = {
|
||||
"EV_ABS:4": "axis-z",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
"EV_ABS:2": "lock-y",
|
||||
"EV_ABS:5": "lock-x",
|
||||
},
|
||||
"23CEC0CB": {
|
||||
"description": "EasySMX ESM-9013, left lights mode",
|
||||
@@ -76,6 +83,8 @@ gamepadConfigs = {
|
||||
"EV_ABS:5": "axis-z",
|
||||
"EV_KEY:308": "lock-y",
|
||||
"EV_KEY:309": "lock-x",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
},
|
||||
"370DCB72": {
|
||||
"description": "EasySMX ESM-9013, bottom lights mode",
|
||||
@@ -90,6 +99,8 @@ gamepadConfigs = {
|
||||
"EV_ABS:5": "axis-z",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
"EV_KEY:312": "lock-y",
|
||||
"EV_KEY:313": "lock-x",
|
||||
},
|
||||
"0BD0841F": {
|
||||
"description": "Sony Playstation 4 Dual-Shock Controller",
|
||||
@@ -104,6 +115,40 @@ gamepadConfigs = {
|
||||
"EV_ABS:5": "axis-z",
|
||||
"EV_KEY:308": "lock-y",
|
||||
"EV_KEY:309": "lock-x",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
},
|
||||
"06656EBD": {
|
||||
"description": "XBox One Controller",
|
||||
"EV_KEY:308": "speed-4",
|
||||
"EV_KEY:305": "speed-3",
|
||||
"EV_KEY:304": "speed-2",
|
||||
"EV_KEY:307": "speed-1",
|
||||
"EV_ABS:0": "axis-x",
|
||||
"EV_ABS:16": "axis-x",
|
||||
"EV_ABS:1": "axis-y",
|
||||
"EV_ABS:17": "axis-y",
|
||||
"EV_ABS:4": "axis-z",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
"EV_ABS:2": "lock-y",
|
||||
"EV_ABS:5": "lock-x",
|
||||
},
|
||||
"BFF99E89": {
|
||||
"description": "XBox 360 Controller",
|
||||
"EV_KEY:308": "speed-4",
|
||||
"EV_KEY:305": "speed-3",
|
||||
"EV_KEY:304": "speed-2",
|
||||
"EV_KEY:307": "speed-1",
|
||||
"EV_ABS:0": "axis-x",
|
||||
"EV_ABS:16": "axis-x",
|
||||
"EV_ABS:1": "axis-y",
|
||||
"EV_ABS:17": "axis-y",
|
||||
"EV_ABS:4": "axis-z",
|
||||
"EV_KEY:310": "lock-y",
|
||||
"EV_KEY:311": "lock-x",
|
||||
"EV_ABS:2": "lock-y",
|
||||
"EV_ABS:5": "lock-x",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,7 +167,7 @@ def get_udev_prop(device: pyudev.Device, propertyName: str):
|
||||
return None
|
||||
|
||||
|
||||
def to_sorted_json(value):
|
||||
def sorted_json(value):
|
||||
return json.dumps(value, sort_keys=True)
|
||||
|
||||
|
||||
@@ -145,8 +190,29 @@ def processCapabilities(capabilities):
|
||||
return result
|
||||
|
||||
|
||||
# A forward declaration, so Command can reference it
|
||||
class Gamepad(object):
|
||||
_logRecord = set()
|
||||
pass
|
||||
|
||||
|
||||
class Command(object):
|
||||
|
||||
def __init__(self, id: str, event: evdev.InputEvent, value: int,
|
||||
gamepad: Gamepad):
|
||||
self.id = id
|
||||
self.event = event
|
||||
self.value = value
|
||||
self.gamepad = gamepad
|
||||
|
||||
def __str__(self):
|
||||
return "Command({}={}, Event(type={}, code={}, value={}))".format(
|
||||
self.id, self.value, EV[self.event.type], self.event.code,
|
||||
self.event.value)
|
||||
|
||||
|
||||
class Gamepad(object):
|
||||
_logOnceRecord = set()
|
||||
_eventValuesByCode = {}
|
||||
|
||||
def __init__(self, log: Logger, _evdev: evdev.InputDevice,
|
||||
_udev: pyudev.Device):
|
||||
@@ -168,7 +234,7 @@ class Gamepad(object):
|
||||
for key in _udev.properties}
|
||||
}
|
||||
|
||||
json = to_sorted_json(self._details["evdev"])
|
||||
json = sorted_json(self._details["evdev"])
|
||||
self.hash = hashlib.sha256(json.encode()).hexdigest()[-8:].upper()
|
||||
|
||||
self.config = {
|
||||
@@ -189,6 +255,31 @@ class Gamepad(object):
|
||||
def devicePath(self):
|
||||
return self._evdev.path
|
||||
|
||||
def getCommandFromEvent(self, event: evdev.InputEvent) -> Command:
|
||||
if event.type not in [EV_ABS, EV_KEY]:
|
||||
return
|
||||
|
||||
value = self.scaleAndClampValue(event)
|
||||
|
||||
lastValue = self._eventValuesByCode.get(event.code)
|
||||
if value == lastValue:
|
||||
return
|
||||
|
||||
self._eventValuesByCode[event.code] = value
|
||||
|
||||
eventSignature = "{}:{}".format(EV[event.type], event.code)
|
||||
commandId = self.config.get(eventSignature)
|
||||
|
||||
if not commandId:
|
||||
self.logOnce("Unmapped event: {}".format(eventSignature))
|
||||
return
|
||||
|
||||
command = Command(commandId, event, value, self)
|
||||
|
||||
self.logDebug(command)
|
||||
|
||||
return command
|
||||
|
||||
def scaleAndClampValue(self, event: evdev.InputEvent):
|
||||
if event.type != EV_ABS:
|
||||
return event.value
|
||||
@@ -214,14 +305,16 @@ class Gamepad(object):
|
||||
# e.g. if value == deadband, the new value will be zero
|
||||
delta = value - deadband
|
||||
range = 1 - deadband
|
||||
return (delta * sign) / range
|
||||
value = (delta * sign) / range
|
||||
|
||||
return round(value, 3)
|
||||
|
||||
def log(self, msg):
|
||||
self._log.info("{}: {}".format(self.hash, msg))
|
||||
|
||||
def logOnce(self, msg):
|
||||
if self.config.get("debug") or msg not in self._logRecord:
|
||||
self._logRecord.add(msg)
|
||||
if self.config.get("debug") or msg not in self._logOnceRecord:
|
||||
self._logOnceRecord.add(msg)
|
||||
self.log(msg)
|
||||
|
||||
def logDebug(self, msg):
|
||||
@@ -229,7 +322,7 @@ class Gamepad(object):
|
||||
self.log(msg)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return to_sorted_json({
|
||||
return sorted_json({
|
||||
"devicePath": self.devicePath,
|
||||
"bustype": self._evdev.info.bustype,
|
||||
"details": self._details,
|
||||
@@ -237,18 +330,6 @@ class Gamepad(object):
|
||||
})
|
||||
|
||||
|
||||
class Command(object):
|
||||
|
||||
def __init__(self, id: str, value: int, gamepad: Gamepad):
|
||||
self.id = id
|
||||
self.value = value
|
||||
self.gamepad = gamepad
|
||||
|
||||
def __str__(self):
|
||||
return "Command(id='{}', value={}, gamepad='{}')".format(
|
||||
self.id, self.value, self.gamepad.hash)
|
||||
|
||||
|
||||
class Jog(object):
|
||||
gamepads = {} # type: dict[typing.Union[int, str], Gamepad]
|
||||
lock = {"x": False, "y": False}
|
||||
@@ -267,11 +348,15 @@ class Jog(object):
|
||||
self._updateJogging()
|
||||
|
||||
def _loadUserGamepadConfigs(self):
|
||||
path = self.ctrl.get_path('gamepads.json')
|
||||
if os.path.exists(path):
|
||||
with open(path, 'r') as f:
|
||||
global userGamepadConfigs
|
||||
userGamepadConfigs = json.load(f)
|
||||
try:
|
||||
path = self.ctrl.get_path('gamepads.json')
|
||||
if os.path.exists(path):
|
||||
with open(path, 'r') as f:
|
||||
global userGamepadConfigs
|
||||
userGamepadConfigs = json.load(f)
|
||||
except:
|
||||
self.log.info(traceback.format_exc())
|
||||
self.log.info("Failed to read 'gamepads.json'")
|
||||
|
||||
def _startMonitoring(self):
|
||||
self.udev_context = pyudev.Context()
|
||||
@@ -291,7 +376,7 @@ class Jog(object):
|
||||
|
||||
if inputJoystick != 1:
|
||||
self.log.info("Ignoring non-gamepad device: {}".format(
|
||||
to_sorted_json(
|
||||
sorted_json(
|
||||
{key: udev.properties[key]
|
||||
for key in udev.properties})))
|
||||
continue
|
||||
@@ -320,7 +405,7 @@ class Jog(object):
|
||||
self.log, evdev.InputDevice(devicePath),
|
||||
pyudev.Devices.from_device_file(self.udev_context, devicePath))
|
||||
|
||||
self.log.info("Found gamepad: {}".format(str(gamepad)))
|
||||
self.log.info("Found gamepad: {}".format(gamepad))
|
||||
|
||||
self.gamepads[gamepad.fd] = self.gamepads[devicePath] = gamepad
|
||||
|
||||
@@ -332,8 +417,7 @@ class Jog(object):
|
||||
if not gamepad:
|
||||
return
|
||||
|
||||
self.log.info("Device removed: {}, {}".format(gamepad.hash,
|
||||
devicePath))
|
||||
gamepad.log("Gamepad removed: {}".format(devicePath))
|
||||
|
||||
self.ioloop.remove_handler(gamepad.fd)
|
||||
del self.gamepads[gamepad.devicePath]
|
||||
@@ -342,14 +426,13 @@ class Jog(object):
|
||||
def _gamepadHandler(self, fd, events):
|
||||
gamepad = self.gamepads.get(fd)
|
||||
if not gamepad:
|
||||
self.log.info("_gamepad_handler: Unknown gamepad? {}".format(fd))
|
||||
self.log.info("Unknown gamepad? {}".format(fd))
|
||||
return
|
||||
|
||||
try:
|
||||
for event in gamepad.read():
|
||||
command = self._getCommandFromEvent(gamepad, event)
|
||||
if command:
|
||||
self._processCommand(command)
|
||||
command = gamepad.getCommandFromEvent(event)
|
||||
self._processCommand(command)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
except OSError as error:
|
||||
@@ -360,40 +443,22 @@ class Jog(object):
|
||||
except Exception as error:
|
||||
gamepad.log(traceback.format_exc())
|
||||
|
||||
def _getCommandFromEvent(self, gamepad: Gamepad,
|
||||
event: evdev.InputEvent) -> Command:
|
||||
if event.type not in [EV_ABS, EV_KEY]:
|
||||
return
|
||||
|
||||
eventSignature = "{}:{}".format(evdev.ecodes.EV[event.type],
|
||||
event.code)
|
||||
commandId = gamepad.config.get(eventSignature)
|
||||
|
||||
if not commandId:
|
||||
gamepad.logOnce("Unmapped event: {}:{}".format(
|
||||
gamepad.hash, eventSignature))
|
||||
return
|
||||
|
||||
gamepad.logDebug("Got event: {}".format(str(event)))
|
||||
|
||||
return Command(commandId, gamepad.scaleAndClampValue(event), gamepad)
|
||||
|
||||
def _processCommand(self, command: Command):
|
||||
if not command:
|
||||
return
|
||||
|
||||
processor = self.commandProcessors.get(command.id)
|
||||
if not processor:
|
||||
command.gamepad.log("Unrecognized command: {}".format(command.id))
|
||||
command.gamepad.log("Bad command: {}".format(command))
|
||||
return
|
||||
|
||||
command.gamepad.logDebug("Processing command: {}".format(str(command)))
|
||||
|
||||
processor(self, command)
|
||||
|
||||
def _processSpeedCommand(self, command: Command):
|
||||
match = re.match(r"^speed-(\d)$", command.id)
|
||||
speed = int(match.group(1)) if match else 0
|
||||
if speed not in [1, 2, 3, 4]:
|
||||
command.gamepad.log("Unrecognized speed command: {}".format(
|
||||
str(command)))
|
||||
command.gamepad.log("Bad speed command: {}".format(command))
|
||||
|
||||
self.changed = self.changed or self.speed != speed
|
||||
self.speed = speed
|
||||
@@ -402,8 +467,7 @@ class Jog(object):
|
||||
match = re.match(r"^axis-(.)$", command.id)
|
||||
axis = match.group(1) if match else ""
|
||||
if axis not in ["x", "y", "z"]:
|
||||
command.gamepad.log("Unrecognized axis command: {}".format(
|
||||
str(command)))
|
||||
command.gamepad.log("Bad axis command: {}".format(command))
|
||||
|
||||
sign = command.gamepad.config.get("sign-{}".format(axis), 1)
|
||||
oldValue = self.axes[axis]
|
||||
@@ -412,25 +476,20 @@ class Jog(object):
|
||||
self.axes[axis] = 0 if locked else command.value * sign
|
||||
self.changed = self.changed or oldValue != self.axes[axis]
|
||||
|
||||
command.gamepad.logDebug("_processAxisCommand: {}".format(
|
||||
json.dumps({
|
||||
"command.value": command.value,
|
||||
"axis": axis,
|
||||
"oldValue": oldValue,
|
||||
"value": self.axes[axis],
|
||||
"sign": sign,
|
||||
"locked": locked,
|
||||
"changed": self.changed
|
||||
})))
|
||||
command.gamepad.logDebug(
|
||||
"{}(value={}, oldValue={}, sign={}, locked={})".format(
|
||||
command.id, self.axes[axis], oldValue, sign, locked))
|
||||
|
||||
def _processLockCommand(self, command: Command):
|
||||
match = re.match(r"^lock-(.)$", command.id)
|
||||
axis = match.group(1) if match else ""
|
||||
if axis not in ["x", "y"]:
|
||||
command.gamepad.log("Unrecognized lock command: {}".format(
|
||||
str(command)))
|
||||
command.gamepad.log("Bad lock command: {}".format(command))
|
||||
|
||||
self.lock[axis] = bool(command.value)
|
||||
if command.event.type == EV_ABS:
|
||||
self.lock[axis] = bool(command.value > -0.9)
|
||||
else:
|
||||
self.lock[axis] = bool(command.value)
|
||||
|
||||
def _processDisabled(self, command: Command):
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user