mirror of
https://github.com/goldenfishs/MRobot.git
synced 2025-07-03 14:24:15 +08:00
创建minishell
This commit is contained in:
parent
26ce1316ca
commit
86b0062881
897
MRobot.py
897
MRobot.py
@ -1,719 +1,246 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
from PIL import Image, ImageTk
|
||||
import sys
|
||||
import os
|
||||
import threading
|
||||
import shutil
|
||||
import re
|
||||
from git import Repo
|
||||
from collections import defaultdict
|
||||
import csv
|
||||
import xml.etree.ElementTree as ET
|
||||
from PyQt5.QtCore import Qt, pyqtSignal
|
||||
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QApplication
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
from PyQt5.QtGui import QTextCursor
|
||||
from PyQt5.QtCore import QThread, pyqtSignal
|
||||
from PyQt5.QtWidgets import QHBoxLayout, QComboBox, QPushButton, QTextEdit, QLineEdit, QLabel
|
||||
from PyQt5.QtWidgets import QGroupBox, QGridLayout, QSizePolicy
|
||||
|
||||
# 配置常量
|
||||
REPO_DIR = "MRobot_repo"
|
||||
REPO_URL = "http://gitea.qutrobot.top/robofish/MRobot.git"
|
||||
if getattr(sys, 'frozen', False): # 检查是否为打包后的环境
|
||||
CURRENT_DIR = os.path.dirname(sys.executable) # 使用可执行文件所在目录
|
||||
else:
|
||||
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) # 使用脚本所在目录
|
||||
from qfluentwidgets import (
|
||||
NavigationItemPosition, Theme, FluentWindow, NavigationAvatarWidget,
|
||||
PushButton, FluentIcon
|
||||
)
|
||||
from qfluentwidgets import FluentIcon as FIF
|
||||
|
||||
MDK_ARM_DIR = os.path.join(CURRENT_DIR, "MDK-ARM")
|
||||
USER_DIR = os.path.join(CURRENT_DIR, "User")
|
||||
# ===================== 页面基类 =====================
|
||||
class BaseInterface(QWidget):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
|
||||
class MRobotApp:
|
||||
def __init__(self):
|
||||
self.ioc_data = None
|
||||
self.add_gitignore_var = None # 延迟初始化
|
||||
self.header_file_vars = {}
|
||||
self.task_vars = [] # 用于存储任务的变量
|
||||
# ===================== 首页界面 =====================
|
||||
class HomeInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("homeInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# 初始化
|
||||
def initialize(self):
|
||||
print("初始化中,正在克隆仓库...")
|
||||
self.clone_repo()
|
||||
self.ioc_data = self.find_and_read_ioc_file()
|
||||
print("初始化完成,启动主窗口...")
|
||||
self.show_main_window()
|
||||
# ===================== 代码生成页面 =====================
|
||||
class DataInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("dataInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# 克隆仓库
|
||||
def clone_repo(self):
|
||||
try:
|
||||
if os.path.exists(REPO_DIR):
|
||||
shutil.rmtree(REPO_DIR)
|
||||
print(f"正在克隆仓库到 {REPO_DIR}(仅克隆当前文件内容)...")
|
||||
Repo.clone_from(REPO_URL, REPO_DIR, multi_options=["--depth=1"])
|
||||
print("仓库克隆成功!")
|
||||
except Exception as e:
|
||||
print(f"克隆仓库时出错: {e}")
|
||||
# ===================== 串口终端界面 =====================
|
||||
class SerialReadThread(QThread):
|
||||
data_received = pyqtSignal(str)
|
||||
|
||||
# 删除克隆的仓库
|
||||
def delete_repo(self):
|
||||
try:
|
||||
if os.path.exists(REPO_DIR):
|
||||
shutil.rmtree(REPO_DIR)
|
||||
print(f"已删除克隆的仓库目录: {REPO_DIR}")
|
||||
except Exception as e:
|
||||
print(f"删除仓库目录时出错: {e}")
|
||||
def __init__(self, ser):
|
||||
super().__init__()
|
||||
self.ser = ser
|
||||
self._running = True
|
||||
|
||||
def run(self):
|
||||
while self._running:
|
||||
if self.ser and self.ser.is_open and self.ser.in_waiting:
|
||||
try:
|
||||
data = self.ser.readline().decode(errors='ignore')
|
||||
self.data_received.emit(data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 复制文件
|
||||
def copy_file_from_repo(self, src_path, dest_path):
|
||||
try:
|
||||
# 修复路径拼接问题,确保 src_path 不重复包含 REPO_DIR
|
||||
if src_path.startswith(REPO_DIR):
|
||||
full_src_path = src_path
|
||||
else:
|
||||
full_src_path = os.path.join(REPO_DIR, src_path.lstrip(os.sep))
|
||||
def stop(self):
|
||||
self._running = False
|
||||
self.wait()
|
||||
|
||||
# 检查源文件是否存在
|
||||
if not os.path.exists(full_src_path):
|
||||
print(f"文件 {full_src_path} 不存在!(检查路径或仓库内容)")
|
||||
return
|
||||
class SerialTerminalInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("serialTerminalInterface")
|
||||
layout = QVBoxLayout(self)
|
||||
|
||||
# 检查目标路径是否有效
|
||||
if not dest_path or not dest_path.strip():
|
||||
print("目标路径为空或无效,无法复制文件!")
|
||||
return
|
||||
# 串口选择和连接
|
||||
hbox = QHBoxLayout()
|
||||
self.port_combo = QComboBox()
|
||||
self.refresh_ports()
|
||||
self.baud_combo = QComboBox()
|
||||
self.baud_combo.addItems(['9600', '115200', '57600', '38400', '19200', '4800'])
|
||||
self.connect_btn = QPushButton("连接")
|
||||
self.connect_btn.clicked.connect(self.toggle_connection)
|
||||
hbox.addWidget(QLabel("串口:"))
|
||||
hbox.addWidget(self.port_combo)
|
||||
hbox.addWidget(QLabel("波特率:"))
|
||||
hbox.addWidget(self.baud_combo)
|
||||
hbox.addWidget(self.connect_btn)
|
||||
layout.addLayout(hbox)
|
||||
|
||||
# 创建目标目录(如果不存在)
|
||||
dest_dir = os.path.dirname(dest_path)
|
||||
if dest_dir and not os.path.exists(dest_dir):
|
||||
os.makedirs(dest_dir, exist_ok=True)
|
||||
# 预设命令区
|
||||
preset_group = QGroupBox("预设命令")
|
||||
preset_layout = QGridLayout()
|
||||
self.preset_commands = [
|
||||
("复位", "RESET"),
|
||||
("获取版本", "GET_VERSION"),
|
||||
("启动", "START"),
|
||||
("停止", "STOP"),
|
||||
("自检", "SELF_TEST"),
|
||||
("查询状态", "STATUS?"),
|
||||
]
|
||||
for i, (label, cmd) in enumerate(self.preset_commands):
|
||||
btn = QPushButton(label)
|
||||
btn.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
|
||||
btn.clicked.connect(lambda _, c=cmd: self.send_preset_command(c))
|
||||
preset_layout.addWidget(btn, i // 3, i % 3)
|
||||
preset_group.setLayout(preset_layout)
|
||||
layout.addWidget(preset_group)
|
||||
|
||||
# 执行文件复制
|
||||
shutil.copy(full_src_path, dest_path)
|
||||
print(f"文件已从 {full_src_path} 复制到 {dest_path}")
|
||||
except Exception as e:
|
||||
print(f"复制文件时出错: {e}")
|
||||
|
||||
# 查找并读取 .ioc 文件
|
||||
def find_and_read_ioc_file(self):
|
||||
try:
|
||||
for file in os.listdir("."):
|
||||
if file.endswith(".ioc"):
|
||||
print(f"找到 .ioc 文件: {file}")
|
||||
with open(file, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
print("未找到 .ioc 文件!")
|
||||
except Exception as e:
|
||||
print(f"读取 .ioc 文件时出错: {e}")
|
||||
return None
|
||||
|
||||
# 检查是否启用了 FreeRTOS
|
||||
def check_freertos_enabled(self, ioc_data):
|
||||
try:
|
||||
return bool(re.search(r"Mcu\.IP\d+=FREERTOS", ioc_data))
|
||||
except Exception as e:
|
||||
print(f"检查 FreeRTOS 配置时出错: {e}")
|
||||
return False
|
||||
|
||||
# 生成操作
|
||||
def generate_action(self):
|
||||
def task():
|
||||
# 检查并创建目录
|
||||
self.create_directories()
|
||||
|
||||
if self.add_gitignore_var.get():
|
||||
self.copy_file_from_repo(".gitignore", ".gitignore")
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
self.copy_file_from_repo("src/freertos.c", os.path.join("Core", "Src", "freertos.c"))
|
||||
|
||||
# 定义需要处理的文件夹
|
||||
folders = ["bsp", "component", "device", "module"]
|
||||
|
||||
# 遍历每个文件夹,复制选中的 .h 和 .c 文件
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if not os.path.exists(folder_dir):
|
||||
continue # 如果文件夹不存在,跳过
|
||||
|
||||
for file_name in os.listdir(folder_dir):
|
||||
file_base, file_ext = os.path.splitext(file_name)
|
||||
if file_ext not in [".h", ".c"]:
|
||||
continue # 只处理 .h 和 .c 文件
|
||||
|
||||
# 强制复制与文件夹同名的文件
|
||||
if file_base == folder:
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
continue # 跳过后续检查,直接复制
|
||||
|
||||
# 检查是否选中了对应的文件
|
||||
if file_base in self.header_file_vars and self.header_file_vars[file_base].get():
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
|
||||
threading.Thread(target=task).start()
|
||||
|
||||
|
||||
|
||||
# 创建必要的目录
|
||||
def create_directories(self):
|
||||
try:
|
||||
directories = [
|
||||
"User/bsp",
|
||||
"User/component",
|
||||
"User/device",
|
||||
"User/module",
|
||||
]
|
||||
# 根据是否启用 FreeRTOS 决定是否创建 User/task
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
directories.append("User/task")
|
||||
|
||||
for directory in directories:
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
print(f"已创建目录: {directory}")
|
||||
else:
|
||||
print(f"目录已存在: {directory}")
|
||||
except Exception as e:
|
||||
print(f"创建目录时出错: {e}")
|
||||
|
||||
|
||||
# 更新 FreeRTOS 状态标签
|
||||
def update_freertos_status(self, label):
|
||||
if self.ioc_data:
|
||||
status = "已启用" if self.check_freertos_enabled(self.ioc_data) else "未启用"
|
||||
else:
|
||||
status = "未检测到 .ioc 文件"
|
||||
label.config(text=f"FreeRTOS 状态: {status}")
|
||||
|
||||
|
||||
|
||||
# 显示主窗口
|
||||
# ...existing code...
|
||||
# ...existing code...
|
||||
|
||||
# 显示主窗口
|
||||
def show_main_window(self):
|
||||
root = tk.Tk()
|
||||
root.title("MRobot 自动生成脚本")
|
||||
root.geometry("1000x650") # 调整窗口大小以适应布局
|
||||
|
||||
# 在窗口关闭时调用 on_closing 方法
|
||||
root.protocol("WM_DELETE_WINDOW", lambda: self.on_closing(root))
|
||||
|
||||
# 初始化 BooleanVar
|
||||
self.add_gitignore_var = tk.BooleanVar(value=False)
|
||||
self.auto_configure_var = tk.BooleanVar(value=False) # 新增复选框变量
|
||||
|
||||
# 创建主框架
|
||||
main_frame = ttk.Frame(root)
|
||||
main_frame.pack(fill="both", expand=True)
|
||||
|
||||
# 添加标题
|
||||
title_label = ttk.Label(main_frame, text="MRobot 自动生成脚本", font=("Arial", 16, "bold"))
|
||||
title_label.pack(pady=10)
|
||||
|
||||
# 添加 FreeRTOS 状态标签
|
||||
freertos_status_label = ttk.Label(main_frame, text="FreeRTOS 状态: 检测中...", font=("Arial", 12))
|
||||
freertos_status_label.pack(pady=10)
|
||||
self.update_freertos_status(freertos_status_label)
|
||||
|
||||
# 模块文件选择和任务管理框架(添加滚动功能)
|
||||
module_task_frame = ttk.Frame(main_frame)
|
||||
module_task_frame.pack(fill="both", expand=True, padx=10, pady=10)
|
||||
|
||||
# 创建 Canvas 和 Scrollbar
|
||||
canvas = tk.Canvas(module_task_frame)
|
||||
scrollbar = ttk.Scrollbar(module_task_frame, orient="vertical", command=canvas.yview)
|
||||
scrollable_frame = ttk.Frame(canvas)
|
||||
|
||||
# 配置滚动区域
|
||||
scrollable_frame.bind(
|
||||
"<Configure>",
|
||||
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
|
||||
# 显示区
|
||||
self.text_edit = QTextEdit()
|
||||
self.text_edit.setReadOnly(True)
|
||||
self.text_edit.setStyleSheet(
|
||||
"background-color: #1e1e1e; color: #d4d4d4; font-family: Consolas, 'Courier New', monospace; font-size: 14px;"
|
||||
)
|
||||
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
|
||||
canvas.configure(yscrollcommand=scrollbar.set)
|
||||
|
||||
# 绑定鼠标滚轮事件
|
||||
def on_mouse_wheel(event):
|
||||
canvas.yview_scroll(-1 * int(event.delta / 120), "units")
|
||||
|
||||
canvas.bind_all("<MouseWheel>", on_mouse_wheel)
|
||||
|
||||
# 布局 Canvas 和 Scrollbar
|
||||
canvas.pack(side="left", fill="both", expand=True)
|
||||
scrollbar.pack(side="right", fill="y")
|
||||
|
||||
# 左右布局:模块文件选择框和任务管理框
|
||||
left_frame = ttk.Frame(scrollable_frame)
|
||||
left_frame.pack(side="left", fill="both", expand=True, padx=5, pady=5)
|
||||
|
||||
right_frame = ttk.Frame(scrollable_frame)
|
||||
right_frame.pack(side="right", fill="both", expand=True, padx=5, pady=5)
|
||||
|
||||
# 模块文件选择框
|
||||
header_files_frame = ttk.LabelFrame(left_frame, text="模块文件选择", padding=(10, 10))
|
||||
header_files_frame.pack(fill="both", expand=True, padx=5)
|
||||
self.header_files_frame = header_files_frame
|
||||
self.update_header_files()
|
||||
|
||||
# 任务管理框
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
task_frame = ttk.LabelFrame(right_frame, text="任务管理", padding=(10, 10))
|
||||
task_frame.pack(fill="both", expand=True, padx=5)
|
||||
self.task_frame = task_frame
|
||||
self.update_task_ui()
|
||||
|
||||
# 添加消息框和生成按钮在同一行
|
||||
bottom_frame = ttk.Frame(main_frame)
|
||||
bottom_frame.pack(fill="x", pady=10, side="bottom")
|
||||
|
||||
# 消息框
|
||||
self.message_box = tk.Text(bottom_frame, wrap="word", state="disabled", height=5, width=60)
|
||||
self.message_box.pack(side="left", fill="x", expand=True, padx=5, pady=5)
|
||||
|
||||
# 生成按钮和复选框选项
|
||||
button_frame = ttk.Frame(bottom_frame)
|
||||
button_frame.pack(side="right", padx=10)
|
||||
|
||||
# 添加复选框容器(横向排列复选框)
|
||||
checkbox_frame = ttk.Frame(button_frame)
|
||||
checkbox_frame.pack(side="top", pady=5)
|
||||
|
||||
# 添加 .gitignore 复选框(左侧)
|
||||
ttk.Checkbutton(checkbox_frame, text=".gitignore", variable=self.add_gitignore_var).pack(side="left", padx=5)
|
||||
|
||||
# 添加自动配置环境复选框(右侧)
|
||||
ttk.Checkbutton(checkbox_frame, text="自动环境", variable=self.auto_configure_var).pack(side="left", padx=5)
|
||||
|
||||
# 添加生成按钮(竖向排列在复选框下方)
|
||||
generate_button = ttk.Button(button_frame, text="一键生成MRobot代码", command=self.generate_action)
|
||||
generate_button.pack(side="top", pady=10)
|
||||
generate_button.config(width=25) # 设置按钮宽度
|
||||
|
||||
# 重定向输出到消息框
|
||||
self.redirect_output()
|
||||
|
||||
# 打印欢迎信息
|
||||
print("欢迎使用 MRobot 自动生成脚本!")
|
||||
print("请根据需要选择模块文件和任务。")
|
||||
print("点击“一键生成MRobot代码”按钮开始生成。")
|
||||
|
||||
# 启动 Tkinter 主事件循环
|
||||
root.mainloop()
|
||||
|
||||
# ...existing code...
|
||||
# ...existing code...
|
||||
layout.addWidget(self.text_edit)
|
||||
|
||||
def redirect_output(self):
|
||||
"""
|
||||
重定向标准输出到消息框
|
||||
"""
|
||||
class TextRedirector:
|
||||
def __init__(self, text_widget):
|
||||
self.text_widget = text_widget
|
||||
# 输入区
|
||||
input_hbox = QHBoxLayout()
|
||||
self.input_line = QLineEdit()
|
||||
self.input_line.setPlaceholderText("输入内容,回车发送")
|
||||
self.input_line.returnPressed.connect(self.send_data)
|
||||
send_btn = QPushButton("发送")
|
||||
send_btn.clicked.connect(self.send_data)
|
||||
input_hbox.addWidget(self.input_line)
|
||||
input_hbox.addWidget(send_btn)
|
||||
layout.addLayout(input_hbox)
|
||||
|
||||
def write(self, message):
|
||||
self.text_widget.config(state="normal")
|
||||
self.text_widget.insert("end", message)
|
||||
self.text_widget.see("end")
|
||||
self.text_widget.config(state="disabled")
|
||||
self.ser = None
|
||||
self.read_thread = None
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
def send_preset_command(self, cmd):
|
||||
self.input_line.setText(cmd)
|
||||
self.send_data()
|
||||
|
||||
sys.stdout = TextRedirector(self.message_box)
|
||||
sys.stderr = TextRedirector(self.message_box)
|
||||
def refresh_ports(self):
|
||||
self.port_combo.clear()
|
||||
ports = serial.tools.list_ports.comports()
|
||||
for port in ports:
|
||||
self.port_combo.addItem(port.device)
|
||||
|
||||
# 修改 update_task_ui 方法
|
||||
def update_task_ui(self):
|
||||
# 检查是否有已存在的任务文件
|
||||
task_dir = os.path.join("User", "task")
|
||||
if os.path.exists(task_dir):
|
||||
for file_name in os.listdir(task_dir):
|
||||
file_base, file_ext = os.path.splitext(file_name)
|
||||
if file_ext == ".c" and file_base not in ["init", "user_task"] and file_base not in [task_var.get() for task_var, _ in self.task_vars]:
|
||||
frequency = 100 # 默认频率
|
||||
user_task_header_path = os.path.join("User", "task", "user_task.h")
|
||||
if os.path.exists(user_task_header_path):
|
||||
try:
|
||||
with open(user_task_header_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
pattern = rf"#define\s+TASK_FREQ_{file_base.upper()}\s*\((\d+)[uU]?\)"
|
||||
match = re.search(pattern, content)
|
||||
if match:
|
||||
frequency = int(match.group(1))
|
||||
print(f"从 user_task.h 文件中读取到任务 {file_base} 的频率: {frequency}")
|
||||
except Exception as e:
|
||||
print(f"读取 user_task.h 文件时出错: {e}")
|
||||
def toggle_connection(self):
|
||||
if self.ser and self.ser.is_open:
|
||||
self.disconnect_serial()
|
||||
else:
|
||||
self.connect_serial()
|
||||
|
||||
new_task_var = tk.StringVar(value=file_base)
|
||||
self.task_vars.append((new_task_var, tk.IntVar(value=frequency)))
|
||||
|
||||
# 清空任务框架中的所有子组件
|
||||
for widget in self.task_frame.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
|
||||
# 设置任务管理框的固定宽度
|
||||
self.task_frame.config(width=400)
|
||||
|
||||
# 显示任务列表
|
||||
for i, (task_var, freq_var) in enumerate(self.task_vars):
|
||||
task_row = ttk.Frame(self.task_frame, width=400)
|
||||
task_row.pack(fill="x", pady=5)
|
||||
|
||||
ttk.Entry(task_row, textvariable=task_var, width=20).pack(side="left", padx=5)
|
||||
ttk.Label(task_row, text="频率:").pack(side="left", padx=5)
|
||||
ttk.Spinbox(task_row, from_=1, to=1000, textvariable=freq_var, width=5).pack(side="left", padx=5)
|
||||
ttk.Button(task_row, text="删除", command=lambda idx=i: self.remove_task(idx)).pack(side="left", padx=5)
|
||||
|
||||
# 添加新任务按钮
|
||||
add_task_button = ttk.Button(self.task_frame, text="添加任务", command=self.add_task)
|
||||
add_task_button.pack(pady=10)
|
||||
|
||||
|
||||
# 修改 add_task 方法
|
||||
def add_task(self):
|
||||
new_task_var = tk.StringVar(value=f"Task_{len(self.task_vars) + 1}")
|
||||
new_freq_var = tk.IntVar(value=100) # 默认频率为 100
|
||||
self.task_vars.append((new_task_var, new_freq_var))
|
||||
self.update_task_ui()
|
||||
|
||||
# 修改 remove_task 方法
|
||||
def remove_task(self, idx):
|
||||
del self.task_vars[idx]
|
||||
self.update_task_ui()
|
||||
|
||||
# 更新文件夹显示
|
||||
def update_folder_display(self):
|
||||
for widget in self.folder_frame.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
folders = ["User/bsp", "User/component", "User/device", "User/module"]
|
||||
# if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
# folders.append("User/task")
|
||||
|
||||
for folder in folders:
|
||||
# 去掉 "User/" 前缀
|
||||
display_name = folder.replace("User/", "")
|
||||
tk.Label(self.folder_frame, text=display_name).pack()
|
||||
|
||||
# 更新 .h 文件复选框
|
||||
def update_header_files(self):
|
||||
for widget in self.header_files_frame.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
folders = ["bsp", "component", "device", "module"]
|
||||
dependencies = defaultdict(list)
|
||||
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if os.path.exists(folder_dir):
|
||||
dependencies_file = os.path.join(folder_dir, "dependencies.csv")
|
||||
if os.path.exists(dependencies_file):
|
||||
with open(dependencies_file, "r", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
for row in reader:
|
||||
if len(row) == 2:
|
||||
dependencies[row[0]].append(row[1])
|
||||
|
||||
# 创建复选框
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if os.path.exists(folder_dir):
|
||||
module_frame = ttk.LabelFrame(self.header_files_frame, text=folder.capitalize(), padding=(10, 10))
|
||||
module_frame.pack(fill="x", pady=5)
|
||||
|
||||
row, col = 0, 0
|
||||
for file in os.listdir(folder_dir):
|
||||
file_base, file_ext = os.path.splitext(file)
|
||||
if file_ext == ".h" and file_base != folder:
|
||||
var = tk.BooleanVar(value=False)
|
||||
self.header_file_vars[file_base] = var
|
||||
|
||||
checkbox = ttk.Checkbutton(
|
||||
module_frame,
|
||||
text=file_base,
|
||||
variable=var,
|
||||
command=lambda fb=file_base: self.handle_dependencies(fb, dependencies)
|
||||
)
|
||||
checkbox.grid(row=row, column=col, padx=5, pady=5, sticky="w")
|
||||
col += 1
|
||||
if col >= 6:
|
||||
col = 0
|
||||
row += 1
|
||||
|
||||
|
||||
|
||||
def handle_dependencies(self, file_base, dependencies):
|
||||
"""
|
||||
根据依赖关系自动勾选相关模块
|
||||
"""
|
||||
if file_base in self.header_file_vars and self.header_file_vars[file_base].get():
|
||||
# 如果当前模块被选中,自动勾选其依赖项
|
||||
for dependency in dependencies.get(file_base, []):
|
||||
dep_base = os.path.basename(dependency)
|
||||
if dep_base in self.header_file_vars:
|
||||
self.header_file_vars[dep_base].set(True)
|
||||
|
||||
# 在 MRobotApp 类中添加以下方法
|
||||
def generate_task_files(self):
|
||||
def connect_serial(self):
|
||||
port = self.port_combo.currentText()
|
||||
baud = int(self.baud_combo.currentText())
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "task.c.template")
|
||||
task_dir = os.path.join("User", "task")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 task.c 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(task_dir, exist_ok=True)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 为每个任务生成对应的 task.c 文件
|
||||
for task_var, _ in self.task_vars: # 解包元组
|
||||
task_name = f"Task_{task_var.get()}" # 添加前缀 Task_
|
||||
task_file_path = os.path.join(task_dir, f"{task_var.get().lower()}.c") # 文件名保持原始小写
|
||||
|
||||
# 替换模板中的占位符
|
||||
task_content = template_content.replace("{{task_name}}", task_name)
|
||||
task_content = task_content.replace("{{task_function}}", task_name)
|
||||
task_content = task_content.replace(
|
||||
"{{task_frequency}}", f"TASK_FREQ_{task_var.get().upper()}"
|
||||
) # 替换为 user_task.h 中的宏定义
|
||||
task_content = task_content.replace("{{task_delay}}", f"TASK_INIT_DELAY_{task_var.get().upper()}")
|
||||
|
||||
with open(task_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(task_content)
|
||||
|
||||
print(f"已成功生成 {task_file_path} 文件!")
|
||||
self.ser = serial.Serial(port, baud, timeout=0.1)
|
||||
self.connect_btn.setText("断开")
|
||||
self.text_edit.append(f"已连接到 {port} @ {baud}")
|
||||
self.read_thread = SerialReadThread(self.ser)
|
||||
self.read_thread.data_received.connect(self.display_data)
|
||||
self.read_thread.start()
|
||||
except Exception as e:
|
||||
print(f"生成 task.c 文件时出错: {e}")
|
||||
# 修改 user_task.c 文件
|
||||
def modify_user_task_file(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "user_task.c.template")
|
||||
generated_task_file_path = os.path.join("User", "task", "user_task.c")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 user_task.c 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(generated_task_file_path), exist_ok=True)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 生成任务属性定义
|
||||
task_attr_definitions = "\n".join([
|
||||
f"""const osThreadAttr_t attr_{task_var.get().lower()} = {{
|
||||
.name = "{task_var.get()}",
|
||||
.priority = osPriorityNormal,
|
||||
.stack_size = 128 * 4,
|
||||
}};"""
|
||||
for task_var, _ in self.task_vars # 解包元组
|
||||
])
|
||||
|
||||
# 替换模板中的占位符
|
||||
task_content = template_content.replace("{{task_attr_definitions}}", task_attr_definitions)
|
||||
|
||||
with open(generated_task_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(task_content)
|
||||
|
||||
print(f"已成功生成 {generated_task_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"修改 user_task.c 文件时出错: {e}")
|
||||
# ...existing code...
|
||||
|
||||
def generate_user_task_header(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "user_task.h.template")
|
||||
header_file_path = os.path.join("User", "task", "user_task.h")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 user_task.h 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(header_file_path), exist_ok=True)
|
||||
|
||||
# 如果 user_task.h 已存在,提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
existing_msgq_content = ""
|
||||
if os.path.exists(header_file_path):
|
||||
with open(header_file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
# 提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
match = re.search(r"/\* USER MESSAGE BEGIN \*/\s*(.*?)\s*/\* USER MESSAGE END \*/", content, re.DOTALL)
|
||||
if match:
|
||||
existing_msgq_content = match.group(1).strip()
|
||||
print("已存在的 msgq 区域内容:")
|
||||
print(existing_msgq_content)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 定义占位符内容
|
||||
thread_definitions = "\n".join([f" osThreadId_t {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
msgq_definitions = existing_msgq_content if existing_msgq_content else " osMessageQueueId_t default_msgq;"
|
||||
freq_definitions = "\n".join([f" float {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
last_up_time_definitions = "\n".join([f" uint32_t {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
task_attr_declarations = "\n".join([f"extern const osThreadAttr_t attr_{task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
task_function_declarations = "\n".join([f"void Task_{task_var.get()}(void *argument);" for task_var, _ in self.task_vars])
|
||||
task_frequency_definitions = "\n".join([
|
||||
f"#define TASK_FREQ_{task_var.get().upper()} ({freq_var.get()}u)"
|
||||
for task_var, freq_var in self.task_vars
|
||||
])
|
||||
task_init_delay_definitions = "\n".join([f"#define TASK_INIT_DELAY_{task_var.get().upper()} (0u)" for task_var, _ in self.task_vars])
|
||||
task_handle_definitions = "\n".join([f" osThreadId_t {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
|
||||
# 替换模板中的占位符
|
||||
header_content = template_content.replace("{{thread_definitions}}", thread_definitions)
|
||||
header_content = header_content.replace("{{msgq_definitions}}", msgq_definitions)
|
||||
header_content = header_content.replace("{{freq_definitions}}", freq_definitions)
|
||||
header_content = header_content.replace("{{last_up_time_definitions}}", last_up_time_definitions)
|
||||
header_content = header_content.replace("{{task_attr_declarations}}", task_attr_declarations)
|
||||
header_content = header_content.replace("{{task_function_declarations}}", task_function_declarations)
|
||||
header_content = header_content.replace("{{task_frequency_definitions}}", task_frequency_definitions)
|
||||
header_content = header_content.replace("{{task_init_delay_definitions}}", task_init_delay_definitions)
|
||||
header_content = header_content.replace("{{task_handle_definitions}}", task_handle_definitions)
|
||||
|
||||
# 如果存在 /* USER MESSAGE BEGIN */ 区域内容,则保留
|
||||
if existing_msgq_content:
|
||||
header_content = re.sub(
|
||||
r"/\* USER MESSAGE BEGIN \*/\s*.*?\s*/\* USER MESSAGE END \*/",
|
||||
f"/* USER MESSAGE BEGIN */\n\n {existing_msgq_content}\n\n /* USER MESSAGE END */",
|
||||
header_content,
|
||||
flags=re.DOTALL
|
||||
)
|
||||
|
||||
with open(header_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(header_content)
|
||||
|
||||
print(f"已成功生成 {header_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"生成 user_task.h 文件时出错: {e}")
|
||||
self.text_edit.append(f"连接失败: {e}")
|
||||
|
||||
def generate_init_file(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "init.c.template")
|
||||
generated_file_path = os.path.join("User", "task", "init.c")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 init.c 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(generated_file_path), exist_ok=True)
|
||||
|
||||
# 如果 init.c 已存在,提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
existing_msgq_content = ""
|
||||
if os.path.exists(generated_file_path):
|
||||
with open(generated_file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
# 提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
match = re.search(r"/\* USER MESSAGE BEGIN \*/\s*(.*?)\s*/\* USER MESSAGE END \*/", content, re.DOTALL)
|
||||
if match:
|
||||
existing_msgq_content = match.group(1).strip()
|
||||
print("已存在的消息队列区域内容:")
|
||||
print(existing_msgq_content)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 生成任务创建代码
|
||||
thread_creation_code = "\n".join([
|
||||
f" task_runtime.thread.{task_var.get().lower()} = osThreadNew(Task_{task_var.get()}, NULL, &attr_{task_var.get().lower()});"
|
||||
for task_var, _ in self.task_vars # 解包元组
|
||||
])
|
||||
|
||||
# 替换模板中的占位符
|
||||
init_content = template_content.replace("{{thread_creation_code}}", thread_creation_code)
|
||||
|
||||
# 如果存在 /* USER MESSAGE BEGIN */ 区域内容,则保留
|
||||
if existing_msgq_content:
|
||||
init_content = re.sub(
|
||||
r"/\* USER MESSAGE BEGIN \*/\s*.*?\s*/\* USER MESSAGE END \*/",
|
||||
f"/* USER MESSAGE BEGIN */\n {existing_msgq_content}\n /* USER MESSAGE END */",
|
||||
init_content,
|
||||
flags=re.DOTALL
|
||||
)
|
||||
|
||||
with open(generated_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(init_content)
|
||||
|
||||
print(f"已成功生成 {generated_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"生成 init.c 文件时出错: {e}")
|
||||
def disconnect_serial(self):
|
||||
if self.read_thread:
|
||||
self.read_thread.stop()
|
||||
self.read_thread = None
|
||||
if self.ser:
|
||||
self.ser.close()
|
||||
self.ser = None
|
||||
self.connect_btn.setText("连接")
|
||||
self.text_edit.append("已断开连接")
|
||||
|
||||
# 修改 generate_action 方法
|
||||
|
||||
def generate_action(self):
|
||||
def task():
|
||||
# 检查并创建目录(与 FreeRTOS 状态无关的模块始终创建)
|
||||
self.create_directories()
|
||||
def display_data(self, data):
|
||||
self.text_edit.moveCursor(QTextCursor.End)
|
||||
self.text_edit.insertPlainText(data)
|
||||
self.text_edit.moveCursor(QTextCursor.End)
|
||||
|
||||
# 复制 .gitignore 文件
|
||||
if self.add_gitignore_var.get():
|
||||
self.copy_file_from_repo(".gitignore", ".gitignore")
|
||||
def send_data(self):
|
||||
if self.ser and self.ser.is_open:
|
||||
text = self.input_line.text()
|
||||
try:
|
||||
if not text:
|
||||
# 内容为空,只发送换行
|
||||
self.ser.write('\n'.encode())
|
||||
else:
|
||||
# 逐字符发送
|
||||
for char in text:
|
||||
self.ser.write(char.encode())
|
||||
# 结尾加换行
|
||||
self.ser.write('\n'.encode())
|
||||
except Exception as e:
|
||||
self.text_edit.append(f"发送失败: {e}")
|
||||
self.input_line.clear()
|
||||
|
||||
# 如果启用了 FreeRTOS,复制相关文件
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
self.copy_file_from_repo("src/freertos.c", os.path.join("Core", "Src", "freertos.c"))
|
||||
# ===================== 设置界面 =====================
|
||||
class SettingInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("settingInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# 定义需要处理的文件夹(与 FreeRTOS 状态无关)
|
||||
folders = ["bsp", "component", "device", "module"]
|
||||
# ===================== 帮助与关于界面 =====================
|
||||
class HelpInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("helpInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# 遍历每个文件夹,复制选中的 .h 和 .c 文件
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if not os.path.exists(folder_dir):
|
||||
continue # 如果文件夹不存在,跳过
|
||||
class AboutInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("aboutInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
for file_name in os.listdir(folder_dir):
|
||||
file_base, file_ext = os.path.splitext(file_name)
|
||||
if file_ext not in [".h", ".c"]:
|
||||
continue # 只处理 .h 和 .c 文件
|
||||
# ===================== 主窗口与导航 =====================
|
||||
class MainWindow(FluentWindow):
|
||||
themeChanged = pyqtSignal(Theme)
|
||||
|
||||
# 强制复制与文件夹同名的文件
|
||||
if file_base == folder:
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
print(f"强制复制与文件夹同名的文件: {file_name}")
|
||||
continue # 跳过后续检查,直接复制
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.setWindowTitle("MR_ToolBox")
|
||||
self.resize(1000, 700)
|
||||
self.setMinimumSize(800, 600)
|
||||
|
||||
# 检查是否选中了对应的文件
|
||||
if file_base in self.header_file_vars and self.header_file_vars[file_base].get():
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
self.page_registry = [
|
||||
(HomeInterface(self), FIF.HOME, "首页", NavigationItemPosition.TOP),
|
||||
(DataInterface(self), FIF.LIBRARY, "MRobot代码生成", NavigationItemPosition.SCROLL),
|
||||
(SerialTerminalInterface(self), FIF.COMMAND_PROMPT, "串口终端", NavigationItemPosition.SCROLL),
|
||||
(SettingInterface(self), FIF.SETTING, "设置", NavigationItemPosition.BOTTOM),
|
||||
(HelpInterface(self), FIF.HELP, "帮助", NavigationItemPosition.BOTTOM),
|
||||
(AboutInterface(self), FIF.INFO, "关于", NavigationItemPosition.BOTTOM),
|
||||
]
|
||||
self.initNavigation()
|
||||
|
||||
# 如果启用了 FreeRTOS,执行任务相关的生成逻辑
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
# 修改 user_task.c 文件
|
||||
self.modify_user_task_file()
|
||||
def initNavigation(self):
|
||||
for page, icon, name, position in self.page_registry:
|
||||
self.addSubInterface(page, icon, name, position)
|
||||
self.navigationInterface.addSeparator()
|
||||
avatar = NavigationAvatarWidget('用户', ':/qfluentwidgets/images/avatar.png')
|
||||
self.navigationInterface.addWidget(
|
||||
routeKey='avatar',
|
||||
widget=avatar,
|
||||
onClick=None,
|
||||
position=NavigationItemPosition.BOTTOM
|
||||
)
|
||||
|
||||
# 生成 user_task.h 文件
|
||||
self.generate_user_task_header()
|
||||
# ===================== 程序入口 =====================
|
||||
def main():
|
||||
app = QApplication(sys.argv)
|
||||
window = MainWindow()
|
||||
window.show()
|
||||
sys.exit(app.exec_())
|
||||
|
||||
# 生成 init.c 文件
|
||||
self.generate_init_file()
|
||||
|
||||
# 生成 task.c 文件
|
||||
self.generate_task_files()
|
||||
|
||||
# # 自动配置环境
|
||||
# if self.auto_configure_var.get():
|
||||
|
||||
# self.auto_configure_environment()
|
||||
|
||||
|
||||
threading.Thread(target=task).start()
|
||||
|
||||
# 程序关闭时清理
|
||||
def on_closing(self, root):
|
||||
self.delete_repo()
|
||||
root.destroy()
|
||||
|
||||
|
||||
# 程序入口
|
||||
if __name__ == "__main__":
|
||||
app = MRobotApp()
|
||||
app.initialize()
|
||||
if __name__ == '__main__':
|
||||
main()
|
719
MRobot_old.py
Normal file
719
MRobot_old.py
Normal file
@ -0,0 +1,719 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk
|
||||
from PIL import Image, ImageTk
|
||||
import sys
|
||||
import os
|
||||
import threading
|
||||
import shutil
|
||||
import re
|
||||
from git import Repo
|
||||
from collections import defaultdict
|
||||
import csv
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
# 配置常量
|
||||
REPO_DIR = "MRobot_repo"
|
||||
REPO_URL = "http://gitea.qutrobot.top/robofish/MRobot.git"
|
||||
if getattr(sys, 'frozen', False): # 检查是否为打包后的环境
|
||||
CURRENT_DIR = os.path.dirname(sys.executable) # 使用可执行文件所在目录
|
||||
else:
|
||||
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) # 使用脚本所在目录
|
||||
|
||||
MDK_ARM_DIR = os.path.join(CURRENT_DIR, "MDK-ARM")
|
||||
USER_DIR = os.path.join(CURRENT_DIR, "User")
|
||||
|
||||
class MRobotApp:
|
||||
def __init__(self):
|
||||
self.ioc_data = None
|
||||
self.add_gitignore_var = None # 延迟初始化
|
||||
self.header_file_vars = {}
|
||||
self.task_vars = [] # 用于存储任务的变量
|
||||
|
||||
# 初始化
|
||||
def initialize(self):
|
||||
print("初始化中,正在克隆仓库...")
|
||||
self.clone_repo()
|
||||
self.ioc_data = self.find_and_read_ioc_file()
|
||||
print("初始化完成,启动主窗口...")
|
||||
self.show_main_window()
|
||||
|
||||
# 克隆仓库
|
||||
def clone_repo(self):
|
||||
try:
|
||||
if os.path.exists(REPO_DIR):
|
||||
shutil.rmtree(REPO_DIR)
|
||||
print(f"正在克隆仓库到 {REPO_DIR}(仅克隆当前文件内容)...")
|
||||
Repo.clone_from(REPO_URL, REPO_DIR, multi_options=["--depth=1"])
|
||||
print("仓库克隆成功!")
|
||||
except Exception as e:
|
||||
print(f"克隆仓库时出错: {e}")
|
||||
|
||||
# 删除克隆的仓库
|
||||
def delete_repo(self):
|
||||
try:
|
||||
if os.path.exists(REPO_DIR):
|
||||
shutil.rmtree(REPO_DIR)
|
||||
print(f"已删除克隆的仓库目录: {REPO_DIR}")
|
||||
except Exception as e:
|
||||
print(f"删除仓库目录时出错: {e}")
|
||||
|
||||
|
||||
# 复制文件
|
||||
def copy_file_from_repo(self, src_path, dest_path):
|
||||
try:
|
||||
# 修复路径拼接问题,确保 src_path 不重复包含 REPO_DIR
|
||||
if src_path.startswith(REPO_DIR):
|
||||
full_src_path = src_path
|
||||
else:
|
||||
full_src_path = os.path.join(REPO_DIR, src_path.lstrip(os.sep))
|
||||
|
||||
# 检查源文件是否存在
|
||||
if not os.path.exists(full_src_path):
|
||||
print(f"文件 {full_src_path} 不存在!(检查路径或仓库内容)")
|
||||
return
|
||||
|
||||
# 检查目标路径是否有效
|
||||
if not dest_path or not dest_path.strip():
|
||||
print("目标路径为空或无效,无法复制文件!")
|
||||
return
|
||||
|
||||
# 创建目标目录(如果不存在)
|
||||
dest_dir = os.path.dirname(dest_path)
|
||||
if dest_dir and not os.path.exists(dest_dir):
|
||||
os.makedirs(dest_dir, exist_ok=True)
|
||||
|
||||
# 执行文件复制
|
||||
shutil.copy(full_src_path, dest_path)
|
||||
print(f"文件已从 {full_src_path} 复制到 {dest_path}")
|
||||
except Exception as e:
|
||||
print(f"复制文件时出错: {e}")
|
||||
|
||||
# 查找并读取 .ioc 文件
|
||||
def find_and_read_ioc_file(self):
|
||||
try:
|
||||
for file in os.listdir("."):
|
||||
if file.endswith(".ioc"):
|
||||
print(f"找到 .ioc 文件: {file}")
|
||||
with open(file, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
print("未找到 .ioc 文件!")
|
||||
except Exception as e:
|
||||
print(f"读取 .ioc 文件时出错: {e}")
|
||||
return None
|
||||
|
||||
# 检查是否启用了 FreeRTOS
|
||||
def check_freertos_enabled(self, ioc_data):
|
||||
try:
|
||||
return bool(re.search(r"Mcu\.IP\d+=FREERTOS", ioc_data))
|
||||
except Exception as e:
|
||||
print(f"检查 FreeRTOS 配置时出错: {e}")
|
||||
return False
|
||||
|
||||
# 生成操作
|
||||
def generate_action(self):
|
||||
def task():
|
||||
# 检查并创建目录
|
||||
self.create_directories()
|
||||
|
||||
if self.add_gitignore_var.get():
|
||||
self.copy_file_from_repo(".gitignore", ".gitignore")
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
self.copy_file_from_repo("src/freertos.c", os.path.join("Core", "Src", "freertos.c"))
|
||||
|
||||
# 定义需要处理的文件夹
|
||||
folders = ["bsp", "component", "device", "module"]
|
||||
|
||||
# 遍历每个文件夹,复制选中的 .h 和 .c 文件
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if not os.path.exists(folder_dir):
|
||||
continue # 如果文件夹不存在,跳过
|
||||
|
||||
for file_name in os.listdir(folder_dir):
|
||||
file_base, file_ext = os.path.splitext(file_name)
|
||||
if file_ext not in [".h", ".c"]:
|
||||
continue # 只处理 .h 和 .c 文件
|
||||
|
||||
# 强制复制与文件夹同名的文件
|
||||
if file_base == folder:
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
continue # 跳过后续检查,直接复制
|
||||
|
||||
# 检查是否选中了对应的文件
|
||||
if file_base in self.header_file_vars and self.header_file_vars[file_base].get():
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
|
||||
threading.Thread(target=task).start()
|
||||
|
||||
|
||||
|
||||
# 创建必要的目录
|
||||
def create_directories(self):
|
||||
try:
|
||||
directories = [
|
||||
"User/bsp",
|
||||
"User/component",
|
||||
"User/device",
|
||||
"User/module",
|
||||
]
|
||||
# 根据是否启用 FreeRTOS 决定是否创建 User/task
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
directories.append("User/task")
|
||||
|
||||
for directory in directories:
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
print(f"已创建目录: {directory}")
|
||||
else:
|
||||
print(f"目录已存在: {directory}")
|
||||
except Exception as e:
|
||||
print(f"创建目录时出错: {e}")
|
||||
|
||||
|
||||
# 更新 FreeRTOS 状态标签
|
||||
def update_freertos_status(self, label):
|
||||
if self.ioc_data:
|
||||
status = "已启用" if self.check_freertos_enabled(self.ioc_data) else "未启用"
|
||||
else:
|
||||
status = "未检测到 .ioc 文件"
|
||||
label.config(text=f"FreeRTOS 状态: {status}")
|
||||
|
||||
|
||||
|
||||
# 显示主窗口
|
||||
# ...existing code...
|
||||
# ...existing code...
|
||||
|
||||
# 显示主窗口
|
||||
def show_main_window(self):
|
||||
root = tk.Tk()
|
||||
root.title("MRobot 自动生成脚本")
|
||||
root.geometry("1000x650") # 调整窗口大小以适应布局
|
||||
|
||||
# 在窗口关闭时调用 on_closing 方法
|
||||
root.protocol("WM_DELETE_WINDOW", lambda: self.on_closing(root))
|
||||
|
||||
# 初始化 BooleanVar
|
||||
self.add_gitignore_var = tk.BooleanVar(value=False)
|
||||
self.auto_configure_var = tk.BooleanVar(value=False) # 新增复选框变量
|
||||
|
||||
# 创建主框架
|
||||
main_frame = ttk.Frame(root)
|
||||
main_frame.pack(fill="both", expand=True)
|
||||
|
||||
# 添加标题
|
||||
title_label = ttk.Label(main_frame, text="MRobot 自动生成脚本", font=("Arial", 16, "bold"))
|
||||
title_label.pack(pady=10)
|
||||
|
||||
# 添加 FreeRTOS 状态标签
|
||||
freertos_status_label = ttk.Label(main_frame, text="FreeRTOS 状态: 检测中...", font=("Arial", 12))
|
||||
freertos_status_label.pack(pady=10)
|
||||
self.update_freertos_status(freertos_status_label)
|
||||
|
||||
# 模块文件选择和任务管理框架(添加滚动功能)
|
||||
module_task_frame = ttk.Frame(main_frame)
|
||||
module_task_frame.pack(fill="both", expand=True, padx=10, pady=10)
|
||||
|
||||
# 创建 Canvas 和 Scrollbar
|
||||
canvas = tk.Canvas(module_task_frame)
|
||||
scrollbar = ttk.Scrollbar(module_task_frame, orient="vertical", command=canvas.yview)
|
||||
scrollable_frame = ttk.Frame(canvas)
|
||||
|
||||
# 配置滚动区域
|
||||
scrollable_frame.bind(
|
||||
"<Configure>",
|
||||
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
|
||||
)
|
||||
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
|
||||
canvas.configure(yscrollcommand=scrollbar.set)
|
||||
|
||||
# 绑定鼠标滚轮事件
|
||||
def on_mouse_wheel(event):
|
||||
canvas.yview_scroll(-1 * int(event.delta / 120), "units")
|
||||
|
||||
canvas.bind_all("<MouseWheel>", on_mouse_wheel)
|
||||
|
||||
# 布局 Canvas 和 Scrollbar
|
||||
canvas.pack(side="left", fill="both", expand=True)
|
||||
scrollbar.pack(side="right", fill="y")
|
||||
|
||||
# 左右布局:模块文件选择框和任务管理框
|
||||
left_frame = ttk.Frame(scrollable_frame)
|
||||
left_frame.pack(side="left", fill="both", expand=True, padx=5, pady=5)
|
||||
|
||||
right_frame = ttk.Frame(scrollable_frame)
|
||||
right_frame.pack(side="right", fill="both", expand=True, padx=5, pady=5)
|
||||
|
||||
# 模块文件选择框
|
||||
header_files_frame = ttk.LabelFrame(left_frame, text="模块文件选择", padding=(10, 10))
|
||||
header_files_frame.pack(fill="both", expand=True, padx=5)
|
||||
self.header_files_frame = header_files_frame
|
||||
self.update_header_files()
|
||||
|
||||
# 任务管理框
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
task_frame = ttk.LabelFrame(right_frame, text="任务管理", padding=(10, 10))
|
||||
task_frame.pack(fill="both", expand=True, padx=5)
|
||||
self.task_frame = task_frame
|
||||
self.update_task_ui()
|
||||
|
||||
# 添加消息框和生成按钮在同一行
|
||||
bottom_frame = ttk.Frame(main_frame)
|
||||
bottom_frame.pack(fill="x", pady=10, side="bottom")
|
||||
|
||||
# 消息框
|
||||
self.message_box = tk.Text(bottom_frame, wrap="word", state="disabled", height=5, width=60)
|
||||
self.message_box.pack(side="left", fill="x", expand=True, padx=5, pady=5)
|
||||
|
||||
# 生成按钮和复选框选项
|
||||
button_frame = ttk.Frame(bottom_frame)
|
||||
button_frame.pack(side="right", padx=10)
|
||||
|
||||
# 添加复选框容器(横向排列复选框)
|
||||
checkbox_frame = ttk.Frame(button_frame)
|
||||
checkbox_frame.pack(side="top", pady=5)
|
||||
|
||||
# 添加 .gitignore 复选框(左侧)
|
||||
ttk.Checkbutton(checkbox_frame, text=".gitignore", variable=self.add_gitignore_var).pack(side="left", padx=5)
|
||||
|
||||
# 添加自动配置环境复选框(右侧)
|
||||
ttk.Checkbutton(checkbox_frame, text="自动环境", variable=self.auto_configure_var).pack(side="left", padx=5)
|
||||
|
||||
# 添加生成按钮(竖向排列在复选框下方)
|
||||
generate_button = ttk.Button(button_frame, text="一键生成MRobot代码", command=self.generate_action)
|
||||
generate_button.pack(side="top", pady=10)
|
||||
generate_button.config(width=25) # 设置按钮宽度
|
||||
|
||||
# 重定向输出到消息框
|
||||
self.redirect_output()
|
||||
|
||||
# 打印欢迎信息
|
||||
print("欢迎使用 MRobot 自动生成脚本!")
|
||||
print("请根据需要选择模块文件和任务。")
|
||||
print("点击“一键生成MRobot代码”按钮开始生成。")
|
||||
|
||||
# 启动 Tkinter 主事件循环
|
||||
root.mainloop()
|
||||
|
||||
# ...existing code...
|
||||
# ...existing code...
|
||||
|
||||
def redirect_output(self):
|
||||
"""
|
||||
重定向标准输出到消息框
|
||||
"""
|
||||
class TextRedirector:
|
||||
def __init__(self, text_widget):
|
||||
self.text_widget = text_widget
|
||||
|
||||
def write(self, message):
|
||||
self.text_widget.config(state="normal")
|
||||
self.text_widget.insert("end", message)
|
||||
self.text_widget.see("end")
|
||||
self.text_widget.config(state="disabled")
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
sys.stdout = TextRedirector(self.message_box)
|
||||
sys.stderr = TextRedirector(self.message_box)
|
||||
|
||||
# 修改 update_task_ui 方法
|
||||
def update_task_ui(self):
|
||||
# 检查是否有已存在的任务文件
|
||||
task_dir = os.path.join("User", "task")
|
||||
if os.path.exists(task_dir):
|
||||
for file_name in os.listdir(task_dir):
|
||||
file_base, file_ext = os.path.splitext(file_name)
|
||||
if file_ext == ".c" and file_base not in ["init", "user_task"] and file_base not in [task_var.get() for task_var, _ in self.task_vars]:
|
||||
frequency = 100 # 默认频率
|
||||
user_task_header_path = os.path.join("User", "task", "user_task.h")
|
||||
if os.path.exists(user_task_header_path):
|
||||
try:
|
||||
with open(user_task_header_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
pattern = rf"#define\s+TASK_FREQ_{file_base.upper()}\s*\((\d+)[uU]?\)"
|
||||
match = re.search(pattern, content)
|
||||
if match:
|
||||
frequency = int(match.group(1))
|
||||
print(f"从 user_task.h 文件中读取到任务 {file_base} 的频率: {frequency}")
|
||||
except Exception as e:
|
||||
print(f"读取 user_task.h 文件时出错: {e}")
|
||||
|
||||
new_task_var = tk.StringVar(value=file_base)
|
||||
self.task_vars.append((new_task_var, tk.IntVar(value=frequency)))
|
||||
|
||||
# 清空任务框架中的所有子组件
|
||||
for widget in self.task_frame.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
|
||||
# 设置任务管理框的固定宽度
|
||||
self.task_frame.config(width=400)
|
||||
|
||||
# 显示任务列表
|
||||
for i, (task_var, freq_var) in enumerate(self.task_vars):
|
||||
task_row = ttk.Frame(self.task_frame, width=400)
|
||||
task_row.pack(fill="x", pady=5)
|
||||
|
||||
ttk.Entry(task_row, textvariable=task_var, width=20).pack(side="left", padx=5)
|
||||
ttk.Label(task_row, text="频率:").pack(side="left", padx=5)
|
||||
ttk.Spinbox(task_row, from_=1, to=1000, textvariable=freq_var, width=5).pack(side="left", padx=5)
|
||||
ttk.Button(task_row, text="删除", command=lambda idx=i: self.remove_task(idx)).pack(side="left", padx=5)
|
||||
|
||||
# 添加新任务按钮
|
||||
add_task_button = ttk.Button(self.task_frame, text="添加任务", command=self.add_task)
|
||||
add_task_button.pack(pady=10)
|
||||
|
||||
|
||||
# 修改 add_task 方法
|
||||
def add_task(self):
|
||||
new_task_var = tk.StringVar(value=f"Task_{len(self.task_vars) + 1}")
|
||||
new_freq_var = tk.IntVar(value=100) # 默认频率为 100
|
||||
self.task_vars.append((new_task_var, new_freq_var))
|
||||
self.update_task_ui()
|
||||
|
||||
# 修改 remove_task 方法
|
||||
def remove_task(self, idx):
|
||||
del self.task_vars[idx]
|
||||
self.update_task_ui()
|
||||
|
||||
# 更新文件夹显示
|
||||
def update_folder_display(self):
|
||||
for widget in self.folder_frame.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
folders = ["User/bsp", "User/component", "User/device", "User/module"]
|
||||
# if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
# folders.append("User/task")
|
||||
|
||||
for folder in folders:
|
||||
# 去掉 "User/" 前缀
|
||||
display_name = folder.replace("User/", "")
|
||||
tk.Label(self.folder_frame, text=display_name).pack()
|
||||
|
||||
# 更新 .h 文件复选框
|
||||
def update_header_files(self):
|
||||
for widget in self.header_files_frame.winfo_children():
|
||||
widget.destroy()
|
||||
|
||||
folders = ["bsp", "component", "device", "module"]
|
||||
dependencies = defaultdict(list)
|
||||
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if os.path.exists(folder_dir):
|
||||
dependencies_file = os.path.join(folder_dir, "dependencies.csv")
|
||||
if os.path.exists(dependencies_file):
|
||||
with open(dependencies_file, "r", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
for row in reader:
|
||||
if len(row) == 2:
|
||||
dependencies[row[0]].append(row[1])
|
||||
|
||||
# 创建复选框
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if os.path.exists(folder_dir):
|
||||
module_frame = ttk.LabelFrame(self.header_files_frame, text=folder.capitalize(), padding=(10, 10))
|
||||
module_frame.pack(fill="x", pady=5)
|
||||
|
||||
row, col = 0, 0
|
||||
for file in os.listdir(folder_dir):
|
||||
file_base, file_ext = os.path.splitext(file)
|
||||
if file_ext == ".h" and file_base != folder:
|
||||
var = tk.BooleanVar(value=False)
|
||||
self.header_file_vars[file_base] = var
|
||||
|
||||
checkbox = ttk.Checkbutton(
|
||||
module_frame,
|
||||
text=file_base,
|
||||
variable=var,
|
||||
command=lambda fb=file_base: self.handle_dependencies(fb, dependencies)
|
||||
)
|
||||
checkbox.grid(row=row, column=col, padx=5, pady=5, sticky="w")
|
||||
col += 1
|
||||
if col >= 6:
|
||||
col = 0
|
||||
row += 1
|
||||
|
||||
|
||||
|
||||
def handle_dependencies(self, file_base, dependencies):
|
||||
"""
|
||||
根据依赖关系自动勾选相关模块
|
||||
"""
|
||||
if file_base in self.header_file_vars and self.header_file_vars[file_base].get():
|
||||
# 如果当前模块被选中,自动勾选其依赖项
|
||||
for dependency in dependencies.get(file_base, []):
|
||||
dep_base = os.path.basename(dependency)
|
||||
if dep_base in self.header_file_vars:
|
||||
self.header_file_vars[dep_base].set(True)
|
||||
|
||||
# 在 MRobotApp 类中添加以下方法
|
||||
def generate_task_files(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "task.c.template")
|
||||
task_dir = os.path.join("User", "task")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 task.c 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(task_dir, exist_ok=True)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 为每个任务生成对应的 task.c 文件
|
||||
for task_var, _ in self.task_vars: # 解包元组
|
||||
task_name = f"Task_{task_var.get()}" # 添加前缀 Task_
|
||||
task_file_path = os.path.join(task_dir, f"{task_var.get().lower()}.c") # 文件名保持原始小写
|
||||
|
||||
# 替换模板中的占位符
|
||||
task_content = template_content.replace("{{task_name}}", task_name)
|
||||
task_content = task_content.replace("{{task_function}}", task_name)
|
||||
task_content = task_content.replace(
|
||||
"{{task_frequency}}", f"TASK_FREQ_{task_var.get().upper()}"
|
||||
) # 替换为 user_task.h 中的宏定义
|
||||
task_content = task_content.replace("{{task_delay}}", f"TASK_INIT_DELAY_{task_var.get().upper()}")
|
||||
|
||||
with open(task_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(task_content)
|
||||
|
||||
print(f"已成功生成 {task_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"生成 task.c 文件时出错: {e}")
|
||||
# 修改 user_task.c 文件
|
||||
def modify_user_task_file(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "user_task.c.template")
|
||||
generated_task_file_path = os.path.join("User", "task", "user_task.c")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 user_task.c 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(generated_task_file_path), exist_ok=True)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 生成任务属性定义
|
||||
task_attr_definitions = "\n".join([
|
||||
f"""const osThreadAttr_t attr_{task_var.get().lower()} = {{
|
||||
.name = "{task_var.get()}",
|
||||
.priority = osPriorityNormal,
|
||||
.stack_size = 128 * 4,
|
||||
}};"""
|
||||
for task_var, _ in self.task_vars # 解包元组
|
||||
])
|
||||
|
||||
# 替换模板中的占位符
|
||||
task_content = template_content.replace("{{task_attr_definitions}}", task_attr_definitions)
|
||||
|
||||
with open(generated_task_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(task_content)
|
||||
|
||||
print(f"已成功生成 {generated_task_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"修改 user_task.c 文件时出错: {e}")
|
||||
# ...existing code...
|
||||
|
||||
def generate_user_task_header(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "user_task.h.template")
|
||||
header_file_path = os.path.join("User", "task", "user_task.h")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 user_task.h 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(header_file_path), exist_ok=True)
|
||||
|
||||
# 如果 user_task.h 已存在,提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
existing_msgq_content = ""
|
||||
if os.path.exists(header_file_path):
|
||||
with open(header_file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
# 提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
match = re.search(r"/\* USER MESSAGE BEGIN \*/\s*(.*?)\s*/\* USER MESSAGE END \*/", content, re.DOTALL)
|
||||
if match:
|
||||
existing_msgq_content = match.group(1).strip()
|
||||
print("已存在的 msgq 区域内容:")
|
||||
print(existing_msgq_content)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 定义占位符内容
|
||||
thread_definitions = "\n".join([f" osThreadId_t {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
msgq_definitions = existing_msgq_content if existing_msgq_content else " osMessageQueueId_t default_msgq;"
|
||||
freq_definitions = "\n".join([f" float {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
last_up_time_definitions = "\n".join([f" uint32_t {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
task_attr_declarations = "\n".join([f"extern const osThreadAttr_t attr_{task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
task_function_declarations = "\n".join([f"void Task_{task_var.get()}(void *argument);" for task_var, _ in self.task_vars])
|
||||
task_frequency_definitions = "\n".join([
|
||||
f"#define TASK_FREQ_{task_var.get().upper()} ({freq_var.get()}u)"
|
||||
for task_var, freq_var in self.task_vars
|
||||
])
|
||||
task_init_delay_definitions = "\n".join([f"#define TASK_INIT_DELAY_{task_var.get().upper()} (0u)" for task_var, _ in self.task_vars])
|
||||
task_handle_definitions = "\n".join([f" osThreadId_t {task_var.get().lower()};" for task_var, _ in self.task_vars])
|
||||
|
||||
# 替换模板中的占位符
|
||||
header_content = template_content.replace("{{thread_definitions}}", thread_definitions)
|
||||
header_content = header_content.replace("{{msgq_definitions}}", msgq_definitions)
|
||||
header_content = header_content.replace("{{freq_definitions}}", freq_definitions)
|
||||
header_content = header_content.replace("{{last_up_time_definitions}}", last_up_time_definitions)
|
||||
header_content = header_content.replace("{{task_attr_declarations}}", task_attr_declarations)
|
||||
header_content = header_content.replace("{{task_function_declarations}}", task_function_declarations)
|
||||
header_content = header_content.replace("{{task_frequency_definitions}}", task_frequency_definitions)
|
||||
header_content = header_content.replace("{{task_init_delay_definitions}}", task_init_delay_definitions)
|
||||
header_content = header_content.replace("{{task_handle_definitions}}", task_handle_definitions)
|
||||
|
||||
# 如果存在 /* USER MESSAGE BEGIN */ 区域内容,则保留
|
||||
if existing_msgq_content:
|
||||
header_content = re.sub(
|
||||
r"/\* USER MESSAGE BEGIN \*/\s*.*?\s*/\* USER MESSAGE END \*/",
|
||||
f"/* USER MESSAGE BEGIN */\n\n {existing_msgq_content}\n\n /* USER MESSAGE END */",
|
||||
header_content,
|
||||
flags=re.DOTALL
|
||||
)
|
||||
|
||||
with open(header_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(header_content)
|
||||
|
||||
print(f"已成功生成 {header_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"生成 user_task.h 文件时出错: {e}")
|
||||
|
||||
def generate_init_file(self):
|
||||
try:
|
||||
template_file_path = os.path.join(REPO_DIR, "User", "task", "init.c.template")
|
||||
generated_file_path = os.path.join("User", "task", "init.c")
|
||||
|
||||
if not os.path.exists(template_file_path):
|
||||
print(f"模板文件 {template_file_path} 不存在,无法生成 init.c 文件!")
|
||||
return
|
||||
|
||||
os.makedirs(os.path.dirname(generated_file_path), exist_ok=True)
|
||||
|
||||
# 如果 init.c 已存在,提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
existing_msgq_content = ""
|
||||
if os.path.exists(generated_file_path):
|
||||
with open(generated_file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
# 提取 /* USER MESSAGE BEGIN */ 和 /* USER MESSAGE END */ 区域内容
|
||||
match = re.search(r"/\* USER MESSAGE BEGIN \*/\s*(.*?)\s*/\* USER MESSAGE END \*/", content, re.DOTALL)
|
||||
if match:
|
||||
existing_msgq_content = match.group(1).strip()
|
||||
print("已存在的消息队列区域内容:")
|
||||
print(existing_msgq_content)
|
||||
|
||||
with open(template_file_path, "r", encoding="utf-8") as f:
|
||||
template_content = f.read()
|
||||
|
||||
# 生成任务创建代码
|
||||
thread_creation_code = "\n".join([
|
||||
f" task_runtime.thread.{task_var.get().lower()} = osThreadNew(Task_{task_var.get()}, NULL, &attr_{task_var.get().lower()});"
|
||||
for task_var, _ in self.task_vars # 解包元组
|
||||
])
|
||||
|
||||
# 替换模板中的占位符
|
||||
init_content = template_content.replace("{{thread_creation_code}}", thread_creation_code)
|
||||
|
||||
# 如果存在 /* USER MESSAGE BEGIN */ 区域内容,则保留
|
||||
if existing_msgq_content:
|
||||
init_content = re.sub(
|
||||
r"/\* USER MESSAGE BEGIN \*/\s*.*?\s*/\* USER MESSAGE END \*/",
|
||||
f"/* USER MESSAGE BEGIN */\n {existing_msgq_content}\n /* USER MESSAGE END */",
|
||||
init_content,
|
||||
flags=re.DOTALL
|
||||
)
|
||||
|
||||
with open(generated_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(init_content)
|
||||
|
||||
print(f"已成功生成 {generated_file_path} 文件!")
|
||||
except Exception as e:
|
||||
print(f"生成 init.c 文件时出错: {e}")
|
||||
|
||||
# 修改 generate_action 方法
|
||||
|
||||
def generate_action(self):
|
||||
def task():
|
||||
# 检查并创建目录(与 FreeRTOS 状态无关的模块始终创建)
|
||||
self.create_directories()
|
||||
|
||||
# 复制 .gitignore 文件
|
||||
if self.add_gitignore_var.get():
|
||||
self.copy_file_from_repo(".gitignore", ".gitignore")
|
||||
|
||||
# 如果启用了 FreeRTOS,复制相关文件
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
self.copy_file_from_repo("src/freertos.c", os.path.join("Core", "Src", "freertos.c"))
|
||||
|
||||
# 定义需要处理的文件夹(与 FreeRTOS 状态无关)
|
||||
folders = ["bsp", "component", "device", "module"]
|
||||
|
||||
# 遍历每个文件夹,复制选中的 .h 和 .c 文件
|
||||
for folder in folders:
|
||||
folder_dir = os.path.join(REPO_DIR, "User", folder)
|
||||
if not os.path.exists(folder_dir):
|
||||
continue # 如果文件夹不存在,跳过
|
||||
|
||||
for file_name in os.listdir(folder_dir):
|
||||
file_base, file_ext = os.path.splitext(file_name)
|
||||
if file_ext not in [".h", ".c"]:
|
||||
continue # 只处理 .h 和 .c 文件
|
||||
|
||||
# 强制复制与文件夹同名的文件
|
||||
if file_base == folder:
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
print(f"强制复制与文件夹同名的文件: {file_name}")
|
||||
continue # 跳过后续检查,直接复制
|
||||
|
||||
# 检查是否选中了对应的文件
|
||||
if file_base in self.header_file_vars and self.header_file_vars[file_base].get():
|
||||
src_path = os.path.join(folder_dir, file_name)
|
||||
dest_path = os.path.join("User", folder, file_name)
|
||||
self.copy_file_from_repo(src_path, dest_path)
|
||||
|
||||
# 如果启用了 FreeRTOS,执行任务相关的生成逻辑
|
||||
if self.ioc_data and self.check_freertos_enabled(self.ioc_data):
|
||||
# 修改 user_task.c 文件
|
||||
self.modify_user_task_file()
|
||||
|
||||
# 生成 user_task.h 文件
|
||||
self.generate_user_task_header()
|
||||
|
||||
# 生成 init.c 文件
|
||||
self.generate_init_file()
|
||||
|
||||
# 生成 task.c 文件
|
||||
self.generate_task_files()
|
||||
|
||||
# # 自动配置环境
|
||||
# if self.auto_configure_var.get():
|
||||
|
||||
# self.auto_configure_environment()
|
||||
|
||||
|
||||
threading.Thread(target=task).start()
|
||||
|
||||
# 程序关闭时清理
|
||||
def on_closing(self, root):
|
||||
self.delete_repo()
|
||||
root.destroy()
|
||||
|
||||
|
||||
# 程序入口
|
||||
if __name__ == "__main__":
|
||||
app = MRobotApp()
|
||||
app.initialize()
|
@ -1 +0,0 @@
|
||||
Subproject commit c4731883f241d4748d45ed168520a345a042326e
|
273
fluentui.py
273
fluentui.py
@ -1,236 +1,124 @@
|
||||
import sys
|
||||
import webbrowser
|
||||
from PyQt5.QtCore import Qt
|
||||
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QApplication, QLabel, QGroupBox, QGridLayout, QFrame, QHBoxLayout
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
|
||||
from PyQt5.QtCore import Qt, QSize, pyqtSignal
|
||||
from PyQt5.QtGui import QPixmap, QFont
|
||||
from PyQt5.QtWidgets import (
|
||||
QWidget, QVBoxLayout, QApplication, QLabel, QGroupBox, QGridLayout, QFrame,
|
||||
QHBoxLayout, QComboBox, QTextEdit, QLineEdit
|
||||
)
|
||||
|
||||
from qfluentwidgets import (
|
||||
NavigationInterface, NavigationItemPosition, MessageBox,
|
||||
setTheme, Theme, FluentWindow, NavigationAvatarWidget,
|
||||
SubtitleLabel, setFont, InfoBar, InfoBarPosition, PushButton
|
||||
InfoBar, InfoBarPosition, PushButton, FluentIcon
|
||||
)
|
||||
from qfluentwidgets import FluentIcon as FIF
|
||||
|
||||
# ===================== 页面基类 =====================
|
||||
class BaseInterface(QWidget):
|
||||
"""所有页面的基类,便于统一扩展"""
|
||||
def __init__(self, title: str = "", parent=None):
|
||||
"""所有页面的基类,页面内容完全自定义"""
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName(f'{self.__class__.__name__}')
|
||||
self._mainLayout = QVBoxLayout(self)
|
||||
if title:
|
||||
self.titleLabel = SubtitleLabel(title, self)
|
||||
setFont(self.titleLabel, 24)
|
||||
self._mainLayout.addWidget(self.titleLabel, 0, Qt.AlignmentFlag.AlignCenter)
|
||||
self._mainLayout.addStretch(1)
|
||||
self.setLayout(self._mainLayout)
|
||||
|
||||
# ===================== 首页界面 =====================
|
||||
class HomeInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent=parent)
|
||||
|
||||
# 顶部Logo
|
||||
iconLabel = QLabel(self)
|
||||
iconPixmap = QPixmap('img/MRobot.png').scaled(
|
||||
180, 180, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation
|
||||
)
|
||||
iconLabel.setPixmap(iconPixmap)
|
||||
iconLabel.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
|
||||
# 标题
|
||||
titleLabel = QLabel("MRobot 智能R助手", self)
|
||||
titleLabel.setFont(QFont("微软雅黑", 28, QFont.Bold))
|
||||
titleLabel.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
titleLabel.setStyleSheet("color: #1976d2; margin-top: 12px;")
|
||||
|
||||
# 副标题
|
||||
subtitle = QLabel("高效 · 智能 · 便捷\n让R语言开发更简单", self)
|
||||
subtitle.setFont(QFont("微软雅黑", 16))
|
||||
subtitle.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
subtitle.setStyleSheet("color: #555; margin-bottom: 18px;")
|
||||
|
||||
# 三个主按钮
|
||||
btnGen = PushButton('代码生成', self, FIF.LIBRARY)
|
||||
btnGen.setFixedSize(180, 48)
|
||||
btnGen.setFont(QFont("微软雅黑", 15))
|
||||
btnGen.clicked.connect(self.showInfoBar)
|
||||
|
||||
btnHelp = PushButton('帮助文档', self, FIF.HELP)
|
||||
btnHelp.setFixedSize(180, 48)
|
||||
btnHelp.setFont(QFont("微软雅黑", 15))
|
||||
btnHelp.clicked.connect(self.showHelp)
|
||||
|
||||
btnAbout = PushButton('关于我们', self, FIF.INFO)
|
||||
btnAbout.setFixedSize(180, 48)
|
||||
btnAbout.setFont(QFont("微软雅黑", 15))
|
||||
btnAbout.clicked.connect(self.showAbout)
|
||||
|
||||
btnLayout = QHBoxLayout()
|
||||
btnLayout.setSpacing(36)
|
||||
btnLayout.addStretch(1)
|
||||
btnLayout.addWidget(btnGen)
|
||||
btnLayout.addWidget(btnHelp)
|
||||
btnLayout.addWidget(btnAbout)
|
||||
btnLayout.addStretch(1)
|
||||
|
||||
# 卡片式欢迎信息
|
||||
card = QFrame(self)
|
||||
card.setStyleSheet("""
|
||||
QFrame {
|
||||
background: #f8fbfd;
|
||||
border-radius: 18px;
|
||||
border: 1.5px solid #d6eaf8;
|
||||
padding: 24px 32px 18px 32px;
|
||||
}
|
||||
""")
|
||||
cardLayout = QVBoxLayout(card)
|
||||
cardLayout.setSpacing(10)
|
||||
cardLayout.addWidget(titleLabel)
|
||||
cardLayout.addWidget(subtitle)
|
||||
cardLayout.addLayout(btnLayout)
|
||||
|
||||
# 主布局
|
||||
self.setObjectName("homeInterface")
|
||||
layout = QVBoxLayout()
|
||||
layout.setSpacing(24)
|
||||
layout.setContentsMargins(0, 32, 0, 0)
|
||||
layout.addWidget(iconLabel, 0, Qt.AlignmentFlag.AlignHCenter)
|
||||
layout.addWidget(card, 0, Qt.AlignmentFlag.AlignHCenter)
|
||||
layout.addStretch(1)
|
||||
|
||||
# 清空原有内容并设置新布局
|
||||
QWidget().setLayout(self._mainLayout)
|
||||
self.setLayout(layout)
|
||||
|
||||
def showInfoBar(self):
|
||||
InfoBar.success(
|
||||
title="提示",
|
||||
content="代码生成功能即将上线,敬请期待!",
|
||||
parent=self,
|
||||
position=InfoBarPosition.TOP,
|
||||
duration=2000
|
||||
)
|
||||
|
||||
def showHelp(self):
|
||||
webbrowser.open("https://github.com/lvzucheng/MRobot")
|
||||
|
||||
def showAbout(self):
|
||||
MessageBox("关于我们", "MRobot 由 lvzucheng 开发,致力于高效智能的R代码辅助。", self).exec()
|
||||
# ===================== 代码生成页面 =====================
|
||||
class DataInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__('代码生成', parent)
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("dataInterface")
|
||||
# 空页面示例
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# ===================== 实用工具页面 =====================
|
||||
# ...existing code...
|
||||
|
||||
class DownloadTool(BaseInterface):
|
||||
# ===================== 串口终端界面 =====================
|
||||
class SerialTerminalInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__('实用工具', parent)
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("serialTerminalInterface")
|
||||
layout = QVBoxLayout()
|
||||
|
||||
# 主体水平布局
|
||||
main_layout = QHBoxLayout()
|
||||
main_layout.setSpacing(32)
|
||||
main_layout.setContentsMargins(32, 0, 32, 0) # 顶部和底部间距减小
|
||||
|
||||
# 区域1:常用小工具
|
||||
tools = [
|
||||
("Geek Uninstaller", "https://geekuninstaller.com/download", FIF.DELETE),
|
||||
("Neat Download Manager", "https://www.neatdownloadmanager.com/index.php/en/", FIF.DOWNLOAD),
|
||||
("Everything", "https://www.voidtools.com/zh-cn/downloads/", FIF.SEARCH),
|
||||
("Bandizip", "https://www.bandisoft.com/bandizip/", FIF.ZIP_FOLDER),
|
||||
("PotPlayer", "https://potplayer.daum.net/", FIF.VIDEO),
|
||||
("Typora", "https://typora.io/", FIF.EDIT),
|
||||
("Git", "https://git-scm.com/download/win", FIF.CODE),
|
||||
("Python", "https://www.python.org/downloads/", FIF.CODE),
|
||||
]
|
||||
tools_card = QGroupBox("常用小工具", self)
|
||||
tools_card.setFont(QFont("微软雅黑", 15, QFont.Bold))
|
||||
tools_card.setStyleSheet("""
|
||||
QGroupBox { border: 1.5px solid #d6eaf8; border-radius: 12px; background: #f8fbfd; margin-top: 0px; }
|
||||
QGroupBox:title { color: #222; left: 18px; top: -10px; background: transparent; padding: 0 8px; }
|
||||
""")
|
||||
tools_grid = QGridLayout()
|
||||
tools_grid.setSpacing(18)
|
||||
tools_grid.setContentsMargins(18, 18, 18, 18)
|
||||
for idx, (name, url, icon) in enumerate(tools):
|
||||
btn = PushButton(name, self, icon)
|
||||
btn.setFont(QFont("微软雅黑", 14))
|
||||
btn.setMinimumHeight(44)
|
||||
btn.clicked.connect(self._make_open_url(url))
|
||||
row, col = divmod(idx, 2)
|
||||
tools_grid.addWidget(btn, row, col)
|
||||
tools_card.setLayout(tools_grid)
|
||||
main_layout.addWidget(tools_card, 1)
|
||||
|
||||
# 区域2:开发/设计软件
|
||||
dev_tools = [
|
||||
("STM32CubeMX", "https://www.st.com/zh/development-tools/stm32cubemx.html", FIF.SETTING),
|
||||
("Keil MDK", "https://www.keil.com/download/product/", FIF.CODE),
|
||||
("Visual Studio Code", "https://code.visualstudio.com/", FIF.CODE),
|
||||
("CLion", "https://www.jetbrains.com/clion/download/", FIF.CODE),
|
||||
("MATLAB", "https://www.mathworks.com/downloads/", FIF.CODE),
|
||||
("SolidWorks", "https://www.solidworks.com/sw/support/downloads.htm", FIF.LAYOUT),
|
||||
("Altium Designer", "https://www.altium.com/zh/altium-designer/downloads", FIF.LAYOUT),
|
||||
("原神", "https://download-porter.hoyoverse.com/download-porter/2025/03/27/GenshinImpact_install_202503072011.exe?trace_key=GenshinImpact_install_ua_679d0b4e9b10", FIF.GAME),
|
||||
]
|
||||
dev_card = QGroupBox("开发/设计软件", self)
|
||||
dev_card.setFont(QFont("微软雅黑", 15, QFont.Bold))
|
||||
dev_card.setStyleSheet("""
|
||||
QGroupBox { border: 1.5px solid #d6eaf8; border-radius: 12px; background: #f8fbfd; margin-top: 0px; }
|
||||
QGroupBox:title { color: #222; left: 18px; top: -10px; background: transparent; padding: 0 8px; }
|
||||
""")
|
||||
dev_grid = QGridLayout()
|
||||
dev_grid.setSpacing(18)
|
||||
dev_grid.setContentsMargins(18, 18, 18, 18)
|
||||
for idx, (name, url, icon) in enumerate(dev_tools):
|
||||
btn = PushButton(name, self, icon)
|
||||
btn.setFont(QFont("微软雅黑", 14))
|
||||
btn.setMinimumHeight(44)
|
||||
btn.clicked.connect(self._make_open_url(url))
|
||||
row, col = divmod(idx, 2)
|
||||
dev_grid.addWidget(btn, row, col)
|
||||
dev_card.setLayout(dev_grid)
|
||||
main_layout.addWidget(dev_card, 1)
|
||||
|
||||
# 添加到主布局,靠上显示
|
||||
self._mainLayout.insertLayout(1, main_layout)
|
||||
|
||||
def _make_open_url(self, url):
|
||||
# 返回一个只带url参数的槽,避免lambda闭包问题
|
||||
return lambda checked=False, link=url: webbrowser.open(link)
|
||||
# ...existing code...
|
||||
# ===================== 其它页面 =====================
|
||||
# ===================== 设置界面 =====================
|
||||
class SettingInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__('设置页面', parent)
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("settingInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.themeBtn = PushButton(
|
||||
"切换夜间", self, FluentIcon.BRUSH
|
||||
)
|
||||
self.themeBtn.setFixedWidth(120)
|
||||
self.themeBtn.clicked.connect(self.onThemeBtnClicked)
|
||||
layout.addWidget(self.themeBtn)
|
||||
layout.addStretch(1)
|
||||
self.setLayout(layout)
|
||||
|
||||
# 监听主题变化
|
||||
mw = self.window()
|
||||
if hasattr(mw, "themeChanged"):
|
||||
mw.themeChanged.connect(self.updateThemeBtn)
|
||||
|
||||
def onThemeBtnClicked(self):
|
||||
mw = self.window()
|
||||
if hasattr(mw, "toggleTheme"):
|
||||
mw.toggleTheme()
|
||||
|
||||
def updateThemeBtn(self, theme):
|
||||
if theme == Theme.LIGHT:
|
||||
self.themeBtn.setText("切换夜间")
|
||||
else:
|
||||
self.themeBtn.setText("切换白天")
|
||||
# ===================== 帮助与关于界面 =====================
|
||||
class HelpInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__('帮助页面', parent)
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("helpInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
class AboutInterface(BaseInterface):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__('关于页面', parent)
|
||||
super().__init__(parent=parent)
|
||||
self.setObjectName("aboutInterface")
|
||||
layout = QVBoxLayout()
|
||||
self.setLayout(layout)
|
||||
|
||||
# ===================== 主窗口与导航 =====================
|
||||
class MainWindow(FluentWindow):
|
||||
themeChanged = pyqtSignal(Theme)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.setWindowTitle("MR_ToolBox")
|
||||
self.resize(1000, 700)
|
||||
self.setMinimumSize(800, 600)
|
||||
setTheme(Theme.LIGHT)
|
||||
self.theme = Theme.LIGHT
|
||||
|
||||
self.page_registry = [
|
||||
(HomeInterface(self), FIF.HOME, "首页", NavigationItemPosition.TOP),
|
||||
(DataInterface(self), FIF.LIBRARY, "MRobot代码生成", NavigationItemPosition.SCROLL),
|
||||
(DownloadTool(self), FIF.DOWNLOAD, "实用工具", NavigationItemPosition.SCROLL),
|
||||
(SerialTerminalInterface(self), FIF.COMMAND_PROMPT, "串口终端", NavigationItemPosition.SCROLL),
|
||||
(SettingInterface(self), FIF.SETTING, "设置", NavigationItemPosition.BOTTOM),
|
||||
(HelpInterface(self), FIF.HELP, "帮助", NavigationItemPosition.BOTTOM),
|
||||
(AboutInterface(self), FIF.INFO, "关于", NavigationItemPosition.BOTTOM),
|
||||
]
|
||||
self.initNavigation()
|
||||
|
||||
# 把切换主题按钮放到标题栏右侧
|
||||
self.themeBtn = PushButton("切换夜间", self, FluentIcon.BRUSH)
|
||||
self.themeBtn.setFixedWidth(120)
|
||||
self.themeBtn.clicked.connect(self.toggleTheme)
|
||||
self.addTitleBarWidget(self.themeBtn, align=Qt.AlignRight)
|
||||
|
||||
def initNavigation(self):
|
||||
for page, icon, name, position in self.page_registry:
|
||||
self.addSubInterface(page, icon, name, position)
|
||||
@ -243,10 +131,27 @@ class MainWindow(FluentWindow):
|
||||
position=NavigationItemPosition.BOTTOM
|
||||
)
|
||||
|
||||
def showUserInfo(self):
|
||||
content = "当前登录用户: 管理员\n邮箱: admin@example.com"
|
||||
MessageBox("用户信息", content, self).exec()
|
||||
def toggleTheme(self):
|
||||
if self.theme == Theme.LIGHT:
|
||||
setTheme(Theme.DARK)
|
||||
self.theme = Theme.DARK
|
||||
self.themeBtn.setText("切换白天")
|
||||
else:
|
||||
setTheme(Theme.LIGHT)
|
||||
self.theme = Theme.LIGHT
|
||||
self.themeBtn.setText("切换夜间")
|
||||
self.themeChanged.emit(self.theme)
|
||||
self.refreshStyle()
|
||||
|
||||
def refreshStyle(self):
|
||||
def refresh(widget):
|
||||
widget.setStyleSheet(widget.styleSheet())
|
||||
for child in widget.findChildren(QWidget):
|
||||
refresh(child)
|
||||
refresh(self)
|
||||
|
||||
def showUserInfo(self):
|
||||
MessageBox("用户信息", "当前登录用户:管理员", self).exec()
|
||||
# ===================== 程序入口 =====================
|
||||
def main():
|
||||
QApplication.setHighDpiScaleFactorRoundingPolicy(
|
||||
|
Loading…
Reference in New Issue
Block a user