chore: clean all vulture 80% confidence dead code
All checks were successful
Code check / Check code (push) Successful in 1m45s

Signed-off-by: Boris Yumankulov <boria138@altlinux.org>
This commit is contained in:
2025-12-21 19:34:32 +05:00
parent 6457084d56
commit 58bbff8e69
10 changed files with 10 additions and 276 deletions

View File

@@ -1,15 +1,11 @@
"""
Utility module for search optimizations including Trie, hash tables, and fuzzy matching.
"""
import concurrent.futures
import threading
from collections.abc import Callable
from typing import Any
from rapidfuzz import fuzz
from threading import Lock
from portprotonqt.logger import get_logger
from PySide6.QtCore import QThread, QRunnable, Signal, QObject, QTimer
import requests
from PySide6.QtCore import QThread, Signal, QObject
logger = get_logger(__name__)
@@ -139,99 +135,12 @@ class SearchOptimizer:
return []
class RequestRunnable(QRunnable):
"""Runnable for executing HTTP requests in a thread."""
def __init__(self, method: str, url: str, on_success=None, on_error=None, **kwargs):
super().__init__()
self.method = method
self.url = url
self.kwargs = kwargs
self.result = None
self.error = None
self.on_success: Callable | None = on_success
self.on_error: Callable | None = on_error
def run(self):
try:
if self.method.lower() == 'get':
self.result = requests.get(self.url, **self.kwargs)
elif self.method.lower() == 'post':
self.result = requests.post(self.url, **self.kwargs)
else:
raise ValueError(f"Unsupported HTTP method: {self.method}")
# Execute success callback if provided
if self.on_success is not None:
success_callback = self.on_success # Capture the callback
def success_handler():
if success_callback is not None: # Re-check to satisfy Pyright
success_callback(self.result)
QTimer.singleShot(0, success_handler)
except Exception as e:
self.error = e
# Execute error callback if provided
if self.on_error is not None:
error_callback = self.on_error # Capture the callback
captured_error = e # Capture the exception
def error_handler():
error_callback(captured_error)
QTimer.singleShot(0, error_handler)
def run_request_in_thread(method: str, url: str, on_success: Callable | None = None, on_error: Callable | None = None, **kwargs):
"""Run HTTP request in a separate thread using Qt's thread system."""
runnable = RequestRunnable(method, url, on_success=on_success, on_error=on_error, **kwargs)
# Use QThreadPool to execute the runnable
from PySide6.QtCore import QThreadPool
thread_pool = QThreadPool.globalInstance()
thread_pool.start(runnable)
return runnable # Return the runnable to allow for potential cancellation if needed
def run_function_in_thread(func, *args, on_success: Callable | None = None, on_error: Callable | None = None, **kwargs):
"""Run a function in a separate thread."""
def execute():
try:
result = func(*args, **kwargs)
if on_success:
on_success(result)
except Exception as e:
if on_error:
on_error(e)
thread = threading.Thread(target=execute)
thread.daemon = True
thread.start()
return thread
def run_in_thread(func, *args, **kwargs):
"""Run a function in a separate thread."""
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(func, *args, **kwargs)
return future.result()
def run_in_thread_async(func, *args, callback: Callable | None = None, **kwargs):
"""Run a function in a separate thread asynchronously."""
import threading
def target():
try:
result = func(*args, **kwargs)
if callback:
callback(result)
except Exception as e:
if callback:
callback(None) # or handle error in callback
logger.error(f"Error in threaded operation: {e}")
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
return thread
# Threaded search implementation using QThread for performance optimization
@@ -320,11 +229,6 @@ class ThreadedSearch(QThread):
self.worker.search_finished.connect(self.search_finished)
self.worker.search_error.connect(self.search_error)
def set_search_params(self, search_text: str, games_data: list, search_type: str = "auto"):
"""Set parameters for the search operation."""
self.search_text = search_text
self.games_data = games_data
self.search_type = search_type
def set_games_data(self, games_data: list):
"""Set the games data to be searched."""
@@ -334,46 +238,3 @@ class ThreadedSearch(QThread):
def run(self):
"""Run the search operation in the thread."""
self.worker.execute_search(self.search_text, self.search_type)
class SearchThreadPool:
"""
A simple thread pool for managing multiple search operations.
"""
def __init__(self, max_threads: int = 3):
self.max_threads = max_threads
self.active_threads = []
self.thread_queue = []
def submit_search(self, search_text: str, games_data: list, search_type: str = "auto",
on_start: Callable | None = None, on_finish: Callable | None = None, on_error: Callable | None = None):
"""
Submit a search operation to the pool.
Args:
search_text: Text to search for
games_data: List of game data tuples to search in
search_type: Type of search ("exact", "prefix", "fuzzy", "auto")
on_start: Callback when search starts
on_finish: Callback when search finishes (receives results)
on_error: Callback when search errors (receives error message)
"""
search_thread = ThreadedSearch()
search_thread.set_search_params(search_text, games_data, search_type)
# Connect callbacks if provided
if on_start:
search_thread.search_started.connect(on_start)
if on_finish:
search_thread.search_finished.connect(on_finish)
if on_error:
search_thread.search_error.connect(on_error)
# Start the thread
search_thread.start()
self.active_threads.append(search_thread)
# Clean up finished threads
self.active_threads = [thread for thread in self.active_threads if thread.isRunning()]
return search_thread