import time import threading from typing import Protocol, cast from evdev import InputDevice, InputEvent, ecodes, list_devices, ff from pyudev import Context, Monitor, MonitorObserver, Device from PySide6.QtWidgets import QWidget, QStackedWidget, QApplication, QScrollArea, QLineEdit, QDialog, QMenu, QComboBox, QListView, QMessageBox from PySide6.QtCore import Qt, QObject, QEvent, QPoint, Signal, Slot, QTimer from PySide6.QtGui import QKeyEvent from portprotonqt.logger import get_logger from portprotonqt.image_utils import FullscreenDialog from portprotonqt.custom_widgets import NavLabel from portprotonqt.game_card import GameCard from portprotonqt.config_utils import read_fullscreen_config, read_window_geometry, save_window_geometry, read_auto_fullscreen_gamepad, read_rumble_config logger = get_logger(__name__) class MainWindowProtocol(Protocol): def activateFocusedWidget(self) -> None: ... def goBackDetailPage(self, page: QWidget | None) -> None: ... def switchTab(self, index: int) -> None: ... def openAddGameDialog(self, exe_path: str | None = None) -> None: ... def toggleGame(self, exec_line: str | None, button: QWidget | None = None) -> None: ... def openSystemOverlay(self) -> None: ... def on_slider_released(self) -> None: ... stackedWidget: QStackedWidget tabButtons: dict[int, QWidget] gamesListWidget: QWidget currentDetailPage: QWidget | None current_exec_line: str | None current_add_game_dialog: QDialog | None # Mapping of actions to evdev button codes, includes Xbox and PlayStation controllers # https://github.com/torvalds/linux/blob/master/drivers/hid/hid-playstation.c # https://github.com/torvalds/linux/blob/master/drivers/input/joystick/xpad.c BUTTONS = { 'confirm': {ecodes.BTN_A, ecodes.BTN_SOUTH}, # A (Xbox) / Cross (PS) 'back': {ecodes.BTN_B, ecodes.BTN_EAST}, # B (Xbox) / Circle (PS) 'add_game': {ecodes.BTN_Y, ecodes.BTN_NORTH}, # Y (Xbox) / Triangle (PS) 'prev_tab': {ecodes.BTN_TL}, # LB (Xbox) / L1 (PS) 'next_tab': {ecodes.BTN_TR}, # RB (Xbox) / R1 (PS) 'context_menu': {ecodes.BTN_START}, # Start (Xbox) / Options (PS) 'menu': {ecodes.BTN_SELECT}, # Select (Xbox) / Share (PS) 'guide': {ecodes.BTN_MODE}, # Xbox Button / PS Button 'increase_size': {ecodes.ABS_RZ}, # RT (Xbox) / R2 (PS) 'decrease_size': {ecodes.ABS_Z}, # LT (Xbox) / L2 (PS) } class InputManager(QObject): """ Manages input from gamepads and keyboards for navigating the application interface. Supports gamepad hotplugging, button and axis events, and keyboard event filtering for seamless UI interaction. """ # Signals for gamepad events button_pressed = Signal(int) # Signal for button presses dpad_moved = Signal(int, int, float) # Signal for D-pad movements toggle_fullscreen = Signal(bool) # Signal for toggling fullscreen mode (True for fullscreen, False for normal) def __init__( self, main_window: MainWindowProtocol, axis_deadzone: float = 0.5, initial_axis_move_delay: float = 0.3, repeat_axis_move_delay: float = 0.15 ): super().__init__(cast(QObject, main_window)) self._parent = main_window # Ensure attributes exist on main_window self._parent.currentDetailPage = getattr(self._parent, 'currentDetailPage', None) self._parent.current_exec_line = getattr(self._parent, 'current_exec_line', None) self._parent.current_add_game_dialog = getattr(self._parent, 'current_add_game_dialog', None) self.axis_deadzone = axis_deadzone self.initial_axis_move_delay = initial_axis_move_delay self.repeat_axis_move_delay = repeat_axis_move_delay self.current_axis_delay = initial_axis_move_delay self.last_move_time = 0.0 self.axis_moving = False self.gamepad: InputDevice | None = None self.gamepad_thread: threading.Thread | None = None self.running = True self._is_fullscreen = read_fullscreen_config() self.rumble_effect_id: int | None = None # Store the rumble effect ID self.lt_pressed = False self.rt_pressed = False self.last_trigger_time = 0.0 self.trigger_cooldown = 0.2 # Add variables for continuous D-pad movement self.dpad_timer = QTimer(self) self.dpad_timer.timeout.connect(self.handle_dpad_repeat) self.current_dpad_code = None # Tracks the current D-pad axis (e.g., ABS_HAT0X, ABS_HAT0Y) self.current_dpad_value = 0 # Tracks the current D-pad direction value (e.g., -1, 1) # Connect signals to slots self.button_pressed.connect(self.handle_button_slot) self.dpad_moved.connect(self.handle_dpad_slot) self.toggle_fullscreen.connect(self.handle_fullscreen_slot) # Install keyboard event filter app = QApplication.instance() if app is not None: app.installEventFilter(self) # Initialize evdev + hotplug self.init_gamepad() @Slot(bool) def handle_fullscreen_slot(self, enable: bool) -> None: try: window = self._parent if not isinstance(window, QWidget): return if enable and not self._is_fullscreen: if not window.isFullScreen(): save_window_geometry(window.width(), window.height()) window.showFullScreen() self._is_fullscreen = True elif not enable and self._is_fullscreen: window.showNormal() width, height = read_window_geometry() if width > 0 and height > 0: window.resize(width, height) self._is_fullscreen = False save_window_geometry(width, height) except Exception as e: logger.error(f"Error in handle_fullscreen_slot: {e}", exc_info=True) def trigger_rumble(self, duration_ms: int = 200, strong_magnitude: int = 0x8000, weak_magnitude: int = 0x8000) -> None: """Trigger a rumble effect on the gamepad if supported.""" if not read_rumble_config(): return if not self.gamepad: return try: # Check if the gamepad supports force feedback caps = self.gamepad.capabilities() if ecodes.EV_FF not in caps or ecodes.FF_RUMBLE not in caps.get(ecodes.EV_FF, []): logger.debug("Gamepad does not support force feedback or rumble") return # Create a rumble effect rumble = ff.Rumble(strong_magnitude=strong_magnitude, weak_magnitude=weak_magnitude) effect = ff.Effect( id=-1, # Let evdev assign an ID type=ecodes.FF_RUMBLE, direction=0, # Direction (not used for rumble) replay=ff.Replay(length=duration_ms, delay=0), u=ff.EffectType(ff_rumble_effect=rumble) ) # Upload the effect self.rumble_effect_id = self.gamepad.upload_effect(effect) # Play the effect event = InputEvent(0, 0, ecodes.EV_FF, self.rumble_effect_id, 1) self.gamepad.write_event(event) # Schedule effect erasure after duration QTimer.singleShot(duration_ms, self.stop_rumble) except Exception as e: logger.error(f"Error triggering rumble: {e}", exc_info=True) def stop_rumble(self) -> None: """Stop the rumble effect and clean up.""" if self.gamepad and self.rumble_effect_id is not None: try: self.gamepad.erase_effect(self.rumble_effect_id) self.rumble_effect_id = None except Exception as e: logger.error(f"Error stopping rumble: {e}", exc_info=True) @Slot(int) def handle_button_slot(self, button_code: int) -> None: try: # Ignore gamepad events if a game is launched if getattr(self._parent, '_gameLaunched', False): return app = QApplication.instance() if not app: return active = QApplication.activeWindow() focused = QApplication.focusWidget() popup = QApplication.activePopupWidget() # Handle Guide button to open system overlay if button_code in BUTTONS['guide']: if not popup and not isinstance(active, QDialog): self._parent.openSystemOverlay() return # Handle QMenu (context menu) if isinstance(popup, QMenu): if button_code in BUTTONS['confirm']: if popup.activeAction(): popup.activeAction().trigger() popup.close() return elif button_code in BUTTONS['back']: popup.close() return return # Handle QComboBox if isinstance(focused, QComboBox): if button_code in BUTTONS['confirm']: focused.showPopup() return # Handle QListView if isinstance(focused, QListView): combo = None parent = focused.parentWidget() while parent: if isinstance(parent, QComboBox): combo = parent break parent = parent.parentWidget() if button_code in BUTTONS['confirm']: idx = focused.currentIndex() if idx.isValid(): if combo: combo.setCurrentIndex(idx.row()) combo.hidePopup() combo.setFocus(Qt.FocusReason.OtherFocusReason) else: focused.activated.emit(idx) focused.clicked.emit(idx) focused.hide() return if button_code in BUTTONS['back']: if combo: combo.hidePopup() combo.setFocus(Qt.FocusReason.OtherFocusReason) else: focused.clearSelection() focused.hide() # Close AddGameDialog on B button if button_code in BUTTONS['back'] and isinstance(active, QDialog): active.reject() return # FullscreenDialog if isinstance(active, FullscreenDialog): if button_code in BUTTONS['prev_tab']: active.show_prev() elif button_code in BUTTONS['next_tab']: active.show_next() elif button_code in BUTTONS['back']: active.close() return # Context menu for GameCard if isinstance(focused, GameCard): if button_code in BUTTONS['context_menu']: pos = QPoint(focused.width() // 2, focused.height() // 2) menu = focused._show_context_menu(pos) if menu: menu.setFocus(Qt.FocusReason.OtherFocusReason) return # Game launch on detail page if (button_code in BUTTONS['confirm']) and self._parent.currentDetailPage is not None and self._parent.current_add_game_dialog is None: if self._parent.current_exec_line: self.trigger_rumble() self._parent.toggleGame(self._parent.current_exec_line, None) return # Standard navigation if button_code in BUTTONS['confirm']: self._parent.activateFocusedWidget() elif button_code in BUTTONS['back']: self._parent.goBackDetailPage(getattr(self._parent, 'currentDetailPage', None)) elif button_code in BUTTONS['add_game']: if self._parent.stackedWidget.currentIndex() == 0: self._parent.openAddGameDialog() elif button_code in BUTTONS['prev_tab']: idx = (self._parent.stackedWidget.currentIndex() - 1) % len(self._parent.tabButtons) self._parent.switchTab(idx) self._parent.tabButtons[idx].setFocus(Qt.FocusReason.OtherFocusReason) elif button_code in BUTTONS['next_tab']: idx = (self._parent.stackedWidget.currentIndex() + 1) % len(self._parent.tabButtons) self._parent.switchTab(idx) self._parent.tabButtons[idx].setFocus(Qt.FocusReason.OtherFocusReason) elif button_code in BUTTONS['increase_size'] and self._parent.stackedWidget.currentIndex() == 0: # Increase card size with RT (Xbox) / R2 (PS) size_slider = getattr(self._parent, 'sizeSlider', None) if size_slider: new_value = min(size_slider.value() + 10, size_slider.maximum()) size_slider.setValue(new_value) self._parent.on_slider_released() elif button_code in BUTTONS['decrease_size'] and self._parent.stackedWidget.currentIndex() == 0: # Decrease card size with LT (Xbox) / L2 (PS) size_slider = getattr(self._parent, 'sizeSlider', None) if size_slider: new_value = max(size_slider.value() - 10, size_slider.minimum()) size_slider.setValue(new_value) self._parent.on_slider_released() except Exception as e: logger.error(f"Error in handle_button_slot: {e}", exc_info=True) def handle_dpad_repeat(self) -> None: """Handle repeated D-pad input while the D-pad is held.""" if self.current_dpad_code is not None and self.current_dpad_value != 0: now = time.time() if (now - self.last_move_time) >= self.current_axis_delay: self.handle_dpad_slot(self.current_dpad_code, self.current_dpad_value, now) self.last_move_time = now self.current_axis_delay = self.repeat_axis_move_delay @Slot(int, int, float) def handle_dpad_slot(self, code: int, value: int, current_time: float) -> None: try: # Ignore gamepad events if a game is launched if getattr(self._parent, '_gameLaunched', False): return app = QApplication.instance() if not app: return active = QApplication.activeWindow() focused = QApplication.focusWidget() popup = QApplication.activePopupWidget() # Update D-pad state if value != 0: self.current_dpad_code = code self.current_dpad_value = value if not self.axis_moving: self.axis_moving = True self.last_move_time = current_time self.current_axis_delay = self.initial_axis_move_delay self.dpad_timer.start(int(self.repeat_axis_move_delay * 1000)) # Start timer (in milliseconds) else: self.current_dpad_code = None self.current_dpad_value = 0 self.axis_moving = False self.current_axis_delay = self.initial_axis_move_delay self.dpad_timer.stop() # Stop timer when D-pad is released return # Handle SystemOverlay, AddGameDialog, or QMessageBox navigation with D-pad if isinstance(active, QDialog) and code == ecodes.ABS_HAT0X and value != 0: if isinstance(active, QMessageBox): # Specific handling for QMessageBox if not focused or not active.focusWidget(): # If no widget is focused, focus the first focusable widget focusables = active.findChildren(QWidget, options=Qt.FindChildOption.FindChildrenRecursively) focusables = [w for w in focusables if w.focusPolicy() & Qt.FocusPolicy.StrongFocus] if focusables: focusables[0].setFocus(Qt.FocusReason.OtherFocusReason) return if value > 0: # Right active.focusNextChild() elif value < 0: # Left active.focusPreviousChild() return elif isinstance(active, QDialog) and code == ecodes.ABS_HAT0Y and value != 0: # Keep up/down for other dialogs if not focused or not active.focusWidget(): # If no widget is focused, focus the first focusable widget focusables = active.findChildren(QWidget, options=Qt.FindChildOption.FindChildrenRecursively) focusables = [w for w in focusables if w.focusPolicy() & Qt.FocusPolicy.StrongFocus] if focusables: focusables[0].setFocus(Qt.FocusReason.OtherFocusReason) return if value > 0: # Down active.focusNextChild() elif value < 0: # Up active.focusPreviousChild() return # Handle QMenu navigation with D-pad if isinstance(popup, QMenu): if code == ecodes.ABS_HAT0Y and value != 0: actions = popup.actions() if actions: current_idx = actions.index(popup.activeAction()) if popup.activeAction() in actions else 0 if value < 0: # Up next_idx = (current_idx - 1) % len(actions) popup.setActiveAction(actions[next_idx]) elif value > 0: # Down next_idx = (current_idx + 1) % len(actions) popup.setActiveAction(actions[next_idx]) return return # Handle QListView navigation with D-pad if isinstance(focused, QListView) and code == ecodes.ABS_HAT0Y and value != 0: model = focused.model() current_index = focused.currentIndex() if model and current_index.isValid(): row_count = model.rowCount() current_row = current_index.row() if value > 0: # Down next_row = min(current_row + 1, row_count - 1) focused.setCurrentIndex(model.index(next_row, current_index.column())) elif value < 0: # Up prev_row = max(current_row - 1, 0) focused.setCurrentIndex(model.index(prev_row, current_index.column())) focused.scrollTo(focused.currentIndex(), QListView.ScrollHint.PositionAtCenter) return # Fullscreen horizontal navigation if isinstance(active, FullscreenDialog) and code == ecodes.ABS_HAT0X: if value < 0: active.show_prev() elif value > 0: active.show_next() return # Library tab navigation (index 0) if self._parent.stackedWidget.currentIndex() == 0 and code in (ecodes.ABS_HAT0X, ecodes.ABS_HAT0Y): focused = QApplication.focusWidget() game_cards = self._parent.gamesListWidget.findChildren(GameCard) if not game_cards: return scroll_area = self._parent.gamesListWidget.parentWidget() while scroll_area and not isinstance(scroll_area, QScrollArea): scroll_area = scroll_area.parentWidget() # If no focused widget or not a GameCard, focus the first card if not isinstance(focused, GameCard) or focused not in game_cards: game_cards[0].setFocus() if scroll_area: scroll_area.ensureWidgetVisible(game_cards[0], 50, 50) return # Group cards by rows based on y-coordinate rows = {} for card in game_cards: y = card.pos().y() if y not in rows: rows[y] = [] rows[y].append(card) # Sort cards in each row by x-coordinate for y in rows: rows[y].sort(key=lambda c: c.pos().x()) # Sort rows by y-coordinate sorted_rows = sorted(rows.items(), key=lambda x: x[0]) # Find current row and column current_y = focused.pos().y() current_row_idx = next(i for i, (y, _) in enumerate(sorted_rows) if y == current_y) current_row = sorted_rows[current_row_idx][1] current_col_idx = current_row.index(focused) if code == ecodes.ABS_HAT0X and value != 0: # Left/Right if value < 0: # Left next_col_idx = current_col_idx - 1 if next_col_idx >= 0: next_card = current_row[next_col_idx] next_card.setFocus() if scroll_area: scroll_area.ensureWidgetVisible(next_card, 50, 50) else: # Move to the last card of the previous row if available if current_row_idx > 0: prev_row = sorted_rows[current_row_idx - 1][1] next_card = prev_row[-1] if prev_row else None if next_card: next_card.setFocus() if scroll_area: scroll_area.ensureWidgetVisible(next_card, 50, 50) elif value > 0: # Right next_col_idx = current_col_idx + 1 if next_col_idx < len(current_row): next_card = current_row[next_col_idx] next_card.setFocus() if scroll_area: scroll_area.ensureWidgetVisible(next_card, 50, 50) else: # Move to the first card of the next row if available if current_row_idx < len(sorted_rows) - 1: next_row = sorted_rows[current_row_idx + 1][1] next_card = next_row[0] if next_row else None if next_card: next_card.setFocus() if scroll_area: scroll_area.ensureWidgetVisible(next_card, 50, 50) elif code == ecodes.ABS_HAT0Y and value != 0: # Up/Down if value > 0: # Down next_row_idx = current_row_idx + 1 if next_row_idx < len(sorted_rows): next_row = sorted_rows[next_row_idx][1] # Find card in same column or closest target_x = focused.pos().x() next_card = min( next_row, key=lambda c: abs(c.pos().x() - target_x), default=None ) if next_card: next_card.setFocus() if scroll_area: scroll_area.ensureWidgetVisible(next_card, 50, 50) elif value < 0: # Up next_row_idx = current_row_idx - 1 if next_row_idx >= 0: next_row = sorted_rows[next_row_idx][1] # Find card in same column or closest target_x = focused.pos().x() next_card = min( next_row, key=lambda c: abs(c.pos().x() - target_x), default=None ) if next_card: next_card.setFocus() if scroll_area: scroll_area.ensureWidgetVisible(next_card, 50, 50) elif current_row_idx == 0: self._parent.tabButtons[0].setFocus(Qt.FocusReason.OtherFocusReason) # Vertical navigation in other tabs elif code == ecodes.ABS_HAT0Y and value != 0: focused = QApplication.focusWidget() page = self._parent.stackedWidget.currentWidget() if value > 0: # Down if isinstance(focused, NavLabel): focusables = page.findChildren(QWidget, options=Qt.FindChildOption.FindChildrenRecursively) focusables = [w for w in focusables if w.focusPolicy() & Qt.FocusPolicy.StrongFocus] if focusables: focusables[0].setFocus() return elif focused: focused.focusNextChild() return elif value < 0 and focused: # Up focused.focusPreviousChild() return except Exception as e: logger.error(f"Error in handle_dpad_slot: {e}", exc_info=True) def eventFilter(self, obj: QObject, event: QEvent) -> bool: app = QApplication.instance() if not app: return super().eventFilter(obj, event) # Handle key press and release events if not isinstance(event, QKeyEvent): return super().eventFilter(obj, event) key = event.key() modifiers = event.modifiers() focused = QApplication.focusWidget() popup = QApplication.activePopupWidget() active_win = QApplication.activeWindow() # Handle key press events if event.type() == QEvent.Type.KeyPress: # Open system overlay with Insert if key == Qt.Key.Key_Insert: if not popup and not isinstance(active_win, QDialog): self._parent.openSystemOverlay() return True # Close application with Ctrl+Q if key == Qt.Key.Key_Q and modifiers & Qt.KeyboardModifier.ControlModifier: app.quit() return True # Close AddGameDialog with Escape if key == Qt.Key.Key_Escape and isinstance(popup, QDialog): popup.reject() return True # FullscreenDialog navigation if isinstance(active_win, FullscreenDialog): if key in (Qt.Key.Key_Escape, Qt.Key.Key_Return, Qt.Key.Key_Enter, Qt.Key.Key_Backspace): active_win.close() return True elif key in (Qt.Key.Key_Left, Qt.Key.Key_Right): # Navigate screenshots in FullscreenDialog if key == Qt.Key.Key_Left: active_win.show_prev() elif key == Qt.Key.Key_Right: active_win.show_next() return True # Consume event to prevent tab switching # Handle tab switching with Left/Right arrow keys when not in GameCard focus if key in (Qt.Key.Key_Left, Qt.Key.Key_Right) and (not isinstance(focused, GameCard) or focused is None): idx = self._parent.stackedWidget.currentIndex() total = len(self._parent.tabButtons) if key == Qt.Key.Key_Left: new_idx = (idx - 1) % total self._parent.switchTab(new_idx) self._parent.tabButtons[new_idx].setFocus(Qt.FocusReason.OtherFocusReason) return True elif key == Qt.Key.Key_Right: new_idx = (idx + 1) % total self._parent.switchTab(new_idx) self._parent.tabButtons[new_idx].setFocus(Qt.FocusReason.OtherFocusReason) return True # Map arrow keys to D-pad press events for other contexts if key in (Qt.Key.Key_Up, Qt.Key.Key_Down, Qt.Key.Key_Left, Qt.Key.Key_Right): now = time.time() dpad_code = None dpad_value = 0 if key == Qt.Key.Key_Up: dpad_code = ecodes.ABS_HAT0Y dpad_value = -1 elif key == Qt.Key.Key_Down: dpad_code = ecodes.ABS_HAT0Y dpad_value = 1 elif key == Qt.Key.Key_Left: dpad_code = ecodes.ABS_HAT0X dpad_value = -1 elif key == Qt.Key.Key_Right: dpad_code = ecodes.ABS_HAT0X dpad_value = 1 if dpad_code is not None: self.dpad_moved.emit(dpad_code, dpad_value, now) return True # Launch/stop game on detail page if self._parent.currentDetailPage and key in (Qt.Key.Key_Return, Qt.Key.Key_Enter): if self._parent.current_exec_line: self._parent.toggleGame(self._parent.current_exec_line, None) return True # Context menu for GameCard if isinstance(focused, GameCard): if key == Qt.Key.Key_F10 and modifiers & Qt.KeyboardModifier.ShiftModifier: pos = QPoint(focused.width() // 2, focused.height() // 2) focused._show_context_menu(pos) return True # General actions: Activate, Back, Add if key in (Qt.Key.Key_Return, Qt.Key.Key_Enter): self._parent.activateFocusedWidget() return True elif key in (Qt.Key.Key_Escape, Qt.Key.Key_Backspace): if isinstance(focused, QLineEdit): return False self._parent.goBackDetailPage(self._parent.currentDetailPage) return True elif key == Qt.Key.Key_E: if isinstance(focused, QLineEdit): return False # Only open AddGameDialog if in library tab (index 0) if self._parent.stackedWidget.currentIndex() == 0: self._parent.openAddGameDialog() return True # Toggle fullscreen with F11 if key == Qt.Key.Key_F11: self.toggle_fullscreen.emit(not self._is_fullscreen) return True # Handle key release events for arrow keys elif event.type() == QEvent.Type.KeyRelease: if key in (Qt.Key.Key_Up, Qt.Key.Key_Down, Qt.Key.Key_Left, Qt.Key.Key_Right): now = time.time() dpad_code = None if key in (Qt.Key.Key_Up, Qt.Key.Key_Down): dpad_code = ecodes.ABS_HAT0Y elif key in (Qt.Key.Key_Left, Qt.Key.Key_Right): dpad_code = ecodes.ABS_HAT0X if dpad_code is not None: # Emit release event with value 0 to stop continuous movement self.dpad_moved.emit(dpad_code, 0, now) return True return super().eventFilter(obj, event) def init_gamepad(self) -> None: self.check_gamepad() threading.Thread(target=self.run_udev_monitor, daemon=True).start() logger.info("Gamepad support initialized with hotplug (evdev + pyudev)") def run_udev_monitor(self) -> None: try: context = Context() monitor = Monitor.from_netlink(context) monitor.filter_by(subsystem='input') observer = MonitorObserver(monitor, self.handle_udev_event) observer.start() while self.running: time.sleep(1) except Exception as e: logger.error(f"Error in udev monitor: {e}", exc_info=True) def handle_udev_event(self, action: str, device: Device) -> None: try: if action == 'add': time.sleep(0.1) self.check_gamepad() elif action == 'remove' and self.gamepad: if not any(self.gamepad.path == path for path in list_devices()): logger.info("Gamepad disconnected") self.stop_rumble() self.gamepad = None if self.gamepad_thread: self.gamepad_thread.join() # Signal to exit fullscreen mode self.toggle_fullscreen.emit(False) except Exception as e: logger.error(f"Error handling udev event: {e}", exc_info=True) def check_gamepad(self) -> None: try: new_gamepad = self.find_gamepad() if new_gamepad and new_gamepad != self.gamepad: logger.info(f"Gamepad connected: {new_gamepad.name}") self.stop_rumble() self.gamepad = new_gamepad if self.gamepad_thread: self.gamepad_thread.join() self.gamepad_thread = threading.Thread(target=self.monitor_gamepad, daemon=True) self.gamepad_thread.start() # Send signal for fullscreen mode only if: # 1. auto_fullscreen_gamepad is enabled # 2. fullscreen is not already enabled (to avoid conflict) if read_auto_fullscreen_gamepad() and not read_fullscreen_config(): self.toggle_fullscreen.emit(True) except Exception as e: logger.error(f"Error checking gamepad: {e}", exc_info=True) def find_gamepad(self) -> InputDevice | None: try: devices = [InputDevice(path) for path in list_devices()] for device in devices: caps = device.capabilities() if ecodes.EV_KEY in caps or ecodes.EV_ABS in caps: return device return None except Exception as e: logger.error(f"Error finding gamepad: {e}", exc_info=True) return None def monitor_gamepad(self) -> None: try: if not self.gamepad: return for event in self.gamepad.read_loop(): if not self.running: break if event.type not in (ecodes.EV_KEY, ecodes.EV_ABS): continue now = time.time() if event.type == ecodes.EV_KEY and event.value == 1: if event.code in BUTTONS['menu']: self.toggle_fullscreen.emit(not self._is_fullscreen) else: self.button_pressed.emit(event.code) elif event.type == ecodes.EV_ABS: if event.code in {ecodes.ABS_Z, ecodes.ABS_RZ}: # Проверяем, достаточно ли времени прошло с последнего срабатывания if now - self.last_trigger_time < self.trigger_cooldown: continue if event.code == ecodes.ABS_Z: # LT/L2 if event.value > 128 and not self.lt_pressed: self.lt_pressed = True self.button_pressed.emit(event.code) self.last_trigger_time = now elif event.value <= 128 and self.lt_pressed: self.lt_pressed = False elif event.code == ecodes.ABS_RZ: # RT/R2 if event.value > 128 and not self.rt_pressed: self.rt_pressed = True self.button_pressed.emit(event.code) self.last_trigger_time = now elif event.value <= 128 and self.rt_pressed: self.rt_pressed = False else: self.dpad_moved.emit(event.code, event.value, now) except OSError as e: if e.errno == 19: # ENODEV: No such device logger.info("Gamepad disconnected during event loop") else: logger.error(f"OSError in gamepad monitoring: {e}", exc_info=True) except Exception as e: logger.error(f"Error in gamepad monitoring: {e}", exc_info=True) finally: if self.gamepad: try: self.stop_rumble() self.gamepad.close() except Exception: pass self.gamepad = None def cleanup(self) -> None: try: self.running = False self.dpad_timer.stop() self.stop_rumble() if self.gamepad_thread: self.gamepad_thread.join() if self.gamepad: self.gamepad.close() except Exception as e: logger.error(f"Error during cleanup: {e}", exc_info=True)