Files
LGBot/src/modules/mute.py

365 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, User, ChatPermissions
import asyncio
import logging
import time
from database import db
from config import COMMAND_MESSAGES
logger = logging.getLogger(__name__) # Получаем логгер для текущего модуля
# Возвращает количество секунд
def parse_mute_time(time_str: str) -> int:
# Парсим строку времени
time_str = time_str.strip().lower()
# Минуты
if time_str.endswith('m') or time_str.endswith('м'):
try:
minutes = int(time_str[:-1])
return abs(minutes) * 60
except:
return None
# Часы
elif time_str.endswith('h') or time_str.endswith('ч'):
try:
hours = int(time_str[:-1])
return abs(hours) * 3600
except:
return None
# Дни
elif time_str.endswith('d') or time_str.endswith('д'):
try:
days = int(time_str[:-1])
return abs(days) * 86400
except:
return None
return None
# Возвращает причину мута
def extract_reason(words: str) -> str:
if words == []:
reason = 'отсутствует'
else:
reason = ' '.join(words)
return reason
# Форматирует время в нормальный вид
def format_time(seconds: int) -> str:
# Для минут
minutes = seconds // 60
if minutes < 60:
if minutes % 10 == 1 and minutes % 100 != 11:
return f"{minutes} минута"
elif 2 <= minutes % 10 <= 4 and minutes % 100 not in (12, 13, 14):
return f"{minutes} минуты"
else:
return f"{minutes} минут"
# Для часов
hours = minutes // 60
if hours < 24:
if hours % 10 == 1 and hours % 100 != 11:
return f"{hours} час"
elif 2 <= hours % 10 <= 4 and hours % 100 not in (12, 13, 14):
return f"{hours} часа"
else:
return f"{hours} часов"
# Для дней
days = hours // 24
if days % 10 == 1 and days % 100 != 11:
return f"{days} день"
elif 2 <= days % 10 <= 4 and days % 100 not in (12, 13, 14):
return f"{days} дня"
else:
return f"{days} дней"
# Удаляет два последних сообщения
async def delete_messages(bot: AsyncTeleBot, message: Message, time_sleep: int):
await asyncio.sleep(time_sleep)
await bot.delete_message(message.chat.id, message.message_id)
await bot.delete_message(message.chat.id, message.message_id+1)
def register_handlers(bot: AsyncTeleBot): # Регистрирует все обработчики команд
@bot.message_handler(commands=['mute']) # Обработчик команды /mute
async def mute_command(message: Message):
# Отправка сообщения в тему или обычный чат
send_message = bot.reply_to if message.is_topic_message else bot.send_message
chat_id = message if message.is_topic_message else message.chat.id
# Определяем целевого пользователя
target_user = None
# Отпределяем время
time_arg = None
# Определяем причину
reason = None
# Разбиваем текст сообщения на части
parts_msg = message.text.split()
try:
# Проверяем, является ли отправитель администратором
try:
admin_status = await bot.get_chat_member(message.chat.id, message.from_user.id)
# Проверяем статус администратора (создателя)
if admin_status.status not in ['administrator', 'creator']:
await send_message(chat_id, COMMAND_MESSAGES['no_admin_rights'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Проверяем право администратора на мут
if admin_status.status == 'administrator' and not admin_status.can_restrict_members:
await send_message(chat_id, COMMAND_MESSAGES['no_restrict_rights'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
except Exception as e:
await send_message(chat_id, f"⚠️ Ошибка: {str(e)}")
logger.error(f"Ошибка при получении статуса администратора: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Выводим помощь
if len(parts_msg) == 2 and parts_msg[1].strip() in ['help', 'помощь']:
await send_message(chat_id, COMMAND_MESSAGES['manual_mute'])
# Удаляем сообщения через 30 секунд
await delete_messages(bot, message, 30)
return
# Если одно слово, то удаляем сообщение. Ошибка
if len(parts_msg) == 1:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
return
# Если два слово, то ответом на сообщение
elif len(parts_msg) == 2:
# Если это тема
if message.is_topic_message:
# Если без ответа на сообщение, ошибка
if message.message_thread_id == message.reply_to_message.message_id:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
return
# Если с ответом на сообщение
else:
target_user = message.reply_to_message.from_user
time_arg = parts_msg[1]
reason = extract_reason(parts_msg[2:])
# Если это обычный чат
elif message.reply_to_message and message.is_topic_message is None:
target_user = message.reply_to_message.from_user
time_arg = parts_msg[1]
reason = extract_reason(parts_msg[2:])
# Удаляем сообщение, если команда неправильная
else:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
return
else:
# Если второе слово это тег или ID
if parts_msg[1].strip().isdigit() or parts_msg[1].startswith('@'):
identifier = parts_msg[1].strip()
time_arg = parts_msg[2]
reason = extract_reason(parts_msg[3:])
print(identifier)
# Поиск по ID
if identifier.isdigit():
# Делаем в int и ищем
user_info = db.get_user(int(identifier))
if user_info:
# Создаем объект пользователя из данных базы
target_user = User(
id=user_info[0],
first_name=user_info[1],
username=user_info[2],
is_bot=False
)
# Поиск по тегу
elif identifier.startswith('@'):
# Убираем @ и ищем
user_info = db.get_user_by_username(identifier[1:])
if user_info:
# Создаем объект пользователя из данных базы
target_user = User(
id=user_info[0],
first_name=user_info[1],
username=user_info[2],
is_bot=False
)
else:
# Если это тема
if message.is_topic_message:
# Если без ответа на сообщение, ошибка
if message.message_thread_id == message.reply_to_message.message_id:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
return
# Если с ответом на сообщение
else:
target_user = message.reply_to_message.from_user
time_arg = parts_msg[1]
reason = extract_reason(parts_msg[2:])
# Если это обычный чат
elif message.reply_to_message and message.is_topic_message is None:
target_user = message.reply_to_message.from_user
time_arg = parts_msg[1]
reason = extract_reason(parts_msg[2:])
# Удаляем сообщение, если команда неправильная
else:
await asyncio.sleep(3)
await bot.delete_message(message.chat.id, message.message_id)
return
# Если пользователь не найден
if not target_user:
await send_message(chat_id, COMMAND_MESSAGES['user_not_found'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Парсинг времени мута
mute_seconds = parse_mute_time(time_arg)
if mute_seconds is None:
await send_message(chat_id, COMMAND_MESSAGES['incorrect_time_format'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Минимальный мут 1 минута (60 секунд)
if mute_seconds < 60:
await send_message(chat_id, COMMAND_MESSAGES['min_mute'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Максимальный мут 30 дней (2592000 секунд)
if mute_seconds > 2592000:
await send_message(chat_id, COMMAND_MESSAGES['max_mute'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Проверяем статус целевого пользователя
try:
target_status = await bot.get_chat_member(message.chat.id, target_user.id)
# Проверяем, является ли цель администратором или создателем
if target_status.status in ['administrator', 'creator']:
await send_message(chat_id, COMMAND_MESSAGES['cant_mute_admin'])
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
except Exception as e:
await send_message(chat_id, f"⚠️ Ошибка: {str(e)}")
logger.error(f"Ошибка при получении статуса пользователя: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
return
# Выполняем мут
try:
# Вычисляем время окончания мута
until_date = int(time.time()) + mute_seconds
# Устанавливаем ограничения (только чтение)
permissions = ChatPermissions(
can_send_messages=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
)
await bot.restrict_chat_member(
chat_id=message.chat.id,
user_id=target_user.id,
permissions=permissions,
until_date=until_date
)
# Форматирование времени
time_display = format_time(mute_seconds)
# Отправляем сообщения, что пользователь получил мут
await send_message(chat_id, COMMAND_MESSAGES['muted'].format(time_display=time_display))
logger.info(f"Пользователь {target_user.id} получил мут на {time_display} от администратора {message.from_user.id}.")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
except Exception as e:
await send_message(chat_id, f"⚠️ Ошибка: {str(e)}")
logger.error(f"Ошибка мута: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)
except Exception as e:
await send_message(chat_id, COMMAND_MESSAGES['general_error'])
logger.error(f"Общая ошибка в mute_command: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, 5)