added modules

This commit is contained in:
Mikhail Tergoev 2025-03-04 18:55:47 +03:00
parent 2876c73735
commit c8cff90b3b
5 changed files with 256 additions and 0 deletions

47
modules/downloader.py Executable file

@ -0,0 +1,47 @@
import os
import requests
from tqdm import tqdm
from .log import *
from .files_worker import *
def try_download(url, save_path=None):
"""
Скачивает файл по указанному URL с отображением прогресса.
:param url: URL файла для скачивания.
:param save_path: Путь для сохранения файла. Если None или директория, то используется имя файла из URL.
"""
try:
# Отправляем GET-запрос на скачивание файла
response = requests.get(url, stream=True)
response.raise_for_status() # Проверяем, что запрос успешен
# Определяем имя файла, если save_path не указан
if save_path is None:
save_path = os.path.basename(url)
elif os.path.isdir(save_path):
save_path = save_path + "/" + os.path.basename(url)
# Получаем общий размер файла
total_size = int(response.headers.get("content-length", 0))
# Открываем файл для записи в бинарном режиме
with open(save_path, "wb") as file, tqdm(
desc=save_path, # Описание для прогресс-бара
total=total_size, # Общий размер файла
unit="B", # Единица измерения (байты)
unit_scale=True, # Масштабирование единиц (KB, MB и т.д.)
unit_divisor=1024, # Делитель для масштабирования (1024 для KB, MB)
) as progress_bar:
# Читаем данные по частям и записываем в файл
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
progress_bar.update(len(chunk)) # Обновляем прогресс-бар
log.info(f"Файл успешно скачан и сохранён как {save_path}.")
except requests.exceptions.RequestException as e:
log.error(f"Ошибка при скачивании файла: {e}")
except Exception as e:
log.error(f"Неизвестная ошибка: {e}")
except KeyboardInterrupt as e:
log.error(f"Прервано пользователем: {e}")

43
modules/env_var.py Executable file

@ -0,0 +1,43 @@
import os
from .log import *
# функции обработки переменных LINUX окружения
def print_env_var(var_name):
if var_name in os.environ:
value = os.environ[var_name]
log.info(f"Переменная {var_name}={value}")
else:
log.warning(f"Переменная {var_name} не определена")
def check_and_set_env_var(var_name, default_value):
if var_name not in os.environ or not os.environ[var_name]:
os.environ[var_name] = default_value
def add_to_env_var(var_name, separator, value):
if var_name not in os.environ:
os.environ[var_name] = value
else:
current_value = os.environ.get(var_name)
s = separator
if s + value + s not in s + current_value + s:
new_value = f"{current_value}{separator}{value}"
os.environ[var_name] = new_value
def rm_from_env_var(var_name, separator, value):
current_value = os.environ.get(var_name)
if value in current_value.split(separator):
new_value = separator.join([v for v in current_value.split(separator) if v != value])
os.environ[var_name] = new_value
def env_var(var_name, value):
match var_name:
case "WINEDLLOVERRIDES":
add_to_env_var("WINEDLLOVERRIDES", ";", value)
case "VKD3D_CONFIG":
add_to_env_var("VKD3D_CONFIG", ";", value)
case "RADV_PERFTEST":
add_to_env_var("RADV_PERFTEST", ";", value)
case "PW_VK_INSTANCE_LAYERS":
add_to_env_var("PW_VK_INSTANCE_LAYERS", ":", value)
case "LD_LIBRARY_PATH":
add_to_env_var("LD_LIBRARY_PATH", ":", value)

117
modules/files_worker.py Executable file

@ -0,0 +1,117 @@
import os
import shutil
import filecmp
import tarfile
import hashlib
from .log import *
def try_copy_file(source, destination): # функция копирования если файлы различаются
if not os.path.exists(source):
raise FileNotFoundError (f"file not found for copy: {source}")
return False
if os.path.exists(destination):
if filecmp.cmp(source, destination, shallow=False):
return True
if shutil.copy2(source, destination):
return True
else:
return False
def try_force_link_file(source, link):
if not os.path.exists(source):
raise FileNotFoundError (f"file not found for link: {source}")
return False
try:
if os.path.exists(link) or os.path.islink(link):
os.remove(link)
os.symlink(source, link)
except Exception as e:
print(f"failed to create link for file: {e}")
def try_remove_file(path):
if os.path.exists(file_path) and os.path.isfile(file_path):
try:
os.remove(file_path)
except Exception as e:
log.error(f"failed to remove file: {e}")
def create_new_dir(path):
if not os.path.exists(path):
try:
os.makedirs(path)
except Exception as e:
log.error(f"failed to create directory: {e}")
def try_force_link_dir(path, link):
if not os.path.exists(path):
raise FileNotFoundError (f"directory not found for link: {path}")
return False
try:
if os.path.exists(link) or os.path.islink(link):
os.remove(link)
os.symlink(path, link)
except Exception as e:
print(f"failed to create link for file: {e}")
def try_remove_dir(path):
if os.path.exist(path) and os.path.isdir(path):
try:
shutil.rmtree(path)
except Exception as e:
log.error(f"failed to remove directory: {e}")
def unpack(archive_path, extract_to=None):
try:
if extract_to is None:
# TODO: перенести распаковку по умолчанию в tmp
extract_to = os.path.dirname(archive_path)
elif not os.exists.isdir(extract_to):
create_new_dir(extract_to)
with tarfile.open(archive_path, mode="r:*") as tar:
tar.extractall(path=extract_to)
full_path = os.path.realpath(extract_to)
log.info(f"Архив {archive_path} успешно распакован в {full_path}")
except tarfile.TarError as e:
log.error(f"Ошибка при распаковке архива {archive_path}: {e}")
except Exception as e:
log.error(f"Неизвестная ошибка: {e}")
def check_hash_sum(check_file, check_sum):
if check_sum and isinstance(check_sum, str):
true_hash = check_sum
elif os.path.isfile(check_sum):
try:
with open(check_sum, "r", encoding="utf-8") as file:
first_line = file.readline().strip()
elements = first_line.split()
if elements:
true_hash = elements[0]
else:
log.error(f"Первая строка файла {check_sum} пуста.")
except FileNotFoundError:
log.error(f"Файл {check_sum} не найден.")
except Exception as e:
log.error(f"Ошибка при чтении файла: {e}")
else:
log.error(f"Verification sha256sum was failed: {check_file}")
with open(check_file,"rb") as f:
bytes = f.read() # read entire file as bytes
check_file_hash = hashlib.sha256(bytes).hexdigest()
if true_hash == check_file_hash:
log.info("Verification sha256sum was successfully.")
return True
else:
log.error("Verification sha256sum was failed.")
return False

39
modules/log.py Executable file

@ -0,0 +1,39 @@
import logging
import sys
class ColoredFormatter(logging.Formatter):
# ANSI escape sequences for colors
COLORS = {
'DEBUG': '\033[35m', # Purple
'INFO': '\033[36m', # Green
'WARNING': '\033[33m', # Yellow
'ERROR': '\033[31m', # Red
'CRITICAL': '\033[41m', # Red background
}
RESET = '\033[0m' # Reset to default color
def format(self, record):
color = self.COLORS.get(record.levelname, self.RESET)
message = super().format(record)
formatted_message = f"{color}{message}{self.RESET}"
if record.levelname == 'CRITICAL':
print(formatted_message)
sys.exit(1)
return formatted_message
# Настраиваем логирование
log = logging.getLogger()
# TODO: добавить case с переменной для управление уровнем
log.setLevel(logging.DEBUG)
# Создаем консольный обработчик
handler = logging.StreamHandler()
handler.setFormatter(ColoredFormatter('%(levelname)s: %(message)s'))
# Создаем файловый обработчик
# TODO: добавить условие для управления перемееной пути сохранения лога
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
log.addHandler(file_handler)
log.addHandler(handler)

10
test.py Executable file

@ -0,0 +1,10 @@
#!/usr/bin/env python3
from modules.log import *
from modules.env_var import *
from modules.files_worker import *
from modules.downloader import *
if __name__ == "__main__":
log.info("Привет мир!")