forked from Boria138/PortProtonQt
1133 lines
47 KiB
Python
1133 lines
47 KiB
Python
import functools
|
||
import os
|
||
import shlex
|
||
import subprocess
|
||
import time
|
||
import html
|
||
import orjson
|
||
import vdf
|
||
import tarfile
|
||
import threading
|
||
from pathlib import Path
|
||
from portprotonqt.logger import get_logger
|
||
from portprotonqt.localization import get_steam_language
|
||
from portprotonqt.downloader import Downloader
|
||
from portprotonqt.dialogs import generate_thumbnail
|
||
from portprotonqt.config_utils import get_portproton_location
|
||
from collections.abc import Callable
|
||
import re
|
||
import shutil
|
||
import zlib
|
||
|
||
downloader = Downloader()
|
||
logger = get_logger(__name__)
|
||
CACHE_DURATION = 30 * 24 * 60 * 60
|
||
|
||
def safe_vdf_load(path: str | Path) -> dict:
|
||
path = str(path) # Convert Path to str
|
||
try:
|
||
with open(path, 'rb') as f:
|
||
data = vdf.binary_load(f)
|
||
return data
|
||
except Exception as e:
|
||
try:
|
||
with open(path, encoding='utf-8', errors='ignore') as f:
|
||
data = vdf.load(f)
|
||
return data
|
||
except Exception:
|
||
logger.error(f"Failed to load VDF file {path}: {e}")
|
||
return {}
|
||
|
||
def decode_text(text: str) -> str:
|
||
"""
|
||
Декодирует HTML-сущности в строке.
|
||
Например, """ преобразуется в '"'.
|
||
Остальные символы и HTML-теги остаются без изменений.
|
||
"""
|
||
return html.unescape(text)
|
||
|
||
def get_cache_dir():
|
||
"""Возвращает путь к каталогу кэша, создаёт его при необходимости."""
|
||
xdg_cache_home = os.getenv("XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
|
||
cache_dir = os.path.join(xdg_cache_home, "PortProtonQT")
|
||
os.makedirs(cache_dir, exist_ok=True)
|
||
return cache_dir
|
||
|
||
STEAM_DATA_DIRS = (
|
||
"~/.local/share/Steam",
|
||
"~/snap/steam/common/.local/share/Steam",
|
||
"~/.var/app/com.valvesoftware.Steam/data/Steam",
|
||
)
|
||
|
||
def get_steam_home():
|
||
"""Возвращает путь к директории Steam, используя список возможных директорий."""
|
||
for dir_path in STEAM_DATA_DIRS:
|
||
expanded_path = Path(os.path.expanduser(dir_path))
|
||
if expanded_path.exists():
|
||
return expanded_path
|
||
return None
|
||
|
||
def get_last_steam_user(steam_home: Path) -> dict | None:
|
||
"""Возвращает данные последнего пользователя Steam из loginusers.vdf."""
|
||
loginusers_path = steam_home / "config/loginusers.vdf"
|
||
data = safe_vdf_load(loginusers_path)
|
||
if not data:
|
||
return None
|
||
users = data.get('users', {})
|
||
for user_id, user_info in users.items():
|
||
if user_info.get('MostRecent') == '1':
|
||
try:
|
||
return {'SteamID': int(user_id)}
|
||
except ValueError:
|
||
logger.error(f"Неверный формат SteamID: {user_id}")
|
||
return None
|
||
logger.info("Не найден пользователь с MostRecent=1")
|
||
return None
|
||
|
||
def convert_steam_id(steam_id: int) -> int:
|
||
"""
|
||
Преобразует знаковое 32-битное целое число в беззнаковое 32-битное целое число.
|
||
Использует побитовое И с 0xFFFFFFFF, что корректно обрабатывает отрицательные значения.
|
||
"""
|
||
return steam_id & 0xFFFFFFFF
|
||
|
||
def get_steam_libs(steam_dir: Path) -> set[Path]:
|
||
"""Возвращает набор директорий Steam libraryfolders."""
|
||
libs = set()
|
||
libs_vdf = steam_dir / "steamapps/libraryfolders.vdf"
|
||
data = safe_vdf_load(libs_vdf)
|
||
folders = data.get('libraryfolders', {})
|
||
for key, info in folders.items():
|
||
if key.isdigit():
|
||
path_str = info.get('path') if isinstance(info, dict) else None
|
||
if path_str:
|
||
path = Path(path_str).expanduser()
|
||
if path.exists():
|
||
libs.add(path)
|
||
libs.add(steam_dir)
|
||
return libs
|
||
|
||
def get_playtime_data(steam_home: Path | None = None) -> dict[int, tuple[int, int]]:
|
||
"""Возвращает данные о времени игры для последнего пользователя."""
|
||
play_data: dict[int, tuple[int, int]] = {}
|
||
if steam_home is None:
|
||
steam_home = get_steam_home()
|
||
if steam_home is None or not steam_home.exists():
|
||
logger.error("Steam home directory not found or does not exist")
|
||
return play_data
|
||
|
||
userdata_dir = steam_home / "userdata"
|
||
if not userdata_dir.exists():
|
||
logger.info("Userdata directory not found")
|
||
return play_data
|
||
|
||
if steam_home is not None: # Explicit check for type checker
|
||
last_user = get_last_steam_user(steam_home)
|
||
else:
|
||
logger.info("Steam home is None, cannot retrieve last user")
|
||
return play_data
|
||
|
||
if not last_user:
|
||
logger.info("Не удалось определить последнего пользователя Steam")
|
||
return play_data
|
||
|
||
user_id = last_user['SteamID']
|
||
unsigned_id = convert_steam_id(user_id)
|
||
user_dir = userdata_dir / str(unsigned_id)
|
||
if not user_dir.exists():
|
||
logger.info(f"Директория пользователя {unsigned_id} не найдена")
|
||
return play_data
|
||
|
||
localconfig = user_dir / "config/localconfig.vdf"
|
||
data = safe_vdf_load(localconfig)
|
||
cfg = data.get('UserLocalConfigStore', {})
|
||
apps = cfg.get('Software', {}).get('Valve', {}).get('Steam', {}).get('apps', {})
|
||
for appid_str, info in apps.items():
|
||
try:
|
||
appid = int(appid_str)
|
||
last_played = int(info.get('LastPlayed', 0))
|
||
playtime = int(info.get('Playtime', 0))
|
||
play_data[appid] = (last_played, playtime)
|
||
except ValueError:
|
||
logger.warning(f"Некорректные данные playtime для app {appid_str}")
|
||
return play_data
|
||
|
||
def get_steam_installed_games() -> list[tuple[str, int, int, int]]:
|
||
"""Возвращает список установленных Steam игр в формате (name, appid, last_played, playtime_sec)."""
|
||
games: list[tuple[str, int, int, int]] = []
|
||
steam_home = get_steam_home()
|
||
if steam_home is None or not steam_home.exists():
|
||
logger.error("Steam home directory not found or does not exist")
|
||
return games
|
||
|
||
play_data = get_playtime_data(steam_home)
|
||
for lib in get_steam_libs(steam_home):
|
||
steamapps_dir = lib / "steamapps"
|
||
if not steamapps_dir.exists():
|
||
continue
|
||
for manifest in steamapps_dir.glob("appmanifest_*.acf"):
|
||
data = safe_vdf_load(manifest)
|
||
app = data.get('AppState', {})
|
||
try:
|
||
appid = int(app.get('appid', 0))
|
||
except ValueError:
|
||
continue
|
||
name = app.get('name', f"Unknown ({appid})")
|
||
lname = name.lower()
|
||
if any(token in lname for token in ["proton", "steamworks", "steam linux runtime"]):
|
||
continue
|
||
last_played, playtime_min = play_data.get(appid, (0, 0))
|
||
games.append((name, appid, last_played, playtime_min * 60))
|
||
return games
|
||
|
||
def normalize_name(s):
|
||
"""
|
||
Приведение строки к нормальному виду:
|
||
- перевод в нижний регистр,
|
||
- удаление символов ™ и ®,
|
||
- замена разделителей (-, :, ,) на пробел,
|
||
- удаление лишних пробелов,
|
||
- удаление суффиксов 'bin' или 'app' в конце строки,
|
||
- удаление ключевых слов типа 'ultimate', 'edition' и т.п.
|
||
"""
|
||
s = s.lower()
|
||
for ch in ["™", "®"]:
|
||
s = s.replace(ch, "")
|
||
for ch in ["-", ":", ","]:
|
||
s = s.replace(ch, " ")
|
||
s = " ".join(s.split())
|
||
for suffix in ["bin", "app"]:
|
||
if s.endswith(suffix):
|
||
s = s[:-len(suffix)].strip()
|
||
keywords_to_remove = {"ultimate", "edition", "definitive", "complete", "remastered"}
|
||
words = s.split()
|
||
filtered_words = [word for word in words if word not in keywords_to_remove]
|
||
return " ".join(filtered_words)
|
||
|
||
def is_valid_candidate(candidate):
|
||
"""
|
||
Проверяет, содержит ли кандидат запрещённые подстроки:
|
||
- win32
|
||
- win64
|
||
- gamelauncher
|
||
Для проверки дополнительно используется строка без пробелов.
|
||
Возвращает True, если кандидат допустим, иначе False.
|
||
"""
|
||
normalized_candidate = normalize_name(candidate)
|
||
normalized_no_space = normalized_candidate.replace(" ", "")
|
||
forbidden = ["win32", "win64", "gamelauncher"]
|
||
for token in forbidden:
|
||
if token in normalized_no_space:
|
||
return False
|
||
return True
|
||
|
||
def filter_candidates(candidates):
|
||
"""
|
||
Фильтрует список кандидатов, отбрасывая недопустимые.
|
||
"""
|
||
valid = []
|
||
dropped = []
|
||
for cand in candidates:
|
||
if cand.strip() and is_valid_candidate(cand):
|
||
valid.append(cand)
|
||
else:
|
||
dropped.append(cand)
|
||
if dropped:
|
||
logger.info("Отбрасываю кандидатов: %s", dropped)
|
||
return valid
|
||
|
||
def remove_duplicates(candidates):
|
||
"""
|
||
Удаляет дубликаты из списка, сохраняя порядок.
|
||
"""
|
||
return list(dict.fromkeys(candidates))
|
||
|
||
@functools.lru_cache(maxsize=256)
|
||
def get_exiftool_data(game_exe):
|
||
"""Получает метаданные через exiftool"""
|
||
try:
|
||
proc = subprocess.run(
|
||
["exiftool", "-j", game_exe],
|
||
capture_output=True,
|
||
text=True,
|
||
check=False
|
||
)
|
||
if proc.returncode != 0:
|
||
logger.error(f"exiftool error for {game_exe}: {proc.stderr.strip()}")
|
||
return {}
|
||
meta_data_list = orjson.loads(proc.stdout.encode("utf-8"))
|
||
return meta_data_list[0] if meta_data_list else {}
|
||
except Exception as e:
|
||
logger.error(f"An unexpected error occurred in get_exiftool_data for {game_exe}: {e}")
|
||
return {}
|
||
|
||
def load_steam_apps_async(callback: Callable[[list], None]):
|
||
"""
|
||
Asynchronously loads the list of Steam applications, using cache if available.
|
||
Calls the callback with the list of apps.
|
||
"""
|
||
cache_dir = get_cache_dir()
|
||
cache_tar = os.path.join(cache_dir, "games_appid.tar.xz")
|
||
cache_json = os.path.join(cache_dir, "steam_apps.json")
|
||
|
||
def process_tar(result: str | None):
|
||
if not result or not os.path.exists(result):
|
||
logger.error("Failed to download Steam apps archive")
|
||
callback([])
|
||
return
|
||
try:
|
||
with tarfile.open(result, mode='r:xz') as tar:
|
||
member = next((m for m in tar.getmembers() if m.name.endswith('.json')), None)
|
||
if member is None:
|
||
raise RuntimeError("JSON file not found in archive")
|
||
fobj = tar.extractfile(member)
|
||
if fobj is None:
|
||
raise RuntimeError(f"Failed to extract file {member.name} from archive")
|
||
raw = fobj.read()
|
||
fobj.close()
|
||
data = orjson.loads(raw)
|
||
with open(cache_json, "wb") as f:
|
||
f.write(orjson.dumps(data))
|
||
if os.path.exists(cache_tar):
|
||
os.remove(cache_tar)
|
||
logger.info("Archive %s deleted after extraction", cache_tar)
|
||
steam_apps = data.get("applist", {}).get("apps", []) if isinstance(data, dict) else data or []
|
||
logger.info("Loaded %d apps from archive", len(steam_apps))
|
||
callback(steam_apps)
|
||
except Exception as e:
|
||
logger.error("Error extracting Steam apps archive: %s", e)
|
||
callback([])
|
||
|
||
if os.path.exists(cache_json) and (time.time() - os.path.getmtime(cache_json) < CACHE_DURATION):
|
||
logger.info("Using cached Steam apps JSON: %s", cache_json)
|
||
try:
|
||
with open(cache_json, "rb") as f:
|
||
data = orjson.loads(f.read())
|
||
steam_apps = data.get("applist", {}).get("apps", []) if isinstance(data, dict) else data or []
|
||
logger.info("Loaded %d apps from cache", len(steam_apps))
|
||
callback(steam_apps)
|
||
except Exception as e:
|
||
logger.error("Error reading cached JSON: %s", e)
|
||
callback([])
|
||
else:
|
||
app_list_url = (
|
||
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/games_appid.tar.xz"
|
||
)
|
||
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
|
||
|
||
def build_index(steam_apps):
|
||
"""
|
||
Строит индекс приложений по полю normalized_name.
|
||
"""
|
||
steam_apps_index = {}
|
||
if not steam_apps:
|
||
return steam_apps_index
|
||
logger.info("Построение индекса Steam приложений:")
|
||
for app in steam_apps:
|
||
normalized = app["normalized_name"]
|
||
steam_apps_index[normalized] = app
|
||
return steam_apps_index
|
||
|
||
def search_app(candidate, steam_apps_index):
|
||
"""
|
||
Ищет приложение по кандидату: сначала пытается точное совпадение, затем ищет подстроку.
|
||
"""
|
||
candidate_norm = normalize_name(candidate)
|
||
logger.info("Поиск приложения для кандидата: '%s' -> '%s'", candidate, candidate_norm)
|
||
if candidate_norm in steam_apps_index:
|
||
logger.info(" Найдено точное совпадение: '%s'", candidate_norm)
|
||
return steam_apps_index[candidate_norm]
|
||
for name_norm, app in steam_apps_index.items():
|
||
if candidate_norm in name_norm:
|
||
ratio = len(candidate_norm) / len(name_norm)
|
||
if ratio > 0.8:
|
||
logger.info(" Найдено частичное совпадение: кандидат '%s' в '%s' (ratio: %.2f)",
|
||
candidate_norm, name_norm, ratio)
|
||
return app
|
||
logger.info(" Приложение для кандидата '%s' не найдено", candidate_norm)
|
||
return None
|
||
|
||
def load_app_details(app_id):
|
||
"""Загружает кэшированные данные для игры по appid, если они не устарели."""
|
||
cache_dir = get_cache_dir()
|
||
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
|
||
if os.path.exists(cache_file):
|
||
if time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
|
||
with open(cache_file, "rb") as f:
|
||
return orjson.loads(f.read())
|
||
return None
|
||
|
||
def save_app_details(app_id, data):
|
||
"""Сохраняет данные по appid в файл кэша."""
|
||
cache_dir = get_cache_dir()
|
||
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
|
||
with open(cache_file, "wb") as f:
|
||
f.write(orjson.dumps(data))
|
||
|
||
def fetch_app_info_async(app_id: int, callback: Callable[[dict | None], None]):
|
||
"""
|
||
Asynchronously fetches detailed app info from Steam API.
|
||
Calls the callback with the app data or None if failed.
|
||
"""
|
||
cached = load_app_details(app_id)
|
||
if cached is not None:
|
||
callback(cached)
|
||
return
|
||
|
||
lang = get_steam_language()
|
||
url = f"https://store.steampowered.com/api/appdetails?appids={app_id}&l={lang}"
|
||
cache_dir = get_cache_dir()
|
||
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
|
||
|
||
def process_response(result: str | None):
|
||
if not result or not os.path.exists(result):
|
||
logger.error("Failed to download Steam app info for appid %s", app_id)
|
||
callback(None)
|
||
return
|
||
try:
|
||
with open(result, "rb") as f:
|
||
data = orjson.loads(f.read())
|
||
details = data.get(str(app_id), {})
|
||
if not details.get("success"):
|
||
callback(None)
|
||
return
|
||
app_data_full = details.get("data", {})
|
||
app_data = {
|
||
"steam_appid": app_data_full.get("steam_appid", app_id),
|
||
"name": app_data_full.get("name", ""),
|
||
"short_description": app_data_full.get("short_description", ""),
|
||
"controller_support": app_data_full.get("controller_support", "")
|
||
}
|
||
save_app_details(app_id, app_data)
|
||
callback(app_data)
|
||
except Exception as e:
|
||
logger.error("Error processing Steam app info for appid %s: %s", app_id, e)
|
||
callback(None)
|
||
|
||
downloader.download_async(url, cache_file, timeout=5, callback=process_response)
|
||
|
||
def load_weanticheatyet_data_async(callback: Callable[[list], None]):
|
||
"""
|
||
Asynchronously loads the list of WeAntiCheatYet data, using cache if available.
|
||
Calls the callback with the list of anti-cheat data.
|
||
"""
|
||
cache_dir = get_cache_dir()
|
||
cache_tar = os.path.join(cache_dir, "anticheat_games.tar.xz")
|
||
cache_json = os.path.join(cache_dir, "anticheat_games.json")
|
||
|
||
def process_tar(result: str | None):
|
||
if not result or not os.path.exists(result):
|
||
logger.error("Failed to download WeAntiCheatYet archive")
|
||
callback([])
|
||
return
|
||
try:
|
||
with tarfile.open(result, mode='r:xz') as tar:
|
||
member = next((m for m in tar.getmembers() if m.name.endswith('anticheat_games_min.json')), None)
|
||
if member is None:
|
||
raise RuntimeError("JSON file not found in archive")
|
||
fobj = tar.extractfile(member)
|
||
if fobj is None:
|
||
raise RuntimeError(f"Failed to extract file {member.name} from archive")
|
||
raw = fobj.read()
|
||
fobj.close()
|
||
data = orjson.loads(raw)
|
||
with open(cache_json, "wb") as f:
|
||
f.write(orjson.dumps(data))
|
||
if os.path.exists(cache_tar):
|
||
os.remove(cache_tar)
|
||
logger.info("Archive %s deleted after extraction", cache_tar)
|
||
anti_cheat_data = data or []
|
||
logger.info("Loaded %d anti-cheat entries from archive", len(anti_cheat_data))
|
||
callback(anti_cheat_data)
|
||
except Exception as e:
|
||
logger.error("Error extracting WeAntiCheatYet archive: %s", e)
|
||
callback([])
|
||
|
||
if os.path.exists(cache_json) and (time.time() - os.path.getmtime(cache_json) < CACHE_DURATION):
|
||
logger.info("Using cached WeAntiCheatYet JSON: %s", cache_json)
|
||
try:
|
||
with open(cache_json, "rb") as f:
|
||
data = orjson.loads(f.read())
|
||
anti_cheat_data = data or []
|
||
logger.info("Loaded %d anti-cheat entries from cache", len(anti_cheat_data))
|
||
callback(anti_cheat_data)
|
||
except Exception as e:
|
||
logger.error("Error reading cached WeAntiCheatYet JSON: %s", e)
|
||
callback([])
|
||
else:
|
||
app_list_url = (
|
||
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/anticheat_games.tar.xz"
|
||
)
|
||
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
|
||
|
||
def build_weanticheatyet_index(anti_cheat_data):
|
||
"""
|
||
Строит индекс античит-данных по полю normalized_name.
|
||
"""
|
||
anti_cheat_index = {}
|
||
if not anti_cheat_data:
|
||
return anti_cheat_index
|
||
logger.info("Построение индекса WeAntiCheatYet данных:")
|
||
for entry in anti_cheat_data:
|
||
normalized = entry["normalized_name"]
|
||
anti_cheat_index[normalized] = entry
|
||
return anti_cheat_index
|
||
|
||
def search_anticheat_status(candidate, anti_cheat_index):
|
||
candidate_norm = normalize_name(candidate)
|
||
logger.info("Поиск античит-статуса для кандидата: '%s' -> '%s'", candidate, candidate_norm)
|
||
if candidate_norm in anti_cheat_index:
|
||
status = anti_cheat_index[candidate_norm]["status"]
|
||
logger.info(" Найдено точное совпадение: '%s', статус: '%s'", candidate_norm, status)
|
||
return status
|
||
for name_norm, entry in anti_cheat_index.items():
|
||
if candidate_norm in name_norm:
|
||
ratio = len(candidate_norm) / len(name_norm)
|
||
if ratio > 0.8:
|
||
status = entry["status"]
|
||
logger.info(" Найдено частичное совпадение: кандидат '%s' в '%s' (ratio: %.2f), статус: '%s'",
|
||
candidate_norm, name_norm, ratio, status)
|
||
return status
|
||
logger.info(" Античит-статус для кандидата '%s' не найден", candidate_norm)
|
||
return ""
|
||
|
||
def get_weanticheatyet_status_async(game_name: str, callback: Callable[[str], None]):
|
||
"""
|
||
Asynchronously retrieves WeAntiCheatYet status for a game by name.
|
||
Calls the callback with the status string or empty string if not found.
|
||
"""
|
||
def on_anticheat_data(anti_cheat_data: list):
|
||
anti_cheat_index = build_weanticheatyet_index(anti_cheat_data)
|
||
status = search_anticheat_status(game_name, anti_cheat_index)
|
||
callback(status)
|
||
|
||
load_weanticheatyet_data_async(on_anticheat_data)
|
||
|
||
def load_protondb_status(appid):
|
||
"""Загружает закешированные данные ProtonDB для игры по appid, если они не устарели."""
|
||
cache_dir = get_cache_dir()
|
||
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
|
||
if os.path.exists(cache_file):
|
||
if time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
|
||
try:
|
||
with open(cache_file, "rb") as f:
|
||
return orjson.loads(f.read())
|
||
except Exception as e:
|
||
logger.error("Ошибка загрузки кеша ProtonDB для appid %s: %s", appid, e)
|
||
return None
|
||
|
||
def save_protondb_status(appid, data):
|
||
"""Сохраняет данные ProtonDB для игры по appid в файл кэша."""
|
||
cache_dir = get_cache_dir()
|
||
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
|
||
try:
|
||
with open(cache_file, "wb") as f:
|
||
f.write(orjson.dumps(data))
|
||
except Exception as e:
|
||
logger.error("Ошибка сохранения кеша ProtonDB для appid %s: %s", appid, e)
|
||
|
||
def get_protondb_tier_async(appid: int, callback: Callable[[str], None]):
|
||
"""
|
||
Asynchronously fetches ProtonDB tier for an app.
|
||
Calls the callback with the tier string or empty string if failed.
|
||
"""
|
||
cached = load_protondb_status(appid)
|
||
if cached is not None:
|
||
callback(cached.get("tier", ""))
|
||
return
|
||
|
||
url = f"https://www.protondb.com/api/v1/reports/summaries/{appid}.json"
|
||
cache_dir = get_cache_dir()
|
||
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
|
||
|
||
def process_response(result: str | None):
|
||
if not result or not os.path.exists(result):
|
||
logger.info("Failed to download ProtonDB data for appid %s", appid)
|
||
callback("")
|
||
return
|
||
try:
|
||
with open(result, "rb") as f:
|
||
data = orjson.loads(f.read())
|
||
filtered_data = {"tier": data.get("tier", "")}
|
||
save_protondb_status(appid, filtered_data)
|
||
callback(filtered_data["tier"])
|
||
except Exception as e:
|
||
logger.info("Failed to process ProtonDB data for appid %s: %s", appid, e)
|
||
callback("")
|
||
|
||
downloader.download_async(url, cache_file, timeout=5, callback=process_response)
|
||
|
||
def get_full_steam_game_info_async(appid: int, callback: Callable[[dict], None]):
|
||
"""
|
||
Asynchronously retrieves full Steam game info, including WeAntiCheatYet status.
|
||
Calls the callback with the game info dictionary.
|
||
"""
|
||
def on_app_info(app_info: dict | None):
|
||
if not app_info:
|
||
callback({})
|
||
return
|
||
title = decode_text(app_info.get("name", ""))
|
||
description = decode_text(app_info.get("short_description", ""))
|
||
cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
|
||
|
||
def on_protondb_tier(tier: str):
|
||
def on_anticheat_status(anticheat_status: str):
|
||
callback({
|
||
'description': description,
|
||
'controller_support': app_info.get('controller_support', ''),
|
||
'cover': cover,
|
||
'protondb_tier': tier,
|
||
'steam_game': "true",
|
||
'name': title,
|
||
'anticheat_status': anticheat_status
|
||
})
|
||
|
||
get_weanticheatyet_status_async(title, on_anticheat_status)
|
||
|
||
get_protondb_tier_async(appid, on_protondb_tier)
|
||
|
||
fetch_app_info_async(appid, on_app_info)
|
||
|
||
def get_steam_game_info_async(desktop_name: str, exec_line: str, callback: Callable[[dict], tuple[bool, str] | None]) -> None:
|
||
"""
|
||
Asynchronously retrieves game info based on desktop name and exec line, including WeAntiCheatYet status for all games.
|
||
Calls the callback with the game info dictionary.
|
||
"""
|
||
parts = shlex.split(exec_line)
|
||
game_exe = parts[-1] if parts else exec_line
|
||
|
||
if game_exe.lower().endswith('.bat'):
|
||
if os.path.exists(game_exe):
|
||
try:
|
||
with open(game_exe, encoding='utf-8') as f:
|
||
bat_lines = f.readlines()
|
||
for line in bat_lines:
|
||
line = line.strip()
|
||
if '.exe' in line.lower():
|
||
tokens = shlex.split(line)
|
||
for token in tokens:
|
||
if token.lower().endswith('.exe'):
|
||
game_exe = token
|
||
break
|
||
if game_exe.lower().endswith('.exe'):
|
||
break
|
||
except Exception as e:
|
||
logger.error("Error processing bat file %s: %s", game_exe, e)
|
||
else:
|
||
logger.error("Bat file not found: %s", game_exe)
|
||
|
||
if not game_exe.lower().endswith('.exe'):
|
||
logger.error("Invalid executable path: %s. Expected .exe", game_exe)
|
||
meta_data = {}
|
||
else:
|
||
meta_data = get_exiftool_data(game_exe)
|
||
|
||
exe_name = os.path.splitext(os.path.basename(game_exe))[0]
|
||
folder_path = os.path.dirname(game_exe)
|
||
folder_name = os.path.basename(folder_path)
|
||
if folder_name.lower() in ['bin', 'binaries']:
|
||
folder_path = os.path.dirname(folder_path)
|
||
folder_name = os.path.basename(folder_path)
|
||
logger.info("Game folder name: '%s'", folder_name)
|
||
candidates = []
|
||
product_name = meta_data.get("ProductName", "")
|
||
file_description = meta_data.get("FileDescription", "")
|
||
if product_name and product_name not in ['BootstrapPackagedGame']:
|
||
candidates.append(product_name)
|
||
if file_description and file_description not in ['BootstrapPackagedGame']:
|
||
candidates.append(file_description)
|
||
if desktop_name:
|
||
candidates.append(desktop_name)
|
||
if exe_name:
|
||
candidates.append(exe_name)
|
||
if folder_name:
|
||
candidates.append(folder_name)
|
||
logger.info("Initial candidates: %s", candidates)
|
||
candidates = filter_candidates(candidates)
|
||
candidates = remove_duplicates(candidates)
|
||
candidates_ordered = sorted(candidates, key=lambda s: len(s.split()), reverse=True)
|
||
logger.info("Sorted candidates: %s", candidates_ordered)
|
||
|
||
def on_steam_apps(steam_apps: list):
|
||
steam_apps_index = build_index(steam_apps)
|
||
matching_app = None
|
||
for candidate in candidates_ordered:
|
||
if not candidate:
|
||
continue
|
||
matching_app = search_app(candidate, steam_apps_index)
|
||
if matching_app:
|
||
logger.info("Match found for candidate '%s': %s", candidate, matching_app.get("normalized_name"))
|
||
break
|
||
|
||
game_name = desktop_name or exe_name.capitalize()
|
||
|
||
if not matching_app:
|
||
def on_anticheat_status(anticheat_status: str):
|
||
callback({
|
||
"appid": "",
|
||
"name": decode_text(game_name),
|
||
"description": "",
|
||
"cover": "",
|
||
"controller_support": "",
|
||
"protondb_tier": "",
|
||
"steam_game": "false",
|
||
"anticheat_status": anticheat_status
|
||
})
|
||
|
||
get_weanticheatyet_status_async(game_name, on_anticheat_status)
|
||
return
|
||
|
||
appid = matching_app["appid"]
|
||
def on_app_info(app_info: dict | None):
|
||
if not app_info:
|
||
def on_anticheat_status(anticheat_status: str):
|
||
callback({
|
||
"appid": "",
|
||
"name": decode_text(game_name),
|
||
"description": "",
|
||
"cover": "",
|
||
"controller_support": "",
|
||
"protondb_tier": "",
|
||
"steam_game": "false",
|
||
"anticheat_status": anticheat_status
|
||
})
|
||
|
||
get_weanticheatyet_status_async(game_name, on_anticheat_status)
|
||
return
|
||
|
||
title = decode_text(app_info.get("name", game_name))
|
||
description = decode_text(app_info.get("short_description", ""))
|
||
cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
|
||
controller_support = app_info.get("controller_support", "")
|
||
|
||
def on_protondb_tier(tier: str):
|
||
def on_anticheat_status(anticheat_status: str):
|
||
callback({
|
||
"appid": appid,
|
||
"name": title,
|
||
"description": description,
|
||
"cover": cover,
|
||
"controller_support": controller_support,
|
||
"protondb_tier": tier,
|
||
"steam_game": "true",
|
||
"anticheat_status": anticheat_status
|
||
})
|
||
|
||
get_weanticheatyet_status_async(title, on_anticheat_status)
|
||
|
||
get_protondb_tier_async(appid, on_protondb_tier)
|
||
|
||
fetch_app_info_async(appid, on_app_info)
|
||
|
||
load_steam_apps_async(on_steam_apps)
|
||
|
||
_STEAM_APPS = None
|
||
_STEAM_APPS_INDEX = None
|
||
_STEAM_APPS_LOCK = threading.Lock()
|
||
|
||
def get_steam_apps_and_index_async(callback: Callable[[tuple[list, dict]], None]):
|
||
"""
|
||
Asynchronously loads and caches Steam apps and their index.
|
||
Calls the callback with (steam_apps, steam_apps_index).
|
||
"""
|
||
global _STEAM_APPS, _STEAM_APPS_INDEX
|
||
with _STEAM_APPS_LOCK:
|
||
if _STEAM_APPS is not None and _STEAM_APPS_INDEX is not None:
|
||
callback((_STEAM_APPS, _STEAM_APPS_INDEX))
|
||
return
|
||
|
||
def on_steam_apps(steam_apps: list):
|
||
global _STEAM_APPS, _STEAM_APPS_INDEX
|
||
with _STEAM_APPS_LOCK:
|
||
_STEAM_APPS = steam_apps
|
||
_STEAM_APPS_INDEX = build_index(steam_apps)
|
||
callback((_STEAM_APPS, _STEAM_APPS_INDEX))
|
||
|
||
load_steam_apps_async(on_steam_apps)
|
||
|
||
def add_to_steam(game_name: str, exec_line: str, cover_path: str) -> tuple[bool, str]:
|
||
"""
|
||
Add a non-Steam game to Steam via shortcuts.vdf with PortProton tag,
|
||
and download Steam Grid covers with correct sizes and names.
|
||
"""
|
||
|
||
if not exec_line or not exec_line.strip():
|
||
logger.error("Invalid exec_line: empty or whitespace")
|
||
return (False, "Executable command is empty or invalid")
|
||
|
||
# Parse exec_line to get the executable path
|
||
try:
|
||
entry_exec_split = shlex.split(exec_line)
|
||
if not entry_exec_split:
|
||
logger.error(f"Failed to parse exec_line: {exec_line}")
|
||
return (False, "Failed to parse executable command: no valid tokens")
|
||
|
||
if entry_exec_split[0] == "env" and len(entry_exec_split) >= 3:
|
||
exe_path = entry_exec_split[2]
|
||
elif entry_exec_split[0] == "flatpak" and len(entry_exec_split) >= 4:
|
||
exe_path = entry_exec_split[3]
|
||
else:
|
||
exe_path = entry_exec_split[-1]
|
||
except Exception as e:
|
||
logger.error(f"Failed to parse exec_line: {exec_line}, error: {e}")
|
||
return (False, f"Failed to parse executable command: {e}")
|
||
|
||
if not os.path.exists(exe_path):
|
||
logger.error(f"Executable not found: {exe_path}")
|
||
return (False, f"Executable file not found: {exe_path}")
|
||
|
||
portproton_dir = get_portproton_location()
|
||
if not portproton_dir:
|
||
logger.error("PortProton directory not found")
|
||
return (False, "PortProton directory not found")
|
||
|
||
steam_scripts_dir = os.path.join(portproton_dir, "steam_scripts")
|
||
os.makedirs(steam_scripts_dir, exist_ok=True)
|
||
|
||
safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
|
||
script_path = os.path.join(steam_scripts_dir, f"{safe_game_name}.sh")
|
||
start_sh_path = os.path.join(portproton_dir, "data", "scripts", "start.sh")
|
||
|
||
if not os.path.exists(start_sh_path):
|
||
logger.error(f"start.sh not found at {start_sh_path}")
|
||
return (False, f"start.sh not found at {start_sh_path}")
|
||
|
||
if not os.path.exists(script_path):
|
||
script_content = f"""#!/usr/bin/env bash
|
||
export LD_PRELOAD=
|
||
export START_FROM_STEAM=1
|
||
"{start_sh_path}" "{exe_path}" "$@"
|
||
"""
|
||
try:
|
||
with open(script_path, "w", encoding="utf-8") as f:
|
||
f.write(script_content)
|
||
os.chmod(script_path, 0o755)
|
||
logger.info(f"Created launch script: {script_path}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to create launch script {script_path}: {e}")
|
||
return (False, f"Failed to create launch script: {e}")
|
||
else:
|
||
logger.info(f"Launch script already exists: {script_path}")
|
||
|
||
generated_icon_path = os.path.join(portproton_dir, "data", "img", f"{safe_game_name}.png")
|
||
try:
|
||
img_dir = os.path.join(portproton_dir, "data", "img")
|
||
os.makedirs(img_dir, exist_ok=True)
|
||
|
||
if os.path.exists(generated_icon_path):
|
||
logger.info(f"Reusing existing thumbnail: {generated_icon_path}")
|
||
else:
|
||
success = generate_thumbnail(exe_path, generated_icon_path, size=128, force_resize=True)
|
||
if not success or not os.path.exists(generated_icon_path):
|
||
logger.warning(f"generate_thumbnail failed to create icon for {exe_path}")
|
||
icon_path = ""
|
||
else:
|
||
logger.info(f"Generated thumbnail: {generated_icon_path}")
|
||
icon_path = generated_icon_path
|
||
except Exception as e:
|
||
logger.error(f"Error generating thumbnail for {exe_path}: {e}")
|
||
icon_path = ""
|
||
|
||
steam_home = get_steam_home()
|
||
if not steam_home:
|
||
logger.error("Steam home directory not found")
|
||
return (False, "Steam directory not found.")
|
||
|
||
last_user = get_last_steam_user(steam_home)
|
||
if not last_user or 'SteamID' not in last_user:
|
||
logger.error("Failed to retrieve Steam user ID")
|
||
return (False, "Failed to get Steam user ID.")
|
||
|
||
userdata_dir = steam_home / "userdata"
|
||
user_id = last_user['SteamID']
|
||
unsigned_id = convert_steam_id(user_id)
|
||
user_dir = userdata_dir / str(unsigned_id)
|
||
steam_shortcuts_path = user_dir / "config" / "shortcuts.vdf"
|
||
grid_dir = user_dir / "config" / "grid"
|
||
os.makedirs(grid_dir, exist_ok=True)
|
||
|
||
backup_path = f"{steam_shortcuts_path}.backup"
|
||
if os.path.exists(steam_shortcuts_path):
|
||
try:
|
||
shutil.copy2(steam_shortcuts_path, backup_path)
|
||
logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
|
||
return (False, f"Failed to create backup of shortcuts.vdf: {e}")
|
||
|
||
unique_string = f"{script_path}{game_name}"
|
||
baseid = zlib.crc32(unique_string.encode('utf-8')) & 0xffffffff
|
||
appid = baseid | 0x80000000
|
||
if appid > 0x7FFFFFFF:
|
||
aidvdf = appid - 0x100000000
|
||
else:
|
||
aidvdf = appid
|
||
|
||
steam_appid = None
|
||
downloaded_count = 0
|
||
total_covers = 4 # количество обложек
|
||
|
||
download_lock = threading.Lock()
|
||
|
||
def on_cover_download(cover_file: str, cover_type: str):
|
||
nonlocal downloaded_count
|
||
try:
|
||
if cover_file and os.path.exists(cover_file):
|
||
logger.info(f"Downloaded cover {cover_type} to {cover_file}")
|
||
else:
|
||
logger.warning(f"Failed to download cover {cover_type} for appid {steam_appid}")
|
||
except Exception as e:
|
||
logger.error(f"Error processing cover {cover_type} for appid {steam_appid}: {e}")
|
||
with download_lock:
|
||
downloaded_count += 1
|
||
if downloaded_count == total_covers:
|
||
finalize_shortcut()
|
||
|
||
def finalize_shortcut():
|
||
tags_dict = {'0': 'PortProton'}
|
||
shortcut = {
|
||
"appid": aidvdf,
|
||
"AppName": game_name,
|
||
"Exe": f'"{script_path}"',
|
||
"StartDir": f'"{os.path.dirname(script_path)}"',
|
||
"icon": icon_path,
|
||
"LaunchOptions": "",
|
||
"IsHidden": 0,
|
||
"AllowDesktopConfig": 1,
|
||
"AllowOverlay": 1,
|
||
"openvr": 0,
|
||
"Devkit": 0,
|
||
"DevkitGameID": "",
|
||
"LastPlayTime": 0,
|
||
"tags": tags_dict
|
||
}
|
||
logger.info(f"Shortcut entry to be written: {shortcut}")
|
||
|
||
try:
|
||
if not os.path.exists(steam_shortcuts_path):
|
||
os.makedirs(os.path.dirname(steam_shortcuts_path), exist_ok=True)
|
||
open(steam_shortcuts_path, 'wb').close()
|
||
|
||
try:
|
||
if os.path.getsize(steam_shortcuts_path) > 0:
|
||
with open(steam_shortcuts_path, 'rb') as f:
|
||
shortcuts_data = vdf.binary_load(f)
|
||
else:
|
||
shortcuts_data = {"shortcuts": {}}
|
||
except Exception as load_err:
|
||
logger.warning(f"Failed to load existing shortcuts.vdf, starting fresh: {load_err}")
|
||
shortcuts_data = {"shortcuts": {}}
|
||
|
||
shortcuts = shortcuts_data.get("shortcuts", {})
|
||
for _key, entry in shortcuts.items():
|
||
if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
|
||
logger.info(f"Game '{game_name}' already exists in Steam shortcuts")
|
||
return (False, f"Game '{game_name}' already exists in Steam")
|
||
|
||
new_index = str(len(shortcuts))
|
||
shortcuts[new_index] = shortcut
|
||
|
||
with open(steam_shortcuts_path, 'wb') as f:
|
||
vdf.binary_dump({"shortcuts": shortcuts}, f)
|
||
except Exception as e:
|
||
logger.error(f"Failed to update shortcuts.vdf: {e}")
|
||
if os.path.exists(backup_path):
|
||
try:
|
||
shutil.copy2(backup_path, steam_shortcuts_path)
|
||
logger.info("Restored shortcuts.vdf from backup due to update failure")
|
||
except Exception as restore_err:
|
||
logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
|
||
return (False, f"Failed to update shortcuts.vdf: {e}")
|
||
|
||
logger.info(f"Game '{game_name}' successfully added to Steam with covers")
|
||
return (True, f"Game '{game_name}' added to Steam with covers")
|
||
|
||
def on_game_info(game_info: dict):
|
||
nonlocal steam_appid
|
||
steam_appid = game_info.get("appid")
|
||
if not steam_appid or not isinstance(steam_appid, int):
|
||
logger.info("No valid Steam appid found, skipping cover download")
|
||
return finalize_shortcut()
|
||
|
||
# Обложки и имена, соответствующие bash-скрипту и твоим размерам
|
||
cover_types = [
|
||
(".jpg", "header.jpg"), # базовый, сохранится как AppId.jpg
|
||
("p.jpg", "library_600x900_2x.jpg"), # сохранится как AppIdp.jpg
|
||
("_hero.jpg", "library_hero.jpg"), # AppId_hero.jpg
|
||
("_logo.png", "logo.png") # AppId_logo.png
|
||
]
|
||
|
||
for suffix, cover_type in cover_types:
|
||
cover_file = os.path.join(grid_dir, f"{appid}{suffix}")
|
||
cover_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{steam_appid}/{cover_type}"
|
||
downloader.download_async(
|
||
cover_url,
|
||
cover_file,
|
||
timeout=5,
|
||
callback=lambda result, cfile=cover_file, ctype=cover_type: on_cover_download(cfile, ctype)
|
||
)
|
||
|
||
get_steam_game_info_async(game_name, exec_line, on_game_info)
|
||
return (True, "Adding game to Steam, checking for covers...")
|
||
|
||
def remove_from_steam(game_name: str, exec_line: str) -> tuple[bool, str]:
|
||
"""
|
||
Remove a non-Steam game from Steam by deleting its entry from shortcuts.vdf.
|
||
"""
|
||
# Validate inputs
|
||
if not game_name or not game_name.strip():
|
||
logger.error("Invalid game_name: empty or whitespace")
|
||
return (False, "Game name is empty or invalid")
|
||
if not exec_line or not exec_line.strip():
|
||
logger.error("Invalid exec_line: empty or whitespace")
|
||
return (False, "Executable command is empty or invalid")
|
||
|
||
# Get PortProton directory
|
||
portproton_dir = get_portproton_location()
|
||
if not portproton_dir:
|
||
logger.error("PortProton directory not found")
|
||
return (False, "PortProton directory not found")
|
||
|
||
# Construct script path
|
||
safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
|
||
script_path = os.path.join(portproton_dir, "steam_scripts", f"{safe_game_name}.sh")
|
||
|
||
# Get Steam home directory
|
||
steam_home = get_steam_home()
|
||
if not steam_home:
|
||
logger.error("Steam home directory not found")
|
||
return (False, "Steam directory not found.")
|
||
|
||
# Get current Steam user ID
|
||
last_user = get_last_steam_user(steam_home)
|
||
if not last_user or 'SteamID' not in last_user:
|
||
logger.error("Failed to retrieve Steam user ID")
|
||
return (False, "Failed to get Steam user ID.")
|
||
userdata_dir = steam_home / "userdata"
|
||
user_id = last_user['SteamID']
|
||
unsigned_id = convert_steam_id(user_id)
|
||
user_dir = userdata_dir / str(unsigned_id)
|
||
|
||
# Construct path to shortcuts.vdf and grid directory
|
||
steam_shortcuts_path = os.path.join(user_dir, "config", "shortcuts.vdf")
|
||
grid_dir = os.path.join(user_dir, "config", "grid")
|
||
|
||
# Check if shortcuts.vdf exists
|
||
if not os.path.exists(steam_shortcuts_path):
|
||
logger.info(f"shortcuts.vdf not found at {steam_shortcuts_path}")
|
||
return (False, f"Game '{game_name}' not found in Steam")
|
||
|
||
# Generate appid for identifying cover files
|
||
unique_string = f"{script_path}{game_name}"
|
||
baseid = zlib.crc32(unique_string.encode('utf-8')) & 0xffffffff
|
||
appid = baseid | 0x80000000
|
||
|
||
# Create backup of shortcuts.vdf
|
||
backup_path = f"{steam_shortcuts_path}.backup"
|
||
try:
|
||
shutil.copy2(steam_shortcuts_path, backup_path)
|
||
logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
|
||
return (False, f"Failed to create backup of shortcuts.vdf: {e}")
|
||
|
||
# Load and modify shortcuts.vdf
|
||
try:
|
||
if os.path.getsize(steam_shortcuts_path) > 0:
|
||
with open(steam_shortcuts_path, 'rb') as f:
|
||
shortcuts_data = vdf.binary_load(f)
|
||
else:
|
||
shortcuts_data = {"shortcuts": {}}
|
||
except Exception as load_err:
|
||
logger.error(f"Failed to load shortcuts.vdf: {load_err}")
|
||
return (False, f"Failed to load shortcuts.vdf: {load_err}")
|
||
|
||
shortcuts = shortcuts_data.get("shortcuts", {})
|
||
found = False
|
||
new_shortcuts = {}
|
||
index = 0
|
||
|
||
# Filter out the matching shortcut
|
||
for _key, entry in shortcuts.items():
|
||
if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
|
||
found = True
|
||
logger.info(f"Found matching shortcut for '{game_name}' to remove")
|
||
continue
|
||
new_shortcuts[str(index)] = entry
|
||
index += 1
|
||
|
||
if not found:
|
||
logger.info(f"Game '{game_name}' not found in Steam shortcuts")
|
||
return (False, f"Game '{game_name}' not found in Steam")
|
||
|
||
# Save updated shortcuts.vdf
|
||
try:
|
||
with open(steam_shortcuts_path, 'wb') as f:
|
||
vdf.binary_dump({"shortcuts": new_shortcuts}, f)
|
||
logger.info(f"Successfully updated shortcuts.vdf, removed '{game_name}'")
|
||
except Exception as e:
|
||
logger.error(f"Failed to update shortcuts.vdf: {e}")
|
||
if os.path.exists(backup_path):
|
||
try:
|
||
shutil.copy2(backup_path, steam_shortcuts_path)
|
||
logger.info("Restored shortcuts.vdf from backup due to update failure")
|
||
except Exception as restore_err:
|
||
logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
|
||
return (False, f"Failed to update shortcuts.vdf: {e}")
|
||
|
||
# Delete cover files
|
||
cover_files = [
|
||
os.path.join(grid_dir, f"{appid}.jpg"),
|
||
os.path.join(grid_dir, f"{appid}p.jpg"),
|
||
os.path.join(grid_dir, f"{appid}_hero.jpg"),
|
||
os.path.join(grid_dir, f"{appid}_logo.png")
|
||
]
|
||
for cover_file in cover_files:
|
||
if os.path.exists(cover_file):
|
||
try:
|
||
os.remove(cover_file)
|
||
logger.info(f"Deleted cover file: {cover_file}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to delete cover file {cover_file}: {e}")
|
||
else:
|
||
logger.info(f"Cover file not found: {cover_file}")
|
||
|
||
if os.path.exists(script_path):
|
||
try:
|
||
os.remove(script_path)
|
||
logger.info(f"Deleted steam script: {script_path}")
|
||
except Exception as e:
|
||
logger.error(f"Failed to delete steam script {script_path}: {e}")
|
||
else:
|
||
logger.info(f"Steam script not found: {script_path}")
|
||
|
||
logger.info(f"Game '{game_name}' successfully removed from Steam")
|
||
return (True, f"Game '{game_name}' removed from Steam")
|
||
|
||
def is_game_in_steam(game_name: str) -> bool:
|
||
steam_home = get_steam_home()
|
||
if steam_home is None:
|
||
logger.warning("Steam home directory not found")
|
||
return False
|
||
|
||
try:
|
||
last_user = get_last_steam_user(steam_home)
|
||
if not last_user or 'SteamID' not in last_user:
|
||
logger.warning("No valid Steam user found")
|
||
return False
|
||
user_id = last_user['SteamID']
|
||
unsigned_id = convert_steam_id(user_id)
|
||
steam_shortcuts_path = os.path.join(str(steam_home), "userdata", str(unsigned_id), "config", "shortcuts.vdf")
|
||
if not os.path.exists(steam_shortcuts_path):
|
||
logger.warning(f"Shortcuts file not found at {steam_shortcuts_path}")
|
||
return False
|
||
|
||
shortcuts_data = safe_vdf_load(steam_shortcuts_path)
|
||
shortcuts = shortcuts_data.get("shortcuts", {})
|
||
for _key, entry in shortcuts.items():
|
||
if entry.get("AppName") == game_name:
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Error checking if game {game_name} is in Steam: {e}")
|
||
return False
|