PortProtonQt/portprotonqt/howlongtobeat_api.py

import orjson
import re
import os
from dataclasses import dataclass, field
from typing import Any
from difflib import SequenceMatcher
from threading import Thread
import requests
from bs4 import BeautifulSoup, Tag
from portprotonqt.config_utils import read_proxy_config
from portprotonqt.time_utils import format_playtime
from PySide6.QtCore import QObject, Signal


@dataclass
class GameEntry:
    """Game information from HowLongToBeat."""
    game_id: int = -1
    game_name: str | None = None
    main_story: float | None = None
    main_extra: float | None = None
    completionist: float | None = None
    similarity: float = -1.0
    raw_data: dict[str, Any] = field(default_factory=dict)


@dataclass
class SearchConfig:
    """Search configuration."""
    api_key: str | None = None
    search_url: str | None = None


class APIKeyExtractor:
    """Extracts the API key and search URL from the site's scripts."""

    @staticmethod
    def extract_from_script(script_content: str) -> SearchConfig:
        config = SearchConfig()
        config.api_key = APIKeyExtractor._extract_api_key(script_content)
        config.search_url = APIKeyExtractor._extract_search_url(script_content, config.api_key)
        return config

    @staticmethod
    def _extract_api_key(script_content: str) -> str | None:
        user_id_pattern = r'users\s*:\s*{\s*id\s*:\s*"([^"]+)"'
        matches = re.findall(user_id_pattern, script_content)
        if matches:
            return ''.join(matches)
        concat_pattern = r'\/api\/\w+\/"(?:\.concat\("[^"]*"\))+'
        matches = re.findall(concat_pattern, script_content)
        if matches:
            parts = str(matches).split('.concat')
            cleaned_parts = [re.sub(r'["\(\)\[\]\']', '', part) for part in parts[1:]]
            return ''.join(cleaned_parts)
        return None

    @staticmethod
    def _extract_search_url(script_content: str, api_key: str | None) -> str | None:
        if not api_key:
            return None
        pattern = re.compile(
            r'fetch\(\s*["\'](\/api\/[^"\']*)["\']'
            r'((?:\s*\.concat\(\s*["\']([^"\']*)["\']\s*\))+)'
            r'\s*,',
            re.DOTALL
        )
        for match in pattern.finditer(script_content):
            endpoint = match.group(1)
            concat_calls = match.group(2)
            concat_strings = re.findall(r'\.concat\(\s*["\']([^"\']*)["\']\s*\)', concat_calls)
            concatenated_str = ''.join(concat_strings)
            if concatenated_str == api_key:
                return endpoint
        return None
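
# Illustrative examples of the script fragments the extractor targets
# (assumed shapes, not taken from the live site):
#   users:{id:"abc123"}                              -> api_key "abc123"
#   fetch("/api/find/".concat("abc").concat("123"),  -> search_url "/api/find/"
# The concatenated parts of the fetch() call must reassemble the extracted key.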


class HTTPClient:
    """HTTP client for the HowLongToBeat API."""

    BASE_URL = 'https://howlongtobeat.com/'
    SEARCH_URL = BASE_URL + "api/s/"

    def __init__(self, timeout: int = 60):
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'referer': self.BASE_URL
        })
        proxy_config = read_proxy_config()
        if proxy_config:
            self.session.proxies.update(proxy_config)

    def get_search_config(self, parse_all_scripts: bool = False) -> SearchConfig | None:
        try:
            response = self.session.get(self.BASE_URL, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            scripts = soup.find_all('script', src=True)
            script_urls = []
            for script in scripts:
                if isinstance(script, Tag):
                    src = script.get('src')
                    if src is not None and isinstance(src, str):
                        if parse_all_scripts or '_app-' in src:
                            script_urls.append(src)
            for script_url in script_urls:
                full_url = self.BASE_URL + script_url
                script_response = self.session.get(full_url, timeout=self.timeout)
                if script_response.status_code == 200:
                    config = APIKeyExtractor.extract_from_script(script_response.text)
                    if config.api_key:
                        return config
        except requests.RequestException:
            pass
        return None

    def search_games(self, game_name: str, page: int = 1, config: SearchConfig | None = None) -> str | None:
        if not config:
            config = self.get_search_config()
        if not config:
            config = self.get_search_config(parse_all_scripts=True)
        if not config or not config.api_key:
            return None
        search_url = self.SEARCH_URL
        if config.search_url:
            search_url = self.BASE_URL + config.search_url.lstrip('/')
        payload = self._build_search_payload(game_name, page, config)
        headers = {
            'content-type': 'application/json',
            'accept': '*/*'
        }
        # First try the endpoint with the API key appended to the URL.
        try:
            response = self.session.post(
                search_url + config.api_key,
                headers=headers,
                data=orjson.dumps(payload),
                timeout=self.timeout
            )
            if response.status_code == 200:
                return response.text
        except requests.RequestException:
            pass
        # Fall back to the bare endpoint (the key is also embedded in the payload).
        try:
            response = self.session.post(
                search_url,
                headers=headers,
                data=orjson.dumps(payload),
                timeout=self.timeout
            )
            if response.status_code == 200:
                return response.text
        except requests.RequestException:
            pass
        return None

    def _build_search_payload(self, game_name: str, page: int, config: SearchConfig) -> dict[str, Any]:
        payload = {
            'searchType': "games",
            'searchTerms': game_name.split(),
            'searchPage': page,
            'size': 1,  # Limit to 1 result
            'searchOptions': {
                'games': {
                    'userId': 0,
                    'platform': "",
                    'sortCategory': "popular",
                    'rangeCategory': "main",
                    'rangeTime': {'min': 0, 'max': 0},
                    'gameplay': {
                        'perspective': "",
                        'flow': "",
                        'genre': "",
                        "difficulty": ""
                    },
                    'rangeYear': {'max': "", 'min': ""},
                    'modifier': ""  # Hardcoded to empty string for SearchModifiers.NONE
                },
                'users': {'sortCategory': "postcount"},
                'lists': {'sortCategory': "follows"},
                'filter': "",
                'sort': 0,
                'randomizer': 0
            },
            'useCache': True,
            'fields': ["game_id", "game_name", "comp_main", "comp_plus", "comp_100"]  # Request only needed fields
        }
        if config.api_key:
            payload['searchOptions']['users']['id'] = config.api_key
        return payload


class ResultParser:
    """Parses search results."""

    def __init__(self, search_query: str, minimum_similarity: float = 0.4, case_sensitive: bool = True):
        self.search_query = search_query
        self.minimum_similarity = minimum_similarity
        self.case_sensitive = case_sensitive
        self.search_numbers = self._extract_numbers(search_query)

    def parse_results(self, json_response: str, target_game_id: int | None = None) -> list[GameEntry]:
        try:
            data = orjson.loads(json_response)
            games = []
            # Only process the first result
            if data.get("data"):
                game_data = data["data"][0]
                game = self._parse_game_entry(game_data)
                if target_game_id is not None:
                    if game.game_id == target_game_id:
                        games.append(game)
                elif self.minimum_similarity == 0.0 or game.similarity >= self.minimum_similarity:
                    games.append(game)
            return games
        except (orjson.JSONDecodeError, KeyError, IndexError):
            return []

    def _parse_game_entry(self, game_data: dict[str, Any]) -> GameEntry:
        game = GameEntry()
        game.game_id = game_data.get("game_id", -1)
        game.game_name = game_data.get("game_name")
        game.raw_data = game_data
        time_fields = [
            ("comp_main", "main_story"),
            ("comp_plus", "main_extra"),
            ("comp_100", "completionist")
        ]
        for json_field, attr_name in time_fields:
            if json_field in game_data:
                # API times are in seconds; store them as hours.
                time_hours = round(game_data[json_field] / 3600, 2)
                setattr(game, attr_name, time_hours)
        game.similarity = self._calculate_similarity(game)
        return game

    def _calculate_similarity(self, game: GameEntry) -> float:
        return self._compare_strings(self.search_query, game.game_name)

    def _compare_strings(self, a: str | None, b: str | None) -> float:
        if not a or not b:
            return 0.0
        if self.case_sensitive:
            similarity = SequenceMatcher(None, a, b).ratio()
        else:
            similarity = SequenceMatcher(None, a.lower(), b.lower()).ratio()
        # Penalize candidates that are missing numbers present in the query.
        if self.search_numbers and not self._contains_numbers(b, self.search_numbers):
            similarity -= 0.1
        return max(0.0, similarity)

    @staticmethod
    def _extract_numbers(text: str) -> list[str]:
        return [word for word in text.split() if word.isdigit()]

    @staticmethod
    def _contains_numbers(text: str, numbers: list[str]) -> bool:
        if not numbers:
            return True
        cleaned_text = re.sub(r'([^\s\w]|_)+', '', text)
        text_numbers = [word for word in cleaned_text.split() if word.isdigit()]
        return any(num in text_numbers for num in numbers)
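
# Scoring sketch (hypothetical titles): for the query "Portal 2",
# search_numbers is ["2"], so a candidate named "Portal" has 0.1
# subtracted from its SequenceMatcher ratio, letting "Portal 2"
# outrank near-identical names that drop the sequel number.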


def get_cache_dir():
    """Returns the cache directory path, creating it if necessary."""
    xdg_cache_home = os.getenv("XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
    cache_dir = os.path.join(xdg_cache_home, "PortProtonQt")
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir


class HowLongToBeat(QObject):
    """Main class for working with the HowLongToBeat API."""

    searchCompleted = Signal(list)

    def __init__(self, minimum_similarity: float = 0.4, timeout: int = 60, parent=None):
        super().__init__(parent)
        self.minimum_similarity = minimum_similarity
        self.http_client = HTTPClient(timeout)
        self.cache_dir = get_cache_dir()

    def _get_cache_file_path(self, game_name: str) -> str:
        """Returns the cache file path for the given game name."""
        safe_game_name = re.sub(r'[^\w\s-]', '', game_name).replace(' ', '_').lower()
        cache_file = f"hltb_{safe_game_name}.json"
        return os.path.join(self.cache_dir, cache_file)

    def _load_from_cache(self, game_name: str) -> str | None:
        """Tries to load data from the cache if it exists."""
        cache_file = self._get_cache_file_path(game_name)
        try:
            if os.path.exists(cache_file):
                with open(cache_file, 'rb') as f:
                    return f.read().decode('utf-8')
        except (OSError, UnicodeDecodeError):
            pass
        return None

    def _save_to_cache(self, game_name: str, json_response: str):
        """Saves data to the cache, keeping only the first game and the required fields."""
        cache_file = self._get_cache_file_path(game_name)
        try:
            # Parse the JSON and keep only the first game
            data = orjson.loads(json_response)
            if data.get("data"):
                first_game = data["data"][0]
                simplified_data = {
                    "data": [{
                        "game_id": first_game.get("game_id", -1),
                        "game_name": first_game.get("game_name"),
                        "comp_main": first_game.get("comp_main", 0),
                        "comp_plus": first_game.get("comp_plus", 0),
                        "comp_100": first_game.get("comp_100", 0)
                    }]
                }
                with open(cache_file, 'wb') as f:
                    f.write(orjson.dumps(simplified_data))
        except (OSError, orjson.JSONDecodeError, IndexError):
            pass

    def search(self, game_name: str, case_sensitive: bool = True) -> list[GameEntry] | None:
        if not game_name or not game_name.strip():
            return None
        # Check the cache first
        cached_response = self._load_from_cache(game_name)
        if cached_response:
            try:
                cached_data = orjson.loads(cached_response)
                full_json = {
                    "data": [
                        {
                            "game_id": game["game_id"],
                            "game_name": game["game_name"],
                            "comp_main": game["comp_main"],
                            "comp_plus": game["comp_plus"],
                            "comp_100": game["comp_100"]
                        }
                        for game in cached_data.get("data", [])
                    ]
                }
                parser = ResultParser(
                    game_name,
                    self.minimum_similarity,
                    case_sensitive
                )
                return parser.parse_results(orjson.dumps(full_json).decode('utf-8'))
            except (orjson.JSONDecodeError, KeyError):
                # Malformed cache entry: fall through to a fresh API request.
                pass
        # Not in the cache, so query the API
        json_response = self.http_client.search_games(game_name)
        if not json_response:
            return None
        # Cache only the first game
        self._save_to_cache(game_name, json_response)
        parser = ResultParser(
            game_name,
            self.minimum_similarity,
            case_sensitive
        )
        return parser.parse_results(json_response)

    def format_game_time(self, game_entry: GameEntry, time_field: str = "main_story") -> str | None:
        time_value = getattr(game_entry, time_field, None)
        if time_value is None:
            return None
        # Stored values are hours; format_playtime expects seconds.
        time_seconds = int(time_value * 3600)
        return format_playtime(time_seconds)

    def search_with_callback(self, game_name: str, case_sensitive: bool = True):
        """Runs the search in a background thread and emits a signal with the results."""
        def search_thread():
            try:
                results = self.search(game_name, case_sensitive)
                self.searchCompleted.emit(results if results else [])
            except Exception as e:
                print(f"Error in search_with_callback: {e}")
                self.searchCompleted.emit([])
        thread = Thread(target=search_thread)
        thread.daemon = True
        thread.start()
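

# Minimal usage sketch (illustrative only; assumes network access, and the
# game title is an arbitrary example, not part of the module):
if __name__ == "__main__":
    hltb = HowLongToBeat(minimum_similarity=0.4)
    results = hltb.search("Hollow Knight", case_sensitive=False)
    if results:
        entry = results[0]
        print(entry.game_name, hltb.format_game_time(entry, "main_story"))
    # Signal-based variant (needs a running Qt event loop for cross-thread
    # signal delivery):
    #   hltb.searchCompleted.connect(lambda games: print(games))
    #   hltb.search_with_callback("Hollow Knight")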