chore: optimize and clean code
All checks were successful
Code check / Check code (push) Successful in 1m9s

Signed-off-by: Boris Yumankulov <boria138@altlinux.org>
This commit is contained in:
2025-10-17 13:08:40 +05:00
parent cde92885d4
commit 464ad0fe9c
2 changed files with 221 additions and 73 deletions

View File

@@ -4,7 +4,7 @@ import os
from typing import Protocol, cast
from evdev import InputDevice, InputEvent, ecodes, list_devices, ff
from enum import Enum
from pyudev import Context, Monitor, MonitorObserver, Device, Devices
from pyudev import Context, Monitor, Device, Devices
from PySide6.QtWidgets import QWidget, QStackedWidget, QApplication, QScrollArea, QLineEdit, QDialog, QMenu, QComboBox, QListView, QMessageBox, QListWidget, QTableWidget, QAbstractItemView, QTableWidgetItem
from PySide6.QtCore import Qt, QObject, QEvent, QPoint, Signal, Slot, QTimer
from PySide6.QtGui import QKeyEvent, QMouseEvent
@@ -76,6 +76,7 @@ class InputManager(QObject):
button_event = Signal(int, int) # Signal for button events: (code, value) where value=1 (press), 0 (release)
dpad_moved = Signal(int, int, float) # Signal for D-pad movements
toggle_fullscreen = Signal(bool) # Signal for toggling fullscreen mode (True for fullscreen, False for normal)
gamepad_hotplug = Signal(str) # 'add' or 'remove'
def __init__(
self,
@@ -1436,106 +1437,241 @@ class InputManager(QObject):
return super().eventFilter(obj, event)
def init_gamepad(self) -> None:
self.monitor_observer = None
self.udev_context = Context() # Создаём context один раз
self.Devices = Devices # Сохраняем класс для использования в других методах
self.udev_context = Context()
self.Devices = Devices
self.monitor_ready = False
# Подключаем сигнал hotplug к обработчику в главном потоке
self.gamepad_hotplug.connect(self._on_gamepad_hotplug)
# Debounce timer для отложенной проверки геймпада (в главном потоке Qt)
self.gamepad_check_timer = QTimer()
self.gamepad_check_timer.setSingleShot(True)
self.gamepad_check_timer.timeout.connect(self.check_gamepad)
# Первоначальная проверка
self.check_gamepad()
# Запускаем udev monitor в отдельном потоке
threading.Thread(target=self.run_udev_monitor, daemon=True).start()
logger.info("Gamepad support initialized with hotplug (evdev + pyudev)")
def run_udev_monitor(self) -> None:
"""
Неблокирующий опрос udev событий без MonitorObserver.
Использует monitor.poll() с таймаутом для корректного завершения.
"""
try:
logger.info("Starting udev monitor...")
monitor = Monitor.from_netlink(self.udev_context)
monitor.filter_by(subsystem='input')
logger.info("Monitor created and filtered")
monitor.start()
logger.info("Monitor started, draining initial events...")
observer = MonitorObserver(monitor, self.handle_udev_event)
self.monitor_observer = observer
logger.info("MonitorObserver created")
# КРИТИЧНО: При старте udev отправляет события о ВСЕХ существующих устройствах
# Это может быть 10-50+ событий, которые блокируют инициализацию
# Решение: дренируем (игнорируем) все события за первые 500ms
drain_start = time.time()
drained_count = 0
observer.start()
logger.info("MonitorObserver started")
while time.time() - drain_start < 0.5:
device = monitor.poll(timeout=0.1)
if device is not None:
drained_count += 1
# Держим поток живым, пока не получим сигнал остановки
self.monitor_ready = True
logger.info(f"Drained {drained_count} initial events, now monitoring hotplug...")
# Основной цикл опроса с таймаутом 1 секунда
while self.running:
time.sleep(1)
# poll() возвращает None при таймауте - не блокирует навсегда
device = monitor.poll(timeout=1.0)
logger.info("MonitorObserver stopped gracefully")
if device is not None:
action = device.action
# Фильтруем только джойстики на уровне callback
# Это предотвращает обработку мышей/клавиатур/и т.д.
if action and self._is_joystick_device(device):
logger.info(f"Joystick hotplug event: {action} for {device.sys_name}")
self.handle_udev_event(action, device)
logger.info("udev monitor stopped gracefully")
except Exception as e:
logger.error(f"Error in udev monitor: {e}", exc_info=True)
def _is_joystick_device(self, device: Device) -> bool:
"""
Быстрая проверка: является ли устройство джойстиком.
Проверяет ID_INPUT_JOYSTICK из udev базы данных.
"""
try:
# Проверяем свойство ID_INPUT_JOYSTICK
if device.get('ID_INPUT_JOYSTICK') == '1':
return True
# Дополнительно: проверяем родительские устройства
# (некоторые контроллеры имеют свойство только у родителя)
parent = device.parent
if parent and parent.get('ID_INPUT_JOYSTICK') == '1':
return True
return False
except Exception as e:
logger.debug(f"Error checking joystick device: {e}")
return False
def handle_udev_event(self, action: str, device: Device) -> None:
"""
Обработчик udev событий для джойстиков.
Отправляет сигнал в главный поток Qt вместо прямого вызова QTimer.
"""
try:
if action == 'add':
time.sleep(0.1)
self.check_gamepad()
# Отправляем сигнал в главный поток Qt
# QTimer будет запущен там безопасно
logger.debug("Emitting gamepad add signal")
self.gamepad_hotplug.emit('add')
elif action == 'remove' and self.gamepad:
if not any(self.gamepad.path == path for path in list_devices()):
logger.info("Gamepad disconnected")
self.stop_rumble()
self.gamepad = None
if self.gamepad_thread:
self.gamepad_thread.join()
if read_auto_fullscreen_gamepad() and not read_fullscreen_config():
self.toggle_fullscreen.emit(False)
# Проверяем конкретно наш геймпад по пути устройства
device_node = device.device_node # например, /dev/input/event3
if device_node and self.gamepad.path == device_node:
logger.info(f"Connected gamepad disconnected: {device_node}")
# Отправляем сигнал в главный поток
self.gamepad_hotplug.emit('remove')
except Exception as e:
logger.error(f"Error handling udev event: {e}", exc_info=True)
def _on_gamepad_hotplug(self, action: str) -> None:
"""
Обработчик сигнала hotplug, выполняется в главном потоке Qt.
Безопасно работает с QTimer.
"""
try:
if action == 'add':
# Debounce: откладываем проверку на 200ms
# Множественные события за короткое время объединяются в один вызов
logger.debug("Scheduling gamepad check (debounced)")
self.gamepad_check_timer.start(200)
elif action == 'remove':
# Немедленная обработка отключения
self.stop_rumble()
self.gamepad = None
if self.gamepad_thread:
self.gamepad_thread.join(timeout=2.0)
if read_auto_fullscreen_gamepad() and not read_fullscreen_config():
self.toggle_fullscreen.emit(False)
except Exception as e:
logger.error(f"Error in hotplug handler: {e}", exc_info=True)
def check_gamepad(self) -> None:
"""
Проверка и подключение геймпада.
Вызывается из главного потока Qt через QTimer (debounced).
"""
try:
new_gamepad = self.find_gamepad()
if new_gamepad and new_gamepad != self.gamepad:
logger.info(f"Gamepad connected: {new_gamepad.name}")
# Проверяем, действительно ли это новый геймпад
if new_gamepad:
if not self.gamepad or new_gamepad.path != self.gamepad.path:
logger.info(f"Gamepad connected: {new_gamepad.name} at {new_gamepad.path}")
self.stop_rumble()
self.gamepad = new_gamepad
if self.gamepad_thread:
self.gamepad_thread.join(timeout=2.0)
self.gamepad_thread = threading.Thread(
target=self.monitor_gamepad,
daemon=True
)
self.gamepad_thread.start()
# Автоматический фуллскрин при подключении геймпада
if read_auto_fullscreen_gamepad() and not read_fullscreen_config():
self.toggle_fullscreen.emit(True)
elif self.gamepad and not any(self.gamepad.path == path for path in list_devices()):
# Геймпад был подключён, но теперь его нет в системе
logger.info("Gamepad no longer detected")
self.stop_rumble()
self.gamepad = new_gamepad
self.gamepad = None
if self.gamepad_thread:
self.gamepad_thread.join()
self.gamepad_thread = threading.Thread(target=self.monitor_gamepad, daemon=True)
self.gamepad_thread.start()
# Send signal for fullscreen mode only if:
# 1. auto_fullscreen_gamepad is enabled
# 2. fullscreen is not already enabled (to avoid conflict)
self.gamepad_thread.join(timeout=2.0)
if read_auto_fullscreen_gamepad() and not read_fullscreen_config():
self.toggle_fullscreen.emit(True)
self.toggle_fullscreen.emit(False)
except Exception as e:
logger.error(f"Error checking gamepad: {e}", exc_info=True)
def find_gamepad(self) -> InputDevice | None:
"""
Находит первый доступный геймпад.
Оптимизирован: предварительная фильтрация по capabilities перед udev-запросами.
"""
try:
devices = [InputDevice(path) for path in list_devices()]
logger.info(f"Checking {len(devices)} devices for gamepad...")
if not devices:
return None
logger.debug(f"Checking {len(devices)} devices for gamepad...")
for device in devices:
logger.debug(f"Checking device: {device.name} at {device.path}")
# Skip ASRock LED controller (vendor ID: 26ce, product ID: 01a2)
# Skip ASRock LED controller (известная проблема)
if device.info.vendor == 0x26ce and device.info.product == 0x01a2:
logger.debug(f"Skipping ASRock LED controller: {device.name}")
continue
# Получаем udev-устройство для проверки ID_INPUT_JOYSTICK
try:
udev_device = self.Devices.from_device_file(self.udev_context, device.path)
is_joystick = udev_device.get('ID_INPUT_JOYSTICK')
# Предварительная фильтрация: проверяем capabilities
# Джойстик должен иметь хотя бы оси (ABS) или кнопки (KEY)
# Это избегает udev-запросов для явно не-джойстиков
caps = device.capabilities(verbose=False)
has_abs_axes = ecodes.EV_ABS in caps
has_buttons = ecodes.EV_KEY in caps
logger.debug(f"Device {device.name}: ID_INPUT_JOYSTICK = {is_joystick}")
if not (has_abs_axes or has_buttons):
continue
# Только для потенциальных джойстиков делаем udev-запрос
try:
udev_device = self.Devices.from_device_file(
self.udev_context,
device.path
)
is_joystick = udev_device.get('ID_INPUT_JOYSTICK')
if is_joystick == '1':
logger.info(f"Found gamepad: {device.name}")
return device
else:
logger.debug(f"Skipping non-joystick device: {device.name}")
except Exception as e:
logger.warning(f"Could not check udev properties for {device.path}: {e}")
logger.debug(f"Could not check udev properties for {device.path}: {e}")
continue
logger.warning("No gamepad found")
logger.debug("No gamepad found")
return None
except Exception as e:
logger.error(f"Error finding gamepad: {e}", exc_info=True)
return None
def monitor_gamepad(self) -> None:
try:
if not self.gamepad:
@@ -1599,26 +1735,32 @@ class InputManager(QObject):
self.gamepad = None
def cleanup(self) -> None:
"""
Корректное завершение работы с геймпадом и udev монитором.
"""
try:
# Флаг для остановки udev monitor loop
self.running = False
# Останавливаем udev monitor
if self.monitor_observer:
try:
logger.info("Stopping udev monitor...")
self.monitor_observer.send_stop()
except Exception as e:
logger.warning(f"Error stopping monitor observer: {e}")
self.monitor_observer = None
# Останавливаем все таймеры
if hasattr(self, 'gamepad_check_timer'):
self.gamepad_check_timer.stop()
self.dpad_timer.stop()
self.nav_timer.stop()
# Очистка геймпада
self.stop_rumble()
if self.gamepad_thread:
self.gamepad_thread.join(timeout=2.0) # Добавлен таймаут
self.gamepad_thread.join(timeout=2.0)
if self.gamepad:
self.gamepad.close()
self.gamepad = None
self.gamepad_type = GamepadType.UNKNOWN
logger.info("Gamepad cleanup completed")
except Exception as e:
logger.error(f"Error during cleanup: {e}", exc_info=True)

View File

@@ -54,8 +54,8 @@ class VirtualKeyboard(QFrame):
self.main_window = cast(Any, parent_widget)
parent_widget = cast(QWidget | None, parent_widget.parent())
self.current_theme_name = read_theme_from_config()
self.current_theme_name = read_theme_from_config()
self.initUI()
self.hide()
@@ -132,27 +132,33 @@ class VirtualKeyboard(QFrame):
self.buttons: dict[str, QPushButton] = {}
self.update_keyboard()
def set_gamepad_icon(self, button, icon_type, gtype='default'):
"""Set gamepad icon on button based on type. Now works even without gamepad by using 'default' gtype."""
def set_gamepad_icon(self, button, icon_type, gtype=''):
"""Set gamepad icon on button based on type"""
if icon_type in ['back', 'add_game']:
icon_name = self.main_window.get_button_icon(icon_type, gtype) if self.main_window else f"{icon_type}_default.png"
icon_name = self.main_window.get_button_icon(icon_type, gtype)
else: # nav left/right
if icon_type in ['left', 'right']:
direction = icon_type
icon_name = self.main_window.get_nav_icon(direction, gtype) if self.main_window else f"{direction}_default.png"
icon_name = self.main_window.get_nav_icon(direction, gtype)
else:
direction = 'left' if icon_type == 'left' else 'right'
icon_name = self.main_window.get_nav_icon(direction, gtype) if self.main_window else f"{direction}_default.png"
icon_name = self.main_window.get_nav_icon(direction, gtype)
icon_path = theme_manager.get_theme_image(icon_name, self.current_theme_name)
pixmap = QPixmap()
if icon_path:
pixmap = QPixmap(str(icon_path))
if not pixmap.isNull():
button.setIcon(QIcon(pixmap))
pixmap.load(str(icon_path))
if not pixmap.isNull():
button.setIcon(QIcon(pixmap))
button.setIconSize(QSize(20, 20))
return
else:
# Fallback to placeholder
placeholder = theme_manager.get_theme_image("placeholder", self.current_theme_name)
if placeholder:
button.setIcon(QIcon(placeholder))
button.setIconSize(QSize(20, 20))
return
# Fallback: if no icon found, try standard Qt icon or leave empty
print(f"Warning: Icon {icon_name} not found for button {icon_type}")
def update_keyboard(self):
coords = self._save_focused_coords()
@@ -187,7 +193,7 @@ class VirtualKeyboard(QFrame):
button.setChecked(self.shift_pressed)
button.clicked.connect(lambda checked: self.on_shift_click(checked))
# Add gamepad icon for Shift (RB/R)
gtype = self.input_manager.gamepad_type if self.input_manager and self.input_manager.gamepad else 'default'
gtype = self.input_manager.gamepad_type
self.set_gamepad_icon(button, 'right', gtype)
else:
button.clicked.connect(lambda checked=False, k=key: self.on_button_click(k))
@@ -202,7 +208,7 @@ class VirtualKeyboard(QFrame):
shift.setChecked(self.shift_pressed)
shift.clicked.connect(lambda checked: self.on_shift_click(checked))
# Add gamepad icon for Shift (RB/R)
gtype = self.input_manager.gamepad_type if self.input_manager and self.input_manager.gamepad else 'default'
gtype = self.input_manager.gamepad_type
self.set_gamepad_icon(shift, 'right', gtype)
self.keyboard_layout.addWidget(shift, 3, 11, 1, 3)
@@ -221,7 +227,7 @@ class VirtualKeyboard(QFrame):
backspace.pressed.connect(self.on_backspace_pressed)
backspace.released.connect(self.stop_backspace_repeat)
# Add gamepad icon for Backspace (X/Triangle)
gtype = self.input_manager.gamepad_type if self.input_manager and self.input_manager.gamepad else 'default'
gtype = self.input_manager.gamepad_type
self.set_gamepad_icon(backspace, 'add_game', gtype)
self.keyboard_layout.addWidget(backspace, 0, 13, 1, 1)
@@ -234,7 +240,7 @@ class VirtualKeyboard(QFrame):
lang.setFixedSize(fixed_w, fixed_h)
lang.clicked.connect(self.on_lang_click)
# Add gamepad icon for Lang (LB/L)
gtype = self.input_manager.gamepad_type if self.input_manager and self.input_manager.gamepad else 'default'
gtype = self.input_manager.gamepad_type
self.set_gamepad_icon(lang, 'left', gtype)
self.keyboard_layout.addWidget(lang, 4, 0, 1, 1)
@@ -267,7 +273,7 @@ class VirtualKeyboard(QFrame):
hide_button.setFixedSize(fixed_w * 2 + self.spacing, fixed_h)
hide_button.clicked.connect(self.hide)
# Add gamepad icon for Hide (B/Circle)
gtype = self.input_manager.gamepad_type if self.input_manager and self.input_manager.gamepad else 'default'
gtype = self.input_manager.gamepad_type
self.set_gamepad_icon(hide_button, 'back', gtype)
self.keyboard_layout.addWidget(hide_button, 4, 12, 1, 2)