Compare commits
	
		
			5 Commits
		
	
	
		
			renovate/a
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 3736bb279e | |||
|  | b59ee5ae8e | ||
| 33176590fd | |||
| 8046065929 | |||
|  | fbad5add6c | 
| @@ -16,7 +16,7 @@ repos: | ||||
|       - id: uv-lock | ||||
|  | ||||
|   - repo: https://github.com/astral-sh/ruff-pre-commit | ||||
|     rev: v0.14.0 | ||||
|     rev: v0.14.1 | ||||
|     hooks: | ||||
|       - id: ruff-check | ||||
|  | ||||
|   | ||||
| @@ -83,6 +83,43 @@ def load_pixmap_async(cover: str, width: int, height: int, callback: Callable[[Q | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Ошибка обработки URL {cover}: {e}") | ||||
|  | ||||
|         # SteamGridDB (SGDB) | ||||
|         if cover and cover.startswith("https://cdn2.steamgriddb.com"): | ||||
|             try: | ||||
|                 parts = cover.split("/") | ||||
|                 filename = parts[-1] if parts else "sgdb_cover.png" | ||||
|                 # SGDB ссылки содержат уникальный хеш в названии — используем как имя | ||||
|                 local_path = os.path.join(image_folder, filename) | ||||
|  | ||||
|                 if os.path.exists(local_path): | ||||
|                     pixmap = QPixmap(local_path) | ||||
|                     finish_with(pixmap) | ||||
|                     return | ||||
|  | ||||
|                 def on_downloaded(result: str | None): | ||||
|                     pixmap = QPixmap() | ||||
|                     if result and os.path.exists(result): | ||||
|                         pixmap.load(result) | ||||
|                     if pixmap.isNull(): | ||||
|                         placeholder_path = theme_manager.get_theme_image("placeholder", current_theme_name) | ||||
|                         if placeholder_path and QFile.exists(placeholder_path): | ||||
|                             pixmap.load(placeholder_path) | ||||
|                         else: | ||||
|                             pixmap = QPixmap(width, height) | ||||
|                             pixmap.fill(QColor("#333333")) | ||||
|                             painter = QPainter(pixmap) | ||||
|                             painter.setPen(QPen(QColor("white"))) | ||||
|                             painter.drawText(pixmap.rect(), Qt.AlignmentFlag.AlignCenter, "No Image") | ||||
|                             painter.end() | ||||
|                     finish_with(pixmap) | ||||
|  | ||||
|                 logger.info("Downloading SGDB cover for %s -> %s", app_name or "unknown", filename) | ||||
|                 downloader.download_async(cover, local_path, timeout=5, callback=on_downloaded) | ||||
|                 return | ||||
|  | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Ошибка обработки SGDB URL {cover}: {e}") | ||||
|  | ||||
|         if cover and cover.startswith(("http://", "https://")): | ||||
|             try: | ||||
|                 local_path = os.path.join(image_folder, f"{app_name}.jpg") | ||||
|   | ||||
| @@ -1440,6 +1440,7 @@ class InputManager(QObject): | ||||
|         self.udev_context = Context() | ||||
|         self.Devices = Devices | ||||
|         self.monitor_ready = False | ||||
|         self.monitor_event = threading.Event() | ||||
|  | ||||
|         # Подключаем сигнал hotplug к обработчику в главном потоке | ||||
|         self.gamepad_hotplug.connect(self._on_gamepad_hotplug) | ||||
| @@ -1491,6 +1492,7 @@ class InputManager(QObject): | ||||
|                     break | ||||
|  | ||||
|             self.monitor_ready = True | ||||
|             self.monitor_event.set() | ||||
|             logger.info(f"Drained {drained_count} initial events, now monitoring hotplug...") | ||||
|  | ||||
|             # Основной цикл | ||||
| @@ -1592,7 +1594,6 @@ class InputManager(QObject): | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error in hotplug handler: {e}", exc_info=True) | ||||
|  | ||||
|  | ||||
|     def check_gamepad(self) -> None: | ||||
|         """ | ||||
|         Проверка и подключение геймпада. | ||||
| @@ -1601,18 +1602,23 @@ class InputManager(QObject): | ||||
|         try: | ||||
|             new_gamepad = self.find_gamepad() | ||||
|  | ||||
|             # Проверяем, действительно ли это новый геймпад | ||||
|             if new_gamepad: | ||||
|                 if not self.gamepad or new_gamepad.path != self.gamepad.path: | ||||
|                     logger.info(f"Gamepad connected: {new_gamepad.name} at {new_gamepad.path}") | ||||
|                     self.stop_rumble() | ||||
|                     self.gamepad = new_gamepad | ||||
|  | ||||
|                     if self.gamepad_thread: | ||||
|                     if self.gamepad_thread and self.gamepad_thread.is_alive(): | ||||
|                         self.gamepad_thread.join(timeout=2.0) | ||||
|  | ||||
|                     def start_monitoring(): | ||||
|                         # Ожидание готовности udev monitor без busy-wait | ||||
|                         if not self.monitor_event.wait(timeout=2.0): | ||||
|                             logger.warning("Timeout waiting for udev monitor readiness") | ||||
|                         self.monitor_gamepad() | ||||
|  | ||||
|                     self.gamepad_thread = threading.Thread( | ||||
|                         target=self.monitor_gamepad, | ||||
|                         target=start_monitoring, | ||||
|                         daemon=True | ||||
|                     ) | ||||
|                     self.gamepad_thread.start() | ||||
| @@ -1622,12 +1628,11 @@ class InputManager(QObject): | ||||
|                         self.toggle_fullscreen.emit(True) | ||||
|  | ||||
|             elif self.gamepad and not any(self.gamepad.path == path for path in list_devices()): | ||||
|                 # Геймпад был подключён, но теперь его нет в системе | ||||
|                 logger.info("Gamepad no longer detected") | ||||
|                 self.stop_rumble() | ||||
|                 self.gamepad = None | ||||
|  | ||||
|                 if self.gamepad_thread: | ||||
|                 if self.gamepad_thread and self.gamepad_thread.is_alive(): | ||||
|                     self.gamepad_thread.join(timeout=2.0) | ||||
|  | ||||
|                 if read_auto_fullscreen_gamepad() and not read_fullscreen_config(): | ||||
| @@ -1636,7 +1641,6 @@ class InputManager(QObject): | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error checking gamepad: {e}", exc_info=True) | ||||
|  | ||||
|  | ||||
|     def find_gamepad(self) -> InputDevice | None: | ||||
|         """ | ||||
|         Находит первый доступный геймпад. | ||||
|   | ||||
| @@ -1253,7 +1253,15 @@ class MainWindow(QMainWindow): | ||||
|         # Показываем прогресс | ||||
|         self.autoInstallProgress.setVisible(True) | ||||
|         self.autoInstallProgress.setRange(0, 0) | ||||
|         self.portproton_api.get_autoinstall_games_async(on_autoinstall_games_loaded) | ||||
|  | ||||
|         # Store the thread to prevent premature destruction | ||||
|         self.autoInstallLoadThread = self.portproton_api.start_autoinstall_games_load(on_autoinstall_games_loaded) | ||||
|  | ||||
|         # Optional: Clean up thread when finished (prevents leak) | ||||
|         if self.autoInstallLoadThread: | ||||
|             def on_thread_finished(): | ||||
|                 self.autoInstallLoadThread = None  # Release reference | ||||
|             self.autoInstallLoadThread.finished.connect(on_thread_finished) | ||||
|  | ||||
|         self.stackedWidget.addWidget(autoInstallPage) | ||||
|  | ||||
|   | ||||
| @@ -6,13 +6,16 @@ import urllib.parse | ||||
| import time | ||||
| import glob | ||||
| import re | ||||
| import hashlib | ||||
| from collections.abc import Callable | ||||
| from PySide6.QtCore import QThread, Signal | ||||
| from portprotonqt.downloader import Downloader | ||||
| from portprotonqt.logger import get_logger | ||||
| from portprotonqt.config_utils import get_portproton_location | ||||
|  | ||||
| logger = get_logger(__name__) | ||||
| CACHE_DURATION = 30 * 24 * 60 * 60  # 30 days in seconds | ||||
| AUTOINSTALL_CACHE_DURATION = 3600  # 1 hour for autoinstall cache | ||||
|  | ||||
| def normalize_name(s): | ||||
|     """ | ||||
| @@ -59,6 +62,7 @@ class PortProtonAPI: | ||||
|         self.repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | ||||
|         self.builtin_custom_folder = os.path.join(self.repo_root, "custom_data") | ||||
|         self._topics_data = None | ||||
|         self._autoinstall_cache = None  # New: In-memory cache | ||||
|  | ||||
|     def _get_game_dir(self, exe_name: str) -> str: | ||||
|         game_dir = os.path.join(self.custom_data_dir, exe_name) | ||||
| @@ -231,38 +235,113 @@ class PortProtonAPI: | ||||
|             logger.error(f"Failed to parse {file_path}: {e}") | ||||
|             return None, None | ||||
|  | ||||
|     def get_autoinstall_games_async(self, callback: Callable[[list[tuple]], None]) -> None: | ||||
|         """Load auto-install games with user/builtin covers (no async download here).""" | ||||
|         games = [] | ||||
|         auto_dir = os.path.join(self.portproton_location or "", "data", "scripts", "pw_autoinstall") if self.portproton_location else "" | ||||
|     def _compute_scripts_signature(self, auto_dir: str) -> str: | ||||
|         """Compute a hash-based signature of the autoinstall scripts to detect changes.""" | ||||
|         if not os.path.exists(auto_dir): | ||||
|             callback(games) | ||||
|             return "" | ||||
|         scripts = sorted(glob.glob(os.path.join(auto_dir, "*"))) | ||||
|         # Simple hash: concatenate sorted filenames and hash | ||||
|         filenames_str = "".join(sorted([os.path.basename(s) for s in scripts])) | ||||
|         return hashlib.md5(filenames_str.encode()).hexdigest() | ||||
|  | ||||
|     def _load_autoinstall_cache(self): | ||||
|         """Load cached autoinstall games if fresh and scripts unchanged.""" | ||||
|         if self._autoinstall_cache is not None: | ||||
|             return self._autoinstall_cache | ||||
|         cache_dir = get_cache_dir() | ||||
|         cache_file = os.path.join(cache_dir, "autoinstall_games_cache.json") | ||||
|         if os.path.exists(cache_file): | ||||
|             try: | ||||
|                 mod_time = os.path.getmtime(cache_file) | ||||
|                 if time.time() - mod_time < AUTOINSTALL_CACHE_DURATION: | ||||
|                     with open(cache_file, "rb") as f: | ||||
|                         data = orjson.loads(f.read()) | ||||
|                         # Check signature | ||||
|                         cached_signature = data.get("scripts_signature", "") | ||||
|                         current_signature = self._compute_scripts_signature( | ||||
|                             os.path.join(self.portproton_location or "", "data", "scripts", "pw_autoinstall") | ||||
|                         ) | ||||
|                         if cached_signature != current_signature: | ||||
|                             logger.info("Scripts signature mismatch; invalidating cache") | ||||
|                             return None | ||||
|                         self._autoinstall_cache = data["games"] | ||||
|                         logger.info(f"Loaded {len(self._autoinstall_cache)} cached autoinstall games") | ||||
|                         return self._autoinstall_cache | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Failed to load autoinstall cache: {e}") | ||||
|         return None | ||||
|  | ||||
|     def _save_autoinstall_cache(self, games): | ||||
|         """Save parsed autoinstall games to cache with scripts signature.""" | ||||
|         try: | ||||
|             cache_dir = get_cache_dir() | ||||
|             cache_file = os.path.join(cache_dir, "autoinstall_games_cache.json") | ||||
|             auto_dir = os.path.join(self.portproton_location or "", "data", "scripts", "pw_autoinstall") | ||||
|             scripts_signature = self._compute_scripts_signature(auto_dir) | ||||
|             data = {"games": games, "scripts_signature": scripts_signature, "timestamp": time.time()} | ||||
|             with open(cache_file, "wb") as f: | ||||
|                 f.write(orjson.dumps(data)) | ||||
|             logger.debug(f"Saved {len(games)} autoinstall games to cache with signature {scripts_signature}") | ||||
|         except Exception as e: | ||||
|             logger.error(f"Failed to save autoinstall cache: {e}") | ||||
|  | ||||
|     def start_autoinstall_games_load(self, callback: Callable[[list[tuple]], None]) -> QThread | None: | ||||
|         """Start loading auto-install games in a background thread. Returns the thread for management.""" | ||||
|         # Check cache first (sync, fast) | ||||
|         cached_games = self._load_autoinstall_cache() | ||||
|         if cached_games is not None: | ||||
|             # Emit via callback immediately if cached | ||||
|             QThread.msleep(0)  # Yield to Qt event loop | ||||
|             callback(cached_games) | ||||
|             return None  # No thread needed | ||||
|  | ||||
|         # No cache: Start background thread | ||||
|         class AutoinstallWorker(QThread): | ||||
|             finished = Signal(list) | ||||
|             api: "PortProtonAPI" | ||||
|             portproton_location: str | None | ||||
|  | ||||
|             def run(self): | ||||
|                 games = [] | ||||
|                 auto_dir = os.path.join( | ||||
|                     self.portproton_location or "", "data", "scripts", "pw_autoinstall" | ||||
|                 ) if self.portproton_location else "" | ||||
|                 if not os.path.exists(auto_dir): | ||||
|                     self.finished.emit(games) | ||||
|                     return | ||||
|  | ||||
|                 scripts = sorted(glob.glob(os.path.join(auto_dir, "*"))) | ||||
|                 if not scripts: | ||||
|             callback(games) | ||||
|                     self.finished.emit(games) | ||||
|                     return | ||||
|  | ||||
|         xdg_data_home = os.getenv("XDG_DATA_HOME", | ||||
|                                 os.path.join(os.path.expanduser("~"), ".local", "share")) | ||||
|         base_autoinstall_dir = os.path.join(xdg_data_home, "PortProtonQt", "custom_data", "autoinstall") | ||||
|                 xdg_data_home = os.getenv( | ||||
|                     "XDG_DATA_HOME", | ||||
|                     os.path.join(os.path.expanduser("~"), ".local", "share"), | ||||
|                 ) | ||||
|                 base_autoinstall_dir = os.path.join( | ||||
|                     xdg_data_home, "PortProtonQt", "custom_data", "autoinstall" | ||||
|                 ) | ||||
|                 os.makedirs(base_autoinstall_dir, exist_ok=True) | ||||
|  | ||||
|                 for script_path in scripts: | ||||
|             display_name, exe_name = self.parse_autoinstall_script(script_path) | ||||
|                     display_name, exe_name = self.api.parse_autoinstall_script(script_path) | ||||
|                     script_name = os.path.splitext(os.path.basename(script_path))[0] | ||||
|  | ||||
|                     if not (display_name and exe_name): | ||||
|                         continue | ||||
|  | ||||
|             exe_name = os.path.splitext(exe_name)[0]  # Без .exe | ||||
|                     exe_name = os.path.splitext(exe_name)[0] | ||||
|                     user_game_folder = os.path.join(base_autoinstall_dir, exe_name) | ||||
|                     os.makedirs(user_game_folder, exist_ok=True) | ||||
|  | ||||
|             # Поиск обложки | ||||
|                     # Find cover | ||||
|                     cover_path = "" | ||||
|             user_files = set(os.listdir(user_game_folder)) if os.path.exists(user_game_folder) else set() | ||||
|                     user_files = ( | ||||
|                         set(os.listdir(user_game_folder)) | ||||
|                         if os.path.exists(user_game_folder) | ||||
|                         else set() | ||||
|                     ) | ||||
|                     for ext in [".jpg", ".png", ".jpeg", ".bmp"]: | ||||
|                         candidate = f"cover{ext}" | ||||
|                         if candidate in user_files: | ||||
| @@ -272,26 +351,23 @@ class PortProtonAPI: | ||||
|                     if not cover_path: | ||||
|                         logger.debug(f"No local cover found for autoinstall {exe_name}") | ||||
|  | ||||
|             # Формируем кортеж игры (добавлен exe_name в конец) | ||||
|                     game_tuple = ( | ||||
|                 display_name,  # name | ||||
|                 "",  # description | ||||
|                 cover_path,  # cover | ||||
|                 "",  # appid | ||||
|                 f"autoinstall:{script_name}",  # exec_line | ||||
|                 "",  # controller_support | ||||
|                 "Never",  # last_launch | ||||
|                 "0h 0m",  # formatted_playtime | ||||
|                 "",  # protondb_tier | ||||
|                 "",  # anticheat_status | ||||
|                 0,  # last_played | ||||
|                 0,  # playtime_seconds | ||||
|                 "autoinstall",  # game_source | ||||
|                 exe_name  # exe_name | ||||
|                         display_name, "", cover_path, "", f"autoinstall:{script_name}", | ||||
|                         "", "Never", "0h 0m", "", "", 0, 0, "autoinstall", exe_name | ||||
|                     ) | ||||
|                     games.append(game_tuple) | ||||
|  | ||||
|         callback(games) | ||||
|                 self.api._save_autoinstall_cache(games) | ||||
|                 self.api._autoinstall_cache = games | ||||
|                 self.finished.emit(games) | ||||
|  | ||||
|         worker = AutoinstallWorker() | ||||
|         worker.api = self | ||||
|         worker.portproton_location = self.portproton_location | ||||
|         worker.finished.connect(lambda games: callback(games)) | ||||
|         worker.start() | ||||
|         logger.info("Started background load of autoinstall games") | ||||
|         return worker | ||||
|  | ||||
|     def _load_topics_data(self): | ||||
|         """Load and cache linux_gaming_topics_min.json from the archive.""" | ||||
|   | ||||
| @@ -23,6 +23,7 @@ import requests | ||||
| import random | ||||
| import base64 | ||||
| import glob | ||||
| import urllib.parse | ||||
|  | ||||
| downloader = Downloader() | ||||
| logger = get_logger(__name__) | ||||
| @@ -411,6 +412,39 @@ def save_app_details(app_id, data): | ||||
|     with open(cache_file, "wb") as f: | ||||
|         f.write(orjson.dumps(data)) | ||||
|  | ||||
| def fetch_sgdb_cover(game_name: str) -> str: | ||||
|     """ | ||||
|     Fetch a cover image URL from steamgrid.usebottles.com for the given game. | ||||
|     The API returns a single string (quoted URL). | ||||
|     """ | ||||
|     try: | ||||
|         encoded = urllib.parse.quote(game_name) | ||||
|         url = f"https://steamgrid.usebottles.com/api/search/{encoded}" | ||||
|         resp = requests.get(url, timeout=5) | ||||
|         if resp.status_code != 200: | ||||
|             logger.warning("SGDB request failed for %s: %s", game_name, resp.status_code) | ||||
|             return "" | ||||
|         text = resp.text.strip() | ||||
|         # Убираем возможные кавычки вокруг строки | ||||
|         if text.startswith('"') and text.endswith('"'): | ||||
|             text = text[1:-1] | ||||
|         if text: | ||||
|             logger.info("Fetched SGDB cover for %s: %s", game_name, text) | ||||
|         return text | ||||
|     except Exception as e: | ||||
|         logger.warning("Failed to fetch SGDB cover for %s: %s", game_name, e) | ||||
|     return "" | ||||
|  | ||||
|  | ||||
| def check_url_exists(url: str) -> bool: | ||||
|     """Check whether a URL returns HTTP 200.""" | ||||
|     try: | ||||
|         r = requests.head(url, timeout=3) | ||||
|         return r.status_code == 200 | ||||
|     except Exception: | ||||
|         return False | ||||
|  | ||||
|  | ||||
| def fetch_app_info_async(app_id: int, callback: Callable[[dict | None], None]): | ||||
|     """ | ||||
|     Asynchronously fetches detailed app info from Steam API. | ||||
| @@ -629,6 +663,11 @@ def get_full_steam_game_info_async(appid: int, callback: Callable[[dict], None]) | ||||
|         title = decode_text(app_info.get("name", "")) | ||||
|         description = decode_text(app_info.get("short_description", "")) | ||||
|         cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg" | ||||
|         if not check_url_exists(cover): | ||||
|             logger.info("Steam cover not found for %s, trying SGDB", title) | ||||
|             alt_cover = fetch_sgdb_cover(title) | ||||
|             if alt_cover: | ||||
|                 cover = alt_cover | ||||
|  | ||||
|         def on_protondb_tier(tier: str): | ||||
|             def on_anticheat_status(anticheat_status: str): | ||||
| @@ -722,12 +761,15 @@ def get_steam_game_info_async(desktop_name: str, exec_line: str, callback: Calla | ||||
|         game_name = desktop_name or exe_name.capitalize() | ||||
|  | ||||
|         if not matching_app: | ||||
|             cover = fetch_sgdb_cover(game_name) or "" | ||||
|             logger.info("Using SGDB cover for non-Steam game '%s': %s", game_name, cover) | ||||
|  | ||||
|             def on_anticheat_status(anticheat_status: str): | ||||
|                 callback({ | ||||
|                     "appid": "", | ||||
|                     "name": decode_text(game_name), | ||||
|                     "description": "", | ||||
|                     "cover": "", | ||||
|                     "cover": cover, | ||||
|                     "controller_support": "", | ||||
|                     "protondb_tier": "", | ||||
|                     "steam_game": "false", | ||||
| @@ -758,6 +800,11 @@ def get_steam_game_info_async(desktop_name: str, exec_line: str, callback: Calla | ||||
|             title = decode_text(app_info.get("name", game_name)) | ||||
|             description = decode_text(app_info.get("short_description", "")) | ||||
|             cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg" | ||||
|             if not check_url_exists(cover): | ||||
|                 logger.info("Steam cover not found for %s, trying SGDB", title) | ||||
|                 alt_cover = fetch_sgdb_cover(title) | ||||
|                 if alt_cover: | ||||
|                     cover = alt_cover | ||||
|             controller_support = app_info.get("controller_support", "") | ||||
|  | ||||
|             def on_protondb_tier(tier: str): | ||||
|   | ||||
		Reference in New Issue
	
	Block a user