303 lines
17 KiB
Python
303 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import time
|
||
import asyncio
|
||
import logging
|
||
from telethon import TelegramClient
|
||
from telethon.errors import FloodWaitError
|
||
|
||
from config import TELEGRAM_CONFIG
|
||
from history_manager import HistoryManager
|
||
|
||
|
||
class TelegramNewsClient:
|
||
def __init__(self, content_processor):
|
||
self.logger = logging.getLogger(__name__)
|
||
self.content_processor = content_processor
|
||
self.config = TELEGRAM_CONFIG
|
||
self.history_manager = HistoryManager()
|
||
|
||
async def get_messages(self, client, channel_username):
|
||
"""Получение сообщений из Telegram канала/топика"""
|
||
if self.config['topic_id']:
|
||
self.logger.debug(f"Получаем сообщения из Telegram топика {self.config['topic_id']}")
|
||
else:
|
||
self.logger.debug("Получаем сообщения из Telegram канала")
|
||
|
||
messages = []
|
||
titles = []
|
||
|
||
entity = await client.get_entity(channel_username)
|
||
|
||
async for message in client.iter_messages(entity, limit=100):
|
||
if message.text:
|
||
# Если указан топик, фильтруем только сообщения из этого топика
|
||
if self.config['topic_id']:
|
||
is_in_topic = self._is_message_in_topic(message, self.config['topic_id'])
|
||
|
||
self.logger.debug(f"Сообщение ID {message.id}, reply_to_msg_id={getattr(message, 'reply_to_msg_id', None)}, нужен топик {self.config['topic_id']}, в топике: {is_in_topic}")
|
||
|
||
# Если сообщение не принадлежит нужному топику - пропускаем
|
||
if not is_in_topic:
|
||
continue
|
||
|
||
self.logger.debug(f"Найдено сообщение в Telegram: {message.text[:100]}...")
|
||
messages.append(message.text)
|
||
|
||
# Извлекаем заголовок из сообщения
|
||
title = self._extract_title_from_message(message.text)
|
||
if title:
|
||
titles.append(title)
|
||
|
||
self.logger.info(f"Извлечено {len(titles)} заголовков из Telegram сообщений: {titles[:5]}{'...' if len(titles) > 5 else ''}")
|
||
return messages, titles
|
||
|
||
def _is_message_in_topic(self, message, topic_id):
|
||
"""Проверка принадлежности сообщения к топику с улучшенной логикой"""
|
||
# Само сообщение является первым сообщением топика
|
||
if message.id == topic_id:
|
||
return True
|
||
|
||
# Проверяем прямой ответ на сообщение топика
|
||
if hasattr(message, 'reply_to_msg_id') and message.reply_to_msg_id:
|
||
if message.reply_to_msg_id == topic_id:
|
||
return True
|
||
|
||
# Дополнительная проверка: может быть это ответ на ответ в топике
|
||
# В некоторых случаях Telegram API может показывать вложенную структуру
|
||
try:
|
||
# Получаем информацию о сообщении, на которое отвечали
|
||
reply_to = message.reply_to
|
||
if reply_to and hasattr(reply_to, 'reply_to_msg_id'):
|
||
# Рекурсивно проверяем цепочку ответов
|
||
current_reply_id = reply_to.reply_to_msg_id
|
||
depth = 0
|
||
while current_reply_id and depth < 10: # Ограничиваем глубину для безопасности
|
||
if current_reply_id == topic_id:
|
||
return True
|
||
# В реальном случае здесь нужно было бы получать сообщение по ID
|
||
# Но это может быть слишком дорого по API запросам
|
||
break
|
||
except Exception as e:
|
||
self.logger.debug(f"Ошибка при проверке вложенной структуры ответов: {e}")
|
||
|
||
return False
|
||
|
||
def _check_duplicate_news(self, topic_title, tg_titles, tg_messages):
|
||
"""Улучшенная проверка дубликатов новостей с детальным логированием"""
|
||
# Проверяем точное совпадение с заголовками
|
||
title_exists = topic_title in tg_titles
|
||
if title_exists:
|
||
self.logger.debug(f"Найдено точное совпадение заголовка: '{topic_title}'")
|
||
return True
|
||
|
||
# Проверяем частичные совпадения в заголовках (нормализованные)
|
||
normalized_topic = topic_title.lower().strip()
|
||
for existing_title in tg_titles:
|
||
normalized_existing = existing_title.lower().strip()
|
||
if normalized_topic == normalized_existing:
|
||
self.logger.debug(f"Найдено нормализованное совпадение заголовка: '{topic_title}' == '{existing_title}'")
|
||
return True
|
||
|
||
# Проверяем наличие заголовка как части структурированного сообщения
|
||
# Ищем точное совпадение в формате "### Заголовок\t"
|
||
formatted_title = f"### {topic_title}\t"
|
||
for message in tg_messages:
|
||
if message and formatted_title in message:
|
||
self.logger.debug(f"Найдено совпадение форматированного заголовка в сообщении: '{formatted_title}'")
|
||
return True
|
||
|
||
# Дополнительная проверка: ищем заголовок в начале сообщений
|
||
for message in tg_messages:
|
||
if message:
|
||
message_lines = message.strip().split('\n')
|
||
if message_lines:
|
||
first_line = message_lines[0].strip()
|
||
# Убираем префиксы и сравниваем
|
||
import re
|
||
cleaned_first_line = re.sub(r'^[#*\s\t]+', '', first_line)
|
||
cleaned_first_line = re.sub(r'\t.*$', '', cleaned_first_line).strip()
|
||
|
||
if cleaned_first_line.lower() == normalized_topic:
|
||
self.logger.debug(f"Найдено совпадение в первой строке сообщения: '{cleaned_first_line}' == '{topic_title}'")
|
||
return True
|
||
|
||
self.logger.debug(f"Новость '{topic_title}' не найдена среди {len(tg_titles)} существующих заголовков")
|
||
return False
|
||
|
||
def _extract_title_from_message(self, message_text):
|
||
"""Извлечение заголовка из текста сообщения"""
|
||
import re
|
||
|
||
# Ищем заголовки в формате "### Заголовок\t"
|
||
pattern = re.compile(r'^### (.*?)\t', re.MULTILINE)
|
||
match = pattern.search(message_text)
|
||
if match:
|
||
return match.group(1).strip()
|
||
|
||
# Если не найдено, ищем в первой строке
|
||
lines = message_text.strip().split('\n')
|
||
if lines:
|
||
first_line = lines[0].strip()
|
||
# Убираем префиксы типа "###", "**", и табы
|
||
first_line = re.sub(r'^[#*\s\t]+', '', first_line)
|
||
first_line = re.sub(r'\t.*$', '', first_line) # Убираем всё после таба
|
||
return first_line.strip() if first_line else None
|
||
|
||
return None
|
||
|
||
async def send_message(self, client, channel_username, content):
|
||
"""Отправка сообщения в Telegram канал/топик с обработкой flood wait и длинных сообщений"""
|
||
try:
|
||
# Получаем entity канала
|
||
entity = await client.get_entity(channel_username)
|
||
|
||
# Telegram лимит: 4096 символов
|
||
max_length = 4096
|
||
|
||
# Если сообщение слишком длинное, разбиваем его
|
||
if len(content) > max_length:
|
||
self.logger.warning(f"Сообщение слишком длинное ({len(content)} символов), разбиваем на части")
|
||
|
||
# Разбиваем по параграфам, чтобы не резать посередине слов
|
||
parts = []
|
||
current_part = ""
|
||
|
||
for line in content.split('\n'):
|
||
# Если добавление этой строки превысит лимит
|
||
if len(current_part + line + '\n') > max_length:
|
||
if current_part:
|
||
parts.append(current_part.rstrip())
|
||
current_part = ""
|
||
current_part += line + '\n'
|
||
|
||
# Добавляем последнюю часть
|
||
if current_part:
|
||
parts.append(current_part.rstrip())
|
||
|
||
self.logger.info(f"Сообщение разбито на {len(parts)} частей")
|
||
|
||
# Отправляем каждую часть
|
||
for i, part in enumerate(parts, 1):
|
||
if len(parts) > 1:
|
||
part_content = f"[Часть {i}/{len(parts)}]\n\n{part}"
|
||
else:
|
||
part_content = part
|
||
|
||
await self._send_single_message(client, entity, part_content)
|
||
|
||
# Небольшая задержка между частями
|
||
if i < len(parts):
|
||
await asyncio.sleep(1)
|
||
else:
|
||
# Обычная отправка
|
||
await self._send_single_message(client, entity, content)
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка получения entity канала '{channel_username}': {e}")
|
||
self.logger.info("Убедитесь, что имя канала указано правильно и бот имеет доступ")
|
||
|
||
async def _send_single_message(self, client, entity, content):
|
||
"""Отправка одного сообщения с обработкой flood wait"""
|
||
while True:
|
||
try:
|
||
# Если указан topic_id, отправляем в топик
|
||
if self.config['topic_id']:
|
||
self.logger.debug(f"Отправка в топик {self.config['topic_id']}")
|
||
await client.send_message(
|
||
entity,
|
||
content,
|
||
reply_to=self.config['topic_id']
|
||
)
|
||
self.logger.info(f"Сообщение успешно отправлено в Telegram топик {self.config['topic_id']}")
|
||
else:
|
||
# Обычная отправка в канал
|
||
await client.send_message(entity, content)
|
||
self.logger.info("Сообщение успешно отправлено в Telegram канал")
|
||
break
|
||
except FloodWaitError as e:
|
||
self.logger.warning(f"Flood wait error: нужно подождать {e.seconds} секунд")
|
||
await asyncio.sleep(e.seconds)
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}")
|
||
raise
|
||
|
||
async def check_and_publish_news(self, news_list):
|
||
"""Проверка и публикация новостей в Telegram
|
||
|
||
Returns:
|
||
list: Список topic_id успешно опубликованных новостей
|
||
"""
|
||
self.logger.info("Начинаем проверку новостей для Telegram")
|
||
published_topics = [] # Список успешно опубликованных топиков
|
||
|
||
client = TelegramClient(
|
||
self.config['session_file'],
|
||
self.config['api_id'],
|
||
self.config['api_hash']
|
||
)
|
||
|
||
async with client:
|
||
await client.start()
|
||
|
||
# Получаем существующие сообщения и заголовки
|
||
tg_messages, tg_titles = await self.get_messages(client, self.config['channel_username'])
|
||
|
||
if not news_list:
|
||
self.logger.warning("Список новостей пуст")
|
||
return published_topics
|
||
|
||
# Фильтруем новости для публикации
|
||
list_for_public = []
|
||
for topic_id, topic_title in news_list:
|
||
# Сначала проверяем в постоянной истории
|
||
if self.history_manager.is_published('telegram', topic_title):
|
||
self.logger.debug(f"Новость '{topic_title}' найдена в истории публикаций, пропускаем")
|
||
continue
|
||
|
||
# Затем проверяем в последних сообщениях канала
|
||
is_duplicate = self._check_duplicate_news(topic_title, tg_titles, tg_messages)
|
||
|
||
if not is_duplicate:
|
||
list_for_public.append((topic_id, topic_title))
|
||
self.logger.debug(f"Новость '{topic_title}' добавлена в список для публикации")
|
||
else:
|
||
# Добавляем в историю, если новость есть в канале, но нет в истории
|
||
self.history_manager.add_published('telegram', topic_id, topic_title)
|
||
self.logger.debug(f"Новость '{topic_title}' уже существует в Telegram, добавлена в историю")
|
||
|
||
if not list_for_public:
|
||
self.logger.warning("Новостей для публикации в Telegram нет")
|
||
return published_topics
|
||
|
||
self.logger.info(f"Новости для публикации в Telegram: {list_for_public}")
|
||
|
||
# Публикуем новости в обратном порядке, чтобы новые оказались сверху в ленте
|
||
for topic_id, topic_title in reversed(list_for_public):
|
||
from site_api import SiteAPI
|
||
site_api = SiteAPI()
|
||
text_data = site_api.get_news_content(topic_id, self.content_processor)
|
||
|
||
if text_data:
|
||
content = f"### {topic_title}\t\n" + text_data + "\n"
|
||
content = self.content_processor.format_for_telegram(content)
|
||
|
||
await self.send_message(
|
||
client,
|
||
self.config['channel_username'],
|
||
content
|
||
)
|
||
# Сохраняем в историю после успешной публикации
|
||
self.history_manager.add_published('telegram', topic_id, topic_title)
|
||
published_topics.append(topic_id) # Добавляем в список успешно опубликованных
|
||
self.logger.info(f"Новость '{topic_title}' добавлена в историю публикаций")
|
||
|
||
await asyncio.sleep(10.0) # 10 секундная задержка между сообщениями
|
||
else:
|
||
self.logger.warning(f"Не удалось получить содержимое новости {topic_id}")
|
||
|
||
return published_topics
|
||
|
||
def is_enabled(self):
|
||
"""Проверка, включен ли Telegram клиент"""
|
||
return True # Telegram всегда включен в этой версии |