From 30a4fc6ed783e6a1e839caf51288bfbd7f0e9735 Mon Sep 17 00:00:00 2001 From: Boris Yumankulov Date: Wed, 11 Jun 2025 19:08:58 +0500 Subject: [PATCH] feat(input-manager): add haptic feedback for game launch with gamepad Signed-off-by: Boris Yumankulov --- portprotonqt/input_manager.py | 61 ++++++++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/portprotonqt/input_manager.py b/portprotonqt/input_manager.py index e76f158..3640bfe 100644 --- a/portprotonqt/input_manager.py +++ b/portprotonqt/input_manager.py @@ -1,8 +1,8 @@ import time import threading from typing import Protocol, cast -from evdev import InputDevice, ecodes, list_devices -import pyudev +from evdev import InputDevice, InputEvent, ecodes, list_devices, ff +from pyudev import Context, Monitor, MonitorObserver, Device from PySide6.QtWidgets import QWidget, QStackedWidget, QApplication, QScrollArea, QLineEdit, QDialog, QMenu, QComboBox, QListView, QMessageBox from PySide6.QtCore import Qt, QObject, QEvent, QPoint, Signal, Slot, QTimer from PySide6.QtGui import QKeyEvent @@ -52,8 +52,7 @@ class InputManager(QObject): """ Manages input from gamepads and keyboards for navigating the application interface. Supports gamepad hotplugging, button and axis events, and keyboard event filtering - for seamless UI interaction. Enables fullscreen mode when a gamepad is connected - and restores normal mode when disconnected. + for seamless UI interaction. """ # Signals for gamepad events button_pressed = Signal(int) # Signal for button presses @@ -83,6 +82,7 @@ class InputManager(QObject): self.gamepad_thread: threading.Thread | None = None self.running = True self._is_fullscreen = read_fullscreen_config() + self.rumble_effect_id: int | None = None # Store the rumble effect ID # Add variables for continuous D-pad movement self.dpad_timer = QTimer(self) @@ -126,6 +126,46 @@ class InputManager(QObject): except Exception as e: logger.error(f"Error in handle_fullscreen_slot: {e}", exc_info=True) + def trigger_rumble(self, duration_ms: int = 200, strong_magnitude: int = 0x8000, weak_magnitude: int = 0x8000) -> None: + """Trigger a rumble effect on the gamepad if supported.""" + if not self.gamepad: + return + try: + # Check if the gamepad supports force feedback + caps = self.gamepad.capabilities() + if ecodes.EV_FF not in caps or ecodes.FF_RUMBLE not in caps.get(ecodes.EV_FF, []): + logger.debug("Gamepad does not support force feedback or rumble") + return + + # Create a rumble effect + rumble = ff.Rumble(strong_magnitude=strong_magnitude, weak_magnitude=weak_magnitude) + effect = ff.Effect( + id=-1, # Let evdev assign an ID + type=ecodes.FF_RUMBLE, + direction=0, # Direction (not used for rumble) + replay=ff.Replay(length=duration_ms, delay=0), + u=ff.EffectType(ff_rumble_effect=rumble) + ) + + # Upload the effect + self.rumble_effect_id = self.gamepad.upload_effect(effect) + # Play the effect + event = InputEvent(0, 0, ecodes.EV_FF, self.rumble_effect_id, 1) + self.gamepad.write_event(event) + # Schedule effect erasure after duration + QTimer.singleShot(duration_ms, self.stop_rumble) + except Exception as e: + logger.error(f"Error triggering rumble: {e}", exc_info=True) + + def stop_rumble(self) -> None: + """Stop the rumble effect and clean up.""" + if self.gamepad and self.rumble_effect_id is not None: + try: + self.gamepad.erase_effect(self.rumble_effect_id) + self.rumble_effect_id = None + except Exception as e: + logger.error(f"Error stopping rumble: {e}", exc_info=True) + @Slot(int) def handle_button_slot(self, button_code: int) -> None: try: @@ -222,6 +262,7 @@ class InputManager(QObject): # Game launch on detail page if (button_code in BUTTONS['confirm']) and self._parent.currentDetailPage is not None and self._parent.current_add_game_dialog is None: if self._parent.current_exec_line: + self.trigger_rumble() self._parent.toggleGame(self._parent.current_exec_line, None) return @@ -728,17 +769,17 @@ class InputManager(QObject): def run_udev_monitor(self) -> None: try: - context = pyudev.Context() - monitor = pyudev.Monitor.from_netlink(context) + context = Context() + monitor = Monitor.from_netlink(context) monitor.filter_by(subsystem='input') - observer = pyudev.MonitorObserver(monitor, self.handle_udev_event) + observer = MonitorObserver(monitor, self.handle_udev_event) observer.start() while self.running: time.sleep(1) except Exception as e: logger.error(f"Error in udev monitor: {e}", exc_info=True) - def handle_udev_event(self, action: str, device: pyudev.Device) -> None: + def handle_udev_event(self, action: str, device: Device) -> None: try: if action == 'add': time.sleep(0.1) @@ -746,6 +787,7 @@ class InputManager(QObject): elif action == 'remove' and self.gamepad: if not any(self.gamepad.path == path for path in list_devices()): logger.info("Gamepad disconnected") + self.stop_rumble() self.gamepad = None if self.gamepad_thread: self.gamepad_thread.join() @@ -759,6 +801,7 @@ class InputManager(QObject): new_gamepad = self.find_gamepad() if new_gamepad and new_gamepad != self.gamepad: logger.info(f"Gamepad connected: {new_gamepad.name}") + self.stop_rumble() self.gamepad = new_gamepad if self.gamepad_thread: self.gamepad_thread.join() @@ -811,6 +854,7 @@ class InputManager(QObject): finally: if self.gamepad: try: + self.stop_rumble() self.gamepad.close() except Exception: pass @@ -820,6 +864,7 @@ class InputManager(QObject): try: self.running = False self.dpad_timer.stop() + self.stop_rumble() if self.gamepad_thread: self.gamepad_thread.join() if self.gamepad: