bot-news-linux-gaming/news-bot.py

#!/usr/bin/env python3
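"""News bot for linux-gaming.ru.

Checks the PortWINE (PortProton) changelog for new script versions, publishes
an update post on the site (the VK posting step is currently commented out),
and mirrors topics from the site's News category to a Discord channel.
"""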
import re
import sys
import time
import discord
import logging
import colorlog
import requests
from bs4 import BeautifulSoup
import keys
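# keys.py: local module that provides the secrets and IDs used below
# (api_key_site, api_key_vk, own_id, cat_num, dicord_channel, discord_token)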
url_post = "https://linux-gaming.ru/posts.json"
url_news = "https://linux-gaming.ru/c/news/6.json"
url_vk_api = "https://api.vk.com/method/wall.post"
url_changelog = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru"
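
# Discourse API headers for linux-gaming.ru: requests are authenticated with an
# API key and made on behalf of the "linux-gaming" user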
heads_site = {
    "Content-Type": "application/json",
    "Api-Key": keys.api_key_site,
    "Api-Username": "linux-gaming"
}

logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    '%(log_color)s%(levelname)s: %(message)s',
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'red,bg_white',
    }
))
logger.addHandler(handler)
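
# One run performs a full cycle: check the changelog for new script versions,
# publish them to the site, then mirror unposted news topics to Discord.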
def main():
    try:
        last_changelog, resp_changelog = resp_change()
        if last_changelog is None:
            # The changelog could not be fetched; resp_change() has already logged the error
            return
        check_version(last_changelog, resp_changelog)
        check_discord_public()
    except Exception as err:
        logging.error(f"Ошибка исполнения функции main: {err}")

def make_soup(resp_changelog):
    return BeautifulSoup(resp_changelog.text, 'html.parser')

def html_to_text(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    text = soup.get_text(separator='\n')
    links = [a['href'] for a in soup.find_all('a', href=True)]
    return text, links

def remove_empty_lines(text_data):
    lines = text_data.splitlines()
    non_empty_lines = [line for line in lines if line.strip()]
    return '\n'.join(non_empty_lines)
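
# Cut the changelog entry for a single script version out of the full changelog
# and build the site post payload plus the (currently unused) VK post parameters.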
def script_content(script_ver, resp_changelog):
    soup = make_soup(resp_changelog)
    page_text = str(soup)
    page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')
    # Strip several recurring fragments with regular expressions
    patterns_to_remove = [
        r'###Scripts version \d+###',
        r'\d{4}×\d{3} \d+ KB',
        r'/tag/scripts'
    ]
    for pattern in patterns_to_remove:
        page_text = re.sub(pattern, '', page_text)
    # Keep only the fragment between the current version header and the previous one
    last_text = f"###Scripts version {script_ver - 1}"
    index_last_text = page_text.find(last_text)
    if index_last_text != -1:
        changelog_text_last = page_text[:index_last_text]
        prev_text = f"###Scripts version {script_ver}"
        index_script_ver = changelog_text_last.find(prev_text)
        if index_script_ver != -1:
            changelog_text = changelog_text_last[index_script_ver:]
            post_text = (f"-----------------------------\nОбновление скриптов {script_ver}\n"
                         f"-----------------------------\n") + changelog_text
            site_text = (f"[center][img]/uploads/default/original/1X/"
                         f"5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img][/center]\n"
                         f"{post_text}")
            post_data = {
                "title": f"Обновление скриптов {script_ver}",
                "raw": site_text,
                "category": keys.cat_num,
                "tags": ["scripts"]
            }
            params = {
                'access_token': keys.api_key_vk,
                'v': '5.199',  # VK API version
                'owner_id': keys.own_id,
                'message': f'{post_text}',
                'attachments': "photo-99238527_457244491"
                # Additional VK parameters can be added here
            }
            return post_text, post_data, params
    # Version markers were not found in the changelog text
    return None, None, None
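
# Fetch the first post of a forum topic by ID and convert it to plain text
# together with the list of links it contains.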
def news_content(post_id):
    logging.debug(f"Запрос содержимого поста новости с ID: {post_id}")
    response = response_get(f"https://linux-gaming.ru/t/{post_id}.json")
    if response and response.status_code == 200:
        topic_data = response.json()
        posts = topic_data.get('post_stream', {}).get('posts', [])
        # Find the first post of the topic
        for post in posts:
            if post.get('post_number') == 1:
                html_content = post.get('cooked', 'Нет содержимого')
                text_data, links = html_to_text(html_content)
                text_data = remove_empty_lines(text_data)
                logging.debug(text_data)
                return text_data, links
        logging.error(f"Первый пост не найден в теме с ID: {post_id}")
        return None, None
    else:
        logging.error(f"Не удалось получить содержимое поста с ID: {post_id}")
        return None, None

def response_get(url):
    try:
        return requests.get(url, headers=heads_site)
    except requests.RequestException as err:
        logging.error(f"Ошибка запроса {err}")
        return None
def resp_change():
    resp_changelog = response_get(url_changelog)
    if resp_changelog and resp_changelog.status_code == 200:
        matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text)
        logging.info(f"Найдены версии в истории изменений: {matches_changelog}")
        # Compare version numbers numerically rather than as strings
        last_changelog = max(int(match) for match in matches_changelog)
        logging.info(f"Последняя версия в истории изменений: {last_changelog}")
        return last_changelog, resp_changelog
    else:
        logging.error(
            f'Ошибка при запросе changelog: {resp_changelog.status_code if resp_changelog else "No Response"}')
        return None, None

def resp_get(url):
    return response_get(url)
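
# List (topic_id, title) pairs from the News category, skipping the category
# description topic.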
def news():
    resp_topics = resp_get(url_news)
    if resp_topics and resp_topics.status_code == 200:
        data = resp_topics.json()
        topics = data['topic_list']['topics']
        list_titles_and_ids = [(topic['id'], str(topic['title'])) for topic in topics]
        # Skip the category description topic
        filtered_list_titles_and_ids = [(topic_id, title) for topic_id, title in list_titles_and_ids
                                        if title != 'Описание категории «Новости»']
        return filtered_list_titles_and_ids
    else:
        logging.error(f"Ошибка при запросе тем с сайта: {resp_topics.status_code if resp_topics else 'Нет доступа к сайту'}")
        return []
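
# Publish a prepared post to the site, retrying every 15 minutes until it is
# accepted (HTTP 200) or reported as already published (HTTP 422).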
def site_post(url, headers, json):
    while True:
        title = json.get('title', 'No title')
        try:
            resp_post = requests.post(url=url, headers=headers, json=json)
            if resp_post.status_code == 200:
                logging.info("Новость опубликована на сайте!")
                return resp_post
            elif resp_post.status_code == 422:
                logging.warning(f'Новость "{title}" уже опубликована: {resp_post.status_code}')
                return resp_post
            else:
                logging.error(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code}')
        except requests.RequestException as error:
            logging.error(f'Ошибка при отправке новости "{title}" на сайт: {error}')
        # Wait 15 minutes before retrying after an error response or a failed request
        time.sleep(900)
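
# Post a message to the VK community wall via the wall.post API method.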
def vk_post(url, params):
    try:
        # Send a POST request to the VK API
        resp_post = requests.post(url=url, params=params)
        if resp_post.status_code == 200:
            logging.info("Сообщение успешно опубликовано.")
            logging.info(resp_post.json())  # Log the server response as JSON
        else:
            logging.error(f"Ошибка при публикации сообщения в ВК: {resp_post.status_code}")
        return resp_post
    except requests.RequestException as err:
        logging.error(f"VK post failed: {err}")
        return None
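
# Send a single message to the configured Discord channel (helper, not called
# from the main flow).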
async def discord_post(post_text, client):
    channel = client.get_channel(keys.dicord_channel)
    await channel.send(f"{post_text}")
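
# Collect the channel history plus the titles embedded in previously posted
# framed messages, so already published news can be skipped.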
async def get_discord_messages(client, channel_id):
    channel = client.get_channel(channel_id)
    if not channel:
        logging.error(f"ID канала Discord {channel_id} не существует")
        return []
    messages = []
    async for message in channel.history(limit=999999):
        messages.append(message.content)
    # Titles embedded in previously posted framed messages also count as seen
    pattern = re.compile(r'-----------------------------\n(.*?)\n-----------------------------', re.DOTALL)
    extracted_titles = []
    for message in messages:
        extracted_titles.extend(pattern.findall(message))
    messages.extend(extracted_titles)
    logging.debug(f"Найдены сообщения в дискорде: {messages}")
    return messages
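
# Compare the newest changelog version with the newest "Обновление скриптов"
# topic on the site and publish a post for every version in between.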
def check_version(last_changelog, resp_changelog):
    list_titles_and_ids = news()
    pattern = re.compile(r'Обновление скриптов (\d+)')

    def extract_number(title):
        match = pattern.search(title)
        if match:
            return int(match.group(1))
        return None

    numbers = [extract_number(title) for _, title in list_titles_and_ids if extract_number(title) is not None]
    last_topics_script = max(numbers)
    logging.info(f"Последняя новость на сайте о версии: {last_topics_script}")
    # last_topics_script = 2297
    if last_topics_script < last_changelog:
        list_new_ver = []
        for script_ver in range(last_topics_script + 1, last_changelog + 1):
            list_new_ver.append(script_ver)
            logging.info(f"Найдена новая версия скрипта {script_ver}")
            changelog_text, post_data, params = script_content(script_ver, resp_changelog)
            if post_data:
                site_post(url_post, heads_site, post_data)
                # vk_post(url_vk_api, params=params)
        if not list_new_ver:
            logging.warning("Не найдена новая версия скрипта")
            sys.exit()
    else:
        logging.warning("Нет новых версий скриптов PortProton")
def check_discord_public():
    intents = discord.Intents.default()
    intents.messages = True
    client = discord.Client(intents=intents)

    @client.event
    async def on_ready():
        logging.debug(f"Успешный логин в discord {client.user}")
        channel_id = keys.dicord_channel
        discord_messages = await get_discord_messages(client, channel_id)
        list_titles_and_ids = news()
        if list_titles_and_ids:
            list_for_public = []
            for topic_id, topic_title in list_titles_and_ids:
                if topic_title not in discord_messages:
                    list_for_public.append((topic_id, topic_title))
            if not list_for_public:
                logging.info("Новостей для публикации в дискорд нет")
                await client.close()
            else:
                logging.info(f"Новости для публикации в дискорд: {list_for_public}")
                channel = client.get_channel(channel_id)
                if not channel:
                    logging.error(f"ID канала Discord {channel_id} не существует")
                    await client.close()
                    return
                for topic_id, topic_title in reversed(list_for_public):
                    text_data, links = news_content(topic_id)
                    if text_data:
                        content = (f"-----------------------------\n{topic_title}\n"
                                   f"-----------------------------\n{text_data}\n"
                                   "----------------------------------------------------------")
                        # Append source links that are not already part of the text
                        for link in links:
                            if link not in content:
                                content += f"\n{link}"
                        # Send in chunks of 2000 characters (Discord's limit for message content)
                        for i in range(0, len(content), 2000):
                            await channel.send(content[i:i + 2000])
                await client.close()
        else:
            # Nothing to check; make sure the client still shuts down
            await client.close()

    client.run(keys.discord_token)

if __name__ == '__main__':
    main()
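
# The script performs one pass and exits; it is presumably run on a schedule
# (e.g. via cron or a systemd timer).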