Files
bot-news-linux-gaming/news-bot.py

282 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import re
import sys
import time
import shutil
import discord
import colorama
import requests
from bs4 import BeautifulSoup
import keys
dmessage_list = []
url_post = "https://linux-gaming.ru/posts.json"
url_news = "https://linux-gaming.ru/c/news/6.json"
url_vk_api = "https://api.vk.com/method/wall.post"
url_changelog = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru"
heads_site = {
"Content-Type": "application/json",
"Api-Key": keys.api_key_site,
"Api-Username": "linux-gaming"
}
def main():
last_changelog, resp_changelog = resp_change()
check_version(last_changelog, resp_changelog)
check_discord_public()
def print_line(*text, flag="", sep=" ", end="\n"):
"""Добавление обводки вокруг текста, покраска"""
if flag == "RED":
color = colorama.Fore.RED
elif flag == "YELLOW":
color = colorama.Fore.YELLOW
elif flag == "GREEN":
color = colorama.Fore.GREEN
elif flag == "CYAN":
color = colorama.Fore.CYAN
else:
color = colorama.Fore.WHITE
len_text = str(*text)
len_text = len_text.split("\n")
max_string = max(len(str(string)) for string in len_text) + 2
max_length = shutil.get_terminal_size()
max_length = max_length[0]
if max_string > max_length:
len_dots = max_length
else:
len_dots = max_string
print(color + "." * len_dots)
print(color, *text, sep=sep, end=end)
print(color + "." * len_dots + colorama.Style.RESET_ALL)
def make_soup(resp_changelog):
return BeautifulSoup(resp_changelog.text, 'html.parser')
def script_content(script_ver, resp_changelog):
# Используем BeautifulSoup для парсинга HTML-кода страницы
soup = make_soup(resp_changelog)
page_text = str(soup)
page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')
# Находим текст до определенного текста, тега или класса (например, до тега <hr>)
last_text = f"###Scripts version {script_ver-1}"
last_text = str(last_text)
index_last_text = page_text.find(last_text)
if index_last_text != -1:
changelog_text_last = page_text[:index_last_text]
prev_text = f"###Scripts version {script_ver}"
index_script_ver = changelog_text_last.find(prev_text)
if index_script_ver != -1:
changelog_text = changelog_text_last[index_script_ver:]
post_text = (f"-----------------------------\nОбновление скриптов {script_ver}\n"
f"-----------------------------\n") + changelog_text
site_text = (f"[center][img]/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img]["
f"/center]\n{post_text}")
post_data = {
"title": f"Обновление скриптов {script_ver}",
"raw": site_text,
"category": keys.cat_num,
"tags": ["scripts"]
}
params = {
'access_token': keys.api_key_vk,
'v': '5.199', # Версия API VK
'owner_id': keys.own_id,
'message': f'{post_text}',
'attachments': "photo-99238527_457244491"
# Дополнительные параметры можно добавить здесь
}
return post_text, post_data, params
def news_content():
print_line(f"Запрос содержимого поста новости")
def response_get(url):
resp_get = requests.get(url, headers=heads_site)
return resp_get
def resp_change():
resp_changelog = response_get(url_changelog)
if resp_changelog.status_code == 200:
# Использование регулярного выражения для поиска текста
matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text)
# Печать всех найденных совпадений
print_line(f"Найдены версии в истории изменений: {matches_changelog}", flag="CYAN")
list_changelog = []
for match in matches_changelog:
list_changelog.append(match)
last_changelog = int(max(list_changelog))
print_line(f"Последняя версия в истории изменений: {last_changelog}", flag="GREEN")
return last_changelog, resp_changelog
else:
print_line(f'Ошибка при запросе changelog: {resp_changelog.status_code}', flag="RED")
def resp_get(url):
resp_topics = response_get(url)
return resp_topics
def news_scripts(topics):
matches_topics = re.findall(r'Обновление скриптов (\d+)', str(topics))
list_topics = []
for match in matches_topics:
list_topics.append(match)
list_topics = set(list_topics)
print_line(f"Найдены новости на сайте о версиях:{list_topics}", flag="CYAN")
last_topic_scripts = max(list_topics)
print_line(f"Последняя новость на сайте о версии: {last_topic_scripts}", flag="GREEN")
return last_topic_scripts, list_topics
def news_noscripts(topics):
list_titles = []
pattern = re.compile(r'Обновление скриптов (\d+)')
for topic in topics:
title = str(topic['title'])
list_titles.append(title)
bl_topic = {'Описание категории «Новости»'}
filtered_list_titles = [title for title in list_titles if not pattern.search(title)]
filtered_list_titles = [x for x in filtered_list_titles if x not in bl_topic]
print_line(f"Список новостей на сайте: {filtered_list_titles}", flag="CYAN")
return filtered_list_titles
def news():
resp_topics = resp_get(url_news)
if resp_topics.status_code == 200:
data = resp_topics.json()
topics = data['topic_list']['topics']
last_topics_scripts, list_topics = news_scripts(topics)
list_topic_news = news_noscripts(topics)
return last_topics_scripts, list_topic_news, list_topics
else:
print_line(f"Ошибка при запросе тем с сайта: {resp_topics.status_code}", flag="RED")
def site_post(url, headers, json):
while True:
title = json.get('title', 'No title')
try:
resp_post = requests.post(url=url, headers=headers, json=json)
except requests.RequestException as error:
print_line(f'Ошибка при отправке новости "{title}" на сайт {error}', flag="RED")
time.sleep(900) # Ждем 15 минут (900 секунд) перед повторной попыткой
continue
if resp_post.status_code == 200:
print_line(f"Новость опубликована на сайте!", flag="GREEN")
return resp_post
elif resp_post.status_code == 422:
text_resp_post = f"Новость уже опубликована"
print_line(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code} {text_resp_post}',
flag="RED")
return resp_post
else:
text_resp_post = f"Уточнить код ошибки"
print_line(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code} {text_resp_post}',
flag="RED")
time.sleep(900)
def vk_post(url, params):
# Отправляем POST-запрос к VK API
resp_post = requests.post(url=url, params=params)
# Проверяем ответ сервера
if resp_post.status_code == 200:
print_line("Сообщение успешно опубликовано.", flag="GREEN")
print_line(resp_post.json(), flag="CYAN") # Выводим ответ сервера в формате JSON
else:
print_line(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}", flag="RED")
return resp_post
def discord_post(post_text):
intents = discord.Intents.default()
intents.messages = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
print_line(f'Успешный логин в discord {client.user}')
channel = client.get_channel(keys.dicord_channel)
await channel.send(f"{post_text}")
await client.close()
client.run(keys.discord_token)
def check_version(last_changelog, resp_changelog):
last_topics_scripts, list_topic_news, list_topics = news()
if int(last_topics_scripts) < last_changelog:
list_new_ver = []
for script_ver in range(last_topics_scripts + 1, last_changelog + 1):
list_new_ver.append(script_ver)
print_line(f"Найдена новая версия скрипта {script_ver}", flag="GREEN")
changelog_text, post_data, params = script_content(script_ver, resp_changelog)
site_post(url_post, headers=heads_site, json=post_data)
# vk_post(url_vk_api, params=params)
if not list_new_ver:
print_line(f"Не найдена новая версия скрипта", flag="YELLOW")
sys.exit()
else:
print_line("Нет новых версий скриптов PortProton", flag="YELLOW")
def check_discord_public():
intents = discord.Intents.default()
intents.messages = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
print_line(f"Успешный логин в discord {client.user}", flag="GREEN")
channel = client.get_channel(keys.dicord_channel)
dmessage_title = []
if channel is None:
print_line(f"ID канала Discord {keys.dicord_channel} не существует", flag="RED")
await client.close()
return
async for message in channel.history(limit=999999): # Количество сообщений для чтения
dmessage_list.append(message.content)
pattern = re.compile(r'-----------------------------\n(.*?)\n-----------------------------', re.DOTALL)
for message in dmessage_list:
matches = pattern.findall(message)
if matches:
dmessage_title.extend(matches)
print_line(f"Новости в Discord: {dmessage_title}", flag="CYAN")
await client.close()
client.run(keys.discord_token)
last_topics_scripts, list_topic_news, list_topics = news()
list_topics = list(list_topics)
list_topics.sort()
list_fullname_topics = []
for y in list_topics:
list_fullname_topics.append("Обновление скриптов" + " " + y)
list_for_public = [item for item in list_fullname_topics + list_topic_news if item not in dmessage_list]
print_line(f"Новоcти для публикации в дискорд: {list_for_public}", flag="GREEN")
for dnew in list_for_public:
print_line(dnew)
#discord_post(post_text)
if __name__ == '__main__':
main()