feat(input-manager): add haptic feedback for game launch with gamepad
All checks were successful
Code and build check / Check code (push) Successful in 1m19s
Code and build check / Build with uv (push) Successful in 45s

Signed-off-by: Boris Yumankulov <boria138@altlinux.org>
This commit is contained in:
2025-06-11 19:08:58 +05:00
parent 2d7369d46c
commit 30a4fc6ed7

View File

@ -1,8 +1,8 @@
import time
import threading
from typing import Protocol, cast
from evdev import InputDevice, ecodes, list_devices
import pyudev
from evdev import InputDevice, InputEvent, ecodes, list_devices, ff
from pyudev import Context, Monitor, MonitorObserver, Device
from PySide6.QtWidgets import QWidget, QStackedWidget, QApplication, QScrollArea, QLineEdit, QDialog, QMenu, QComboBox, QListView, QMessageBox
from PySide6.QtCore import Qt, QObject, QEvent, QPoint, Signal, Slot, QTimer
from PySide6.QtGui import QKeyEvent
@ -52,8 +52,7 @@ class InputManager(QObject):
"""
Manages input from gamepads and keyboards for navigating the application interface.
Supports gamepad hotplugging, button and axis events, and keyboard event filtering
for seamless UI interaction. Enables fullscreen mode when a gamepad is connected
and restores normal mode when disconnected.
for seamless UI interaction.
"""
# Signals for gamepad events
button_pressed = Signal(int) # Signal for button presses
@ -83,6 +82,7 @@ class InputManager(QObject):
self.gamepad_thread: threading.Thread | None = None
self.running = True
self._is_fullscreen = read_fullscreen_config()
self.rumble_effect_id: int | None = None # Store the rumble effect ID
# Add variables for continuous D-pad movement
self.dpad_timer = QTimer(self)
@ -126,6 +126,46 @@ class InputManager(QObject):
except Exception as e:
logger.error(f"Error in handle_fullscreen_slot: {e}", exc_info=True)
def trigger_rumble(self, duration_ms: int = 200, strong_magnitude: int = 0x8000, weak_magnitude: int = 0x8000) -> None:
"""Trigger a rumble effect on the gamepad if supported."""
if not self.gamepad:
return
try:
# Check if the gamepad supports force feedback
caps = self.gamepad.capabilities()
if ecodes.EV_FF not in caps or ecodes.FF_RUMBLE not in caps.get(ecodes.EV_FF, []):
logger.debug("Gamepad does not support force feedback or rumble")
return
# Create a rumble effect
rumble = ff.Rumble(strong_magnitude=strong_magnitude, weak_magnitude=weak_magnitude)
effect = ff.Effect(
id=-1, # Let evdev assign an ID
type=ecodes.FF_RUMBLE,
direction=0, # Direction (not used for rumble)
replay=ff.Replay(length=duration_ms, delay=0),
u=ff.EffectType(ff_rumble_effect=rumble)
)
# Upload the effect
self.rumble_effect_id = self.gamepad.upload_effect(effect)
# Play the effect
event = InputEvent(0, 0, ecodes.EV_FF, self.rumble_effect_id, 1)
self.gamepad.write_event(event)
# Schedule effect erasure after duration
QTimer.singleShot(duration_ms, self.stop_rumble)
except Exception as e:
logger.error(f"Error triggering rumble: {e}", exc_info=True)
def stop_rumble(self) -> None:
"""Stop the rumble effect and clean up."""
if self.gamepad and self.rumble_effect_id is not None:
try:
self.gamepad.erase_effect(self.rumble_effect_id)
self.rumble_effect_id = None
except Exception as e:
logger.error(f"Error stopping rumble: {e}", exc_info=True)
@Slot(int)
def handle_button_slot(self, button_code: int) -> None:
try:
@ -222,6 +262,7 @@ class InputManager(QObject):
# Game launch on detail page
if (button_code in BUTTONS['confirm']) and self._parent.currentDetailPage is not None and self._parent.current_add_game_dialog is None:
if self._parent.current_exec_line:
self.trigger_rumble()
self._parent.toggleGame(self._parent.current_exec_line, None)
return
@ -728,17 +769,17 @@ class InputManager(QObject):
def run_udev_monitor(self) -> None:
try:
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
context = Context()
monitor = Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
observer = pyudev.MonitorObserver(monitor, self.handle_udev_event)
observer = MonitorObserver(monitor, self.handle_udev_event)
observer.start()
while self.running:
time.sleep(1)
except Exception as e:
logger.error(f"Error in udev monitor: {e}", exc_info=True)
def handle_udev_event(self, action: str, device: pyudev.Device) -> None:
def handle_udev_event(self, action: str, device: Device) -> None:
try:
if action == 'add':
time.sleep(0.1)
@ -746,6 +787,7 @@ class InputManager(QObject):
elif action == 'remove' and self.gamepad:
if not any(self.gamepad.path == path for path in list_devices()):
logger.info("Gamepad disconnected")
self.stop_rumble()
self.gamepad = None
if self.gamepad_thread:
self.gamepad_thread.join()
@ -759,6 +801,7 @@ class InputManager(QObject):
new_gamepad = self.find_gamepad()
if new_gamepad and new_gamepad != self.gamepad:
logger.info(f"Gamepad connected: {new_gamepad.name}")
self.stop_rumble()
self.gamepad = new_gamepad
if self.gamepad_thread:
self.gamepad_thread.join()
@ -811,6 +854,7 @@ class InputManager(QObject):
finally:
if self.gamepad:
try:
self.stop_rumble()
self.gamepad.close()
except Exception:
pass
@ -820,6 +864,7 @@ class InputManager(QObject):
try:
self.running = False
self.dpad_timer.stop()
self.stop_rumble()
if self.gamepad_thread:
self.gamepad_thread.join()
if self.gamepad: