#!/usr/bin/env python3 import re import sys import time import discord import logging import colorlog import requests import html2text from bs4 import BeautifulSoup import keys url_post = "https://linux-gaming.ru/posts.json" url_news = "https://linux-gaming.ru/c/news/6.json" url_vk_post = "https://api.vk.com/method/wall.post" url_vk_get = "https://api.vk.com/method/wall.get" url_changelog = "https://gitlab.eterfund.ru/Castro-Fidel/PortWINE/raw/master/data_from_portwine/changelog_ru" heads_site = { "Content-Type": "application/json", "Api-Key": keys.api_key_site, "Api-Username": "linux-gaming" } params_get = { 'access_token': keys.user_token_vk, 'v': '5.236', # Версия API 'owner_id': str(keys.own_id), 'count': str(100), 'offset': str(0), 'filter': 'all' } logger = logging.getLogger() logger.setLevel(logging.INFO) handler = colorlog.StreamHandler() handler.setFormatter(colorlog.ColoredFormatter( '%(log_color)s%(levelname)s: %(message)s', log_colors={ 'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'red,bg_white', } )) logger.addHandler(handler) def main(): last_changelog, resp_changelog = resp_change() check_version(last_changelog, resp_changelog) check_discord_public() check_vk_posts() def make_soup(resp_changelog): return BeautifulSoup(resp_changelog.text, 'html.parser') def html_to_text(html_content): h = html2text.HTML2Text() h.ignore_links = False # Сохранение ссылок h.ignore_images = True # Игнорирование изображений h.bypass_tables = True # Сохранение таблиц h.reference_links = True # Сохранение оригинальных ссылок markdown_text = h.handle(html_content) logging.debug(f"Markdown text: {markdown_text}") # Удаление переносов строк из-за - markdown_text = re.sub(r'-\s*\n\s*', '-', markdown_text, flags=re.DOTALL) markdown_text = re.sub(r'-\s*\n*', '-', markdown_text, flags=re.DOTALL) # Убираем переносы строк внутри круглых скобок () markdown_text = re.sub(r'\((.*?)\)', lambda x: '(' + x.group(1).replace('\n', ' ') + ')', markdown_text, flags=re.DOTALL) # Убираем переносы строк внутри квадратных скобок [] markdown_text = re.sub(r'\[(.*?)\]', lambda x: '[' + x.group(1).replace('\n', ' ') + ']', markdown_text, flags=re.DOTALL) # Удаление переносов строк и пробелов внутри текста markdown_text = re.sub(r'(?) last_text = f"###Scripts version {script_ver - 1}" last_text = str(last_text) index_last_text = page_text.find(last_text) if index_last_text != -1: changelog_text_last = page_text[:index_last_text] prev_text = f"###Scripts version {script_ver}" index_script_ver = changelog_text_last.find(prev_text) if index_script_ver != -1: changelog_text = changelog_text_last[index_script_ver:] post_text = (f"-----------------------------\nОбновление скриптов {script_ver}\t\n") + changelog_text site_text = (f"[center][img]/uploads/default/original/1X/5cfa59077a5275971401fab0114e56f3ffdd0ec4.png[/img][" f"/center]\n{post_text}") logging.debug(f"Сообщение на сайт {site_text}") post_data = { "title": f"Обновление скриптов {script_ver}", "raw": site_text, "category": keys.cat_num, "tags": ["scripts"] } return post_text, post_data, post_text def news_content(post_id): logging.debug(f"Запрос содержимого поста новости с ID: {post_id}") response = response_get(f"https://linux-gaming.ru/t/{post_id}.json", heads_site) if response and response.status_code == 200: topic_data = response.json() posts = topic_data.get('post_stream', {}).get('posts', []) # Найти первый пост for post in posts: if post.get('post_number') == 1: html_content = post.get('cooked', 'Нет содержимого') text_data = html_to_text(html_content) return text_data logging.error(f"Первый пост не найден в теме с ID: {post_id}") return None else: logging.error(f"Не удалось получить содержимое поста с ID: {post_id}") return None def response_get(url, heads_site): try: if heads_site == params_get: return requests.get(url, params=params_get) elif heads_site == heads_site: return requests.get(url, headers=heads_site) except requests.RequestException as err: logging.error(f"Ошибка запроса {err}") def resp_change(): resp_changelog = response_get(url_changelog, heads_site) if resp_changelog and resp_changelog.status_code == 200: matches_changelog = re.findall(r'###Scripts version (\d+)###', resp_changelog.text) logging.info(f"Найдены версии в истории изменений: {matches_changelog}") last_changelog = int(max(matches_changelog)) logging.info(f"Последняя версия в истории изменений: {last_changelog}") return last_changelog, resp_changelog else: logging.error( f'Ошибка при запросе changelog: {resp_changelog.status_code if resp_changelog else "No Response"}') return None, None def news(): resp_topics = response_get(url_news, heads_site) if resp_topics.status_code == 200: data = resp_topics.json() topics = data['topic_list']['topics'] list_titles_and_ids = [(topic['id'], str(topic['title'])) for topic in topics] filtered_list_titles_and_ids = [(id, title) for id, title in list_titles_and_ids if not title == ('Описание ' 'категории ' '«Новости»')] return filtered_list_titles_and_ids else: logging.error(f"Ошибка при запросе тем с сайта: {resp_topics.status_code if resp_topics else 'Нет доступа к сайту'}") return [] def site_post(url, headers, json): while True: title = json.get('title', 'No title') try: resp_post = requests.post(url=url, headers=headers, json=json) if resp_post.status_code == 200: logging.info("Новость опубликована на сайте!") return resp_post elif resp_post.status_code == 422: logging.warning(f'Новость "{title}" уже опубликована: {resp_post.status_code}') return resp_post else: logging.error(f'Ошибка при отправке новости "{title}" на сайт: {resp_post.status_code}') except requests.RequestException as error: logging.error(f'Ошибка при отправке новости "{title}" на сайт: {error}') time.sleep(900) def check_version(last_changelog, resp_changelog): list_titles_and_ids = news() pattern = re.compile(r'Обновление скриптов (\d+)') def extract_number(title): match = pattern.search(title) if match: return int(match.group(1)) return None numbers = [extract_number(title) for _, title in list_titles_and_ids if extract_number(title) is not None] last_topics_script = max(numbers) logging.info(f"Последняя новость на сайте о версии: {last_topics_script}") if last_topics_script < last_changelog: list_new_ver = [] for script_ver in range(last_topics_script + 1, last_changelog + 1): list_new_ver.append(script_ver) logging.info(f"Найдена новая версия скрипта {script_ver}") changelog_text, post_data, params = script_content(script_ver, resp_changelog) if post_data: site_post(url_post, heads_site, post_data) if not list_new_ver: logging.warning(f"Не найдена новая версия скрипта") sys.exit() else: logging.warning("Нет новых версий скриптов PortProton") async def discord_post(post_text, client): channel = client.get_channel(keys.dicord_channel) await channel.send(f"{post_text}") async def get_discord_messages(client, channel_id): channel = client.get_channel(channel_id) if not channel: logging.error(f"ID канала Discord {channel_id} не существует") return [] messages = [] async for message in channel.history(limit=999999): messages.append(message.content) pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL) for message in messages: matches = pattern.findall(message) if matches: messages.extend(matches) logging.debug(f"Найдены сообщения в дискорде: {messages}") return messages def check_discord_public(): intents = discord.Intents.default() intents.messages = True client = discord.Client(intents=intents) @client.event async def on_ready(): logging.debug(f"Успешный логин в discord {client.user}") channel_id = keys.dicord_channel discord_messages = await get_discord_messages(client, channel_id) list_titles_and_ids = news() if list_titles_and_ids: list_for_public = [] for topic_id, topic_title in list_titles_and_ids: if topic_title not in discord_messages: list_for_public.append((topic_id, topic_title)) if not list_for_public: logging.info(f"Новостей для публикации в дискорд нет") await client.close() else: logging.info(f"Новости для публикации в дискорд: {list_for_public}") channel = client.get_channel(channel_id) if not channel: logging.error(f"ID канала Discord {channel_id} не существует") await client.close() return for topic_id, topic_title in reversed(list_for_public): text_data = news_content(topic_id) if text_data: content = f"----------------------------------------------------------\n### {topic_title}\t\n" + text_data + "\n" # Разбиваем содержимое на части по 4000 символов for i in range(0, len(content), 2000): await channel.send(content[i:i+2000]) await client.close() client.run(keys.discord_token) def vk_post(url, post_text, links=None): params_post = { 'access_token': keys.api_key_vk, 'v': '5.236', # Версия API VK 'owner_id': str(keys.own_id), 'message': f'{post_text}' # Дополнительные параметры можно добавить здесь } if links: params_post['attachments'] = links try: # Отправляем POST-запрос к VK API resp_post = requests.post(url=url, params=params_post) if resp_post.status_code == 200: logging.info("Сообщение успешно опубликовано.") logging.info(resp_post.json()) # Выводим ответ сервера в формате JSON else: logging.error(f"Ошибка при публикации сообщения в ВК:, {resp_post.status_code}") return resp_post except requests.RequestException as err: logging.error(f"VK post failed: {err}") return None def get_vk_topics(): wall_posts = [] while True: wall_data = response_get(url_vk_get, params_get) wall_data_json = wall_data.json() if 'error' in wall_data_json: error_code = wall_data_json['error']['error_code'] error_msg = wall_data_json['error']['error_msg'] logging.error(f"Ошибка {error_code}: {error_msg}") sys.exit(f"Ошибка {error_code}: {error_msg}") items = wall_data_json.get('response', {}).get('items', []) if not items: logging.warning("Постов на стене нет") break wall_posts.extend((post['text'] for post in items if 'text' in post)) if len(items) < 100: break params_get['offset'] = str(int(params_get['offset']) + 100) pattern = re.compile(r'----------------------------------------------------------\n### (.*?)\t\n', re.DOTALL) for message in wall_posts: matches = pattern.findall(message) if matches: wall_posts.extend(matches) logging.debug(f"Найдены посты в ВК: {wall_posts}") return wall_posts def check_vk_posts(): vk_posts = get_vk_topics() if not vk_posts: logging.warning(f"Постов на стене нет{vk_posts}") list_titles_and_ids = news() if list_titles_and_ids: list_for_public = [] for topic_id, topic_title in list_titles_and_ids: # Сравнение заголовков с текстами постов if not any(topic_title in vk_posts for vk_posts in vk_posts): list_for_public.append((topic_id, topic_title)) if not list_for_public: logging.info(f"Новостей для публикации в ВК нет") else: logging.info(f"Новости для публикации в ВК: {list_for_public}") for topic_id, topic_title in reversed(list_for_public): text_data = news_content(topic_id) if text_data: content = f"----------------------------------------------------------\n{topic_title}\t\n" + text_data + "\n" content = remove_markdown_links(content) links = extract_links(content) if "Обновление скриптов" in topic_title: # Пример добавления изображения с постом vk_post(url_vk_post, content, "photo-99238527_457244491") else: if links: vk_post(url_vk_post, content, links) else: vk_post(url_vk_post, content) if __name__ == '__main__': main()