Compare commits

50 Commits

Author SHA1 Message Date
0bd399f121 Исправление синтаксической ошибки 2025-10-20 09:23:50 +03:00
c257c6c1a2 Исправление логики работы кармы
Добавление дополнительных обработчиков ошибок в работе кармы
Убрано кэширование сообщений от ботов3
2025-10-20 09:19:04 +03:00
c7b2961ae1 Убран медленный режим 2025-10-20 01:33:17 +03:00
8af2f128a7 Исправление дополнительное логирование /karma 2025-10-19 19:58:37 +03:00
4aba68d242 Исправление с границами слов
Исправление команды карма с юзернеймом
2025-10-19 19:50:32 +03:00
8bf512e509 Изменение увеличение кэша до 7 дней 2025-10-19 19:29:36 +03:00
459ed66e9a Изменение кэширования сообщений 2025-10-19 19:22:07 +03:00
de1c82c267 Исправления зависания 2 2025-10-19 19:08:52 +03:00
4a2aa00eb7 Исправления зависания 2025-10-19 18:42:10 +03:00
c4400fc244 Изменение логики медленного режима 2025-10-19 17:37:52 +03:00
b7f09ae719 Добавление ещё нескольких эмодзи 2025-10-19 14:54:46 +03:00
1619e82df1 Исправление канала с отправкой уведомления о карме в правильный канал 2025-10-19 14:32:55 +03:00
0ee7cb3bd4 Изменение логики эмодзи кармы как переключатель 2025-10-19 14:28:01 +03:00
9b11f21bc1 Добавление уведомления за карму от эмодзи 2025-10-19 14:18:59 +03:00
2b9e819944 Добавление команды установки кармы 2025-10-19 14:14:35 +03:00
58daea0492 Добавление логирования кэширования 2025-10-19 14:05:21 +03:00
63ac924a3d Добавление медленного режима в зависимости от кармы 2025-10-19 13:56:21 +03:00
be64915e9b Добавление кармы за реакцию большого пальца 2025-10-19 13:34:56 +03:00
61e9d31a75 Исключение ложного срабатывания при благодарности без ответа 2025-10-19 13:27:44 +03:00
6bdf996ca4 Увеличение времени на отображения сообщения с результатом увеличения кармы
Увеличение очков кармы за благодарность с !
2025-10-19 13:21:19 +03:00
c07a082694 Увеличение времени на отображения сообщения с результатом увеличения кармы 2025-10-19 13:18:23 +03:00
6b41e61d7e Исправление конкуренции обработчиков 2025-10-19 13:16:46 +03:00
1595acb4bb Улучшение обнаружения мата 2025-10-19 13:08:26 +03:00
44a8b54ddc Исправления критикал проблем 2025-10-19 12:52:54 +03:00
50d137ffc8 Исправление по карме 1 2025-10-19 12:21:19 +03:00
1e43cb6c98 Добавление логирования для дебага 2025-10-19 01:29:58 +03:00
15ac2fdb07 Добавление исправление ошибки работы кармы 2025-10-18 21:34:40 +03:00
a16b5322af Добавление возможности использования кармы 2025-10-18 21:17:41 +03:00
47cf59f710 Добавление команды /warn 2025-10-18 13:57:30 +03:00
2709c8ce40 Исправление определения пользователя для сброса нарушений 2025-10-18 13:41:35 +03:00
ab80af2744 Исправление определения прав 2025-10-18 13:27:05 +03:00
383abb0235 Исправление обработки прав администратора
Изменение сообщений о банах
2025-10-18 13:21:47 +03:00
295866ed2d Добавление обработки ошибок и валидации 2025-10-18 13:14:23 +03:00
ff397dc496 Исправления для работы в супергруппе 2 2025-10-13 14:55:37 +03:00
e0886fcfa8 Исправления для работы в супергруппе 1 2025-10-13 14:51:23 +03:00
313389d2bf Исправления для работы в супергруппе 2025-10-13 14:47:23 +03:00
7cdb7086a3 Исправление обработки ошибки админских сообщений 2 2025-10-13 14:35:53 +03:00
98d171dcf8 Исправление обработки ошибки админских сообщений 1 2025-10-13 14:34:03 +03:00
d8385d73f2 Исправление обработки ошибки админских сообщений 2025-10-13 14:31:06 +03:00
0426ae59cc исправление импорта 2025-10-13 14:25:16 +03:00
69d55a68b9 исправление работы списков слов
добавление сброса счётчиков нарушений
2025-10-13 14:20:18 +03:00
ab10879c06 исправление работы списков слов 2025-10-13 14:05:47 +03:00
1d32ec94e5 Исправление автомута 2025-10-12 16:33:28 +03:00
7ce23f8142 Добавлено логирование всех входящих сообщений в middleware 2025-10-12 16:28:20 +03:00
96da8266e9 Добавлено логирование в badwords_manager 2025-10-12 16:21:16 +03:00
80d1a3994c Исправление описаний 2025-10-12 16:11:27 +03:00
f6b888c6d3 Добавление автомута 2025-10-12 16:00:02 +03:00
643ad9feda Добавление скрипта обновления 2025-10-12 12:23:13 +03:00
f7b3866b6a Добавление скрипта обновления 2025-10-12 12:20:47 +03:00
ebe2b2c0fd Добавление подсказок по командом
Добавление сообщений о создании лога
2025-10-12 12:11:41 +03:00
29 changed files with 3248 additions and 99 deletions

View File

@@ -1,4 +1,3 @@
BOT_TOKEN = "..." # Токен бота получать у @BotFather BOT_TOKEN = "..." # Токен бота получать у @BotFather
ADMIN_CHAT_ID = -1001111111111 # ID админ-чата получать у @username_to_id_bot ADMIN_CHAT_ID = -1001111111111 # ID админ-чата получать у @username_to_id_bot
LOG_THREAD_ID = 2 # ID топика, брать из ссылки сообщения LOG_THREAD_ID = 2 # ID топика, брать из ссылки сообщения
ADMIN_IDS = "11111,22222" # ID администраторов получать у @username_to_id_bot

29
.gitignore vendored
View File

@@ -1,6 +1,33 @@
# Python
.venv/ .venv/
.env .env
__pycache__/ __pycache__/
*.pyc
*.pyo
*.pyd
*.db
*.sqlite
# IDE
.idea/ .idea/
.gigaide/
.claude/
.vscode/
*.iml
# Build
target/
build/
dist/
*.class
# Logs
bot.log bot.log
users.db *.log
# Databases
users.db
# OS
.DS_Store
Thumbs.db

181
README.md
View File

@@ -26,4 +26,183 @@ pip install -r requirements.txt
python src/main.py python src/main.py
``` ```
> Используется Python 3.11.0 > Используется Python 3.11.0
### Обновление на сервере
Для обновления бота на продакшн-сервере используйте скрипт `update.sh`:
```sh
./update.sh
```
Скрипт автоматически:
- Проверит наличие обновлений в git-репозитории
- Загрузит изменения (`git pull`)
- Перезапустит службу бота (`systemctl restart LGBot.service`)
- Покажет статус работы бота
---
## Система автоматического мьюта
Бот автоматически отслеживает использование нецензурной лексики и применяет прогрессирующие наказания.
### Как работает
1. **Обнаружение нарушений:**
- Каждое текстовое сообщение проверяется на наличие бранных слов
- Проверяются только группы и супергруппы (не личные сообщения)
- Команды (начинающиеся с `/`) не проверяются
2. **При обнаружении мата:**
- Сообщение удаляется автоматически
- Нарушение фиксируется в базе данных
- Пользователь получает мут соответствующего уровня
- Уведомление отправляется в чат и админ-чат
3. **Прогрессирующие наказания:**
| № нарушения | Длительность мута | № нарушения | Длительность мута |
|-------------|-------------------|-------------|-------------------|
| 1 | 5 минут | 9 | 1 день |
| 2 | 15 минут | 10 | 2 дня |
| 3 | 30 минут | 11 | 3 дня |
| 4 | 1 час | 12 | 5 дней |
| 5 | 2 часа | 13 | 7 дней |
| 8 | 12 часов | 16+ | **НАВСЕГДА** |
4. **Накопительный эффект:**
- Нарушения учитываются за последние **30 дней**
- Старые нарушения автоматически удаляются из базы
- Администраторы **освобождены** от автоматических мутов
### Управление списком бранных слов
⚠️ **Все команды доступны только администраторам чата**
#### Основные команды
```bash
# Показать справку
/badwords help
# Показать список бранных слов (первые 50)
/badwords list
# Статистика
/badwords count
# Добавить слово в список
/badwords add <слово>
# Удалить слово из списка
/badwords remove <слово>
```
#### Исключения
Исключения — слова, содержащие бранные корни, но не являющиеся матом (например: "республика", "документ"):
```bash
# Показать исключения
/badwords exceptions
# Добавить исключение
/badwords add_exception <слово>
# Удалить исключение
/badwords remove_exception <слово>
```
#### Служебные команды
```bash
# Перезагрузить списки из файла
/badwords reload
```
### Рекомендации
1. **Используйте корни слов**, а не полные формы:
- ✅ Правильно: `ебал` (поймает все формы)
- ❌ Неправильно: `ебала` (пропустит другие формы)
2. **Избегайте коротких корней** (могут давать ложные срабатывания)
3. **Тестируйте после добавления** нового слова
### Хранение данных
Все списки хранятся в файле `src/data/bad_words.json`:
```json
{
"bad_words": ["слово1", "слово2", ...],
"exceptions": [сключение1", сключение2", ...]
}
```
Изменения через команды применяются **немедленно**, перезапуск бота не требуется.
### Логирование
Все действия записываются в:
- **bot.log** - файл логов
- **Админ-чат** - уведомления о мутах
Примеры логов:
```
[INFO] Пользователь 123456789 получил автоматический мут на 5 минут за нецензурную лексику (нарушение #1)
[INFO] Нарушение пользователя 123456789 зафиксировано в чате -100123456789
[INFO] Администратор 987654321 добавил бранное слово: спам
```
### База данных
Таблица `violations` хранит все нарушения:
- `id` - уникальный идентификатор
- `user_id` - ID пользователя
- `chat_id` - ID чата
- `violation_date` - время нарушения (unix timestamp)
- `violation_type` - тип нарушения ('bad_language')
### Настройка системы
#### Изменение уровней мутов
Откройте `src/modules/auto_mute.py` и измените массив `MUTE_LEVELS`:
```python
MUTE_LEVELS = [
300, # 1. 5 минут
900, # 2. 15 минут
# ... добавьте или измените уровни
None, # Перманентный мут
]
```
#### Изменение периода накопления
Измените `VIOLATIONS_PERIOD` в `src/modules/auto_mute.py`:
```python
VIOLATIONS_PERIOD = 2592000 # 30 дней в секундах
```
### Требования
- Python 3.7+
- pyTelegramBotAPI (telebot)
- SQLite3
- Права бота в чате:
- Удаление сообщений
- Ограничение пользователей
### Устранение проблем
При возникновении проблем проверьте:
1. Логи бота в файле `bot.log`
2. Права бота в чате (удаление сообщений, ограничение пользователей)
3. Корректность настроек в `.env`
4. Наличие файла `src/data/bad_words.json`

View File

@@ -5,7 +5,6 @@ asyncio==3.4.3
attrs==25.3.0 attrs==25.3.0
certifi==2025.6.15 certifi==2025.6.15
charset-normalizer==3.4.2 charset-normalizer==3.4.2
dotenv==0.9.9
frozenlist==1.7.0 frozenlist==1.7.0
idna==3.10 idna==3.10
multidict==6.6.3 multidict==6.6.3

View File

@@ -1,5 +1,6 @@
from telebot.async_telebot import AsyncTeleBot from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message from telebot.types import Message
from typing import Optional
import logging import logging
import os import os
from database import db from database import db
@@ -26,11 +27,18 @@ class ActionReporter:
if tag: if tag:
text += f"• Tag: <code>@{tag}</code>\n" text += f"• Tag: <code>@{tag}</code>\n"
text += f"• ID: <code>{user_id}</code>" text += f"• ID: <code>{user_id}</code>"
else:
# Пользователь не найден в БД
text = f"👤 <b>Пользователь:</b>\n• ID: <code>{user_id}</code>"
return text return text
# Получает информацию об администраторе # Получает информацию об администраторе
async def _get_admin_info(self, admin_id: int) -> str: async def _get_admin_info(self, admin_id: Optional[int]) -> str:
# Если админ не указан (автоматическое действие)
if admin_id is None:
return "🤖 <b>Администратор:</b> Автоматическое действие"
admin_info = db.get_user(admin_id) admin_info = db.get_user(admin_id)
if admin_info: if admin_info:
@@ -44,11 +52,14 @@ class ActionReporter:
if tag: if tag:
text += f"• Tag: <code>@{tag}</code>\n" text += f"• Tag: <code>@{tag}</code>\n"
text += f"• ID: <code>{admin_id}</code>" text += f"• ID: <code>{admin_id}</code>"
else:
# Администратор не найден в БД
text = f"🛡 <b>Администратор:</b>\n• ID: <code>{admin_id}</code>"
return text return text
# Отправляет лог действия в админ-чат # Отправляет лог действия в админ-чат
async def log_action(self, action: str, user_id: int, admin_id: int, reason: str, duration: str, photo_path: str): async def log_action(self, action: str, user_id: int, admin_id: Optional[int], reason: str, duration: str, photo_path: Optional[str] = None):
try: try:
# Получаем информацию о пользователе и администраторе # Получаем информацию о пользователе и администраторе

326
src/bad_words.py Normal file
View File

@@ -0,0 +1,326 @@
# Система управления бранными словами
# Список слов хранится в JSON файле для возможности управления через команды
import json
import os
import logging
import re
logger = logging.getLogger(__name__)
# Путь к файлу с бранными словами
BAD_WORDS_FILE = os.path.join(os.path.dirname(__file__), 'data', 'bad_words.json')
# Кэш для загруженных слов
_bad_words_cache = None
_exceptions_cache = None
_whole_word_patterns_cache = None
_contains_patterns_cache = None
def load_bad_words():
"""
Загружает список бранных слов из JSON файла.
Поддерживает как новый формат (patterns), так и старый (bad_words).
Returns:
tuple: (список бранных слов, список исключений)
"""
global _bad_words_cache, _exceptions_cache, _whole_word_patterns_cache, _contains_patterns_cache
try:
with open(BAD_WORDS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# Проверяем, какой формат используется
if 'patterns' in data:
# Новый формат с паттернами
patterns = data['patterns']
_whole_word_patterns_cache = patterns.get('whole_word', [])
_contains_patterns_cache = patterns.get('contains', [])
_exceptions_cache = data.get('exceptions', [])
# Для обратной совместимости объединяем в один список
_bad_words_cache = _whole_word_patterns_cache + _contains_patterns_cache
logger.info(
f"Загружено паттернов: {len(_whole_word_patterns_cache)} whole_word, "
f"{len(_contains_patterns_cache)} contains, {len(_exceptions_cache)} исключений"
)
else:
# Старый формат для обратной совместимости
_bad_words_cache = data.get('bad_words', [])
_exceptions_cache = data.get('exceptions', [])
_whole_word_patterns_cache = []
_contains_patterns_cache = _bad_words_cache.copy()
logger.info(f"Загружено {len(_bad_words_cache)} бранных слов (старый формат) и {len(_exceptions_cache)} исключений")
return _bad_words_cache, _exceptions_cache
except FileNotFoundError:
logger.error(f"Файл {BAD_WORDS_FILE} не найден")
return [], []
except json.JSONDecodeError as e:
logger.error(f"Ошибка чтения JSON: {e}")
return [], []
def save_bad_words(bad_words: list, exceptions: list):
"""
Сохраняет список бранных слов в JSON файл.
Args:
bad_words: Список бранных слов
exceptions: Список исключений
Returns:
bool: True если успешно, иначе False
"""
global _bad_words_cache, _exceptions_cache
try:
# Создаем директорию, если её нет
os.makedirs(os.path.dirname(BAD_WORDS_FILE), exist_ok=True)
data = {
'bad_words': sorted(list(set(bad_words))), # Убираем дубликаты и сортируем
'exceptions': sorted(list(set(exceptions)))
}
with open(BAD_WORDS_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# Обновляем кэш
_bad_words_cache = data['bad_words']
_exceptions_cache = data['exceptions']
logger.info(f"Сохранено {len(_bad_words_cache)} бранных слов и {len(_exceptions_cache)} исключений")
return True
except Exception as e:
logger.error(f"Ошибка сохранения: {e}")
return False
def get_bad_words():
"""Возвращает список бранных слов (с кэшированием)"""
global _bad_words_cache
if _bad_words_cache is None:
load_bad_words()
return _bad_words_cache or []
def get_exceptions():
"""Возвращает список исключений (с кэшированием)"""
global _exceptions_cache
if _exceptions_cache is None:
load_bad_words()
return _exceptions_cache or []
def reload_words():
"""Перезагружает списки из файла (сбрасывает кэш)"""
global _bad_words_cache, _exceptions_cache
_bad_words_cache = None
_exceptions_cache = None
return load_bad_words()
def normalize_text(text: str) -> str:
"""
Нормализует текст для обхода обфускации.
Убирает:
- Звездочки, точки, подчеркивания между буквами (х*й, х.у.й, х_у_й → хуй)
- Повторяющиеся символы (хууууууй → хуй)
- ОДИНОЧНЫЕ пробелы между ОДИНОЧНЫМИ буквами (х у й → хуй, но "не бу" остаётся "не бу")
Args:
text: Исходный текст
Returns:
Нормализованный текст
"""
if not text:
return text
# Приводим к нижнему регистру
normalized = text.lower()
# Циклически убираем обфускацию, пока что-то меняется
max_iterations = 10
for _ in range(max_iterations):
before = normalized
# Убираем звёздочки, точки, дефисы, подчёркивания между буквами
# х*й, х.у.й, х_у_й → хуй
normalized = re.sub(r'([а-яё])[\*\.\-_]+([а-яё])', r'\1\2', normalized)
# Убираем ОДИНОЧНЫЕ пробелы между ОДИНОЧНЫМИ буквами (обфускация)
# "х у й" → "хуй", но "не бу" → "не бу" (не склеиваем обычные слова)
# Паттерн: одиночная буква + пробелы + одиночная буква
normalized = re.sub(r'\b([а-яё])\s+([а-яё])\b', r'\1\2', normalized)
if before == normalized:
break
# Убираем повторяющиеся буквы (более 2 подряд)
# хууууууй → хуй, пииииздец → пиздец
normalized = re.sub(r'([а-яё])\1{2,}', r'\1', normalized)
return normalized
# Предзагружаем слова при импорте модуля в кэш
load_bad_words()
def contains_bad_word(text: str) -> bool:
"""
Проверяет, содержит ли текст бранные слова.
Использует:
- Нормализацию текста для обхода обфускации
- Проверку границ слов для whole_word паттернов
- Проверку подстрок для contains паттернов
- Список исключений
Args:
text: Текст для проверки
Returns:
True, если найдено бранное слово, иначе False
"""
if not text:
return False
# Нормализуем текст (обход обфускации)
normalized_text = normalize_text(text)
# Получаем паттерны и исключения
global _whole_word_patterns_cache, _contains_patterns_cache, _exceptions_cache
# Если кэш не загружен, загружаем
if _whole_word_patterns_cache is None:
load_bad_words()
whole_word_patterns = _whole_word_patterns_cache or []
contains_patterns = _contains_patterns_cache or []
exceptions = _exceptions_cache or []
# 1. Проверяем whole_word паттерны (только целые слова)
for pattern in whole_word_patterns:
# Используем границы слов \b для поиска только целых слов
regex = r'\b' + re.escape(pattern) + r'\b'
if re.search(regex, normalized_text, re.IGNORECASE):
# Проверяем, не входит ли в исключения
is_exception = False
for exception in exceptions:
if exception in normalized_text and pattern in exception:
is_exception = True
break
if not is_exception:
return True
# 2. Проверяем contains паттерны (любое вхождение)
for pattern in contains_patterns:
if pattern in normalized_text:
# Проверяем все вхождения паттерна
start = 0
while True:
pos = normalized_text.find(pattern, start)
if pos == -1:
break
# Проверяем, не входит ли в исключение
is_exception = False
for exception in exceptions:
if pattern in exception:
exc_start = normalized_text.find(exception, max(0, pos - len(exception)))
if exc_start != -1:
exc_end = exc_start + len(exception)
if exc_start <= pos < exc_end:
is_exception = True
break
if not is_exception:
return True
start = pos + 1
return False
def get_bad_words_from_text(text: str) -> list:
"""
Возвращает список найденных бранных слов в тексте.
Использует:
- Нормализацию текста для обхода обфускации
- Проверку границ слов для whole_word паттернов
- Проверку подстрок для contains паттернов
- Список исключений
Args:
text: Текст для проверки
Returns:
Список найденных бранных слов
"""
if not text:
return []
# Нормализуем текст (обход обфускации)
normalized_text = normalize_text(text)
found_words = []
# Получаем паттерны и исключения
global _whole_word_patterns_cache, _contains_patterns_cache, _exceptions_cache
# Если кэш не загружен, загружаем
if _whole_word_patterns_cache is None:
load_bad_words()
whole_word_patterns = _whole_word_patterns_cache or []
contains_patterns = _contains_patterns_cache or []
exceptions = _exceptions_cache or []
# 1. Проверяем whole_word паттерны (только целые слова)
for pattern in whole_word_patterns:
# Используем границы слов \b для поиска только целых слов
regex = r'\b' + re.escape(pattern) + r'\b'
if re.search(regex, normalized_text, re.IGNORECASE):
# Проверяем, не входит ли в исключения
is_exception = False
for exception in exceptions:
if exception in normalized_text and pattern in exception:
is_exception = True
break
if not is_exception and pattern not in found_words:
found_words.append(pattern)
# 2. Проверяем contains паттерны (любое вхождение)
for pattern in contains_patterns:
if pattern in normalized_text:
# Проверяем все вхождения паттерна
start = 0
word_is_valid = False
while True:
pos = normalized_text.find(pattern, start)
if pos == -1:
break
# Проверяем, не входит ли в исключение
is_exception = False
for exception in exceptions:
if pattern in exception:
exc_start = normalized_text.find(exception, max(0, pos - len(exception)))
if exc_start != -1:
exc_end = exc_start + len(exception)
if exc_start <= pos < exc_end:
is_exception = True
break
if not is_exception:
word_is_valid = True
break
start = pos + 1
# Добавляем слово только если оно действительно найдено (не в исключении)
if word_is_valid and pattern not in found_words:
found_words.append(pattern)
return found_words

View File

@@ -7,19 +7,46 @@ DATABASE_NAME = 'users.db'
# Название файла для логов # Название файла для логов
LOG_FILE_NAME = 'bot.log' LOG_FILE_NAME = 'bot.log'
# ===========================================
# Временные константы (в секундах)
# ===========================================
# Период учёта нарушений (30 дней)
VIOLATIONS_PERIOD = 2592000
# Кулдаун для благодарностей (1 час)
THANK_COOLDOWN = 3600
# Периоды для предупреждений
ONE_WEEK = 604800 # 7 дней
TWO_WEEKS = 1209600 # 14 дней
# Максимальное время мута (30 дней)
MAX_MUTE_TIME = 2592000
# Сообщения команд # Сообщения команд
COMMAND_MESSAGES = { COMMAND_MESSAGES = {
'start': 'Бот-администратор для чата @linux_gaming_ru', 'start': 'Бот-администратор для чата @linux_gaming_ru',
'log': (
"<b>📋 Как создать лог ошибки</b>\n\n"
"Подробная инструкция по созданию логов для диагностики проблем:\n\n"
"🔗 <a href='https://linux-gaming.ru/t/kak-sozdat-log-oshibki/25'>Перейти к инструкции</a>"
),
'help': ( 'help': (
"<b>📚 Справочник команд администратора</b>\n\n" "<b>📚 Справочник команд администратора</b>\n\n"
"<u>Основные команды:</u>\n" "<u>Основные команды:</u>\n"
"• <code>/start</code> - Начало работы\n" "• <code>/start</code> - Начало работы\n"
"• <code>/help</code> - Этот справочник\n\n" "• <code>/help</code> - Этот справочник\n"
"• <code>/log</code> - Инструкция по созданию логов\n\n"
"<u>🛠 Команды модерации:</u>\n" "<u>🛠 Команды модерации:</u>\n"
"• <code>/warn help</code> - Выдать предупреждение\n"
"• <code>/mute help</code> - Инструкция по муту\n" "• <code>/mute help</code> - Инструкция по муту\n"
"• <code>/unmute help</code> - Снятие мута\n" "• <code>/unmute help</code> - Снятие мута\n"
"• <code>/ban help</code> - Инструкция по бану\n" "• <code>/ban help</code> - Инструкция по бану\n"
"• <code>/unban help</code> - Снятие бана\n\n" "• <code>/unban help</code> - Снятие бана\n\n"
"<u>⭐ Система кармы:</u>\n"
"• <code>/karma</code> - Просмотр кармы\n"
"• <code>/top</code> - Топ пользователей по карме\n\n"
"<i> Для подробностей по конкретной команде используйте: /команда help</i>" "<i> Для подробностей по конкретной команде используйте: /команда help</i>"
), ),
'manual_mute': ( 'manual_mute': (
@@ -90,6 +117,117 @@ COMMAND_MESSAGES = {
'banned': '✅ Пользователь успешно забанен.', 'banned': '✅ Пользователь успешно забанен.',
'unbanned': '✅ Пользователь успешно разбанен.', 'unbanned': '✅ Пользователь успешно разбанен.',
'error': '⚠️ Ошибка: {e}', 'error': '⚠️ Ошибка: {e}',
'general_error': '⚠️ Произошла непредвиденная ошибка.' 'general_error': '⚠️ Произошла непредвиденная ошибка.',
'auto_mute_warning': (
'⚠️ Пользователь <b>{user_name}</b> получил мут на <b>{duration}</b> '
'за использование нецензурной лексики.\n\n'
'📊 Нарушение #{count}\n'
'💡 При повторных нарушениях время мута будет увеличиваться.'
),
'auto_mute_permanent': (
'⛔️ Пользователь <b>{user_name}</b> получил перманентный мут '
'за злостное нарушение правил чата (использование нецензурной лексики).\n\n'
'📊 Количество нарушений: <b>{count}</b>\n'
'🔒 Режим: только чтение (навсегда)'
),
'badwords_help': (
"<b>🔧 Управление списком бранных слов</b>\n\n"
"<u>Основные команды:</u>\n"
"• <code>/badwords list</code> - Показать список слов\n"
"• <code>/badwords count</code> - Статистика\n"
"• <code>/badwords add [слово]</code> - Добавить слово\n"
"• <code>/badwords remove [слово]</code> - Удалить слово\n\n"
"<u>Исключения:</u>\n"
"• <code>/badwords exceptions</code> - Список исключений\n"
"• <code>/badwords add_exception [слово]</code> - Добавить\n"
"• <code>/badwords remove_exception [слово]</code> - Удалить\n\n"
"<u>Прочее:</u>\n"
"• <code>/badwords reload</code> - Перезагрузить из файла\n\n"
"<i>💡 Все изменения применяются немедленно</i>"
),
'reset_violations_help': (
"<b>🔄 Команда /reset_violations</b>\n\n"
"<i>Сбрасывает счётчик нарушений пользователя</i>\n\n"
"<u>🎯 Способы использования:</u>\n"
"1. Ответ на сообщение:\n"
" <code>/reset_violations</code>\n"
"2. По тегу пользователя:\n"
" <code>/reset_violations @username</code>\n"
"3. По ID пользователя:\n"
" <code>/reset_violations 123456789</code>\n\n"
"<i> Сбрасывает все записи об автомутах пользователя</i>"
),
'manual_warn': (
"<b>⚠️ Команда /warn</b>\n\n"
"<i>Выдает официальное предупреждение пользователю</i>\n\n"
"<u>🎯 Способы использования:</u>\n"
"1. Ответ на сообщение:\n"
" <code>/warn причина</code>\n"
"2. По тегу пользователя:\n"
" <code>/warn @username причина</code>\n"
"3. По ID пользователя:\n"
" <code>/warn 123456789 причина</code>\n\n"
"<b>📋 Система накопления:</b>\n"
"• 1-й варн: просто предупреждение\n"
"• 2-й варн за неделю: автомут на 7 дней (строгое)\n"
"• 2-й варн за 2 недели: автомут на 1 день (мягкое)\n\n"
"<i> Причину обязательно указывайте для прозрачности</i>"
),
'warned': '⚠️ Пользователь получил предупреждение.',
'warned_auto_mute_day': '⚠️ Пользователь получил предупреждение и автомут на 1 день (2-е предупреждение за 2 недели).',
'warned_auto_mute_week': '⚠️ Пользователь получил предупреждение и автомут на 7 дней (2-е предупреждение за неделю - строгое наказание).',
'karma_help': (
"<b>⭐ Команда /karma</b>\n\n"
"<i>Показывает карму пользователя в этом чате</i>\n\n"
"<u>🎯 Способы использования:</u>\n"
"1. Показать свою карму:\n"
" <code>/karma</code>\n"
"2. По тегу пользователя:\n"
" <code>/karma @username</code>\n"
"3. Ответ на сообщение:\n"
" Ответьте на сообщение: <code>/karma</code>\n\n"
"<b>💡 Как начислить карму?</b>\n"
"<u>Способ 1: Ответить на сообщение</u>\n"
"• спасибо → +1 карма\n"
"• благодарю → +1 карма\n"
"• спс, сенкс, thanks и др. → +1 карма\n\n"
"<u>Способ 2: Поставить реакцию (работает как переключатель)</u>\n"
"• Поставил 👍 → +1 карма | Убрал 👍 → -1 карма\n"
"• Поставил 👎 → -1 карма | Убрал 👎 → +1 карма\n"
"• Поставил 🔥 → +2 кармы | Убрал 🔥 → -2 кармы\n"
"• Поставил ❤ → +5 кармы | Убрал ❤ → -5 кармы\n"
"• Поставил ❤‍🔥 → +10 кармы | Убрал ❤‍🔥 → -10 кармы\n"
"• Нет ограничений по времени для реакций!\n\n"
"<b>🔥 БОНУС: Благодарность с восклицательным знаком даёт x2 кармы!</b>\n"
"• спасибо! → +2 кармы 👍👍\n"
"• thanks! → +2 кармы 👍👍\n\n"
"<b>⚠️ Снятие кармы:</b>\n"
"• Предупреждение (/warn): -5 кармы\n"
"• Мут (/mute или автомут): -10 кармы\n\n"
"<i>⏱ Одному пользователю можно давать карму раз в час</i>"
),
'top_karma_help': (
"<b>🏆 Команда /top</b>\n\n"
"<i>Показывает топ-10 пользователей по карме в этом чате</i>\n\n"
"<u>🎯 Использование:</u>\n"
" <code>/top</code>\n\n"
"<i>💡 Система кармы поощряет активных и полезных участников чата!</i>"
),
'setkarma_help': (
"<b>🎚 Команда /setkarma</b>\n\n"
"<i>Устанавливает карму пользователя в указанное значение (только для администраторов)</i>\n\n"
"<u>🎯 Способы использования:</u>\n"
"1. Ответ на сообщение:\n"
" <code>/setkarma 100</code>\n"
"2. По тегу пользователя:\n"
" <code>/setkarma @username 50</code>\n"
"3. По ID пользователя:\n"
" <code>/setkarma 123456789 -10</code>\n\n"
"<b>💡 Примеры:</b>\n"
"• Установить карму на 0: <code>/setkarma @user 0</code>\n"
"• Установить отрицательную карму: <code>/setkarma @user -50</code>\n"
"• Установить высокую карму: <code>/setkarma @user 1000</code>\n\n"
"<i>⚠️ Команда доступна только администраторам с правами ограничения</i>"
)
} }

96
src/data/bad_words.json Normal file
View File

@@ -0,0 +1,96 @@
{
"patterns": {
"whole_word": [
"гей",
"гомик",
"гомос",
"даун",
"дебил",
"гандон",
"мразь",
"мраз",
"козел",
"козл",
"урод",
"урода",
"тварь",
"твар",
"падла",
"гнида",
"гнид"
],
"contains": [
"хуй",
"хуе",
"хуи",
"хую",
"хуя",
"хер",
"пизд",
"пизж",
"пезд",
"ебал",
"ебан",
"ебат",
"ебу",
"ебош",
"ебля",
"ебет",
"бля",
"блядь",
"блять",
"сука",
"суки",
"сучк",
"сучар",
"мудак",
"мудил",
"муди",
"долбоеб",
"долбаеб",
"уебан",
"уебок",
"хуесос",
"пидор",
"пидар",
"педик",
"педр",
"шлюх",
"шалав",
"еблан",
"говн",
"срать",
"сраль",
"серун",
"дрочи",
"дроч",
"жоп",
"жёп",
"залуп",
"ублюдо",
"ублюд",
"сволочь",
"сволоч",
"выблядо",
"хуета",
"хуйн",
"охуе",
"охуи",
"охуя",
"нахуй",
"нахер",
"похуй",
"похер",
"захуя",
"ахуе",
"впизду",
"попизд"
]
},
"exceptions": [
"республика",
"документ",
"документы"
],
"_comment": "whole_word - только целые слова (не часть другого слова), contains - любое вхождение подстроки"
}

25
src/data/thank_words.json Normal file
View File

@@ -0,0 +1,25 @@
{
"thank_words": [
"спасибо",
"благодарю",
"спс",
"сенкс",
"сенкью",
"thanks",
"thank you",
"thx",
"ty",
"дякую",
"дзякуй",
"рахмет",
"пасиб",
"пасибо",
"спасибочки",
"благодарочка",
"мерси",
"merci",
"danke",
"gracias",
"grazie"
]
}

View File

@@ -1,5 +1,6 @@
import sqlite3 import sqlite3
import os import os
import time
from typing import Optional, Tuple from typing import Optional, Tuple
import logging import logging
@@ -13,7 +14,7 @@ class Database: # Инициализация класса
self.db_name = db_name self.db_name = db_name
self._init_db() self._init_db()
# Инициализирует базу данных и создает таблицу, если она не существует # Инициализирует базу данных и создает таблицы, если они не существуют
def _init_db(self): def _init_db(self):
with self._get_connection() as connect: with self._get_connection() as connect:
cursor = connect.cursor() cursor = connect.cursor()
@@ -24,7 +25,93 @@ class Database: # Инициализация класса
tag TEXT tag TEXT
) )
''') ''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS violations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
chat_id INTEGER NOT NULL,
violation_date INTEGER NOT NULL,
violation_type TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS warnings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
chat_id INTEGER NOT NULL,
warn_date INTEGER NOT NULL,
reason TEXT NOT NULL,
admin_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS karma (
user_id INTEGER NOT NULL,
chat_id INTEGER NOT NULL,
karma_points INTEGER DEFAULT 0,
PRIMARY KEY (user_id, chat_id),
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS karma_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_user_id INTEGER NOT NULL,
to_user_id INTEGER NOT NULL,
chat_id INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
FOREIGN KEY (from_user_id) REFERENCES users (id),
FOREIGN KEY (to_user_id) REFERENCES users (id)
)
''')
# Создаём индексы для оптимизации часто используемых запросов
# Индекс для проверки нарушений пользователя в чате за период
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_violations_user_chat_date
ON violations(user_id, chat_id, violation_date)
''')
# Индекс для проверки предупреждений пользователя в чате за период
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_warnings_user_chat_date
ON warnings(user_id, chat_id, warn_date)
''')
# Индекс для проверки истории кармы (кулдаун благодарностей)
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_karma_history_cooldown
ON karma_history(from_user_id, to_user_id, chat_id, timestamp)
''')
# Индекс для поиска пользователя по username
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_users_tag
ON users(tag COLLATE NOCASE)
''')
# Таблица для кэша сообщений (для обработки реакций)
cursor.execute('''
CREATE TABLE IF NOT EXISTS message_cache (
chat_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
thread_id INTEGER,
timestamp INTEGER NOT NULL,
PRIMARY KEY (chat_id, message_id)
)
''')
# Индекс для быстрой очистки старых записей
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp
ON message_cache(timestamp)
''')
connect.commit() connect.commit()
logger.info("База данных и индексы успешно инициализированы")
# Возвращает соединение с базой данных # Возвращает соединение с базой данных
def _get_connection(self): def _get_connection(self):
@@ -70,15 +157,308 @@ class Database: # Инициализация класса
def get_user_by_username(self, username: str) -> Optional[Tuple]: def get_user_by_username(self, username: str) -> Optional[Tuple]:
if not username: if not username:
return None return None
with self._get_connection() as connect: with self._get_connection() as connect:
cursor = connect.cursor() cursor = connect.cursor()
cursor.execute(''' cursor.execute('''
SELECT id, nickname, tag SELECT id, nickname, tag
FROM users FROM users
WHERE LOWER(tag) = LOWER(?) WHERE LOWER(tag) = LOWER(?)
''', (username,)) ''', (username,))
return cursor.fetchone() return cursor.fetchone()
# Добавляет нарушение в базу данных
def add_violation(self, user_id: int, chat_id: int, violation_type: str = 'bad_language'):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
INSERT INTO violations (user_id, chat_id, violation_date, violation_type)
VALUES (?, ?, ?, ?)
''', (user_id, chat_id, int(time.time()), violation_type))
connect.commit()
logger.info(f"Нарушение пользователя {user_id} зафиксировано в чате {chat_id}")
# Получает количество нарушений за период (по умолчанию - за последний месяц)
def get_violations_count(self, user_id: int, chat_id: int, period_seconds: int = 2592000) -> int:
with self._get_connection() as connect:
cursor = connect.cursor()
cutoff_time = int(time.time()) - period_seconds
cursor.execute('''
SELECT COUNT(*)
FROM violations
WHERE user_id = ? AND chat_id = ? AND violation_date > ?
''', (user_id, chat_id, cutoff_time))
result = cursor.fetchone()
return result[0] if result else 0
# Получает все нарушения пользователя
def get_user_violations(self, user_id: int, chat_id: int):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
SELECT id, violation_date, violation_type
FROM violations
WHERE user_id = ? AND chat_id = ?
ORDER BY violation_date DESC
''', (user_id, chat_id))
return cursor.fetchall()
# Очищает старые нарушения (старше указанного периода)
def clean_old_violations(self, period_seconds: int = 2592000):
with self._get_connection() as connect:
cursor = connect.cursor()
cutoff_time = int(time.time()) - period_seconds
cursor.execute('''
DELETE FROM violations
WHERE violation_date < ?
''', (cutoff_time,))
deleted_count = cursor.rowcount
connect.commit()
logger.info(f"Удалено старых нарушений: {deleted_count}")
return deleted_count
# Сбрасывает все нарушения пользователя в чате
def reset_user_violations(self, user_id: int, chat_id: int):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
DELETE FROM violations
WHERE user_id = ? AND chat_id = ?
''', (user_id, chat_id))
deleted_count = cursor.rowcount
connect.commit()
logger.info(f"Сброшено {deleted_count} нарушений пользователя {user_id} в чате {chat_id}")
return deleted_count
# Добавляет предупреждение в базу данных
def add_warning(self, user_id: int, chat_id: int, reason: str, admin_id: int):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
INSERT INTO warnings (user_id, chat_id, warn_date, reason, admin_id)
VALUES (?, ?, ?, ?, ?)
''', (user_id, chat_id, int(time.time()), reason, admin_id))
connect.commit()
logger.info(f"Предупреждение пользователю {user_id} выдано администратором {admin_id} в чате {chat_id}")
# Получает количество предупреждений за период
def get_warnings_count(self, user_id: int, chat_id: int, period_seconds: int) -> int:
with self._get_connection() as connect:
cursor = connect.cursor()
cutoff_time = int(time.time()) - period_seconds
cursor.execute('''
SELECT COUNT(*)
FROM warnings
WHERE user_id = ? AND chat_id = ? AND warn_date > ?
''', (user_id, chat_id, cutoff_time))
result = cursor.fetchone()
return result[0] if result else 0
# Получает все предупреждения пользователя
def get_user_warnings(self, user_id: int, chat_id: int):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
SELECT id, warn_date, reason, admin_id
FROM warnings
WHERE user_id = ? AND chat_id = ?
ORDER BY warn_date DESC
''', (user_id, chat_id))
return cursor.fetchall()
# Сбрасывает все предупреждения пользователя в чате
def reset_user_warnings(self, user_id: int, chat_id: int):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
DELETE FROM warnings
WHERE user_id = ? AND chat_id = ?
''', (user_id, chat_id))
deleted_count = cursor.rowcount
connect.commit()
logger.info(f"Сброшено {deleted_count} предупреждений пользователя {user_id} в чате {chat_id}")
return deleted_count
# Добавляет карму пользователю
def add_karma(self, user_id: int, chat_id: int, amount: int = 1):
with self._get_connection() as connect:
cursor = connect.cursor()
# Проверяем существование записи
cursor.execute('SELECT karma_points FROM karma WHERE user_id = ? AND chat_id = ?', (user_id, chat_id))
result = cursor.fetchone()
if result:
# Обновляем существующую карму
new_karma = result[0] + amount
cursor.execute('''
UPDATE karma
SET karma_points = ?
WHERE user_id = ? AND chat_id = ?
''', (new_karma, user_id, chat_id))
else:
# Создаем новую запись
cursor.execute('''
INSERT INTO karma (user_id, chat_id, karma_points)
VALUES (?, ?, ?)
''', (user_id, chat_id, amount))
connect.commit()
logger.info(f"Пользователю {user_id} добавлено {amount} кармы в чате {chat_id}")
# Устанавливает карму пользователя в указанное значение
def set_karma(self, user_id: int, chat_id: int, karma_value: int):
with self._get_connection() as connect:
cursor = connect.cursor()
# Проверяем существование записи
cursor.execute('SELECT karma_points FROM karma WHERE user_id = ? AND chat_id = ?', (user_id, chat_id))
result = cursor.fetchone()
if result:
# Обновляем существующую карму
cursor.execute('''
UPDATE karma
SET karma_points = ?
WHERE user_id = ? AND chat_id = ?
''', (karma_value, user_id, chat_id))
else:
# Создаем новую запись
cursor.execute('''
INSERT INTO karma (user_id, chat_id, karma_points)
VALUES (?, ?, ?)
''', (user_id, chat_id, karma_value))
connect.commit()
logger.info(f"Карма пользователя {user_id} установлена на {karma_value} в чате {chat_id}")
# Получает карму пользователя
def get_karma(self, user_id: int, chat_id: int) -> int:
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
SELECT karma_points
FROM karma
WHERE user_id = ? AND chat_id = ?
''', (user_id, chat_id))
result = cursor.fetchone()
return result[0] if result else 0
# Получает топ пользователей по карме
def get_top_karma(self, chat_id: int, limit: int = 10):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
SELECT k.user_id, u.nickname, u.tag, k.karma_points
FROM karma k
JOIN users u ON k.user_id = u.id
WHERE k.chat_id = ? AND k.karma_points > 0
ORDER BY k.karma_points DESC
LIMIT ?
''', (chat_id, limit))
return cursor.fetchall()
# Атомарно проверяет кулдаун и добавляет запись в историю кармы
# Возвращает True если благодарность засчитана, False если кулдаун не прошёл
def try_add_karma_thank(self, from_user_id: int, to_user_id: int, chat_id: int, cooldown_seconds: int = 3600) -> bool:
with self._get_connection() as connect:
cursor = connect.cursor()
current_time = int(time.time())
cutoff_time = current_time - cooldown_seconds
# Проверяем кулдаун и добавляем запись в одной транзакции
# Это предотвращает race condition при параллельных запросах
cursor.execute('''
SELECT COUNT(*)
FROM karma_history
WHERE from_user_id = ? AND to_user_id = ? AND chat_id = ? AND timestamp > ?
''', (from_user_id, to_user_id, chat_id, cutoff_time))
result = cursor.fetchone()
# Если кулдаун не прошёл, возвращаем False
if result and result[0] > 0:
return False
# Кулдаун прошёл, добавляем запись в той же транзакции
cursor.execute('''
INSERT INTO karma_history (from_user_id, to_user_id, chat_id, timestamp)
VALUES (?, ?, ?, ?)
''', (from_user_id, to_user_id, chat_id, current_time))
connect.commit()
logger.info(f"Пользователь {from_user_id} поблагодарил {to_user_id} в чате {chat_id}")
return True
# УСТАРЕВШИЙ МЕТОД - оставлен для обратной совместимости
# Используйте try_add_karma_thank() для атомарной операции без race condition
def can_thank(self, from_user_id: int, to_user_id: int, chat_id: int, cooldown_seconds: int = 3600) -> bool:
with self._get_connection() as connect:
cursor = connect.cursor()
cutoff_time = int(time.time()) - cooldown_seconds
cursor.execute('''
SELECT COUNT(*)
FROM karma_history
WHERE from_user_id = ? AND to_user_id = ? AND chat_id = ? AND timestamp > ?
''', (from_user_id, to_user_id, chat_id, cutoff_time))
result = cursor.fetchone()
return result[0] == 0 if result else True
# УСТАРЕВШИЙ МЕТОД - оставлен для обратной совместимости
# Используйте try_add_karma_thank() для атомарной операции без race condition
def add_karma_history(self, from_user_id: int, to_user_id: int, chat_id: int):
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
INSERT INTO karma_history (from_user_id, to_user_id, chat_id, timestamp)
VALUES (?, ?, ?, ?)
''', (from_user_id, to_user_id, chat_id, int(time.time())))
connect.commit()
logger.info(f"Пользователь {from_user_id} поблагодарил {to_user_id} в чате {chat_id}")
# Добавляет сообщение в кэш
def cache_message(self, chat_id: int, message_id: int, user_id: int, thread_id: Optional[int] = None):
with self._get_connection() as connect:
cursor = connect.cursor()
current_time = int(time.time())
cursor.execute('''
INSERT OR REPLACE INTO message_cache (chat_id, message_id, user_id, thread_id, timestamp)
VALUES (?, ?, ?, ?, ?)
''', (chat_id, message_id, user_id, thread_id, current_time))
connect.commit()
# Получает информацию о сообщении из кэша
# Возвращает (user_id, thread_id) или None если не найдено
def get_cached_message(self, chat_id: int, message_id: int) -> Optional[Tuple[int, Optional[int]]]:
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('''
SELECT user_id, thread_id
FROM message_cache
WHERE chat_id = ? AND message_id = ?
''', (chat_id, message_id))
result = cursor.fetchone()
return result if result else None
# Очищает сообщения старше указанного времени (по умолчанию 24 часа)
def cleanup_old_messages(self, max_age_seconds: int = 86400):
with self._get_connection() as connect:
cursor = connect.cursor()
cutoff_time = int(time.time()) - max_age_seconds
cursor.execute('''
DELETE FROM message_cache
WHERE timestamp < ?
''', (cutoff_time,))
deleted_count = cursor.rowcount
connect.commit()
if deleted_count > 0:
logger.info(f"Удалено {deleted_count} старых сообщений из кэша")
return deleted_count
# Получает количество сообщений в кэше
def get_cache_size(self) -> int:
with self._get_connection() as connect:
cursor = connect.cursor()
cursor.execute('SELECT COUNT(*) FROM message_cache')
result = cursor.fetchone()
return result[0] if result else 0
# Создаем экземпляр базы данных для импорта в других модулях # Создаем экземпляр базы данных для импорта в других модулях
db = Database() db = Database()

0
src/lgbot.db Normal file
View File

View File

@@ -6,13 +6,15 @@ from config import LOG_FILE_NAME
class ColoredFormatter(logging.Formatter): # Цветные логи (для терминала) class ColoredFormatter(logging.Formatter): # Цветные логи (для терминала)
LEVEL_COLORS = { LEVEL_COLORS = {
logging.DEBUG: '\033[96m', # Голубой для DEBUG
logging.INFO: '\033[92m', logging.INFO: '\033[92m',
logging.WARNING: '\033[93m', logging.WARNING: '\033[93m',
logging.ERROR: '\033[91m', logging.ERROR: '\033[91m',
logging.CRITICAL: '\033[91m' logging.CRITICAL: '\033[91m'
} }
LEVEL_NAMES = { LEVEL_NAMES = {
logging.DEBUG: "D",
logging.INFO: "I", logging.INFO: "I",
logging.WARNING: "W", logging.WARNING: "W",
logging.ERROR: "E", logging.ERROR: "E",
@@ -50,7 +52,7 @@ def setup_logging(): # Инициализирует систему логиро
# Создаем корневой логгер # Создаем корневой логгер
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO) # INFO для продакшена
# Проверяем, не настроен ли логгер ранее # Проверяем, не настроен ли логгер ранее
if not logger.hasHandlers(): if not logger.hasHandlers():

View File

@@ -18,25 +18,60 @@ from config import MODULES_DIR
# Загружаем токен бота из .env # Загружаем токен бота из .env
load_dotenv() load_dotenv()
# Валидация переменных окружения
def validate_env_vars():
"""Проверяет наличие всех необходимых переменных окружения"""
required_vars = {
"BOT_TOKEN": "Токен бота",
"ADMIN_CHAT_ID": "ID админ-чата",
"LOG_THREAD_ID": "ID топика для логов"
}
missing_vars = []
for var_name, description in required_vars.items():
value = os.getenv(var_name)
if not value or value.strip() == "" or value == "...":
missing_vars.append(f"{var_name} ({description})")
if missing_vars:
print("\n❌ ОШИБКА: Не заполнены необходимые переменные окружения в файле .env:")
for var in missing_vars:
print(f"{var}")
print("\nПожалуйста, заполните файл .env на основе .env.example")
sys.exit(1)
# Проверяем переменные окружения
validate_env_vars()
bot = AsyncTeleBot(os.getenv("BOT_TOKEN"), parse_mode="html") bot = AsyncTeleBot(os.getenv("BOT_TOKEN"), parse_mode="html")
# Загружаем ID админ-чата из .env и инициализируемся для логов в чат # Загружаем ID админ-чата из .env и инициализируемся для логов в чат
init_action_reporter(bot, os.getenv("ADMIN_CHAT_ID"), os.getenv("LOG_THREAD_ID")) init_action_reporter(bot, int(os.getenv("ADMIN_CHAT_ID")), int(os.getenv("LOG_THREAD_ID")))
# Получаем логгер для текущего модуля # Получаем логгер для текущего модуля
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Middleware для автоматического обновления информации о пользователях в базе данных # Middleware для автоматического обновления информации о пользователях в базе данных
# И проверки на нецензурную лексику (выполняется ДО всех обработчиков)
class UserUpdateMiddleware(BaseMiddleware): class UserUpdateMiddleware(BaseMiddleware):
def __init__(self, db): def __init__(self, db, bot):
super().__init__() super().__init__()
# message - все обычные сообщения # message - все обычные сообщения
# chat_member - события изменения статуса участников чата # chat_member - события изменения статуса участников чата
self.update_types = ['message', 'chat_member'] self.update_types = ['message', 'chat_member']
self.db = db self.db = db
self.bot = bot
# Обработчик, вызываемый ДО обработки сообщения основными хэндлерами # Обработчик, вызываемый ДО обработки сообщения основными хэндлерами
async def pre_process(self, message, data): async def pre_process(self, message, data):
# Проверяем, что это действительно сообщение (а не ChatMemberUpdated)
if not hasattr(message, 'content_type'):
# Это не Message объект (например ChatMemberUpdated), пропускаем
return data
# Логируем ВСЕ входящие сообщения для отладки
logger.info(f"[MIDDLEWARE] Получено сообщение от {message.from_user.id}, тип: {message.content_type}, текст: {message.text if hasattr(message, 'text') else 'N/A'}")
# Обработка пользователей, отправившие сообщение # Обработка пользователей, отправившие сообщение
if message.content_type == 'text': if message.content_type == 'text':
@@ -45,9 +80,25 @@ class UserUpdateMiddleware(BaseMiddleware):
nickname=message.from_user.first_name, nickname=message.from_user.first_name,
tag=message.from_user.username tag=message.from_user.username
) )
# ВАЖНО: Проверяем на мат ДО передачи другим обработчикам
# Это позволяет auto_mute работать независимо от karma_tracker
await self._check_profanity(message)
# ВАЖНО: Кэшируем ВСЕ сообщения для обработки реакций (не только текстовые!)
# Пользователи могут ставить реакции на фото, видео, стикеры и т.д.
try:
karma_module = importlib.import_module("modules.0_karma_tracker")
if message.chat.type in ['group', 'supergroup']:
# Передаём message_thread_id для правильной отправки уведомлений в топики
thread_id = getattr(message, 'message_thread_id', None)
karma_module._cache_message(message.chat.id, message.message_id, message.from_user.id, thread_id)
logger.info(f"[CACHE] Сообщение {message.message_id} от {message.from_user.id} добавлено в кэш, chat_id={message.chat.id}, thread_id={thread_id}")
except Exception as e:
logger.error(f"Ошибка кэширования сообщения: {e}", exc_info=True)
# Обработка новых участников группы # Обработка новых участников группы
elif message.content_type == 'new_chat_members': if message.content_type == 'new_chat_members':
for new_member in message.new_chat_members: for new_member in message.new_chat_members:
self.db.add_or_update_user( self.db.add_or_update_user(
user_id=new_member.id, user_id=new_member.id,
@@ -55,20 +106,39 @@ class UserUpdateMiddleware(BaseMiddleware):
tag=new_member.username tag=new_member.username
) )
return data return data
# Проверка на нецензурную лексику (вызывается в middleware)
async def _check_profanity(self, message):
"""Проверяет сообщение на мат и применяет мут если нужно"""
from bad_words import contains_bad_word
# Только для групповых чатов и не команды
if message.chat.type not in ['group', 'supergroup']:
return
if not message.text or message.text.startswith('/'):
return
# Проверяем наличие мата
if not contains_bad_word(message.text):
return
# Импортируем функцию проверки из auto_mute (если модуль загружен)
try:
from modules.auto_mute import check_message_for_profanity
await check_message_for_profanity(self.bot, message)
except ImportError:
logger.warning("Модуль auto_mute не загружен, пропускаем проверку мата")
# Обработчик, вызываемый ПОСЛЕ обработки сообщения основными хэндлерами # Обработчик, вызываемый ПОСЛЕ обработки сообщения основными хэндлерами
async def post_process(self, message, data, exception): async def post_process(self, message, data, exception):
pass pass
# Регистрируем middleware # Регистрируем middleware
bot.setup_middleware(UserUpdateMiddleware(db)) bot.setup_middleware(UserUpdateMiddleware(db, bot))
# Загружает все модули из директории /modules # Загружает все модули из директории /modules
async def load_modules(): async def load_modules():
# Инициализация логирования
setup_logging()
# Переменная для подсчёта модулей # Переменная для подсчёта модулей
loaded_count = 0 loaded_count = 0
@@ -109,7 +179,33 @@ async def load_modules():
# Записываем отчет о модулях в логи # Записываем отчет о модулях в логи
logger.info(f"Загружено модулей: {loaded_count} шт. Бот запущен.") logger.info(f"Загружено модулей: {loaded_count} шт. Бот запущен.")
# Устанавливаем меню команд бота
async def setup_bot_commands():
from telebot.types import BotCommand
commands = [
BotCommand("start", "Начало работы с ботом"),
BotCommand("help", "Справка по всем командам"),
BotCommand("log", "Инструкция по созданию лога ошибки"),
BotCommand("warn", "Выдать предупреждение. Использование: /warn help"),
BotCommand("ban", "Забанить пользователя. Использование: /ban help"),
BotCommand("unban", "Разбанить пользователя. Использование: /unban help"),
BotCommand("mute", "Замутить пользователя. Использование: /mute help"),
BotCommand("unmute", "Размутить пользователя. Использование: /unmute help"),
BotCommand("badwords", "Управление списком бранных слов. /badwords help"),
BotCommand("reset_violations", "Сбросить счётчик нарушений пользователя"),
BotCommand("botdata", "Получить данные бота (только для админов)"),
BotCommand("karma", "Просмотр кармы пользователя"),
BotCommand("top", "Топ-10 пользователей по карме"),
BotCommand("setkarma", "Установить карму пользователя. Использование: /setkarma help"),
]
await bot.set_my_commands(commands)
logger.info("Команды бота успешно установлены.")
async def main(): async def main():
# Инициализация логирования (должна быть первой)
setup_logging()
# Очищаем терминал # Очищаем терминал
os.system('clear') os.system('clear')
@@ -119,8 +215,12 @@ async def main():
# Проверяем и загружаем модули # Проверяем и загружаем модули
await load_modules() await load_modules()
# Запускаем бота # Устанавливаем команды бота
await bot.infinity_polling() await setup_bot_commands()
# Запускаем бота с обработкой реакций
logger.info("Запуск бота с allowed_updates: message, message_reaction, chat_member")
await bot.infinity_polling(allowed_updates=['message', 'message_reaction', 'chat_member'])
except Exception as e: except Exception as e:

View File

@@ -0,0 +1,332 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, MessageReactionUpdated, ReactionTypeEmoji
import asyncio
import logging
from database import db
from thank_words import contains_thank_word
from bad_words import contains_bad_word
from config import THANK_COOLDOWN
logger = logging.getLogger(__name__)
# Фоновая задача для автоочистки старых сообщений
_cleanup_task = None
def _cache_message(chat_id: int, message_id: int, user_id: int, message_thread_id: int = None):
"""Добавляет сообщение в кэш БД"""
db.cache_message(chat_id, message_id, user_id, message_thread_id)
def _get_cached_message(chat_id: int, message_id: int):
"""Получает (user_id, message_thread_id) из кэша БД"""
return db.get_cached_message(chat_id, message_id)
async def _cleanup_old_cache():
"""Фоновая задача для очистки старых сообщений из кэша каждые 6 часов"""
while True:
try:
await asyncio.sleep(21600) # Ждём 6 часов
deleted = db.cleanup_old_messages(max_age_seconds=604800) # Удаляем старше 7 дней
cache_size = db.get_cache_size()
logger.info(f"[CACHE CLEANUP] Удалено: {deleted}, размер кэша: {cache_size} сообщений")
except Exception as e:
logger.error(f"[CACHE CLEANUP] Ошибка очистки кэша: {e}", exc_info=True)
def register_handlers(bot: AsyncTeleBot):
"""Регистрирует обработчики для отслеживания благодарностей"""
logger.info("Регистрация обработчика благодарностей (karma_tracker)")
# Запускаем фоновую задачу очистки старых сообщений из кэша
global _cleanup_task
if _cleanup_task is None or _cleanup_task.done():
_cleanup_task = asyncio.create_task(_cleanup_old_cache())
cache_size = db.get_cache_size()
logger.info(f"[CACHE] Запущена автоочистка кэша. Текущий размер: {cache_size} сообщений")
@bot.message_reaction_handler(func=lambda m: True)
async def handle_reaction(reaction: MessageReactionUpdated):
"""
Обрабатывает реакции на сообщения.
Реакции работают как переключатель:
- Поставил 👍 → +1 карма | Убрал 👍 → -1 карма
- Поставил 👎 → -1 карма | Убрал 👎 → +1 карма
- Поставил 🔥 → +2 кармы | Убрал 🔥 → -2 кармы
- Поставил ❤ → +5 кармы | Убрал ❤ → -5 кармы
- Поставил ❤‍🔥 → +10 кармы | Убрал ❤‍🔥 → -10 кармы
"""
try:
logger.info(f"[KARMA] Получена реакция от {reaction.user.id}")
# Проверяем, что это групповой чат
if reaction.chat.type not in ['group', 'supergroup']:
logger.info(f"[KARMA] Пропуск реакции - не групповой чат")
return
from_user = reaction.user
chat_id = reaction.chat.id
# Получаем автора сообщения и топик из кэша
cached_data = _get_cached_message(chat_id, reaction.message_id)
if not cached_data:
logger.warning(f"[KARMA] Сообщение {reaction.message_id} не найдено в кэше")
return
to_user_id, message_thread_id = cached_data
# Защита от самооценки
if from_user.id == to_user_id:
logger.info(f"Пользователь {from_user.id} попытался поставить реакцию на своё сообщение")
return
# Получаем информацию о пользователе из БД
to_user_info = db.get_user(to_user_id)
if not to_user_info:
logger.warning(f"[KARMA] Пользователь {to_user_id} не найден в БД")
return
# Примечание: мы не проверяем является ли to_user_id ботом, т.к.:
# 1. Сообщения ботов не кэшируются (только пользовательские)
# 2. Если бот все же попал в кэш, это исключительный случай и не критично
# Проверяем старые реакции
old_thumbs_up = False
old_thumbs_down = False
old_heart = False
old_fire_heart = False
old_fire = False
if reaction.old_reaction:
for react in reaction.old_reaction:
if isinstance(react, ReactionTypeEmoji):
if react.emoji == "👍":
old_thumbs_up = True
elif react.emoji == "👎":
old_thumbs_down = True
elif react.emoji == "":
old_heart = True
elif react.emoji == "❤‍🔥":
old_fire_heart = True
elif react.emoji == "🔥":
old_fire = True
# Проверяем новые реакции
new_thumbs_up = False
new_thumbs_down = False
new_heart = False
new_fire_heart = False
new_fire = False
if reaction.new_reaction:
for react in reaction.new_reaction:
if isinstance(react, ReactionTypeEmoji):
if react.emoji == "👍":
new_thumbs_up = True
elif react.emoji == "👎":
new_thumbs_down = True
elif react.emoji == "":
new_heart = True
elif react.emoji == "❤‍🔥":
new_fire_heart = True
elif react.emoji == "🔥":
new_fire = True
# Определяем изменение кармы (накапливаем все изменения)
karma_change = 0
actions = [] # Список всех действий для логирования
# Логика изменения кармы - проверяем ВСЕ реакции (не elif!)
# Это важно, т.к. пользователь может менять реакции (убрать 👍 и поставить 🔥)
# Проверяем 👍
if new_thumbs_up and not old_thumbs_up:
karma_change += 1
actions.append("поставил 👍 (+1)")
elif old_thumbs_up and not new_thumbs_up:
karma_change -= 1
actions.append("убрал 👍 (-1)")
# Проверяем 👎
if new_thumbs_down and not old_thumbs_down:
karma_change -= 1
actions.append("поставил 👎 (-1)")
elif old_thumbs_down and not new_thumbs_down:
karma_change += 1
actions.append("убрал 👎 (+1)")
# Проверяем ❤
if new_heart and not old_heart:
karma_change += 5
actions.append("поставил ❤ (+5)")
elif old_heart and not new_heart:
karma_change -= 5
actions.append("убрал ❤ (-5)")
# Проверяем ❤‍🔥
if new_fire_heart and not old_fire_heart:
karma_change += 10
actions.append("поставил ❤‍🔥 (+10)")
elif old_fire_heart and not new_fire_heart:
karma_change -= 10
actions.append("убрал ❤‍🔥 (-10)")
# Проверяем 🔥
if new_fire and not old_fire:
karma_change += 2
actions.append("поставил 🔥 (+2)")
elif old_fire and not new_fire:
karma_change -= 2
actions.append("убрал 🔥 (-2)")
# Если нет изменений - выходим
if karma_change == 0:
logger.info(f"[KARMA] Нет изменений в реакциях")
return
# Формируем текст действий для логирования
action_text = ", ".join(actions)
logger.info(f"[KARMA] {action_text} от {from_user.id} для {to_user_id}, итоговое изменение кармы: {karma_change}")
# Изменяем карму
db.add_karma(to_user_id, chat_id, karma_change)
# Получаем новую карму
new_karma = db.get_karma(to_user_id, chat_id)
# Формируем имя пользователя (из БД: id, nickname, tag)
to_user_display = f"@{to_user_info[2]}" if to_user_info[2] else to_user_info[1]
# Формируем эмодзи для уведомления (берем первое действие или дефолтное)
notification_emoji = ""
if "👍" in action_text:
notification_emoji = "👍"
elif "👎" in action_text:
notification_emoji = "👎"
elif "🔥" in action_text and "❤‍🔥" not in action_text:
notification_emoji = "🔥"
elif "❤‍🔥" in action_text:
notification_emoji = "❤‍🔥"
elif "" in action_text:
notification_emoji = ""
# Отправляем уведомление
karma_sign = f"+{karma_change}" if karma_change > 0 else str(karma_change)
change_word = "увеличена" if karma_change > 0 else "уменьшена"
response = f"{notification_emoji} Карма пользователя {to_user_display} {change_word} ({karma_sign})! Текущая карма: {new_karma}"
logger.info(f"[KARMA] Отправка уведомления в чат {chat_id}")
try:
sent_message = await bot.send_message(
chat_id,
response,
message_thread_id=message_thread_id
)
logger.info(f"[KARMA] Уведомление отправлено успешно, message_id={sent_message.message_id}")
# Удаляем уведомление через 10 секунд В ФОНЕ (не блокируя обработку других реакций)
async def delete_notification():
try:
await asyncio.sleep(10)
await bot.delete_message(chat_id, sent_message.message_id)
logger.info(f"[KARMA] Уведомление удалено")
except Exception as e:
logger.error(f"Не удалось удалить уведомление о карме: {e}")
# Запускаем удаление в фоне
asyncio.create_task(delete_notification())
except Exception as e:
logger.error(f"Ошибка отправки уведомления о карме: {e}", exc_info=True)
except Exception as e:
logger.error(f"Ошибка при обработке реакции: {e}", exc_info=True)
@bot.message_handler(func=lambda message: message.reply_to_message is not None and message.text and not message.text.startswith('/'))
async def handle_thank_message(message: Message):
"""
Обрабатывает сообщения, которые являются ответами на другие сообщения.
Если сообщение содержит благодарность, начисляет карму автору оригинального сообщения.
"""
try:
logger.info(f"[KARMA] Получено reply-сообщение: {message.text[:50]}")
# Проверяем, что это групповой чат
if message.chat.type not in ['group', 'supergroup']:
logger.info(f"[KARMA] Пропуск - не групповой чат: {message.chat.type}")
return
# ВАЖНО: В топиках каждое сообщение технически является reply на первое сообщение топика
# Проверяем, что это реальный reply на сообщение пользователя, а не просто сообщение в топике
if message.is_topic_message and message.reply_to_message.message_id == message.message_thread_id:
logger.info(f"[KARMA] Пропуск - это сообщение в топике (не reply на пользователя)")
return
# Проверяем наличие благодарственных слов
if not contains_thank_word(message.text):
logger.info(f"[KARMA] Нет слов благодарности в: {message.text[:50]}")
return
logger.info(f"[KARMA] Обнаружена благодарность от {message.from_user.id}: {message.text[:50]}")
# Проверяем, что в сообщении нет мата (не начисляем карму за мат)
if contains_bad_word(message.text):
logger.info(f"Пользователь {message.from_user.id} написал благодарность с матом - карма не начислена")
return
from_user = message.from_user
to_user = message.reply_to_message.from_user
chat_id = message.chat.id
# Защита от самоблагодарности
if from_user.id == to_user.id:
logger.info(f"Пользователь {from_user.id} попытался поблагодарить сам себя")
return
# Проверяем, не является ли благодарность ботам
if to_user.is_bot:
logger.info(f"Пользователь {from_user.id} попытался поблагодарить бота")
return
# Атомарно проверяем кулдаун и записываем благодарность
# Это предотвращает race condition при параллельных запросах
if not db.try_add_karma_thank(from_user.id, to_user.id, chat_id, THANK_COOLDOWN):
logger.info(f"Пользователь {from_user.id} уже благодарил {to_user.id} недавно")
# Молча игнорируем, чтобы не спамить
return
# Определяем количество кармы: x2 если есть восклицательный знак
karma_amount = 2 if '!' in message.text else 1
# Начисляем карму (благодарность уже записана атомарно выше)
db.add_karma(to_user.id, chat_id, karma_amount)
# Получаем новую карму пользователя
new_karma = db.get_karma(to_user.id, chat_id)
# Формируем имя пользователя для отображения
to_user_name = to_user.first_name
if to_user.username:
to_user_display = f"@{to_user.username}"
else:
to_user_display = to_user_name
# Отправляем уведомление с указанием количества кармы
karma_emoji = "👍👍" if karma_amount == 2 else "👍"
karma_change = f"+{karma_amount}"
response = f"{karma_emoji} Карма пользователя {to_user_display} увеличена ({karma_change})! Текущая карма: {new_karma}"
try:
sent_message = await bot.reply_to(message, response)
logger.info(f"Пользователь {from_user.id} поблагодарил {to_user.id}, карма: {new_karma}")
# Удаляем уведомление через 25 секунд В ФОНЕ
async def delete_thank_notification():
try:
await asyncio.sleep(25)
await bot.delete_message(chat_id, sent_message.message_id)
except Exception as e:
logger.error(f"Не удалось удалить уведомление о карме: {e}")
# Запускаем удаление в фоне
asyncio.create_task(delete_thank_notification())
except Exception as e:
logger.error(f"Ошибка отправки уведомления о благодарности: {e}", exc_info=True)
except Exception as e:
logger.error(f"Ошибка при обработке благодарности: {e}", exc_info=True)

217
src/modules/auto_mute.py Normal file
View File

@@ -0,0 +1,217 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, ChatPermissions
import asyncio
import logging
import time
from database import db
from bad_words import contains_bad_word, get_bad_words_from_text
from action_reporter import action_reporter
from utils import delete_messages, format_mute_time
from config import VIOLATIONS_PERIOD
# Получаем логгер для текущего модуля
logger = logging.getLogger(__name__)
# Система прогрессирующих мутов (в секундах)
# Более плавная прогрессия для накопительного эффекта
MUTE_LEVELS = [
300, # 1. 5 минут (первое нарушение - символический мут)
900, # 2. 15 минут
1800, # 3. 30 минут
3600, # 4. 1 час
7200, # 5. 2 часа
14400, # 6. 4 часа
28800, # 7. 8 часов
43200, # 8. 12 часов
86400, # 9. 1 день
172800, # 10. 2 дня
259200, # 11. 3 дня
432000, # 12. 5 дней
604800, # 13. 7 дней
None, # 14. Перманентный мут (режим только чтения навсегда)
]
def get_mute_duration(violations_count: int) -> int:
"""
Определяет длительность мута на основе количества нарушений.
Args:
violations_count: Количество нарушений пользователя
Returns:
Длительность мута в секундах (или None для перманентного мута)
"""
if violations_count < 1:
return MUTE_LEVELS[0]
# Индекс уровня мута (количество нарушений - 1, т.к. начинаем с 0)
level_index = violations_count - 1
# Если превысили количество уровней, возвращаем перманентный мут
if level_index >= len(MUTE_LEVELS):
return None
return MUTE_LEVELS[level_index]
async def apply_mute(bot: AsyncTeleBot, message: Message, user_id: int, duration: int, violations_count: int, bad_words_found: list = None):
"""
Применяет мут к пользователю.
Args:
bot: Экземпляр бота
message: Сообщение, которое вызвало мут
user_id: ID пользователя
duration: Длительность мута в секундах (None для перманентного)
violations_count: Количество нарушений
bad_words_found: Список найденных плохих слов
"""
try:
# Устанавливаем ограничения (только чтение)
permissions = ChatPermissions(
can_send_messages=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
)
# Вычисляем время окончания мута
until_date = None if duration is None else int(time.time()) + duration
# Выполняем мут
await bot.restrict_chat_member(
chat_id=message.chat.id,
user_id=user_id,
permissions=permissions,
until_date=until_date
)
# Снимаем карму за автомут
db.add_karma(user_id, message.chat.id, -10)
logger.info(f"Снято 10 кармы пользователю {user_id} за автомут")
# Удаляем сообщение с матом
try:
await bot.delete_message(chat_id=message.chat.id, message_id=message.message_id)
except Exception as e:
logger.warning(f"Не удалось удалить сообщение: {e}")
# Формируем информацию о найденных словах
words_info = ""
if bad_words_found:
words_list = ", ".join([f"«{word}»" for word in bad_words_found])
words_info = f"Найдено слов: {words_list}"
else:
words_info = "Использование нецензурной лексики"
# Формируем сообщение о муте
if duration is None:
time_display = "навсегда"
warning_msg = (
f"⛔️ Пользователь <b>{message.from_user.first_name}</b> получил перманентный мут "
f"за злостное нарушение правил чата (использование нецензурной лексики).\n\n"
f"📊 Количество нарушений: <b>{violations_count}</b>\n"
f"🔒 Режим: только чтение (навсегда)"
)
else:
time_display = format_mute_time(duration)
warning_msg = (
f"⚠️ Пользователь <b>{message.from_user.first_name}</b> получил мут на <b>{time_display}</b> "
f"за использование нецензурной лексики.\n\n"
f"📊 Нарушение #{violations_count}\n"
f"💡 При повторных нарушениях время мута будет увеличиваться."
)
# Отправляем сообщение в чат
await bot.send_message(
chat_id=message.chat.id,
text=warning_msg,
message_thread_id=message.message_thread_id,
)
# Формируем причину с полным текстом сообщения
reason_text = f"{words_info} (нарушение #{violations_count})\n\n📝 <b>Текст сообщения:</b>\n<code>{message.text}</code>"
# Отправляем сообщение-лог в админ-чат
await action_reporter.log_action(
action="АВТОМУТ",
user_id=user_id,
admin_id=None, # Автоматическое действие
reason=reason_text,
duration=time_display,
)
# Записываем действие в логи
logger.info(
f"Пользователь {user_id} получил автоматический мут на {time_display} "
f"за нецензурную лексику (нарушение #{violations_count})"
)
except Exception as e:
logger.error(f"Ошибка при применении мута: {e}")
async def check_message_for_profanity(bot: AsyncTeleBot, message: Message):
"""
Проверяет сообщение на наличие бранных слов и применяет мут при необходимости.
Args:
bot: Экземпляр бота
message: Сообщение для проверки
"""
# Проверяем только текстовые сообщения
if not message.text:
return
# Не проверяем команды
if message.text.startswith('/'):
return
# Проверяем, содержит ли сообщение бранные слова
if not contains_bad_word(message.text):
return
# Получаем список найденных плохих слов
bad_words_found = get_bad_words_from_text(message.text)
# Получаем ID пользователя и чата
user_id = message.from_user.id
chat_id = message.chat.id
# Проверяем, является ли отправитель администратором
try:
chat_member = await bot.get_chat_member(chat_id, user_id)
if chat_member.status in ['administrator', 'creator']:
logger.info(f"Администратор {user_id} использовал нецензурную лексику, мут не применен")
return
except Exception as e:
logger.error(f"Ошибка проверки статуса пользователя: {e}")
return
# Добавляем нарушение в базу данных
db.add_violation(user_id, chat_id, violation_type='bad_language')
# Получаем количество нарушений за последний месяц
violations_count = db.get_violations_count(user_id, chat_id, VIOLATIONS_PERIOD)
# Определяем длительность мута
mute_duration = get_mute_duration(violations_count)
# Применяем мут
await apply_mute(bot, message, user_id, mute_duration, violations_count, bad_words_found)
def register_handlers(bot: AsyncTeleBot):
"""
Регистрирует обработчики для автоматического мута.
ПРИМЕЧАНИЕ: Фактическая проверка мата происходит в middleware (main.py),
а не в обработчике. Это позволяет проверять все сообщения ДО того, как
они попадут к другим обработчикам (например, karma_tracker).
Этот метод оставлен для совместимости с архитектурой модулей.
"""
logger.info("Модуль автоматического мута успешно загружен (проверка в middleware)")

View File

@@ -0,0 +1,237 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message
import logging
from bad_words import (
get_bad_words,
get_exceptions,
save_bad_words,
reload_words
)
from utils import check_admin_status, delete_messages
from config import COMMAND_MESSAGES
logger = logging.getLogger(__name__)
def register_handlers(bot: AsyncTeleBot):
"""Регистрирует обработчики команд управления бранными словами"""
@bot.message_handler(commands=['badwords'])
async def badwords_command(message: Message):
"""Главная команда управления списком бранных слов"""
logger.info(f"Команда /badwords получена от пользователя {message.from_user.id}")
# Проверяем права администратора (без проверки прав на ограничение участников)
admin_check = await check_admin_status(bot, message, check_restrict_rights=False)
if admin_check == 1:
logger.info(f"Пользователь {message.from_user.id} не является администратором")
return
logger.info(f"Пользователь {message.from_user.id} прошел проверку прав администратора")
parts = message.text.split(maxsplit=2)
# /badwords без параметров - показываем help
if len(parts) == 1:
await show_help(bot, message)
return
subcommand = parts[1].lower()
# Обработка подкоманд
if subcommand == 'help':
await show_help(bot, message)
elif subcommand == 'list':
await list_bad_words(bot, message)
elif subcommand == 'count':
await count_words(bot, message)
elif subcommand == 'add':
if len(parts) < 3:
await send_temp_message(bot, message, "❌ Укажите слово для добавления: /badwords add <слово>")
else:
await add_bad_word(bot, message, parts[2])
elif subcommand == 'remove':
if len(parts) < 3:
await send_temp_message(bot, message, "❌ Укажите слово для удаления: /badwords remove <слово>")
else:
await remove_bad_word(bot, message, parts[2])
elif subcommand == 'exceptions':
await list_exceptions(bot, message)
elif subcommand == 'add_exception':
if len(parts) < 3:
await send_temp_message(bot, message, "❌ Укажите слово для добавления в исключения: /badwords add_exception <слово>")
else:
await add_exception(bot, message, parts[2])
elif subcommand == 'remove_exception':
if len(parts) < 3:
await send_temp_message(bot, message, "❌ Укажите слово для удаления из исключений: /badwords remove_exception <слово>")
else:
await remove_exception(bot, message, parts[2])
elif subcommand == 'reload':
await reload_wordlist(bot, message)
else:
await send_temp_message(bot, message, f"❌ Неизвестная команда: {subcommand}\n\nИспользуйте /badwords help")
async def show_help(bot: AsyncTeleBot, message: Message):
"""Показывает справку по командам управления бранными словами"""
help_text = COMMAND_MESSAGES['badwords_help']
await bot.send_message(
chat_id=message.chat.id,
text=help_text,
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=60, number_message=2)
async def list_bad_words(bot: AsyncTeleBot, message: Message):
"""Показывает список бранных слов (первые 50)"""
words = get_bad_words()
if not words:
text = "📝 Список бранных слов пуст."
else:
# Показываем только первые 50 слов
display_words = words[:50]
text = f"📝 <b>Бранные слова</b> (всего: {len(words)})\n\n"
text += ", ".join([f"<code>{word}</code>" for word in display_words])
if len(words) > 50:
text += f"\n\n<i>...и ещё {len(words) - 50} слов</i>"
await bot.send_message(
chat_id=message.chat.id,
text=text,
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=30, number_message=2)
async def count_words(bot: AsyncTeleBot, message: Message):
"""Показывает статистику по спискам"""
words = get_bad_words()
exceptions = get_exceptions()
text = (
f"📊 <b>Статистика списков</b>\n\n"
f"🚫 Бранных слов: <b>{len(words)}</b>\n"
f"✅ Исключений: <b>{len(exceptions)}</b>"
)
await bot.send_message(
chat_id=message.chat.id,
text=text,
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=15, number_message=2)
async def add_bad_word(bot: AsyncTeleBot, message: Message, word: str):
"""Добавляет слово в список бранных"""
word = word.lower().strip()
words = get_bad_words()
exceptions = get_exceptions()
if word in words:
await send_temp_message(bot, message, f"⚠️ Слово '<code>{word}</code>' уже есть в списке.")
return
words.append(word)
if save_bad_words(words, exceptions):
reload_words() # Перезагружаем кэш
await send_temp_message(bot, message, f"✅ Слово '<code>{word}</code>' добавлено в список бранных.")
logger.info(f"Администратор {message.from_user.id} добавил бранное слово: {word}")
else:
await send_temp_message(bot, message, "❌ Ошибка при сохранении файла.")
async def remove_bad_word(bot: AsyncTeleBot, message: Message, word: str):
"""Удаляет слово из списка бранных"""
word = word.lower().strip()
words = get_bad_words()
exceptions = get_exceptions()
if word not in words:
await send_temp_message(bot, message, f"⚠️ Слово '<code>{word}</code>' не найдено в списке.")
return
words.remove(word)
if save_bad_words(words, exceptions):
reload_words() # Перезагружаем кэш
await send_temp_message(bot, message, f"✅ Слово '<code>{word}</code>' удалено из списка бранных.")
logger.info(f"Администратор {message.from_user.id} удалил бранное слово: {word}")
else:
await send_temp_message(bot, message, "❌ Ошибка при сохранении файла.")
async def list_exceptions(bot: AsyncTeleBot, message: Message):
"""Показывает список исключений"""
exceptions = get_exceptions()
if not exceptions:
text = "📝 Список исключений пуст."
else:
text = f"📝 <b>Исключения</b> (всего: {len(exceptions)})\n\n"
text += ", ".join([f"<code>{word}</code>" for word in exceptions])
await bot.send_message(
chat_id=message.chat.id,
text=text,
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=30, number_message=2)
async def add_exception(bot: AsyncTeleBot, message: Message, word: str):
"""Добавляет слово в список исключений"""
word = word.lower().strip()
words = get_bad_words()
exceptions = get_exceptions()
if word in exceptions:
await send_temp_message(bot, message, f"⚠️ Слово '<code>{word}</code>' уже есть в исключениях.")
return
exceptions.append(word)
if save_bad_words(words, exceptions):
reload_words() # Перезагружаем кэш
await send_temp_message(bot, message, f"✅ Слово '<code>{word}</code>' добавлено в исключения.")
logger.info(f"Администратор {message.from_user.id} добавил исключение: {word}")
else:
await send_temp_message(bot, message, "❌ Ошибка при сохранении файла.")
async def remove_exception(bot: AsyncTeleBot, message: Message, word: str):
"""Удаляет слово из списка исключений"""
word = word.lower().strip()
words = get_bad_words()
exceptions = get_exceptions()
if word not in exceptions:
await send_temp_message(bot, message, f"⚠️ Слово '<code>{word}</code>' не найдено в исключениях.")
return
exceptions.remove(word)
if save_bad_words(words, exceptions):
reload_words() # Перезагружаем кэш
await send_temp_message(bot, message, f"✅ Слово '<code>{word}</code>' удалено из исключений.")
logger.info(f"Администратор {message.from_user.id} удалил исключение: {word}")
else:
await send_temp_message(bot, message, "❌ Ошибка при сохранении файла.")
async def reload_wordlist(bot: AsyncTeleBot, message: Message):
"""Перезагружает списки слов из файла"""
words, exceptions = reload_words()
text = (
f"🔄 <b>Списки перезагружены</b>\n\n"
f"🚫 Бранных слов: <b>{len(words)}</b>\n"
f"✅ Исключений: <b>{len(exceptions)}</b>"
)
await send_temp_message(bot, message, text)
logger.info(f"Администратор {message.from_user.id} перезагрузил списки слов")
async def send_temp_message(bot: AsyncTeleBot, message: Message, text: str, time_sleep: int = 10):
"""Отправляет временное сообщение, которое удаляется через указанное время"""
await bot.send_message(
chat_id=message.chat.id,
text=text,
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=time_sleep, number_message=2)

View File

@@ -62,7 +62,7 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
if message.is_topic_message: if message.is_topic_message:
# Если без ответа на сообщение # Если без ответа на сообщение
if message.message_thread_id == message.reply_to_message.message_id: if not message.reply_to_message or message.message_thread_id == message.reply_to_message.message_id:
# Удаляем сообщение через 3 секунды # Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1) await delete_messages(bot, message, time_sleep=3, number_message=1)
@@ -76,7 +76,7 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
reason = 'отсутствует' reason = 'отсутствует'
# Если это General (обычный чат) # Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None: elif message.reply_to_message and not message.is_topic_message:
# Собираем данные # Собираем данные
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user
@@ -141,7 +141,7 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
if message.is_topic_message: if message.is_topic_message:
# Если без ответа на сообщение # Если без ответа на сообщение
if message.message_thread_id == message.reply_to_message.message_id: if not message.reply_to_message or message.message_thread_id == message.reply_to_message.message_id:
# Удаляем сообщение через 3 секунды # Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1) await delete_messages(bot, message, time_sleep=3, number_message=1)
@@ -155,7 +155,7 @@ async def ban_command(bot: AsyncTeleBot, message: Message, photo_path: str = Non
reason = ' '.join(parts_msg[1:]) reason = ' '.join(parts_msg[1:])
# Если это General (обычный чат) # Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None: elif message.reply_to_message and not message.is_topic_message:
# Собираем данные # Собираем данные
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user

View File

@@ -4,16 +4,13 @@ import asyncio
import logging import logging
import os import os
from utils import delete_messages from utils import delete_messages, check_admin_status
from config import COMMAND_MESSAGES, DATABASE_NAME, LOG_FILE_NAME from config import COMMAND_MESSAGES, DATABASE_NAME, LOG_FILE_NAME
# Получаем логгер для текущего модуля # Получаем логгер для текущего модуля
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Загружаем id администраторов из .env
ADMIN_IDS = [int(id_str.strip()) for id_str in os.getenv('ADMIN_IDS').split(',')]
# Регистрирует все обработчики команд # Регистрирует все обработчики команд
def register_handlers(bot: AsyncTeleBot): def register_handlers(bot: AsyncTeleBot):
@@ -23,36 +20,28 @@ def register_handlers(bot: AsyncTeleBot):
try: try:
# Если id администратора совпадает # Проверяем права администратора (без проверки прав на ограничение участников)
if message.from_user.id in ADMIN_IDS: admin_check = await check_admin_status(bot, message, check_restrict_rights=False)
if admin_check == 1:
logger.info(f"Пользователь {message.from_user.id} не является администратором")
return
# Отправляем базу данных # Если проверка пройдена
await bot.send_document(
chat_id=message.chat.id,
document=open(DATABASE_NAME, 'rb')
)
# Отправляем файл с логами # Отправляем базу данных
await bot.send_document( await bot.send_document(
chat_id=message.chat.id, chat_id=message.chat.id,
document=open(LOG_FILE_NAME, 'rb') document=open(DATABASE_NAME, 'rb')
) )
# Записываем действие в логи # Отправляем файл с логами
logger.info(f"Администратор {message.from_user.id} запустил /botdata.") await bot.send_document(
chat_id=message.chat.id,
document=open(LOG_FILE_NAME, 'rb')
)
# Если id администратора не совпадает # Записываем действие в логи
else: logger.info(f"Администратор {message.from_user.id} запустил /botdata.")
# Отправляем предупреждение
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['no_admin_rights'],
message_thread_id=message.message_thread_id,
)
# Записываем действие в логи
logger.info(f"Пользователь {message.from_user.id} запустил /botdata.")
except Exception as e: except Exception as e:

View File

@@ -0,0 +1,155 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message
import asyncio
import logging
from database import db
logger = logging.getLogger(__name__)
async def _delete_message_delayed(bot: AsyncTeleBot, chat_id: int, message_id: int, delay: int):
"""Удаляет сообщение с задержкой"""
try:
await asyncio.sleep(delay)
await bot.delete_message(chat_id, message_id)
except Exception as e:
logger.error(f"Не удалось удалить сообщение {message_id}: {e}")
def register_handlers(bot: AsyncTeleBot):
"""Регистрирует обработчики команд для системы кармы"""
@bot.message_handler(commands=['karma', 'rating'])
async def handle_karma_command(message: Message):
"""
Команда /karma - показывает карму пользователя
Использование:
/karma - показать свою карму
/karma @username - показать карму пользователя
/karma (в ответ на сообщение) - показать карму автора сообщения
"""
try:
# Проверяем, что это групповой чат
if message.chat.type not in ['group', 'supergroup']:
await bot.reply_to(message, "❌ Эта команда работает только в групповых чатах")
return
chat_id = message.chat.id
target_user = None
target_user_id = None
# Если команда - ответ на сообщение
if message.reply_to_message:
target_user = message.reply_to_message.from_user
target_user_id = target_user.id
# Если указан username в команде
elif len(message.text.split()) > 1:
username_arg = message.text.split()[1]
# Убираем @ если есть
username = username_arg.lstrip('@')
# Ищем пользователя в БД
user_data = db.get_user_by_username(username)
if user_data:
target_user_id = user_data[0]
logger.info(f"[KARMA CMD] Найден пользователь по username '{username}': id={user_data[0]}, nickname={user_data[1]}, tag={user_data[2]}")
target_user = type('User', (), {
'id': user_data[0],
'first_name': user_data[1],
'username': user_data[2]
})()
else:
await bot.reply_to(message, f"❌ Пользователь @{username} не найден в базе данных")
return
# Иначе показываем карму отправителя команды
else:
target_user = message.from_user
target_user_id = target_user.id
# Получаем карму
karma = db.get_karma(target_user_id, chat_id)
# Формируем имя пользователя
if hasattr(target_user, 'username') and target_user.username:
user_display = f"@{target_user.username}"
else:
user_display = target_user.first_name
logger.info(f"[KARMA CMD] Показываем карму: user_id={target_user_id}, username={getattr(target_user, 'username', None)}, display={user_display}, karma={karma}")
# Определяем эмодзи в зависимости от кармы
if karma == 0:
emoji = "😐"
elif karma < 5:
emoji = "🙂"
elif karma < 10:
emoji = "😊"
elif karma < 20:
emoji = "😄"
elif karma < 50:
emoji = "🌟"
else:
emoji = ""
response = f"{emoji} Карма пользователя {user_display}: {karma}"
sent_message = await bot.reply_to(message, response)
# Удаляем команду через 20 секунд и ответ через 60 секунд
asyncio.create_task(_delete_message_delayed(bot, chat_id, message.message_id, 20))
asyncio.create_task(_delete_message_delayed(bot, chat_id, sent_message.message_id, 60))
except Exception as e:
logger.error(f"Ошибка при обработке команды /karma: {e}", exc_info=True)
await bot.reply_to(message, "❌ Произошла ошибка при получении кармы")
@bot.message_handler(commands=['top', 'leaderboard', 'topkarma'])
async def handle_top_command(message: Message):
"""
Команда /top - показывает топ пользователей по карме
"""
try:
# Проверяем, что это групповой чат
if message.chat.type not in ['group', 'supergroup']:
await bot.reply_to(message, "❌ Эта команда работает только в групповых чатах")
return
chat_id = message.chat.id
# Получаем топ 10 пользователей
top_users = db.get_top_karma(chat_id, limit=10)
if not top_users:
await bot.reply_to(message, "📊 В этом чате пока нет пользователей с кармой")
return
# Формируем сообщение
response = "🏆 <b>Топ-10 пользователей по карме:</b>\n\n"
medals = ["🥇", "🥈", "🥉"]
for idx, (user_id, nickname, tag, karma_points) in enumerate(top_users, 1):
# Определяем медаль для топ-3
if idx <= 3:
medal = medals[idx - 1]
else:
medal = f"{idx}."
# Формируем отображение пользователя
if tag:
user_display = f"@{tag}"
else:
user_display = nickname or f"ID: {user_id}"
response += f"{medal} {user_display} — <b>{karma_points}</b> кармы\n"
sent_message = await bot.reply_to(message, response)
# Удаляем команду через 20 секунд и ответ через 60 секунд
asyncio.create_task(_delete_message_delayed(bot, chat_id, message.message_id, 20))
asyncio.create_task(_delete_message_delayed(bot, chat_id, sent_message.message_id, 60))
except Exception as e:
logger.error(f"Ошибка при обработке команды /top: {e}", exc_info=True)
await bot.reply_to(message, "❌ Произошла ошибка при получении топа")

87
src/modules/log.py Normal file
View File

@@ -0,0 +1,87 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message
import logging
import re
from config import COMMAND_MESSAGES
# Получаем логгер для текущего модуля
logger = logging.getLogger(__name__)
# Функция для отправки сообщения с инструкцией по логам
async def send_log_instruction(bot: AsyncTeleBot, message: Message):
try:
# Отправляем сообщение со ссылкой на инструкцию
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['log'],
message_thread_id=message.message_thread_id,
disable_web_page_preview=False,
)
except Exception as e:
# Записываем ошибку в логи
logger.error(f"Ошибка отправки инструкции по логам: {str(e)}")
# Функция проверки наличия триггерных фраз
def contains_log_trigger(text: str) -> bool:
if not text:
return False
# Список триггерных фраз (регистронезависимый поиск)
triggers = [
r'лог\?',
r'приложите\s+лог',
r'приложи\s+лог'
]
text_lower = text.lower()
for trigger in triggers:
if re.search(trigger, text_lower):
return True
return False
# Регистрирует все обработчики команд
def register_handlers(bot: AsyncTeleBot):
# Обработчик команды /log
@bot.message_handler(commands=['log'])
async def log_command(message: Message):
try:
# Отправляем инструкцию
await send_log_instruction(bot, message)
# Записываем действие в логи
logger.info(f"Пользователь {message.from_user.id} запустил /log.")
except Exception as e:
# Отправляем ошибку
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['general_error'],
message_thread_id=message.message_thread_id,
)
# Записываем ошибку в логи
logger.error(f"Общая ошибка в log_command: {str(e)}")
# Автоматический обработчик триггерных фраз
@bot.message_handler(func=lambda message: message.content_type == 'text' and contains_log_trigger(message.text))
async def auto_log_trigger(message: Message):
try:
# Отправляем инструкцию
await send_log_instruction(bot, message)
# Записываем действие в логи
logger.info(f"Автоматически отправлена инструкция по логам в ответ на сообщение от {message.from_user.id}.")
except Exception as e:
# Записываем ошибку в логи
logger.error(f"Ошибка в auto_log_trigger: {str(e)}")

View File

@@ -33,7 +33,7 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
# Определяем целевого пользователя # Определяем целевого пользователя
target_user = None target_user = None
# Отпределяем время # Определяем время
time_arg = None time_arg = None
# Определяем причину # Определяем причину
@@ -76,7 +76,7 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
if message.is_topic_message: if message.is_topic_message:
# Если без ответа на сообщение # Если без ответа на сообщение
if message.message_thread_id == message.reply_to_message.message_id: if not message.reply_to_message or message.message_thread_id == message.reply_to_message.message_id:
# Удаляем сообщение через 3 секунды # Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1) await delete_messages(bot, message, time_sleep=3, number_message=1)
@@ -91,7 +91,7 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
reason = 'отсутствует' reason = 'отсутствует'
# Если это General (обычный чат) # Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None: elif message.reply_to_message and not message.is_topic_message:
# Собираем данные # Собираем данные
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user
@@ -158,7 +158,7 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
if message.is_topic_message: if message.is_topic_message:
# Если без ответа на сообщение # Если без ответа на сообщение
if message.message_thread_id == message.reply_to_message.message_id: if not message.reply_to_message or message.message_thread_id == message.reply_to_message.message_id:
# Удаляем сообщение через 3 секунды # Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1) await delete_messages(bot, message, time_sleep=3, number_message=1)
@@ -173,7 +173,7 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
reason = ' '.join(parts_msg[2:]) reason = ' '.join(parts_msg[2:])
# Если это General (обычный чат) # Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None: elif message.reply_to_message and not message.is_topic_message:
# Собираем данные # Собираем данные
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user
@@ -233,8 +233,11 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
await delete_messages(bot, message, time_sleep=5, number_message=2) await delete_messages(bot, message, time_sleep=5, number_message=2)
return return
# Максимальное время мута - 30 дней (2592000 секунд) # Импортируем максимальное время мута
if mute_seconds > 2592000: from config import MAX_MUTE_TIME
# Максимальное время мута - 30 дней
if mute_seconds > MAX_MUTE_TIME:
# Отправляем предупреждение # Отправляем предупреждение
await bot.send_message( await bot.send_message(
@@ -276,6 +279,10 @@ async def mute_command(bot: AsyncTeleBot, message: Message, photo_path: str = No
until_date=until_date until_date=until_date
) )
# Снимаем карму за мут
db.add_karma(target_user.id, message.chat.id, -10)
logger.info(f"Снято 10 кармы пользователю {target_user.id} за мут")
# Форматируем время в удобный формат # Форматируем время в удобный формат
time_display = format_mute_time(mute_seconds) time_display = format_mute_time(mute_seconds)

View File

@@ -0,0 +1,127 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message
import logging
from database import db
from utils import check_admin_status, delete_messages
from config import COMMAND_MESSAGES
logger = logging.getLogger(__name__)
def register_handlers(bot: AsyncTeleBot):
"""Регистрирует обработчик команды сброса нарушений"""
@bot.message_handler(commands=['reset_violations'])
async def reset_violations_command(message: Message):
"""Команда для сброса счётчика нарушений пользователя"""
logger.info(f"Команда /reset_violations получена от пользователя {message.from_user.id}")
# Проверяем права администратора (без проверки can_restrict_members)
admin_check = await check_admin_status(bot, message, check_restrict_rights=False)
if admin_check == 1:
logger.info(f"Пользователь {message.from_user.id} не является администратором")
return
logger.info(f"Пользователь {message.from_user.id} прошел проверку прав администратора")
# Определяем целевого пользователя
target_user = None
target_user_id = None
# Парсим команду
parts = message.text.split(maxsplit=1)
# Если есть аргументы в команде (username или ID) - используем их
if len(parts) >= 2:
identifier = parts[1].strip()
# Попытка получить по username
if identifier.startswith('@'):
username = identifier[1:]
logger.info(f"Поиск пользователя по username: {username}")
user_data = db.get_user_by_username(username)
if user_data:
target_user_id = user_data[0]
logger.info(f"Найден пользователь в БД: id={user_data[0]}, nickname={user_data[1]}, tag={user_data[2]}")
else:
logger.warning(f"Пользователь с username {username} не найден в БД")
await send_temp_message(
bot,
message,
COMMAND_MESSAGES['user_not_found']
)
return
# Попытка получить по ID
else:
try:
target_user_id = int(identifier)
logger.info(f"Использован ID пользователя: {target_user_id}")
except ValueError:
await send_temp_message(
bot,
message,
COMMAND_MESSAGES['user_not_found']
)
return
# Если аргументов нет - пробуем использовать reply_to_message
elif message.reply_to_message:
target_user = message.reply_to_message.from_user
target_user_id = target_user.id
logger.info(f"Использован пользователь из reply_to_message: id={target_user_id}")
# Если ни аргументов, ни reply нет - показываем help
else:
await send_temp_message(
bot,
message,
COMMAND_MESSAGES['reset_violations_help']
)
return
# Проверяем, что нашли пользователя
if not target_user_id:
await send_temp_message(
bot,
message,
COMMAND_MESSAGES['user_not_found']
)
return
# Получаем информацию о пользователе из базы
user_info = db.get_user(target_user_id)
# Получаем текущее количество нарушений
violations_count = db.get_violations_count(target_user_id, message.chat.id)
# Сбрасываем нарушения
deleted_count = db.reset_user_violations(target_user_id, message.chat.id)
# Формируем сообщение
if user_info:
_, nickname, tag = user_info
user_display = f"<b>{nickname}</b>"
if tag:
user_display += f" (@{tag})"
else:
user_display = f"<code>{target_user_id}</code>"
response = (
f"✅ Счётчик нарушений сброшен\n\n"
f"👤 Пользователь: {user_display}\n"
f"📊 Удалено нарушений: <b>{deleted_count}</b>"
)
await send_temp_message(bot, message, response, time_sleep=30)
logger.info(f"Администратор {message.from_user.id} сбросил счётчик нарушений пользователя {target_user_id}")
async def send_temp_message(bot: AsyncTeleBot, message: Message, text: str, time_sleep: int = 10):
"""Отправляет временное сообщение, которое удаляется через указанное время"""
await bot.send_message(
chat_id=message.chat.id,
text=text,
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=time_sleep, number_message=2)

198
src/modules/setkarma.py Normal file
View File

@@ -0,0 +1,198 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, User
import logging
from database import db
from action_reporter import action_reporter
from utils import delete_messages, check_admin_status
from config import COMMAND_MESSAGES
# Получаем логгер для текущего модуля
logger = logging.getLogger(__name__)
# Регистрирует обработчик команды
def register_handlers(bot: AsyncTeleBot):
# Обработчик команды /setkarma
@bot.message_handler(commands=['setkarma'])
async def _setkarma_command_wrapper(message: Message):
await setkarma_command(bot, message)
# Основная функция команды /setkarma
async def setkarma_command(bot: AsyncTeleBot, message: Message):
"""Устанавливает карму пользователя в указанное значение"""
# Определяем целевого пользователя
target_user = None
# Определяем новое значение кармы
new_karma_value = None
# Разбиваем текст сообщения на части
parts_msg = message.text.split()
# Команда /setkarma help
if len(parts_msg) == 2 and parts_msg[1].strip() in ('help', 'помощь'):
# Отправляем инструкцию
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['setkarma_help'],
message_thread_id=message.message_thread_id,
)
# Удаляем сообщения через 30 секунд
await delete_messages(bot, message, time_sleep=30, number_message=2)
return
try:
# Проверяем, является ли отправитель администратором
if await check_admin_status(bot, message) == 1:
return
# Проверяем, что это групповой чат
if message.chat.type not in ['group', 'supergroup']:
await bot.send_message(
chat_id=message.chat.id,
text="❌ Эта команда работает только в групповых чатах.",
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Если недостаточно аргументов
if len(parts_msg) < 2:
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Команда через ответ на сообщение: /setkarma 100
if message.reply_to_message and (not message.is_topic_message or message.message_thread_id != message.reply_to_message.message_id):
if len(parts_msg) >= 2:
target_user = message.reply_to_message.from_user
try:
new_karma_value = int(parts_msg[1])
except ValueError:
await bot.send_message(
chat_id=message.chat.id,
text="❌ Неверный формат кармы. Укажите целое число.",
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Команда с указанием пользователя: /setkarma @username 100 или /setkarma 123456789 100
elif len(parts_msg) >= 3 and (parts_msg[1].strip().isdigit() or parts_msg[1].startswith('@')):
identifier = parts_msg[1].strip()
try:
new_karma_value = int(parts_msg[2])
except ValueError:
await bot.send_message(
chat_id=message.chat.id,
text="❌ Неверный формат кармы. Укажите целое число.",
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Поиск по ID
if identifier.isdigit():
user_info = db.get_user(int(identifier))
if user_info:
target_user = User(
id=user_info[0],
first_name=user_info[1],
username=user_info[2],
is_bot=False
)
# Поиск по тегу
elif identifier.startswith('@'):
user_info = db.get_user_by_username(identifier[1:])
if user_info:
target_user = User(
id=user_info[0],
first_name=user_info[1],
username=user_info[2],
is_bot=False
)
# Если команда неправильная
else:
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Если пользователь не найден
if not target_user:
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['user_not_found'],
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Проверяем, не пытается ли установить карму себе
if message.from_user.id == target_user.id:
await bot.send_message(
chat_id=message.chat.id,
text="❌ Нельзя устанавливать карму самому себе.",
message_thread_id=message.message_thread_id,
)
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Получаем текущую карму
old_karma = db.get_karma(target_user.id, message.chat.id)
# Устанавливаем новую карму
db.set_karma(target_user.id, message.chat.id, new_karma_value)
# Формируем имя пользователя для отображения
target_user_display = f"@{target_user.username}" if target_user.username else target_user.first_name
# Вычисляем разницу
karma_diff = new_karma_value - old_karma
diff_sign = "+" if karma_diff > 0 else ""
# Отправляем сообщение-лог в админ-чат
await action_reporter.log_action(
action="УСТАНОВКА КАРМЫ",
user_id=target_user.id,
admin_id=message.from_user.id,
reason=f"Карма изменена: {old_karma}{new_karma_value} ({diff_sign}{karma_diff})",
duration=None,
)
# Отправляем сообщение в чат
response = (
f"✅ Карма пользователя {target_user_display} установлена на <b>{new_karma_value}</b>\n"
f"Было: {old_karma} → Стало: {new_karma_value} ({diff_sign}{karma_diff})"
)
await bot.send_message(
chat_id=message.chat.id,
text=response,
message_thread_id=message.message_thread_id,
)
# Записываем действие в логи
logger.info(
f"Администратор {message.from_user.id} установил карму пользователя {target_user.id} "
f"на {new_karma_value} (было {old_karma})"
)
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2)
except Exception as e:
# Отправляем ошибку
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['general_error'],
message_thread_id=message.message_thread_id,
)
# Записываем ошибку в логи
logger.error(f"Общая ошибка в setkarma_command: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2)

View File

@@ -69,7 +69,7 @@ def register_handlers(bot: AsyncTeleBot):
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user
# Если это General (обычный чат) # Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None: elif message.reply_to_message and not message.is_topic_message:
# Собираем данные # Собираем данные
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user

View File

@@ -69,7 +69,7 @@ def register_handlers(bot: AsyncTeleBot):
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user
# Если это General (обычный чат) # Если это General (обычный чат)
elif message.reply_to_message and message.is_topic_message is None: elif message.reply_to_message and not message.is_topic_message:
# Собираем данные # Собираем данные
target_user = message.reply_to_message.from_user target_user = message.reply_to_message.from_user

248
src/modules/warn.py Normal file
View File

@@ -0,0 +1,248 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import Message, User, ChatPermissions
import logging
import time
from database import db
from action_reporter import action_reporter
from utils import (
delete_messages,
check_admin_status,
check_target_status,
)
from config import COMMAND_MESSAGES
# Получаем логгер для текущего модуля
logger = logging.getLogger(__name__)
# Регистрирует все обработчики команд
def register_handlers(bot: AsyncTeleBot):
# Обработчик команды /warn
@bot.message_handler(commands=['warn'])
async def _warn_command_wrapper(message: Message):
await warn_command(bot, message)
# Основная функция команды /warn
async def warn_command(bot: AsyncTeleBot, message: Message):
# Определяем целевого пользователя
target_user = None
# Определяем причину
reason = None
# Разбиваем текст сообщения на части
parts_msg = message.text.split()
# Команда /warn help
if len(parts_msg) == 2 and parts_msg[1].strip() in ('help', 'помощь'):
# Отправляем инструкцию
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['manual_warn'],
message_thread_id=message.message_thread_id,
)
# Удаляем сообщения через 30 секунд
await delete_messages(bot, message, time_sleep=30, number_message=2)
return
try:
# Проверяем, является ли отправитель администратором с правом ограничения
if await check_admin_status(bot, message) == 1:
return
# Если одно слово (/warn)
if len(parts_msg) == 1:
# Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Команда через ответ на сообщение, если два или более слов (/warn причина)
if message.reply_to_message and (not message.is_topic_message or message.message_thread_id != message.reply_to_message.message_id):
# Собираем данные
target_user = message.reply_to_message.from_user
reason = ' '.join(parts_msg[1:]) if len(parts_msg) > 1 else 'отсутствует'
# Если второе слово это тег или ID
elif len(parts_msg) >= 2 and (parts_msg[1].strip().isdigit() or parts_msg[1].startswith('@')):
# Собираем данные
identifier = parts_msg[1].strip()
reason = ' '.join(parts_msg[2:]) if len(parts_msg) > 2 else 'отсутствует'
# Делаем поиск по ID
if identifier.isdigit():
# Ищем пользователя в базе данных
user_info = db.get_user(int(identifier))
# Если нашли пользователя
if user_info:
# Создаем объект пользователя
target_user = User(
id=user_info[0],
first_name=user_info[1],
username=user_info[2],
is_bot=False
)
# Делаем поиск по тегу
elif identifier.startswith('@'):
# Ищем пользователя в базе данных (убрали @)
user_info = db.get_user_by_username(identifier[1:])
# Если нашли пользователя
if user_info:
# Создаем объект пользователя
target_user = User(
id=user_info[0],
first_name=user_info[1],
username=user_info[2],
is_bot=False
)
# Если команда неправильная
else:
# Удаляем сообщение через 3 секунды
await delete_messages(bot, message, time_sleep=3, number_message=1)
return
# Если пользователь не найден в базе данных
if not target_user:
# Отправляем предупреждение
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['user_not_found'],
message_thread_id=message.message_thread_id,
)
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2)
return
# Проверяем статус целевого пользователя
if await check_target_status(bot, message, target_user) == 1:
return
# Добавляем предупреждение в БД
db.add_warning(
user_id=target_user.id,
chat_id=message.chat.id,
reason=reason,
admin_id=message.from_user.id
)
# Снимаем карму за предупреждение
db.add_karma(target_user.id, message.chat.id, -5)
logger.info(f"Снято 5 кармы пользователю {target_user.id} за предупреждение")
# Импортируем константы времени
from config import ONE_WEEK, TWO_WEEKS
# Проверяем количество предупреждений
warns_week = db.get_warnings_count(target_user.id, message.chat.id, ONE_WEEK)
warns_two_weeks = db.get_warnings_count(target_user.id, message.chat.id, TWO_WEEKS)
logger.info(f"Предупреждений за неделю: {warns_week}, за 2 недели: {warns_two_weeks}")
# Определяем, нужно ли применять мут
mute_applied = False
mute_duration = 0
mute_duration_text = ""
response_message = COMMAND_MESSAGES['warned']
# Если это уже 2+ предупреждение за неделю -> мут на неделю
if warns_week >= 2:
mute_duration = 604800 # 7 дней
mute_duration_text = "7 дней"
response_message = COMMAND_MESSAGES['warned_auto_mute_week']
mute_applied = True
logger.info(f"Применен мут на неделю (предупреждений за неделю: {warns_week})")
# Если это 2-е предупреждение за 2 недели (но не за неделю) -> мут на сутки
elif warns_two_weeks >= 2:
mute_duration = 86400 # 1 день
mute_duration_text = "1 день"
response_message = COMMAND_MESSAGES['warned_auto_mute_day']
mute_applied = True
logger.info(f"Применен мут на сутки (предупреждений за 2 недели: {warns_two_weeks})")
# Применяем мут если нужно
if mute_applied:
try:
# Вычисляем время окончания мута
until_date = int(time.time()) + mute_duration
# Устанавливаем ограничения (только чтение)
permissions = ChatPermissions(
can_send_messages=False,
can_send_media_messages=False,
can_send_polls=False,
can_send_other_messages=False,
can_add_web_page_previews=False,
can_change_info=False,
can_invite_users=False,
can_pin_messages=False,
)
# Выполняем мут
await bot.restrict_chat_member(
chat_id=message.chat.id,
user_id=target_user.id,
permissions=permissions,
until_date=until_date
)
logger.info(f"Пользователь {target_user.id} получил автомут на {mute_duration_text} после варна")
except Exception as e:
logger.error(f"Ошибка при применении мута после варна: {str(e)}")
# Отправляем сообщение-лог в админ-чат
await action_reporter.log_action(
action="ВАРН" if not mute_applied else f"ВАРН + МУТ ({mute_duration_text})",
user_id=target_user.id,
admin_id=message.from_user.id,
reason=reason,
duration=mute_duration_text if mute_applied else None,
)
# Отправляем сообщение в чат
await bot.send_message(
chat_id=message.chat.id,
text=response_message,
message_thread_id=message.message_thread_id,
)
# Записываем действие в логи
logger.info(f"Администратор {message.from_user.id} выдал предупреждение пользователю {target_user.id}. Причина: {reason}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2)
except Exception as e:
# Отправляем ошибку
await bot.send_message(
chat_id=message.chat.id,
text=COMMAND_MESSAGES['general_error'],
message_thread_id=message.message_thread_id,
)
# Записываем ошибку в логи
logger.error(f"Общая ошибка в warn_command: {str(e)}")
# Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2)

109
src/thank_words.py Normal file
View File

@@ -0,0 +1,109 @@
import json
import os
import logging
logger = logging.getLogger(__name__)
# Путь к файлу с благодарственными словами
THANK_WORDS_FILE = os.path.join(os.path.dirname(__file__), 'data', 'thank_words.json')
# Кэш для быстрой проверки
_thank_words_cache = None
def _load_thank_words():
"""Загружает список благодарственных слов из файла"""
global _thank_words_cache
try:
with open(THANK_WORDS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
_thank_words_cache = [word.lower() for word in data.get('thank_words', [])]
logger.info(f"Загружено {len(_thank_words_cache)} благодарственных слов")
return _thank_words_cache
except FileNotFoundError:
logger.error(f"Файл {THANK_WORDS_FILE} не найден")
_thank_words_cache = []
return []
except json.JSONDecodeError as e:
logger.error(f"Ошибка разбора JSON: {e}")
_thank_words_cache = []
return []
def get_thank_words():
"""Возвращает список благодарственных слов (с кэшированием)"""
global _thank_words_cache
if _thank_words_cache is None:
_load_thank_words()
return _thank_words_cache
def contains_thank_word(text: str) -> bool:
"""
Проверяет, содержит ли текст благодарственные слова
Args:
text: Текст для проверки
Returns:
True если найдено хотя бы одно благодарственное слово
"""
if not text:
return False
text_lower = text.lower()
thank_words = get_thank_words()
# Разбиваем текст на слова для проверки
words = text_lower.split()
# Проверяем каждое слово и фразы из 2 слов
for i, word in enumerate(words):
# Очищаем от знаков препинания
clean_word = ''.join(c for c in word if c.isalnum())
if clean_word in thank_words:
return True
# Проверяем фразы из двух слов (например, "thank you")
if i < len(words) - 1:
two_word_phrase = clean_word + ' ' + ''.join(c for c in words[i+1] if c.isalnum())
if two_word_phrase in thank_words:
return True
return False
def get_thank_words_from_text(text: str) -> list:
"""
Возвращает список найденных благодарственных слов в тексте
Args:
text: Текст для анализа
Returns:
Список найденных благодарственных слов
"""
if not text:
return []
text_lower = text.lower()
thank_words = get_thank_words()
found_words = []
words = text_lower.split()
for i, word in enumerate(words):
clean_word = ''.join(c for c in word if c.isalnum())
if clean_word in thank_words and clean_word not in found_words:
found_words.append(clean_word)
# Проверяем фразы из двух слов
if i < len(words) - 1:
clean_next = ''.join(c for c in words[i+1] if c.isalnum())
two_word_phrase = clean_word + ' ' + clean_next
if two_word_phrase in thank_words and two_word_phrase not in found_words:
found_words.append(two_word_phrase)
return found_words
def reload_thank_words():
"""Перезагружает список благодарственных слов из файла"""
global _thank_words_cache
_thank_words_cache = None
return _load_thank_words()

View File

@@ -3,33 +3,98 @@ from telebot.types import Message
import asyncio import asyncio
import logging import logging
from config import COMMAND_MESSAGES
# Получаем логгер для текущего модуля # Получаем логгер для текущего модуля
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Удаляет определённое количество сообщения # Удаляет определённое количество сообщения В ФОНЕ (не блокирует обработку других событий)
async def delete_messages(bot: AsyncTeleBot, message: Message, time_sleep: int, number_message: int): async def delete_messages(bot: AsyncTeleBot, message: Message, time_sleep: int, number_message: int):
await asyncio.sleep(time_sleep) async def _delete_task():
for i in range(number_message): try:
await bot.delete_message(message.chat.id, message.message_id+i) await asyncio.sleep(time_sleep)
for i in range(number_message):
try:
await bot.delete_message(message.chat.id, message.message_id+i)
except Exception as e:
logger.debug(f"Не удалось удалить сообщение {message.message_id+i}: {e}")
except Exception as e:
logger.error(f"Ошибка в задаче удаления сообщений: {e}")
# Запускаем удаление в фоне
asyncio.create_task(_delete_task())
# Проверяет, является ли отправитель администратором # Проверяет, является ли отправитель администратором
async def check_admin_status(bot: AsyncTeleBot, message: Message): async def check_admin_status(bot: AsyncTeleBot, message: Message, check_restrict_rights: bool = True):
if message.reply_to_message and message.is_topic_message is None: # Сохраняем оригинальный thread_id для восстановления после проверки
message.message_thread_id = None original_thread_id = message.message_thread_id
# ВАЖНО: Временно сбрасываем thread_id для проверки прав на уровне ВСЕЙ группы
# Иначе Telegram вернет статус в контексте топика, а не группы
message.message_thread_id = None
try: try:
# Логируем параметры проверки
logger.info(f"Проверка прав: chat_id={message.chat.id}, user_id={message.from_user.id}, "
f"is_topic={message.is_topic_message}, original_thread_id={original_thread_id}")
# Получаем статус отправителя # Получаем статус отправителя на уровне ВСЕЙ группы (без thread_id)
admin_status = await bot.get_chat_member(message.chat.id, message.from_user.id) admin_status = await bot.get_chat_member(message.chat.id, message.from_user.id)
# Логируем полученный статус
logger.info(f"Получен статус пользователя {message.from_user.id}: status={admin_status.status}, "
f"can_restrict={getattr(admin_status, 'can_restrict_members', None)}")
# Детальное логирование всех прав (для отладки)
logger.debug(f"Детальный статус пользователя {message.from_user.id}: "
f"can_be_edited={getattr(admin_status, 'can_be_edited', None)}, "
f"can_manage_chat={getattr(admin_status, 'can_manage_chat', None)}, "
f"can_delete_messages={getattr(admin_status, 'can_delete_messages', None)}, "
f"can_restrict_members={getattr(admin_status, 'can_restrict_members', None)}, "
f"can_promote_members={getattr(admin_status, 'can_promote_members', None)}, "
f"can_change_info={getattr(admin_status, 'can_change_info', None)}, "
f"can_invite_users={getattr(admin_status, 'can_invite_users', None)}, "
f"can_post_messages={getattr(admin_status, 'can_post_messages', None)}, "
f"can_edit_messages={getattr(admin_status, 'can_edit_messages', None)}, "
f"can_pin_messages={getattr(admin_status, 'can_pin_messages', None)}, "
f"user={getattr(admin_status, 'user', None)}")
# ВАЖНО: В каналах администраторы могут иметь статус 'left', но при этом иметь права администратора
# Проверяем наличие прав администратора через can_* поля
is_admin = admin_status.status in ('administrator', 'creator')
logger.debug(f"Начальная проверка: is_admin={is_admin}, status={admin_status.status}")
# Для каналов: если статус 'left', но есть админские права - считаем администратором
if admin_status.status == 'left':
logger.debug(f"Обнаружен статус 'left', проверяем админские права...")
# Проверяем наличие любых админских прав
has_admin_rights = any([
getattr(admin_status, 'can_be_edited', False),
getattr(admin_status, 'can_manage_chat', False),
getattr(admin_status, 'can_delete_messages', False),
getattr(admin_status, 'can_restrict_members', False),
getattr(admin_status, 'can_promote_members', False),
getattr(admin_status, 'can_change_info', False),
getattr(admin_status, 'can_invite_users', False),
getattr(admin_status, 'can_post_messages', False),
getattr(admin_status, 'can_edit_messages', False),
getattr(admin_status, 'can_pin_messages', False),
])
if has_admin_rights:
is_admin = True
logger.info(f"Пользователь {message.from_user.id} имеет статус 'left', но обнаружены админские права")
# Проверка наличия прав администратора/создателя # Проверка наличия прав администратора/создателя
if admin_status.status not in ('administrator', 'creator'): if not is_admin:
# Восстанавливаем thread_id для отправки сообщения в правильный топик
message.message_thread_id = original_thread_id
# Отправляем предупреждение # Отправляем предупреждение
await bot.send_message( await bot.send_message(
chat_id=message.chat.id, chat_id=message.chat.id,
text=COMMAND_MESSAGES['no_admin_rights'], text=COMMAND_MESSAGES['no_admin_rights'],
message_thread_id=message.message_thread_id, message_thread_id=message.message_thread_id,
) )
@@ -37,13 +102,15 @@ async def check_admin_status(bot: AsyncTeleBot, message: Message):
await delete_messages(bot, message, time_sleep=5, number_message=2) await delete_messages(bot, message, time_sleep=5, number_message=2)
return 1 return 1
# Проверка права на ограничение участников # Проверка права на ограничение участников (опционально)
if admin_status.status == 'administrator' and not admin_status.can_restrict_members: if check_restrict_rights and admin_status.status == 'administrator' and not admin_status.can_restrict_members:
# Восстанавливаем thread_id для отправки сообщения в правильный топик
message.message_thread_id = original_thread_id
# Отправляем предупреждение # Отправляем предупреждение
await bot.send_message( await bot.send_message(
chat_id=message.chat.id, chat_id=message.chat.id,
text=COMMAND_MESSAGES['no_restrict_rights'], text=COMMAND_MESSAGES['no_restrict_rights'],
message_thread_id=message.message_thread_id, message_thread_id=message.message_thread_id,
) )
@@ -51,12 +118,18 @@ async def check_admin_status(bot: AsyncTeleBot, message: Message):
await delete_messages(bot, message, time_sleep=5, number_message=2) await delete_messages(bot, message, time_sleep=5, number_message=2)
return 1 return 1
# Восстанавливаем thread_id после успешной проверки
message.message_thread_id = original_thread_id
return 0
except Exception as e: except Exception as e:
# Восстанавливаем thread_id для отправки сообщения об ошибке
message.message_thread_id = original_thread_id
# Отправляем ошибку # Отправляем ошибку
await bot.send_message( await bot.send_message(
chat_id=message.chat.id, chat_id=message.chat.id,
text=COMMAND_MESSAGES['error'].format(e=str(e)), text=COMMAND_MESSAGES['error'].format(e=str(e)),
message_thread_id=message.message_thread_id, message_thread_id=message.message_thread_id,
) )
@@ -70,37 +143,48 @@ async def check_admin_status(bot: AsyncTeleBot, message: Message):
# Проверяет статус целевого пользователя # Проверяет статус целевого пользователя
async def check_target_status(bot: AsyncTeleBot, message: Message, target_user): async def check_target_status(bot: AsyncTeleBot, message: Message, target_user):
if message.reply_to_message and message.is_topic_message is None: # Сохраняем оригинальный thread_id
message.message_thread_id = None original_thread_id = message.message_thread_id
# Временно сбрасываем thread_id для проверки статуса на уровне ВСЕЙ группы
message.message_thread_id = None
try: try:
# Получаем статус пользователя # Получаем статус пользователя на уровне ВСЕЙ группы
target_status = await bot.get_chat_member( target_status = await bot.get_chat_member(
chat_id=message.chat.id, chat_id=message.chat.id,
user_id=target_user.id, user_id=target_user.id,
) )
# Проверяем, является ли цель администратором или создателем # Проверяем, является ли цель администратором или создателем
if target_status.status in ('administrator', 'creator'): if target_status.status in ('administrator', 'creator'):
# Восстанавливаем thread_id для отправки сообщения
message.message_thread_id = original_thread_id
# Отправляем предупреждение # Отправляем предупреждение
await bot.send_message( await bot.send_message(
chat_id=message.chat.id, chat_id=message.chat.id,
text=COMMAND_MESSAGES['cant_mute_admin'], text=COMMAND_MESSAGES['cant_mute_admin'],
message_thread_id=message.message_thread_id, message_thread_id=message.message_thread_id,
) )
# Удаляем сообщения через 5 секунд # Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2) await delete_messages(bot, message, time_sleep=5, number_message=2)
return return 1
# Восстанавливаем thread_id после успешной проверки
message.message_thread_id = original_thread_id
return 0
except Exception as e: except Exception as e:
# Восстанавливаем thread_id для отправки сообщения об ошибке
message.message_thread_id = original_thread_id
# Отправляем ошибку # Отправляем ошибку
await bot.send_message( await bot.send_message(
chat_id=message.chat.id, chat_id=message.chat.id,
text=COMMAND_MESSAGES['error'].format(e=str(e)), text=COMMAND_MESSAGES['error'].format(e=str(e)),
message_thread_id=message.message_thread_id, message_thread_id=message.message_thread_id,
) )
@@ -109,7 +193,7 @@ async def check_target_status(bot: AsyncTeleBot, message: Message, target_user):
# Удаляем сообщения через 5 секунд # Удаляем сообщения через 5 секунд
await delete_messages(bot, message, time_sleep=5, number_message=2) await delete_messages(bot, message, time_sleep=5, number_message=2)
return return 1
# Возвращает количество секунд # Возвращает количество секунд
def parse_mute_time(time_str: str) -> int | None: def parse_mute_time(time_str: str) -> int | None:

77
update.sh Executable file
View File

@@ -0,0 +1,77 @@
#!/bin/bash
# Скрипт для обновления бота из git-репозитория
# Запускается на сервере в директории /opt/LGBot
echo "🔄 Обновление LGBot..."
echo "================================"
# Проверяем, находимся ли мы в git-репозитории
if [ ! -d ".git" ]; then
echo "❌ Ошибка: это не git-репозиторий"
exit 1
fi
# Показываем текущую ветку
CURRENT_BRANCH=$(git branch --show-current)
echo "📍 Текущая ветка: $CURRENT_BRANCH"
# Проверяем наличие изменений
echo ""
echo "📊 Статус репозитория:"
git status --short
# Получаем изменения из удаленного репозитория
echo ""
echo "📥 Получение изменений из репозитория..."
git fetch origin
# Проверяем, есть ли обновления
LOCAL=$(git rev-parse HEAD)
REMOTE=$(git rev-parse origin/$CURRENT_BRANCH)
if [ "$LOCAL" = "$REMOTE" ]; then
echo "✅ Бот уже обновлен до последней версии"
echo ""
echo "Последний коммит:"
git log -1 --pretty=format:"%h - %s (%ar)" --abbrev-commit
exit 0
fi
# Показываем, какие коммиты будут применены
echo ""
echo "📝 Новые коммиты:"
git log HEAD..origin/$CURRENT_BRANCH --pretty=format:"%h - %s (%ar)" --abbrev-commit
# Обновляем код
echo ""
echo "⬇️ Применение обновлений..."
git pull origin $CURRENT_BRANCH
if [ $? -ne 0 ]; then
echo "❌ Ошибка при обновлении из git"
exit 1
fi
echo "✅ Код успешно обновлен"
# Перезапускаем бота
echo ""
echo "🔄 Перезапуск бота..."
systemctl restart LGBot.service
if [ $? -eq 0 ]; then
echo "✅ Бот успешно перезапущен"
# Ждем 2 секунды и проверяем статус
sleep 2
echo ""
echo "📊 Статус службы:"
systemctl status LGBot.service --no-pager -l | head -10
else
echo "❌ Ошибка при перезапуске бота"
exit 1
fi
echo ""
echo "🎉 Обновление завершено!"