mirror of
https://github.com/goldenfishs/MRobot.git
synced 2025-11-02 04:23:10 +08:00
462 lines
19 KiB
Python
462 lines
19 KiB
Python
"""
|
||
自动更新模块
|
||
实现软件的自动更新功能,包括下载、解压、安装等完整流程
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import shutil
|
||
import tempfile
|
||
import zipfile
|
||
import subprocess
|
||
import platform
|
||
from pathlib import Path
|
||
from typing import Optional, Callable
|
||
from urllib.parse import urlparse
|
||
|
||
import requests
|
||
from packaging.version import parse as vparse
|
||
from PyQt5.QtCore import QThread, pyqtSignal, QObject
|
||
|
||
# 移除多线程下载器依赖
|
||
|
||
|
||
class UpdaterSignals(QObject):
|
||
"""更新器信号类"""
|
||
progress_changed = pyqtSignal(int) # 进度变化信号 (0-100)
|
||
download_progress = pyqtSignal(int, int, float, float) # 下载进度: 已下载字节, 总字节, 速度MB/s, 剩余时间秒
|
||
status_changed = pyqtSignal(str) # 状态变化信号
|
||
error_occurred = pyqtSignal(str) # 错误信号
|
||
update_completed = pyqtSignal(str) # 更新完成信号,可选包含文件路径
|
||
update_cancelled = pyqtSignal() # 更新取消信号
|
||
|
||
|
||
class AutoUpdater(QThread):
|
||
"""自动更新器类"""
|
||
|
||
def __init__(self, current_version: str, repo: str = "goldenfishs/MRobot"):
|
||
super().__init__()
|
||
self.current_version = current_version
|
||
self.repo = repo
|
||
self.signals = UpdaterSignals()
|
||
self.cancelled = False
|
||
|
||
# 获取当前程序信息
|
||
self.is_frozen = getattr(sys, 'frozen', False)
|
||
self.app_dir = self._get_app_directory()
|
||
self.temp_dir = None
|
||
|
||
# 多线程下载器
|
||
self.downloader = None
|
||
|
||
def _get_app_directory(self) -> str:
|
||
"""获取应用程序目录"""
|
||
if self.is_frozen:
|
||
# 如果是打包的exe,返回exe所在目录
|
||
return os.path.dirname(sys.executable)
|
||
else:
|
||
# 如果是Python脚本,返回项目根目录
|
||
return os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
|
||
|
||
def cancel_update(self):
|
||
"""取消更新"""
|
||
self.cancelled = True
|
||
if self.downloader:
|
||
self.downloader.cancel()
|
||
self.signals.update_cancelled.emit()
|
||
|
||
def check_for_updates(self) -> Optional[dict]:
|
||
"""检查是否有新版本可用"""
|
||
try:
|
||
self.signals.status_changed.emit("正在检查更新...")
|
||
|
||
url = f"https://api.github.com/repos/{self.repo}/releases/latest"
|
||
response = requests.get(url, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
release_data = response.json()
|
||
latest_version = release_data["tag_name"].lstrip("v")
|
||
|
||
if vparse(latest_version) > vparse(self.current_version):
|
||
download_url = self._get_download_url(release_data)
|
||
asset_size = self._get_asset_size(release_data, download_url) if download_url else 0
|
||
return {
|
||
'version': latest_version,
|
||
'download_url': download_url,
|
||
'release_notes': release_data.get('body', ''),
|
||
'release_date': release_data.get('published_at', ''),
|
||
'asset_name': self._get_asset_name(release_data),
|
||
'asset_size': asset_size
|
||
}
|
||
return None
|
||
else:
|
||
raise Exception(f"GitHub API请求失败: {response.status_code}")
|
||
|
||
except Exception as e:
|
||
self.signals.error_occurred.emit(f"检查更新失败: {str(e)}")
|
||
return None
|
||
|
||
def _get_download_url(self, release_data: dict) -> Optional[str]:
|
||
"""从release数据中获取适合当前平台的下载链接"""
|
||
assets = release_data.get('assets', [])
|
||
system = platform.system().lower()
|
||
|
||
# 根据操作系统选择合适的安装包
|
||
for asset in assets:
|
||
name = asset['name'].lower()
|
||
|
||
if system == 'windows':
|
||
if 'installer' in name and name.endswith('.exe'):
|
||
return asset['browser_download_url']
|
||
if name.endswith('.exe') or name.endswith('.zip'):
|
||
return asset['browser_download_url']
|
||
elif system == 'darwin': # macOS
|
||
# 优先选择 dmg 或 zip 文件
|
||
if name.endswith('.dmg'):
|
||
return asset['browser_download_url']
|
||
if name.endswith('.zip') and 'macos' in name:
|
||
return asset['browser_download_url']
|
||
elif system == 'linux':
|
||
if name.endswith('.tar.gz') or (name.endswith('.zip') and 'linux' in name):
|
||
return asset['browser_download_url']
|
||
|
||
# 如果没找到特定平台的,在 macOS 上避免选择 .exe 文件
|
||
for asset in assets:
|
||
name = asset['name'].lower()
|
||
if system == 'darwin':
|
||
# macOS 优先选择非 exe 文件
|
||
if name.endswith('.zip') or name.endswith('.dmg') or name.endswith('.tar.gz'):
|
||
return asset['browser_download_url']
|
||
else:
|
||
if any(name.endswith(ext) for ext in ['.zip', '.exe', '.dmg', '.tar.gz']):
|
||
return asset['browser_download_url']
|
||
|
||
# 最后才选择 exe 文件(如果没有其他选择)
|
||
if system == 'darwin':
|
||
for asset in assets:
|
||
name = asset['name'].lower()
|
||
if name.endswith('.exe'):
|
||
return asset['browser_download_url']
|
||
|
||
return None
|
||
|
||
def _get_asset_name(self, release_data: dict) -> Optional[str]:
|
||
"""获取资源文件名"""
|
||
download_url = self._get_download_url(release_data)
|
||
if download_url:
|
||
return os.path.basename(urlparse(download_url).path)
|
||
return None
|
||
|
||
def _get_asset_size(self, release_data: dict, download_url: str) -> int:
|
||
"""获取资源文件大小"""
|
||
if not download_url:
|
||
return 0
|
||
|
||
assets = release_data.get('assets', [])
|
||
for asset in assets:
|
||
if asset.get('browser_download_url') == download_url:
|
||
return asset.get('size', 0)
|
||
return 0
|
||
|
||
def download_update(self, download_url: str, filename: str) -> Optional[str]:
|
||
"""下载更新文件 - 使用简单的单线程下载"""
|
||
try:
|
||
self.signals.status_changed.emit("开始下载...")
|
||
|
||
# 创建临时目录
|
||
self.temp_dir = tempfile.mkdtemp(prefix="MRobot_update_")
|
||
file_path = os.path.join(self.temp_dir, filename)
|
||
|
||
print(f"Downloading to: {file_path}")
|
||
|
||
# 使用简单的requests下载
|
||
import requests
|
||
headers = {
|
||
'User-Agent': 'MRobot-Updater/1.0',
|
||
'Accept': '*/*',
|
||
}
|
||
|
||
response = requests.get(download_url, headers=headers, stream=True, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
# 获取文件总大小
|
||
total_size = int(response.headers.get('Content-Length', 0))
|
||
downloaded_size = 0
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||
|
||
# 下载文件
|
||
with open(file_path, 'wb') as f:
|
||
for chunk in response.iter_content(chunk_size=8192):
|
||
if self.cancelled:
|
||
print("Download cancelled by user")
|
||
return None
|
||
|
||
if chunk:
|
||
f.write(chunk)
|
||
downloaded_size += len(chunk)
|
||
|
||
# 更新进度
|
||
if total_size > 0:
|
||
progress = int((downloaded_size / total_size) * 100)
|
||
self.signals.progress_changed.emit(progress)
|
||
|
||
# 发送详细进度信息
|
||
speed = 0 # 简化版本不计算速度
|
||
remaining = 0
|
||
self.signals.download_progress.emit(downloaded_size, total_size, speed, remaining)
|
||
|
||
# 验证下载的文件
|
||
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
|
||
print(f"Download completed successfully: {file_path}")
|
||
self.signals.status_changed.emit("下载完成!")
|
||
return file_path
|
||
else:
|
||
raise Exception("下载的文件不存在或为空")
|
||
|
||
except Exception as e:
|
||
error_msg = f"下载失败: {str(e)}"
|
||
print(f"Download error: {error_msg}")
|
||
self.signals.error_occurred.emit(error_msg)
|
||
return None
|
||
|
||
def _on_download_progress(self, downloaded: int, total: int, speed: float, remaining: float):
|
||
"""处理下载进度"""
|
||
# 转发详细下载进度
|
||
self.signals.download_progress.emit(downloaded, total, speed, remaining)
|
||
|
||
# 计算总体进度 (下载占80%,其他操作占20%)
|
||
if total > 0:
|
||
download_progress = int((downloaded / total) * 80)
|
||
self.signals.progress_changed.emit(download_progress)
|
||
|
||
def extract_update(self, file_path: str) -> Optional[str]:
|
||
"""解压更新文件"""
|
||
try:
|
||
self.signals.status_changed.emit("正在解压文件...")
|
||
self.signals.progress_changed.emit(85)
|
||
|
||
if not self.temp_dir:
|
||
raise Exception("临时目录未初始化")
|
||
|
||
extract_dir = os.path.join(self.temp_dir, "extracted")
|
||
os.makedirs(extract_dir, exist_ok=True)
|
||
|
||
# 根据文件扩展名选择解压方法
|
||
if file_path.endswith('.zip'):
|
||
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
||
zip_ref.extractall(extract_dir)
|
||
elif file_path.endswith('.tar.gz'):
|
||
import tarfile
|
||
with tarfile.open(file_path, 'r:gz') as tar_ref:
|
||
tar_ref.extractall(extract_dir)
|
||
elif file_path.endswith('.exe'):
|
||
# 对于 .exe 文件,在非 Windows 系统上提示手动安装
|
||
current_system = platform.system()
|
||
if current_system != 'Windows':
|
||
self.signals.error_occurred.emit(f"下载的是 Windows 安装程序,当前系统是 {current_system}。\n请手动下载适合您系统的版本。")
|
||
return None
|
||
else:
|
||
# Windows 系统直接返回文件路径,由安装函数处理
|
||
return file_path
|
||
else:
|
||
raise Exception(f"不支持的文件格式: {file_path}")
|
||
|
||
self.signals.progress_changed.emit(90)
|
||
self.signals.status_changed.emit("解压完成")
|
||
return extract_dir
|
||
|
||
except Exception as e:
|
||
self.signals.error_occurred.emit(f"解压失败: {str(e)}")
|
||
return None
|
||
|
||
def install_update(self, extract_dir: str) -> bool:
|
||
"""安装更新"""
|
||
try:
|
||
self.signals.status_changed.emit("正在安装更新...")
|
||
self.signals.progress_changed.emit(95)
|
||
|
||
if not self.temp_dir:
|
||
raise Exception("临时目录未初始化")
|
||
|
||
# 创建备份目录
|
||
backup_dir = os.path.join(self.temp_dir, "backup")
|
||
os.makedirs(backup_dir, exist_ok=True)
|
||
|
||
# 备份当前程序文件
|
||
self._backup_current_files(backup_dir)
|
||
|
||
# 复制新文件
|
||
self._copy_update_files(extract_dir)
|
||
|
||
self.signals.progress_changed.emit(99)
|
||
self.signals.status_changed.emit("安装完成")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.signals.error_occurred.emit(f"安装失败: {str(e)}")
|
||
# 尝试恢复备份
|
||
self._restore_backup(backup_dir)
|
||
return False
|
||
|
||
def _backup_current_files(self, backup_dir: str):
|
||
"""备份当前程序文件"""
|
||
important_files = ['MRobot.py', 'MRobot.exe', 'app/', 'assets/']
|
||
|
||
for item in important_files:
|
||
src_path = os.path.join(self.app_dir, item)
|
||
if os.path.exists(src_path):
|
||
dst_path = os.path.join(backup_dir, item)
|
||
if os.path.isdir(src_path):
|
||
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
|
||
else:
|
||
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
|
||
shutil.copy2(src_path, dst_path)
|
||
|
||
def _copy_update_files(self, extract_dir: str):
|
||
"""复制更新文件到应用程序目录"""
|
||
# 查找解压目录中的主要文件/文件夹
|
||
extract_contents = os.listdir(extract_dir)
|
||
|
||
# 如果解压后只有一个文件夹,进入该文件夹
|
||
if len(extract_contents) == 1 and os.path.isdir(os.path.join(extract_dir, extract_contents[0])):
|
||
extract_dir = os.path.join(extract_dir, extract_contents[0])
|
||
|
||
# 复制文件到应用程序目录
|
||
for item in os.listdir(extract_dir):
|
||
src_path = os.path.join(extract_dir, item)
|
||
dst_path = os.path.join(self.app_dir, item)
|
||
|
||
if os.path.isdir(src_path):
|
||
if os.path.exists(dst_path):
|
||
shutil.rmtree(dst_path)
|
||
shutil.copytree(src_path, dst_path)
|
||
else:
|
||
shutil.copy2(src_path, dst_path)
|
||
|
||
def _restore_backup(self, backup_dir: str):
|
||
"""恢复备份文件"""
|
||
try:
|
||
for item in os.listdir(backup_dir):
|
||
src_path = os.path.join(backup_dir, item)
|
||
dst_path = os.path.join(self.app_dir, item)
|
||
|
||
if os.path.isdir(src_path):
|
||
if os.path.exists(dst_path):
|
||
shutil.rmtree(dst_path)
|
||
shutil.copytree(src_path, dst_path)
|
||
else:
|
||
shutil.copy2(src_path, dst_path)
|
||
except Exception as e:
|
||
print(f"恢复备份失败: {e}")
|
||
|
||
def restart_application(self):
|
||
"""重启应用程序"""
|
||
try:
|
||
self.signals.status_changed.emit("正在重启应用程序...")
|
||
|
||
if self.is_frozen:
|
||
# 如果是打包的exe
|
||
executable = sys.executable
|
||
else:
|
||
# 如果是Python脚本
|
||
executable = sys.executable
|
||
script_path = os.path.join(self.app_dir, "MRobot.py")
|
||
|
||
# 启动新进程
|
||
if platform.system() == 'Windows':
|
||
subprocess.Popen([executable] + ([script_path] if not self.is_frozen else []))
|
||
else:
|
||
subprocess.Popen([executable] + ([script_path] if not self.is_frozen else []))
|
||
|
||
# 退出当前进程
|
||
sys.exit(0)
|
||
|
||
except Exception as e:
|
||
self.signals.error_occurred.emit(f"重启失败: {str(e)}")
|
||
|
||
def cleanup(self):
|
||
"""清理临时文件"""
|
||
print(f"cleanup() called, temp_dir: {self.temp_dir}") # 调试输出
|
||
if self.temp_dir and os.path.exists(self.temp_dir):
|
||
try:
|
||
print(f"Removing temp directory: {self.temp_dir}") # 调试输出
|
||
shutil.rmtree(self.temp_dir)
|
||
print("Temp directory removed successfully") # 调试输出
|
||
except Exception as e:
|
||
print(f"清理临时文件失败: {e}")
|
||
else:
|
||
print("No temp directory to clean up") # 调试输出
|
||
|
||
def run(self):
|
||
"""执行更新流程"""
|
||
try:
|
||
self.signals.status_changed.emit("开始更新流程...")
|
||
|
||
# 检查更新
|
||
self.signals.status_changed.emit("正在获取更新信息...")
|
||
update_info = self.check_for_updates()
|
||
if not update_info or self.cancelled:
|
||
self.signals.status_changed.emit("未找到更新信息或已取消")
|
||
return
|
||
|
||
self.signals.status_changed.emit(f"准备下载版本 {update_info['version']}")
|
||
|
||
# 下载更新
|
||
downloaded_file = self.download_update(
|
||
update_info['download_url'],
|
||
update_info['asset_name']
|
||
)
|
||
if not downloaded_file or self.cancelled:
|
||
self.signals.status_changed.emit("下载失败或已取消")
|
||
return
|
||
|
||
self.signals.status_changed.emit("下载完成!")
|
||
self.signals.progress_changed.emit(100)
|
||
|
||
# 检查是否为exe文件且当前系统非Windows
|
||
current_system = platform.system()
|
||
print(f"Downloaded file: {downloaded_file}") # 调试输出
|
||
print(f"Current system: {current_system}") # 调试输出
|
||
if downloaded_file.endswith('.exe') and current_system != 'Windows':
|
||
# 直接完成,返回文件路径
|
||
print(f"Emitting update_completed signal with file path: {downloaded_file}") # 调试输出
|
||
self.signals.update_completed.emit(downloaded_file)
|
||
# 不要立即清理,让用户有时间访问文件
|
||
print("Skipping cleanup to preserve downloaded file") # 调试输出
|
||
return
|
||
|
||
# 对于其他情况,继续原有流程
|
||
self.signals.status_changed.emit("下载完成,开始解压...")
|
||
|
||
# 解压更新
|
||
extract_dir = self.extract_update(downloaded_file)
|
||
if not extract_dir or self.cancelled:
|
||
self.signals.status_changed.emit("解压失败或已取消")
|
||
return
|
||
|
||
# 安装更新
|
||
self.signals.status_changed.emit("开始安装更新...")
|
||
if self.install_update(extract_dir) and not self.cancelled:
|
||
self.signals.progress_changed.emit(100)
|
||
self.signals.status_changed.emit("更新安装完成")
|
||
self.signals.update_completed.emit("") # 传递空字符串表示正常安装完成
|
||
else:
|
||
self.signals.status_changed.emit("安装失败或已取消")
|
||
|
||
except Exception as e:
|
||
error_msg = f"更新过程中发生错误: {str(e)}"
|
||
print(f"AutoUpdater error: {error_msg}") # 调试输出
|
||
self.signals.error_occurred.emit(error_msg)
|
||
finally:
|
||
# 对于下载完成的情况,延迟清理临时文件
|
||
# 这样用户有时间访问下载的文件
|
||
pass # 暂时不在这里清理
|
||
|
||
|
||
def check_update_availability(current_version: str, repo: str = "goldenfishs/MRobot") -> Optional[dict]:
|
||
"""快速检查是否有新版本可用(不下载)"""
|
||
updater = AutoUpdater(current_version, repo)
|
||
return updater.check_for_updates() |