Compare commits

...

15 Commits

Author SHA1 Message Date
47e1adfed7 - исправление ошибки с лишней скобкой в vk-функции 2025-08-18 12:41:55 +03:00
8a01f52c69 - исправление ошибки с лишней скобкой в vk-функции 2025-08-18 12:41:41 +03:00
0990e5191a - создание и использование виртуального окружения 2025-08-18 12:32:34 +03:00
2de09231ea - requirements.txt 2025-08-18 12:29:32 +03:00
50d505f887 - добавление скриптов для systemd 2025-08-11 16:12:47 +03:00
de5a5e9248 Восстановление keys_example.py после очистки истории
🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-11 15:22:59 +03:00
845f96209d Завершение модульной рефакторизации и исправления
Исправлены все основные проблемы:
- Исправлена логика фильтрации сообщений по топикам в Telegram
- Исправлен бесконечный цикл в VK клиенте get_wall_posts()
- Добавлена асинхронная поддержка для VK в главном файле
- Дедупликация работает корректно для всех платформ
- Добавлена полная документация в CLAUDE.md и README.md

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-11 15:11:39 +03:00
188acdd812 Исправление цикла для ВК и небольшие правки 2024-11-10 21:27:50 +03:00
9483aad1fa Исправление цикла для ВК и небольшие правки 2024-08-06 20:23:25 +03:00
bc22a8a129 обновление публикации новостей от stable до stable 2024-07-25 20:53:10 +03:00
570e0b58f9 обновление публикации новостей от stable до stable 2024-07-25 14:18:45 +03:00
7e726a06b5 обновление публикации новостей от stable до stable 2024-07-24 23:06:24 +03:00
4ad6c4dafc Начало изменений для новостей о stable скриптах 2024-07-23 17:27:48 +03:00
e86ed6e4b1 gitignore 2024-07-23 11:22:11 +03:00
1b1746db06 исправление форматирования 2024-06-16 16:04:34 +03:00
23 changed files with 569806 additions and 573 deletions

11
.gitignore vendored
View File

@@ -1,6 +1,11 @@
.fleet .fleet
__pycache__ __pycache__
keys*.py
vkdel.py
tgdel.py
*.session *.session
.idea
.gigaide
keys.py
keys_*.py
!keys_example.py
venv/
.env
*.pyc

113
CLAUDE.md Normal file
View File

@@ -0,0 +1,113 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Архитектура проекта
Это Python-проект для автоматизации новостных ботов с модульной архитектурой, которые публикуют контент в Telegram, VK и опционально в Discord.
### Модульная структура:
- `news-bot-modular.py` - новый основной файл с модульной архитектурой
- `news-bot.py` - оригинальный монолитный файл (deprecated)
- `config.py` - централизованная конфигурация
- `logger.py` - настройка логирования
- `content_processor.py` - обработка и форматирование контента
- `telegram_client.py` - клиент для работы с Telegram API
- `vk_client.py` - клиент для работы с VK API
- `discord_client.py` - опциональный клиент для Discord API
- `site_api.py` - работа с API сайта linux-gaming.ru
- `del-tests.py` - утилита для удаления сообщений из всех каналов
- `keys.py` - конфигурация с API ключами (не включен в git)
### Структура бота:
- **Получение новостей**: парсинг с сайта `linux-gaming.ru`
- **Обработка контента**: конвертация HTML в markdown, удаление дубликатов ссылок
- **Публикация**: отправка в Telegram, VK и опционально Discord
- **Планировщик**: автоматическое выполнение каждые 30 минут
## Команды разработки
### Запуск нового модульного бота:
```bash
python3 news-bot-modular.py
```
### Запуск старого монолитного бота:
```bash
python3 news-bot.py
```
### Запуск утилиты удаления сообщений:
```bash
python3 del-tests.py
```
### Проверка синтаксиса Python:
```bash
python3 -m py_compile news-bot.py
python3 -m py_compile del-tests.py
```
## Зависимости
Проект использует следующие библиотеки:
- `telethon` - для работы с Telegram API
- `requests` - для HTTP запросов
- `schedule` - планировщик задач
- `colorlog` - цветное логирование
- `html2text` - конвертация HTML в текст
- `beautifulsoup4` - парсинг HTML
- `discord.py` - для работы с Discord API (опционально)
Установка основных зависимостей:
```bash
pip3 install telethon requests schedule colorlog html2text beautifulsoup4
```
Для включения Discord функциональности:
```bash
pip3 install discord.py
```
## Конфигурация
Настройки хранятся в `keys.py`:
- `api_key_site` - ключ для доступа к API сайта
- `api_id_tg`, `api_hash_tg` - данные Telegram API
- `api_key_vk`, `user_token_vk` - токены VK API
- `discord_enabled` - включить/отключить Discord (по умолчанию False)
- `discord_token` - токен Discord бота (если включен)
- `dicord_channel` - ID канала Discord (если включен)
- `cat_num` - номер категории новостей (8)
- `start_topic_id` - ID темы для начала публикации
## Основные функции
### Обработка контента:
- `html_to_text()` - конвертация HTML в markdown
- `remove_duplicate_links()` - удаление дубликатов ссылок
- `convert_links()` - обработка URL параметров
### Модульные классы:
- `ContentProcessor` - обработка и форматирование контента
- `TelegramNewsClient` - работа с Telegram API
- `VKClient` - работа с VK API
- `DiscordClient` - работа с Discord API (опционально)
- `SiteAPI` - работа с API сайта linux-gaming.ru
### Проверка статуса модулей:
```python
# Проверить, какие модули включены
bot.telegram_client.is_enabled() # True
bot.vk_client.is_enabled() # True
bot.discord_client.is_enabled() # True/False в зависимости от конфигурации
```
## Логирование
Используется цветное логирование с уровнями:
- DEBUG (cyan) - детальная отладочная информация
- INFO (green) - основная информация о работе
- WARNING (yellow) - предупреждения
- ERROR (red) - ошибки
- CRITICAL (red on white) - критические ошибки

151
README.md Normal file
View File

@@ -0,0 +1,151 @@
# News Bot для Linux Gaming
Автоматизированный бот для публикации новостей с сайта linux-gaming.ru в Telegram, VK и опционально в Discord.
## ✨ Особенности
- **Модульная архитектура** - каждая платформа в отдельном модуле
- **Опциональный Discord** - можно включать/отключать через конфигурацию
- **Обработка контента** - автоматическая конвертация HTML в markdown
- **Дедупликация** - исключение повторных публикаций
- **Планировщик** - автоматическое выполнение каждые 30 минут
- **Цветное логирование** - удобная отладка и мониторинг
## 🚀 Быстрый старт
### 1. Установка зависимостей
```bash
# Основные зависимости
pip3 install telethon requests schedule colorlog html2text beautifulsoup4
# Для Discord (опционально)
pip3 install discord.py
```
### 2. Конфигурация
```bash
# Скопировать пример конфигурации
cp keys_example.py keys.py
# Отредактировать keys.py с вашими API ключами
nano keys.py
```
### 3. Запуск
```bash
# Новый модульный бот (рекомендуется)
python3 news-bot-modular.py
# Или старый монолитный бот
python3 news-bot.py
```
## 📁 Структура проекта
```
├── news-bot-modular.py # Новый основной файл
├── news-bot.py # Старый монолитный файл
├── config.py # Централизованная конфигурация
├── logger.py # Настройка логирования
├── content_processor.py # Обработка контента
├── telegram_client.py # Telegram API клиент
├── vk_client.py # VK API клиент
├── discord_client.py # Discord API клиент (опциональный)
├── site_api.py # API сайта linux-gaming.ru
├── del-tests.py # Утилита очистки каналов
├── keys_example.py # Пример конфигурации
└── keys.py # Реальная конфигурация (создается вручную)
```
## ⚙️ Конфигурация
Настройки в файле `keys.py`:
| Параметр | Описание | Обязательный |
|----------|----------|--------------|
| `api_key_site` | Ключ API сайта | ✅ |
| `api_id_tg` | Telegram API ID | ✅ |
| `api_hash_tg` | Telegram API Hash | ✅ |
| `channel_username_tg` | Telegram канал | ✅ |
| `api_key_vk` | VK API ключ | ✅ |
| `user_token_vk` | VK пользовательский токен | ✅ |
| `own_id` | ID VK группы/страницы | ✅ |
| `discord_enabled` | Включить Discord | ❌ (по умолчанию False) |
| `discord_token` | Discord бот токен | ❌ (если Discord включен) |
| `dicord_channel` | Discord канал ID | ❌ (если Discord включен) |
| `cat_num` | Номер категории новостей | ✅ (по умолчанию 8) |
| `start_topic_id` | Стартовый ID темы | ✅ (по умолчанию 0) |
## 🔧 Модули
### ContentProcessor
Обрабатывает и форматирует контент:
- Конвертация HTML → Markdown
- Удаление дубликатов ссылок
- Форматирование под каждую платформу
### TelegramNewsClient
- Асинхронная работа с Telegram API
- Обработка FloodWait ошибок
- Проверка существующих сообщений
### VKClient
- Публикация в VK
- Специальная обработка для постов о скриптах
- Прикрепление ссылок и изображений
### DiscordClient (опциональный)
- Включается/выключается через конфигурацию
- Автоматическая проверка зависимостей
- Разбивка длинных сообщений
### SiteAPI
- Получение новостей с сайта
- Мониторинг новых версий скриптов
- Публикация обновлений
## 🎯 Использование
```python
from news_bot_modular import NewsBot
bot = NewsBot()
# Проверить статус модулей
print("Telegram:", bot.telegram_client.is_enabled())
print("VK:", bot.vk_client.is_enabled())
print("Discord:", bot.discord_client.is_enabled())
# Запустить бота
bot.start()
```
## 🛠 Утилиты
### Очистка каналов
```bash
python3 del-tests.py
```
Удаляет все сообщения из всех настроенных каналов.
### Проверка синтаксиса
```bash
python3 -m py_compile news-bot-modular.py
python3 -m py_compile content_processor.py
```
## 🐛 Отладка
Логи с цветовой маркировкой:
- 🔵 DEBUG - детальная информация
- 🟢 INFO - основные события
- 🟡 WARNING - предупреждения
- 🔴 ERROR - ошибки
- ⚪ CRITICAL - критические ошибки
## 📝 Лицензия
Этот проект является частной разработкой для автоматизации публикации новостей Linux Gaming.

283849
asyncio Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,33 @@
[Unit]
Description=Linux Gaming News Bot
Documentation=https://github.com/xpamych/bot-news-linux-gaming
After=network.target network-online.target
Wants=network-online.target
[Service]
Type=simple
User=xpamych
Group=xpamych
WorkingDirectory=/home/xpamych/Yandex.Disk/IdeaProjects/bot-news-linux-gaming
ExecStart=/usr/bin/python3 /home/xpamych/Yandex.Disk/IdeaProjects/bot-news-linux-gaming/news-bot-modular.py
Restart=always
RestartSec=10
# Переменные окружения
Environment=PYTHONPATH=/home/xpamych/Yandex.Disk/IdeaProjects/bot-news-linux-gaming
Environment=PYTHONUNBUFFERED=1
# Ограничения безопасности
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=read-only
ReadWritePaths=/home/xpamych/Yandex.Disk/IdeaProjects/bot-news-linux-gaming
# Логирование
StandardOutput=journal
StandardError=journal
SyslogIdentifier=news-bot
[Install]
WantedBy=multi-user.target

52
config.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
try:
import keys
except ImportError:
raise ImportError("Файл keys.py не найден. Создайте его с необходимыми конфигурационными параметрами.")
URL_POST = "https://linux-gaming.ru/posts.json"
URL_NEWS = f"https://linux-gaming.ru/c/news/{keys.cat_num}.json"
URL_VK_POST = "https://api.vk.com/method/wall.post"
URL_VK_GET = "https://api.vk.com/method/wall.get"
URL_CHANGELOG = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru"
HEADERS_SITE = {
"Content-Type": "application/json",
"Api-Key": keys.api_key_site,
"Api-Username": "linux-gaming"
}
PARAMS_VK_GET = {
'access_token': keys.user_token_vk,
'v': '5.199',
'owner_id': str(keys.own_id),
'count': str(100),
'offset': str(0),
'filter': 'all'
}
TELEGRAM_CONFIG = {
'api_id': keys.api_id_tg,
'api_hash': keys.api_hash_tg,
'session_file': 'LG_news',
'channel_username': keys.channel_username_tg,
'topic_id': getattr(keys, 'telegram_topic_id', None) # ID топика для публикации
}
VK_CONFIG = {
'api_key': keys.api_key_vk,
'user_token': keys.user_token_vk,
'owner_id': keys.own_id
}
SITE_CONFIG = {
'category_num': keys.cat_num,
'start_topic_id': keys.start_topic_id
}
DISCORD_CONFIG = {
'enabled': getattr(keys, 'discord_enabled', False),
'token': getattr(keys, 'discord_token', ''),
'channel_id': getattr(keys, 'dicord_channel', None)
}

204
content_processor.py Normal file
View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
import re
import logging
import html2text
import urllib.parse
from bs4 import BeautifulSoup
class ContentProcessor:
def __init__(self):
self.logger = logging.getLogger(__name__)
def make_soup(self, response):
self.logger.debug("Создаем объект BeautifulSoup")
return BeautifulSoup(response.text, 'html.parser')
def html_to_text(self, html_content):
self.logger.debug(f"Конвертируем HTML в текст")
self.logger.debug(f"HTML на входе: {html_content}")
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = True
h.bypass_tables = True
h.reference_links = True
markdown_text = h.handle(html_content)
self.logger.debug(f"Текст до обработки регулярными выражениями: {markdown_text}")
# Удаление переносов строк из-за -
markdown_text = re.sub(r'-\s*\n\s*', '-', markdown_text, flags=re.DOTALL)
markdown_text = re.sub(r'-\s*\n*', '-', markdown_text, flags=re.DOTALL)
# Убираем переносы строк внутри круглых скобок ()
markdown_text = re.sub(r'\((.*?)\)', lambda x: '(' + x.group(1).replace('\n', ' ') + ')', markdown_text, flags=re.DOTALL)
# Убираем переносы строк внутри квадратных скобок []
markdown_text = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace('\n', ' ') + ']', markdown_text, flags=re.DOTALL)
# Удаление строк, содержащих '* * *'
markdown_text = re.sub(r'^.*\* \* \*.*$', '', markdown_text, flags=re.MULTILINE)
# Преобразование всех ссылок с параметрами URL
markdown_text = self.convert_links(markdown_text)
# Работа с #
patterns_to_remove = [
r'###',
r'##',
r'#',
r'\[scripts\]\(\/tag\/scripts\) version \d+ ',
r'##\[scripts\]\(\) version \d+ ',
r'\d{4}×\d{3} \d+ KB'
]
for pattern in patterns_to_remove:
markdown_text = re.sub(pattern, '', markdown_text)
# Удаление избыточных пустых строк после удаления строк
markdown_text = re.sub(r'\n\s*\n', '\n', markdown_text)
# Замена текстов типа "image1280×474 99.7 KB", "807×454 64.1 KB" на "."
markdown_text = re.sub(r'image\d+×\d+\s+\d+(\.\d+)?\s+KB', '.', markdown_text)
markdown_text = re.sub(r'\d+×\d+\s+\d+(\.\d+)?\s+KB', '.', markdown_text)
# Изменение ссылок без описания
markdown_text = re.sub(r'\[\]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
markdown_text = re.sub(r'\[\s]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
# Удаление дублирующихся ссылок
markdown_text = self.remove_duplicate_links(markdown_text)
# Удаление лишних отступов для строк, начинающихся с '*'
markdown_text = re.sub(r' \*', r'*', markdown_text)
# Перемещение ссылки на изображение в конец последней строки
image_link = "[.](https://linux-gaming.ru/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png)"
if image_link in markdown_text:
markdown_text = markdown_text.replace(image_link, '')
markdown_text = markdown_text + image_link
self.logger.debug(f"Текст после обработки: {markdown_text}")
return markdown_text
def convert_links(self, text):
self.logger.debug("Конвертируем ссылки")
url_pattern = re.compile(r'https?://[^\s\)]+')
url_pattern = url_pattern.sub(lambda match: self.decode_url_params(match.group(0)), text)
self.logger.debug(f"Результат конвертации ссылок: {url_pattern}")
return url_pattern
def decode_url_params(self, url):
self.logger.debug(f"Декодируем URL параметры: {url}")
parsed_url = urllib.parse.urlparse(url)
query_params = urllib.parse.parse_qs(parsed_url.query)
for key, values in query_params.items():
if key.lower() == 'to' and values:
return urllib.parse.unquote(values[0])
self.logger.debug(f"Возвращаем URL: {url}")
return url
def remove_empty_lines(self, text_data):
self.logger.debug("Удаляем пустые строки")
lines = text_data.splitlines()
non_empty_lines = [line for line in lines if line.strip()]
non_empty_lines = '\n'.join(non_empty_lines)
self.logger.debug(f"Результат удаления пустых строк: {non_empty_lines}")
return non_empty_lines
def remove_markdown_links(self, markdown_text):
self.logger.debug("Удаляем markdown ссылки")
markdown_text = re.sub(r'\[.*?\]\((https?://.*?)\)', r'\1', markdown_text)
self.logger.debug(f"Результат удаления markdown ссылок: {markdown_text}")
return markdown_text
def remove_duplicate_links(self, text):
self.logger.debug("Удаляем дубликаты ссылок")
seen_links = set()
def replace_link(match):
link = match.group(2)
if link in seen_links:
return ''
seen_links.add(link)
return match.group(0)
link_pattern = re.compile(r'(\[.*?\]\((https:\/\/.*?)\))')
text = re.sub(link_pattern, replace_link, text)
self.logger.debug(f"Результат удаления дубликатов ссылок: {text}")
return text
def extract_links(self, text):
self.logger.debug("Извлекаем ссылки из текста")
# Улучшенное регулярное выражение, исключающее конечные знаки препинания
url_pattern = re.compile(r'https?://[^\s\)\]\}\>,;]+')
links = url_pattern.findall(text)
# Дополнительная очистка: убираем точки и запятые в конце
cleaned_links = []
for link in links:
link = link.rstrip('.,!?')
cleaned_links.append(link)
self.logger.debug(f"Найденные ссылки: {cleaned_links}")
return cleaned_links
def format_for_vk(self, content):
"""Форматирование контента для VK"""
self.logger.debug("Форматируем контент для VK")
# Замена маркеров списка
content = re.sub(r'\* ', '', content)
content = re.sub(r'', '', content)
content = re.sub(r'', '', content)
# Удаление markdown ссылок
content = self.remove_markdown_links(content)
# Замена изображения
content = re.sub(
r'https://linux-gaming.ru/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png',
'\n',
content,
flags=re.DOTALL
)
return content
def format_for_telegram(self, content):
"""Форматирование контента для Telegram"""
self.logger.debug("Форматируем контент для Telegram")
return content # Telegram поддерживает markdown
def create_script_content(self, script_ver, next_version, response):
"""Создание контента для обновления скрипта"""
self.logger.debug(f"Создаем контент для версии скрипта {script_ver}")
soup = self.make_soup(response)
page_text = str(soup)
page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')
last_text = f"###Scripts version {next_version}### / stable"
index_last_text = page_text.find(last_text)
if index_last_text != -1:
changelog_text_last = page_text[:index_last_text]
prev_text = f"###Scripts version {script_ver}### / stable"
index_script_ver = changelog_text_last.find(prev_text)
changelog_text = changelog_text_last[index_script_ver:]
changelog_text = re.sub(
r'###Scripts version (\d+)### / Дата: (\d{2}\.\d{2}\.\d{4}) / Размер скачиваемого обновления: \d+ \S+',
r'\1 - \2' + ":",
changelog_text
)
changelog_text = re.sub(
r'###Scripts version (\d+)### / stable / Дата: (\d{2}\.\d{2}\.\d{4}) / Размер скачиваемого обновления: \d+ \S+',
r'\1 - \2' + ":",
changelog_text
)
post_text = "-----------------------------\n" + changelog_text
self.logger.debug(f"Возвращаем post_text: {post_text}")
return post_text
return None

143
del-tests.py Executable file
View File

@@ -0,0 +1,143 @@
import discord
import asyncio
import requests
import logging
import time
from telethon import TelegramClient, events
from discord.ext.commands import Bot
import keys # Файл, где хранятся ваши ключи доступа
url_vk_get = "https://api.vk.com/method/wall.get"
url_vk_delete = "https://api.vk.com/method/wall.delete"
# Укажите ваши токены и ID каналов
TELEGRAM_API_ID = keys.api_id_tg
TELEGRAM_API_HASH = keys.api_hash_tg
DISCORD_TOKEN = keys.discord_token
TELEGRAM_CHAT_ID = keys.channel_username_tg
DISCORD_CHANNEL_ID = keys.dicord_channel # Укажите ID вашего канала
# Создание экземпляра клиента Telethon
telegram_client = TelegramClient('session_name', TELEGRAM_API_ID, TELEGRAM_API_HASH)
# Удаление сообщений из Telegram
async def delete_all_telegram_messages(client, chat_id):
async for message in client.iter_messages(chat_id):
try:
await client.delete_messages(chat_id, message.id)
time.sleep(1.0)
print(f"Удалено сообщение {message.id}")
except Exception as e:
print(f"Не удалось удалить сообщение {message.id}: {e}")
# Удаление сообщений из Discord
class MyClient(discord.Client):
async def on_ready(self):
print(f'Вошли как {self.user}')
channel = self.get_channel(DISCORD_CHANNEL_ID)
if channel is not None:
await self.delete_all_messages(channel)
async def delete_all_messages(self, channel):
try:
async for message in channel.history(limit=100): # Установите лимит
try:
await message.delete()
print(f'Удалено сообщение {message.id}')
except Exception as e:
print(f"Не удалось удалить сообщение {message.id}: {e}")
except Exception as e:
print(f"Не удалось получить историю сообщений: {e}")
await self.close()
intents = discord.Intents.default()
intents.message_content = True
discord_client = MyClient(intents=intents)
# Основная функция
async def main():
await telegram_client.start()
print("Telegram клиент запущен")
await delete_all_telegram_messages(telegram_client, TELEGRAM_CHAT_ID)
await telegram_client.disconnect()
params_get = {
'access_token': keys.user_token_vk,
'v': '5.199', # Версия API
'owner_id': str(keys.own_id),
'count': 100,
'offset': 0
}
def get_all_posts():
all_posts = []
while True:
response = requests.get(url_vk_get, params=params_get)
data = response.json()
print(data)
if 'error' in data:
logging.error(f"Ошибка {data['error']['error_code']}: {data['error']['error_msg']}")
break
items = data.get('response', {}).get('items', [])
if not items:
break
all_posts.extend(items)
if len(items) < 100:
break
params_get['offset'] += 100
return all_posts
params_delete = {
'access_token': keys.user_token_vk,
'v': '5.199' # Версия API
}
def delete_post(post_id):
params_delete.update({
'owner_id': str(keys.own_id),
'post_id': post_id
})
response = requests.post(url_vk_delete, params=params_delete)
data = response.json()
if 'error' in data:
logging.error(f"Ошибка {data['error']['error_code']}: {data['error']['error_msg']}")
return False
return True
def delete_all_posts():
print("Удаление ВК постов запущено")
all_posts = get_all_posts()
print(all_posts)
if not all_posts:
logging.info("Нет постов для удаления")
return
for post in all_posts:
post_id = post['id']
if delete_post(post_id):
logging.info(f"Пост {post_id} успешно удален")
else:
logging.error(f"Не удалось удалить пост {post_id}")
time.sleep(1.0) # Пауза между запросами, чтобы избежать превышения лимитов API
if __name__ == "__main__":
delete_all_posts()
discord_client.run(DISCORD_TOKEN)
asyncio.run(main())

283849
discord Normal file

File diff suppressed because it is too large Load Diff

205
discord_client.py Normal file
View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python3
import re
import time
import logging
import asyncio
from config import DISCORD_CONFIG
# Discord импорты - только если Discord включен
if DISCORD_CONFIG['enabled']:
try:
import discord
except ImportError:
logging.error("Discord.py не установлен, но Discord включен в конфигурации")
DISCORD_CONFIG['enabled'] = False
class DiscordClient:
def __init__(self, content_processor):
self.logger = logging.getLogger(__name__)
self.content_processor = content_processor
self.config = DISCORD_CONFIG
if not self.config['enabled']:
self.logger.info("Discord клиент отключен в конфигурации")
async def get_messages(self, client, channel_id):
"""Получение сообщений из Discord канала"""
if not self.is_enabled():
return [], []
self.logger.debug("Получаем сообщения из Discord канала")
channel = client.get_channel(channel_id)
if not channel:
self.logger.error(f"ID канала Discord {channel_id} не существует")
return [], []
messages = []
titles = []
async for message in channel.history(limit=100):
self.logger.debug(f"Сообщение Discord: {message.content}")
messages.append(message.content)
# Извлекаем заголовок из сообщения
title = self._extract_title_from_message(message.content)
if title:
titles.append(title)
self.logger.debug(f"Найдено {len(messages)} сообщений и {len(titles)} заголовков в Discord")
return messages, titles
def _extract_title_from_message(self, message_text):
"""Извлечение заголовка из текста сообщения"""
if not message_text:
return None
# Ищем заголовки в формате "### Заголовок\t"
pattern = re.compile(r'^### (.*?)\t', re.MULTILINE)
match = pattern.search(message_text)
if match:
return match.group(1).strip()
# Ищем заголовки в формате "------ ### Заголовок\t"
pattern2 = re.compile(r'--+\n### (.*?)\t', re.MULTILINE)
match2 = pattern2.search(message_text)
if match2:
return match2.group(1).strip()
# Если не найдено, ищем в первой строке
lines = message_text.strip().split('\n')
if lines:
first_line = lines[0].strip()
# Убираем префиксы типа "###", "**", и табы
first_line = re.sub(r'^[#*\s\t-]+', '', first_line)
first_line = re.sub(r'\t.*$', '', first_line)
return first_line.strip() if first_line else None
return None
async def send_message(self, channel, content):
"""Отправка сообщения в Discord канал с разбивкой длинных сообщений"""
if not self.is_enabled():
return
try:
# Discord лимит: 2000 символов
max_length = 2000
if len(content) <= max_length:
# Короткое сообщение - отправляем как есть
await channel.send(content)
else:
# Длинное сообщение - разбиваем умно по строкам
self.logger.warning(f"Сообщение слишком длинное ({len(content)} символов), разбиваем на части")
parts = []
current_part = ""
for line in content.split('\n'):
# Если добавление этой строки превысит лимит
if len(current_part + line + '\n') > max_length:
if current_part:
parts.append(current_part.rstrip())
current_part = ""
current_part += line + '\n'
# Добавляем последнюю часть
if current_part:
parts.append(current_part.rstrip())
self.logger.info(f"Сообщение разбито на {len(parts)} частей для Discord")
# Отправляем каждую часть
for i, part in enumerate(parts, 1):
if len(parts) > 1:
part_content = f"[Часть {i}/{len(parts)}]\n\n{part}"
else:
part_content = part
await channel.send(part_content)
# Небольшая задержка между частями
if i < len(parts):
await asyncio.sleep(1)
self.logger.info("Сообщение успешно отправлено в Discord")
except Exception as e:
self.logger.error(f"Ошибка отправки сообщения в Discord: {e}")
async def check_and_publish_news(self, news_list):
"""Проверка и публикация новостей в Discord"""
if not self.is_enabled():
self.logger.info("Discord отключен, пропускаем публикацию")
return
self.logger.info("Начинаем проверку новостей для Discord")
intents = discord.Intents.default()
intents.messages = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
self.logger.debug(f"Успешный логин в Discord: {client.user}")
channel_id = self.config['channel_id']
# Получаем существующие сообщения и заголовки
discord_messages, discord_titles = await self.get_messages(client, channel_id)
if not news_list:
self.logger.warning("Список новостей пуст")
await client.close()
return
# Фильтруем новости для публикации
list_for_public = []
for topic_id, topic_title in news_list:
# Проверяем по заголовкам и по полному тексту сообщений
title_exists = any(topic_title == title for title in discord_titles)
text_contains = any(topic_title in (msg or '') for msg in discord_messages)
if not title_exists and not text_contains:
list_for_public.append((topic_id, topic_title))
else:
self.logger.debug(f"Новость '{topic_title}' уже есть в Discord (title_exists={title_exists}, text_contains={text_contains})")
if not list_for_public:
self.logger.warning("Новостей для публикации в Discord нет")
await client.close()
return
self.logger.info(f"Новости для публикации в Discord: {list_for_public}")
channel = client.get_channel(channel_id)
if not channel:
self.logger.error(f"ID канала Discord {channel_id} не существует")
await client.close()
return
# Публикуем новости в обратном порядке, чтобы новые оказались сверху в ленте
for topic_id, topic_title in reversed(list_for_public):
from site_api import SiteAPI
site_api = SiteAPI()
text_data = site_api.get_news_content(topic_id, self.content_processor)
if text_data:
content = f"----------------------------------------------------------\n### {topic_title}\t\n" + text_data + "\n@here"
await self.send_message(channel, content)
time.sleep(1.0)
else:
self.logger.warning(f"Не удалось получить содержимое новости {topic_id}")
await client.close()
try:
await client.start(self.config['token'])
except Exception as e:
self.logger.error(f"Ошибка запуска Discord клиента: {e}")
def is_enabled(self):
"""Проверка, включен ли Discord клиент"""
return (self.config['enabled'] and
self.config['token'] and
self.config['channel_id'])

127
install-service.sh Executable file
View File

@@ -0,0 +1,127 @@
#!/bin/bash
# Установочный скрипт для Linux Gaming News Bot
# Создает systemd service и настраивает его для автозапуска
set -e # Выход при любой ошибке
# Цвета для вывода
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Функции для цветного вывода
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Проверка запуска от root
if [ "$EUID" -ne 0 ]; then
print_error "Скрипт должен быть запущен с правами root (sudo)"
print_info "Используйте: sudo ./install-service.sh"
exit 1
fi
# Определение текущего пользователя (не root)
REAL_USER=$(who am i | awk '{print $1}')
if [ -z "$REAL_USER" ]; then
REAL_USER=$(logname 2>/dev/null || echo $SUDO_USER)
fi
if [ -z "$REAL_USER" ]; then
print_error "Не удалось определить имя пользователя"
exit 1
fi
print_info "Установка сервиса для пользователя: $REAL_USER"
# Определение директории проекта
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$SCRIPT_DIR"
print_info "Директория проекта: $PROJECT_DIR"
# Проверка существования файлов
if [ ! -f "$PROJECT_DIR/news-bot-modular.py" ]; then
print_error "Не найден файл news-bot-modular.py в $PROJECT_DIR"
exit 1
fi
if [ ! -f "$PROJECT_DIR/bot-news-linux-gaming.service" ]; then
print_error "Не найден файл bot-news-linux-gaming.service в $PROJECT_DIR"
exit 1
fi
# Проверка наличия keys.py
if [ ! -f "$PROJECT_DIR/keys.py" ]; then
print_warning "Не найден файл keys.py с настройками"
print_info "Создайте keys.py на основе keys_example.py перед запуском сервиса:"
print_info " cp keys_example.py keys.py"
print_info " nano keys.py # заполните реальными ключами"
fi
# Создание временного файла сервиса с правильными путями
TEMP_SERVICE=$(mktemp)
sed "s|/home/xpamych/Yandex.Disk/IdeaProjects/bot-news-linux-gaming|$PROJECT_DIR|g" "$PROJECT_DIR/bot-news-linux-gaming.service" > "$TEMP_SERVICE"
sed -i "s|User=xpamych|User=$REAL_USER|g" "$TEMP_SERVICE"
sed -i "s|Group=xpamych|Group=$REAL_USER|g" "$TEMP_SERVICE"
print_info "Копирование systemd unit файла..."
cp "$TEMP_SERVICE" /etc/systemd/system/bot-news-linux-gaming.service
rm "$TEMP_SERVICE"
print_info "Установка прав доступа..."
chmod 644 /etc/systemd/system/bot-news-linux-gaming.service
chown root:root /etc/systemd/system/bot-news-linux-gaming.service
print_info "Перезагрузка systemd..."
systemctl daemon-reload
print_success "Сервис успешно установлен!"
print_info ""
print_info "Для управления сервисом используйте команды:"
print_info " sudo systemctl enable bot-news-linux-gaming # Включить автозапуск"
print_info " sudo systemctl start bot-news-linux-gaming # Запустить сервис"
print_info " sudo systemctl status bot-news-linux-gaming # Посмотреть статус"
print_info " sudo systemctl stop bot-news-linux-gaming # Остановить сервис"
print_info " sudo systemctl disable bot-news-linux-gaming # Отключить автозапуск"
print_info ""
print_info "Логи сервиса:"
print_info " sudo journalctl -u bot-news-linux-gaming -f # Следить за логами"
print_info " sudo journalctl -u bot-news-linux-gaming -n 50 # Последние 50 строк"
print_info ""
# Предложение сразу включить и запустить
read -p "Включить автозапуск и запустить сервис сейчас? (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
print_info "Включение автозапуска..."
systemctl enable bot-news-linux-gaming
print_info "Запуск сервиса..."
systemctl start bot-news-linux-gaming
sleep 2
print_info "Статус сервиса:"
systemctl status bot-news-linux-gaming --no-pager
print_success "Сервис запущен и добавлен в автозагрузку!"
else
print_info "Сервис установлен, но не запущен. Запустите его командой:"
print_info " sudo systemctl enable --now bot-news-linux-gaming"
fi

26
keys_example.py Normal file
View File

@@ -0,0 +1,26 @@
#!/usr/bin/env python3
# Пример файла конфигурации keys.py
# Скопируйте этот файл как keys.py и заполните реальными значениями
# API ключи и токены
api_key_site = "your_site_api_key_here"
api_key_vk = "your_vk_api_key_here"
user_token_vk = "your_vk_user_token_here"
# Telegram настройки
api_id_tg = 'your_telegram_api_id'
api_hash_tg = 'your_telegram_api_hash'
phone_tg = '+your_phone_number'
channel_username_tg = 'your_telegram_channel'
telegram_topic_id = 2 # ID топика для публикации (None для обычного канала)
# Discord настройки (опционально)
discord_enabled = False # Установите True для включения Discord
discord_token = "your_discord_bot_token" # Заполните если discord_enabled = True
dicord_channel = 123456789012345678 # ID канала Discord
# Общие настройки
cat_num = 8 # Номер категории новостей
own_id = -123456789 # ID владельца VK группы/страницы
start_topic_id = 0 # ID темы с которой начинать публикацию (0 = все темы)

26
logger.py Normal file
View File

@@ -0,0 +1,26 @@
#!/usr/bin/env python3
import logging
import colorlog
def setup_logger():
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
if not logger.handlers:
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(asctime)s - %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
))
logger.addHandler(handler)
return logger

100
news-bot-modular.py Executable file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
import asyncio
import schedule
import time
from logger import setup_logger
from config import DISCORD_CONFIG
from content_processor import ContentProcessor
from site_api import SiteAPI
from telegram_client import TelegramNewsClient
from vk_client import VKClient
from discord_client import DiscordClient
class NewsBot:
def __init__(self):
self.logger = setup_logger()
self.content_processor = ContentProcessor()
self.site_api = SiteAPI()
self.telegram_client = TelegramNewsClient(self.content_processor)
self.vk_client = VKClient(self.content_processor)
self.discord_client = DiscordClient(self.content_processor)
self.logger.info("Бот инициализирован с модульной архитектурой")
if DISCORD_CONFIG['enabled']:
self.logger.info("Discord функциональность включена")
else:
self.logger.info("Discord функциональность отключена")
async def run_job(self):
"""Основная задача бота"""
self.logger.info("Запуск основной задачи бота")
try:
# Проверка и публикация новых версий скриптов
self.site_api.check_script_versions(self.content_processor)
# Получение списка новостей
news_list = self.site_api.get_news()
if news_list:
self.logger.info(f"Получено {len(news_list)} новостей для обработки")
# Публикация в Telegram
if self.telegram_client.is_enabled():
await self.telegram_client.check_and_publish_news(news_list)
# Публикация в VK
if self.vk_client.is_enabled():
await asyncio.get_event_loop().run_in_executor(
None, self.vk_client.check_and_publish_news, news_list
)
# Публикация в Discord (если включен)
if self.discord_client.is_enabled():
await self.discord_client.check_and_publish_news(news_list)
else:
self.logger.warning("Новостей для обработки не найдено")
except Exception as e:
self.logger.error(f"Ошибка в основной задаче бота: {e}")
def run_sync_job(self):
"""Синхронная обертка для планировщика"""
asyncio.run(self.run_job())
def start(self):
"""Запуск бота"""
self.logger.info("Запуск новостного бота")
# Выполнение задачи при старте
self.logger.info("Выполнение первоначальной задачи...")
self.run_sync_job()
# Планирование периодического выполнения
schedule.every(30).minutes.do(self.run_sync_job)
self.logger.info("Запуск планировщика задач (каждые 30 минут)")
# Основной цикл
while True:
schedule.run_pending()
time.sleep(5)
def main():
"""Точка входа в приложение"""
bot = NewsBot()
try:
bot.start()
except KeyboardInterrupt:
bot.logger.info("Бот остановлен пользователем")
except Exception as e:
bot.logger.error(f"Критическая ошибка бота: {e}")
if __name__ == '__main__':
main()

View File

@@ -1,569 +0,0 @@
#!/usr/bin/env python3
import re
import sys
import time
import asyncio
import discord
import logging
import colorlog
import requests
import html2text
import urllib.parse
from telethon import events
from bs4 import BeautifulSoup
from telethon.sync import TelegramClient
from telethon.errors import FloodWaitError
import keys
url_post = "https://linux-gaming.ru/posts.json"
url_news = "https://linux-gaming.ru/c/news/6.json"
url_vk_post = "https://api.vk.com/method/wall.post"
url_vk_get = "https://api.vk.com/method/wall.get"
url_changelog = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru"
heads_site = {
"Content-Type": "application/json",
"Api-Key": keys.api_key_site,
"Api-Username": "linux-gaming"
}
params_get = {
'access_token': keys.user_token_vk,
'v': '5.236', # Версия API
'owner_id': str(keys.own_id),
'count': str(100),
'offset': str(0),
'filter': 'all'
}
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)s: %(message)s',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
))
logger.addHandler(handler)
def main():
last_changelog, resp_changelog = resp_change()
check_version(last_changelog, resp_changelog)
check_discord_public()
check_vk_posts()
check_tg_news()
def make_soup(resp_changelog):
logging.debug(f"Вызываем make_soup")
return BeautifulSoup(resp_changelog.text, 'html.parser')
def html_to_text(html_content):
logging.debug(f"Вызываем html_to_text")
logging.debug(f"HTML на входе {html_content}")
h = html2text.HTML2Text()
h.ignore_links = False # Сохранение ссылок
h.ignore_images = True # Игнорирование изображений
h.bypass_tables = True # Сохранение таблиц
h.reference_links = True # Сохранение оригинальных ссылок
markdown_text = h.handle(html_content)
logging.debug(f"Текст до обработки регулярками {markdown_text}")
# Удаление переносов строк из-за -
markdown_text = re.sub(r'-\s*\n\s*', '-', markdown_text, flags=re.DOTALL)
markdown_text = re.sub(r'-\s*\n*', '-', markdown_text, flags=re.DOTALL)
# Убираем переносы строк внутри круглых скобок ()
markdown_text = re.sub(r'\((.*?)\)', lambda x: '(' + x.group(1).replace('\n', ' ') + ')', markdown_text, flags=re.DOTALL)
# Убираем переносы строк внутри квадратных скобок []
markdown_text = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace('\n', ' ') + ']', markdown_text, flags=re.DOTALL)
# Удаление строк, содержащих '* * *'
markdown_text = re.sub(r'^.*\* \* \*.*$', '', markdown_text, flags=re.MULTILINE)
# Фикс ненумерованных списков
markdown_text = re.sub(r'^.*\* ', '* ', markdown_text, flags=re.MULTILINE)
# Убираем переносы строк, кроме строк, начинающихся с *
markdown_text = re.sub(r'^(?!\*).*?\n(?!\*)', lambda x: x.group(0).replace('\n', ' '), markdown_text, flags=re.MULTILINE)
# Преобразование всех ссылок с параметрами URL
markdown_text = convert_links(markdown_text)
# Работа с #
patterns_to_remove = [
r'###',
r'##',
r'#',
r'\[scripts\]\(\/tag\/scripts\) version \d+ ',
r'##\[scripts\]\(\) version \d+ ',
r'\d{4}×\d{3} \d+ KB'
]
for pattern in patterns_to_remove:
markdown_text = re.sub(pattern, '', markdown_text)
# Удаление избыточных пустых строк после удаления строк
markdown_text = re.sub(r'\n\s*\n', '\n', markdown_text)
# Замена текстов типа "image1280×474 99.7 KB", "807×454 64.1 KB" на "."
markdown_text = re.sub(r'image\d+×\d+\s+\d+(\.\d+)?\s+KB', '.', markdown_text)
markdown_text = re.sub(r'\d+×\d+\s+\d+(\.\d+)?\s+KB', '.', markdown_text)
# Изменение ссылок без описания
markdown_text = re.sub(r'\[\]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
markdown_text = re.sub(r'\[\s]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
# Удаление дублирующихся ссылок
markdown_text = remove_duplicate_links(markdown_text)
# Добавление переноса после "История изменений:"
re.sub(r'^.*\* \* \*.*$', '', markdown_text)
markdown_text = re.sub(r'История изменений:', r'\n', markdown_text)
logging.debug(f"Текст после обработки {markdown_text}")
return markdown_text
def convert_links(text):
logging.debug(f"Входим в convert_links")
url_pattern = re.compile(r'https?://[^\s\)]+')
url_pattern = url_pattern.sub(lambda match: decode_url_params(match.group(0)), text)
logging.debug(f"Возврат url_pattern {url_pattern}")
return url_pattern
def decode_url_params(url):
logging.debug(f"Входим в decode_url_params")
parsed_url = urllib.parse.urlparse(url)
query_params = urllib.parse.parse_qs(parsed_url.query)
for key, values in query_params.items():
if key.lower() == 'to' and values:
return urllib.parse.unquote(values[0])
logging.debug(f"Возврат url {url}")
return url
def remove_empty_lines(text_data):
logging.debug(f"Входим в remove_empty_lines")
lines = text_data.splitlines()
non_empty_lines = [line for line in lines if line.strip()]
non_empty_lines = '\n'.join(non_empty_lines)
logging.debug(f"Возврат non_empty_lines {non_empty_lines}")
return non_empty_lines
def remove_markdown_links(markdown_text):
logging.debug(f"Входим в remove_markdown_links")
# Регулярное выражение для поиска Markdown-ссылок и замена их на только URL
markdown_text = re.sub(r'\[.*?\]\((https?://.*?)\)', r'\1' or r'(`https?://.*?)`\)', markdown_text)
logging.debug(f"Возврат markdown_text {markdown_text}")
return markdown_text
def remove_duplicate_links(text):
logging.debug(f"Входим в remove_duplicate_links")
seen_links = set()
def replace_link(match):
link = match.group(2)
if link in seen_links:
return ''
seen_links.add(link)
return match.group(0)
# Регулярное выражение для поиска Markdown-ссылок
link_pattern = re.compile(r'(\[.*?\]\((https:\/\/.*?)\))')
text = re.sub(link_pattern, replace_link, text)
logging.debug(f"Возвращаем text {text}")
return text
def extract_links(text):
logging.debug(f"Входим в extract_links")
# Регулярное выражение для поиска ссылок
url_pattern = re.compile(r'https?://\S+')
url_pattern = url_pattern.findall(text)
logging.debug(f"Возвращаем url_pattern {url_pattern}")
return url_pattern
def script_content(script_ver, resp_changelog):
logging.debug(f"Вход в script_content")
soup = make_soup(resp_changelog)
page_text = str(soup)
page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')
# Находим текст до определенного текста, тега или класса (например, до тега <hr>)
last_text = f"###Scripts version {script_ver - 1}"
last_text = str(last_text)
index_last_text = page_text.find(last_text)
if index_last_text != -1:
changelog_text_last = page_text[:index_last_text]
prev_text = f"###Scripts version {script_ver}"
index_script_ver = changelog_text_last.find(prev_text)
if index_script_ver != -1:
changelog_text = changelog_text_last[index_script_ver:]
post_text = (f"-----------------------------\n") + changelog_text
site_text = (f"[center][img]/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img]["
f"/center]\n{post_text}")
logging.debug(f"Сообщение на сайт {site_text}")
post_data = {
"title": f"Обновление скриптов {script_ver}",
"raw": site_text,
"category": keys.cat_num
}
logging.debug(f"Возвращаем post_text - {post_text}\n post_data - {post_data}")
return post_text, post_data, post_text
def news_content(post_id):
logging.debug(f"Запрос содержимого поста новости с ID: {post_id}")
response = response_get(f"https://linux-gaming.ru/t/{post_id}.json", heads_site)
if response and response.status_code == 200:
topic_data = response.json()
posts = topic_data.get('post_stream', {}).get('posts', [])
# Найти первый пост
for post in posts:
if post.get('post_number') == 1:
html_content = post.get('cooked', 'Нет содержимого')
text_data = html_to_text(html_content)
return text_data
logging.error(f"Первый пост не найден в теме с ID: {post_id}")
return None
else:
logging.error(f"Не удалось получить содержимое поста с ID: {post_id}")
return None
def response_get(url, heads_site):
try:
if heads_site == params_get:
return requests.get(url, params=params_get)
elif heads_site == heads_site:
return requests.get(url, headers=heads_site)
except requests.RequestException as err:
logging.error(f"Ошибка запроса {err}")
def resp_change():
resp_changelog = response_get(url_changelog, heads_site)
if resp_changelog and resp_changelog.status_code == 200:
matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text)
logging.debug(f"Найдены версии в истории изменений: {matches_changelog}")
last_changelog = int(max(matches_changelog))
logging.info(f"Последняя версия в истории изменений: {last_changelog}")
return last_changelog, resp_changelog
else:
logging.error(
f'Ошибка при запросе changelog: {resp_changelog.status_code if resp_changelog else "No Response"}')
return None, None
def news():
resp_topics = response_get(url_news, heads_site)
if resp_topics.status_code == 200:
data = resp_topics.json()
topics = data['topic_list']['topics']
list_titles_and_ids = [(topic['id'], str(topic['title'])) for topic in topics]
filtered_list_titles_and_ids = [(id, title) for id, title in list_titles_and_ids if not title == ('Описание '
'категории '
'«Новости»')]
return filtered_list_titles_and_ids
else:
logging.error(f"Ошибка при запросе тем с сайта: {resp_topics.status_code if resp_topics else 'Нет доступа к сайту'}")
return []
def site_post(url, headers, json):
while True:
title = json.get('title')
try:
resp_post = requests.post(url=url, headers=headers, json=json)
if resp_post.status_code == 200:
logging.info("Новость опубликована на сайте!")
return resp_post
elif resp_post.status_code == 422:
logging.warning(f'Новость "{title}" уже опубликована: {resp_post.status_code}')
return resp_post
else:
logging.error(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code}')
except requests.RequestException as error:
logging.error(f'Ошибка при отправке новости "{title}" на сайт: {error}')
time.sleep(900)
def check_version(last_changelog, resp_changelog):
list_titles_and_ids = news()
pattern = re.compile(r'Обновление скриптов (\d+)')
def extract_number(title):
match = pattern.search(title)
if match:
return int(match.group(1))
return None
numbers = [extract_number(title) for _, title in list_titles_and_ids if extract_number(title) is not None]
last_topics_script = max(numbers)
logging.info(f"Последняя новость на сайте о версии: {last_topics_script}")
if last_topics_script < last_changelog:
list_new_ver = []
for script_ver in range(last_topics_script + 1, last_changelog + 1):
list_new_ver.append(script_ver)
logging.info(f"Найдена новая версия скрипта {script_ver}")
changelog_text, post_data, params = script_content(script_ver, resp_changelog)
if post_data:
logging.debug(f"Публикуем {post_data}")
site_post(url_post, heads_site, post_data)
if not list_new_ver:
logging.warning(f"Не найдена новая версия скрипта")
sys.exit()
else:
logging.warning("Нет новых версий скриптов PortProton")
async def discord_post(post_text, client):
channel = client.get_channel(keys.dicord_channel)
await channel.send(f"{post_text}")
async def get_discord_messages(client_discord, channel_id):
channel = client_discord.get_channel(channel_id)
if not channel:
logging.error(f"ID канала Discord {channel_id} не существует")
return []
messages = []
async for message in channel.history(limit=999):
logging.debug(message)
messages.append(message.content)
pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL)
for message in messages:
matches = pattern.findall(message)
if matches:
messages.extend(matches)
logging.debug(f"Найдены сообщения в дискорде: {messages}")
return messages
def check_discord_public():
intents = discord.Intents.default()
intents.messages = True
client_discord = discord.Client(intents=intents)
@client_discord.event
async def on_ready():
logging.debug(f"Успешный логин в discord {client_discord.user}")
channel_id = keys.dicord_channel
discord_messages = await get_discord_messages(client_discord, channel_id)
list_titles_and_ids = news()
if list_titles_and_ids:
list_for_public = []
for topic_id, topic_title in list_titles_and_ids:
if topic_title not in discord_messages and topic_id > keys.start_topic_id:
list_for_public.append((topic_id, topic_title))
if not list_for_public:
logging.warning(f"Новостей для публикации в дискорд нет")
await client_discord.close()
else:
logging.info(f"Новости для публикации в дискорд: {list_for_public}")
channel = client_discord.get_channel(channel_id)
if not channel:
logging.error(f"ID канала Discord {channel_id} не существует")
await client_discord.close()
return
for topic_id, topic_title in reversed(list_for_public):
text_data = news_content(topic_id)
if text_data:
content = f"----------------------------------------------------------\n### {topic_title}\t\n" + text_data + "\n" + "@here"
# Разбиваем содержимое на части по 4000 символов
for i in range(0, len(content), 2000):
await channel.send(content[i:i+2000])
await client_discord.close()
client_discord.run(keys.discord_token)
def vk_post(url, post_text, links=None):
params_post = {
'access_token': keys.api_key_vk,
'v': '5.236', # Версия API VK
'owner_id': str(keys.own_id),
'message': f'{post_text}'
# Дополнительные параметры можно добавить здесь
}
if links:
params_post['attachments'] = links
try:
# Отправляем POST-запрос к VK API
resp_post = requests.post(url=url, params=params_post)
if resp_post.status_code == 200:
logging.info("Сообщение успешно опубликовано.")
logging.info(resp_post.json()) # Выводим ответ сервера в формате JSON
else:
logging.error(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}")
return resp_post
except requests.RequestException as err:
logging.error(f"VK post failed: {err}")
return None
def get_vk_topics():
wall_posts = []
while True:
wall_data = response_get(url_vk_get, params_get)
wall_data_json = wall_data.json()
if 'error' in wall_data_json:
error_code = wall_data_json['error']['error_code']
error_msg = wall_data_json['error']['error_msg']
logging.error(f"Ошибка {error_code}: {error_msg}")
sys.exit(f"Ошибка {error_code}: {error_msg}")
items = wall_data_json.get('response', {}).get('items', [])
if not items:
logging.warning("Постов на стене нет")
break
wall_posts.extend((post['text'] for post in items if 'text' in post))
if len(items) < 100:
break
params_get['offset'] = str(int(params_get['offset']) + 100)
pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL)
for message in wall_posts:
matches = pattern.findall(message)
if matches:
wall_posts.extend(matches)
logging.debug(f"Найдены посты в ВК: {wall_posts}")
return wall_posts
def check_vk_posts():
vk_posts = get_vk_topics()
if not vk_posts:
logging.warning(f"Постов на стене нет{vk_posts}")
list_titles_and_ids = news()
if list_titles_and_ids:
list_for_public = []
for topic_id, topic_title in list_titles_and_ids:
# Сравнение заголовков с текстами постов
if not any(topic_title in vk_posts for vk_posts in vk_posts):
list_for_public.append((topic_id, topic_title))
if not list_for_public:
logging.warning(f"Новостей для публикации в ВК нет")
else:
for topic_id, topic_title in reversed(list_for_public):
if topic_id > keys.start_topic_id:
logging.info(f"Новости для публикации в ВК: {list_for_public}")
text_data = news_content(topic_id)
if text_data:
content = f"{topic_title}\t\n" + text_data + "\n"
content = remove_markdown_links(content)
content = re.sub(r'https://linux-gaming.ru/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png', '\n', content, flags=re.DOTALL)
links = extract_links(content)
if "Обновление скриптов" in topic_title:
# Пример добавления изображения с постом
vk_post(url_vk_post, content, "photo-99238527_457244491")
else:
if links:
vk_post(url_vk_post, content, links)
else:
vk_post(url_vk_post, content)
else:
logging.warning(f"Новостей для публикации в ВК нет")
def tg_post(post_text, client_tg):
# Отправка сообщения
client_tg.send_message(keys.channel_username_tg, post_text)
# Завершение сеанса
client_tg.disconnect()
async def get_tg_messages(client_tg, channel_username_tg):
messages = []
async for message in client_tg.iter_messages(channel_username_tg, limit=999):
if message.text: # Проверка на NoneType
logging.debug(f"Найдены сообщения в Telegram канале {message.text}")
messages.append(message.text)
return messages
def check_tg_news():
session_file = 'LG_news'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client_tg = TelegramClient(session_file, keys.api_id_tg, keys.api_hash_tg)
@client_tg.on(events.NewMessage(chats=keys.channel_username_tg))
async def handler(event):
logging.debug(f"Новое сообщение в Telegram: {event.message.message}")
async def main_tg():
await client_tg.start()
tg_messages = await get_tg_messages(client_tg, keys.channel_username_tg)
list_titles_and_ids = news()
if list_titles_and_ids:
list_for_public = []
for topic_id, topic_title in list_titles_and_ids:
if all(topic_title not in (msg or '') for msg in tg_messages) and topic_id > keys.start_topic_id:
list_for_public.append((topic_id, topic_title))
if not list_for_public:
logging.warning(f"Новостей для публикации в Telegram нет")
await client_tg.disconnect()
else:
logging.info(f"Новости для публикации в Telegram: {list_for_public}")
for topic_id, topic_title in reversed(list_for_public):
text_data = news_content(topic_id)
if text_data:
content = f"### {topic_title}\t\n" + text_data + "\n"
while True:
try:
await client_tg.send_message(keys.channel_username_tg, content)
break
except FloodWaitError as e:
logging.warning(f"Flood wait error: нужно подождать {e.seconds} секунд.")
await asyncio.sleep(e.seconds) # Ждем указанное время перед повторной попыткой
await client_tg.disconnect()
loop = asyncio.get_event_loop()
loop.run_until_complete(main_tg())
if __name__ == '__main__':
main()

23
requirements.txt Normal file
View File

@@ -0,0 +1,23 @@
# Основные зависимости для bot-news-linux-gaming
# Обновлено: 2025-08-18
# Telegram клиент
Telethon>=1.40.0
# HTTP запросы
requests>=2.32.0
# Планировщик задач
schedule>=1.2.0
# Логирование с цветами
colorlog>=6.9.0
# Конвертация HTML в текст
html2text>=2025.4.0
# Парсинг HTML
beautifulsoup4>=4.13.0
# Discord клиент (опционально)
# discord.py>=2.3.0

15
run_bot.sh Executable file
View File

@@ -0,0 +1,15 @@
#!/bin/bash
# Скрипт для запуска бота в виртуальном окружении
# Использование: ./run_bot.sh
# Проверка наличия виртуального окружения
if [ ! -d "venv" ]; then
echo "Виртуальное окружение не найдено!"
echo "Сначала запустите: bash setup_venv.sh"
exit 1
fi
# Запуск бота через виртуальное окружение
echo "Запуск бота..."
./venv/bin/python3 news-bot-modular.py

155
service-control.sh Executable file
View File

@@ -0,0 +1,155 @@
#!/bin/bash
# Простой скрипт управления Linux Gaming News Bot
SERVICE_NAME="bot-news-linux-gaming"
# Цвета для вывода
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Функция показа статуса
show_status() {
print_info "Статус сервиса $SERVICE_NAME:"
systemctl status $SERVICE_NAME --no-pager -l
}
# Функция показа логов
show_logs() {
local lines=${1:-50}
print_info "Последние $lines строк логов:"
sudo journalctl -u $SERVICE_NAME -n $lines --no-pager
}
# Функция следения за логами
follow_logs() {
print_info "Следование за логами (Ctrl+C для выхода):"
sudo journalctl -u $SERVICE_NAME -f
}
# Функция показа помощи
show_help() {
echo "Управление Linux Gaming News Bot"
echo ""
echo "Использование: $0 [команда]"
echo ""
echo "Команды:"
echo " start - Запустить сервис"
echo " stop - Остановить сервис"
echo " restart - Перезапустить сервис"
echo " status - Показать статус сервиса"
echo " enable - Включить автозапуск"
echo " disable - Отключить автозапуск"
echo " logs - Показать последние логи"
echo " logs N - Показать последние N строк логов"
echo " follow - Следить за логами в реальном времени"
echo " install - Установить сервис (требует sudo)"
echo " uninstall - Удалить сервис (требует sudo)"
echo " help - Показать эту справку"
echo ""
}
# Проверка существования сервиса
check_service_exists() {
if ! systemctl list-unit-files | grep -q "^$SERVICE_NAME.service"; then
print_error "Сервис $SERVICE_NAME не установлен"
print_info "Запустите: sudo ./install-service.sh"
exit 1
fi
}
# Основная логика
case "${1}" in
"start")
check_service_exists
print_info "Запуск сервиса..."
sudo systemctl start $SERVICE_NAME
show_status
;;
"stop")
check_service_exists
print_info "Остановка сервиса..."
sudo systemctl stop $SERVICE_NAME
show_status
;;
"restart")
check_service_exists
print_info "Перезапуск сервиса..."
sudo systemctl restart $SERVICE_NAME
show_status
;;
"status")
check_service_exists
show_status
;;
"enable")
check_service_exists
print_info "Включение автозапуска..."
sudo systemctl enable $SERVICE_NAME
print_success "Автозапуск включен"
;;
"disable")
check_service_exists
print_info "Отключение автозапуска..."
sudo systemctl disable $SERVICE_NAME
print_success "Автозапуск отключен"
;;
"logs")
check_service_exists
if [ -n "$2" ] && [[ "$2" =~ ^[0-9]+$ ]]; then
show_logs $2
else
show_logs
fi
;;
"follow")
check_service_exists
follow_logs
;;
"install")
if [ ! -f "./install-service.sh" ]; then
print_error "Файл install-service.sh не найден"
exit 1
fi
sudo ./install-service.sh
;;
"uninstall")
if [ ! -f "./uninstall-service.sh" ]; then
print_error "Файл uninstall-service.sh не найден"
exit 1
fi
sudo ./uninstall-service.sh
;;
"help"|"--help"|"-h")
show_help
;;
"")
print_warning "Команда не указана"
show_help
exit 1
;;
*)
print_error "Неизвестная команда: $1"
show_help
exit 1
;;
esac

40
setup_venv.sh Executable file
View File

@@ -0,0 +1,40 @@
#!/bin/bash
# Скрипт установки виртуального окружения для bot-news-linux-gaming
# Запускать на сервере: bash setup_venv.sh
echo "=== Установка виртуального окружения для бота ==="
# 1. Установка python3-venv если не установлен
echo "Проверка наличия python3-venv..."
sudo apt update
sudo apt install -y python3-venv python3-full
# 2. Создание виртуального окружения
echo "Создание виртуального окружения..."
python3 -m venv venv
# 3. Активация виртуального окружения
echo "Активация виртуального окружения..."
source venv/bin/activate
# 4. Обновление pip
echo "Обновление pip..."
pip install --upgrade pip
# 5. Установка зависимостей
echo "Установка зависимостей..."
pip install -r requirements.txt
# 6. Удаление старых сессий
echo "Удаление старых файлов сессий..."
rm -f *.session
echo "=== Установка завершена ==="
echo ""
echo "Для запуска бота используйте:"
echo " source venv/bin/activate"
echo " python3 news-bot-modular.py"
echo ""
echo "Или запускайте напрямую:"
echo " ./venv/bin/python3 news-bot-modular.py"

198
site_api.py Normal file
View File

@@ -0,0 +1,198 @@
#!/usr/bin/env python3
import re
import time
import logging
import requests
from config import URL_POST, URL_NEWS, URL_CHANGELOG, HEADERS_SITE, SITE_CONFIG
class SiteAPI:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.config = SITE_CONFIG
def make_request(self, url, headers=None, params=None, max_retries=10, retry_delay=10):
"""Универсальный метод для HTTP запросов с повторными попытками"""
for attempt in range(max_retries):
try:
if params:
response = requests.get(url, params=params)
elif headers:
response = requests.get(url, headers=headers)
else:
response = requests.get(url)
if response.status_code == 200:
return response
else:
self.logger.warning(f"HTTP {response.status_code} для {url}, попытка {attempt + 1}/{max_retries}")
except requests.RequestException as err:
self.logger.warning(f"Ошибка HTTP запроса к {url} (попытка {attempt + 1}/{max_retries}): {err}")
if attempt < max_retries - 1:
self.logger.info(f"Повторная попытка через {retry_delay} секунд...")
time.sleep(retry_delay)
self.logger.error(f"Все {max_retries} попыток запроса к {url} неудачны")
return None
def get_changelog(self):
"""Получение changelog с GitLab"""
self.logger.debug("Получаем changelog")
response = self.make_request(URL_CHANGELOG, HEADERS_SITE)
if response and response.status_code == 200:
matches = re.findall(r'###Scripts version (\d+)### / stable', response.text)
self.logger.debug(f"Найдены версии в истории изменений: {matches}")
if matches:
last_version = int(max(matches))
self.logger.info(f"Последняя стабильная версия в changelog: {last_version}")
return matches, last_version, response
self.logger.error(f'Ошибка при запросе changelog: {response.status_code if response else "No Response"}')
return None, None, None
def get_news(self):
"""Получение списка новостей с сайта"""
self.logger.debug("Получаем список новостей")
response = self.make_request(URL_NEWS, HEADERS_SITE)
if response and response.status_code == 200:
data = response.json()
topics = data['topic_list']['topics']
# Фильтруем новости, исключая описание категории
filtered_topics = [
(topic['id'], str(topic['title']))
for topic in topics
if topic['title'] != 'Описание категории «Новости»'
]
# Фильтруем по ID темы
filtered_topics = [
(topic_id, title)
for topic_id, title in filtered_topics
if topic_id >= self.config['start_topic_id']
]
self.logger.debug(f"Получено {len(filtered_topics)} новостей")
return filtered_topics
else:
self.logger.error(f"Ошибка при запросе новостей: {response.status_code if response else 'Нет доступа к сайту'}")
return []
def get_news_content(self, post_id, content_processor):
"""Получение содержимого конкретной новости"""
self.logger.debug(f"Получаем содержимое поста с ID: {post_id}")
url = f"https://linux-gaming.ru/t/{post_id}.json"
response = self.make_request(url, HEADERS_SITE)
if response and response.status_code == 200:
topic_data = response.json()
posts = topic_data.get('post_stream', {}).get('posts', [])
# Ищем первый пост
for post in posts:
if post.get('post_number') == 1:
html_content = post.get('cooked', 'Нет содержимого')
text_data = content_processor.html_to_text(html_content)
return text_data
self.logger.error(f"Первый пост не найден в теме с ID: {post_id}")
return None
else:
self.logger.error(f"Не удалось получить содержимое поста с ID: {post_id}")
return None
def post_to_site(self, post_data):
"""Публикация новости на сайт"""
title = post_data.get('title', 'Без названия')
while True:
try:
response = requests.post(url=URL_POST, headers=HEADERS_SITE, json=post_data)
if response.status_code == 200:
self.logger.info("Новость опубликована на сайте!")
return response
elif response.status_code == 422:
self.logger.warning(f'Новость "{title}" уже опубликована: {response.status_code}')
return response
else:
self.logger.error(f'Ошибка при отправке новости "{title}" на сайт: {response.status_code}')
except requests.RequestException as error:
self.logger.error(f'Ошибка при отправке новости "{title}" на сайт: {error}')
time.sleep(900) # Ждем 15 минут перед повторной попыткой
def create_script_update_post(self, script_ver, next_version, changelog_response, content_processor):
"""Создание поста для обновления скрипта"""
self.logger.debug(f"Создаем пост для обновления скрипта версии {script_ver}")
post_text = content_processor.create_script_content(script_ver, next_version, changelog_response)
if post_text:
site_text = f"[center][img]/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img][/center]\n{post_text}"
post_data = {
"title": f"Кумулятивное обновление скриптов {script_ver}",
"raw": site_text,
"category": self.config['category_num']
}
self.logger.debug(f"Данные поста: {post_data}")
return post_text, post_data
return None, None
def check_script_versions(self, content_processor):
"""Проверка и публикация новых версий скриптов"""
self.logger.info("Проверяем новые версии скриптов")
# Получаем changelog
matches_changelog, last_changelog, resp_changelog = self.get_changelog()
if not matches_changelog or not last_changelog:
return
# Получаем существующие новости
news_list = self.get_news()
# Извлекаем номера версий из заголовков новостей
pattern = re.compile(r'Кумулятивное обновление скриптов (\d+)')
def extract_number(title):
match = pattern.search(title)
return int(match.group(1)) if match else None
numbers = [extract_number(title) for _, title in news_list if extract_number(title) is not None]
if numbers:
last_topics_script = max(numbers)
self.logger.info(f"Последняя новость на сайте о версии: {last_topics_script}")
if last_topics_script >= last_changelog:
self.logger.warning("Нет новых версий скриптов PortProton")
return
else:
self.logger.warning("На сайте нет новостей о скриптах")
# Публикуем новые версии
self._publish_new_script_versions(matches_changelog, resp_changelog, content_processor)
def _publish_new_script_versions(self, matches_changelog, resp_changelog, content_processor):
"""Публикация новых версий скриптов"""
for script_ver, next_version in zip(reversed(matches_changelog[:-1]), reversed(matches_changelog[1:])):
self.logger.info(f"Найдена новая версия скрипта {script_ver}")
post_text, post_data = self.create_script_update_post(
script_ver, next_version, resp_changelog, content_processor
)
if post_data:
self.logger.debug(f"Публикуем {post_data}")
self.post_to_site(post_data)

219
telegram_client.py Normal file
View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python3
import time
import asyncio
import logging
from telethon import TelegramClient
from telethon.errors import FloodWaitError
from config import TELEGRAM_CONFIG
class TelegramNewsClient:
def __init__(self, content_processor):
self.logger = logging.getLogger(__name__)
self.content_processor = content_processor
self.config = TELEGRAM_CONFIG
async def get_messages(self, client, channel_username):
"""Получение сообщений из Telegram канала/топика"""
if self.config['topic_id']:
self.logger.debug(f"Получаем сообщения из Telegram топика {self.config['topic_id']}")
else:
self.logger.debug("Получаем сообщения из Telegram канала")
messages = []
titles = []
entity = await client.get_entity(channel_username)
async for message in client.iter_messages(entity, limit=100):
if message.text:
# Если указан топик, фильтруем только сообщения из этого топика
if self.config['topic_id']:
is_in_topic = False
# В Telegram топиках все сообщения имеют reply_to_msg_id равный ID первого сообщения топика
if hasattr(message, 'reply_to_msg_id') and message.reply_to_msg_id:
# Сообщение принадлежит топику если его reply_to_msg_id равен topic_id
if message.reply_to_msg_id == self.config['topic_id']:
is_in_topic = True
# Также проверяем, если само сообщение является первым сообщением топика
elif message.id == self.config['topic_id']:
is_in_topic = True
self.logger.debug(f"Сообщение ID {message.id}, reply_to_msg_id={getattr(message, 'reply_to_msg_id', None)}, нужен топик {self.config['topic_id']}, в топике: {is_in_topic}")
# Если сообщение не принадлежит нужному топику - пропускаем
if not is_in_topic:
continue
self.logger.debug(f"Найдено сообщение в Telegram: {message.text[:100]}...")
messages.append(message.text)
# Извлекаем заголовок из сообщения
title = self._extract_title_from_message(message.text)
if title:
titles.append(title)
self.logger.debug(f"Извлечено {len(titles)} заголовков из Telegram сообщений")
return messages, titles
def _extract_title_from_message(self, message_text):
"""Извлечение заголовка из текста сообщения"""
import re
# Ищем заголовки в формате "### Заголовок\t"
pattern = re.compile(r'^### (.*?)\t', re.MULTILINE)
match = pattern.search(message_text)
if match:
return match.group(1).strip()
# Если не найдено, ищем в первой строке
lines = message_text.strip().split('\n')
if lines:
first_line = lines[0].strip()
# Убираем префиксы типа "###", "**", и табы
first_line = re.sub(r'^[#*\s\t]+', '', first_line)
first_line = re.sub(r'\t.*$', '', first_line) # Убираем всё после таба
return first_line.strip() if first_line else None
return None
async def send_message(self, client, channel_username, content):
"""Отправка сообщения в Telegram канал/топик с обработкой flood wait и длинных сообщений"""
try:
# Получаем entity канала
entity = await client.get_entity(channel_username)
# Telegram лимит: 4096 символов
max_length = 4096
# Если сообщение слишком длинное, разбиваем его
if len(content) > max_length:
self.logger.warning(f"Сообщение слишком длинное ({len(content)} символов), разбиваем на части")
# Разбиваем по параграфам, чтобы не резать посередине слов
parts = []
current_part = ""
for line in content.split('\n'):
# Если добавление этой строки превысит лимит
if len(current_part + line + '\n') > max_length:
if current_part:
parts.append(current_part.rstrip())
current_part = ""
current_part += line + '\n'
# Добавляем последнюю часть
if current_part:
parts.append(current_part.rstrip())
self.logger.info(f"Сообщение разбито на {len(parts)} частей")
# Отправляем каждую часть
for i, part in enumerate(parts, 1):
if len(parts) > 1:
part_content = f"[Часть {i}/{len(parts)}]\n\n{part}"
else:
part_content = part
await self._send_single_message(client, entity, part_content)
# Небольшая задержка между частями
if i < len(parts):
await asyncio.sleep(1)
else:
# Обычная отправка
await self._send_single_message(client, entity, content)
except Exception as e:
self.logger.error(f"Ошибка получения entity канала '{channel_username}': {e}")
self.logger.info("Убедитесь, что имя канала указано правильно и бот имеет доступ")
async def _send_single_message(self, client, entity, content):
"""Отправка одного сообщения с обработкой flood wait"""
while True:
try:
# Если указан topic_id, отправляем в топик
if self.config['topic_id']:
self.logger.debug(f"Отправка в топик {self.config['topic_id']}")
await client.send_message(
entity,
content,
reply_to=self.config['topic_id']
)
self.logger.info(f"Сообщение успешно отправлено в Telegram топик {self.config['topic_id']}")
else:
# Обычная отправка в канал
await client.send_message(entity, content)
self.logger.info("Сообщение успешно отправлено в Telegram канал")
break
except FloodWaitError as e:
self.logger.warning(f"Flood wait error: нужно подождать {e.seconds} секунд")
await asyncio.sleep(e.seconds)
except Exception as e:
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
raise
async def check_and_publish_news(self, news_list):
"""Проверка и публикация новостей в Telegram"""
self.logger.info("Начинаем проверку новостей для Telegram")
client = TelegramClient(
self.config['session_file'],
self.config['api_id'],
self.config['api_hash']
)
async with client:
await client.start()
# Получаем существующие сообщения и заголовки
tg_messages, tg_titles = await self.get_messages(client, self.config['channel_username'])
if not news_list:
self.logger.warning("Список новостей пуст")
return
# Фильтруем новости для публикации
list_for_public = []
for topic_id, topic_title in news_list:
# Проверяем по заголовкам и по полному тексту сообщений
title_exists = any(topic_title == title for title in tg_titles)
text_contains = any(topic_title in (msg or '') for msg in tg_messages)
if not title_exists and not text_contains:
list_for_public.append((topic_id, topic_title))
else:
self.logger.debug(f"Новость '{topic_title}' уже есть в Telegram (title_exists={title_exists}, text_contains={text_contains})")
if not list_for_public:
self.logger.warning("Новостей для публикации в Telegram нет")
return
self.logger.info(f"Новости для публикации в Telegram: {list_for_public}")
# Публикуем новости в обратном порядке, чтобы новые оказались сверху в ленте
for topic_id, topic_title in reversed(list_for_public):
from site_api import SiteAPI
site_api = SiteAPI()
text_data = site_api.get_news_content(topic_id, self.content_processor)
if text_data:
content = f"### {topic_title}\t\n" + text_data + "\n"
content = self.content_processor.format_for_telegram(content)
await self.send_message(
client,
self.config['channel_username'],
content
)
await asyncio.sleep(10.0) # 10 секундная задержка между сообщениями
else:
self.logger.warning(f"Не удалось получить содержимое новости {topic_id}")
def is_enabled(self):
"""Проверка, включен ли Telegram клиент"""
return True # Telegram всегда включен в этой версии

68
uninstall-service.sh Executable file
View File

@@ -0,0 +1,68 @@
#!/bin/bash
# Скрипт удаления Linux Gaming News Bot systemd service
set -e # Выход при любой ошибке
# Цвета для вывода
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Функции для цветного вывода
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Проверка запуска от root
if [ "$EUID" -ne 0 ]; then
print_error "Скрипт должен быть запущен с правами root (sudo)"
print_info "Используйте: sudo ./uninstall-service.sh"
exit 1
fi
SERVICE_NAME="bot-news-linux-gaming"
SERVICE_FILE="/etc/systemd/system/${SERVICE_NAME}.service"
print_info "Удаление сервиса $SERVICE_NAME..."
# Остановка сервиса если он запущен
if systemctl is-active --quiet $SERVICE_NAME; then
print_info "Остановка сервиса..."
systemctl stop $SERVICE_NAME
fi
# Отключение автозапуска если включен
if systemctl is-enabled --quiet $SERVICE_NAME; then
print_info "Отключение автозапуска..."
systemctl disable $SERVICE_NAME
fi
# Удаление файла сервиса
if [ -f "$SERVICE_FILE" ]; then
print_info "Удаление файла сервиса..."
rm "$SERVICE_FILE"
else
print_warning "Файл сервиса не найден: $SERVICE_FILE"
fi
# Перезагрузка systemd
print_info "Перезагрузка systemd..."
systemctl daemon-reload
print_success "Сервис $SERVICE_NAME успешно удален!"
print_info "Файлы проекта не затронуты и остались на месте."

201
vk_client.py Normal file
View File

@@ -0,0 +1,201 @@
#!/usr/bin/env python3
import re
import sys
import time
import logging
import requests
from config import VK_CONFIG, URL_VK_POST, URL_VK_GET, PARAMS_VK_GET
class VKClient:
def __init__(self, content_processor):
self.logger = logging.getLogger(__name__)
self.content_processor = content_processor
self.config = VK_CONFIG
def get_wall_posts(self, max_retries=10, retry_delay=10):
"""Получение постов со стены VK с повторными попытками"""
self.logger.debug("Получаем посты со стены VK")
wall_posts = []
params = PARAMS_VK_GET.copy()
while True:
success = False
for attempt in range(max_retries):
try:
response = requests.get(URL_VK_GET, params=params)
wall_data_json = response.json()
if 'error' in wall_data_json:
error_code = wall_data_json['error']['error_code']
error_msg = wall_data_json['error']['error_msg']
self.logger.error(f"Ошибка VK API {error_code}: {error_msg}")
return wall_posts, [] # Возвращаем то что получили
items = wall_data_json.get('response', {}).get('items', [])
if not items:
self.logger.warning("Постов на стене нет")
return wall_posts, [] # Завершаем цикл
wall_posts.extend((post['text'] for post in items if 'text' in post))
if len(items) < 100:
# Получили все посты
success = True
break
params['offset'] = str(int(params['offset']) + 100)
success = True
break
except requests.RequestException as e:
self.logger.warning(f"Ошибка при получении постов VK (попытка {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
self.logger.info(f"Повторная попытка через {retry_delay} секунд...")
time.sleep(retry_delay)
if not success:
self.logger.error(f"Все {max_retries} попыток получения постов VK неудачны")
break
# Если получили меньше 100 постов на последнем запросе, завершаем цикл
if len(items) < 100:
break
# Извлечение заголовков из постов
wall_titles = []
pattern = re.compile(r'### (.*?)\t', re.MULTILINE)
for message in wall_posts:
# Ищем заголовки в формате "### Заголовок\t"
matches = pattern.findall(message)
wall_titles.extend(matches)
# Также ищем в первой строке поста
lines = message.strip().split('\n') if message else []
if lines:
first_line = lines[0].strip()
# Убираем префиксы и табы
first_line = re.sub(r'^[#*\s\t]+', '', first_line)
first_line = re.sub(r'\t.*$', '', first_line)
if first_line and first_line not in wall_titles:
wall_titles.append(first_line)
self.logger.info(f"Найдено {len(wall_posts)} постов и {len(wall_titles)} заголовков в VK")
self.logger.debug(f"Заголовки VK: {wall_titles[:5]}...") # Показываем первые 5 заголовков
return wall_posts, wall_titles
def post_message(self, content, attachments=None, max_retries=10, retry_delay=10):
"""Отправка сообщения в VK с повторными попытками"""
params_post = {
'access_token': self.config['api_key'],
'v': '5.199',
'owner_id': str(self.config['owner_id'])
}
# Форматирование контента для VK
formatted_content = self.content_processor.format_for_vk(content)
data = {
'message': formatted_content
}
if attachments:
params_post['attachments'] = attachments
for attempt in range(max_retries):
try:
response = requests.post(url=URL_VK_POST, params=params_post, data=data)
response_json = response.json()
if response.status_code == 200:
# Проверяем наличие ошибки в ответе VK API
if 'error' in response_json:
error_code = response_json['error']['error_code']
error_msg = response_json['error']['error_msg']
self.logger.error(f"Ошибка VK API {error_code}: {error_msg}")
if 'request_params' in response_json['error']:
self.logger.debug(f"Параметры запроса: {response_json['error']['request_params']}")
# Не повторяем попытки при ошибках валидации
if error_code == 100: # Invalid parameter
return None
elif 'response' in response_json:
self.logger.info("Сообщение успешно опубликовано в VK")
self.logger.debug(f"ID поста: {response_json['response'].get('post_id', 'неизвестен')}")
return response
else:
self.logger.warning(f"Неожиданный формат ответа VK: {response_json}")
else:
self.logger.warning(f"Ошибка при публикации в VK: {response.status_code} - {response.reason} (попытка {attempt + 1}/{max_retries})")
except requests.RequestException as err:
self.logger.warning(f"Ошибка VK API (попытка {attempt + 1}/{max_retries}): {err}")
if attempt < max_retries - 1:
self.logger.info(f"Повторная попытка публикации через {retry_delay} секунд...")
time.sleep(retry_delay)
self.logger.error(f"Все {max_retries} попыток публикации в VK неудачны")
return None
def check_and_publish_news(self, news_list):
"""Проверка и публикация новостей в VK"""
self.logger.info("Начинаем проверку новостей для VK")
vk_posts, vk_titles = self.get_wall_posts()
if not vk_posts:
self.logger.warning("Постов на стене VK нет")
if not news_list:
self.logger.warning("Список новостей пуст")
return
# Фильтруем новости для публикации
list_for_public = []
for topic_id, topic_title in news_list:
# Проверяем по заголовкам и по полному тексту сообщений
title_exists = any(topic_title == title for title in vk_titles)
text_contains = any(topic_title in vk_post for vk_post in vk_posts)
if not title_exists and not text_contains:
list_for_public.append((topic_id, topic_title))
else:
self.logger.debug(f"Новость '{topic_title}' уже есть в VK (title_exists={title_exists}, text_contains={text_contains})")
if not list_for_public:
self.logger.warning("Новостей для публикации в VK нет")
return
self.logger.info(f"Новости для публикации в VK: {list_for_public}")
# Публикуем новости в обратном порядке, чтобы новые оказались сверху в ленте
for topic_id, topic_title in reversed(list_for_public):
from site_api import SiteAPI
site_api = SiteAPI()
text_data = site_api.get_news_content(topic_id, self.content_processor)
if text_data:
content = f"{topic_title}\t\n" + text_data + "\n"
# Извлекаем ссылки для прикрепления
links = self.content_processor.extract_links(content)
# Специальная обработка для постов о скриптах
if "Кумулятивное обновление скриптов" in topic_title:
# Добавляем изображение для постов о скриптах
self.post_message(content, "photo-99238527_457244491")
else:
if links:
# Берем первую ссылку как прикрепление
self.post_message(content, links[0] if len(links) == 1 else None)
else:
self.post_message(content)
time.sleep(1.0) # Пауза между постами
else:
self.logger.warning(f"Не удалось получить содержимое новости {topic_id}")
def is_enabled(self):
"""Проверка, включен ли VK клиент"""
return True # VK всегда включен в этой версии