fix(input-manager): resolve threading error in gamepad events

Signed-off-by: Boris Yumankulov <boria138@altlinux.org>
This commit is contained in:
Boris Yumankulov 2025-06-05 17:19:32 +05:00
parent b317e4760b
commit fe208f0783
Signed by: Boria138
GPG Key ID: 14B4A5673FD39C76

View File

@ -4,13 +4,13 @@ from typing import Protocol, cast
from evdev import InputDevice, ecodes, list_devices
import pyudev
from PySide6.QtWidgets import QWidget, QStackedWidget, QApplication, QScrollArea, QLineEdit
from PySide6.QtCore import Qt, QObject, QEvent, QPoint
from PySide6.QtCore import Qt, QObject, QEvent, QPoint, Signal, Slot
from PySide6.QtGui import QKeyEvent
from portprotonqt.logger import get_logger
from portprotonqt.image_utils import FullscreenDialog
from portprotonqt.custom_widgets import NavLabel
from portprotonqt.game_card import GameCard
from portprotonqt.config_utils import read_fullscreen_config
from portprotonqt.config_utils import read_fullscreen_config, read_window_geometry, save_window_geometry
logger = get_logger(__name__)
@ -31,23 +31,15 @@ class MainWindowProtocol(Protocol):
currentDetailPage: QWidget | None
current_exec_line: str | None
# Mapping of actions to evdev button codes, includes PlayStation, Xbox, and Switch controllers (https://www.kernel.org/doc/html/v4.12/input/gamepad.html)
# Mapping of actions to evdev button codes, includes PlayStation, Xbox, and Switch controllers
BUTTONS = {
# South button: X (PlayStation), A (Xbox), B (Switch Joy-Con south)
'confirm': {ecodes.BTN_SOUTH, ecodes.BTN_A},
# East button: Circle (PS), B (Xbox), A (Switch Joy-Con east)
'back': {ecodes.BTN_EAST, ecodes.BTN_B},
# North button: Triangle (PS), Y (Xbox), X (Switch Joy-Con north)
'add_game': {ecodes.BTN_NORTH, ecodes.BTN_Y},
# Shoulder buttons: L1/L2 (PS), LB (Xbox), L (Switch): BTN_TL, BTN_TL2
'confirm': {ecodes.BTN_A},
'back': {ecodes.BTN_B},
'add_game': {ecodes.BTN_Y},
'prev_tab': {ecodes.BTN_TL, ecodes.BTN_TL2},
# Shoulder buttons: R1/R2 (PS), RB (Xbox), R (Switch): BTN_TR, BTN_TR2
'next_tab': {ecodes.BTN_TR, ecodes.BTN_TR2},
# Optional: stick presses on Switch Joy-Con
'confirm_stick': {ecodes.BTN_THUMBL, ecodes.BTN_THUMBR},
# Start button for context menu
'context_menu': {ecodes.BTN_START},
# Select/home for back/menu
'menu': {ecodes.BTN_SELECT, ecodes.BTN_MODE},
}
@ -55,8 +47,14 @@ class InputManager(QObject):
"""
Manages input from gamepads and keyboards for navigating the application interface.
Supports gamepad hotplugging, button and axis events, and keyboard event filtering
for seamless UI interaction.
for seamless UI interaction. Enables fullscreen mode when a gamepad is connected
and restores normal mode when disconnected.
"""
# Signals for gamepad events
button_pressed = Signal(int) # Signal for button presses
dpad_moved = Signal(int, int, float) # Signal for D-pad movements
toggle_fullscreen = Signal(bool) # Signal for toggling fullscreen mode (True for fullscreen, False for normal)
def __init__(
self,
main_window: MainWindowProtocol,
@ -81,22 +79,48 @@ class InputManager(QObject):
self.running = True
self._is_fullscreen = read_fullscreen_config()
# Connect signals to slots
self.button_pressed.connect(self.handle_button_slot)
self.dpad_moved.connect(self.handle_dpad_slot)
self.toggle_fullscreen.connect(self.handle_fullscreen_slot)
# Install keyboard event filter
app = QApplication.instance()
if app is not None:
app.installEventFilter(self)
else:
logger.error("QApplication instance is None, cannot install event filter")
# Initialize evdev + hotplug
self.init_gamepad()
@Slot(bool)
def handle_fullscreen_slot(self, enable: bool) -> None:
try:
if read_fullscreen_config():
return
window = self._parent
if not isinstance(window, QWidget):
return
if enable and not self._is_fullscreen:
if not window.isFullScreen():
save_window_geometry(window.width(), window.height())
window.showFullScreen()
self._is_fullscreen = True
elif not enable and self._is_fullscreen:
window.showNormal()
width, height = read_window_geometry()
if width > 0 and height > 0:
window.resize(width, height)
self._is_fullscreen = False
save_window_geometry(width, height)
except Exception as e:
logger.error(f"Error in handle_fullscreen_slot: {e}", exc_info=True)
def eventFilter(self, obj: QObject, event: QEvent) -> bool:
app = QApplication.instance()
if not app:
return super().eventFilter(obj, event)
# 1) Интересуют только нажатия клавиш
# Handle only key press events
if not (isinstance(event, QKeyEvent) and event.type() == QEvent.Type.KeyPress):
return super().eventFilter(obj, event)
@ -105,17 +129,16 @@ class InputManager(QObject):
focused = QApplication.focusWidget()
popup = QApplication.activePopupWidget()
# 2) Закрытие приложения по Ctrl+Q
# Close application with Ctrl+Q
if key == Qt.Key.Key_Q and modifiers & Qt.KeyboardModifier.ControlModifier:
app.quit()
return True
# 3) Если открыт любой popup — не перехватываем ENTER, ESC и стрелки
# Skip navigation keys if a popup is open
if popup:
# возвращаем False, чтобы событие пошло дальше в Qt и закрыло popup как нужно
return False
# 4) Навигация в полноэкранном просмотре
# FullscreenDialog navigation
active_win = QApplication.activeWindow()
if isinstance(active_win, FullscreenDialog):
if key == Qt.Key.Key_Right:
@ -128,27 +151,25 @@ class InputManager(QObject):
active_win.close()
return True
# 5) На странице деталей Enter запускает/останавливает игру
# Launch/stop game on detail page
if self._parent.currentDetailPage and key in (Qt.Key.Key_Return, Qt.Key.Key_Enter):
if self._parent.current_exec_line:
self._parent.toggleGame(self._parent.current_exec_line, None)
return True
# 6) Открытие контекстного меню для GameCard
# Context menu for GameCard
if isinstance(focused, GameCard):
if key == Qt.Key.Key_F10 and Qt.KeyboardModifier.ShiftModifier:
pos = QPoint(focused.width() // 2, focused.height() // 2)
focused._show_context_menu(pos)
return True
# 7) Навигация по карточкам в Library
# Navigation in Library tab
if self._parent.stackedWidget.currentIndex() == 0:
game_cards = self._parent.gamesListWidget.findChildren(GameCard)
scroll_area = self._parent.gamesListWidget.parentWidget()
while scroll_area and not isinstance(scroll_area, QScrollArea):
scroll_area = scroll_area.parentWidget()
if not scroll_area:
logger.warning("No QScrollArea found for gamesListWidget")
if isinstance(focused, GameCard):
current_index = game_cards.index(focused) if focused in game_cards else -1
@ -184,7 +205,7 @@ class InputManager(QObject):
scroll_area.ensureWidgetVisible(next_card, 50, 50)
return True
# 8) Переключение вкладок ←/→
# Tab switching with Left/Right keys
idx = self._parent.stackedWidget.currentIndex()
total = len(self._parent.tabButtons)
if key == Qt.Key.Key_Left and not isinstance(focused, GameCard):
@ -198,7 +219,7 @@ class InputManager(QObject):
self._parent.tabButtons[new].setFocus()
return True
# 9) Спуск в содержимое вкладки ↓
# Navigate down into tab content
if key == Qt.Key.Key_Down:
if isinstance(focused, NavLabel):
page = self._parent.stackedWidget.currentWidget()
@ -212,15 +233,15 @@ class InputManager(QObject):
focused.focusNextChild()
return True
# 10) Подъём по содержимому вкладки ↑
# Navigate up through tab content
if key == Qt.Key.Key_Up:
if isinstance(focused, NavLabel):
return True # Не даём уйти выше NavLabel
return True
if focused is not None:
focused.focusPreviousChild()
return True
# 11) Общие: Activate, Back, Add
# General actions: Activate, Back, Add
if key in (Qt.Key.Key_Return, Qt.Key.Key_Enter):
self._parent.activateFocusedWidget()
return True
@ -235,18 +256,11 @@ class InputManager(QObject):
self._parent.openAddGameDialog()
return True
# 12) Переключение полноэкранного режима по F11
# Toggle fullscreen with F11
if key == Qt.Key.Key_F11:
if read_fullscreen_config():
return True
window = self._parent
if isinstance(window, QWidget):
if self._is_fullscreen:
window.showNormal()
self._is_fullscreen = False
else:
window.showFullScreen()
self._is_fullscreen = True
self.toggle_fullscreen.emit(not self._is_fullscreen)
return True
return super().eventFilter(obj, event)
@ -254,45 +268,62 @@ class InputManager(QObject):
def init_gamepad(self) -> None:
self.check_gamepad()
threading.Thread(target=self.run_udev_monitor, daemon=True).start()
logger.info("Input support initialized with hotplug (evdev + pyudev)")
logger.info("Gamepad support initialized with hotplug (evdev + pyudev)")
def run_udev_monitor(self) -> None:
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
observer = pyudev.MonitorObserver(monitor, self.handle_udev_event)
observer.start()
while self.running:
time.sleep(1)
try:
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by(subsystem='input')
observer = pyudev.MonitorObserver(monitor, self.handle_udev_event)
observer.start()
while self.running:
time.sleep(1)
except Exception as e:
logger.error(f"Error in udev monitor: {e}", exc_info=True)
def handle_udev_event(self, action: str, device: pyudev.Device) -> None:
if action == 'add':
time.sleep(0.1)
self.check_gamepad()
elif action == 'remove' and self.gamepad:
if not any(self.gamepad.path == path for path in list_devices()):
logger.info("Gamepad disconnected")
self.gamepad = None
if self.gamepad_thread:
self.gamepad_thread.join()
try:
if action == 'add':
time.sleep(0.1)
self.check_gamepad()
elif action == 'remove' and self.gamepad:
if not any(self.gamepad.path == path for path in list_devices()):
logger.info("Gamepad disconnected")
self.gamepad = None
if self.gamepad_thread:
self.gamepad_thread.join()
# Signal to exit fullscreen mode
self.toggle_fullscreen.emit(False)
except Exception as e:
logger.error(f"Error handling udev event: {e}", exc_info=True)
def check_gamepad(self) -> None:
new_gamepad = self.find_gamepad()
if new_gamepad and new_gamepad != self.gamepad:
logger.info(f"Gamepad connected: {new_gamepad.name}")
self.gamepad = new_gamepad
if self.gamepad_thread:
self.gamepad_thread.join()
self.gamepad_thread = threading.Thread(target=self.monitor_gamepad, daemon=True)
self.gamepad_thread.start()
try:
new_gamepad = self.find_gamepad()
if new_gamepad and new_gamepad != self.gamepad:
logger.info(f"Gamepad connected: {new_gamepad.name}")
self.gamepad = new_gamepad
if self.gamepad_thread:
self.gamepad_thread.join()
self.gamepad_thread = threading.Thread(target=self.monitor_gamepad, daemon=True)
self.gamepad_thread.start()
# Signal to enter fullscreen mode
self.toggle_fullscreen.emit(True)
except Exception as e:
logger.error(f"Error checking gamepad: {e}", exc_info=True)
def find_gamepad(self) -> InputDevice | None:
devices = [InputDevice(path) for path in list_devices()]
for device in devices:
caps = device.capabilities()
if ecodes.EV_KEY in caps or ecodes.EV_ABS in caps:
return device
return None
try:
devices = [InputDevice(path) for path in list_devices()]
for device in devices:
caps = device.capabilities()
if ecodes.EV_KEY in caps or ecodes.EV_ABS in caps:
return device
return None
except Exception as e:
logger.error(f"Error finding gamepad: {e}", exc_info=True)
return None
def monitor_gamepad(self) -> None:
try:
@ -305,126 +336,148 @@ class InputManager(QObject):
continue
now = time.time()
if event.type == ecodes.EV_KEY and event.value == 1:
self.handle_button(event.code)
self.button_pressed.emit(event.code)
elif event.type == ecodes.EV_ABS:
self.handle_dpad(event.code, event.value, now)
self.dpad_moved.emit(event.code, event.value, now)
except OSError as e:
if e.errno == 19: # ENODEV: No such device
logger.info("Gamepad disconnected during event loop")
else:
logger.error(f"OSError in gamepad monitoring: {e}", exc_info=True)
except Exception as e:
logger.error(f"Error accessing gamepad: {e}")
logger.error(f"Error in gamepad monitoring: {e}", exc_info=True)
finally:
if self.gamepad:
try:
self.gamepad.close()
except Exception:
pass
self.gamepad = None
def handle_button(self, button_code: int) -> None:
app = QApplication.instance()
if app is None:
logger.error("QApplication instance is None")
return
active = QApplication.activeWindow()
focused = QApplication.focusWidget()
# FullscreenDialog
if isinstance(active, FullscreenDialog):
if button_code in BUTTONS['prev_tab']:
active.show_prev()
elif button_code in BUTTONS['next_tab']:
active.show_next()
elif button_code in BUTTONS['back']:
active.close()
return
# Context menu for GameCard
if isinstance(focused, GameCard):
if button_code in BUTTONS['context_menu']:
pos = QPoint(focused.width() // 2, focused.height() // 2)
focused._show_context_menu(pos)
return
# Game launch on detail page
if (button_code in BUTTONS['confirm'] or button_code in BUTTONS['confirm_stick']) and self._parent.currentDetailPage is not None:
if self._parent.current_exec_line:
self._parent.toggleGame(self._parent.current_exec_line, None)
return
# Standard navigation
if button_code in BUTTONS['confirm'] or button_code in BUTTONS['confirm_stick']:
self._parent.activateFocusedWidget()
elif button_code in BUTTONS['back'] or button_code in BUTTONS['menu']:
self._parent.goBackDetailPage(getattr(self._parent, 'currentDetailPage', None))
elif button_code in BUTTONS['add_game']:
self._parent.openAddGameDialog()
elif button_code in BUTTONS['prev_tab']:
idx = (self._parent.stackedWidget.currentIndex() - 1) % len(self._parent.tabButtons)
self._parent.switchTab(idx)
self._parent.tabButtons[idx].setFocus(Qt.FocusReason.OtherFocusReason)
elif button_code in BUTTONS['next_tab']:
idx = (self._parent.stackedWidget.currentIndex() + 1) % len(self._parent.tabButtons)
self._parent.switchTab(idx)
self._parent.tabButtons[idx].setFocus(Qt.FocusReason.OtherFocusReason)
def handle_dpad(self, code: int, value: int, current_time: float) -> None:
app = QApplication.instance()
if app is None:
logger.error("QApplication instance is None")
return
active = QApplication.activeWindow()
# Fullscreen horizontal
if isinstance(active, FullscreenDialog) and code == ecodes.ABS_HAT0X:
if value < 0:
active.show_prev()
elif value > 0:
active.show_next()
return
# Vertical navigation (DPAD up/down)
if code == ecodes.ABS_HAT0Y:
# ignore release
if value == 0:
@Slot(int)
def handle_button_slot(self, button_code: int) -> None:
try:
app = QApplication.instance()
if not app:
return
active = QApplication.activeWindow()
focused = QApplication.focusWidget()
page = self._parent.stackedWidget.currentWidget()
if value > 0:
# down
if isinstance(focused, NavLabel):
focusables = page.findChildren(QWidget, options=Qt.FindChildOption.FindChildrenRecursively)
focusables = [w for w in focusables if w.focusPolicy() & Qt.FocusPolicy.StrongFocus]
if focusables:
focusables[0].setFocus()
return
elif focused:
focused.focusNextChild()
return
elif value < 0 and focused:
# up
focused.focusPreviousChild()
# FullscreenDialog
if isinstance(active, FullscreenDialog):
if button_code in BUTTONS['prev_tab']:
active.show_prev()
elif button_code in BUTTONS['next_tab']:
active.show_next()
elif button_code in BUTTONS['back']:
active.close()
return
# Horizontal wrap navigation repeat logic
if code != ecodes.ABS_HAT0X:
return
if value == 0:
self.axis_moving = False
self.current_axis_delay = self.initial_axis_move_delay
return
if not self.axis_moving:
self.trigger_dpad_movement(code, value)
self.last_move_time = current_time
self.axis_moving = True
elif current_time - self.last_move_time >= self.current_axis_delay:
self.trigger_dpad_movement(code, value)
self.last_move_time = current_time
self.current_axis_delay = self.repeat_axis_move_delay
# Context menu for GameCard
if isinstance(focused, GameCard):
if button_code in BUTTONS['context_menu']:
pos = QPoint(focused.width() // 2, focused.height() // 2)
focused._show_context_menu(pos)
return
# Game launch on detail page
if (button_code in BUTTONS['confirm'] or button_code in BUTTONS['confirm_stick']) and self._parent.currentDetailPage is not None:
if self._parent.current_exec_line:
self._parent.toggleGame(self._parent.current_exec_line, None)
return
# Standard navigation
if button_code in BUTTONS['confirm'] or button_code in BUTTONS['confirm_stick']:
self._parent.activateFocusedWidget()
elif button_code in BUTTONS['back'] or button_code in BUTTONS['menu']:
self._parent.goBackDetailPage(getattr(self._parent, 'currentDetailPage', None))
elif button_code in BUTTONS['add_game']:
self._parent.openAddGameDialog()
elif button_code in BUTTONS['prev_tab']:
idx = (self._parent.stackedWidget.currentIndex() - 1) % len(self._parent.tabButtons)
self._parent.switchTab(idx)
self._parent.tabButtons[idx].setFocus(Qt.FocusReason.OtherFocusReason)
elif button_code in BUTTONS['next_tab']:
idx = (self._parent.stackedWidget.currentIndex() + 1) % len(self._parent.tabButtons)
self._parent.switchTab(idx)
self._parent.tabButtons[idx].setFocus(Qt.FocusReason.OtherFocusReason)
except Exception as e:
logger.error(f"Error in handle_button_slot: {e}", exc_info=True)
@Slot(int, int, float)
def handle_dpad_slot(self, code: int, value: int, current_time: float) -> None:
try:
app = QApplication.instance()
if not app:
return
active = QApplication.activeWindow()
# Fullscreen horizontal
if isinstance(active, FullscreenDialog) and code == ecodes.ABS_HAT0X:
if value < 0:
active.show_prev()
elif value > 0:
active.show_next()
return
# Vertical navigation (DPAD up/down)
if code == ecodes.ABS_HAT0Y:
if value == 0:
return
focused = QApplication.focusWidget()
page = self._parent.stackedWidget.currentWidget()
if value > 0:
if isinstance(focused, NavLabel):
focusables = page.findChildren(QWidget, options=Qt.FindChildOption.FindChildrenRecursively)
focusables = [w for w in focusables if w.focusPolicy() & Qt.FocusPolicy.StrongFocus]
if focusables:
focusables[0].setFocus()
return
elif focused:
focused.focusNextChild()
return
elif value < 0 and focused:
focused.focusPreviousChild()
return
# Horizontal wrap navigation repeat logic
if code != ecodes.ABS_HAT0X:
return
if value == 0:
self.axis_moving = False
self.current_axis_delay = self.initial_axis_move_delay
return
if not self.axis_moving:
self.trigger_dpad_movement(code, value)
self.last_move_time = current_time
self.axis_moving = True
elif current_time - self.last_move_time >= self.current_axis_delay:
self.trigger_dpad_movement(code, value)
self.last_move_time = current_time
self.current_axis_delay = self.repeat_axis_move_delay
except Exception as e:
logger.error(f"Error in handle_dpad_slot: {e}", exc_info=True)
def trigger_dpad_movement(self, code: int, value: int) -> None:
if code != ecodes.ABS_HAT0X:
return
idx = self._parent.stackedWidget.currentIndex()
if value < 0:
new = (idx - 1) % len(self._parent.tabButtons)
else:
new = (idx + 1) % len(self._parent.tabButtons)
self._parent.switchTab(new)
self._parent.tabButtons[new].setFocus(Qt.FocusReason.OtherFocusReason)
try:
if code != ecodes.ABS_HAT0X:
return
idx = self._parent.stackedWidget.currentIndex()
if value < 0:
new = (idx - 1) % len(self._parent.tabButtons)
else:
new = (idx + 1) % len(self._parent.tabButtons)
self._parent.switchTab(new)
self._parent.tabButtons[new].setFocus(Qt.FocusReason.OtherFocusReason)
except Exception as e:
logger.error(f"Error in trigger_dpad_movement: {e}", exc_info=True)
def cleanup(self) -> None:
self.running = False
if self.gamepad:
self.gamepad.close()
logger.info("Input support cleaned up")
try:
self.running = False
if self.gamepad_thread:
self.gamepad_thread.join()
if self.gamepad:
self.gamepad.close()
except Exception as e:
logger.error(f"Error during cleanup: {e}", exc_info=True)