Files
PortProtonQt/portprotonqt/steam_api.py
2025-08-10 12:35:12 +05:00

1303 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import os
import shlex
import subprocess
import time
import html
import orjson
import vdf
import tarfile
import threading
from pathlib import Path
from portprotonqt.logger import get_logger
from portprotonqt.localization import get_steam_language
from portprotonqt.downloader import Downloader
from portprotonqt.dialogs import generate_thumbnail
from portprotonqt.config_utils import get_portproton_location
from collections.abc import Callable
import re
import shutil
import zlib
import websocket
import requests
import random
import base64
downloader = Downloader()
logger = get_logger(__name__)
CACHE_DURATION = 30 * 24 * 60 * 60
def safe_vdf_load(path: str | Path) -> dict:
path = str(path) # Convert Path to str
try:
with open(path, 'rb') as f:
data = vdf.binary_load(f)
return data
except Exception as e:
try:
with open(path, encoding='utf-8', errors='ignore') as f:
data = vdf.load(f)
return data
except Exception:
logger.error(f"Failed to load VDF file {path}: {e}")
return {}
def decode_text(text: str) -> str:
"""
Декодирует HTML-сущности в строке.
Например, """ преобразуется в '"'.
Остальные символы и HTML-теги остаются без изменений.
"""
return html.unescape(text)
def get_cache_dir():
"""Возвращает путь к каталогу кэша, создаёт его при необходимости."""
xdg_cache_home = os.getenv("XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
cache_dir = os.path.join(xdg_cache_home, "PortProtonQt")
os.makedirs(cache_dir, exist_ok=True)
return cache_dir
STEAM_DATA_DIRS = (
"~/.local/share/Steam",
"~/snap/steam/common/.local/share/Steam",
"~/.var/app/com.valvesoftware.Steam/data/Steam",
)
def get_steam_home():
"""Возвращает путь к директории Steam, используя список возможных директорий."""
for dir_path in STEAM_DATA_DIRS:
expanded_path = Path(os.path.expanduser(dir_path))
if expanded_path.exists():
return expanded_path
return None
def get_last_steam_user(steam_home: Path) -> dict | None:
"""Возвращает данные последнего пользователя Steam из loginusers.vdf."""
loginusers_path = steam_home / "config/loginusers.vdf"
data = safe_vdf_load(loginusers_path)
if not data:
return None
users = data.get('users', {})
for user_id, user_info in users.items():
if user_info.get('MostRecent') == '1':
try:
return {'SteamID': int(user_id)}
except ValueError:
logger.error(f"Неверный формат SteamID: {user_id}")
return None
logger.info("Не найден пользователь с MostRecent=1")
return None
def convert_steam_id(steam_id: int) -> int:
"""
Преобразует знаковое 32-битное целое число в беззнаковое 32-битное целое число.
Использует побитовое И с 0xFFFFFFFF, что корректно обрабатывает отрицательные значения.
"""
return steam_id & 0xFFFFFFFF
def get_steam_libs(steam_dir: Path) -> set[Path]:
"""Возвращает набор директорий Steam libraryfolders."""
libs = set()
libs_vdf = steam_dir / "steamapps/libraryfolders.vdf"
data = safe_vdf_load(libs_vdf)
folders = data.get('libraryfolders', {})
for key, info in folders.items():
if key.isdigit():
path_str = info.get('path') if isinstance(info, dict) else None
if path_str:
path = Path(path_str).expanduser()
if path.exists():
libs.add(path)
libs.add(steam_dir)
return libs
def get_playtime_data(steam_home: Path | None = None) -> dict[int, tuple[int, int]]:
"""Возвращает данные о времени игры для последнего пользователя."""
play_data: dict[int, tuple[int, int]] = {}
if steam_home is None:
steam_home = get_steam_home()
if steam_home is None or not steam_home.exists():
logger.error("Steam home directory not found or does not exist")
return play_data
userdata_dir = steam_home / "userdata"
if not userdata_dir.exists():
logger.info("Userdata directory not found")
return play_data
if steam_home is not None: # Explicit check for type checker
last_user = get_last_steam_user(steam_home)
else:
logger.info("Steam home is None, cannot retrieve last user")
return play_data
if not last_user:
logger.info("Не удалось определить последнего пользователя Steam")
return play_data
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
user_dir = userdata_dir / str(unsigned_id)
if not user_dir.exists():
logger.info(f"Директория пользователя {unsigned_id} не найдена")
return play_data
localconfig = user_dir / "config/localconfig.vdf"
data = safe_vdf_load(localconfig)
cfg = data.get('UserLocalConfigStore', {})
apps = cfg.get('Software', {}).get('Valve', {}).get('Steam', {}).get('apps', {})
for appid_str, info in apps.items():
try:
appid = int(appid_str)
last_played = int(info.get('LastPlayed', 0))
playtime = int(info.get('Playtime', 0))
play_data[appid] = (last_played, playtime)
except ValueError:
logger.warning(f"Некорректные данные playtime для app {appid_str}")
return play_data
def get_steam_installed_games() -> list[tuple[str, int, int, int]]:
"""Возвращает список установленных Steam игр в формате (name, appid, last_played, playtime_sec)."""
games: list[tuple[str, int, int, int]] = []
steam_home = get_steam_home()
if steam_home is None or not steam_home.exists():
logger.error("Steam home directory not found or does not exist")
return games
play_data = get_playtime_data(steam_home)
for lib in get_steam_libs(steam_home):
steamapps_dir = lib / "steamapps"
if not steamapps_dir.exists():
continue
for manifest in steamapps_dir.glob("appmanifest_*.acf"):
data = safe_vdf_load(manifest)
app = data.get('AppState', {})
try:
appid = int(app.get('appid', 0))
except ValueError:
continue
name = app.get('name', f"Unknown ({appid})")
lname = name.lower()
if any(token in lname for token in ["proton", "steamworks", "steam linux runtime"]):
continue
last_played, playtime_min = play_data.get(appid, (0, 0))
games.append((name, appid, last_played, playtime_min * 60))
return games
def normalize_name(s):
"""
Приведение строки к нормальному виду:
- перевод в нижний регистр,
- удаление символов ™ и ®,
- замена разделителей (-, :, ,) на пробел,
- удаление лишних пробелов,
- удаление суффиксов 'bin' или 'app' в конце строки,
- удаление ключевых слов типа 'ultimate', 'edition' и т.п.
"""
s = s.lower()
for ch in ["", "®"]:
s = s.replace(ch, "")
for ch in ["-", ":", ","]:
s = s.replace(ch, " ")
s = " ".join(s.split())
for suffix in ["bin", "app"]:
if s.endswith(suffix):
s = s[:-len(suffix)].strip()
keywords_to_remove = {"ultimate", "edition", "definitive", "complete", "remastered"}
words = s.split()
filtered_words = [word for word in words if word not in keywords_to_remove]
return " ".join(filtered_words)
def is_valid_candidate(candidate):
"""
Проверяет, содержит ли кандидат запрещённые подстроки:
- win32
- win64
- gamelauncher
Для проверки дополнительно используется строка без пробелов.
Возвращает True, если кандидат допустим, иначе False.
"""
normalized_candidate = normalize_name(candidate)
normalized_no_space = normalized_candidate.replace(" ", "")
forbidden = ["win32", "win64", "gamelauncher"]
for token in forbidden:
if token in normalized_no_space:
return False
return True
def filter_candidates(candidates):
"""
Фильтрует список кандидатов, отбрасывая недопустимые.
"""
valid = []
dropped = []
for cand in candidates:
if cand.strip() and is_valid_candidate(cand):
valid.append(cand)
else:
dropped.append(cand)
if dropped:
logger.info("Отбрасываю кандидатов: %s", dropped)
return valid
def remove_duplicates(candidates):
"""
Удаляет дубликаты из списка, сохраняя порядок.
"""
return list(dict.fromkeys(candidates))
@functools.lru_cache(maxsize=256)
def get_exiftool_data(game_exe):
"""Получает метаданные через exiftool"""
try:
proc = subprocess.run(
["exiftool", "-j", game_exe],
capture_output=True,
text=True,
check=False
)
if proc.returncode != 0:
logger.error(f"exiftool error for {game_exe}: {proc.stderr.strip()}")
return {}
meta_data_list = orjson.loads(proc.stdout.encode("utf-8"))
return meta_data_list[0] if meta_data_list else {}
except Exception as e:
logger.error(f"An unexpected error occurred in get_exiftool_data for {game_exe}: {e}")
return {}
def load_steam_apps_async(callback: Callable[[list], None]):
"""
Asynchronously loads the list of Steam applications, using cache if available.
Calls the callback with the list of apps.
"""
cache_dir = get_cache_dir()
cache_tar = os.path.join(cache_dir, "games_appid.tar.xz")
cache_json = os.path.join(cache_dir, "steam_apps.json")
def process_tar(result: str | None):
if not result or not os.path.exists(result):
logger.error("Failed to download Steam apps archive")
callback([])
return
try:
with tarfile.open(result, mode='r:xz') as tar:
member = next((m for m in tar.getmembers() if m.name.endswith('.json')), None)
if member is None:
raise RuntimeError("JSON file not found in archive")
fobj = tar.extractfile(member)
if fobj is None:
raise RuntimeError(f"Failed to extract file {member.name} from archive")
raw = fobj.read()
fobj.close()
data = orjson.loads(raw)
with open(cache_json, "wb") as f:
f.write(orjson.dumps(data))
if os.path.exists(cache_tar):
os.remove(cache_tar)
logger.info("Archive %s deleted after extraction", cache_tar)
steam_apps = data if isinstance(data, list) else []
logger.info("Loaded %d apps from archive", len(steam_apps))
callback(steam_apps)
except Exception as e:
logger.error("Error extracting Steam apps archive: %s", e)
callback([])
if os.path.exists(cache_json) and (time.time() - os.path.getmtime(cache_json) < CACHE_DURATION):
logger.info("Using cached Steam apps JSON: %s", cache_json)
try:
with open(cache_json, "rb") as f:
data = orjson.loads(f.read())
# Validate JSON structure
if not isinstance(data, list):
logger.error("Cached JSON %s has invalid format (not a list), re-downloading", cache_json)
raise ValueError("Invalid JSON structure")
# Validate each app entry
for app in data:
if not isinstance(app, dict) or "appid" not in app or "normalized_name" not in app:
logger.error("Cached JSON %s contains invalid app entry, re-downloading", cache_json)
raise ValueError("Invalid app entry structure")
steam_apps = data
logger.info("Loaded %d apps from cache", len(steam_apps))
callback(steam_apps)
except Exception as e:
logger.error("Error reading or validating cached JSON %s: %s", cache_json, e)
# Attempt to re-download if cache is invalid or corrupted
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/games_appid.tar.xz"
)
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
else:
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/games_appid.tar.xz"
)
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
def build_index(steam_apps):
"""
Строит индекс приложений по полю normalized_name.
"""
steam_apps_index = {}
if not steam_apps:
return steam_apps_index
logger.info("Построение индекса Steam приложений:")
for app in steam_apps:
normalized = app["normalized_name"]
steam_apps_index[normalized] = app
return steam_apps_index
def search_app(candidate, steam_apps_index):
"""
Ищет приложение по кандидату: сначала пытается точное совпадение, затем ищет подстроку.
"""
candidate_norm = normalize_name(candidate)
logger.info("Поиск приложения для кандидата: '%s' -> '%s'", candidate, candidate_norm)
if candidate_norm in steam_apps_index:
logger.info(" Найдено точное совпадение: '%s'", candidate_norm)
return steam_apps_index[candidate_norm]
for name_norm, app in steam_apps_index.items():
if candidate_norm in name_norm:
ratio = len(candidate_norm) / len(name_norm)
if ratio > 0.8:
logger.info(" Найдено частичное совпадение: кандидат '%s' в '%s' (ratio: %.2f)",
candidate_norm, name_norm, ratio)
return app
logger.info(" Приложение для кандидата '%s' не найдено", candidate_norm)
return None
def load_app_details(app_id):
"""Загружает кэшированные данные для игры по appid, если они не устарели."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
if os.path.exists(cache_file):
if time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
with open(cache_file, "rb") as f:
return orjson.loads(f.read())
return None
def save_app_details(app_id, data):
"""Сохраняет данные по appid в файл кэша."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
with open(cache_file, "wb") as f:
f.write(orjson.dumps(data))
def fetch_app_info_async(app_id: int, callback: Callable[[dict | None], None]):
"""
Asynchronously fetches detailed app info from Steam API.
Calls the callback with the app data or None if failed.
"""
cached = load_app_details(app_id)
if cached is not None:
callback(cached)
return
lang = get_steam_language()
url = f"https://store.steampowered.com/api/appdetails?appids={app_id}&l={lang}"
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
def process_response(result: str | None):
if not result or not os.path.exists(result):
logger.error("Failed to download Steam app info for appid %s", app_id)
callback(None)
return
try:
with open(result, "rb") as f:
data = orjson.loads(f.read())
details = data.get(str(app_id), {})
if not details.get("success"):
callback(None)
return
app_data_full = details.get("data", {})
app_data = {
"steam_appid": app_data_full.get("steam_appid", app_id),
"name": app_data_full.get("name", ""),
"short_description": app_data_full.get("short_description", ""),
"controller_support": app_data_full.get("controller_support", "")
}
save_app_details(app_id, app_data)
callback(app_data)
except Exception as e:
logger.error("Error processing Steam app info for appid %s: %s", app_id, e)
callback(None)
downloader.download_async(url, cache_file, timeout=5, callback=process_response)
def load_weanticheatyet_data_async(callback: Callable[[list], None]):
"""
Asynchronously loads the list of WeAntiCheatYet data, using cache if available.
Calls the callback with the list of anti-cheat data.
"""
cache_dir = get_cache_dir()
cache_tar = os.path.join(cache_dir, "anticheat_games.tar.xz")
cache_json = os.path.join(cache_dir, "anticheat_games.json")
def process_tar(result: str | None):
if not result or not os.path.exists(result):
logger.error("Failed to download WeAntiCheatYet archive")
callback([])
return
try:
with tarfile.open(result, mode='r:xz') as tar:
member = next((m for m in tar.getmembers() if m.name.endswith('anticheat_games_min.json')), None)
if member is None:
raise RuntimeError("JSON file not found in archive")
fobj = tar.extractfile(member)
if fobj is None:
raise RuntimeError(f"Failed to extract file {member.name} from archive")
raw = fobj.read()
fobj.close()
data = orjson.loads(raw)
with open(cache_json, "wb") as f:
f.write(orjson.dumps(data))
if os.path.exists(cache_tar):
os.remove(cache_tar)
logger.info("Archive %s deleted after extraction", cache_tar)
anti_cheat_data = data or []
logger.info("Loaded %d anti-cheat entries from archive", len(anti_cheat_data))
callback(anti_cheat_data)
except Exception as e:
logger.error("Error extracting WeAntiCheatYet archive: %s", e)
callback([])
if os.path.exists(cache_json) and (time.time() - os.path.getmtime(cache_json) < CACHE_DURATION):
logger.info("Using cached WeAntiCheatYet JSON: %s", cache_json)
try:
with open(cache_json, "rb") as f:
data = orjson.loads(f.read())
# Validate JSON structure
if not isinstance(data, list):
logger.error("Cached JSON %s has invalid format (not a list), re-downloading", cache_json)
raise ValueError("Invalid JSON structure")
# Validate each anti-cheat entry
for entry in data:
if not isinstance(entry, dict) or "normalized_name" not in entry or "status" not in entry:
logger.error("Cached JSON %s contains invalid anti-cheat entry, re-downloading", cache_json)
raise ValueError("Invalid anti-cheat entry structure")
anti_cheat_data = data
logger.info("Loaded %d anti-cheat entries from cache", len(anti_cheat_data))
callback(anti_cheat_data)
except Exception as e:
logger.error("Error reading or validating cached WeAntiCheatYet JSON %s: %s", cache_json, e)
# Attempt to re-download if cache is invalid or corrupted
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/anticheat_games.tar.xz"
)
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
else:
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/anticheat_games.tar.xz"
)
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
def build_weanticheatyet_index(anti_cheat_data):
"""
Строит индекс античит-данных по полю normalized_name.
"""
anti_cheat_index = {}
if not anti_cheat_data:
return anti_cheat_index
logger.info("Построение индекса WeAntiCheatYet данных:")
for entry in anti_cheat_data:
normalized = entry["normalized_name"]
anti_cheat_index[normalized] = entry
return anti_cheat_index
def search_anticheat_status(candidate, anti_cheat_index):
candidate_norm = normalize_name(candidate)
logger.info("Поиск античит-статуса для кандидата: '%s' -> '%s'", candidate, candidate_norm)
if candidate_norm in anti_cheat_index:
status = anti_cheat_index[candidate_norm]["status"]
logger.info(" Найдено точное совпадение: '%s', статус: '%s'", candidate_norm, status)
return status
for name_norm, entry in anti_cheat_index.items():
if candidate_norm in name_norm:
ratio = len(candidate_norm) / len(name_norm)
if ratio > 0.8:
status = entry["status"]
logger.info(" Найдено частичное совпадение: кандидат '%s' в '%s' (ratio: %.2f), статус: '%s'",
candidate_norm, name_norm, ratio, status)
return status
logger.info(" Античит-статус для кандидата '%s' не найден", candidate_norm)
return ""
def get_weanticheatyet_status_async(game_name: str, callback: Callable[[str], None]):
"""
Asynchronously retrieves WeAntiCheatYet status for a game by name.
Calls the callback with the status string or empty string if not found.
"""
def on_anticheat_data(anti_cheat_data: list):
anti_cheat_index = build_weanticheatyet_index(anti_cheat_data)
status = search_anticheat_status(game_name, anti_cheat_index)
callback(status)
load_weanticheatyet_data_async(on_anticheat_data)
def load_protondb_status(appid):
"""Загружает закешированные данные ProtonDB для игры по appid, если они не устарели."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
if os.path.exists(cache_file):
if time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
try:
with open(cache_file, "rb") as f:
return orjson.loads(f.read())
except Exception as e:
logger.error("Ошибка загрузки кеша ProtonDB для appid %s: %s", appid, e)
return None
def save_protondb_status(appid, data):
"""Сохраняет данные ProtonDB для игры по appid в файл кэша."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
try:
with open(cache_file, "wb") as f:
f.write(orjson.dumps(data))
except Exception as e:
logger.error("Ошибка сохранения кеша ProtonDB для appid %s: %s", appid, e)
def get_protondb_tier_async(appid: int, callback: Callable[[str], None]):
"""
Asynchronously fetches ProtonDB tier for an app.
Calls the callback with the tier string or empty string if failed.
"""
cached = load_protondb_status(appid)
if cached is not None:
callback(cached.get("tier", ""))
return
url = f"https://www.protondb.com/api/v1/reports/summaries/{appid}.json"
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
def process_response(result: str | None):
if not result or not os.path.exists(result):
logger.info("Failed to download ProtonDB data for appid %s", appid)
callback("")
return
try:
with open(result, "rb") as f:
data = orjson.loads(f.read())
filtered_data = {"tier": data.get("tier", "")}
save_protondb_status(appid, filtered_data)
callback(filtered_data["tier"])
except Exception as e:
logger.info("Failed to process ProtonDB data for appid %s: %s", appid, e)
callback("")
downloader.download_async(url, cache_file, timeout=5, callback=process_response)
def get_full_steam_game_info_async(appid: int, callback: Callable[[dict], None]):
"""
Asynchronously retrieves full Steam game info, including WeAntiCheatYet status.
Calls the callback with the game info dictionary.
"""
def on_app_info(app_info: dict | None):
if not app_info:
callback({})
return
title = decode_text(app_info.get("name", ""))
description = decode_text(app_info.get("short_description", ""))
cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
def on_protondb_tier(tier: str):
def on_anticheat_status(anticheat_status: str):
callback({
'description': description,
'controller_support': app_info.get('controller_support', ''),
'cover': cover,
'protondb_tier': tier,
'steam_game': "true",
'name': title,
'anticheat_status': anticheat_status
})
get_weanticheatyet_status_async(title, on_anticheat_status)
get_protondb_tier_async(appid, on_protondb_tier)
fetch_app_info_async(appid, on_app_info)
def get_steam_game_info_async(desktop_name: str, exec_line: str, callback: Callable[[dict], tuple[bool, str] | None]) -> None:
"""
Asynchronously retrieves game info based on desktop name and exec line, including WeAntiCheatYet status for all games.
Calls the callback with the game info dictionary.
"""
parts = shlex.split(exec_line)
game_exe = parts[-1] if parts else exec_line
if game_exe.lower().endswith('.bat'):
if os.path.exists(game_exe):
try:
with open(game_exe, encoding='utf-8') as f:
bat_lines = f.readlines()
for line in bat_lines:
line = line.strip()
if '.exe' in line.lower():
tokens = shlex.split(line)
for token in tokens:
if token.lower().endswith('.exe'):
game_exe = token
break
if game_exe.lower().endswith('.exe'):
break
except Exception as e:
logger.error("Error processing bat file %s: %s", game_exe, e)
else:
logger.error("Bat file not found: %s", game_exe)
if not game_exe.lower().endswith('.exe'):
logger.error("Invalid executable path: %s. Expected .exe", game_exe)
meta_data = {}
else:
meta_data = get_exiftool_data(game_exe)
exe_name = os.path.splitext(os.path.basename(game_exe))[0]
folder_path = os.path.dirname(game_exe)
folder_name = os.path.basename(folder_path)
if folder_name.lower() in ['bin', 'binaries']:
folder_path = os.path.dirname(folder_path)
folder_name = os.path.basename(folder_path)
logger.info("Game folder name: '%s'", folder_name)
candidates = []
product_name = meta_data.get("ProductName", "")
file_description = meta_data.get("FileDescription", "")
if product_name and product_name not in ['BootstrapPackagedGame']:
candidates.append(product_name)
if file_description and file_description not in ['BootstrapPackagedGame']:
candidates.append(file_description)
if desktop_name:
candidates.append(desktop_name)
if exe_name:
candidates.append(exe_name)
if folder_name:
candidates.append(folder_name)
logger.info("Initial candidates: %s", candidates)
candidates = filter_candidates(candidates)
candidates = remove_duplicates(candidates)
candidates_ordered = sorted(candidates, key=lambda s: len(s.split()), reverse=True)
logger.info("Sorted candidates: %s", candidates_ordered)
def on_steam_apps(steam_apps: list):
steam_apps_index = build_index(steam_apps)
matching_app = None
for candidate in candidates_ordered:
if not candidate:
continue
matching_app = search_app(candidate, steam_apps_index)
if matching_app:
logger.info("Match found for candidate '%s': %s", candidate, matching_app.get("normalized_name"))
break
game_name = desktop_name or exe_name.capitalize()
if not matching_app:
def on_anticheat_status(anticheat_status: str):
callback({
"appid": "",
"name": decode_text(game_name),
"description": "",
"cover": "",
"controller_support": "",
"protondb_tier": "",
"steam_game": "false",
"anticheat_status": anticheat_status
})
get_weanticheatyet_status_async(game_name, on_anticheat_status)
return
appid = matching_app["appid"]
def on_app_info(app_info: dict | None):
if not app_info:
def on_anticheat_status(anticheat_status: str):
callback({
"appid": "",
"name": decode_text(game_name),
"description": "",
"cover": "",
"controller_support": "",
"protondb_tier": "",
"steam_game": "false",
"anticheat_status": anticheat_status
})
get_weanticheatyet_status_async(game_name, on_anticheat_status)
return
title = decode_text(app_info.get("name", game_name))
description = decode_text(app_info.get("short_description", ""))
cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
controller_support = app_info.get("controller_support", "")
def on_protondb_tier(tier: str):
def on_anticheat_status(anticheat_status: str):
callback({
"appid": appid,
"name": title,
"description": description,
"cover": cover,
"controller_support": controller_support,
"protondb_tier": tier,
"steam_game": "true",
"anticheat_status": anticheat_status
})
get_weanticheatyet_status_async(title, on_anticheat_status)
get_protondb_tier_async(appid, on_protondb_tier)
fetch_app_info_async(appid, on_app_info)
load_steam_apps_async(on_steam_apps)
_STEAM_APPS = None
_STEAM_APPS_INDEX = None
_STEAM_APPS_LOCK = threading.Lock()
def get_steam_apps_and_index_async(callback: Callable[[tuple[list, dict]], None]):
"""
Asynchronously loads and caches Steam apps and their index.
Calls the callback with (steam_apps, steam_apps_index).
"""
global _STEAM_APPS, _STEAM_APPS_INDEX
with _STEAM_APPS_LOCK:
if _STEAM_APPS is not None and _STEAM_APPS_INDEX is not None:
callback((_STEAM_APPS, _STEAM_APPS_INDEX))
return
def on_steam_apps(steam_apps: list):
global _STEAM_APPS, _STEAM_APPS_INDEX
with _STEAM_APPS_LOCK:
_STEAM_APPS = steam_apps
_STEAM_APPS_INDEX = build_index(steam_apps)
callback((_STEAM_APPS, _STEAM_APPS_INDEX))
load_steam_apps_async(on_steam_apps)
def enable_steam_cef() -> tuple[bool, str]:
"""
Проверяет и при необходимости активирует режим удаленной отладки Steam CEF.
Создает файл .cef-enable-remote-debugging в директории Steam.
Steam необходимо перезапустить после первого создания этого файла.
Возвращает кортеж:
- (True, "already_enabled") если уже было активно.
- (True, "restart_needed") если было только что активировано и нужен перезапуск Steam.
- (False, "steam_not_found") если директория Steam не найдена.
"""
steam_home = get_steam_home()
if not steam_home:
return (False, "steam_not_found")
cef_flag_file = steam_home / ".cef-enable-remote-debugging"
logger.info(f"Проверка CEF флага: {cef_flag_file}")
if cef_flag_file.exists():
logger.info("CEF Remote Debugging уже активирован.")
return (True, "already_enabled")
else:
try:
os.makedirs(cef_flag_file.parent, exist_ok=True)
cef_flag_file.touch()
logger.info("CEF Remote Debugging активирован. Steam необходимо перезапустить.")
return (True, "restart_needed")
except Exception as e:
logger.error(f"Не удалось создать CEF флаг {cef_flag_file}: {e}")
return (False, str(e))
def call_steam_api(js_cmd: str, *args) -> dict | None:
"""
Выполняет JavaScript функцию в контексте Steam через CEF Remote Debugging.
Args:
js_cmd: Имя JS функции для вызова (напр. 'createShortcut').
*args: Аргументы для передачи в JS функцию.
Returns:
Словарь с результатом выполнения или None в случае ошибки.
"""
status, message = enable_steam_cef()
if not (status is True and message == "already_enabled"):
if message == "restart_needed":
logger.warning("Steam CEF API доступен, но требует перезапуска Steam для полной активации.")
elif message == "steam_not_found":
logger.error("Не удалось найти директорию Steam для проверки CEF API.")
else:
logger.error(f"Steam CEF API недоступен или не готов: {message}")
return None
steam_debug_url = "http://localhost:8080/json"
try:
response = requests.get(steam_debug_url, timeout=2)
response.raise_for_status()
contexts = response.json()
ws_url = next((ctx['webSocketDebuggerUrl'] for ctx in contexts if ctx['title'] == 'SharedJSContext'), None)
if not ws_url:
logger.warning("Не удалось найти SharedJSContext. Steam запущен с флагом -cef-enable-remote-debugging?")
return None
except Exception as e:
logger.warning(f"Не удалось подключиться к Steam CEF API по адресу {steam_debug_url}. Steam запущен? {e}")
return None
js_code = """
async function createShortcut(name, exe, dir, icon, args) {
const id = await SteamClient.Apps.AddShortcut(name, exe, dir, args);
console.log("Shortcut created with ID:", id);
await SteamClient.Apps.SetShortcutName(id, name);
if (icon)
await SteamClient.Apps.SetShortcutIcon(id, icon);
if (args)
await SteamClient.Apps.SetAppLaunchOptions(id, args);
return { id };
};
async function setGrid(id, i, ext, image) {
await SteamClient.Apps.SetCustomArtworkForApp(id, image, ext, i);
return true;
};
async function removeShortcut(id) {
await SteamClient.Apps.RemoveShortcut(+id);
return true;
};
"""
try:
ws = websocket.create_connection(ws_url, timeout=5)
js_args = ", ".join(orjson.dumps(arg).decode('utf-8') for arg in args)
expression = f"{js_code} {js_cmd}({js_args});"
payload = {
"id": random.randint(0, 32767),
"method": "Runtime.evaluate",
"params": {
"expression": expression,
"awaitPromise": True,
"returnByValue": True
}
}
ws.send(orjson.dumps(payload))
response_str = ws.recv()
ws.close()
response_data = orjson.loads(response_str)
if "error" in response_data:
logger.error(f"Ошибка выполнения JS в Steam: {response_data['error']['message']}")
return None
result = response_data.get('result', {}).get('result', {})
if result.get('type') == 'object' and result.get('subtype') == 'error':
logger.error(f"Ошибка выполнения JS в Steam: {result.get('description')}")
return None
return result.get('value')
except Exception as e:
logger.error(f"Ошибка при взаимодействии с WebSocket Steam: {e}")
return None
def add_to_steam(game_name: str, exec_line: str, cover_path: str) -> tuple[bool, str]:
"""
Add a non-Steam game to Steam via shortcuts.vdf with PortProton tag,
and download Steam Grid covers with correct sizes and names.
"""
if not exec_line or not exec_line.strip():
logger.error("Invalid exec_line: empty or whitespace")
return (False, "Executable command is empty or invalid")
# Parse exec_line to get the executable path
try:
entry_exec_split = shlex.split(exec_line)
if not entry_exec_split:
logger.error(f"Failed to parse exec_line: {exec_line}")
return (False, "Failed to parse executable command: no valid tokens")
if entry_exec_split[0] == "env" and len(entry_exec_split) >= 3:
exe_path = entry_exec_split[2]
elif entry_exec_split[0] == "flatpak" and len(entry_exec_split) >= 4:
exe_path = entry_exec_split[3]
else:
exe_path = entry_exec_split[-1]
except Exception as e:
logger.error(f"Failed to parse exec_line: {exec_line}, error: {e}")
return (False, f"Failed to parse executable command: {e}")
if not os.path.exists(exe_path):
logger.error(f"Executable not found: {exe_path}")
return (False, f"Executable file not found: {exe_path}")
portproton_dir = get_portproton_location()
if not portproton_dir:
logger.error("PortProton directory not found")
return (False, "PortProton directory not found")
steam_scripts_dir = os.path.join(portproton_dir, "steam_scripts")
os.makedirs(steam_scripts_dir, exist_ok=True)
safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
script_path = os.path.join(steam_scripts_dir, f"{safe_game_name}.sh")
start_sh_path = os.path.join(portproton_dir, "data", "scripts", "start.sh")
if not os.path.exists(start_sh_path):
logger.error(f"start.sh not found at {start_sh_path}")
return (False, f"start.sh not found at {start_sh_path}")
if not os.path.exists(script_path):
script_content = f"""#!/usr/bin/env bash
export LD_PRELOAD=
export START_FROM_STEAM=1
"{start_sh_path}" "{exe_path}" "$@"
"""
try:
with open(script_path, "w", encoding="utf-8") as f:
f.write(script_content)
os.chmod(script_path, 0o755)
logger.info(f"Created launch script: {script_path}")
except Exception as e:
logger.error(f"Failed to create launch script {script_path}: {e}")
return (False, f"Failed to create launch script: {e}")
else:
logger.info(f"Launch script already exists: {script_path}")
generated_icon_path = os.path.join(portproton_dir, "data", "img", f"{safe_game_name}.png")
try:
img_dir = os.path.join(portproton_dir, "data", "img")
os.makedirs(img_dir, exist_ok=True)
if os.path.exists(generated_icon_path):
logger.info(f"Reusing existing thumbnail: {generated_icon_path}")
else:
success = generate_thumbnail(exe_path, generated_icon_path, size=128, force_resize=True)
if not success or not os.path.exists(generated_icon_path):
logger.warning(f"generate_thumbnail failed to create icon for {exe_path}")
icon_path = ""
else:
logger.info(f"Generated thumbnail: {generated_icon_path}")
icon_path = generated_icon_path
except Exception as e:
logger.error(f"Error generating thumbnail for {exe_path}: {e}")
icon_path = ""
steam_home = get_steam_home()
if not steam_home:
logger.error("Steam home directory not found")
return (False, "Steam directory not found.")
last_user = get_last_steam_user(steam_home)
if not last_user or 'SteamID' not in last_user:
logger.error("Failed to retrieve Steam user ID")
return (False, "Failed to get Steam user ID.")
userdata_dir = steam_home / "userdata"
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
user_dir = userdata_dir / str(unsigned_id)
steam_shortcuts_path = user_dir / "config" / "shortcuts.vdf"
grid_dir = user_dir / "config" / "grid"
os.makedirs(grid_dir, exist_ok=True)
appid = None
was_api_used = False
logger.info("Попытка добавления ярлыка через Steam CEF API...")
api_response = call_steam_api(
"createShortcut",
game_name,
script_path,
str(Path(script_path).parent),
icon_path,
""
)
if api_response and isinstance(api_response, dict) and 'id' in api_response:
appid = api_response['id']
was_api_used = True
logger.info(f"Ярлык успешно добавлен через API. AppID: {appid}")
else:
logger.warning("Не удалось добавить ярлык через API. Используется запасной метод (запись в shortcuts.vdf).")
backup_path = f"{steam_shortcuts_path}.backup"
if os.path.exists(steam_shortcuts_path):
try:
shutil.copy2(steam_shortcuts_path, backup_path)
logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
except Exception as e:
logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
return (False, f"Failed to create backup of shortcuts.vdf: {e}")
unique_string = f"{script_path}{game_name}"
baseid = zlib.crc32(unique_string.encode('utf-8')) & 0xffffffff
appid = baseid | 0x80000000
if appid > 0x7FFFFFFF:
aidvdf = appid - 0x100000000
else:
aidvdf = appid
shortcut = {
"appid": aidvdf,
"AppName": game_name,
"Exe": f'"{script_path}"',
"StartDir": f'"{os.path.dirname(script_path)}"',
"icon": icon_path,
"LaunchOptions": "",
"IsHidden": 0,
"AllowDesktopConfig": 1,
"AllowOverlay": 1,
"openvr": 0,
"Devkit": 0,
"DevkitGameID": "",
"LastPlayTime": 0,
"tags": {'0': 'PortProton'}
}
logger.info(f"Shortcut entry to be written: {shortcut}")
try:
if not os.path.exists(steam_shortcuts_path):
os.makedirs(os.path.dirname(steam_shortcuts_path), exist_ok=True)
open(steam_shortcuts_path, 'wb').close()
try:
if os.path.getsize(steam_shortcuts_path) > 0:
with open(steam_shortcuts_path, 'rb') as f:
shortcuts_data = vdf.binary_load(f)
else:
shortcuts_data = {"shortcuts": {}}
except Exception as load_err:
logger.warning(f"Failed to load existing shortcuts.vdf, starting fresh: {load_err}")
shortcuts_data = {"shortcuts": {}}
shortcuts = shortcuts_data.get("shortcuts", {})
for _key, entry in shortcuts.items():
if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
logger.info(f"Game '{game_name}' already exists in Steam shortcuts")
return (False, f"Game '{game_name}' already exists in Steam")
new_index = str(len(shortcuts))
shortcuts[new_index] = shortcut
with open(steam_shortcuts_path, 'wb') as f:
vdf.binary_dump({"shortcuts": shortcuts}, f)
logger.info(f"Game '{game_name}' successfully added to Steam with covers")
except Exception as e:
logger.error(f"Failed to update shortcuts.vdf: {e}")
if os.path.exists(backup_path):
try:
shutil.copy2(backup_path, steam_shortcuts_path)
logger.info("Restored shortcuts.vdf from backup due to update failure")
except Exception as restore_err:
logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
appid = None
if not appid:
return (False, "Не удалось создать ярлык ни одним из способов.")
steam_appid = None
def on_game_info(game_info: dict):
nonlocal steam_appid
steam_appid = game_info.get("appid")
if not steam_appid or not isinstance(steam_appid, int):
logger.info("No valid Steam appid found, skipping cover download")
return
logger.info(f"Найден Steam AppID {steam_appid} для загрузки обложек.")
cover_types = [
("p.jpg", "library_600x900_2x.jpg"),
("_hero.jpg", "library_hero.jpg"),
("_logo.png", "logo.png"),
(".jpg", "header.jpg")
]
def on_cover_download(result_path: str | None, steam_name: str, index: int):
try:
if result_path and os.path.exists(result_path):
logger.info(f"Downloaded cover {steam_name} to {result_path}")
if was_api_used:
try:
with open(result_path, 'rb') as f:
img_b64 = base64.b64encode(f.read()).decode('utf-8')
logger.info(f"Применение обложки типа '{steam_name}' через API для AppID {appid}")
ext = Path(steam_name).suffix.lstrip('.')
call_steam_api("setGrid", appid, index, ext, img_b64)
except Exception as e:
logger.error(f"Ошибка при применении обложки '{steam_name}' через API: {e}")
else:
logger.warning(f"Failed to download cover {steam_name} for appid {steam_appid}")
except Exception as e:
logger.error(f"Error processing cover {steam_name} for appid {steam_appid}: {e}")
for i, (suffix, steam_name) in enumerate(cover_types):
cover_file = os.path.join(grid_dir, f"{appid}{suffix}")
cover_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{steam_appid}/{steam_name}"
downloader.download_async(
cover_url,
cover_file,
timeout=5,
callback=lambda result, index=i, name=steam_name: on_cover_download(result, name, index)
)
get_steam_game_info_async(game_name, exec_line, on_game_info)
return (True, "Adding game to Steam, checking for covers...")
def remove_from_steam(game_name: str, exec_line: str) -> tuple[bool, str]:
"""
Remove a non-Steam game from Steam by deleting its entry from shortcuts.vdf.
"""
# Validate inputs
if not game_name or not game_name.strip():
logger.error("Invalid game_name: empty or whitespace")
return (False, "Game name is empty or invalid")
if not exec_line or not exec_line.strip():
logger.error("Invalid exec_line: empty or whitespace")
return (False, "Executable command is empty or invalid")
# Get PortProton directory
portproton_dir = get_portproton_location()
if not portproton_dir:
logger.error("PortProton directory not found")
return (False, "PortProton directory not found")
# Construct script path
safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
script_path = os.path.join(portproton_dir, "steam_scripts", f"{safe_game_name}.sh")
# Get Steam home directory
steam_home = get_steam_home()
if not steam_home:
logger.error("Steam home directory not found")
return (False, "Steam directory not found.")
# Get current Steam user ID
last_user = get_last_steam_user(steam_home)
if not last_user or 'SteamID' not in last_user:
logger.error("Failed to retrieve Steam user ID")
return (False, "Failed to get Steam user ID.")
userdata_dir = steam_home / "userdata"
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
user_dir = userdata_dir / str(unsigned_id)
# Construct path to shortcuts.vdf and grid directory
steam_shortcuts_path = os.path.join(user_dir, "config", "shortcuts.vdf")
grid_dir = os.path.join(user_dir, "config", "grid")
# Check if shortcuts.vdf exists
if not os.path.exists(steam_shortcuts_path):
logger.info(f"shortcuts.vdf not found at {steam_shortcuts_path}")
return (False, f"Game '{game_name}' not found in Steam")
appid = None
# Load and modify shortcuts.vdf
try:
if os.path.getsize(steam_shortcuts_path) > 0:
with open(steam_shortcuts_path, 'rb') as f:
shortcuts_data = vdf.binary_load(f)
else:
shortcuts_data = {"shortcuts": {}}
except Exception as load_err:
logger.error(f"Failed to load shortcuts.vdf: {load_err}")
return (False, f"Failed to load shortcuts.vdf: {load_err}")
shortcuts = shortcuts_data.get("shortcuts", {})
new_shortcuts = {}
index = 0
# Filter out the matching shortcut
for _key, entry in shortcuts.items():
if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
appid = convert_steam_id(int(entry.get("appid")))
logger.info(f"Found matching shortcut for '{game_name}' to remove")
continue
new_shortcuts[str(index)] = entry
index += 1
if not appid:
logger.info(f"Game '{game_name}' not found in Steam shortcuts")
return (False, f"Game '{game_name}' not found in Steam")
api_response = call_steam_api("removeShortcut", appid)
if api_response is not None: # API ответил, даже если ответ пустой
logger.info(f"Ярлык для AppID {appid} успешно удален через API.")
else:
logger.warning("Не удалось удалить ярлык через API. Используется запасной метод (редактирование shortcuts.vdf).")
# Create backup of shortcuts.vdf
backup_path = f"{steam_shortcuts_path}.backup"
try:
shutil.copy2(steam_shortcuts_path, backup_path)
logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
except Exception as e:
logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
return (False, f"Failed to create backup of shortcuts.vdf: {e}")
# Save updated shortcuts.vdf
try:
with open(steam_shortcuts_path, 'wb') as f:
vdf.binary_dump({"shortcuts": new_shortcuts}, f)
logger.info(f"Successfully updated shortcuts.vdf, removed '{game_name}'")
except Exception as e:
logger.error(f"Failed to update shortcuts.vdf: {e}")
if os.path.exists(backup_path):
try:
shutil.copy2(backup_path, steam_shortcuts_path)
logger.info("Restored shortcuts.vdf from backup due to update failure")
except Exception as restore_err:
logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
return (False, f"Failed to update shortcuts.vdf: {e}")
# Delete cover files
cover_files = [
os.path.join(grid_dir, f"{appid}.jpg"),
os.path.join(grid_dir, f"{appid}p.jpg"),
os.path.join(grid_dir, f"{appid}_hero.jpg"),
os.path.join(grid_dir, f"{appid}_logo.png")
]
for cover_file in cover_files:
if os.path.exists(cover_file):
try:
os.remove(cover_file)
logger.info(f"Deleted cover file: {cover_file}")
except Exception as e:
logger.error(f"Failed to delete cover file {cover_file}: {e}")
else:
logger.info(f"Cover file not found: {cover_file}")
if os.path.exists(script_path):
try:
os.remove(script_path)
logger.info(f"Deleted steam script: {script_path}")
except Exception as e:
logger.error(f"Failed to delete steam script {script_path}: {e}")
else:
logger.info(f"Steam script not found: {script_path}")
logger.info(f"Game '{game_name}' successfully removed from Steam")
return (True, f"Game '{game_name}' removed from Steam")
def is_game_in_steam(game_name: str) -> bool:
steam_home = get_steam_home()
if steam_home is None:
logger.warning("Steam home directory not found")
return False
try:
last_user = get_last_steam_user(steam_home)
if not last_user or 'SteamID' not in last_user:
logger.warning("No valid Steam user found")
return False
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
steam_shortcuts_path = os.path.join(str(steam_home), "userdata", str(unsigned_id), "config", "shortcuts.vdf")
if not os.path.exists(steam_shortcuts_path):
logger.warning(f"Shortcuts file not found at {steam_shortcuts_path}")
return False
shortcuts_data = safe_vdf_load(steam_shortcuts_path)
shortcuts = shortcuts_data.get("shortcuts", {})
for _key, entry in shortcuts.items():
if entry.get("AppName") == game_name:
return True
except Exception as e:
logger.error(f"Error checking if game {game_name} is in Steam: {e}")
return False