from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, ChatPermissions
import asyncio
import logging
import time
from database import db
from bad_words import contains_bad_word, get_bad_words_from_text
from action_reporter import action_reporter
from utils import delete_messages, format_mute_time
# Получаем логгер для текущего модуля
logger = logging.getLogger(__name__)
# Система прогрессирующих мутов (в секундах)
# Более плавная прогрессия для накопительного эффекта
MUTE_LEVELS = [
300, # 1. 5 минут (первое нарушение - символический мут)
900, # 2. 15 минут
1800, # 3. 30 минут
3600, # 4. 1 час
7200, # 5. 2 часа
14400, # 6. 4 часа
28800, # 7. 8 часов
43200, # 8. 12 часов
86400, # 9. 1 день
172800, # 10. 2 дня
259200, # 11. 3 дня
432000, # 12. 5 дней
604800, # 13. 7 дней
None, # 16. Перманентный мут (режим только чтения навсегда)
]
# Период для подсчета нарушений (30 дней в секундах)
VIOLATIONS_PERIOD = 2592000
def get_mute_duration(violations_count: int) -> int:
"""
Определяет длительность мута на основе количества нарушений.
Args:
violations_count: Количество нарушений пользователя
Returns:
Длительность мута в секундах (или None для перманентного мута)
"""
if violations_count < 1:
return MUTE_LEVELS[0]
# Индекс уровня мута (количество нарушений - 1, т.к. начинаем с 0)
level_index = violations_count - 1
# Если превысили количество уровней, возвращаем перманентный мут
if level_index >= len(MUTE_LEVELS):
return None
return MUTE_LEVELS[level_index]
async def apply_mute(bot: AsyncTeleBot, message: Message, user_id: int, duration: int, violations_count: int):
"""
Применяет мут к пользователю.
Args:
bot: Экземпляр бота
message: Сообщение, которое вызвало мут
user_id: ID пользователя
duration: Длительность мута в секундах (None для перманентного)
violations_count: Количество нарушений
"""
try:
# Устанавливаем ограничения (только чтение)
permissions = ChatPermissions(
can_send_messages=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
)
# Вычисляем время окончания мута
until_date = None if duration is None else int(time.time()) + duration
# Выполняем мут
await bot.restrict_chat_member(
chat_id=message.chat.id,
user_id=user_id,
permissions=permissions,
until_date=until_date
)
# Удаляем сообщение с матом
try:
await bot.delete_message(chat_id=message.chat.id, message_id=message.message_id)
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
# Формируем сообщение о муте
if duration is None:
time_display = "навсегда"
warning_msg = (
f"⛔️ Пользователь {message.from_user.first_name} получил перманентный мут "
f"за злостное нарушение правил чата (использование нецензурной лексики).\n\n"
f"📊 Количество нарушений: {violations_count}\n"
f"🔒 Режим: только чтение (навсегда)"
)
else:
time_display = format_mute_time(duration)
warning_msg = (
f"⚠️ Пользователь {message.from_user.first_name} получил мут на {time_display} "
f"за использование нецензурной лексики.\n\n"
f"📊 Нарушение #{violations_count}\n"
f"💡 При повторных нарушениях время мута будет увеличиваться."
)
# Отправляем сообщение в чат
await bot.send_message(
chat_id=message.chat.id,
text=warning_msg,
message_thread_id=message.message_thread_id,
)
# Отправляем сообщение-лог в админ-чат
await action_reporter.log_action(
action="АВТОМУТ",
user_id=user_id,
admin_id=None, # Автоматическое действие
reason=f"Использование нецензурной лексики (нарушение #{violations_count})",
duration=time_display,
)
# Записываем действие в логи
logger.info(
f"Пользователь {user_id} получил автоматический мут на {time_display} "
f"за нецензурную лексику (нарушение #{violations_count})"
)
except Exception as e:
logger.error(f"Ошибка при применении мута: {e}")
async def check_message_for_profanity(bot: AsyncTeleBot, message: Message):
"""
Проверяет сообщение на наличие бранных слов и применяет мут при необходимости.
Args:
bot: Экземпляр бота
message: Сообщение для проверки
"""
# Проверяем только текстовые сообщения
if not message.text:
return
# Не проверяем команды
if message.text.startswith('/'):
return
# Проверяем, содержит ли сообщение бранные слова
if not contains_bad_word(message.text):
return
# Получаем ID пользователя и чата
user_id = message.from_user.id
chat_id = message.chat.id
# Проверяем, является ли отправитель администратором
try:
chat_member = await bot.get_chat_member(chat_id, user_id)
if chat_member.status in ['administrator', 'creator']:
logger.info(f"Администратор {user_id} использовал нецензурную лексику, мут не применен")
return
except Exception as e:
logger.error(f"Ошибка проверки статуса пользователя: {e}")
return
# Добавляем нарушение в базу данных
db.add_violation(user_id, chat_id, violation_type='bad_language')
# Получаем количество нарушений за последний месяц
violations_count = db.get_violations_count(user_id, chat_id, VIOLATIONS_PERIOD)
# Определяем длительность мута
mute_duration = get_mute_duration(violations_count)
# Применяем мут
await apply_mute(bot, message, user_id, mute_duration, violations_count)
def register_handlers(bot: AsyncTeleBot):
"""
Регистрирует обработчики для автоматического мута.
"""
# Обработчик всех текстовых сообщений (кроме команд)
@bot.message_handler(func=lambda message: message.text and not message.text.startswith('/') and message.chat.type in ['group', 'supergroup'])
async def handle_text_message(message: Message):
await check_message_for_profanity(bot, message)
logger.info("Модуль автоматического мута успешно загружен")