Добавлена отправка в discord при отсутствии новости в канале.

This commit is contained in:
2024-05-25 19:22:45 +03:00
parent a2526241ae
commit d0bd6f01be

View File

@@ -3,15 +3,15 @@
import re import re
import sys import sys
import time import time
import shutil
import discord import discord
import colorama import logging
import colorlog
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import keys import keys
dmessage_list = []
url_post = "https://linux-gaming.ru/posts.json" url_post = "https://linux-gaming.ru/posts.json"
url_news = "https://linux-gaming.ru/c/news/6.json" url_news = "https://linux-gaming.ru/c/news/6.json"
url_vk_api = "https://api.vk.com/method/wall.post" url_vk_api = "https://api.vk.com/method/wall.post"
@@ -23,47 +23,53 @@ heads_site = {
"Api-Username": "linux-gaming" "Api-Username": "linux-gaming"
} }
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
'%(log_color)s%(levelname)s: %(message)s',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
))
logger.addHandler(handler)
def main(): def main():
try:
last_changelog, resp_changelog = resp_change() last_changelog, resp_changelog = resp_change()
check_version(last_changelog, resp_changelog) check_version(last_changelog, resp_changelog)
check_discord_public() check_discord_public()
except Exception as err:
logging.error(f"Ошибка исполнения функции main: {err}")
def print_line(*text, flag="", sep=" ", end="\n"):
"""Добавление обводки вокруг текста, покраска"""
if flag == "RED":
color = colorama.Fore.RED
elif flag == "YELLOW":
color = colorama.Fore.YELLOW
elif flag == "GREEN":
color = colorama.Fore.GREEN
elif flag == "CYAN":
color = colorama.Fore.CYAN
else:
color = colorama.Fore.WHITE
len_text = str(*text)
len_text = len_text.split("\n")
max_string = max(len(str(string)) for string in len_text) + 2
max_length = shutil.get_terminal_size()
max_length = max_length[0]
if max_string > max_length:
len_dots = max_length
else:
len_dots = max_string
print(color + "." * len_dots)
print(color, *text, sep=sep, end=end)
print(color + "." * len_dots + colorama.Style.RESET_ALL)
def make_soup(resp_changelog): def make_soup(resp_changelog):
return BeautifulSoup(resp_changelog.text, 'html.parser') return BeautifulSoup(resp_changelog.text, 'html.parser')
def html_to_text(html_content):
soup = BeautifulSoup(html_content, 'html.parser')
text = soup.get_text(separator='\n')
links = []
for a in soup.find_all('a', href=True):
links.append(a['href'])
return text, links
def remove_empty_lines(text_data):
lines = text_data.splitlines()
non_empty_lines = [line for line in lines if line.strip()]
return '\n'.join(non_empty_lines)
def script_content(script_ver, resp_changelog): def script_content(script_ver, resp_changelog):
# Используем BeautifulSoup для парсинга HTML-кода страницы
soup = make_soup(resp_changelog) soup = make_soup(resp_changelog)
page_text = str(soup) page_text = str(soup)
page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '') page_text = page_text.replace("Вы можете помочь развитию проекта: https://linux-gaming.ru/donate/", '')
@@ -71,10 +77,12 @@ def script_content(script_ver, resp_changelog):
last_text = f"###Scripts version {script_ver - 1}" last_text = f"###Scripts version {script_ver - 1}"
last_text = str(last_text) last_text = str(last_text)
index_last_text = page_text.find(last_text) index_last_text = page_text.find(last_text)
if index_last_text != -1: if index_last_text != -1:
changelog_text_last = page_text[:index_last_text] changelog_text_last = page_text[:index_last_text]
prev_text = f"###Scripts version {script_ver}" prev_text = f"###Scripts version {script_ver}"
index_script_ver = changelog_text_last.find(prev_text) index_script_ver = changelog_text_last.find(prev_text)
if index_script_ver != -1: if index_script_ver != -1:
changelog_text = changelog_text_last[index_script_ver:] changelog_text = changelog_text_last[index_script_ver:]
post_text = (f"-----------------------------\nОбновление скриптов {script_ver}\n" post_text = (f"-----------------------------\nОбновление скриптов {script_ver}\n"
@@ -100,74 +108,68 @@ def script_content(script_ver, resp_changelog):
return post_text, post_data, params return post_text, post_data, params
def news_content(): def news_content(post_id):
print_line(f"Запрос содержимого поста новости") logging.info(f"Запрос содержимого поста новости с ID: {post_id}")
response = response_get(f"https://linux-gaming.ru/t/{post_id}.json")
if response and response.status_code == 200:
topic_data = response.json()
posts = topic_data.get('post_stream', {}).get('posts', [])
# Найти первый пост
for post in posts:
if post.get('post_number') == 1:
html_content = post.get('cooked', 'Нет содержимого')
text_data, links = html_to_text(html_content)
text_data = remove_empty_lines(text_data)
logging.debug(text_data)
return text_data, links
logging.error(f"Первый пост не найден в теме с ID: {post_id}")
return None
else:
logging.error(f"Не удалось получить содержимое поста с ID: {post_id}")
return None
def response_get(url): def response_get(url):
resp_get = requests.get(url, headers=heads_site) try:
return resp_get return requests.get(url, headers=heads_site)
except requests.RequestException as err:
logging.error(f"Ошибка запроса {err}")
def resp_change(): def resp_change():
resp_changelog = response_get(url_changelog) resp_changelog = response_get(url_changelog)
if resp_changelog.status_code == 200:
# Использование регулярного выражения для поиска текста if resp_changelog and resp_changelog.status_code == 200:
matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text) matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text)
# Печать всех найденных совпадений logging.info(f"Найдены версии в истории изменений: {matches_changelog}")
print_line(f"Найдены версии в истории изменений: {matches_changelog}", flag="CYAN") last_changelog = int(max(matches_changelog))
list_changelog = [] logging.info(f"Последняя версия в истории изменений: {last_changelog}")
for match in matches_changelog:
list_changelog.append(match)
last_changelog = int(max(list_changelog))
print_line(f"Последняя версия в истории изменений: {last_changelog}", flag="GREEN")
return last_changelog, resp_changelog return last_changelog, resp_changelog
else: else:
print_line(f'Ошибка при запросе changelog: {resp_changelog.status_code}', flag="RED") logging.error(
f'Ошибка при запросе changelog: {resp_changelog.status_code if resp_changelog else "No Response"}')
return None, None
def resp_get(url): def resp_get(url):
resp_topics = response_get(url) return response_get(url)
return resp_topics
def news_scripts(topics):
matches_topics = re.findall(r'Обновление скриптов (\d+)', str(topics))
list_topics = []
for match in matches_topics:
list_topics.append(match)
list_topics = set(list_topics)
print_line(f"Найдены новости на сайте о версиях:{list_topics}", flag="CYAN")
last_topic_scripts = max(list_topics)
print_line(f"Последняя новость на сайте о версии: {last_topic_scripts}", flag="GREEN")
return last_topic_scripts, list_topics
def news_noscripts(topics):
list_titles = []
pattern = re.compile(r'Обновление скриптов (\d+)')
for topic in topics:
title = str(topic['title'])
list_titles.append(title)
bl_topic = {'Описание категории «Новости»'}
filtered_list_titles = [title for title in list_titles if not pattern.search(title)]
filtered_list_titles = [x for x in filtered_list_titles if x not in bl_topic]
print_line(f"Список новостей на сайте: {filtered_list_titles}", flag="CYAN")
return filtered_list_titles
def news(): def news():
resp_topics = resp_get(url_news) resp_topics = resp_get(url_news)
if resp_topics.status_code == 200: if resp_topics.status_code == 200:
data = resp_topics.json() data = resp_topics.json()
topics = data['topic_list']['topics'] topics = data['topic_list']['topics']
last_topics_scripts, list_topics = news_scripts(topics) list_titles_and_ids = [(topic['id'], str(topic['title'])) for topic in topics]
list_topic_news = news_noscripts(topics) filtered_list_titles_and_ids = [(id, title) for id, title in list_titles_and_ids if not title == ('Описание '
return last_topics_scripts, list_topic_news, list_topics 'категории '
'«Новости»')]
return filtered_list_titles_and_ids
else: else:
print_line(f"Ошибка при запросе тем с сайта: {resp_topics.status_code}", flag="RED") logging.error(f"Ошибка при запросе тем с сайта: {resp_topics.status_code if resp_topics else
'Нет доступа к сайту'}")
return []
def site_post(url, headers, json): def site_post(url, headers, json):
@@ -175,69 +177,90 @@ def site_post(url, headers, json):
title = json.get('title', 'No title') title = json.get('title', 'No title')
try: try:
resp_post = requests.post(url=url, headers=headers, json=json) resp_post = requests.post(url=url, headers=headers, json=json)
except requests.RequestException as error:
print_line(f'Ошибка при отправке новости "{title}" на сайт {error}', flag="RED")
time.sleep(900) # Ждем 15 минут (900 секунд) перед повторной попыткой
continue
if resp_post.status_code == 200: if resp_post.status_code == 200:
print_line(f"Новость опубликована на сайте!", flag="GREEN") logging.info("Новость опубликована на сайте!")
return resp_post return resp_post
elif resp_post.status_code == 422: elif resp_post.status_code == 422:
text_resp_post = f"Новость уже опубликована" logging.warning(f'Новость "{title}" уже опубликована: {resp_post.status_code}')
print_line(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code} {text_resp_post}',
flag="RED")
return resp_post return resp_post
else: else:
text_resp_post = f"Уточнить код ошибки" logging.error(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code}')
print_line(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code} {text_resp_post}', except requests.RequestException as error:
flag="RED") logging.error(f'Ошибка при отправке новости "{title}" на сайт: {error}')
time.sleep(900) time.sleep(900)
def vk_post(url, params): def vk_post(url, params):
try:
# Отправляем POST-запрос к VK API # Отправляем POST-запрос к VK API
resp_post = requests.post(url=url, params=params) resp_post = requests.post(url=url, params=params)
# Проверяем ответ сервера
if resp_post.status_code == 200: if resp_post.status_code == 200:
print_line("Сообщение успешно опубликовано.", flag="GREEN") logging.info("Сообщение успешно опубликовано.")
print_line(resp_post.json(), flag="CYAN") # Выводим ответ сервера в формате JSON logging.info(resp_post.json()) # Выводим ответ сервера в формате JSON
else: else:
print_line(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}", flag="RED") logging.error(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}")
return resp_post return resp_post
except requests.RequestException as err:
logging.error(f"VK post failed: {err}")
return None
def discord_post(post_text): async def discord_post(post_text, client):
intents = discord.Intents.default()
intents.messages = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
print_line(f'Успешный логин в discord {client.user}')
channel = client.get_channel(keys.dicord_channel) channel = client.get_channel(keys.dicord_channel)
await channel.send(f"{post_text}") await channel.send(f"{post_text}")
await client.close()
client.run(keys.discord_token)
async def get_discord_messages(client, channel_id):
channel = client.get_channel(channel_id)
if not channel:
logging.error(f"ID канала Discord {channel_id} не существует")
return []
messages = []
async for message in channel.history(limit=999999):
messages.append(message.content)
pattern = re.compile(r'-----------------------------\n(.*?)\n-----------------------------', re.DOTALL)
for message in messages:
matches = pattern.findall(message)
if matches:
messages.extend(matches)
logging.debug(f"Найдены сообщения в дискорде: {messages}")
return messages
def check_version(last_changelog, resp_changelog): def check_version(last_changelog, resp_changelog):
last_topics_scripts, list_topic_news, list_topics = news() list_titles_and_ids = news()
if int(last_topics_scripts) < last_changelog: pattern = re.compile(r'Обновление скриптов (\d+)')
def extract_number(title):
match = pattern.search(title)
if match:
return int(match.group(1))
return None
numbers = [extract_number(title) for _, title in list_titles_and_ids if extract_number(title) is not None]
last_topics_script = max(numbers)
logging.info(f"Последняя новость на сайте о версии: {last_topics_script}")
#last_topics_script = 2297
if last_topics_script < last_changelog:
list_new_ver = [] list_new_ver = []
for script_ver in range(last_topics_scripts + 1, last_changelog + 1): for script_ver in range(last_topics_script + 1, last_changelog + 1):
list_new_ver.append(script_ver) list_new_ver.append(script_ver)
print_line(f"Найдена новая версия скрипта {script_ver}", flag="GREEN") logging.info(f"Найдена новая версия скрипта {script_ver}")
changelog_text, post_data, params = script_content(script_ver, resp_changelog) changelog_text, post_data, params = script_content(script_ver, resp_changelog)
site_post(url_post, headers=heads_site, json=post_data) if post_data:
site_post(url_post, heads_site, post_data)
# vk_post(url_vk_api, params=params) # vk_post(url_vk_api, params=params)
if not list_new_ver: if not list_new_ver:
print_line(f"Не найдена новая версия скрипта", flag="YELLOW") logging.warning(f"Не найдена новая версия скрипта")
sys.exit() sys.exit()
else: else:
print_line("Нет новых версий скриптов PortProton", flag="YELLOW") logging.warning("Нет новых версий скриптов PortProton")
def check_discord_public(): def check_discord_public():
@@ -247,36 +270,40 @@ def check_discord_public():
@client.event @client.event
async def on_ready(): async def on_ready():
print_line(f"Успешный логин в discord {client.user}", flag="GREEN") logging.debug(f"Успешный логин в discord {client.user}")
channel = client.get_channel(keys.dicord_channel) channel_id = keys.dicord_channel
dmessage_title = [] discord_messages = await get_discord_messages(client, channel_id)
if channel is None:
print_line(f"ID канала Discord {keys.dicord_channel} не существует", flag="RED") list_titles_and_ids = news()
if list_titles_and_ids:
list_for_public = []
for topic_id, topic_title in list_titles_and_ids:
if topic_title not in discord_messages:
list_for_public.append((topic_id, topic_title))
if list_for_public != []:
logging.info(f"Новости для публикации в дискорд: {list_for_public}")
else:
logging.info(f"Новостей для публикации в дискорд нет")
channel = client.get_channel(channel_id)
if not channel:
logging.error(f"ID канала Discord {channel_id} не существует")
await client.close() await client.close()
return return
async for message in channel.history(limit=999999): # Количество сообщений для чтения
dmessage_list.append(message.content) for topic_id, topic_title in reversed(list_for_public):
pattern = re.compile(r'-----------------------------\n(.*?)\n-----------------------------', re.DOTALL) text_data, links = news_content(topic_id)
for message in dmessage_list: if text_data:
matches = pattern.findall(message) content = f"-----------------------------\n{topic_title}\n-----------------------------\n" + text_data + "\n" + "----------------------------------------------------------"
if matches: for link in links:
dmessage_title.extend(matches) if link not in content:
print_line(f"Новости в Discord: {dmessage_title}", flag="CYAN") content += f"\n{link}"
await client.close() # Разбиваем содержимое на части по 4000 символов
for i in range(0, len(content), 4000):
await channel.send(content[i:i+4000])
client.run(keys.discord_token) client.run(keys.discord_token)
last_topics_scripts, list_topic_news, list_topics = news()
list_topics = list(list_topics)
list_topics.sort()
list_fullname_topics = []
for y in list_topics:
list_fullname_topics.append("Обновление скриптов" + " " + y)
list_for_public = [item for item in list_fullname_topics + list_topic_news if item not in dmessage_list]
print_line(f"Новоcти для публикации в дискорд: {list_for_public}", flag="GREEN")
for dnew in list_for_public:
print_line(dnew)
#discord_post(post_text)
if __name__ == '__main__': if __name__ == '__main__':
main() main()