570 lines
23 KiB
Python
Executable File
570 lines
23 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
import re
|
||
import sys
|
||
import time
|
||
import asyncio
|
||
import discord
|
||
import logging
|
||
import colorlog
|
||
import requests
|
||
import html2text
|
||
import urllib.parse
|
||
|
||
from telethon import events
|
||
from bs4 import BeautifulSoup
|
||
from telethon.sync import TelegramClient
|
||
from telethon.errors import FloodWaitError
|
||
|
||
import keys
|
||
|
||
# Discourse API endpoints on linux-gaming.ru.
url_post = "https://linux-gaming.ru/posts.json"
url_news = "https://linux-gaming.ru/c/news/6.json"
# VK wall API endpoints.
url_vk_post = "https://api.vk.com/method/wall.post"
url_vk_get = "https://api.vk.com/method/wall.get"
# Raw PortWINE changelog — the source of script version numbers.
url_changelog = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru"

# HTTP headers for authenticated Discourse API calls.
heads_site = {
    "Content-Type": "application/json",
    "Api-Key": keys.api_key_site,
    "Api-Username": "linux-gaming"
}

# Query parameters for VK wall.get; 'offset' is mutated in-place during
# pagination (see get_vk_topics), so this dict is shared mutable state.
params_get = {
    'access_token': keys.user_token_vk,
    'v': '5.236',  # API version
    'owner_id': str(keys.own_id),
    'count': str(100),
    'offset': str(0),
    'filter': 'all'
}

# Root logger with colored console output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    '%(log_color)s%(levelname)s: %(message)s',
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red,bg_white',
    }
))

logger.addHandler(handler)
|
||
|
||
|
||
def main():
    """Run one full publication cycle.

    Fetches the PortWINE changelog, publishes site news for any new
    script versions, then cross-posts missing news to Discord, VK and
    Telegram.
    """
    # Newest script version number plus the raw changelog HTTP response.
    last_changelog, resp_changelog = resp_change()
    # Publish a site news post for each script version newer than the site has.
    check_version(last_changelog, resp_changelog)
    # Cross-post news that each platform does not have yet.
    check_discord_public()
    check_vk_posts()
    check_tg_news()
|
||
|
||
|
||
def make_soup(resp_changelog):
    """Parse the changelog HTTP response body into a BeautifulSoup tree."""
    logging.debug("Вызываем make_soup")
    return BeautifulSoup(resp_changelog.text, 'html.parser')
|
||
|
||
|
||
def html_to_text(html_content):
    """Convert a post's HTML body into cleaned-up Markdown-style text.

    Runs html2text, then a pipeline of regex clean-ups: re-joins broken
    lines, rewrites redirect links, strips heading markers and image
    captions, and removes duplicate links.

    :param html_content: raw HTML string (Discourse 'cooked' post body)
    :return: cleaned text ready for posting to social platforms
    """
    logging.debug(f"Вызываем html_to_text")
    logging.debug(f"HTML на входе {html_content}")
    h = html2text.HTML2Text()
    h.ignore_links = False      # keep hyperlinks
    h.ignore_images = True      # drop images
    h.bypass_tables = True      # keep tables as-is
    h.reference_links = True    # keep original link targets
    markdown_text = h.handle(html_content)

    logging.debug(f"Текст до обработки регулярками {markdown_text}")

    # Re-join lines that html2text broke after a hyphen.
    markdown_text = re.sub(r'-\s*\n\s*', '-', markdown_text, flags=re.DOTALL)
    markdown_text = re.sub(r'-\s*\n*', '-', markdown_text, flags=re.DOTALL)

    # Remove line breaks inside round brackets ().
    markdown_text = re.sub(r'\((.*?)\)', lambda x: '(' + x.group(1).replace('\n', ' ') + ')', markdown_text, flags=re.DOTALL)

    # Remove line breaks inside square brackets [].
    markdown_text = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace('\n', ' ') + ']', markdown_text, flags=re.DOTALL)

    # Drop horizontal-rule lines ('* * *').
    markdown_text = re.sub(r'^.*\* \* \*.*$', '', markdown_text, flags=re.MULTILINE)

    # Normalize unordered-list bullets.
    markdown_text = re.sub(r'^.*\* ', '* ', markdown_text, flags=re.MULTILINE)

    # Join wrapped lines, except lines that start a bullet item.
    markdown_text = re.sub(r'^(?!\*).*?\n(?!\*)', lambda x: x.group(0).replace('\n', ' '), markdown_text, flags=re.MULTILINE)

    # Decode redirect URLs ('?to=...') into their real targets.
    markdown_text = convert_links(markdown_text)

    # Strip heading markers and service fragments.
    patterns_to_remove = [
        r'###',
        r'##',
        r'#',
        r'\[scripts\]\(\/tag\/scripts\) version \d+ ',
        r'##\[scripts\]\(\) version \d+ ',
        r'\d{4}×\d{3} \d+ KB'
    ]
    for pattern in patterns_to_remove:
        markdown_text = re.sub(pattern, '', markdown_text)

    # Collapse runs of blank lines left over from the removals above.
    markdown_text = re.sub(r'\n\s*\n', '\n', markdown_text)

    # Replace image-size captions like "image1280×474 99.7 KB" with ".".
    markdown_text = re.sub(r'image\d+×\d+\s+\d+(\.\d+)?\s+KB', '.', markdown_text)
    markdown_text = re.sub(r'\d+×\d+\s+\d+(\.\d+)?\s+KB', '.', markdown_text)

    # Give links without a label a "." label.
    markdown_text = re.sub(r'\[\]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
    markdown_text = re.sub(r'\[\s]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)

    # Drop repeated links, keeping the first occurrence of each.
    markdown_text = remove_duplicate_links(markdown_text)

    # NOTE: the original called re.sub(r'^.*\* \* \*.*$', '', markdown_text)
    # here and discarded the result — dead code, removed.
    # Replace the "changelog" header with a line break.
    markdown_text = re.sub(r'История изменений:', r'\n', markdown_text)

    logging.debug(f"Текст после обработки {markdown_text}")
    return markdown_text
|
||
|
||
|
||
def convert_links(text):
    """Replace every URL in *text* with its decoded redirect target.

    Each match is passed through decode_url_params, which unwraps
    '?to=<encoded-url>' style redirect links.
    """
    logging.debug("Входим в convert_links")
    pattern = re.compile(r'https?://[^\s\)]+')
    converted = pattern.sub(lambda m: decode_url_params(m.group(0)), text)
    logging.debug(f"Возврат url_pattern {converted}")
    return converted
|
||
|
||
def decode_url_params(url):
    """Unwrap a redirect URL.

    If *url* carries a 'to=<percent-encoded-url>' query parameter,
    return that parameter's decoded value; otherwise return *url*
    unchanged.
    """
    logging.debug("Входим в decode_url_params")
    parsed = urllib.parse.urlparse(url)
    query = urllib.parse.parse_qs(parsed.query)
    for name, values in query.items():
        if values and name.lower() == 'to':
            return urllib.parse.unquote(values[0])
    logging.debug(f"Возврат url {url}")
    return url
|
||
|
||
|
||
def remove_empty_lines(text_data):
    """Drop blank and whitespace-only lines, rejoining the rest with '\\n'."""
    logging.debug("Входим в remove_empty_lines")
    kept = [line for line in text_data.splitlines() if line.strip()]
    result = '\n'.join(kept)
    logging.debug(f"Возврат non_empty_lines {result}")
    return result
|
||
|
||
def remove_markdown_links(markdown_text):
    """Replace Markdown links ``[label](url)`` with the bare URL.

    Used before posting to VK, which does not render Markdown.

    :param markdown_text: text possibly containing Markdown links
    :return: the same text with each link collapsed to its URL
    """
    logging.debug(f"Входим в remove_markdown_links")
    # The original replacement was `r'\1' or r'(`https?://.*?)`\)'` — the
    # `or` always short-circuited to r'\1', so the second operand was dead
    # code. Keep only the meaningful replacement.
    markdown_text = re.sub(r'\[.*?\]\((https?://.*?)\)', r'\1', markdown_text)
    logging.debug(f"Возврат markdown_text {markdown_text}")
    return markdown_text
|
||
|
||
|
||
def remove_duplicate_links(text):
    """Remove repeated Markdown links, keeping only the first occurrence.

    Links are considered duplicates when their URLs match; a duplicate
    link is replaced with an empty string.
    """
    logging.debug("Входим в remove_duplicate_links")
    seen = set()

    def _keep_first(match):
        # group(2) is the URL inside the parentheses.
        url = match.group(2)
        if url in seen:
            return ''
        seen.add(url)
        return match.group(0)

    pattern = re.compile(r'(\[.*?\]\((https:\/\/.*?)\))')
    text = pattern.sub(_keep_first, text)
    logging.debug(f"Возвращаем text {text}")
    return text
|
||
|
||
|
||
def extract_links(text):
    """Return every http/https URL found in *text*, in order of appearance."""
    logging.debug("Входим в extract_links")
    found = re.findall(r'https?://\S+', text)
    logging.debug(f"Возвращаем url_pattern {found}")
    return found
|
||
|
||
|
||
def script_content(script_ver, resp_changelog):
    """Cut the changelog section for *script_ver* out of the changelog page.

    The section runs from the "###Scripts version N" heading down to the
    previous version's heading.

    :param script_ver: script version number to extract
    :param resp_changelog: HTTP response containing the changelog page
    :return: (post_text, post_data, post_text) where post_data is the
        Discourse payload; the third element duplicates the first for
        backward compatibility with the caller's 3-value unpack.
        Returns (None, None, None) when the section is not found — the
        original fell off the end returning a bare None, which crashed
        the caller's unpacking.
    """
    logging.debug(f"Вход в script_content")
    soup = make_soup(resp_changelog)
    page_text = str(soup)
    page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')

    # The section for script_ver ends where the previous version's heading begins.
    last_text = f"###Scripts version {script_ver - 1}"
    index_last_text = page_text.find(last_text)

    if index_last_text != -1:
        changelog_text_last = page_text[:index_last_text]
        prev_text = f"###Scripts version {script_ver}"
        index_script_ver = changelog_text_last.find(prev_text)

        if index_script_ver != -1:
            changelog_text = changelog_text_last[index_script_ver:]
            post_text = (f"-----------------------------\n") + changelog_text
            site_text = (f"[center][img]/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img]["
                         f"/center]\n{post_text}")

            logging.debug(f"Сообщение на сайт {site_text}")

            post_data = {
                "title": f"Обновление скриптов {script_ver}",
                "raw": site_text,
                "category": keys.cat_num
            }
            logging.debug(f"Возвращаем post_text - {post_text}\n post_data - {post_data}")
            return post_text, post_data, post_text

    # Section boundaries not found: return an unpackable triple.
    return None, None, None
|
||
|
||
|
||
def news_content(post_id):
    """Fetch a news topic from the site and return its first post as text.

    Returns the cleaned text of post number 1 in the topic, or None when
    the request fails or the first post is missing.
    """
    logging.debug(f"Запрос содержимого поста новости с ID: {post_id}")
    response = response_get(f"https://linux-gaming.ru/t/{post_id}.json", heads_site)
    # Guard clause: bail out early on any request failure.
    if not (response and response.status_code == 200):
        logging.error(f"Не удалось получить содержимое поста с ID: {post_id}")
        return None

    posts = response.json().get('post_stream', {}).get('posts', [])
    for post in posts:
        if post.get('post_number') == 1:
            html_content = post.get('cooked', 'Нет содержимого')
            return html_to_text(html_content)

    logging.error(f"Первый пост не найден в теме с ID: {post_id}")
    return None
|
||
|
||
|
||
def response_get(url, heads_site):
    """GET *url* using either VK query params or Discourse headers.

    If the second argument is the module-level VK params dict, it is sent
    as query parameters; any other dict is sent as HTTP headers.

    :return: the requests.Response, or None on a request error.
    """
    try:
        # Identity check against the shared VK params dict. The original
        # compared with `==` and then had `elif heads_site == heads_site`,
        # a tautology that always took the headers branch for any other dict.
        if heads_site is params_get:
            return requests.get(url, params=params_get)
        return requests.get(url, headers=heads_site)
    except requests.RequestException as err:
        logging.error(f"Ошибка запроса {err}")
        # Explicit None so callers can (and must) check for failure.
        return None
||
|
||
|
||
def resp_change():
    """Download the changelog and find the newest script version in it.

    :return: (last_version, response) on success, (None, None) on failure.
    """
    resp_changelog = response_get(url_changelog, heads_site)

    if resp_changelog and resp_changelog.status_code == 200:
        matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text)
        logging.debug(f"Найдены версии в истории изменений: {matches_changelog}")
        if not matches_changelog:
            # Guard: max() on an empty list raises ValueError.
            logging.error('Ошибка при запросе changelog: No Response')
            return None, None
        # Compare numerically. The original int(max(strings)) compared
        # lexicographically, so "99" beat "100".
        last_changelog = max(map(int, matches_changelog))
        logging.info(f"Последняя версия в истории изменений: {last_changelog}")
        return last_changelog, resp_changelog
    else:
        logging.error(
            f'Ошибка при запросе changelog: {resp_changelog.status_code if resp_changelog else "No Response"}')
        return None, None
|
||
|
||
|
||
def news():
    """Fetch the news topic list from the site.

    :return: list of (topic_id, title) tuples, excluding the category
        description topic; [] on any request failure.
    """
    resp_topics = response_get(url_news, heads_site)

    # Check for None first: response_get returns None on request errors,
    # and the original dereferenced .status_code before that guard.
    if resp_topics and resp_topics.status_code == 200:
        data = resp_topics.json()
        topics = data['topic_list']['topics']
        list_titles_and_ids = [(topic['id'], str(topic['title'])) for topic in topics]
        # Skip the pinned category-description topic.
        filtered_list_titles_and_ids = [(id, title) for id, title in list_titles_and_ids if not title == ('Описание '
                                                                                                          'категории '
                                                                                                          '«Новости»')]
        return filtered_list_titles_and_ids
    else:
        logging.error(f"Ошибка при запросе тем с сайта: {resp_topics.status_code if resp_topics else 'Нет доступа к сайту'}")
        return []
|
||
|
||
|
||
def site_post(url, headers, json):
    """POST a news payload to the Discourse API, retrying until accepted.

    Returns the response on HTTP 200 (published) or 422 (already
    published). Any other outcome — error status or request exception —
    is logged and retried after a 15-minute pause.
    """
    while True:
        title = json.get('title')
        try:
            resp_post = requests.post(url=url, headers=headers, json=json)
            if resp_post.status_code == 200:
                logging.info("Новость опубликована на сайте!")
                return resp_post
            if resp_post.status_code == 422:
                logging.warning(f'Новость "{title}" уже опубликована: {resp_post.status_code}')
                return resp_post
            logging.error(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code}')
        except requests.RequestException as error:
            logging.error(f'Ошибка при отправке новости "{title}" на сайт: {error}')
        # Wait 15 minutes before the next attempt.
        time.sleep(900)
|
||
|
||
|
||
def check_version(last_changelog, resp_changelog):
    """Publish site news for every script version newer than the site has.

    Compares the newest version mentioned in site news titles against the
    newest changelog version and posts one news item per missing version.

    :param last_changelog: newest version in the changelog (or None)
    :param resp_changelog: the changelog HTTP response
    """
    if last_changelog is None:
        # The original compared None with < and crashed with TypeError.
        logging.warning(f"Не найдена новая версия скрипта")
        sys.exit()

    list_titles_and_ids = news()
    pattern = re.compile(r'Обновление скриптов (\d+)')

    # Extract the version number from each title exactly once (the
    # original called the extractor twice per title).
    numbers = []
    for _, title in list_titles_and_ids:
        match = pattern.search(title)
        if match:
            numbers.append(int(match.group(1)))

    if not numbers:
        # Guard: the original crashed with ValueError on max([]) here.
        logging.warning(f"Не найдена новая версия скрипта")
        sys.exit()

    last_topics_script = max(numbers)
    logging.info(f"Последняя новость на сайте о версии: {last_topics_script}")

    if last_topics_script < last_changelog:
        for script_ver in range(last_topics_script + 1, last_changelog + 1):
            logging.info(f"Найдена новая версия скрипта {script_ver}")
            changelog_text, post_data, params = script_content(script_ver, resp_changelog)
            if post_data:
                logging.debug(f"Публикуем {post_data}")
                site_post(url_post, heads_site, post_data)
    else:
        logging.warning("Нет новых версий скриптов PortProton")
|
||
|
||
|
||
async def discord_post(post_text, client):
    """Send *post_text* as one message to the configured Discord channel."""
    target = client.get_channel(keys.dicord_channel)
    await target.send(f"{post_text}")
|
||
|
||
|
||
async def get_discord_messages(client_discord, channel_id):
    """Collect recent message texts from a Discord channel.

    Reads up to 999 messages and also appends any news titles extracted
    from the bot's own post format, so callers can match either the full
    post or just the title.

    :return: list of message texts (plus extracted titles); [] when the
        channel id does not resolve.
    """
    channel = client_discord.get_channel(channel_id)
    if not channel:
        logging.error(f"ID канала Discord {channel_id} не существует")
        return []

    messages = []
    async for message in channel.history(limit=999):
        logging.debug(message)
        messages.append(message.content)

    pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL)
    # Collect extracted titles separately, then extend once. The original
    # extended `messages` while iterating over it — mutating a list during
    # iteration is fragile even though the extracted titles never re-match.
    extracted = []
    for message in messages:
        matches = pattern.findall(message)
        if matches:
            extracted.extend(matches)
    messages.extend(extracted)
    logging.debug(f"Найдены сообщения в дискорде: {messages}")
    return messages
|
||
|
||
|
||
def check_discord_public():
    """Post to Discord every site news topic not already in the channel.

    Spins up a discord.py client; all work happens in the on_ready
    handler, after which the client is closed. Blocking: runs the
    client's event loop until completion.
    """
    intents = discord.Intents.default()
    intents.messages = True
    client_discord = discord.Client(intents=intents)

    @client_discord.event
    async def on_ready():
        # Runs once the gateway connection is ready.
        logging.debug(f"Успешный логин в discord {client_discord.user}")
        channel_id = keys.dicord_channel
        # Already-published texts/titles, used for duplicate detection.
        discord_messages = await get_discord_messages(client_discord, channel_id)

        list_titles_and_ids = news()
        if list_titles_and_ids:
            list_for_public = []

            # Keep topics whose exact title is absent from the channel and
            # that are newer than the configured starting topic id.
            for topic_id, topic_title in list_titles_and_ids:
                if topic_title not in discord_messages and topic_id > keys.start_topic_id:
                    list_for_public.append((topic_id, topic_title))

            if not list_for_public:
                logging.warning(f"Новостей для публикации в дискорд нет")
                await client_discord.close()
            else:
                logging.info(f"Новости для публикации в дискорд: {list_for_public}")
                channel = client_discord.get_channel(channel_id)
                if not channel:
                    logging.error(f"ID канала Discord {channel_id} не существует")
                    await client_discord.close()
                    return

                # Oldest first, so the channel reads chronologically.
                for topic_id, topic_title in reversed(list_for_public):
                    text_data = news_content(topic_id)
                    if text_data:
                        content = f"----------------------------------------------------------\n### {topic_title}\t\n" + text_data + "\n" + "@here"
                        # Split into 2000-character chunks (Discord's message limit).
                        for i in range(0, len(content), 2000):
                            await channel.send(content[i:i+2000])
                await client_discord.close()

    client_discord.run(keys.discord_token)
|
||
|
||
|
||
def vk_post(url, post_text, links=None):
    """Publish *post_text* on the VK wall.

    :param url: VK wall.post endpoint
    :param post_text: message body
    :param links: optional attachments value (URL list or photo id string)
    :return: the requests.Response, or None on a request error
    """
    params_post = {
        'access_token': keys.api_key_vk,
        'v': '5.236',  # VK API version
        'owner_id': str(keys.own_id),
        'message': f'{post_text}',
    }
    if links:
        params_post['attachments'] = links

    try:
        resp_post = requests.post(url=url, params=params_post)
        if resp_post.status_code == 200:
            logging.info("Сообщение успешно опубликовано.")
            # Log the server's JSON reply for diagnostics.
            logging.info(resp_post.json())
        else:
            logging.error(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}")
        return resp_post
    except requests.RequestException as err:
        logging.error(f"VK post failed: {err}")
        return None
|
||
|
||
|
||
def get_vk_topics():
    """Fetch every text post from the VK wall, paginating 100 at a time.

    Also appends any news titles extracted from the bot's own post
    format, mirroring get_discord_messages. Exits the process on a VK
    API error response.

    :return: list of wall post texts (plus extracted titles)
    """
    wall_posts = []
    # Reset pagination so a repeated call starts from the top; the
    # original left the shared offset wherever the last run ended.
    # NOTE: response_get dispatches on this exact dict object, so the
    # offset must be mutated in place rather than copied.
    params_get['offset'] = str(0)
    while True:
        wall_data = response_get(url_vk_get, params_get)
        wall_data_json = wall_data.json()
        if 'error' in wall_data_json:
            error_code = wall_data_json['error']['error_code']
            error_msg = wall_data_json['error']['error_msg']
            logging.error(f"Ошибка {error_code}: {error_msg}")
            sys.exit(f"Ошибка {error_code}: {error_msg}")

        items = wall_data_json.get('response', {}).get('items', [])
        if not items:
            logging.warning("Постов на стене нет")
            break

        wall_posts.extend(post['text'] for post in items if 'text' in post)
        if len(items) < 100:
            break  # last (partial) page

        params_get['offset'] = str(int(params_get['offset']) + 100)

    pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL)
    # Collect extracted titles first, then extend once — the original
    # extended `wall_posts` while iterating over it.
    extracted = []
    for message in wall_posts:
        matches = pattern.findall(message)
        if matches:
            extracted.extend(matches)
    wall_posts.extend(extracted)
    logging.debug(f"Найдены посты в ВК: {wall_posts}")

    return wall_posts
|
||
|
||
|
||
def check_vk_posts():
    """Post to VK every site news topic not already on the wall.

    A topic counts as published when its title appears as a substring of
    any existing wall post. Script-update news gets a fixed promo photo
    attachment; other news attaches the links found in its body.
    """
    vk_posts = get_vk_topics()
    if not vk_posts:
        logging.warning(f"Постов на стене нет{vk_posts}")

    list_titles_and_ids = news()
    if not list_titles_and_ids:
        logging.warning(f"Новостей для публикации в ВК нет")
        return

    list_for_public = []
    for topic_id, topic_title in list_titles_and_ids:
        # Substring match against every wall post. The original shadowed
        # the list with `for vk_posts in vk_posts` inside this generator.
        if not any(topic_title in post for post in vk_posts):
            list_for_public.append((topic_id, topic_title))

    if not list_for_public:
        logging.warning(f"Новостей для публикации в ВК нет")
        return

    # Log the batch once (the original logged the whole list on every
    # loop iteration).
    logging.info(f"Новости для публикации в ВК: {list_for_public}")
    # Oldest first, so the wall reads chronologically.
    for topic_id, topic_title in reversed(list_for_public):
        if topic_id <= keys.start_topic_id:
            continue
        text_data = news_content(topic_id)
        if not text_data:
            continue

        content = f"{topic_title}\t\n" + text_data + "\n"
        content = remove_markdown_links(content)
        # Strip the site's promo banner image URL from the body.
        content = re.sub(r'https://linux-gaming.ru/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png', '\n', content, flags=re.DOTALL)
        links = extract_links(content)

        if "Обновление скриптов" in topic_title:
            # Script-update posts carry a fixed promo photo attachment.
            vk_post(url_vk_post, content, "photo-99238527_457244491")
        elif links:
            vk_post(url_vk_post, content, links)
        else:
            vk_post(url_vk_post, content)
|
||
|
||
|
||
def tg_post(post_text, client_tg):
    """Send *post_text* to the Telegram channel, then end the session."""
    # telethon.sync makes these coroutine methods callable synchronously.
    client_tg.send_message(keys.channel_username_tg, post_text)
    client_tg.disconnect()
|
||
|
||
|
||
async def get_tg_messages(client_tg, channel_username_tg):
    """Return the texts of up to 999 recent channel messages.

    Media-only messages (message.text is None) are skipped.
    """
    texts = []
    async for message in client_tg.iter_messages(channel_username_tg, limit=999):
        # Guard against NoneType for media-only messages.
        if message.text:
            logging.debug(f"Найдены сообщения в Telegram канале {message.text}")
            texts.append(message.text)
    return texts
|
||
|
||
|
||
def check_tg_news():
    """Post to Telegram every site news topic not already in the channel.

    Creates a dedicated event loop, reads recent channel messages for
    duplicate detection, and sends missing topics oldest-first, backing
    off on FloodWaitError as instructed by Telegram.
    """
    session_file = 'LG_news'
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    client_tg = TelegramClient(session_file, keys.api_id_tg, keys.api_hash_tg)

    @client_tg.on(events.NewMessage(chats=keys.channel_username_tg))
    async def handler(event):
        # Passive listener: only logs messages arriving while connected.
        logging.debug(f"Новое сообщение в Telegram: {event.message.message}")

    async def main_tg():
        await client_tg.start()
        tg_messages = await get_tg_messages(client_tg, keys.channel_username_tg)
        list_titles_and_ids = news()
        if list_titles_and_ids:
            list_for_public = []

            # Keep topics whose title appears in no channel message and
            # that are newer than the configured starting topic id.
            for topic_id, topic_title in list_titles_and_ids:
                if all(topic_title not in (msg or '') for msg in tg_messages) and topic_id > keys.start_topic_id:
                    list_for_public.append((topic_id, topic_title))

            if not list_for_public:
                logging.warning(f"Новостей для публикации в Telegram нет")
                await client_tg.disconnect()
            else:
                logging.info(f"Новости для публикации в Telegram: {list_for_public}")
                # Oldest first, so the channel reads chronologically.
                for topic_id, topic_title in reversed(list_for_public):
                    text_data = news_content(topic_id)
                    if text_data:
                        content = f"### {topic_title}\t\n" + text_data + "\n"
                        while True:
                            try:
                                await client_tg.send_message(keys.channel_username_tg, content)
                                break
                            except FloodWaitError as e:
                                logging.warning(f"Flood wait error: нужно подождать {e.seconds} секунд.")
                                # Wait the server-mandated time, then retry.
                                await asyncio.sleep(e.seconds)
                await client_tg.disconnect()

    # Reuse the loop created above; the original redundantly re-fetched it
    # with asyncio.get_event_loop() (deprecated when no loop is running).
    loop.run_until_complete(main_tg())
|
||
|
||
|
||
# Entry point: run one full publication cycle when executed as a script.
if __name__ == '__main__':
    main()
|