added utils.py

1. Добавил message_thread_id для всех команд, убрав костыль, который отвечал за корректную отправку
   сообщений в топик.
2. Функции (определение администратора, удаление сообщений) вынес в utils.py, от куда они будут вызываться в командах. Модули стали более читаемы из-за уменьшения количества строк кода в них.
3. Дописал manual_unban и добавил error в config.py
4. Оптимизация
This commit is contained in:
2025-07-28 21:03:28 +03:00
parent d73c0079f0
commit 853bfcdbb4
8 changed files with 692 additions and 492 deletions

View File

@@ -5,27 +5,16 @@ import logging
from database import db
from action_reporter import action_reporter
from utils import (
delete_messages,
check_admin_status,
check_target_status,
)
from config import COMMAND_MESSAGES
logger = logging.getLogger(__name__) # Получаем логгер для текущего модуля
# Возвращает причину бана
def extract_reason(words: str) -> str:
if words == []:
reason = 'отсутствует'
else:
reason = ' '.join(words)
return reason
# Удаляет два последних сообщения
async def delete_messages(bot: AsyncTeleBot, message: Message, time_sleep: int):
await asyncio.sleep(time_sleep)
await bot.delete_message(message.chat.id, message.message_id)
await bot.delete_message(message.chat.id, message.message_id+1)
# Регистрирует все обработчики команд
def register_handlers(bot: AsyncTeleBot):
@@ -37,10 +26,6 @@ def register_handlers(bot: AsyncTeleBot):
# Основная функция команды /ban
async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = None):
# Отправка сообщения в тему или обычный чат
send_message = bot.reply_to if message.is_topic_message else bot.send_message
chat_id = message if message.is_topic_message else message.chat.id
# Определяем целевого пользователя
target_user = None
@@ -50,91 +35,81 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
# Разбиваем текст сообщения на части
parts_msg = message.text.split()
# Выводим помощь (/ban help)
if len(parts_msg) == 2 and parts_msg[1].strip() in ['help', 'помощь']:
await send_message(chat_id, COMMAND_MESSAGES['manual_ban'])
# Команда /ban help
if len(parts_msg) == 2 and parts_msg[1].strip() in ('help', 'помощь'):
# Отправляем инструкцию
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['manual_ban'],
message_thread_id=message.message_thread_id,
)
# Удаляем сообщения через 30 секунд
await delete_messages(bot, message, 30)
await delete_messages(bot, message, time_sleep=30, number_message=2)
return
try:
# Проверяем, является ли отправитель администратором
try:
admin_status = await bot.get_chat_member(message.chat.id, message.from_user.id)
# Проверяем статус администратора (создателя)
if admin_status.status not in ['administrator', 'creator']:
await send_message(chat_id, COMMAND_MESSAGES['no_admin_rights'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Проверяем право администратора на бан
if admin_status.status == 'administrator' and not admin_status.can_restrict_members:
await send_message(chat_id, COMMAND_MESSAGES['no_restrict_rights'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
except Exception as e:
await send_message(chat_id, f"⚠️ Ошибка: {str(e)}")
logger.error(f"Ошибка при получении статуса администратора: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
if await check_admin_status(bot, message) == 1:
return
# Если одно слово, то ответом на сообщение
# Команда через ответ на сообщение, если одно слово (/ban)
if len(parts_msg) == 1:
# Если это тема
# Если это топик
if message.is_topic_message:
# Если без ответа на сообщение, ошибка
# Если без ответа на сообщение
if message.message_thread_id == message.reply_to_message.message_id:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
# Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Если с ответом на сообщение
else:
target_user = message.reply_to_message.from_user
reason = extract_reason(parts_msg[1:])
# Если это обычный чат
# Собираем данные
target_user = message.reply_to_message.from_user
reason = 'отсутствует'
# Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None:
# Собираем данные
target_user = message.reply_to_message.from_user
reason = extract_reason(parts_msg[1:])
reason = 'отсутствует'
message.message_thread_id = None
# Удаляем сообщение, если команда неправильная
# Если команда неправильная
else:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
# Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# В сообщении больше одного слова
else:
# Если второе слово это тег или ID
if parts_msg[1].strip().isdigit() or parts_msg[1].startswith('@'):
identifier = parts_msg[1].strip()
reason = extract_reason(parts_msg[2:])
# Поиск по ID
# Собираем данные
identifier = parts_msg[1].strip()
reason = ' '.join(parts_msg[2:]) if parts_msg[2:] != [] else 'отсутствует'
# Делаем поиск по ID
if identifier.isdigit():
# Делаем в int и ищем
# Ищем пользователя в базе данных
user_info = db.get_user(int(identifier))
# Если нашли пользователя
if user_info:
# Создаем объект пользователя из данных базы
# Создаем объект пользователя
target_user = User(
id=user_info[0],
first_name=user_info[1],
@@ -142,14 +117,16 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
is_bot=False
)
# Поиск по тэгу
# Делаем поиск по тегу
elif identifier.startswith('@'):
# Убираем @ и ищем
# Ищем пользователя в базе данных (убрали @)
user_info = db.get_user_by_username(identifier[1:])
# Если нашли пользователя
if user_info:
# Создаем объект пользователя из данных базы
# Создаем объект пользователя
target_user = User(
id=user_info[0],
first_name=user_info[1],
@@ -157,67 +134,66 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
is_bot=False
)
# Команда через ответ на сообщение с указанием причины
else:
# Если это тема
# Если это топик
if message.is_topic_message:
# Если без ответа на сообщение, ошибка
# Если без ответа на сообщение
if message.message_thread_id == message.reply_to_message.message_id:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
# Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Если с ответом на сообщение
else:
target_user = message.reply_to_message.from_user
reason = extract_reason(parts_msg[1:])
# Если это обычный чат
# Собираем данные
target_user = message.reply_to_message.from_user
reason = ' '.join(parts_msg[1:])
# Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None:
# Собираем данные
target_user = message.reply_to_message.from_user
reason = extract_reason(parts_msg[1:])
reason = ' '.join(parts_msg[1:])
message.message_thread_id = None
# Удаляем сообщение, если команда неправильная
# Если команда неправильная
else:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
# Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Если пользователь не найден
# Если пользователь не найден в базе данных
if not target_user:
await send_message(chat_id, COMMAND_MESSAGES['user_not_found'])
# Отправляем предупреждение
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['user_not_found'],
message_thread_id=message.message_thread_id,
)
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Проверяем статус целевого пользователя
try:
target_status = await bot.get_chat_member(message.chat.id, target_user.id)
# Проверяем, является ли цель администратором или создателем
if target_status.status in ['administrator', 'creator']:
await send_message(chat_id, COMMAND_MESSAGES['cant_ban_admin'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
except Exception as e:
await send_message(chat_id, f"⚠️ Ошибка: {str(e)}")
logger.error(f"Ошибка при получении статуса пользователя: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
if await check_target_status(bot, message, target_user) == 1:
return
# Выполняем бан
try:
await bot.ban_chat_member(message.chat.id, target_user.id)
# Выполняем бан
await bot.ban_chat_member(
chat_id=message.chat.id,
user_id=target_user.id,
)
# Отправляем лог в админ-чат
await action_reporter.log_action(
@@ -226,28 +202,48 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
admin_id=message.from_user.id,
reason=reason,
duration=None,
photo_path=photo_path
photo_path=photo_path,
)
# Отправляем сообщения, что пользователь получил бан
await send_message(chat_id, COMMAND_MESSAGES['banned'])
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['banned'],
message_thread_id=message.message_thread_id,
)
# Записываем действие в логи
logger.info(f"Пользователь {target_user.id} забанен администратором {message.from_user.id}.")
# Удаляем сообщения через 5 секунд
await asyncio.sleep(5)
await bot.delete_message(message.chat.id, message.message_id)
await bot.delete_message(message.chat.id, message.message_id+2)
await delete_messages(bot, message, time_sleep=5, number_message=2)
except Exception as e:
await send_message(chat_id, f"⚠️ Ошибка: {str(e)}")
# Отправляем ошибку
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['error'].format(e=str(e)),
message_thread_id=message.message_thread_id,
)
# Записываем ошибку в логи
logger.error(f"Ошибка бана: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
await delete_messages(bot, message, time_sleep=5, number_message=2)
except Exception as e:
await send_message(chat_id, COMMAND_MESSAGES['general_error'])
# Отправляем ошибку
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['general_error'],
message_thread_id=message.message_thread_id,
)
# Записываем ошибку в логи
logger.error(f"Общая ошибка в ban_command: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
await delete_messages(bot, message, time_sleep=5, number_message=2)