* Добавлена начальная возможность переключения уровня логирования

* Добавлены ещё несколько функций работы с файлами
* Добавлена начальная поддержка фетчинга интернет ресурсов
* Исправлены мелкие ошибки
This commit is contained in:
2025-03-07 00:52:13 +03:00
parent 4cda6bdeb0
commit a9852dbffa
7 changed files with 238 additions and 74 deletions

View File

@ -1,72 +0,0 @@
import os
import glob
import re
from .log import *
def main_clear_db():
base_dir = os.path.dirname(os.path.realpath(__file__))
portwine_db_path = os.path.join(base_dir, 'portwine_db')
os.makedirs(portwine_db_path, exist_ok=True)
for filename in glob.glob(os.path.join(portwine_db_path, '*')): # Установка разрешений на файлы
os.chmod(filename, 0o644)
duplicate_finder = {} #Поиск дубликатов в файлах
for filename in glob.glob(os.path.join(portwine_db_path, '*')):
with open(filename, 'r') as file:
lines = file.readlines()
for line in lines:
if '.exe' in line and '#' in line:
line = line.strip()
if line not in duplicate_finder:
duplicate_finder[line] = []
duplicate_finder[line].append(filename)
duplicates = {line: files for line, files in duplicate_finder.items() if len(files) > 1}
if duplicates:
log.warning("Обнаружены дубликаты в файлах:")
for line, files in duplicates.items():
for file in files:
log.info(f"{file.split('portwine_db/')[1]} содержит дубликат: {line}")
exit(1)
for ppdb in glob.glob(os.path.join(portwine_db_path, '*')): # Обработка каждого файла
log.debug(ppdb)
with open(ppdb, 'r') as file: # Удаление определённых строк
lines = file.readlines()
lines = [line for line in lines if not line.startswith(('##export', '##add_'))]
lines = [line for line in lines if not (
re.search(r'MANGOHUD|FPS_LIMIT|VKBASALT|_RAY_TRACING|_DLSS|PW_GUI_DISABLED_CS|PW_USE_GAMEMODE|'
r'PW_USE_SYSTEM_VK_LAYERS|PW_DISABLE_COMPOSITING|PW_USE_EAC_AND_BE|PW_USE_OBS_VKCAPTURE|'
r'GAMESCOPE|PW_GS', line)
)]
if any(re.search(r'PW_USE_DGVOODOO2="0"|PW_DGVOODOO2="0"', line) for line in lines):
lines = [line for line in lines if not re.search(r'PW_USE_DGVOODOO2|PW_DGV', line)]
lines = [re.sub(r'export PW_WINE_USE=.*', 'export PW_WINE_USE="WINE_LG"', line)
if 'PW_WINE_USE="WINE_LG' in line else line for line in lines]
lines = [re.sub(r'export PW_WINE_USE=.*', 'export PW_WINE_USE="PROTON_LG"', line)
if 'PW_WINE_USE="PROTON_LG' in line else line for line in lines]
with open(ppdb, 'w') as file: # Сохранение изменений
file.writelines(lines)
ppdb_base = os.path.basename(ppdb) # Переименование файлов
if ppdb_base.endswith('.exe.ppdb'):
new_name = f"{ppdb_base[:-9]}.ppdb"
os.rename(ppdb, os.path.join(portwine_db_path, new_name))
elif ppdb_base.endswith('.EXE.ppdb'):
new_name = f"{ppdb_base[:-9]}.ppdb"
os.rename(ppdb, os.path.join(portwine_db_path, new_name))
elif not ppdb_base.endswith('.ppdb'):
new_name = f"{ppdb_base}.ppdb"
os.rename(ppdb, os.path.join(portwine_db_path, new_name))
log.info("ГОТОВО!")
exit(0)

View File

@ -62,6 +62,25 @@ def try_force_link_dir(path, link):
except Exception as e:
log.error(f"failed to create link for file: {e}")
def replace_file(file_path, file_name): # функция замены файла (сначала запись во временный файл, потом замена)
try:
if os.path.exists(file_name) and os.path.getsize(file_name) > 0:
os.replace(file_name, file_path) # Меняем местами файлы, если временный файл не пуст
log.info(f"Данные успешно обновлены в {file_path}.")
else:
log.warning(f"Временный файл {file_name} пуст, замена в {file_path} не выполнена.")
if os.path.exists(file_name):
os.remove(file_name) # Удаляем пустой временный файл
except Exception as e:
log.error(f"Ошибка при замене файла: {e}")
def try_write_temp_file(file_path, file_name): # функция записи в tmp
try:
with open(file_path, 'w') as file:
file.write("\n".join(file_name)) # Записываем все имена файлов во временный файл
except Exception as e:
log.error(f"Ошибка при записи во временный файл {file_path}: {e}")
def try_remove_dir(path):
if os.path.exists(path) and os.path.isdir(path):
try:
@ -69,7 +88,22 @@ def try_remove_dir(path):
except Exception as e:
log.error(f"failed to remove directory: {e}")
def get_last_modified_time(file_path): # функция получения времени последнего изменения файла
try:
return os.path.getmtime(file_path)
except FileNotFoundError:
log.warning(f"Файл не найден: {file_path}")
return None
except Exception as e:
log.error(f"Ошибка при получении времени изменения файла {file_path}: {e}")
return None
def unpack(archive_path, extract_to=None):
# Проверяем, существует ли архив
if not os.path.isfile(archive_path):
log.error(f"Архив {archive_path} не найден.")
return False
try:
if extract_to is None:
# TODO: перенести распаковку по умолчанию в tmp
@ -83,8 +117,15 @@ def unpack(archive_path, extract_to=None):
log.info(f"Архив {archive_path} успешно распакован в {full_path}")
except tarfile.TarError as e:
log.error(f"Ошибка при распаковке архива {archive_path}: {e}")
return False
except PermissionError:
log.error(f"Ошибка доступа к файлу {archive_path}. Убедитесь, что у вас есть права на чтение.")
return False
except Exception as e:
log.error(f"Неизвестная ошибка: {e}")
return False
return True
def check_hash_sum(check_file, check_sum):
if check_sum and isinstance(check_sum, str):

109
modules/source_fetcher.py Normal file
View File

@ -0,0 +1,109 @@
import re
import requests
import time
from modules.downloader import try_download
from modules.files_worker import *
count_wines = 25 # Количество элементов для записи в .tmp файл
repos = { # Список репозиториев для обработки и их короткие имена
"GloriousEggroll/proton-ge-custom": "proton-ge-custom",
"Kron4ek/Wine-Builds": "Kron4ek",
"GloriousEggroll/wine-ge-custom": "wine-ge-custom",
"CachyOS/proton-cachyos": "proton-cachyos",
"Castro-Fidel/wine_builds": "LG"
}
def source_list_checker(tmp_path): # Проверка наличия и обновления файлов со списками исходников
for repo, short_name in repos.items():
output_file = os.path.join(tmp_path, f"{short_name}.tmp")
if not os.path.exists(output_file):
log.info(f"Файл {output_file} не существует. Получаем данные из репозитория.")
source_list_downloader(repo, tmp_path, short_name, output_file)
continue # Переходим к следующему репозиторию
if os.path.getsize(output_file) == 0: # Проверяем, является ли файл пустым
log.info(f"Файл {output_file} пуст. Обновляем данные.")
source_list_downloader(repo, tmp_path, short_name, output_file)
continue # Переходим к следующему репозиторию
last_modified_time = get_last_modified_time(output_file) # Получаем время последнего изменения файла
if last_modified_time is None: # Если время не удалось получить, пробуем обновить файл
log.info(f"Не удалось получить время последнего изменения для {output_file}. Попытаемся обновить.")
source_list_downloader(repo, tmp_path, short_name, output_file)
continue # Переходим к следующему репозиторию
current_time = time.time()
if current_time - last_modified_time >= 10800: # 10800 секунд = 3 часа, проверяем, устарел ли файл
log.info(f"Файл {output_file} устарел. Обновляем данные.")
source_list_downloader(repo, tmp_path, short_name, output_file)
else:
log.info(f"Файл {output_file} существует и был обновлён менее 3 часов назад. Используем кэшированные данные.")
def source_list_downloader(repo, tmp_path, short_name, output_file):
url = f"https://api.github.com/repos/{repo}/releases?per_page={count_wines}"
temp_file = os.path.join(tmp_path, f"{short_name}.tmp.new") # Временный файл
try:
response = requests.get(url)
response.raise_for_status() # Возбудим исключение для ошибок HTTP
releases = response.json()
log.debug(f"Ответ API: {releases}")
tar_files = [] # Проверяем каждый релиз
for release in releases:
assets = release.get('assets', [])
for asset in assets:
asset_name = asset['name']
if (
(re.search(r'(wine|proton)', asset_name, re.IGNORECASE) or
re.search(r'^GE-Proton\d+-\d+\.tar\.gz$', asset_name) or
re.search(r'^GE-Proton\d+(-\d+)?\.tar\.xz$', asset_name)) and
(asset_name.endswith('.tar.gz') or asset_name.endswith('.tar.xz'))
):
tar_files.append(asset_name) # Собираем все подходящие файлы
log.debug(f"Найденный файл: {asset_name}")
if not tar_files:
log.warning(f"Нет подходящих файлов в репозитории {repo}.")
return # Выходим из функции, если нет файлов
with open(temp_file, 'w') as file: # Записываем найденные файлы во временный файл
file.write("\n".join(tar_files))
log.info(f"Данные успешно записаны в временный файл {temp_file}.")
if os.path.exists(temp_file): # Если запись прошла успешно, заменяем основной файл
os.replace(temp_file, output_file)
log.info(f"Файл {output_file} успешно обновлен.")
except requests.exceptions.RequestException as e:
log.error(f"Ошибка при получении данных из {repo}: {str(e)}")
def get_sources(args, tmp_path, dist_path):
os.makedirs(tmp_path, exist_ok=True)
source_list_checker(tmp_path)
if args:
for arg in args:
for repo, short_name in repos.items(): # определяем короткое имя репозитория
tmp_file_path = os.path.join(tmp_path, f"{short_name}.tmp")
if os.path.exists(tmp_file_path): # проверяем наличие файла
with open(tmp_file_path, 'r') as file:
all_tar_gz_files = file.read().splitlines()
for file_to_download in all_tar_gz_files: # Ищем совпадение в файле
if arg in file_to_download:
log.info(f"Найдено совпадение для '{arg}': {file_to_download} в файле {tmp_file_path}")
url = f"https://github.com/{repo}/releases/latest/download/{file_to_download}" # Получаем URL файла
tmp_file = os.path.join(tmp_path, file_to_download)
if not os.path.exists(tmp_file):
try:
try_download(url, tmp_path)
unpack(tmp_file, dist_path)
try_remove_file(tmp_file)
except Exception as e:
log.error(f"Ошибка при загрузке или распаковке: {str(e)}")