Files
PortProtonQt/portprotonqt/steam_api.py
2025-09-23 21:05:58 +05:00

1319 lines
53 KiB
Python

import functools
import os
import shlex
import subprocess
import time
import html
import orjson
import vdf
import tarfile
import threading
from pathlib import Path
from portprotonqt.logger import get_logger
from portprotonqt.localization import get_steam_language
from portprotonqt.downloader import Downloader
from portprotonqt.dialogs import generate_thumbnail
from portprotonqt.config_utils import get_portproton_location
from collections.abc import Callable
import re
import shutil
import zlib
import websocket
import requests
import random
import base64
import glob
downloader = Downloader()
logger = get_logger(__name__)
CACHE_DURATION = 30 * 24 * 60 * 60
def safe_vdf_load(path: str | Path) -> dict:
path = str(path) # Convert Path to str
try:
with open(path, 'rb') as f:
data = vdf.binary_load(f)
return data
except Exception as e:
try:
with open(path, encoding='utf-8', errors='ignore') as f:
data = vdf.load(f)
return data
except Exception:
logger.error(f"Failed to load VDF file {path}: {e}")
return {}
def decode_text(text: str) -> str:
"""
Decodes HTML entities in a string.
For example, """ is converted to '"'.
Other characters and HTML tags remain unchanged.
"""
return html.unescape(text)
def get_cache_dir():
"""Returns the path to the cache directory, creating it if necessary."""
xdg_cache_home = os.getenv("XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
cache_dir = os.path.join(xdg_cache_home, "PortProtonQt")
os.makedirs(cache_dir, exist_ok=True)
return cache_dir
STEAM_DATA_DIRS = (
"~/.local/share/Steam",
"~/snap/steam/common/.local/share/Steam",
"~/.var/app/com.valvesoftware.Steam/data/Steam",
)
def get_steam_home():
"""Returns the path to the Steam directory using a list of possible directories."""
for dir_path in STEAM_DATA_DIRS:
expanded_path = Path(os.path.expanduser(dir_path))
if expanded_path.exists():
return expanded_path
return None
def get_last_steam_user(steam_home: Path) -> dict | None:
"""Returns data for the last Steam user from loginusers.vdf."""
loginusers_path = steam_home / "config/loginusers.vdf"
data = safe_vdf_load(loginusers_path)
if not data:
return None
users = data.get('users', {})
for user_id, user_info in users.items():
if user_info.get('MostRecent') == '1':
try:
return {'SteamID': int(user_id)}
except ValueError:
logger.error(f"Invalid SteamID format: {user_id}")
return None
logger.info("No user found with MostRecent=1")
return None
def convert_steam_id(steam_id: int) -> int:
"""
Converts a signed 32-bit integer to an unsigned 32-bit integer.
Uses bitwise AND with 0xFFFFFFFF to correctly handle negative values.
"""
return steam_id & 0xFFFFFFFF
def get_steam_libs(steam_dir: Path) -> set[Path]:
"""Returns a set of Steam library folders."""
libs = set()
libs_vdf = steam_dir / "steamapps/libraryfolders.vdf"
data = safe_vdf_load(libs_vdf)
folders = data.get('libraryfolders', {})
for key, info in folders.items():
if key.isdigit():
path_str = info.get('path') if isinstance(info, dict) else None
if path_str:
path = Path(path_str).expanduser()
if path.exists():
libs.add(path)
libs.add(steam_dir)
return libs
def get_playtime_data(steam_home: Path | None = None) -> dict[int, tuple[int, int]]:
"""Returns playtime data for the last user."""
play_data: dict[int, tuple[int, int]] = {}
if steam_home is None:
steam_home = get_steam_home()
if steam_home is None or not steam_home.exists():
logger.error("Steam home directory not found or does not exist")
return play_data
userdata_dir = steam_home / "userdata"
if not userdata_dir.exists():
logger.info("Userdata directory not found")
return play_data
if steam_home is not None: # Explicit check for type checker
last_user = get_last_steam_user(steam_home)
else:
logger.info("Steam home is None, cannot retrieve last user")
return play_data
if not last_user:
logger.info("Could not identify the last Steam user")
return play_data
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
user_dir = userdata_dir / str(unsigned_id)
if not user_dir.exists():
logger.info(f"User directory {unsigned_id} not found")
return play_data
localconfig = user_dir / "config/localconfig.vdf"
data = safe_vdf_load(localconfig)
cfg = data.get('UserLocalConfigStore', {})
apps = cfg.get('Software', {}).get('Valve', {}).get('Steam', {}).get('apps', {})
for appid_str, info in apps.items():
try:
appid = int(appid_str)
last_played = int(info.get('LastPlayed', 0))
playtime = int(info.get('Playtime', 0))
play_data[appid] = (last_played, playtime)
except ValueError:
logger.warning(f"Invalid playtime data for app {appid_str}")
return play_data
def get_steam_installed_games() -> list[tuple[str, int, int, int]]:
"""Returns a list of installed Steam games in the format (name, appid, last_played, playtime_sec)."""
games: list[tuple[str, int, int, int]] = []
steam_home = get_steam_home()
if steam_home is None or not steam_home.exists():
logger.error("Steam home directory not found or does not exist")
return games
play_data = get_playtime_data(steam_home)
for lib in get_steam_libs(steam_home):
steamapps_dir = lib / "steamapps"
if not steamapps_dir.exists():
continue
for manifest in steamapps_dir.glob("appmanifest_*.acf"):
data = safe_vdf_load(manifest)
app = data.get('AppState', {})
try:
appid = int(app.get('appid', 0))
except ValueError:
continue
name = app.get('name', f"Unknown ({appid})")
lname = name.lower()
if any(token in lname for token in ["proton", "steamworks", "steam linux runtime"]):
continue
last_played, playtime_min = play_data.get(appid, (0, 0))
games.append((name, appid, last_played, playtime_min * 60))
return games
def normalize_name(s):
"""
Normalizes a string by:
- converting to lowercase,
- removing ™ and ® symbols,
- replacing separators (-, :, ,) with spaces,
- removing extra spaces,
- removing 'bin' or 'app' suffixes,
- removing keywords like 'ultimate', 'edition', etc.
"""
s = s.lower()
for ch in ["", "®"]:
s = s.replace(ch, "")
for ch in ["-", ":", ","]:
s = s.replace(ch, " ")
s = " ".join(s.split())
for suffix in ["bin", "app"]:
if s.endswith(suffix):
s = s[:-len(suffix)].strip()
keywords_to_remove = {"ultimate", "edition", "definitive", "complete", "remastered"}
words = s.split()
filtered_words = [word for word in words if word not in keywords_to_remove]
return " ".join(filtered_words)
def is_valid_candidate(candidate):
"""
Checks if a candidate contains forbidden substrings:
- win32
- win64
- gamelauncher
Additionally checks the string without spaces.
Returns True if the candidate is valid, otherwise False.
"""
normalized_candidate = normalize_name(candidate)
normalized_no_space = normalized_candidate.replace(" ", "")
forbidden = ["win32", "win64", "gamelauncher"]
for token in forbidden:
if token in normalized_no_space:
return False
return True
def filter_candidates(candidates):
"""
Filters a list of candidates, discarding invalid ones.
"""
valid = []
dropped = []
for cand in candidates:
if cand.strip() and is_valid_candidate(cand):
valid.append(cand)
else:
dropped.append(cand)
if dropped:
logger.info("Discarding candidates: %s", dropped)
return valid
def remove_duplicates(candidates):
"""
Removes duplicates from a list while preserving order.
"""
return list(dict.fromkeys(candidates))
@functools.lru_cache(maxsize=256)
def get_exiftool_data(game_exe):
"""Retrieves metadata using exiftool."""
try:
proc = subprocess.run(
["exiftool", "-j", game_exe],
capture_output=True,
text=True,
check=False
)
if proc.returncode != 0:
logger.error(f"exiftool failed for {game_exe}: {proc.stderr.strip()}")
return {}
meta_data_list = orjson.loads(proc.stdout.encode("utf-8"))
return meta_data_list[0] if meta_data_list else {}
except Exception as e:
logger.error(f"Unexpected error in get_exiftool_data for {game_exe}: {e}")
return {}
def delete_cached_app_files(cache_dir: str, pattern: str):
"""Deletes cached files matching the given pattern in the cache directory."""
try:
for file_path in glob.glob(os.path.join(cache_dir, pattern)):
os.remove(file_path)
logger.info(f"Deleted cached file: {file_path}")
except Exception as e:
logger.error(f"Failed to delete cached files matching {pattern}: {e}")
def load_steam_apps_async(callback: Callable[[list], None]):
"""
Asynchronously loads the list of Steam applications, using cache if available.
Calls the callback with the list of apps.
Deletes cached app detail files when downloading a new steam_apps.json.
"""
cache_dir = get_cache_dir()
cache_tar = os.path.join(cache_dir, "games_appid.tar.xz")
cache_json = os.path.join(cache_dir, "steam_apps.json")
def process_tar(result: str | None):
if not result or not os.path.exists(result):
logger.error("Failed to download Steam apps archive")
callback([])
return
try:
with tarfile.open(result, mode='r:xz') as tar:
member = next((m for m in tar.getmembers() if m.name.endswith('.json')), None)
if member is None:
raise RuntimeError("JSON file not found in archive")
fobj = tar.extractfile(member)
if fobj is None:
raise RuntimeError(f"Failed to extract file {member.name} from archive")
raw = fobj.read()
fobj.close()
data = orjson.loads(raw)
with open(cache_json, "wb") as f:
f.write(orjson.dumps(data))
if os.path.exists(cache_tar):
os.remove(cache_tar)
logger.info("Deleted archive: %s", cache_tar)
# Delete all cached app detail files (steam_app_*.json)
delete_cached_app_files(cache_dir, "steam_app_*.json")
steam_apps = data if isinstance(data, list) else []
logger.info("Loaded %d apps from archive", len(steam_apps))
callback(steam_apps)
except Exception as e:
logger.error("Failed to extract Steam apps archive: %s", e)
callback([])
if os.path.exists(cache_json) and (time.time() - os.path.getmtime(cache_json) < CACHE_DURATION):
logger.info("Using cached Steam apps JSON: %s", cache_json)
try:
with open(cache_json, "rb") as f:
data = orjson.loads(f.read())
# Validate JSON structure
if not isinstance(data, list):
logger.error("Invalid JSON format in %s (not a list), re-downloading", cache_json)
raise ValueError("Invalid JSON structure")
# Validate each app entry
for app in data:
if not isinstance(app, dict) or "appid" not in app or "normalized_name" not in app:
logger.error("Invalid app entry in cached JSON %s, re-downloading", cache_json)
raise ValueError("Invalid app entry structure")
steam_apps = data
logger.info("Loaded %d apps from cache", len(steam_apps))
callback(steam_apps)
except Exception as e:
logger.error("Failed to read or validate cached JSON %s: %s", cache_json, e)
# Attempt to re-download if cache is invalid or corrupted
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/games_appid.tar.xz"
)
# Delete cached app detail files before re-downloading
delete_cached_app_files(cache_dir, "steam_app_*.json")
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
else:
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/games_appid.tar.xz"
)
# Delete cached app detail files before downloading
delete_cached_app_files(cache_dir, "steam_app_*.json")
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
def build_index(steam_apps):
"""
Builds an index of applications by normalized_name field.
"""
steam_apps_index = {}
if not steam_apps:
return steam_apps_index
logger.info("Building Steam apps index")
for app in steam_apps:
normalized = app["normalized_name"]
steam_apps_index[normalized] = app
return steam_apps_index
def search_app(candidate, steam_apps_index):
"""
Searches for an application by candidate: tries exact match first, then substring match.
"""
candidate_norm = normalize_name(candidate)
logger.info("Searching for app with candidate: '%s' -> '%s'", candidate, candidate_norm)
if candidate_norm in steam_apps_index:
logger.info("Found exact match: '%s'", candidate_norm)
return steam_apps_index[candidate_norm]
for name_norm, app in steam_apps_index.items():
if candidate_norm in name_norm:
ratio = len(candidate_norm) / len(name_norm)
if ratio > 0.8:
logger.info("Found partial match: candidate '%s' in '%s' (ratio: %.2f)", candidate_norm, name_norm, ratio)
return app
logger.info("No app found for candidate '%s'", candidate_norm)
return None
def load_app_details(app_id):
"""Loads cached game data by appid if not outdated."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
if os.path.exists(cache_file):
if time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
with open(cache_file, "rb") as f:
return orjson.loads(f.read())
return None
def save_app_details(app_id, data):
"""Saves appid data to a cache file."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
with open(cache_file, "wb") as f:
f.write(orjson.dumps(data))
def fetch_app_info_async(app_id: int, callback: Callable[[dict | None], None]):
"""
Asynchronously fetches detailed app info from Steam API.
Calls the callback with the app data or None if failed.
"""
cached = load_app_details(app_id)
if cached is not None:
callback(cached)
return
lang = get_steam_language()
url = f"https://store.steampowered.com/api/appdetails?appids={app_id}&l={lang}"
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"steam_app_{app_id}.json")
def process_response(result: str | None):
if not result or not os.path.exists(result):
logger.error("Failed to download Steam app info for appid %s", app_id)
callback(None)
return
try:
with open(result, "rb") as f:
data = orjson.loads(f.read())
details = data.get(str(app_id), {})
if not details.get("success"):
callback(None)
return
app_data_full = details.get("data", {})
app_data = {
"steam_appid": app_data_full.get("steam_appid", app_id),
"name": app_data_full.get("name", ""),
"short_description": app_data_full.get("short_description", ""),
"controller_support": app_data_full.get("controller_support", "")
}
save_app_details(app_id, app_data)
callback(app_data)
except Exception as e:
logger.error("Failed to process Steam app info for appid %s: %s", app_id, e)
callback(None)
downloader.download_async(url, cache_file, timeout=5, callback=process_response)
def load_weanticheatyet_data_async(callback: Callable[[list], None]):
"""
Asynchronously loads the list of WeAntiCheatYet data, using cache if available.
Calls the callback with the list of anti-cheat data.
Deletes cached anti-cheat files when downloading a new anticheat_games.json.
"""
cache_dir = get_cache_dir()
cache_tar = os.path.join(cache_dir, "anticheat_games.tar.xz")
cache_json = os.path.join(cache_dir, "anticheat_games.json")
def process_tar(result: str | None):
if not result or not os.path.exists(result):
logger.error("Failed to download WeAntiCheatYet archive")
callback([])
return
try:
with tarfile.open(result, mode='r:xz') as tar:
member = next((m for m in tar.getmembers() if m.name.endswith('anticheat_games_min.json')), None)
if member is None:
raise RuntimeError("JSON file not found in archive")
fobj = tar.extractfile(member)
if fobj is None:
raise RuntimeError(f"Failed to extract file {member.name} from archive")
raw = fobj.read()
fobj.close()
data = orjson.loads(raw)
with open(cache_json, "wb") as f:
f.write(orjson.dumps(data))
if os.path.exists(cache_tar):
os.remove(cache_tar)
logger.info("Deleted archive: %s", cache_tar)
anti_cheat_data = data or []
logger.info("Loaded %d anti-cheat entries from archive", len(anti_cheat_data))
callback(anti_cheat_data)
except Exception as e:
logger.error("Failed to extract WeAntiCheatYet archive: %s", e)
callback([])
if os.path.exists(cache_json) and (time.time() - os.path.getmtime(cache_json) < CACHE_DURATION):
logger.info("Using cached WeAntiCheatYet JSON: %s", cache_json)
try:
with open(cache_json, "rb") as f:
data = orjson.loads(f.read())
# Validate JSON structure
if not isinstance(data, list):
logger.error("Invalid JSON format in %s (not a list), re-downloading", cache_json)
raise ValueError("Invalid JSON structure")
# Validate each anti-cheat entry
for entry in data:
if not isinstance(entry, dict) or "normalized_name" not in entry or "status" not in entry:
logger.error("Invalid anti-cheat entry in cached JSON %s, re-downloading", cache_json)
raise ValueError("Invalid anti-cheat entry structure")
anti_cheat_data = data
logger.info("Loaded %d anti-cheat entries from cache", len(anti_cheat_data))
callback(anti_cheat_data)
except Exception as e:
logger.error("Failed to read or validate cached WeAntiCheatYet JSON %s: %s", cache_json, e)
# Attempt to re-download if cache is invalid or corrupted
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/anticheat_games.tar.xz"
)
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
else:
app_list_url = (
"https://git.linux-gaming.ru/Boria138/PortProtonQt/raw/branch/main/data/anticheat_games.tar.xz"
)
downloader.download_async(app_list_url, cache_tar, timeout=5, callback=process_tar)
def build_weanticheatyet_index(anti_cheat_data):
"""
Builds an index of anti-cheat data by normalized_name field.
"""
anti_cheat_index = {}
if not anti_cheat_data:
return anti_cheat_index
logger.info("Building WeAntiCheatYet data index")
for entry in anti_cheat_data:
normalized = entry["normalized_name"]
anti_cheat_index[normalized] = entry
return anti_cheat_index
def search_anticheat_status(candidate, anti_cheat_index):
candidate_norm = normalize_name(candidate)
logger.info("Searching for anti-cheat status for candidate: '%s' -> '%s'", candidate, candidate_norm)
if candidate_norm in anti_cheat_index:
status = anti_cheat_index[candidate_norm]["status"]
logger.info("Found exact match: '%s', status: '%s'", candidate_norm, status)
return status
for name_norm, entry in anti_cheat_index.items():
if candidate_norm in name_norm:
ratio = len(candidate_norm) / len(name_norm)
if ratio > 0.8:
status = entry["status"]
logger.info("Found partial match: candidate '%s' in '%s' (ratio: %.2f), status: '%s'", candidate_norm, name_norm, ratio, status)
return status
logger.info("No anti-cheat status found for candidate '%s'", candidate_norm)
return ""
def get_weanticheatyet_status_async(game_name: str, callback: Callable[[str], None]):
"""
Asynchronously retrieves WeAntiCheatYet status for a game by name.
Calls the callback with the status string or empty string if not found.
"""
def on_anticheat_data(anti_cheat_data: list):
anti_cheat_index = build_weanticheatyet_index(anti_cheat_data)
status = search_anticheat_status(game_name, anti_cheat_index)
callback(status)
load_weanticheatyet_data_async(on_anticheat_data)
def load_protondb_status(appid):
"""Loads cached ProtonDB data for a game by appid if not outdated."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
if os.path.exists(cache_file):
if time.time() - os.path.getmtime(cache_file) < CACHE_DURATION:
try:
with open(cache_file, "rb") as f:
return orjson.loads(f.read())
except Exception as e:
logger.error("Failed to load ProtonDB cache for appid %s: %s", appid, e)
return None
def save_protondb_status(appid, data):
"""Saves ProtonDB data for a game by appid to a cache file."""
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
try:
with open(cache_file, "wb") as f:
f.write(orjson.dumps(data))
except Exception as e:
logger.error("Failed to save ProtonDB cache for appid %s: %s", appid, e)
def get_protondb_tier_async(appid: int, callback: Callable[[str], None]):
"""
Asynchronously fetches ProtonDB tier for an app.
Calls the callback with the tier string or empty string if failed.
"""
cached = load_protondb_status(appid)
if cached is not None:
callback(cached.get("tier", ""))
return
url = f"https://www.protondb.com/api/v1/reports/summaries/{appid}.json"
cache_dir = get_cache_dir()
cache_file = os.path.join(cache_dir, f"protondb_{appid}.json")
def process_response(result: str | None):
if not result or not os.path.exists(result):
logger.info("Failed to download ProtonDB data for appid %s", appid)
callback("")
return
try:
with open(result, "rb") as f:
data = orjson.loads(f.read())
filtered_data = {"tier": data.get("tier", "")}
save_protondb_status(appid, filtered_data)
callback(filtered_data["tier"])
except Exception as e:
logger.info("Failed to process ProtonDB data for appid %s: %s", appid, e)
callback("")
downloader.download_async(url, cache_file, timeout=5, callback=process_response)
def get_full_steam_game_info_async(appid: int, callback: Callable[[dict], None]):
"""
Asynchronously retrieves full Steam game info, including WeAntiCheatYet status.
Calls the callback with the game info dictionary.
"""
def on_app_info(app_info: dict | None):
if not app_info:
callback({})
return
title = decode_text(app_info.get("name", ""))
description = decode_text(app_info.get("short_description", ""))
cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
def on_protondb_tier(tier: str):
def on_anticheat_status(anticheat_status: str):
callback({
'description': description,
'controller_support': app_info.get('controller_support', ''),
'cover': cover,
'protondb_tier': tier,
'steam_game': "true",
'name': title,
'anticheat_status': anticheat_status
})
get_weanticheatyet_status_async(title, on_anticheat_status)
get_protondb_tier_async(appid, on_protondb_tier)
fetch_app_info_async(appid, on_app_info)
def get_steam_game_info_async(desktop_name: str, exec_line: str, callback: Callable[[dict], tuple[bool, str] | None]) -> None:
"""
Asynchronously retrieves game info based on desktop name and exec line, including WeAntiCheatYet status for all games.
Calls the callback with the game info dictionary.
"""
parts = shlex.split(exec_line)
game_exe = parts[-1] if parts else exec_line
if game_exe.lower().endswith('.bat'):
if os.path.exists(game_exe):
try:
with open(game_exe, encoding='utf-8') as f:
bat_lines = f.readlines()
for line in bat_lines:
line = line.strip()
if '.exe' in line.lower():
tokens = shlex.split(line)
for token in tokens:
if token.lower().endswith('.exe'):
game_exe = token
break
if game_exe.lower().endswith('.exe'):
break
except Exception as e:
logger.error("Failed to process bat file %s: %s", game_exe, e)
else:
logger.error("Bat file not found: %s", game_exe)
if not game_exe.lower().endswith('.exe'):
logger.error("Invalid executable path: %s. Expected .exe", game_exe)
meta_data = {}
else:
meta_data = get_exiftool_data(game_exe)
exe_name = os.path.splitext(os.path.basename(game_exe))[0]
folder_path = os.path.dirname(game_exe)
folder_name = os.path.basename(folder_path)
if folder_name.lower() in ['bin', 'binaries']:
folder_path = os.path.dirname(folder_path)
folder_name = os.path.basename(folder_path)
logger.info("Game folder name: '%s'", folder_name)
candidates = []
product_name = meta_data.get("ProductName", "")
file_description = meta_data.get("FileDescription", "")
if product_name and product_name not in ['BootstrapPackagedGame']:
candidates.append(product_name)
if file_description and file_description not in ['BootstrapPackagedGame']:
candidates.append(file_description)
if desktop_name:
candidates.append(desktop_name)
if exe_name:
candidates.append(exe_name)
if folder_name:
candidates.append(folder_name)
logger.info("Initial candidates: %s", candidates)
candidates = filter_candidates(candidates)
candidates = remove_duplicates(candidates)
candidates_ordered = sorted(candidates, key=lambda s: len(s.split()), reverse=True)
logger.info("Sorted candidates: %s", candidates_ordered)
def on_steam_apps(steam_apps: list):
steam_apps_index = build_index(steam_apps)
matching_app = None
for candidate in candidates_ordered:
if not candidate:
continue
matching_app = search_app(candidate, steam_apps_index)
if matching_app:
logger.info("Match found for candidate '%s': %s", candidate, matching_app.get("normalized_name"))
break
game_name = desktop_name or exe_name.capitalize()
if not matching_app:
def on_anticheat_status(anticheat_status: str):
callback({
"appid": "",
"name": decode_text(game_name),
"description": "",
"cover": "",
"controller_support": "",
"protondb_tier": "",
"steam_game": "false",
"anticheat_status": anticheat_status
})
get_weanticheatyet_status_async(game_name, on_anticheat_status)
return
appid = matching_app["appid"]
def on_app_info(app_info: dict | None):
if not app_info:
def on_anticheat_status(anticheat_status: str):
callback({
"appid": "",
"name": decode_text(game_name),
"description": "",
"cover": "",
"controller_support": "",
"protondb_tier": "",
"steam_game": "false",
"anticheat_status": anticheat_status
})
get_weanticheatyet_status_async(game_name, on_anticheat_status)
return
title = decode_text(app_info.get("name", game_name))
description = decode_text(app_info.get("short_description", ""))
cover = f"https://steamcdn-a.akamaihd.net/steam/apps/{appid}/library_600x900_2x.jpg"
controller_support = app_info.get("controller_support", "")
def on_protondb_tier(tier: str):
def on_anticheat_status(anticheat_status: str):
callback({
"appid": appid,
"name": title,
"description": description,
"cover": cover,
"controller_support": controller_support,
"protondb_tier": tier,
"steam_game": "true",
"anticheat_status": anticheat_status
})
get_weanticheatyet_status_async(title, on_anticheat_status)
get_protondb_tier_async(appid, on_protondb_tier)
fetch_app_info_async(appid, on_app_info)
load_steam_apps_async(on_steam_apps)
_STEAM_APPS = None
_STEAM_APPS_INDEX = None
_STEAM_APPS_LOCK = threading.Lock()
def get_steam_apps_and_index_async(callback: Callable[[tuple[list, dict]], None]):
"""
Asynchronously loads and caches Steam apps and their index.
Calls the callback with (steam_apps, steam_apps_index).
"""
global _STEAM_APPS, _STEAM_APPS_INDEX
with _STEAM_APPS_LOCK:
if _STEAM_APPS is not None and _STEAM_APPS_INDEX is not None:
callback((_STEAM_APPS, _STEAM_APPS_INDEX))
return
def on_steam_apps(steam_apps: list):
global _STEAM_APPS, _STEAM_APPS_INDEX
with _STEAM_APPS_LOCK:
_STEAM_APPS = steam_apps
_STEAM_APPS_INDEX = build_index(steam_apps)
callback((_STEAM_APPS, _STEAM_APPS_INDEX))
load_steam_apps_async(on_steam_apps)
def enable_steam_cef() -> tuple[bool, str]:
"""
Checks and enables Steam CEF remote debugging if necessary.
Creates a .cef-enable-remote-debugging file in the Steam directory.
Steam must be restarted after the file is first created.
Returns a tuple:
- (True, "already_enabled") if already enabled.
- (True, "restart_needed") if just enabled and Steam restart is needed.
- (False, "steam_not_found") if Steam directory is not found.
"""
steam_home = get_steam_home()
if not steam_home:
return (False, "steam_not_found")
cef_flag_file = steam_home / ".cef-enable-remote-debugging"
logger.info(f"Checking CEF flag: {cef_flag_file}")
if cef_flag_file.exists():
logger.info("CEF Remote Debugging is already enabled")
return (True, "already_enabled")
else:
try:
os.makedirs(cef_flag_file.parent, exist_ok=True)
cef_flag_file.touch()
logger.info("Enabled CEF Remote Debugging. Steam restart required")
return (True, "restart_needed")
except Exception as e:
logger.error(f"Failed to create CEF flag {cef_flag_file}: {e}")
return (False, str(e))
def call_steam_api(js_cmd: str, *args) -> dict | None:
"""
Executes a JavaScript function in the Steam context via CEF Remote Debugging.
Args:
js_cmd: Name of the JS function to call (e.g., 'createShortcut').
*args: Arguments to pass to the JS function.
Returns:
Dictionary with the result or None if an error occurs.
"""
status, message = enable_steam_cef()
if not (status is True and message == "already_enabled"):
if message == "restart_needed":
logger.warning("Steam CEF API is available but requires Steam restart for full activation")
elif message == "steam_not_found":
logger.error("Could not find Steam directory to check CEF API")
else:
logger.error(f"Steam CEF API is unavailable or not ready: {message}")
return None
steam_debug_url = "http://localhost:8080/json"
try:
response = requests.get(steam_debug_url, timeout=2)
response.raise_for_status()
contexts = response.json()
ws_url = next((ctx['webSocketDebuggerUrl'] for ctx in contexts if ctx['title'] == 'SharedJSContext'), None)
if not ws_url:
logger.warning("SharedJSContext not found. Is Steam running with -cef-enable-remote-debugging?")
return None
except Exception as e:
logger.warning(f"Failed to connect to Steam CEF API at {steam_debug_url}. Is Steam running? {e}")
return None
js_code = """
async function createShortcut(name, exe, dir, icon, args) {
const id = await SteamClient.Apps.AddShortcut(name, exe, dir, args);
console.log("Shortcut created with ID:", id);
await SteamClient.Apps.SetShortcutName(id, name);
if (icon)
await SteamClient.Apps.SetShortcutIcon(id, icon);
if (args)
await SteamClient.Apps.SetAppLaunchOptions(id, args);
return { id };
};
async function setGrid(id, i, ext, image) {
await SteamClient.Apps.SetCustomArtworkForApp(id, image, ext, i);
return true;
};
async function removeShortcut(id) {
await SteamClient.Apps.RemoveShortcut(+id);
return true;
};
"""
try:
ws = websocket.create_connection(ws_url, timeout=5)
js_args = ", ".join(orjson.dumps(arg).decode('utf-8') for arg in args)
expression = f"{js_code} {js_cmd}({js_args});"
payload = {
"id": random.randint(0, 32767),
"method": "Runtime.evaluate",
"params": {
"expression": expression,
"awaitPromise": True,
"returnByValue": True
}
}
ws.send(orjson.dumps(payload))
response_str = ws.recv()
ws.close()
response_data = orjson.loads(response_str)
if "error" in response_data:
logger.error(f"JavaScript execution error in Steam: {response_data['error']['message']}")
return None
result = response_data.get('result', {}).get('result', {})
if result.get('type') == 'object' and result.get('subtype') == 'error':
logger.error(f"JavaScript execution error in Steam: {result.get('description')}")
return None
return result.get('value')
except Exception as e:
logger.error(f"WebSocket interaction error with Steam: {e}")
return None
def add_to_steam(game_name: str, exec_line: str, cover_path: str) -> tuple[bool, str]:
"""
Add a non-Steam game to Steam via shortcuts.vdf with PortProton tag,
and download Steam Grid covers with correct sizes and names.
"""
if not exec_line or not exec_line.strip():
logger.error("Invalid exec_line: empty or whitespace")
return (False, "Executable command is empty or invalid")
# Parse exec_line to get the executable path
try:
entry_exec_split = shlex.split(exec_line)
if not entry_exec_split:
logger.error(f"Failed to parse exec_line: {exec_line}")
return (False, "Failed to parse executable command: no valid tokens")
if entry_exec_split[0] == "env" and len(entry_exec_split) >= 3:
exe_path = entry_exec_split[2]
elif entry_exec_split[0] == "flatpak" and len(entry_exec_split) >= 4:
exe_path = entry_exec_split[3]
else:
exe_path = entry_exec_split[-1]
except Exception as e:
logger.error(f"Failed to parse exec_line: {exec_line}, error: {e}")
return (False, f"Failed to parse executable command: {e}")
if not os.path.exists(exe_path):
logger.error(f"Executable not found: {exe_path}")
return (False, f"Executable file not found: {exe_path}")
portproton_dir = get_portproton_location()
if not portproton_dir:
logger.error("PortProton directory not found")
return (False, "PortProton directory not found")
steam_scripts_dir = os.path.join(portproton_dir, "steam_scripts")
os.makedirs(steam_scripts_dir, exist_ok=True)
safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
script_path = os.path.join(steam_scripts_dir, f"{safe_game_name}.sh")
start_sh_path = os.path.join(portproton_dir, "data", "scripts", "start.sh")
if not os.path.exists(start_sh_path):
logger.error(f"start.sh not found at {start_sh_path}")
return (False, f"start.sh not found at {start_sh_path}")
if not os.path.exists(script_path):
script_content = f"""#!/usr/bin/env bash
export LD_PRELOAD=
export START_FROM_STEAM=1
"{start_sh_path}" "{exe_path}" "$@"
"""
try:
with open(script_path, "w", encoding="utf-8") as f:
f.write(script_content)
os.chmod(script_path, 0o755)
logger.info(f"Created launch script: {script_path}")
except Exception as e:
logger.error(f"Failed to create launch script {script_path}: {e}")
return (False, f"Failed to create launch script: {e}")
else:
logger.info(f"Launch script already exists: {script_path}")
generated_icon_path = os.path.join(portproton_dir, "data", "img", f"{safe_game_name}.png")
try:
img_dir = os.path.join(portproton_dir, "data", "img")
os.makedirs(img_dir, exist_ok=True)
if os.path.exists(generated_icon_path):
logger.info(f"Reusing existing thumbnail: {generated_icon_path}")
else:
success = generate_thumbnail(exe_path, generated_icon_path, size=128, force_resize=True)
if not success or not os.path.exists(generated_icon_path):
logger.warning(f"Failed to generate thumbnail for {exe_path}")
icon_path = ""
else:
logger.info(f"Generated thumbnail: {generated_icon_path}")
icon_path = generated_icon_path
except Exception as e:
logger.error(f"Failed to generate thumbnail for {exe_path}: {e}")
icon_path = ""
steam_home = get_steam_home()
if not steam_home:
logger.error("Steam home directory not found")
return (False, "Steam directory not found")
last_user = get_last_steam_user(steam_home)
if not last_user or 'SteamID' not in last_user:
logger.error("Failed to retrieve Steam user ID")
return (False, "Failed to get Steam user ID")
userdata_dir = steam_home / "userdata"
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
user_dir = userdata_dir / str(unsigned_id)
steam_shortcuts_path = user_dir / "config" / "shortcuts.vdf"
grid_dir = user_dir / "config" / "grid"
os.makedirs(grid_dir, exist_ok=True)
appid = None
was_api_used = False
logger.info("Attempting to add shortcut via Steam CEF API")
api_response = call_steam_api(
"createShortcut",
game_name,
script_path,
str(Path(script_path).parent),
icon_path,
""
)
if api_response and isinstance(api_response, dict) and 'id' in api_response:
appid = api_response['id']
was_api_used = True
logger.info(f"Shortcut successfully added via API. AppID: {appid}")
else:
logger.warning("Failed to add shortcut via API. Falling back to shortcuts.vdf")
backup_path = f"{steam_shortcuts_path}.backup"
if os.path.exists(steam_shortcuts_path):
try:
shutil.copy2(steam_shortcuts_path, backup_path)
logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
except Exception as e:
logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
return (False, f"Failed to create backup of shortcuts.vdf: {e}")
unique_string = f"{script_path}{game_name}"
baseid = zlib.crc32(unique_string.encode('utf-8')) & 0xffffffff
appid = baseid | 0x80000000
if appid > 0x7FFFFFFF:
aidvdf = appid - 0x100000000
else:
aidvdf = appid
shortcut = {
"appid": aidvdf,
"AppName": game_name,
"Exe": f'"{script_path}"',
"StartDir": f'"{os.path.dirname(script_path)}"',
"icon": icon_path,
"LaunchOptions": "",
"IsHidden": 0,
"AllowDesktopConfig": 1,
"AllowOverlay": 1,
"openvr": 0,
"Devkit": 0,
"DevkitGameID": "",
"LastPlayTime": 0,
"tags": {'0': 'PortProton'}
}
logger.info(f"Shortcut entry to be written: {shortcut}")
try:
if not os.path.exists(steam_shortcuts_path):
os.makedirs(os.path.dirname(steam_shortcuts_path), exist_ok=True)
open(steam_shortcuts_path, 'wb').close()
try:
if os.path.getsize(steam_shortcuts_path) > 0:
with open(steam_shortcuts_path, 'rb') as f:
shortcuts_data = vdf.binary_load(f)
else:
shortcuts_data = {"shortcuts": {}}
except Exception as load_err:
logger.warning(f"Failed to load existing shortcuts.vdf, starting fresh: {load_err}")
shortcuts_data = {"shortcuts": {}}
shortcuts = shortcuts_data.get("shortcuts", {})
for _key, entry in shortcuts.items():
if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
logger.info(f"Game '{game_name}' already exists in Steam shortcuts")
return (False, f"Game '{game_name}' already exists in Steam")
new_index = str(len(shortcuts))
shortcuts[new_index] = shortcut
with open(steam_shortcuts_path, 'wb') as f:
vdf.binary_dump({"shortcuts": shortcuts}, f)
logger.info(f"Game '{game_name}' successfully added to Steam with covers")
except Exception as e:
logger.error(f"Failed to update shortcuts.vdf: {e}")
if os.path.exists(backup_path):
try:
shutil.copy2(backup_path, steam_shortcuts_path)
logger.info("Restored shortcuts.vdf from backup due to update failure")
except Exception as restore_err:
logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
appid = None
if not appid:
return (False, "Failed to create shortcut using any method")
steam_appid = None
def on_game_info(game_info: dict):
nonlocal steam_appid
steam_appid = game_info.get("appid")
if not steam_appid or not isinstance(steam_appid, int):
logger.info("No valid Steam appid found, skipping cover download")
return
logger.info(f"Found Steam AppID {steam_appid} for cover download")
cover_types = [
("p.jpg", "library_600x900_2x.jpg"),
("_hero.jpg", "library_hero.jpg"),
("_logo.png", "logo.png"),
(".jpg", "header.jpg")
]
def on_cover_download(result_path: str | None, steam_name: str, index: int):
try:
if result_path and os.path.exists(result_path):
logger.info(f"Downloaded cover {steam_name} to {result_path}")
if was_api_used:
try:
with open(result_path, 'rb') as f:
img_b64 = base64.b64encode(f.read()).decode('utf-8')
logger.info(f"Applying cover type '{steam_name}' via API for AppID {appid}")
ext = Path(steam_name).suffix.lstrip('.')
call_steam_api("setGrid", appid, index, ext, img_b64)
except Exception as e:
logger.error(f"Failed to apply cover '{steam_name}' via API: {e}")
else:
logger.warning(f"Failed to download cover {steam_name} for appid {steam_appid}")
except Exception as e:
logger.error(f"Failed to process cover {steam_name} for appid {steam_appid}: {e}")
for i, (suffix, steam_name) in enumerate(cover_types):
cover_file = os.path.join(grid_dir, f"{appid}{suffix}")
cover_url = f"https://cdn.cloudflare.steamstatic.com/steam/apps/{steam_appid}/{steam_name}"
downloader.download_async(
cover_url,
cover_file,
timeout=5,
callback=lambda result, index=i, name=steam_name: on_cover_download(result, name, index)
)
get_steam_game_info_async(game_name, exec_line, on_game_info)
return (True, "Adding game to Steam, checking for covers...")
def remove_from_steam(game_name: str, exec_line: str) -> tuple[bool, str]:
"""
Remove a non-Steam game from Steam by deleting its entry from shortcuts.vdf.
"""
# Validate inputs
if not game_name or not game_name.strip():
logger.error("Invalid game_name: empty or whitespace")
return (False, "Game name is empty or invalid")
if not exec_line or not exec_line.strip():
logger.error("Invalid exec_line: empty or whitespace")
return (False, "Executable command is empty or invalid")
# Get PortProton directory
portproton_dir = get_portproton_location()
if not portproton_dir:
logger.error("PortProton directory not found")
return (False, "PortProton directory not found")
# Construct script path
safe_game_name = re.sub(r'[<>:"/\\|?*]', '_', game_name.strip())
script_path = os.path.join(portproton_dir, "steam_scripts", f"{safe_game_name}.sh")
# Get Steam home directory
steam_home = get_steam_home()
if not steam_home:
logger.error("Steam home directory not found")
return (False, "Steam directory not found")
# Get current Steam user ID
last_user = get_last_steam_user(steam_home)
if not last_user or 'SteamID' not in last_user:
logger.error("Failed to retrieve Steam user ID")
return (False, "Failed to get Steam user ID")
userdata_dir = steam_home / "userdata"
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
user_dir = userdata_dir / str(unsigned_id)
# Construct path to shortcuts.vdf and grid directory
steam_shortcuts_path = os.path.join(user_dir, "config", "shortcuts.vdf")
grid_dir = os.path.join(user_dir, "config", "grid")
# Check if shortcuts.vdf exists
if not os.path.exists(steam_shortcuts_path):
logger.info(f"shortcuts.vdf not found at {steam_shortcuts_path}")
return (False, f"Game '{game_name}' not found in Steam")
appid = None
# Load and modify shortcuts.vdf
try:
if os.path.getsize(steam_shortcuts_path) > 0:
with open(steam_shortcuts_path, 'rb') as f:
shortcuts_data = vdf.binary_load(f)
else:
shortcuts_data = {"shortcuts": {}}
except Exception as load_err:
logger.error(f"Failed to load shortcuts.vdf: {load_err}")
return (False, f"Failed to load shortcuts.vdf: {load_err}")
shortcuts = shortcuts_data.get("shortcuts", {})
new_shortcuts = {}
index = 0
# Filter out the matching shortcut
for _key, entry in shortcuts.items():
if entry.get("AppName") == game_name and entry.get("Exe") == f'"{script_path}"':
appid = convert_steam_id(int(entry.get("appid")))
logger.info(f"Found matching shortcut for '{game_name}' to remove")
continue
new_shortcuts[str(index)] = entry
index += 1
if not appid:
logger.info(f"Game '{game_name}' not found in Steam shortcuts")
return (False, f"Game '{game_name}' not found in Steam")
api_response = call_steam_api("removeShortcut", appid)
if api_response is not None: # API responded, even if response is empty
logger.info(f"Shortcut for AppID {appid} successfully removed via API")
else:
logger.warning("Failed to remove shortcut via API. Falling back to editing shortcuts.vdf")
# Create backup of shortcuts.vdf
backup_path = f"{steam_shortcuts_path}.backup"
try:
shutil.copy2(steam_shortcuts_path, backup_path)
logger.info(f"Created backup of shortcuts.vdf at {backup_path}")
except Exception as e:
logger.error(f"Failed to create backup of shortcuts.vdf: {e}")
return (False, f"Failed to create backup of shortcuts.vdf: {e}")
# Save updated shortcuts.vdf
try:
with open(steam_shortcuts_path, 'wb') as f:
vdf.binary_dump({"shortcuts": new_shortcuts}, f)
logger.info(f"Successfully updated shortcuts.vdf, removed '{game_name}'")
except Exception as e:
logger.error(f"Failed to update shortcuts.vdf: {e}")
if os.path.exists(backup_path):
try:
shutil.copy2(backup_path, steam_shortcuts_path)
logger.info("Restored shortcuts.vdf from backup due to update failure")
except Exception as restore_err:
logger.error(f"Failed to restore shortcuts.vdf from backup: {restore_err}")
return (False, f"Failed to update shortcuts.vdf: {e}")
# Delete cover files
cover_files = [
os.path.join(grid_dir, f"{appid}.jpg"),
os.path.join(grid_dir, f"{appid}p.jpg"),
os.path.join(grid_dir, f"{appid}_hero.jpg"),
os.path.join(grid_dir, f"{appid}_logo.png")
]
for cover_file in cover_files:
if os.path.exists(cover_file):
try:
os.remove(cover_file)
logger.info(f"Deleted cover file: {cover_file}")
except Exception as e:
logger.error(f"Failed to delete cover file {cover_file}: {e}")
else:
logger.info(f"Cover file not found: {cover_file}")
if os.path.exists(script_path):
try:
os.remove(script_path)
logger.info(f"Deleted steam script: {script_path}")
except Exception as e:
logger.error(f"Failed to delete steam script {script_path}: {e}")
else:
logger.info(f"Steam script not found: {script_path}")
logger.info(f"Game '{game_name}' successfully removed from Steam")
return (True, f"Game '{game_name}' removed from Steam")
def is_game_in_steam(game_name: str) -> bool:
steam_home = get_steam_home()
if steam_home is None:
logger.warning("Steam home directory not found")
return False
try:
last_user = get_last_steam_user(steam_home)
if not last_user or 'SteamID' not in last_user:
logger.warning("No valid Steam user found")
return False
user_id = last_user['SteamID']
unsigned_id = convert_steam_id(user_id)
steam_shortcuts_path = os.path.join(str(steam_home), "userdata", str(unsigned_id), "config", "shortcuts.vdf")
if not os.path.exists(steam_shortcuts_path):
logger.warning(f"Shortcuts file not found at {steam_shortcuts_path}")
return False
shortcuts_data = safe_vdf_load(steam_shortcuts_path)
shortcuts = shortcuts_data.get("shortcuts", {})
for _key, entry in shortcuts.items():
if entry.get("AppName") == game_name:
return True
except Exception as e:
logger.error(f"Failed to check if game {game_name} is in Steam: {e}")
return False