import orjson
import re
import os
from dataclasses import dataclass, field
from typing import Any
from difflib import SequenceMatcher
from threading import Thread

import requests
from bs4 import BeautifulSoup, Tag

from portprotonqt.config_utils import read_proxy_config
from portprotonqt.time_utils import format_playtime
from PySide6.QtCore import QObject, Signal


@dataclass
class GameEntry:
    """A single game record returned by HowLongToBeat."""
    game_id: int = -1
    game_name: str | None = None
    main_story: float | None = None      # hours for the "Main Story" category
    main_extra: float | None = None      # hours for the "Main + Extra" category
    completionist: float | None = None   # hours for the "Completionist" category
    similarity: float = -1.0             # similarity of game_name to the search query (0.0..1.0)
    raw_data: dict[str, Any] = field(default_factory=dict)  # original JSON entry, kept for callers


@dataclass
class SearchConfig:
    """Search configuration scraped from the site's JS bundle."""
    api_key: str | None = None     # per-deployment API token embedded in the script
    search_url: str | None = None  # endpoint path the script fetches, if detected


class APIKeyExtractor:
    """Extracts the API key and search URL from the site's bundled scripts."""

    @staticmethod
    def extract_from_script(script_content: str) -> SearchConfig:
        """Parse a JS bundle and return whatever key/URL could be found."""
        config = SearchConfig()
        config.api_key = APIKeyExtractor._extract_api_key(script_content)
        config.search_url = APIKeyExtractor._extract_search_url(script_content, config.api_key)
        return config

    @staticmethod
    def _extract_api_key(script_content: str) -> str | None:
        """Find the API key via either the `users:{id:"..."}` literal or
        a chain of `.concat("...")` fragments appended to an /api/ path."""
        user_id_pattern = r'users\s*:\s*{\s*id\s*:\s*"([^"]+)"'
        matches = re.findall(user_id_pattern, script_content)
        if matches:
            return ''.join(matches)

        concat_pattern = r'\/api\/\w+\/"(?:\.concat\("[^"]*"\))+'
        matches = re.findall(concat_pattern, script_content)
        if matches:
            # The key is split across several .concat("...") calls; strip the
            # surrounding quotes/parens/brackets and glue the pieces together.
            parts = str(matches).split('.concat')
            cleaned_parts = [re.sub(r'["\(\)\[\]\']', '', part) for part in parts[1:]]
            return ''.join(cleaned_parts)
        return None

    @staticmethod
    def _extract_search_url(script_content: str, api_key: str | None) -> str | None:
        """Find the fetch() call whose concatenated suffix equals the API key;
        its first argument is the search endpoint path. Returns None if the
        key is unknown or no matching fetch() is found."""
        if not api_key:
            return None
        pattern = re.compile(
            r'fetch\(\s*["\'](\/api\/[^"\']*)["\']'
            r'((?:\s*\.concat\(\s*["\']([^"\']*)["\']\s*\))+)'
            r'\s*,',
            re.DOTALL
        )
        for match in pattern.finditer(script_content):
            endpoint = match.group(1)
            concat_calls = match.group(2)
            concat_strings = re.findall(r'\.concat\(\s*["\']([^"\']*)["\']\s*\)', concat_calls)
            concatenated_str = ''.join(concat_strings)
            if concatenated_str == api_key:
                return endpoint
        return None


class HTTPClient:
    """HTTP client for the HowLongToBeat API."""

    BASE_URL = 'https://howlongtobeat.com/'
    SEARCH_URL = BASE_URL + "api/s/"  # fallback endpoint when none is scraped

    def __init__(self, timeout: int = 60):
        """:param timeout: per-request timeout in seconds."""
        self.timeout = timeout
        self.session = requests.Session()
        # The site rejects requests without a browser-like UA and referer.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'referer': self.BASE_URL
        })
        proxy_config = read_proxy_config()
        if proxy_config:
            self.session.proxies.update(proxy_config)

    def get_search_config(self, parse_all_scripts: bool = False) -> SearchConfig | None:
        """Download the site's script bundles and extract the search config.

        :param parse_all_scripts: scan every <script src> instead of only the
            `_app-` bundle (slower fallback when the key moved).
        :return: a SearchConfig with an api_key, or None on any failure.
        """
        try:
            response = self.session.get(self.BASE_URL, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            scripts = soup.find_all('script', src=True)

            script_urls = []
            for script in scripts:
                if isinstance(script, Tag):
                    src = script.get('src')
                    if src is not None and isinstance(src, str):
                        if parse_all_scripts or '_app-' in src:
                            script_urls.append(src)

            for script_url in script_urls:
                full_url = self.BASE_URL + script_url
                script_response = self.session.get(full_url, timeout=self.timeout)
                if script_response.status_code == 200:
                    config = APIKeyExtractor.extract_from_script(script_response.text)
                    if config.api_key:
                        return config
        except requests.RequestException:
            # Best-effort: any network error simply means "no config".
            pass
        return None

    def search_games(self, game_name: str, page: int = 1, config: SearchConfig | None = None) -> str | None:
        """POST a search query and return the raw JSON response text.

        Tries the scraped endpoint with the API key appended first, then the
        bare endpoint as a fallback. Returns None when everything fails.
        """
        if not config:
            config = self.get_search_config()
            if not config:
                config = self.get_search_config(parse_all_scripts=True)
        if not config or not config.api_key:
            return None

        search_url = self.SEARCH_URL
        if config.search_url:
            search_url = self.BASE_URL + config.search_url.lstrip('/')

        payload = self._build_search_payload(game_name, page, config)
        headers = {
            'content-type': 'application/json',
            'accept': '*/*'
        }

        # First attempt: endpoint with the API key appended to the path.
        try:
            response = self.session.post(
                search_url + config.api_key,
                headers=headers,
                data=orjson.dumps(payload),
                timeout=self.timeout
            )
            if response.status_code == 200:
                return response.text
        except requests.RequestException:
            pass

        # Fallback: bare endpoint (the key is also embedded in the payload).
        try:
            response = self.session.post(
                search_url,
                headers=headers,
                data=orjson.dumps(payload),
                timeout=self.timeout
            )
            if response.status_code == 200:
                return response.text
        except requests.RequestException:
            pass
        return None

    def _build_search_payload(self, game_name: str, page: int, config: SearchConfig) -> dict[str, Any]:
        """Build the JSON body the HLTB search endpoint expects."""
        payload = {
            'searchType': "games",
            'searchTerms': game_name.split(),
            'searchPage': page,
            'size': 1,  # Limit to 1 result
            'searchOptions': {
                'games': {
                    'userId': 0,
                    'platform': "",
                    'sortCategory': "popular",
                    'rangeCategory': "main",
                    'rangeTime': {'min': 0, 'max': 0},
                    'gameplay': {
                        'perspective': "",
                        'flow': "",
                        'genre': "",
                        "difficulty": ""
                    },
                    'rangeYear': {'max': "", 'min': ""},
                    'modifier': ""  # Hardcoded to empty string for SearchModifiers.NONE
                },
                'users': {'sortCategory': "postcount"},
                'lists': {'sortCategory': "follows"},
                'filter': "",
                'sort': 0,
                'randomizer': 0
            },
            'useCache': True,
            'fields': ["game_id", "game_name", "comp_main", "comp_plus", "comp_100"]  # Request only needed fields
        }
        if config.api_key:
            payload['searchOptions']['users']['id'] = config.api_key
        return payload


class ResultParser:
    """Parses search results and scores them against the original query."""

    def __init__(self, search_query: str, minimum_similarity: float = 0.4, case_sensitive: bool = True):
        """
        :param search_query: the name the user searched for.
        :param minimum_similarity: results below this similarity are dropped
            (0.0 disables filtering).
        :param case_sensitive: compare names without lowercasing first.
        """
        self.search_query = search_query
        self.minimum_similarity = minimum_similarity
        self.case_sensitive = case_sensitive
        self.search_numbers = self._extract_numbers(search_query)

    def parse_results(self, json_response: str, target_game_id: int | None = None) -> list[GameEntry]:
        """Parse the raw JSON text into GameEntry objects.

        Only the first result is considered. When target_game_id is given,
        it overrides the similarity filter. Malformed JSON yields [].
        """
        try:
            data = orjson.loads(json_response)
            games = []
            # Only process the first result
            if data.get("data"):
                game_data = data["data"][0]
                game = self._parse_game_entry(game_data)
                if target_game_id is not None:
                    if game.game_id == target_game_id:
                        games.append(game)
                elif self.minimum_similarity == 0.0 or game.similarity >= self.minimum_similarity:
                    games.append(game)
            return games
        except (orjson.JSONDecodeError, KeyError, IndexError):
            return []

    def _parse_game_entry(self, game_data: dict[str, Any]) -> GameEntry:
        """Convert one raw JSON entry (times in seconds) into a GameEntry
        (times in hours, rounded to 2 decimals)."""
        game = GameEntry()
        game.game_id = game_data.get("game_id", -1)
        game.game_name = game_data.get("game_name")
        game.raw_data = game_data

        time_fields = [
            ("comp_main", "main_story"),
            ("comp_plus", "main_extra"),
            ("comp_100", "completionist")
        ]
        for json_field, attr_name in time_fields:
            if json_field in game_data:
                time_hours = round(game_data[json_field] / 3600, 2)
                setattr(game, attr_name, time_hours)

        game.similarity = self._calculate_similarity(game)
        return game

    def _calculate_similarity(self, game: GameEntry) -> float:
        return self._compare_strings(self.search_query, game.game_name)

    def _compare_strings(self, a: str | None, b: str | None) -> float:
        """SequenceMatcher ratio between a and b, with a 0.1 penalty when the
        query contains numbers that the candidate name lacks (so e.g. a
        sequel number mismatch ranks lower). Clamped to >= 0."""
        if not a or not b:
            return 0.0
        if self.case_sensitive:
            similarity = SequenceMatcher(None, a, b).ratio()
        else:
            similarity = SequenceMatcher(None, a.lower(), b.lower()).ratio()
        if self.search_numbers and not self._contains_numbers(b, self.search_numbers):
            similarity -= 0.1
        return max(0.0, similarity)

    @staticmethod
    def _extract_numbers(text: str) -> list[str]:
        """Return the purely-numeric words of *text*."""
        return [word for word in text.split() if word.isdigit()]

    @staticmethod
    def _contains_numbers(text: str, numbers: list[str]) -> bool:
        """True when *text* (punctuation stripped) contains any of *numbers*.
        Vacuously true for an empty list."""
        if not numbers:
            return True
        cleaned_text = re.sub(r'([^\s\w]|_)+', '', text)
        text_numbers = [word for word in cleaned_text.split() if word.isdigit()]
        return any(num in text_numbers for num in numbers)


def get_cache_dir():
    """Return the cache directory path, creating it if necessary."""
    xdg_cache_home = os.getenv("XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
    cache_dir = os.path.join(xdg_cache_home, "PortProtonQt")
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir


class HowLongToBeat(QObject):
    """Main entry point for querying the HowLongToBeat API, with on-disk caching."""

    # Emitted by search_with_callback with a list[GameEntry] (empty on failure).
    searchCompleted = Signal(list)

    def __init__(self, minimum_similarity: float = 0.4, timeout: int = 60, parent=None):
        super().__init__(parent)
        self.minimum_similarity = minimum_similarity
        self.http_client = HTTPClient(timeout)
        self.cache_dir = get_cache_dir()

    def _get_cache_file_path(self, game_name: str) -> str:
        """Return the cache file path for the given game name."""
        safe_game_name = re.sub(r'[^\w\s-]', '', game_name).replace(' ', '_').lower()
        cache_file = f"hltb_{safe_game_name}.json"
        return os.path.join(self.cache_dir, cache_file)

    def _load_from_cache(self, game_name: str) -> str | None:
        """Try to load cached data; return None when absent or unreadable."""
        cache_file = self._get_cache_file_path(game_name)
        try:
            if os.path.exists(cache_file):
                with open(cache_file, 'rb') as f:
                    return f.read().decode('utf-8')
        except (OSError, UnicodeDecodeError):
            pass
        return None

    def _save_to_cache(self, game_name: str, json_response: str):
        """Persist only the first game and the fields we need to the cache."""
        cache_file = self._get_cache_file_path(game_name)
        try:
            # Parse the JSON and keep only the first game.
            data = orjson.loads(json_response)
            if data.get("data"):
                first_game = data["data"][0]
                simplified_data = {
                    "data": [{
                        "game_id": first_game.get("game_id", -1),
                        "game_name": first_game.get("game_name"),
                        "comp_main": first_game.get("comp_main", 0),
                        "comp_plus": first_game.get("comp_plus", 0),
                        "comp_100": first_game.get("comp_100", 0)
                    }]
                }
                with open(cache_file, 'wb') as f:
                    f.write(orjson.dumps(simplified_data))
        except (OSError, orjson.JSONDecodeError, IndexError):
            # Caching is best-effort; a failed write must not break the search.
            pass

    def search(self, game_name: str, case_sensitive: bool = True) -> list[GameEntry] | None:
        """Search for a game, preferring the on-disk cache over the network.

        :return: parsed entries, or None for a blank query / network failure.
        """
        if not game_name or not game_name.strip():
            return None

        # Check the cache first.
        cached_response = self._load_from_cache(game_name)
        if cached_response:
            try:
                cached_data = orjson.loads(cached_response)
                # Rebuild the response with .get() defaults (mirroring
                # _save_to_cache) so a cache entry written by an older
                # version or missing a field falls back gracefully instead
                # of raising KeyError out of search().
                full_json = {
                    "data": [
                        {
                            "game_id": game.get("game_id", -1),
                            "game_name": game.get("game_name"),
                            "comp_main": game.get("comp_main", 0),
                            "comp_plus": game.get("comp_plus", 0),
                            "comp_100": game.get("comp_100", 0)
                        }
                        for game in cached_data.get("data", [])
                    ]
                }
                parser = ResultParser(
                    game_name,
                    self.minimum_similarity,
                    case_sensitive
                )
                return parser.parse_results(orjson.dumps(full_json).decode('utf-8'))
            except orjson.JSONDecodeError:
                # Corrupt cache: fall through to a fresh network request.
                pass

        # Not cached — hit the network.
        json_response = self.http_client.search_games(game_name)
        if not json_response:
            return None

        # Cache only the first game of the response.
        self._save_to_cache(game_name, json_response)

        parser = ResultParser(
            game_name,
            self.minimum_similarity,
            case_sensitive
        )
        return parser.parse_results(json_response)

    def format_game_time(self, game_entry: GameEntry, time_field: str = "main_story") -> str | None:
        """Format one of the entry's hour fields as a human-readable playtime
        string, or None when the field is unset."""
        time_value = getattr(game_entry, time_field, None)
        if time_value is None:
            return None
        time_seconds = int(time_value * 3600)
        return format_playtime(time_seconds)

    def search_with_callback(self, game_name: str, case_sensitive: bool = True):
        """Run the search in a background thread and emit searchCompleted
        with the results (an empty list on any failure)."""
        def search_thread():
            try:
                results = self.search(game_name, case_sensitive)
                self.searchCompleted.emit(results if results else [])
            except Exception as e:
                print(f"Error in search_with_callback: {e}")
                self.searchCompleted.emit([])

        thread = Thread(target=search_thread)
        thread.daemon = True
        thread.start()