PortProtonQt/portprotonqt/steam_api.py
Boris Yumankulov 6f211c66c2
chore: replace github to git.linux-gaming.ru
Signed-off-by: Boris Yumankulov <boria138@altlinux.org>
2025-06-01 18:26:10 +05:00

1133 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import os
import shlex
import subprocess
import time
import html
import orjson
import vdf
import tarfile
import threading
from pathlib import Path
from portprotonqt.logger import get_logger
from portprotonqt.localization import get_steam_language
from portprotonqt.downloader import Downloader
from portprotonqt.dialogs import generate_thumbnail
from portprotonqt.config_utils import get_portproton_location
from collections.abc import Callable
import re
import shutil
import zlib
# Module-level Downloader instance shared by the download_async calls below.
downloader = Downloader()
logger = get_logger(__name__)
# Maximum age of a cache file before it is considered stale: 30 days, in seconds.
CACHE_DURATION = 30 * 24 * 60 * 60
def safe_vdf_load(path: str | Path) -> dict:
    """
    Load a VDF file, trying the binary format first and the text format second.

    Returns the parsed data, or an empty dict when both attempts fail. The
    error that gets logged is the one raised by the binary attempt.
    """
    path = str(path)  # open() and the log message both use the string form
    try:
        with open(path, 'rb') as binary_file:
            return vdf.binary_load(binary_file)
    except Exception as e:
        # Binary parsing failed; retry as text, dropping undecodable bytes.
        try:
            with open(path, encoding='utf-8', errors='ignore') as text_file:
                return vdf.load(text_file)
        except Exception:
            logger.error(f"Failed to load VDF file {path}: {e}")
            return {}
def decode_text(text: str) -> str:
    """
    Decode HTML entities in a string.

    For example, "&quot;" becomes '"'. All other characters, including
    HTML tags, are left unchanged.
    """
    decoded = html.unescape(text)
    return decoded
def get_cache_dir():
    """
    Return the path of the PortProtonQT cache directory, creating it if needed.

    The directory lives under $XDG_CACHE_HOME, falling back to ~/.cache when
    the variable is unset or empty.
    """
    # `or` (rather than a getenv default) also covers an empty variable, which
    # would otherwise produce a relative "PortProtonQT" directory in the CWD.
    xdg_cache_home = os.getenv("XDG_CACHE_HOME") or os.path.join(os.path.expanduser("~"), ".cache")
    cache_dir = os.path.join(xdg_cache_home, "PortProtonQT")
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir
# Candidate Steam data directories, checked in this order.
# NOTE(review): these look like the native, Snap and Flatpak layouts - confirm.
STEAM_DATA_DIRS = (
    "~/.local/share/Steam",
    "~/snap/steam/common/.local/share/Steam",
    "~/.var/app/com.valvesoftware.Steam/data/Steam",
)


def get_steam_home():
    """Return the first existing directory from STEAM_DATA_DIRS as a Path, or None."""
    expanded_dirs = (Path(os.path.expanduser(raw_dir)) for raw_dir in STEAM_DATA_DIRS)
    return next((steam_dir for steam_dir in expanded_dirs if steam_dir.exists()), None)
def get_last_steam_user(steam_home: Path) -> dict | None:
    """
    Return the most recent Steam user recorded in config/loginusers.vdf.

    The result is {'SteamID': <int>} for the first entry whose 'MostRecent'
    field equals '1'. Returns None when the file cannot be loaded, when no
    such entry exists, or when that entry's key is not an integer.
    """
    data = safe_vdf_load(steam_home / "config/loginusers.vdf")
    if not data:
        return None

    for user_id, user_info in data.get('users', {}).items():
        if user_info.get('MostRecent') != '1':
            continue
        try:
            steam_id = int(user_id)
        except ValueError:
            logger.error(f"Неверный формат SteamID: {user_id}")
            return None
        return {'SteamID': steam_id}

    logger.info("Не найден пользователь с MostRecent=1")
    return None
def convert_steam_id(steam_id: int) -> int:
    """
    Return the low 32 bits of steam_id as a non-negative integer.

    Negative inputs wrap the same way a signed-to-unsigned 32-bit
    conversion would (for example, -1 becomes 0xFFFFFFFF).
    """
    # For Python ints, reducing modulo 2**32 equals masking with 0xFFFFFFFF.
    return steam_id % (1 << 32)
def get_steam_libs(steam_dir: Path) -> set[Path]:
    """
    Return the set of Steam library directories.

    Reads steamapps/libraryfolders.vdf under steam_dir and collects the 'path'
    of every numerically keyed entry that exists on disk. steam_dir itself is
    always part of the result.
    """
    folders = safe_vdf_load(steam_dir / "steamapps/libraryfolders.vdf").get('libraryfolders', {})

    libs = set()
    for key, info in folders.items():
        # Only numeric keys that map to a dict are treated as library entries.
        if not key.isdigit() or not isinstance(info, dict):
            continue
        path_str = info.get('path')
        if not path_str:
            continue
        library = Path(path_str).expanduser()
        if library.exists():
            libs.add(library)

    libs.add(steam_dir)
    return libs
def get_playtime_data(steam_home: Path | None = None) -> dict[int, tuple[int, int]]:
    """
    Return play-time data for the most recent Steam user.

    Args:
        steam_home: Steam data directory; detected via get_steam_home() when None.

    Returns:
        A dict mapping appid to (LastPlayed, Playtime) as read from the user's
        config/localconfig.vdf. The dict is empty when any prerequisite (Steam
        home, userdata directory, most recent user, user directory) is missing.
        NOTE(review): Playtime appears to be in minutes, since
        get_steam_installed_games multiplies it by 60 - confirm.
    """
    play_data: dict[int, tuple[int, int]] = {}

    if steam_home is None:
        steam_home = get_steam_home()
    if steam_home is None or not steam_home.exists():
        logger.error("Steam home directory not found or does not exist")
        return play_data

    userdata_dir = steam_home / "userdata"
    if not userdata_dir.exists():
        logger.info("Userdata directory not found")
        return play_data

    # steam_home is known to be non-None here, so no second None check is needed.
    last_user = get_last_steam_user(steam_home)
    if not last_user:
        logger.info("Не удалось определить последнего пользователя Steam")
        return play_data

    # The per-user directory is named after the low 32 bits of the SteamID.
    unsigned_id = convert_steam_id(last_user['SteamID'])
    user_dir = userdata_dir / str(unsigned_id)
    if not user_dir.exists():
        logger.info(f"Директория пользователя {unsigned_id} не найдена")
        return play_data

    data = safe_vdf_load(user_dir / "config/localconfig.vdf")
    cfg = data.get('UserLocalConfigStore', {})
    apps = cfg.get('Software', {}).get('Valve', {}).get('Steam', {}).get('apps', {})

    for appid_str, info in apps.items():
        try:
            appid = int(appid_str)
            last_played = int(info.get('LastPlayed', 0))
            playtime = int(info.get('Playtime', 0))
        except (ValueError, TypeError, AttributeError):
            # One malformed entry (non-numeric field, None value, non-dict
            # record) must not abort the scan of the remaining apps.
            logger.warning(f"Некорректные данные playtime для app {appid_str}")
            continue
        play_data[appid] = (last_played, playtime)

    return play_data
def get_steam_installed_games() -> list[tuple[str, int, int, int]]:
    """
    Return installed Steam games as (name, appid, last_played, playtime_sec) tuples.

    Every appmanifest_*.acf in every library's steamapps directory is read.
    Entries whose name contains "proton", "steamworks" or "steam linux runtime"
    are skipped, as are manifests that cannot be parsed.
    """
    games: list[tuple[str, int, int, int]] = []

    steam_home = get_steam_home()
    if steam_home is None or not steam_home.exists():
        logger.error("Steam home directory not found or does not exist")
        return games

    play_data = get_playtime_data(steam_home)
    excluded_tokens = ("proton", "steamworks", "steam linux runtime")

    for lib in get_steam_libs(steam_home):
        steamapps_dir = lib / "steamapps"
        if not steamapps_dir.exists():
            continue
        for manifest in steamapps_dir.glob("appmanifest_*.acf"):
            app = safe_vdf_load(manifest).get('AppState', {})
            if not app or not isinstance(app, dict):
                # An unreadable manifest would otherwise show up as a
                # phantom "Unknown (0)" game.
                continue
            try:
                appid = int(app.get('appid', 0))
            except (ValueError, TypeError):
                continue
            name = app.get('name', f"Unknown ({appid})")
            lname = name.lower()
            if any(token in lname for token in excluded_tokens):
                continue
            last_played, playtime_min = play_data.get(appid, (0, 0))
            games.append((name, appid, last_played, playtime_min * 60))

    return games
def normalize_name(s):
    """
    Normalize a game name for matching:
    - lower-case it,
    - remove the trademark and registered-trademark signs,
    - replace the separators '-', ':' and ',' with a space,
    - collapse runs of whitespace,
    - strip a trailing 'bin' and then a trailing 'app',
    - drop the words 'ultimate', 'edition', 'definitive', 'complete', 'remastered'.
    """
    s = s.lower()
    # Written as escapes on purpose: a literal trademark glyph is easy to lose
    # in transit, which leaves an empty string and turns the replace into a no-op.
    for ch in ("\u2122", "\u00ae"):
        s = s.replace(ch, "")
    for ch in ("-", ":", ","):
        s = s.replace(ch, " ")
    s = " ".join(s.split())
    # Suffixes are checked in this order, each against the already-trimmed string.
    for suffix in ("bin", "app"):
        if s.endswith(suffix):
            s = s[:-len(suffix)].strip()
    keywords_to_remove = {"ultimate", "edition", "definitive", "complete", "remastered"}
    return " ".join(word for word in s.split() if word not in keywords_to_remove)
def is_valid_candidate(candidate):
    """
    Tell whether a candidate name is free of forbidden substrings.

    The forbidden substrings are 'win32', 'win64' and 'gamelauncher'. They are
    looked for in the normalized candidate with all spaces removed.
    Returns True when the candidate is acceptable, False otherwise.
    """
    compact = normalize_name(candidate).replace(" ", "")
    return not any(token in compact for token in ("win32", "win64", "gamelauncher"))
def filter_candidates(candidates):
    """
    Return the candidates that are non-blank and pass is_valid_candidate.

    Rejected candidates are logged together in a single message.
    """
    valid, dropped = [], []
    for cand in candidates:
        bucket = valid if cand.strip() and is_valid_candidate(cand) else dropped
        bucket.append(cand)
    if dropped:
        logger.info("Отбрасываю кандидатов: %s", dropped)
    return valid
def remove_duplicates(candidates):
    """
    Return a list without duplicates, keeping the first occurrence of each item
    in its original order.
    """
    seen = set()
    unique = []
    for item in candidates:
        if item not in seen:
            seen.add(item)
            unique.append(item)
    return unique
@functools.lru_cache(maxsize=256)
def get_exiftool_data(game_exe):
    """
    Return the metadata that `exiftool -j` reports for game_exe.

    The result is the first object of exiftool's JSON output, or an empty dict
    when exiftool exits with a non-zero status, prints an empty list, or
    anything raises. Results, including the empty ones, are memoized per path.
    """
    command = ["exiftool", "-j", game_exe]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, check=False)
        if proc.returncode == 0:
            entries = orjson.loads(proc.stdout.encode("utf-8"))
            return entries[0] if entries else {}
        logger.error(f"exiftool error for {game_exe}: {proc.stderr.strip()}")
        return {}
    except Exception as e:
        logger.error(f"An unexpected error occurred in get_exiftool_data for {game_exe}: {e}")
        return {}
def load_steam_apps_async(callback: Callable[[list], None]):
    """
    Load the list of Steam applications and hand it to callback.

    A cached steam_apps.json younger than CACHE_DURATION is read directly;
    otherwise games_appid.tar.xz is downloaded, its first *.json member is
    stored as the new cache, and the archive is deleted. The callback receives
    an empty list on any failure.
    """
    cache_dir = get_cache_dir()
    cache_tar = os.path.join(cache_dir, "games_appid.tar.xz")
    cache_json = os.path.join(cache_dir, "steam_apps.json")

    def unwrap(payload):
        # The payload is either {"applist": {"apps": [...]}} or the bare list.
        if isinstance(payload, dict):
            return payload.get("applist", {}).get("apps", [])
        return payload or []

    def process_tar(result: str | None):
        if not result or not os.path.exists(result):
            logger.error("Failed to download Steam apps archive")
            callback([])
            return
        try:
            with tarfile.open(result, mode='r:xz') as tar:
                member = next((m for m in tar.getmembers() if m.name.endswith('.json')), None)
                if member is None:
                    raise RuntimeError("JSON file not found in archive")
                fobj = tar.extractfile(member)
                if fobj is None:
                    raise RuntimeError(f"Failed to extract file {member.name} from archive")
                raw = fobj.read()
                fobj.close()
                data = orjson.loads(raw)
            with open(cache_json, "wb") as cache_out:
                cache_out.write(orjson.dumps(data))
            if os.path.exists(cache_tar):
                os.remove(cache_tar)
                logger.info("Archive %s deleted after extraction", cache_tar)
            steam_apps = unwrap(data)
            logger.info("Loaded %d apps from archive", len(steam_apps))
            callback(steam_apps)
        except Exception as e:
            logger.error("Error extracting Steam apps archive: %s", e)
            callback([])

    cache_is_fresh = (
        os.path.exists(cache_json)
        and time.time() - os.path.getmtime(cache_json) < CACHE_DURATION
    )
    if not cache_is_fresh:
        app_list_url = (
            "https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/games_appid.tar.xz"
        )
        downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
        return

    logger.info("Using cached Steam apps JSON: %s", cache_json)
    try:
        with open(cache_json, "rb") as cache_in:
            steam_apps = unwrap(orjson.loads(cache_in.read()))
        logger.info("Loaded %d apps from cache", len(steam_apps))
        callback(steam_apps)
    except Exception as e:
        logger.error("Error reading cached JSON: %s", e)
        callback([])
def build_index(steam_apps):
    """
    Build a dict that maps each app's "normalized_name" to the app entry.

    When several apps share a normalized name, the last one wins. An empty or
    None input yields an empty dict.
    """
    if not steam_apps:
        return {}
    logger.info("Построение индекса Steam приложений:")
    return {app["normalized_name"]: app for app in steam_apps}
def search_app(candidate, steam_apps_index):
    """
    Look up an app for a candidate name.

    An exact match on the normalized candidate is tried first. Failing that,
    the first index key that contains the normalized candidate and is at most
    25% longer than it (length ratio above 0.8) is accepted. Returns the app
    entry, or None when nothing matches.
    """
    candidate_norm = normalize_name(candidate)
    logger.info("Поиск приложения для кандидата: '%s' -> '%s'", candidate, candidate_norm)

    if candidate_norm in steam_apps_index:
        logger.info(" Найдено точное совпадение: '%s'", candidate_norm)
        return steam_apps_index[candidate_norm]

    for name_norm, app in steam_apps_index.items():
        if candidate_norm not in name_norm:
            continue
        ratio = len(candidate_norm) / len(name_norm)
        if ratio <= 0.8:
            continue
        logger.info(" Найдено частичное совпадение: кандидат '%s' в '%s' (ratio: %.2f)",
                    candidate_norm, name_norm, ratio)
        return app

    logger.info(" Приложение для кандидата '%s' не найдено", candidate_norm)
    return None
def load_app_details(app_id):
    """
    Return the cached Steam details for app_id, or None.

    None is returned when there is no cache file, when it is older than
    CACHE_DURATION, or when it cannot be read or parsed (the error is logged,
    matching load_protondb_status).
    """
    cache_file = os.path.join(get_cache_dir(), f"steam_app_{app_id}.json")
    if os.path.exists(cache_file) and time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
        try:
            with open(cache_file, "rb") as f:
                return orjson.loads(f.read())
        except Exception as e:
            # A corrupt cache entry is treated as a miss so that the caller
            # re-fetches instead of crashing.
            logger.error("Error loading cached Steam app details for appid %s: %s", app_id, e)
    return None
def save_app_details(app_id, data):
    """
    Write the Steam details for app_id to its cache file.

    Caching is best-effort: a failure is logged rather than raised (matching
    save_protondb_status), so a successfully fetched result is not discarded
    just because it could not be cached.
    """
    cache_file = os.path.join(get_cache_dir(), f"steam_app_{app_id}.json")
    try:
        with open(cache_file, "wb") as f:
            f.write(orjson.dumps(data))
    except Exception as e:
        logger.error("Error saving Steam app details cache for appid %s: %s", app_id, e)
def fetch_app_info_async(app_id: int, callback: Callable[[dict | None], None]):
    """
    Asynchronously fetch app details from the Steam store API.

    A fresh cache entry is returned immediately. Otherwise the appdetails
    endpoint is downloaded and reduced to the fields steam_appid, name,
    short_description and controller_support, which are cached and passed to
    callback. The callback receives None when the download fails, the API
    reports no success, or the response cannot be processed.
    """
    cached = load_app_details(app_id)
    if cached is not None:
        callback(cached)
        return

    lang = get_steam_language()
    url = f"https://store.steampowered.com/api/appdetails?appids={app_id}&l={lang}"
    cache_file = os.path.join(get_cache_dir(), f"steam_app_{app_id}.json")

    def discard_raw_response(path):
        # The raw API response is downloaded onto the cache path itself. If it
        # is left there after a failure, load_app_details later serves it as
        # "cached" details with the wrong shape. Removal is best-effort.
        try:
            os.remove(path)
        except OSError:
            pass

    def process_response(result: str | None):
        if not result or not os.path.exists(result):
            logger.error("Failed to download Steam app info for appid %s", app_id)
            callback(None)
            return
        try:
            with open(result, "rb") as f:
                data = orjson.loads(f.read())
            details = data.get(str(app_id), {})
            if not details.get("success"):
                discard_raw_response(result)
                callback(None)
                return
            app_data_full = details.get("data", {})
            app_data = {
                "steam_appid": app_data_full.get("steam_appid", app_id),
                "name": app_data_full.get("name", ""),
                "short_description": app_data_full.get("short_description", ""),
                "controller_support": app_data_full.get("controller_support", "")
            }
            # Overwrites the raw response with the reduced record.
            save_app_details(app_id, app_data)
            callback(app_data)
        except Exception as e:
            logger.error("Error processing Steam app info for appid %s: %s", app_id, e)
            discard_raw_response(result)
            callback(None)

    downloader.download_async(url, cache_file, timeout=5, callback=process_response)
def load_weanticheatyet_data_async(callback: Callable[[list], None]):
    """
    Load the WeAntiCheatYet data and hand it to callback.

    A cached anticheat_games.json younger than CACHE_DURATION is read directly;
    otherwise anticheat_games.tar.xz is downloaded, its
    anticheat_games_min.json member is stored as the new cache, and the archive
    is deleted. The callback receives an empty list on any failure.
    """
    cache_dir = get_cache_dir()
    cache_tar = os.path.join(cache_dir, "anticheat_games.tar.xz")
    cache_json = os.path.join(cache_dir, "anticheat_games.json")

    def process_tar(result: str | None):
        if not result or not os.path.exists(result):
            logger.error("Failed to download WeAntiCheatYet archive")
            callback([])
            return
        try:
            with tarfile.open(result, mode='r:xz') as tar:
                member = next((m for m in tar.getmembers() if m.name.endswith('anticheat_games_min.json')), None)
                if member is None:
                    raise RuntimeError("JSON file not found in archive")
                fobj = tar.extractfile(member)
                if fobj is None:
                    raise RuntimeError(f"Failed to extract file {member.name} from archive")
                raw = fobj.read()
                fobj.close()
                data = orjson.loads(raw)
            with open(cache_json, "wb") as cache_out:
                cache_out.write(orjson.dumps(data))
            if os.path.exists(cache_tar):
                os.remove(cache_tar)
                logger.info("Archive %s deleted after extraction", cache_tar)
            anti_cheat_data = data or []
            logger.info("Loaded %d anti-cheat entries from archive", len(anti_cheat_data))
            callback(anti_cheat_data)
        except Exception as e:
            logger.error("Error extracting WeAntiCheatYet archive: %s", e)
            callback([])

    cache_is_fresh = (
        os.path.exists(cache_json)
        and time.time() - os.path.getmtime(cache_json) < CACHE_DURATION
    )
    if not cache_is_fresh:
        app_list_url = (
            "https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/anticheat_games.tar.xz"
        )
        downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
        return

    logger.info("Using cached WeAntiCheatYet JSON: %s", cache_json)
    try:
        with open(cache_json, "rb") as cache_in:
            anti_cheat_data = orjson.loads(cache_in.read()) or []
        logger.info("Loaded %d anti-cheat entries from cache", len(anti_cheat_data))
        callback(anti_cheat_data)
    except Exception as e:
        logger.error("Error reading cached WeAntiCheatYet JSON: %s", e)
        callback([])
def build_weanticheatyet_index(anti_cheat_data):
    """
    Build a dict that maps each entry's "normalized_name" to the entry.

    When several entries share a normalized name, the last one wins. An empty
    or None input yields an empty dict.
    """
    if not anti_cheat_data:
        return {}
    logger.info("Построение индекса WeAntiCheatYet данных:")
    return {entry["normalized_name"]: entry for entry in anti_cheat_data}
def search_anticheat_status(candidate, anti_cheat_index):
    """
    Return the anti-cheat "status" for a candidate name.

    An exact match on the normalized candidate is tried first. Failing that,
    the first index key that contains the normalized candidate with a length
    ratio above 0.8 is accepted. Returns an empty string when nothing matches.
    """
    candidate_norm = normalize_name(candidate)
    logger.info("Поиск античит-статуса для кандидата: '%s' -> '%s'", candidate, candidate_norm)

    if candidate_norm in anti_cheat_index:
        status = anti_cheat_index[candidate_norm]["status"]
        logger.info(" Найдено точное совпадение: '%s', статус: '%s'", candidate_norm, status)
        return status

    for name_norm, entry in anti_cheat_index.items():
        if candidate_norm not in name_norm:
            continue
        ratio = len(candidate_norm) / len(name_norm)
        if ratio <= 0.8:
            continue
        status = entry["status"]
        logger.info(" Найдено частичное совпадение: кандидат '%s' в '%s' (ratio: %.2f), статус: '%s'",
                    candidate_norm, name_norm, ratio, status)
        return status

    logger.info(" Античит-статус для кандидата '%s' не найден", candidate_norm)
    return ""
def get_weanticheatyet_status_async(game_name: str, callback: Callable[[str], None]):
    """
    Look up the WeAntiCheatYet status of a game by name.

    Loads the anti-cheat data, indexes it and passes the status found for
    game_name (an empty string when there is none) to callback.
    """
    def on_anticheat_data(anti_cheat_data: list):
        index = build_weanticheatyet_index(anti_cheat_data)
        callback(search_anticheat_status(game_name, index))

    load_weanticheatyet_data_async(on_anticheat_data)
def load_protondb_status(appid):
    """
    Return the cached ProtonDB data for appid, or None.

    None is returned when the cache file is missing, is older than
    CACHE_DURATION, or cannot be read or parsed (the error is logged).
    """
    cache_file = os.path.join(get_cache_dir(), f"protondb_{appid}.json")
    if not os.path.exists(cache_file):
        return None
    if time.time() - os.path.getmtime(cache_file) >= CACHE_DURATION:
        return None
    try:
        with open(cache_file, "rb") as cache_in:
            return orjson.loads(cache_in.read())
    except Exception as e:
        logger.error("Ошибка загрузки кеша ProtonDB для appid %s: %s", appid, e)
    return None
def save_protondb_status(appid, data):
    """
    Write the ProtonDB data for appid to its cache file.

    A failure is logged and not raised.
    """
    target = os.path.join(get_cache_dir(), f"protondb_{appid}.json")
    try:
        with open(target, "wb") as cache_out:
            cache_out.write(orjson.dumps(data))
    except Exception as e:
        logger.error("Ошибка сохранения кеша ProtonDB для appid %s: %s", appid, e)
def get_protondb_tier_async(appid: int, callback: Callable[[str], None]):
    """
    Asynchronously fetch the ProtonDB tier for an app.

    A fresh cache entry is used when available. Otherwise the ProtonDB summary
    is downloaded, reduced to {"tier": ...} and cached. The callback receives
    the tier string, or an empty string on failure.
    """
    cached = load_protondb_status(appid)
    # Only a dict counts as a cache hit; any other JSON value left in the
    # cache file would crash on .get().
    if isinstance(cached, dict):
        callback(cached.get("tier", ""))
        return

    url = f"https://www.protondb.com/api/v1/reports/summaries/{appid}.json"
    cache_file = os.path.join(get_cache_dir(), f"protondb_{appid}.json")

    def discard_raw_response(path):
        # The raw response is downloaded onto the cache path itself; drop it
        # when it could not be processed so that it is never read back as a
        # cache entry. Removal is best-effort.
        try:
            os.remove(path)
        except OSError:
            pass

    def process_response(result: str | None):
        if not result or not os.path.exists(result):
            logger.info("Failed to download ProtonDB data for appid %s", appid)
            callback("")
            return
        try:
            with open(result, "rb") as f:
                data = orjson.loads(f.read())
            filtered_data = {"tier": data.get("tier", "")}
            # Overwrites the raw response with the reduced record.
            save_protondb_status(appid, filtered_data)
            callback(filtered_data["tier"])
        except Exception as e:
            logger.info("Failed to process ProtonDB data for appid %s: %s", appid, e)
            discard_raw_response(result)
            callback("")

    downloader.download_async(url, cache_file, timeout=5, callback=process_response)
def get_full_steam_game_info_async(appid: int, callback: Callable[[dict], None]):
    """
    Collect full Steam game info for appid and pass it to callback.

    The lookups are chained: app details, then ProtonDB tier, then
    WeAntiCheatYet status (searched by the decoded title). The callback
    receives an empty dict when the app details are unavailable.
    """
    def on_app_info(app_info: dict | None):
        if not app_info:
            callback({})
            return

        title = decode_text(app_info.get("name", ""))
        description = decode_text(app_info.get("short_description", ""))
        cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"

        def deliver(tier: str, anticheat_status: str):
            callback({
                'description': description,
                'controller_support': app_info.get('controller_support', ''),
                'cover': cover,
                'protondb_tier': tier,
                'steam_game': "true",
                'name': title,
                'anticheat_status': anticheat_status
            })

        def on_protondb_tier(tier: str):
            get_weanticheatyet_status_async(
                title,
                lambda anticheat_status: deliver(tier, anticheat_status),
            )

        get_protondb_tier_async(appid, on_protondb_tier)

    fetch_app_info_async(appid, on_app_info)
def get_steam_game_info_async(desktop_name: str, exec_line: str, callback: Callable[[dict], tuple[bool, str] | None]) -> None:
    """
    Identify a game from its desktop name and exec line and report its info.

    The executable is the last token of exec_line. If it is a .bat file, the
    first token ending in .exe found inside it is used instead. Name candidates
    (exiftool ProductName and FileDescription, desktop name, exe name, folder
    name) are filtered, de-duplicated and tried against the Steam app index,
    those with the most words first.

    The callback receives a dict with the keys appid, name, description, cover,
    controller_support, protondb_tier, steam_game ("true" or "false") and
    anticheat_status. When no Steam app matches, or its details cannot be
    fetched, the Steam-specific fields are empty strings.
    """
    try:
        parts = shlex.split(exec_line)
    except ValueError as e:
        # Unbalanced quotes make shlex raise; fall back to a whitespace split
        # so that the callback is still invoked.
        logger.error("Failed to parse exec line '%s': %s", exec_line, e)
        parts = exec_line.split()
    game_exe = parts[-1] if parts else exec_line

    if game_exe.lower().endswith('.bat'):
        if os.path.exists(game_exe):
            try:
                with open(game_exe, encoding='utf-8') as f:
                    bat_lines = f.readlines()
                for line in bat_lines:
                    line = line.strip()
                    if '.exe' not in line.lower():
                        continue
                    exe_token = next(
                        (token for token in shlex.split(line) if token.lower().endswith('.exe')),
                        None,
                    )
                    if exe_token:
                        game_exe = exe_token
                        break
            except Exception as e:
                logger.error("Error processing bat file %s: %s", game_exe, e)
        else:
            logger.error("Bat file not found: %s", game_exe)

    if not game_exe.lower().endswith('.exe'):
        logger.error("Invalid executable path: %s. Expected .exe", game_exe)
        meta_data = {}
    else:
        meta_data = get_exiftool_data(game_exe)

    exe_name = os.path.splitext(os.path.basename(game_exe))[0]
    folder_path = os.path.dirname(game_exe)
    folder_name = os.path.basename(folder_path)
    # A generic bin directory says nothing about the game; use its parent.
    if folder_name.lower() in ['bin', 'binaries']:
        folder_path = os.path.dirname(folder_path)
        folder_name = os.path.basename(folder_path)
    logger.info("Game folder name: '%s'", folder_name)

    candidates = []
    product_name = meta_data.get("ProductName", "")
    file_description = meta_data.get("FileDescription", "")
    if product_name and product_name not in ['BootstrapPackagedGame']:
        candidates.append(product_name)
    if file_description and file_description not in ['BootstrapPackagedGame']:
        candidates.append(file_description)
    if desktop_name:
        candidates.append(desktop_name)
    if exe_name:
        candidates.append(exe_name)
    if folder_name:
        candidates.append(folder_name)
    logger.info("Initial candidates: %s", candidates)

    candidates = filter_candidates(candidates)
    candidates = remove_duplicates(candidates)
    candidates_ordered = sorted(candidates, key=lambda s: len(s.split()), reverse=True)
    logger.info("Sorted candidates: %s", candidates_ordered)

    game_name = desktop_name or exe_name.capitalize()

    def report_non_steam():
        # Shared fallback: no Steam match, or no details for the matched app.
        def on_anticheat_status(anticheat_status: str):
            callback({
                "appid": "",
                "name": decode_text(game_name),
                "description": "",
                "cover": "",
                "controller_support": "",
                "protondb_tier": "",
                "steam_game": "false",
                "anticheat_status": anticheat_status
            })
        get_weanticheatyet_status_async(game_name, on_anticheat_status)

    def on_steam_apps(steam_apps: list):
        steam_apps_index = build_index(steam_apps)
        matching_app = None
        for candidate in candidates_ordered:
            if not candidate:
                continue
            matching_app = search_app(candidate, steam_apps_index)
            if matching_app:
                logger.info("Match found for candidate '%s': %s", candidate, matching_app.get("normalized_name"))
                break

        if not matching_app:
            report_non_steam()
            return

        appid = matching_app["appid"]

        def on_app_info(app_info: dict | None):
            if not app_info:
                report_non_steam()
                return

            title = decode_text(app_info.get("name", game_name))
            description = decode_text(app_info.get("short_description", ""))
            cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
            controller_support = app_info.get("controller_support", "")

            def on_protondb_tier(tier: str):
                def on_anticheat_status(anticheat_status: str):
                    callback({
                        "appid": appid,
                        "name": title,
                        "description": description,
                        "cover": cover,
                        "controller_support": controller_support,
                        "protondb_tier": tier,
                        "steam_game": "true",
                        "anticheat_status": anticheat_status
                    })
                get_weanticheatyet_status_async(title, on_anticheat_status)

            get_protondb_tier_async(appid, on_protondb_tier)

        fetch_app_info_async(appid, on_app_info)

    load_steam_apps_async(on_steam_apps)
# Process-wide memo of the Steam app list and its index, guarded by the lock.
_STEAM_APPS = None
_STEAM_APPS_INDEX = None
_STEAM_APPS_LOCK = threading.Lock()


def get_steam_apps_and_index_async(callback: Callable[[tuple[list, dict]], None]):
    """
    Load the Steam app list and its index, memoizing them for later calls.

    The callback receives a (steam_apps, steam_apps_index) tuple. It is always
    invoked without _STEAM_APPS_LOCK held: the lock is not reentrant, so a
    callback that calls back into this function would otherwise deadlock.
    """
    with _STEAM_APPS_LOCK:
        cached = None
        if _STEAM_APPS is not None and _STEAM_APPS_INDEX is not None:
            cached = (_STEAM_APPS, _STEAM_APPS_INDEX)
    if cached is not None:
        callback(cached)
        return

    def on_steam_apps(steam_apps: list):
        global _STEAM_APPS, _STEAM_APPS_INDEX
        # Build the index outside the lock; only the publication is guarded.
        index = build_index(steam_apps)
        with _STEAM_APPS_LOCK:
            _STEAM_APPS = steam_apps
            _STEAM_APPS_INDEX = index
        callback((steam_apps, index))

    load_steam_apps_async(on_steam_apps)
def add_to_steam(game_name: str, exec_line: str, cover_path: str) -> tuple[bool, str]:
    """
    Add a non-Steam game to Steam via shortcuts.vdf with the PortProton tag,
    and download Steam Grid covers under the names Steam expects.

    Args:
        game_name: Display name of the shortcut; also used (sanitized) for the
            launch script and thumbnail file names.
        exec_line: Command line of the game; the executable path is taken from
            it (token 2 after "env", token 3 after "flatpak", else the last token).
        cover_path: Not used by this function.

    Returns:
        (False, reason) when a synchronous precondition fails, otherwise
        (True, "Adding game to Steam, checking for covers..."). The shortcut
        itself is written later from the download callbacks, so the return
        value does not report whether that write succeeded.
    """
    if not exec_line or not exec_line.strip():
        logger.error("Invalid exec_line: empty or whitespace")
        return (False, "Executable command is empty or invalid")

    # Parse exec_line to get the executable path
    try:
        entry_exec_split = shlex.split(exec_line)
        if not entry_exec_split:
            logger.error(f"Failed to parse exec_line: {exec_line}")
            return (False, "Failed to parse executable command: no valid tokens")
        if entry_exec_split[0] == "env" and len(entry_exec_split) >= 3:
            exe_path = entry_exec_split[2]
        elif entry_exec_split[0] == "flatpak" and len(entry_exec_split) >= 4:
            exe_path = entry_exec_split[3]
        else:
            exe_path = entry_exec_split[-1]
    except Exception as e:
        logger.error(f"Failed to parse exec_line: {exec_line}, error: {e}")
        return (False, f"Failed to parse executable command: {e}")

    if not os.path.exists(exe_path):
        logger.error(f"Executable not found: {exe_path}")
        return (False, f"Executable file not found: {exe_path}")

    portproton_dir = get_portproton_location()
    if not portproton_dir:
        logger.error("PortProton directory not found")
        return (False, "PortProton directory not found")

    steam_scripts_dir = os.path.join(portproton_dir, "steam_scripts")
    os.makedirs(steam_scripts_dir, exist_ok=True)

    # Replace characters that are unsafe in file names.
    safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
    script_path = os.path.join(steam_scripts_dir, f"{safe_game_name}.sh")
    start_sh_path = os.path.join(portproton_dir, "data", "scripts", "start.sh")

    if not os.path.exists(start_sh_path):
        logger.error(f"start.sh not found at {start_sh_path}")
        return (False, f"start.sh not found at {start_sh_path}")

    if not os.path.exists(script_path):
        script_content = f"""#!/usr/bin/env bash
export LD_PRELOAD=
export START_FROM_STEAM=1
"{start_sh_path}" "{exe_path}" "$@"
"""
        try:
            with open(script_path, "w", encoding="utf-8") as f:
                f.write(script_content)
            os.chmod(script_path, 0o755)
            logger.info(f"Created launch script: {script_path}")
        except Exception as e:
            logger.error(f"Failed to create launch script {script_path}: {e}")
            return (False, f"Failed to create launch script: {e}")
    else:
        logger.info(f"Launch script already exists: {script_path}")

    generated_icon_path = os.path.join(portproton_dir, "data", "img", f"{safe_game_name}.png")
    # Defined up front so finalize_shortcut can always read it.
    icon_path = ""
    try:
        img_dir = os.path.join(portproton_dir, "data", "img")
        os.makedirs(img_dir, exist_ok=True)
        if os.path.exists(generated_icon_path):
            logger.info(f"Reusing existing thumbnail: {generated_icon_path}")
            # Without this assignment the reuse path left icon_path undefined,
            # which raised a NameError once the shortcut was built.
            icon_path = generated_icon_path
        else:
            success = generate_thumbnail(exe_path, generated_icon_path, size=128, force_resize=True)
            if not success or not os.path.exists(generated_icon_path):
                logger.warning(f"generate_thumbnail failed to create icon for {exe_path}")
                icon_path = ""
            else:
                logger.info(f"Generated thumbnail: {generated_icon_path}")
                icon_path = generated_icon_path
    except Exception as e:
        logger.error(f"Error generating thumbnail for {exe_path}: {e}")
        icon_path = ""

    steam_home = get_steam_home()
    if not steam_home:
        logger.error("Steam home directory not found")
        return (False, "Steam directory not found.")

    last_user = get_last_steam_user(steam_home)
    if not last_user or 'SteamID' not in last_user:
        logger.error("Failed to retrieve Steam user ID")
        return (False, "Failed to get Steam user ID.")

    userdata_dir = steam_home / "userdata"
    user_id = last_user['SteamID']
    unsigned_id = convert_steam_id(user_id)
    user_dir = userdata_dir / str(unsigned_id)
    steam_shortcuts_path = user_dir / "config" / "shortcuts.vdf"
    grid_dir = user_dir / "config" / "grid"
    os.makedirs(grid_dir, exist_ok=True)

    backup_path = f"{steam_shortcuts_path}.backup"
    if os.path.exists(steam_shortcuts_path):
        try:
            shutil.copy2(steam_shortcuts_path, backup_path)
            logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
        except Exception as e:
            logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
            return (False, f"Failed to create backup of shortcuts.vdf: {e}")

    # The shortcut id is derived from the script path and the game name.
    unique_string = f"{script_path}{game_name}"
    baseid = zlib.crc32(unique_string.encode('utf-8')) & 0xffffffff
    appid = baseid | 0x80000000
    # The top bit is always set, so the value stored in the VDF is always the
    # signed 32-bit (negative) form of appid.
    aidvdf = appid - 0x100000000

    # (file-name suffix in the grid directory, file name on the Steam CDN)
    cover_types = [
        (".jpg", "header.jpg"),               # base cover, saved as <appid>.jpg
        ("p.jpg", "library_600x900_2x.jpg"),  # saved as <appid>p.jpg
        ("_hero.jpg", "library_hero.jpg"),    # saved as <appid>_hero.jpg
        ("_logo.png", "logo.png")             # saved as <appid>_logo.png
    ]

    steam_appid = None
    downloaded_count = 0
    total_covers = len(cover_types)
    download_lock = threading.Lock()

    def on_cover_download(cover_file: str, cover_type: str):
        nonlocal downloaded_count
        try:
            if cover_file and os.path.exists(cover_file):
                logger.info(f"Downloaded cover {cover_type} to {cover_file}")
            else:
                logger.warning(f"Failed to download cover {cover_type} for appid {steam_appid}")
        except Exception as e:
            logger.error(f"Error processing cover {cover_type} for appid {steam_appid}: {e}")
        # The shortcut is written once every cover download has reported back.
        with download_lock:
            downloaded_count += 1
            if downloaded_count == total_covers:
                finalize_shortcut()

    def finalize_shortcut():
        tags_dict = {'0': 'PortProton'}
        shortcut = {
            "appid": aidvdf,
            "AppName": game_name,
            "Exe": f'"{script_path}"',
            "StartDir": f'"{os.path.dirname(script_path)}"',
            "icon": icon_path,
            "LaunchOptions": "",
            "IsHidden": 0,
            "AllowDesktopConfig": 1,
            "AllowOverlay": 1,
            "openvr": 0,
            "Devkit": 0,
            "DevkitGameID": "",
            "LastPlayTime": 0,
            "tags": tags_dict
        }
        logger.info(f"Shortcut entry to be written: {shortcut}")

        try:
            if not os.path.exists(steam_shortcuts_path):
                os.makedirs(os.path.dirname(steam_shortcuts_path), exist_ok=True)
                with open(steam_shortcuts_path, 'wb'):
                    pass

            try:
                if os.path.getsize(steam_shortcuts_path) > 0:
                    with open(steam_shortcuts_path, 'rb') as f:
                        shortcuts_data = vdf.binary_load(f)
                else:
                    shortcuts_data = {"shortcuts": {}}
            except Exception as load_err:
                logger.warning(f"Failed to load existing shortcuts.vdf, starting fresh: {load_err}")
                shortcuts_data = {"shortcuts": {}}

            shortcuts = shortcuts_data.get("shortcuts", {})
            for _key, entry in shortcuts.items():
                if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
                    logger.info(f"Game '{game_name}' already exists in Steam shortcuts")
                    return (False, f"Game '{game_name}' already exists in Steam")

            # len() alone can collide with an existing key when entries were
            # removed earlier (e.g. keys "0" and "2"); take the next free index.
            new_index = len(shortcuts)
            while str(new_index) in shortcuts:
                new_index += 1
            shortcuts[str(new_index)] = shortcut

            with open(steam_shortcuts_path, 'wb') as f:
                vdf.binary_dump({"shortcuts": shortcuts}, f)
        except Exception as e:
            logger.error(f"Failed to update shortcuts.vdf: {e}")
            if os.path.exists(backup_path):
                try:
                    shutil.copy2(backup_path, steam_shortcuts_path)
                    logger.info("Restored shortcuts.vdf from backup due to update failure")
                except Exception as restore_err:
                    logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
            return (False, f"Failed to update shortcuts.vdf: {e}")

        logger.info(f"Game '{game_name}' successfully added to Steam with covers")
        return (True, f"Game '{game_name}' added to Steam with covers")

    def on_game_info(game_info: dict):
        nonlocal steam_appid
        steam_appid = game_info.get("appid")
        if not steam_appid or not isinstance(steam_appid, int):
            logger.info("No valid Steam appid found, skipping cover download")
            return finalize_shortcut()

        for suffix, cover_type in cover_types:
            cover_file = os.path.join(grid_dir, f"{appid}{suffix}")
            cover_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{steam_appid}/{cover_type}"
            downloader.download_async(
                cover_url,
                cover_file,
                timeout=5,
                # Bind the loop values as defaults so that each callback keeps its own pair.
                callback=lambda result, cfile=cover_file, ctype=cover_type: on_cover_download(cfile, ctype)
            )

    get_steam_game_info_async(game_name, exec_line, on_game_info)
    return (True, "Adding game to Steam, checking for covers...")
def remove_from_steam(game_name: str, exec_line: str) -> tuple[bool, str]:
    """
    Remove a non-Steam game from Steam by deleting its entry from shortcuts.vdf.

    The shortcut is identified by its "AppName" (equal to ``game_name``) and
    its "Exe" (the quoted path of the launcher script
    ``<portproton_dir>/steam_scripts/<sanitized game name>.sh``). Every
    matching entry is dropped and the remaining entries are renumbered
    from "0".

    Side effects on success: ``shortcuts.vdf`` is rewritten (a copy is kept
    next to it as ``shortcuts.vdf.backup`` and restored if the write fails),
    the four grid cover files named after the shortcut appid are deleted, and
    the launcher script is deleted. Cover/script deletion is best-effort:
    failures are logged and do not change the result.

    Args:
        game_name: Display name of the shortcut to remove.
        exec_line: Launch command of the game. It is only validated as
            non-empty here; matching is done through the script path.

    Returns:
        ``(True, message)`` when the entry was removed, otherwise
        ``(False, reason)``.
    """
    # Validate inputs
    if not game_name or not game_name.strip():
        logger.error("Invalid game_name: empty or whitespace")
        return (False, "Game name is empty or invalid")
    if not exec_line or not exec_line.strip():
        logger.error("Invalid exec_line: empty or whitespace")
        return (False, "Executable command is empty or invalid")

    # Get PortProton directory
    portproton_dir = get_portproton_location()
    if not portproton_dir:
        logger.error("PortProton directory not found")
        return (False, "PortProton directory not found")

    # Construct script path (characters that are unsafe in file names become "_")
    safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
    script_path = os.path.join(portproton_dir, "steam_scripts", f"{safe_game_name}.sh")

    # Get Steam home directory
    steam_home = get_steam_home()
    if not steam_home:
        logger.error("Steam home directory not found")
        return (False, "Steam directory not found.")

    # Get current Steam user ID
    last_user = get_last_steam_user(steam_home)
    if not last_user or 'SteamID' not in last_user:
        logger.error("Failed to retrieve Steam user ID")
        return (False, "Failed to get Steam user ID.")
    userdata_dir = steam_home / "userdata"
    unsigned_id = convert_steam_id(last_user['SteamID'])
    user_dir = userdata_dir / str(unsigned_id)

    # Construct path to shortcuts.vdf and grid directory
    steam_shortcuts_path = os.path.join(user_dir, "config", "shortcuts.vdf")
    grid_dir = os.path.join(user_dir, "config", "grid")

    # Check if shortcuts.vdf exists
    if not os.path.exists(steam_shortcuts_path):
        logger.info(f"shortcuts.vdf not found at {steam_shortcuts_path}")
        return (False, f"Game '{game_name}' not found in Steam")

    # Generate appid for identifying cover files: CRC32 of script path + name
    # with the high bit set.
    # NOTE(review): presumably mirrors the id used when the shortcut was
    # added - verify against the code that creates the cover files.
    unique_string = f"{script_path}{game_name}"
    baseid = zlib.crc32(unique_string.encode('utf-8')) & 0xffffffff
    appid = baseid | 0x80000000

    # Create backup of shortcuts.vdf before touching it
    backup_path = f"{steam_shortcuts_path}.backup"
    try:
        shutil.copy2(steam_shortcuts_path, backup_path)
        logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
    except Exception as e:
        logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
        return (False, f"Failed to create backup of shortcuts.vdf: {e}")

    # Load shortcuts.vdf (an empty file is treated as "no shortcuts")
    try:
        if os.path.getsize(steam_shortcuts_path) > 0:
            with open(steam_shortcuts_path, 'rb') as f:
                shortcuts_data = vdf.binary_load(f)
        else:
            shortcuts_data = {"shortcuts": {}}
    except Exception as load_err:
        logger.error(f"Failed to load shortcuts.vdf: {load_err}")
        return (False, f"Failed to load shortcuts.vdf: {load_err}")

    shortcuts = shortcuts_data.get("shortcuts", {})
    if not isinstance(shortcuts, dict):
        # A malformed file must produce an error result, not an AttributeError
        # escaping to the caller.
        logger.error(f"Unexpected shortcuts.vdf structure in {steam_shortcuts_path}")
        return (False, "Failed to load shortcuts.vdf: unexpected file structure")

    # Filter out the matching shortcut, renumbering the survivors from 0
    expected_exe = f'"{script_path}"'
    found = False
    new_shortcuts = {}
    for entry in shortcuts.values():
        if (
            isinstance(entry, dict)
            and entry.get("AppName") == game_name
            and entry.get("Exe") == expected_exe
        ):
            found = True
            logger.info(f"Found matching shortcut for '{game_name}' to remove")
            continue
        # Non-mapping entries cannot match; keep them untouched rather than
        # crash on them or silently drop data.
        new_shortcuts[str(len(new_shortcuts))] = entry
    if not found:
        logger.info(f"Game '{game_name}' not found in Steam shortcuts")
        return (False, f"Game '{game_name}' not found in Steam")

    # Save updated shortcuts.vdf, rolling back to the backup on failure
    try:
        with open(steam_shortcuts_path, 'wb') as f:
            vdf.binary_dump({"shortcuts": new_shortcuts}, f)
        logger.info(f"Successfully updated shortcuts.vdf, removed '{game_name}'")
    except Exception as e:
        logger.error(f"Failed to update shortcuts.vdf: {e}")
        if os.path.exists(backup_path):
            try:
                shutil.copy2(backup_path, steam_shortcuts_path)
                logger.info("Restored shortcuts.vdf from backup due to update failure")
            except Exception as restore_err:
                logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
        return (False, f"Failed to update shortcuts.vdf: {e}")

    # Delete cover files, then the launcher script (both best-effort)
    for suffix in (".jpg", "p.jpg", "_hero.jpg", "_logo.png"):
        _remove_file_logged(os.path.join(grid_dir, f"{appid}{suffix}"), "cover file")
    _remove_file_logged(script_path, "steam script")

    logger.info(f"Game '{game_name}' successfully removed from Steam")
    return (True, f"Game '{game_name}' removed from Steam")


def _remove_file_logged(path: str, label: str) -> None:
    """Delete *path* if it exists and log the outcome; deletion errors are logged, not raised."""
    try:
        os.remove(path)
    except FileNotFoundError:
        logger.info(f"{label.capitalize()} not found: {path}")
    except (OSError, ValueError) as e:
        # ValueError covers paths the OS layer rejects outright (e.g. embedded NUL).
        logger.error(f"Failed to delete {label} {path}: {e}")
    else:
        logger.info(f"Deleted {label}: {path}")
def is_game_in_steam(game_name: str) -> bool:
    """
    Tell whether the last Steam user's shortcuts.vdf has an entry for *game_name*.

    Only the "AppName" field of each shortcut is compared. Any problem along
    the way (no Steam home, no usable user, missing shortcuts file, or an
    exception while reading) is logged and reported as ``False``.
    """
    steam_home = get_steam_home()
    if steam_home is None:
        logger.warning("Steam home directory not found")
        return False

    try:
        steam_user = get_last_steam_user(steam_home)
        if not (steam_user and 'SteamID' in steam_user):
            logger.warning("No valid Steam user found")
            return False

        # The converted id is the name of the user's folder under userdata/.
        user_folder_id = convert_steam_id(steam_user['SteamID'])
        vdf_path = os.path.join(
            str(steam_home), "userdata", str(user_folder_id), "config", "shortcuts.vdf"
        )
        if not os.path.exists(vdf_path):
            logger.warning(f"Shortcuts file not found at {vdf_path}")
            return False

        entries = safe_vdf_load(vdf_path).get("shortcuts", {})
        if any(item.get("AppName") == game_name for _idx, item in entries.items()):
            return True
    except Exception as err:
        logger.error(f"Error checking if game {game_name} is in Steam: {err}")
    return False