Files
bot-news-linux-gaming/news-bot.py

485 lines
19 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import re
import sys
import time
import urllib.parse
import discord
import logging
import colorlog
import requests
import html2text
from bs4 import BeautifulSoup
import keys
url_post = "https://linux-gaming.ru/posts.json"
url_news = "https://linux-gaming.ru/c/news/6.json"
url_vk_post = "https://api.vk.com/method/wall.post"
url_vk_get = "https://api.vk.com/method/wall.get"
url_changelog = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru"
heads_site = {
"Content-Type": "application/json",
"Api-Key": keys.api_key_site,
"Api-Username": "linux-gaming"
}
params_get = {
'access_token': keys.user_token_vk,
'v': '5.236', # Версия API
'owner_id': str(keys.own_id),
'count': str(100),
'offset': str(0),
'filter': 'all'
}
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)s: %(message)s',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
))
logger.addHandler(handler)
def main():
last_changelog, resp_changelog = resp_change()
check_version(last_changelog, resp_changelog)
check_discord_public()
check_vk_posts()
def make_soup(resp_changelog):
return BeautifulSoup(resp_changelog.text, 'html.parser')
def html_to_text(html_content):
h = html2text.HTML2Text()
h.ignore_links = False # Сохранение ссылок
h.ignore_images = True # Игнорирование изображений
h.bypass_tables = True # Сохранение таблиц
h.reference_links = True # Сохранение оригинальных ссылок
markdown_text = h.handle(html_content)
logging.debug(f"Markdown text: {markdown_text}")
# Удаление переносов строк из-за -
markdown_text = re.sub(r'-\s*\n\s*', '-', markdown_text, flags=re.DOTALL)
markdown_text = re.sub(r'-\s*\n*', '-', markdown_text, flags=re.DOTALL)
# Убираем переносы строк внутри круглых скобок ()
markdown_text = re.sub(r'\((.*?)\)', lambda x: '(' + x.group(1).replace('\n', ' ') + ')', markdown_text, flags=re.DOTALL)
# Убираем переносы строк внутри квадратных скобок []
markdown_text = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace('\n', ' ') + ']', markdown_text, flags=re.DOTALL)
# Удаление строк, содержащих '* * *'
markdown_text = re.sub(r'^.*\* \* \*.*$', '', markdown_text, flags=re.MULTILINE)
markdown_text = re.sub(r'^.*\* ', '* ', markdown_text, flags=re.MULTILINE)
# Убираем переносы строк, кроме строк, начинающихся с *
markdown_text = re.sub(r'^(?!\*).*?\n(?!\*)', lambda x: x.group(0).replace('\n', ' '), markdown_text, flags=re.MULTILINE)
# Преобразование всех ссылок с параметрами URL
markdown_text = convert_links(markdown_text)
# Работа с #
patterns_to_remove = [
r'###',
r'##',
r'#',
r'\[scripts\]\(\/tag\/scripts\) version \d+ ',
r'##\[scripts\]\(\) version \d+ ',
r'\d{4}×\d{3} \d+ KB'
]
for pattern in patterns_to_remove:
markdown_text = re.sub(pattern, '', markdown_text)
logging.debug(f"Удаляем {pattern}")
# Удаление избыточных пустых строк после удаления строк
markdown_text = re.sub(r'\n\s*\n', '\n', markdown_text)
# Изменение ссылок без описания
markdown_text = re.sub(r'\[\]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
markdown_text = re.sub(r'\[\s]\((https:\/\/[^\)]+)\)', r'[.](\1)', markdown_text)
# Удаление дублирующихся ссылок
markdown_text = remove_duplicate_links(markdown_text)
# Добавление переноса после "История изменений:"
re.sub(r'^.*\* \* \*.*$', '', markdown_text)
markdown_text = re.sub(r'История изменений:', r'\n', markdown_text)
logging.debug(f"Текст после обработки {markdown_text}")
return markdown_text
def convert_links(text):
url_pattern = re.compile(r'https?://[^\s\)]+')
return url_pattern.sub(lambda match: decode_url_params(match.group(0)), text)
def decode_url_params(url):
parsed_url = urllib.parse.urlparse(url)
query_params = urllib.parse.parse_qs(parsed_url.query)
for key, values in query_params.items():
if key.lower() == 'to' and values:
return urllib.parse.unquote(values[0])
return url
def remove_empty_lines(text_data):
lines = text_data.splitlines()
non_empty_lines = [line for line in lines if line.strip()]
return '\n'.join(non_empty_lines)
def remove_markdown_links(markdown_text):
# Регулярное выражение для поиска Markdown-ссылок и замена их на только URL
markdown_text = re.sub(r'\[.*?\]\((https?://.*?)\)', r'\1' or r'(`https?://.*?)`\)', markdown_text)
return markdown_text
def remove_duplicate_links(text):
seen_links = set()
def replace_link(match):
link = match.group(2)
if link in seen_links:
return ''
seen_links.add(link)
return match.group(0)
# Регулярное выражение для поиска Markdown-ссылок
link_pattern = re.compile(r'(\[.*?\]\((https:\/\/.*?)\))')
text = re.sub(link_pattern, replace_link, text)
return text
def extract_links(text):
# Регулярное выражение для поиска ссылок
url_pattern = re.compile(r'https?://\S+')
return url_pattern.findall(text)
def script_content(script_ver, resp_changelog):
soup = make_soup(resp_changelog)
page_text = str(soup)
page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')
# Находим текст до определенного текста, тега или класса (например, до тега <hr>)
last_text = f"###Scripts version {script_ver - 1}"
last_text = str(last_text)
index_last_text = page_text.find(last_text)
if index_last_text != -1:
changelog_text_last = page_text[:index_last_text]
prev_text = f"###Scripts version {script_ver}"
index_script_ver = changelog_text_last.find(prev_text)
if index_script_ver != -1:
changelog_text = changelog_text_last[index_script_ver:]
post_text = (f"-----------------------------\n") + changelog_text
site_text = (f"[center][img]/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img]["
f"/center]\n{post_text}")
logging.debug(f"Сообщение на сайт {site_text}")
post_data = {
"title": f"Обновление скриптов {script_ver}",
"raw": site_text,
"category": keys.cat_num
}
return post_text, post_data, post_text
def news_content(post_id):
logging.debug(f"Запрос содержимого поста новости с ID: {post_id}")
response = response_get(f"https://linux-gaming.ru/t/{post_id}.json", heads_site)
if response and response.status_code == 200:
topic_data = response.json()
posts = topic_data.get('post_stream', {}).get('posts', [])
# Найти первый пост
for post in posts:
if post.get('post_number') == 1:
html_content = post.get('cooked', 'Нет содержимого')
text_data = html_to_text(html_content)
return text_data
logging.error(f"Первый пост не найден в теме с ID: {post_id}")
return None
else:
logging.error(f"Не удалось получить содержимое поста с ID: {post_id}")
return None
def response_get(url, heads_site):
try:
if heads_site == params_get:
return requests.get(url, params=params_get)
elif heads_site == heads_site:
return requests.get(url, headers=heads_site)
except requests.RequestException as err:
logging.error(f"Ошибка запроса {err}")
def resp_change():
resp_changelog = response_get(url_changelog, heads_site)
if resp_changelog and resp_changelog.status_code == 200:
matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text)
logging.info(f"Найдены версии в истории изменений: {matches_changelog}")
last_changelog = int(max(matches_changelog))
logging.info(f"Последняя версия в истории изменений: {last_changelog}")
return last_changelog, resp_changelog
else:
logging.error(
f'Ошибка при запросе changelog: {resp_changelog.status_code if resp_changelog else "No Response"}')
return None, None
def news():
resp_topics = response_get(url_news, heads_site)
if resp_topics.status_code == 200:
data = resp_topics.json()
topics = data['topic_list']['topics']
list_titles_and_ids = [(topic['id'], str(topic['title'])) for topic in topics]
filtered_list_titles_and_ids = [(id, title) for id, title in list_titles_and_ids if not title == ('Описание '
'категории '
'«Новости»')]
return filtered_list_titles_and_ids
else:
logging.error(f"Ошибка при запросе тем с сайта: {resp_topics.status_code if resp_topics else 'Нет доступа к сайту'}")
return []
def site_post(url, headers, json):
while True:
title = json.get('title')
try:
resp_post = requests.post(url=url, headers=headers, json=json)
if resp_post.status_code == 200:
logging.info("Новость опубликована на сайте!")
return resp_post
elif resp_post.status_code == 422:
logging.warning(f'Новость "{title}" уже опубликована: {resp_post.status_code}')
return resp_post
else:
logging.error(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code}')
except requests.RequestException as error:
logging.error(f'Ошибка при отправке новости "{title}" на сайт: {error}')
time.sleep(900)
def check_version(last_changelog, resp_changelog):
list_titles_and_ids = news()
pattern = re.compile(r'Обновление скриптов (\d+)')
def extract_number(title):
match = pattern.search(title)
if match:
return int(match.group(1))
return None
numbers = [extract_number(title) for _, title in list_titles_and_ids if extract_number(title) is not None]
last_topics_script = max(numbers)
logging.info(f"Последняя новость на сайте о версии: {last_topics_script}")
if last_topics_script < last_changelog:
list_new_ver = []
for script_ver in range(last_topics_script + 1, last_changelog + 1):
list_new_ver.append(script_ver)
logging.info(f"Найдена новая версия скрипта {script_ver}")
changelog_text, post_data, params = script_content(script_ver, resp_changelog)
if post_data:
logging.debug(f"Публикуем {post_data}")
site_post(url_post, heads_site, post_data)
if not list_new_ver:
logging.warning(f"Не найдена новая версия скрипта")
sys.exit()
else:
logging.warning("Нет новых версий скриптов PortProton")
async def discord_post(post_text, client):
channel = client.get_channel(keys.dicord_channel)
await channel.send(f"{post_text}")
async def get_discord_messages(client, channel_id):
channel = client.get_channel(channel_id)
if not channel:
logging.error(f"ID канала Discord {channel_id} не существует")
return []
messages = []
async for message in channel.history(limit=999):
logging.debug(message)
messages.append(message.content)
pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL)
for message in messages:
matches = pattern.findall(message)
if matches:
messages.extend(matches)
logging.debug(f"Найдены сообщения в дискорде: {messages}")
return messages
def check_discord_public():
intents = discord.Intents.default()
intents.messages = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
logging.debug(f"Успешный логин в discord {client.user}")
channel_id = keys.dicord_channel
discord_messages = await get_discord_messages(client, channel_id)
list_titles_and_ids = news()
if list_titles_and_ids:
list_for_public = []
for topic_id, topic_title in list_titles_and_ids:
if topic_title not in discord_messages and topic_id > 698:
list_for_public.append((topic_id, topic_title))
if not list_for_public:
logging.info(f"Новостей для публикации в дискорд нет")
await client.close()
else:
logging.info(f"Новости для публикации в дискорд: {list_for_public}")
channel = client.get_channel(channel_id)
if not channel:
logging.error(f"ID канала Discord {channel_id} не существует")
await client.close()
return
for topic_id, topic_title in reversed(list_for_public):
text_data = news_content(topic_id)
if text_data:
content = f"----------------------------------------------------------\n### {topic_title}\t\n" + text_data + "\n" + "@here"
# Разбиваем содержимое на части по 4000 символов
for i in range(0, len(content), 2000):
await channel.send(content[i:i+2000])
await client.close()
client.run(keys.discord_token)
def vk_post(url, post_text, links=None):
params_post = {
'access_token': keys.api_key_vk,
'v': '5.236', # Версия API VK
'owner_id': str(keys.own_id),
'message': f'{post_text}'
# Дополнительные параметры можно добавить здесь
}
if links:
params_post['attachments'] = links
try:
# Отправляем POST-запрос к VK API
resp_post = requests.post(url=url, params=params_post)
if resp_post.status_code == 200:
logging.info("Сообщение успешно опубликовано.")
logging.info(resp_post.json()) # Выводим ответ сервера в формате JSON
else:
logging.error(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}")
return resp_post
except requests.RequestException as err:
logging.error(f"VK post failed: {err}")
return None
def get_vk_topics():
wall_posts = []
while True:
wall_data = response_get(url_vk_get, params_get)
wall_data_json = wall_data.json()
if 'error' in wall_data_json:
error_code = wall_data_json['error']['error_code']
error_msg = wall_data_json['error']['error_msg']
logging.error(f"Ошибка {error_code}: {error_msg}")
sys.exit(f"Ошибка {error_code}: {error_msg}")
items = wall_data_json.get('response', {}).get('items', [])
if not items:
logging.warning("Постов на стене нет")
break
wall_posts.extend((post['text'] for post in items if 'text' in post))
if len(items) < 100:
break
params_get['offset'] = str(int(params_get['offset']) + 100)
pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL)
for message in wall_posts:
matches = pattern.findall(message)
if matches:
wall_posts.extend(matches)
logging.debug(f"Найдены посты в ВК: {wall_posts}")
return wall_posts
def check_vk_posts():
vk_posts = get_vk_topics()
if not vk_posts:
logging.warning(f"Постов на стене нет{vk_posts}")
list_titles_and_ids = news()
if list_titles_and_ids:
list_for_public = []
for topic_id, topic_title in list_titles_and_ids:
# Сравнение заголовков с текстами постов
if not any(topic_title in vk_posts for vk_posts in vk_posts):
list_for_public.append((topic_id, topic_title))
if not list_for_public:
logging.info(f"Новостей для публикации в ВК нет")
else:
for topic_id, topic_title in reversed(list_for_public):
if topic_id > 698:
logging.info(f"Новости для публикации в ВК: {list_for_public}")
text_data = news_content(topic_id)
if text_data:
content = f"----------------------------------------------------------\n{topic_title}\t\n" + text_data + "\n"
content = remove_markdown_links(content)
content = re.sub(r'https://linux-gaming.ru/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png', '\n', content, flags=re.DOTALL)
links = extract_links(content)
if "Обновление скриптов" in topic_title:
# Пример добавления изображения с постом
vk_post(url_vk_post, content, "photo-99238527_457244491")
else:
if links:
vk_post(url_vk_post, content, links)
else:
vk_post(url_vk_post, content)
else:
logging.info(f"Новостей для публикации в ВК нет")
if __name__ == '__main__':
main()