Files
bot-news-linux-gaming/telegram_client.py

219 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import time
import asyncio
import logging
from telethon import TelegramClient
from telethon.errors import FloodWaitError
from config import TELEGRAM_CONFIG
class TelegramNewsClient:
def __init__(self, content_processor):
self.logger = logging.getLogger(__name__)
self.content_processor = content_processor
self.config = TELEGRAM_CONFIG
async def get_messages(self, client, channel_username):
"""Получение сообщений из Telegram канала/топика"""
if self.config['topic_id']:
self.logger.debug(f"Получаем сообщения из Telegram топика {self.config['topic_id']}")
else:
self.logger.debug("Получаем сообщения из Telegram канала")
messages = []
titles = []
entity = await client.get_entity(channel_username)
async for message in client.iter_messages(entity, limit=100):
if message.text:
# Если указан топик, фильтруем только сообщения из этого топика
if self.config['topic_id']:
is_in_topic = False
# В Telegram топиках все сообщения имеют reply_to_msg_id равный ID первого сообщения топика
if hasattr(message, 'reply_to_msg_id') and message.reply_to_msg_id:
# Сообщение принадлежит топику если его reply_to_msg_id равен topic_id
if message.reply_to_msg_id == self.config['topic_id']:
is_in_topic = True
# Также проверяем, если само сообщение является первым сообщением топика
elif message.id == self.config['topic_id']:
is_in_topic = True
self.logger.debug(f"Сообщение ID {message.id}, reply_to_msg_id={getattr(message, 'reply_to_msg_id', None)}, нужен топик {self.config['topic_id']}, в топике: {is_in_topic}")
# Если сообщение не принадлежит нужному топику - пропускаем
if not is_in_topic:
continue
self.logger.debug(f"Найдено сообщение в Telegram: {message.text[:100]}...")
messages.append(message.text)
# Извлекаем заголовок из сообщения
title = self._extract_title_from_message(message.text)
if title:
titles.append(title)
self.logger.debug(f"Извлечено {len(titles)} заголовков из Telegram сообщений")
return messages, titles
def _extract_title_from_message(self, message_text):
"""Извлечение заголовка из текста сообщения"""
import re
# Ищем заголовки в формате "### Заголовок\t"
pattern = re.compile(r'^### (.*?)\t', re.MULTILINE)
match = pattern.search(message_text)
if match:
return match.group(1).strip()
# Если не найдено, ищем в первой строке
lines = message_text.strip().split('\n')
if lines:
first_line = lines[0].strip()
# Убираем префиксы типа "###", "**", и табы
first_line = re.sub(r'^[#*\s\t]+', '', first_line)
first_line = re.sub(r'\t.*$', '', first_line) # Убираем всё после таба
return first_line.strip() if first_line else None
return None
async def send_message(self, client, channel_username, content):
"""Отправка сообщения в Telegram канал/топик с обработкой flood wait и длинных сообщений"""
try:
# Получаем entity канала
entity = await client.get_entity(channel_username)
# Telegram лимит: 4096 символов
max_length = 4096
# Если сообщение слишком длинное, разбиваем его
if len(content) > max_length:
self.logger.warning(f"Сообщение слишком длинное ({len(content)} символов), разбиваем на части")
# Разбиваем по параграфам, чтобы не резать посередине слов
parts = []
current_part = ""
for line in content.split('\n'):
# Если добавление этой строки превысит лимит
if len(current_part + line + '\n') > max_length:
if current_part:
parts.append(current_part.rstrip())
current_part = ""
current_part += line + '\n'
# Добавляем последнюю часть
if current_part:
parts.append(current_part.rstrip())
self.logger.info(f"Сообщение разбито на {len(parts)} частей")
# Отправляем каждую часть
for i, part in enumerate(parts, 1):
if len(parts) > 1:
part_content = f"[Часть {i}/{len(parts)}]\n\n{part}"
else:
part_content = part
await self._send_single_message(client, entity, part_content)
# Небольшая задержка между частями
if i < len(parts):
await asyncio.sleep(1)
else:
# Обычная отправка
await self._send_single_message(client, entity, content)
except Exception as e:
self.logger.error(f"Ошибка получения entity канала '{channel_username}': {e}")
self.logger.info("Убедитесь, что имя канала указано правильно и бот имеет доступ")
async def _send_single_message(self, client, entity, content):
"""Отправка одного сообщения с обработкой flood wait"""
while True:
try:
# Если указан topic_id, отправляем в топик
if self.config['topic_id']:
self.logger.debug(f"Отправка в топик {self.config['topic_id']}")
await client.send_message(
entity,
content,
reply_to=self.config['topic_id']
)
self.logger.info(f"Сообщение успешно отправлено в Telegram топик {self.config['topic_id']}")
else:
# Обычная отправка в канал
await client.send_message(entity, content)
self.logger.info("Сообщение успешно отправлено в Telegram канал")
break
except FloodWaitError as e:
self.logger.warning(f"Flood wait error: нужно подождать {e.seconds} секунд")
await asyncio.sleep(e.seconds)
except Exception as e:
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
raise
async def check_and_publish_news(self, news_list):
"""Проверка и публикация новостей в Telegram"""
self.logger.info("Начинаем проверку новостей для Telegram")
client = TelegramClient(
self.config['session_file'],
self.config['api_id'],
self.config['api_hash']
)
async with client:
await client.start()
# Получаем существующие сообщения и заголовки
tg_messages, tg_titles = await self.get_messages(client, self.config['channel_username'])
if not news_list:
self.logger.warning("Список новостей пуст")
return
# Фильтруем новости для публикации
list_for_public = []
for topic_id, topic_title in news_list:
# Проверяем по заголовкам и по полному тексту сообщений
title_exists = any(topic_title == title for title in tg_titles)
text_contains = any(topic_title in (msg or '') for msg in tg_messages)
if not title_exists and not text_contains:
list_for_public.append((topic_id, topic_title))
else:
self.logger.debug(f"Новость '{topic_title}' уже есть в Telegram (title_exists={title_exists}, text_contains={text_contains})")
if not list_for_public:
self.logger.warning("Новостей для публикации в Telegram нет")
return
self.logger.info(f"Новости для публикации в Telegram: {list_for_public}")
# Публикуем новости в обратном порядке, чтобы новые оказались сверху в ленте
for topic_id, topic_title in reversed(list_for_public):
from site_api import SiteAPI
site_api = SiteAPI()
text_data = site_api.get_news_content(topic_id, self.content_processor)
if text_data:
content = f"### {topic_title}\t\n" + text_data + "\n"
content = self.content_processor.format_for_telegram(content)
await self.send_message(
client,
self.config['channel_username'],
content
)
await asyncio.sleep(10.0) # 10 секундная задержка между сообщениями
else:
self.logger.warning(f"Не удалось получить содержимое новости {topic_id}")
def is_enabled(self):
"""Проверка, включен ли Telegram клиент"""
return True # Telegram всегда включен в этой версии