MRobot/app/tools/auto_updater.py
2025-10-13 15:21:29 +08:00

462 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
自动更新模块
实现软件的自动更新功能,包括下载、解压、安装等完整流程
"""
import os
import sys
import shutil
import tempfile
import zipfile
import subprocess
import platform
from pathlib import Path
from typing import Optional, Callable
from urllib.parse import urlparse
import requests
from packaging.version import parse as vparse
from PyQt5.QtCore import QThread, pyqtSignal, QObject
# 移除多线程下载器依赖
class UpdaterSignals(QObject):
"""更新器信号类"""
progress_changed = pyqtSignal(int) # 进度变化信号 (0-100)
download_progress = pyqtSignal(int, int, float, float) # 下载进度: 已下载字节, 总字节, 速度MB/s, 剩余时间秒
status_changed = pyqtSignal(str) # 状态变化信号
error_occurred = pyqtSignal(str) # 错误信号
update_completed = pyqtSignal(str) # 更新完成信号,可选包含文件路径
update_cancelled = pyqtSignal() # 更新取消信号
class AutoUpdater(QThread):
"""自动更新器类"""
def __init__(self, current_version: str, repo: str = "goldenfishs/MRobot"):
super().__init__()
self.current_version = current_version
self.repo = repo
self.signals = UpdaterSignals()
self.cancelled = False
# 获取当前程序信息
self.is_frozen = getattr(sys, 'frozen', False)
self.app_dir = self._get_app_directory()
self.temp_dir = None
# 多线程下载器
self.downloader = None
def _get_app_directory(self) -> str:
"""获取应用程序目录"""
if self.is_frozen:
# 如果是打包的exe返回exe所在目录
return os.path.dirname(sys.executable)
else:
# 如果是Python脚本返回项目根目录
return os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
def cancel_update(self):
"""取消更新"""
self.cancelled = True
if self.downloader:
self.downloader.cancel()
self.signals.update_cancelled.emit()
def check_for_updates(self) -> Optional[dict]:
"""检查是否有新版本可用"""
try:
self.signals.status_changed.emit("正在检查更新...")
url = f"https://api.github.com/repos/{self.repo}/releases/latest"
response = requests.get(url, timeout=10)
if response.status_code == 200:
release_data = response.json()
latest_version = release_data["tag_name"].lstrip("v")
if vparse(latest_version) > vparse(self.current_version):
download_url = self._get_download_url(release_data)
asset_size = self._get_asset_size(release_data, download_url) if download_url else 0
return {
'version': latest_version,
'download_url': download_url,
'release_notes': release_data.get('body', ''),
'release_date': release_data.get('published_at', ''),
'asset_name': self._get_asset_name(release_data),
'asset_size': asset_size
}
return None
else:
raise Exception(f"GitHub API请求失败: {response.status_code}")
except Exception as e:
self.signals.error_occurred.emit(f"检查更新失败: {str(e)}")
return None
def _get_download_url(self, release_data: dict) -> Optional[str]:
"""从release数据中获取适合当前平台的下载链接"""
assets = release_data.get('assets', [])
system = platform.system().lower()
# 根据操作系统选择合适的安装包
for asset in assets:
name = asset['name'].lower()
if system == 'windows':
if 'installer' in name and name.endswith('.exe'):
return asset['browser_download_url']
if name.endswith('.exe') or name.endswith('.zip'):
return asset['browser_download_url']
elif system == 'darwin': # macOS
# 优先选择 dmg 或 zip 文件
if name.endswith('.dmg'):
return asset['browser_download_url']
if name.endswith('.zip') and 'macos' in name:
return asset['browser_download_url']
elif system == 'linux':
if name.endswith('.tar.gz') or (name.endswith('.zip') and 'linux' in name):
return asset['browser_download_url']
# 如果没找到特定平台的,在 macOS 上避免选择 .exe 文件
for asset in assets:
name = asset['name'].lower()
if system == 'darwin':
# macOS 优先选择非 exe 文件
if name.endswith('.zip') or name.endswith('.dmg') or name.endswith('.tar.gz'):
return asset['browser_download_url']
else:
if any(name.endswith(ext) for ext in ['.zip', '.exe', '.dmg', '.tar.gz']):
return asset['browser_download_url']
# 最后才选择 exe 文件(如果没有其他选择)
if system == 'darwin':
for asset in assets:
name = asset['name'].lower()
if name.endswith('.exe'):
return asset['browser_download_url']
return None
def _get_asset_name(self, release_data: dict) -> Optional[str]:
"""获取资源文件名"""
download_url = self._get_download_url(release_data)
if download_url:
return os.path.basename(urlparse(download_url).path)
return None
def _get_asset_size(self, release_data: dict, download_url: str) -> int:
"""获取资源文件大小"""
if not download_url:
return 0
assets = release_data.get('assets', [])
for asset in assets:
if asset.get('browser_download_url') == download_url:
return asset.get('size', 0)
return 0
def download_update(self, download_url: str, filename: str) -> Optional[str]:
"""下载更新文件 - 使用简单的单线程下载"""
try:
self.signals.status_changed.emit("开始下载...")
# 创建临时目录
self.temp_dir = tempfile.mkdtemp(prefix="MRobot_update_")
file_path = os.path.join(self.temp_dir, filename)
print(f"Downloading to: {file_path}")
# 使用简单的requests下载
import requests
headers = {
'User-Agent': 'MRobot-Updater/1.0',
'Accept': '*/*',
}
response = requests.get(download_url, headers=headers, stream=True, timeout=30)
response.raise_for_status()
# 获取文件总大小
total_size = int(response.headers.get('Content-Length', 0))
downloaded_size = 0
# 确保目录存在
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# 下载文件
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if self.cancelled:
print("Download cancelled by user")
return None
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 更新进度
if total_size > 0:
progress = int((downloaded_size / total_size) * 100)
self.signals.progress_changed.emit(progress)
# 发送详细进度信息
speed = 0 # 简化版本不计算速度
remaining = 0
self.signals.download_progress.emit(downloaded_size, total_size, speed, remaining)
# 验证下载的文件
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
print(f"Download completed successfully: {file_path}")
self.signals.status_changed.emit("下载完成!")
return file_path
else:
raise Exception("下载的文件不存在或为空")
except Exception as e:
error_msg = f"下载失败: {str(e)}"
print(f"Download error: {error_msg}")
self.signals.error_occurred.emit(error_msg)
return None
def _on_download_progress(self, downloaded: int, total: int, speed: float, remaining: float):
"""处理下载进度"""
# 转发详细下载进度
self.signals.download_progress.emit(downloaded, total, speed, remaining)
# 计算总体进度 (下载占80%其他操作占20%)
if total > 0:
download_progress = int((downloaded / total) * 80)
self.signals.progress_changed.emit(download_progress)
def extract_update(self, file_path: str) -> Optional[str]:
"""解压更新文件"""
try:
self.signals.status_changed.emit("正在解压文件...")
self.signals.progress_changed.emit(85)
if not self.temp_dir:
raise Exception("临时目录未初始化")
extract_dir = os.path.join(self.temp_dir, "extracted")
os.makedirs(extract_dir, exist_ok=True)
# 根据文件扩展名选择解压方法
if file_path.endswith('.zip'):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
elif file_path.endswith('.tar.gz'):
import tarfile
with tarfile.open(file_path, 'r:gz') as tar_ref:
tar_ref.extractall(extract_dir)
elif file_path.endswith('.exe'):
# 对于 .exe 文件,在非 Windows 系统上提示手动安装
current_system = platform.system()
if current_system != 'Windows':
self.signals.error_occurred.emit(f"下载的是 Windows 安装程序,当前系统是 {current_system}\n请手动下载适合您系统的版本。")
return None
else:
# Windows 系统直接返回文件路径,由安装函数处理
return file_path
else:
raise Exception(f"不支持的文件格式: {file_path}")
self.signals.progress_changed.emit(90)
self.signals.status_changed.emit("解压完成")
return extract_dir
except Exception as e:
self.signals.error_occurred.emit(f"解压失败: {str(e)}")
return None
def install_update(self, extract_dir: str) -> bool:
"""安装更新"""
try:
self.signals.status_changed.emit("正在安装更新...")
self.signals.progress_changed.emit(95)
if not self.temp_dir:
raise Exception("临时目录未初始化")
# 创建备份目录
backup_dir = os.path.join(self.temp_dir, "backup")
os.makedirs(backup_dir, exist_ok=True)
# 备份当前程序文件
self._backup_current_files(backup_dir)
# 复制新文件
self._copy_update_files(extract_dir)
self.signals.progress_changed.emit(99)
self.signals.status_changed.emit("安装完成")
return True
except Exception as e:
self.signals.error_occurred.emit(f"安装失败: {str(e)}")
# 尝试恢复备份
self._restore_backup(backup_dir)
return False
def _backup_current_files(self, backup_dir: str):
"""备份当前程序文件"""
important_files = ['MRobot.py', 'MRobot.exe', 'app/', 'assets/']
for item in important_files:
src_path = os.path.join(self.app_dir, item)
if os.path.exists(src_path):
dst_path = os.path.join(backup_dir, item)
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
else:
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
shutil.copy2(src_path, dst_path)
def _copy_update_files(self, extract_dir: str):
"""复制更新文件到应用程序目录"""
# 查找解压目录中的主要文件/文件夹
extract_contents = os.listdir(extract_dir)
# 如果解压后只有一个文件夹,进入该文件夹
if len(extract_contents) == 1 and os.path.isdir(os.path.join(extract_dir, extract_contents[0])):
extract_dir = os.path.join(extract_dir, extract_contents[0])
# 复制文件到应用程序目录
for item in os.listdir(extract_dir):
src_path = os.path.join(extract_dir, item)
dst_path = os.path.join(self.app_dir, item)
if os.path.isdir(src_path):
if os.path.exists(dst_path):
shutil.rmtree(dst_path)
shutil.copytree(src_path, dst_path)
else:
shutil.copy2(src_path, dst_path)
def _restore_backup(self, backup_dir: str):
"""恢复备份文件"""
try:
for item in os.listdir(backup_dir):
src_path = os.path.join(backup_dir, item)
dst_path = os.path.join(self.app_dir, item)
if os.path.isdir(src_path):
if os.path.exists(dst_path):
shutil.rmtree(dst_path)
shutil.copytree(src_path, dst_path)
else:
shutil.copy2(src_path, dst_path)
except Exception as e:
print(f"恢复备份失败: {e}")
def restart_application(self):
"""重启应用程序"""
try:
self.signals.status_changed.emit("正在重启应用程序...")
if self.is_frozen:
# 如果是打包的exe
executable = sys.executable
else:
# 如果是Python脚本
executable = sys.executable
script_path = os.path.join(self.app_dir, "MRobot.py")
# 启动新进程
if platform.system() == 'Windows':
subprocess.Popen([executable] + ([script_path] if not self.is_frozen else []))
else:
subprocess.Popen([executable] + ([script_path] if not self.is_frozen else []))
# 退出当前进程
sys.exit(0)
except Exception as e:
self.signals.error_occurred.emit(f"重启失败: {str(e)}")
def cleanup(self):
"""清理临时文件"""
print(f"cleanup() called, temp_dir: {self.temp_dir}") # 调试输出
if self.temp_dir and os.path.exists(self.temp_dir):
try:
print(f"Removing temp directory: {self.temp_dir}") # 调试输出
shutil.rmtree(self.temp_dir)
print("Temp directory removed successfully") # 调试输出
except Exception as e:
print(f"清理临时文件失败: {e}")
else:
print("No temp directory to clean up") # 调试输出
def run(self):
"""执行更新流程"""
try:
self.signals.status_changed.emit("开始更新流程...")
# 检查更新
self.signals.status_changed.emit("正在获取更新信息...")
update_info = self.check_for_updates()
if not update_info or self.cancelled:
self.signals.status_changed.emit("未找到更新信息或已取消")
return
self.signals.status_changed.emit(f"准备下载版本 {update_info['version']}")
# 下载更新
downloaded_file = self.download_update(
update_info['download_url'],
update_info['asset_name']
)
if not downloaded_file or self.cancelled:
self.signals.status_changed.emit("下载失败或已取消")
return
self.signals.status_changed.emit("下载完成!")
self.signals.progress_changed.emit(100)
# 检查是否为exe文件且当前系统非Windows
current_system = platform.system()
print(f"Downloaded file: {downloaded_file}") # 调试输出
print(f"Current system: {current_system}") # 调试输出
if downloaded_file.endswith('.exe') and current_system != 'Windows':
# 直接完成,返回文件路径
print(f"Emitting update_completed signal with file path: {downloaded_file}") # 调试输出
self.signals.update_completed.emit(downloaded_file)
# 不要立即清理,让用户有时间访问文件
print("Skipping cleanup to preserve downloaded file") # 调试输出
return
# 对于其他情况,继续原有流程
self.signals.status_changed.emit("下载完成,开始解压...")
# 解压更新
extract_dir = self.extract_update(downloaded_file)
if not extract_dir or self.cancelled:
self.signals.status_changed.emit("解压失败或已取消")
return
# 安装更新
self.signals.status_changed.emit("开始安装更新...")
if self.install_update(extract_dir) and not self.cancelled:
self.signals.progress_changed.emit(100)
self.signals.status_changed.emit("更新安装完成")
self.signals.update_completed.emit("") # 传递空字符串表示正常安装完成
else:
self.signals.status_changed.emit("安装失败或已取消")
except Exception as e:
error_msg = f"更新过程中发生错误: {str(e)}"
print(f"AutoUpdater error: {error_msg}") # 调试输出
self.signals.error_occurred.emit(error_msg)
finally:
# 对于下载完成的情况,延迟清理临时文件
# 这样用户有时间访问下载的文件
pass # 暂时不在这里清理
def check_update_availability(current_version: str, repo: str = "goldenfishs/MRobot") -> Optional[dict]:
"""快速检查是否有新版本可用(不下载)"""
updater = AutoUpdater(current_version, repo)
return updater.check_for_updates()