Compare commits

...

3 Commits

Author SHA1 Message Date
b4a4d87909 修好ui了 2025-07-31 03:52:22 +08:00
88ec1517fb 修改了 2025-07-31 02:47:48 +08:00
08193b8093 奇怪的bug 2025-07-31 02:22:58 +08:00
6 changed files with 364 additions and 330 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -1,7 +1,14 @@
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QStackedLayout, QFileDialog, QHeaderView
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QTreeWidgetItem as TreeItem
from qfluentwidgets import TitleLabel, BodyLabel, SubtitleLabel, StrongBodyLabel, HorizontalSeparator, PushButton, TreeWidget, InfoBar,FluentIcon
from qfluentwidgets import TitleLabel, BodyLabel, SubtitleLabel, StrongBodyLabel, HorizontalSeparator, PushButton, TreeWidget, InfoBar,FluentIcon, Dialog,SubtitleLabel,BodyLabel
from PyQt5.QtWidgets import QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QSpinBox, QPushButton, QTableWidget, QTableWidgetItem, QHeaderView, QCheckBox
from qfluentwidgets import CardWidget, LineEdit, SpinBox, CheckBox, TextEdit, PrimaryPushButton, PushButton, InfoBar, DoubleSpinBox
from qfluentwidgets import HeaderCardWidget
from PyQt5.QtWidgets import QScrollArea, QWidget
from qfluentwidgets import theme, Theme
from PyQt5.QtWidgets import QDoubleSpinBox
import os
import requests
import zipfile
@ -12,6 +19,38 @@ import yaml
import textwrap
from jinja2 import Template
def preserve_all_user_regions(new_code, old_code):
"""
自动保留所有 /* USER XXX BEGIN */ ... /* USER XXX END */ 区域内容
new_code: 模板生成的新代码
old_code: 旧代码
返回合并后的代码
"""
import re
pattern = re.compile(
r"/\*\s*(USER [A-Z0-9_ ]+)\s*BEGIN\s*\*/(.*?)/\*\s*\1\s*END\s*\*/",
re.DOTALL
)
old_regions = {m.group(1): m.group(2) for m in pattern.finditer(old_code or "")}
def repl(m):
region = m.group(1)
old_content = old_regions.get(region)
if old_content is not None:
return m.group(0).replace(m.group(2), old_content)
return m.group(0)
return pattern.sub(repl, new_code)
def save_with_preserve(path, new_code):
"""
写入文件自动保留所有用户自定义区域
"""
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
old_code = f.read()
new_code = preserve_all_user_regions(new_code, old_code)
with open(path, "w", encoding="utf-8") as f:
f.write(new_code)
class IocConfig:
def __init__(self, ioc_path):
self.ioc_path = ioc_path
@ -322,9 +361,8 @@ class DataInterface(QWidget):
self.file_tree.setHeaderLabels(["模块名", "描述"])
self.file_tree.setSelectionMode(self.file_tree.ExtendedSelection)
self.file_tree.header().setSectionResizeMode(0, QHeaderView.ResizeToContents)
self.file_tree.header().setSectionResizeMode(1, QHeaderView.Stretch) # 描述列自适应
self.file_tree.setCheckedColor("#0078d4", "#2d7d9a")
self.file_tree.header().setSectionResizeMode(0, QHeaderView.Interactive)
self.file_tree.header().setSectionResizeMode(1, QHeaderView.Interactive)
self.file_tree.setBorderRadius(8)
self.file_tree.setBorderVisible(True)
@ -528,111 +566,6 @@ class DataInterface(QWidget):
)
def open_task_config_dialog(self):
from PyQt5.QtWidgets import QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QSpinBox, QPushButton, QTableWidget, QTableWidgetItem, QHeaderView, QCheckBox
import yaml
import os
class TaskConfigDialog(QDialog):
def __init__(self, parent=None, config_path=None):
super().__init__(parent)
self.setWindowTitle("任务配置")
self.resize(900, 420)
layout = QVBoxLayout(self)
self.table = QTableWidget(0, 6)
self.table.setHorizontalHeaderLabels(["任务名称", "运行频率", "初始化延迟", "堆栈大小", "任务描述", "频率控制"])
self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(4, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeToContents)
self.table.setColumnWidth(4, 320) # 任务描述更宽
layout.addWidget(self.table)
btn_layout = QHBoxLayout()
add_btn = QPushButton("添加任务")
del_btn = QPushButton("删除选中")
ok_btn = QPushButton("生成")
cancel_btn = QPushButton("取消")
btn_layout.addWidget(add_btn)
btn_layout.addWidget(del_btn)
btn_layout.addStretch()
btn_layout.addWidget(ok_btn)
btn_layout.addWidget(cancel_btn)
layout.addLayout(btn_layout)
add_btn.clicked.connect(self.add_row)
del_btn.clicked.connect(self.del_row)
ok_btn.clicked.connect(self.accept)
cancel_btn.clicked.connect(self.reject)
# 自动读取配置文件
if config_path and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
tasks = yaml.safe_load(f)
if tasks:
for t in tasks:
row = self.table.rowCount()
self.table.insertRow(row)
for col, key in enumerate(["name", "frequency", "delay", "stack", "description"]):
item = QTableWidgetItem(str(t.get(key, "")))
item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, col, item)
# 新增频率控制复选框
freq_ctrl = QCheckBox()
freq_ctrl.setChecked(t.get("freq_control", True))
self.table.setCellWidget(row, 5, freq_ctrl)
except Exception as e:
pass # 配置文件损坏时忽略
def add_row(self):
row = self.table.rowCount()
self.table.insertRow(row)
default_values = [
f"Task{row+1}", "500", "0", "256", "不要偷懒,请写清楚每个任务的作用!(如果你看到任务上面是这句话,说明作者是个懒蛋)"
]
for col, val in enumerate(default_values):
item = QTableWidgetItem(val)
item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, col, item)
freq_ctrl = QCheckBox()
freq_ctrl.setChecked(True)
self.table.setCellWidget(row, 5, freq_ctrl)
def del_row(self):
rows = set([i.row() for i in self.table.selectedItems()])
for r in sorted(rows, reverse=True):
self.table.removeRow(r)
def get_tasks(self):
tasks = []
for row in range(self.table.rowCount()):
name = self.table.item(row, 0).text().strip()
freq = self.table.item(row, 1).text()
delay = int(self.table.item(row, 2).text())
stack = int(self.table.item(row, 3).text())
desc = self.table.item(row, 4).text().strip()
freq_ctrl = self.table.cellWidget(row, 5).isChecked()
# 校验 stack 必须为 128*2^n
if stack < 128 or (stack & (stack - 1)) != 0 or stack % 128 != 0:
raise ValueError(f"{row+1}行任务“{name}”的堆栈大小必须为128、256、512、1024等128*2^n")
task = {
"name": name,
"function": f"Task_{name}",
"delay": delay,
"stack": stack,
"description": desc,
"freq_control": freq_ctrl
}
if freq_ctrl:
task["frequency"] = int(freq)
tasks.append(task)
return tasks
config_path = os.path.join(self.project_path, "User", "task", "config.yaml")
dlg = TaskConfigDialog(self, config_path=config_path)
if dlg.exec() == QDialog.Accepted:
@ -670,44 +603,26 @@ class DataInterface(QWidget):
duration=3000
)
def preserve_user_region(self, new_code, old_code, region_name):
"""
替换 new_code region_name 区域为 old_code 中的内容如果有
region_name: 'USER INCLUDE'
"""
pattern = re.compile(
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
re.DOTALL
)
old_match = pattern.search(old_code or "")
if not old_match:
return new_code # 旧文件没有该区域,直接返回新代码
old_content = old_match.group(1)
def repl(m):
return m.group(0).replace(m.group(1), old_content)
# 替换新代码中的该区域
return pattern.sub(repl, new_code, count=1)
def generate_task_code(self, task_list):
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, "../assets/User_code/task")
output_dir = os.path.join(self.project_path, "User", "task")
os.makedirs(output_dir, exist_ok=True)
user_task_h_tpl = os.path.join(template_dir, "user_task.h.template")
user_task_c_tpl = os.path.join(template_dir, "user_task.c.template")
init_c_tpl = os.path.join(template_dir, "init.c.template")
task_c_tpl = os.path.join(template_dir, "task.c.template")
freq_tasks = [t for t in task_list if t.get("freq_control", True)]
def render_template(path, context):
with open(path, encoding="utf-8") as f:
tpl = Template(f.read())
return tpl.render(**context)
# ----------- 生成 user_task.h -----------
context_h = {
"thread_definitions": "\n".join([f" osThreadId_t {t['name']};" for t in task_list]),
"freq_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
@ -718,27 +633,10 @@ class DataInterface(QWidget):
"task_attr_declarations": "\n".join([f"extern const osThreadAttr_t attr_{t['name']};" for t in task_list]),
"task_function_declarations": "\n".join([f"void {t['function']}(void *argument);" for t in task_list]),
}
# ----------- 生成 user_task.h -----------
user_task_h_path = os.path.join(output_dir, "user_task.h")
new_user_task_h = render_template(user_task_h_tpl, context_h)
if os.path.exists(user_task_h_path):
with open(user_task_h_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER MESSAGE", "USER CONFIG"]:
pattern = re.compile(
rf"/\*\s*{region}\s*BEGIN\s*\*/(.*?)/\*\s*{region}\s*END\s*\*/",
re.DOTALL
)
old_match = pattern.search(old_code)
if old_match and old_match.group(1).strip():
new_user_task_h = self.preserve_user_region(
new_user_task_h, old_code, region
)
with open(user_task_h_path, "w", encoding="utf-8") as f:
f.write(new_user_task_h)
save_with_preserve(user_task_h_path, new_user_task_h)
# ----------- 生成 user_task.c -----------
context_c = {
"task_attr_definitions": "\n".join([
@ -750,10 +648,10 @@ class DataInterface(QWidget):
for t in task_list
])
}
user_task_c_path = os.path.join(output_dir, "user_task.c")
user_task_c = render_template(user_task_c_tpl, context_c)
with open(os.path.join(output_dir, "user_task.c"), "w", encoding="utf-8") as f:
f.write(user_task_c)
save_with_preserve(user_task_c_path, user_task_c)
# ----------- 生成 init.c -----------
thread_creation_code = "\n".join([
f" task_runtime.thread.{t['name']} = osThreadNew({t['function']}, NULL, &attr_{t['name']});"
@ -762,18 +660,10 @@ class DataInterface(QWidget):
context_init = {
"thread_creation_code": thread_creation_code,
}
init_c = render_template(init_c_tpl, context_init)
init_c_path = os.path.join(output_dir, "init.c")
if os.path.exists(init_c_path):
with open(init_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER CODE", "USER CODE INIT"]:
init_c = self.preserve_user_region(
init_c, old_code, region
)
with open(init_c_path, "w", encoding="utf-8") as f:
f.write(init_c)
init_c = render_template(init_c_tpl, context_init)
save_with_preserve(init_c_path, init_c)
# ----------- 生成 task.c -----------
for t in task_list:
desc = t.get("description", "")
@ -790,18 +680,299 @@ class DataInterface(QWidget):
tpl = Template(f.read())
code = tpl.render(**context_task)
task_c_path = os.path.join(output_dir, f"{t['name']}.c")
if os.path.exists(task_c_path):
with open(task_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER STRUCT", "USER CODE", "USER CODE INIT"]:
code = self.preserve_user_region(
code, old_code, region
)
with open(task_c_path, "w", encoding="utf-8") as f:
f.write(code)
save_with_preserve(task_c_path, code)
# ----------- 保存任务配置到 config.yaml -----------
config_yaml_path = os.path.join(output_dir, "config.yaml")
with open(config_yaml_path, "w", encoding="utf-8") as f:
yaml.safe_dump(task_list, f, allow_unicode=True)
class TaskConfigDialog(QDialog):
def __init__(self, parent=None, config_path=None):
super().__init__(parent)
self.setWindowTitle("任务配置")
self.resize(900, 520)
# 设置背景色跟随主题
if theme() == Theme.DARK:
self.setStyleSheet("background-color: #232323;")
else:
self.setStyleSheet("background-color: #f7f9fc;")
# 主布局
main_layout = QVBoxLayout(self)
main_layout.setContentsMargins(16, 16, 16, 16)
main_layout.setSpacing(12)
# 顶部横向分栏
self.top_layout = QHBoxLayout()
self.top_layout.setSpacing(16)
# ----------- 左侧任务按钮区 -----------
self.left_widget = QWidget()
self.left_layout = QVBoxLayout(self.left_widget)
self.left_layout.setContentsMargins(0, 0, 0, 0)
self.left_layout.setSpacing(8)
self.task_list_label = BodyLabel("任务列表")
# self.left_layout.addWidget(self.task_list_label)
# 添加任务列表居中
self.task_list_label.setAlignment(Qt.AlignCenter)
self.left_layout.addWidget(self.task_list_label, alignment=Qt.AlignCenter)
# 添加一个水平分割线
self.left_layout.addWidget(HorizontalSeparator())
# 任务按钮区
self.task_btn_area = QScrollArea()
self.task_btn_area.setWidgetResizable(True)
self.task_btn_area.setFrameShape(QScrollArea.NoFrame)
self.task_btn_container = QWidget()
self.task_btn_layout = QVBoxLayout(self.task_btn_container)
self.task_btn_layout.setContentsMargins(0, 0, 0, 0)
self.task_btn_layout.setSpacing(4)
self.task_btn_layout.addStretch()
self.task_btn_area.setWidget(self.task_btn_container)
self.left_layout.addWidget(self.task_btn_area, stretch=1)
self.left_widget.setFixedWidth(180)
self.top_layout.addWidget(self.left_widget, stretch=0)
# ----------- 左侧任务按钮区 END -----------
main_layout.addLayout(self.top_layout, stretch=1)
# 下方按钮区
btn_layout = QHBoxLayout()
# 左下角:添加/删除任务
self.add_btn = PrimaryPushButton("创建新任务")
self.add_btn.setAutoDefault(False) # 禁止回车触发
self.add_btn.setDefault(False)
self.del_btn = PushButton("删除当前任务")
self.del_btn.setAutoDefault(False) # 禁止回车触发
self.del_btn.setDefault(False)
self.add_btn.clicked.connect(self.add_task)
self.del_btn.clicked.connect(self.delete_current_task)
btn_layout.addWidget(self.add_btn)
btn_layout.addWidget(self.del_btn)
btn_layout.addStretch() # 添加/删除靠左stretch在中间
# 右下角:生成/取消
self.ok_btn = PrimaryPushButton("生成任务")
self.ok_btn.setAutoDefault(False) # 允许回车触发
self.ok_btn.setDefault(False) # 设置为默认按钮
self.cancel_btn = PushButton("取消")
self.cancel_btn.setAutoDefault(False) # 禁止回车触发
self.cancel_btn.setDefault(False)
btn_layout.addWidget(self.ok_btn)
btn_layout.addWidget(self.cancel_btn)
main_layout.addLayout(btn_layout)
self.ok_btn.clicked.connect(self.accept)
self.cancel_btn.clicked.connect(self.reject)
self.tasks = []
self.current_index = -1
if config_path and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
tasks = yaml.safe_load(f)
if tasks:
for t in tasks:
self.tasks.append(self._make_task_obj(t))
except Exception:
pass
# 允许没有任何任务
self.current_index = 0 if self.tasks else -1
self.refresh_task_btns()
if self.tasks:
self.show_task_form(self.tasks[self.current_index])
else:
self.show_task_form(None)
def refresh_task_btns(self):
# 清空旧按钮
while self.task_btn_layout.count():
item = self.task_btn_layout.takeAt(0)
w = item.widget()
if w:
w.deleteLater()
# 重新添加按钮
for idx, t in enumerate(self.tasks):
btn = PushButton(t["name"])
btn.setCheckable(True)
btn.setChecked(idx == self.current_index)
btn.clicked.connect(lambda checked, i=idx: self.select_task(i))
self.task_btn_layout.addWidget(btn)
self.task_btn_layout.addStretch()
def add_task(self):
self.save_form()
new_idx = len(self.tasks)
self.tasks.append(self._make_task_obj({"name": f"Task{new_idx+1}"}))
self.current_index = new_idx
self.refresh_task_btns()
self.show_task_form(self.tasks[self.current_index])
def delete_current_task(self):
if self.current_index < 0 or not self.tasks:
return
del self.tasks[self.current_index]
if not self.tasks:
self.current_index = -1
self.refresh_task_btns()
self.show_task_form(None)
return
if self.current_index >= len(self.tasks):
self.current_index = len(self.tasks) - 1
self.refresh_task_btns()
self.show_task_form(self.tasks[self.current_index])
def select_task(self, idx):
self.save_form()
self.current_index = idx
self.refresh_task_btns()
self.show_task_form(self.tasks[idx])
def show_task_form(self, task):
# 先移除旧的 form_widget
if hasattr(self, "form_widget") and self.form_widget is not None:
self.top_layout.removeWidget(self.form_widget)
self.form_widget.deleteLater()
self.form_widget = None
# 新建 form_widget 和 form_layout
self.form_widget = QWidget()
self.form_layout = QVBoxLayout(self.form_widget)
self.form_layout.setContentsMargins(0, 0, 0, 0)
self.form_layout.setSpacing(12)
# 添加到右侧
self.top_layout.addWidget(self.form_widget, stretch=1)
if not task:
label = TitleLabel("暂无任务,请点击下方“添加任务”。")
label.setAlignment(Qt.AlignCenter)
self.form_layout.addStretch()
self.form_layout.addWidget(label)
self.form_layout.addStretch()
return
# 任务名称
row1 = QHBoxLayout()
label_name = BodyLabel("任务名称")
self.name_edit = LineEdit()
self.name_edit.setText(task["name"])
self.name_edit.setPlaceholderText("任务名称")
# 新增:名称编辑完成后刷新按钮
self.name_edit.editingFinished.connect(self.on_name_edit_finished)
row1.addWidget(label_name)
row1.addWidget(self.name_edit)
self.form_layout.addLayout(row1)
# 频率
row2 = QHBoxLayout()
label_freq = BodyLabel("任务运行频率")
self.freq_spin = DoubleSpinBox()
self.freq_spin.setRange(0, 10000)
self.freq_spin.setDecimals(3)
self.freq_spin.setSingleStep(1)
self.freq_spin.setSuffix(" Hz")
self.freq_spin.setValue(float(task.get("frequency", 500)))
row2.addWidget(label_freq)
row2.addWidget(self.freq_spin)
self.form_layout.addLayout(row2)
# 延迟
row3 = QHBoxLayout()
label_delay = BodyLabel("初始化延时")
self.delay_spin = SpinBox()
self.delay_spin.setRange(0, 10000)
self.delay_spin.setSuffix(" ms")
self.delay_spin.setValue(task.get("delay", 0))
row3.addWidget(label_delay)
row3.addWidget(self.delay_spin)
self.form_layout.addLayout(row3)
# 堆栈
row4 = QHBoxLayout()
label_stack = BodyLabel("堆栈大小")
self.stack_spin = SpinBox()
self.stack_spin.setRange(128, 8192)
self.stack_spin.setSingleStep(128)
self.stack_spin.setSuffix(" Byte") # 添加单位
self.stack_spin.setValue(task.get("stack", 256))
row4.addWidget(label_stack)
row4.addWidget(self.stack_spin)
self.form_layout.addLayout(row4)
# 频率控制
row5 = QHBoxLayout()
self.freq_ctrl = CheckBox("启用默认频率控制")
self.freq_ctrl.setChecked(task.get("freq_control", True))
row5.addWidget(self.freq_ctrl)
self.form_layout.addLayout(row5)
# 描述
label_desc = BodyLabel("任务描述")
self.desc_edit = TextEdit()
self.desc_edit.setText(task.get("description", ""))
self.desc_edit.setPlaceholderText("任务描述")
self.form_layout.addWidget(label_desc)
self.form_layout.addWidget(self.desc_edit)
self.form_layout.addStretch()
def on_name_edit_finished(self):
# 保存当前表单内容
self.save_form()
# 刷新左侧按钮名称
self.refresh_task_btns()
def _make_task_obj(self, task=None):
return {
"name": task["name"] if task else f"Task1",
"frequency": task.get("frequency", 500) if task else 500,
"delay": task.get("delay", 0) if task else 0,
"stack": task.get("stack", 256) if task else 256,
"description": task.get("description", "") if task else "",
"freq_control": task.get("freq_control", True) if task else True,
}
def save_form(self):
if self.current_index < 0 or self.current_index >= len(self.tasks):
return
t = self.tasks[self.current_index]
t["name"] = self.name_edit.text().strip()
t["frequency"] = float(self.freq_spin.value()) # 支持小数
t["delay"] = self.delay_spin.value()
t["stack"] = self.stack_spin.value()
t["description"] = self.desc_edit.toPlainText().strip()
t["freq_control"] = self.freq_ctrl.isChecked()
def get_tasks(self):
self.save_form()
tasks = []
for idx, t in enumerate(self.tasks):
name = t["name"].strip()
freq = t["frequency"]
delay = t["delay"]
stack = t["stack"]
desc = t["description"].strip()
freq_ctrl = t["freq_control"]
if stack < 128 or (stack & (stack - 1)) != 0 or stack % 128 != 0:
raise ValueError(f"{idx+1}个任务“{name}”的堆栈大小必须为128、256、512、1024等128*2^n")
task = {
"name": name,
"function": f"Task_{name}",
"delay": delay,
"stack": stack,
"description": desc,
"freq_control": freq_ctrl
}
if freq_ctrl:
task["frequency"] = freq
tasks.append(task)
return tasks

View File

@ -1,20 +0,0 @@
import re
def preserve_user_region(new_code, old_code, region_name):
"""
替换 new_code region_name 区域为 old_code 中的内容如果有
region_name: 'USER INCLUDE'
"""
pattern = re.compile(
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
re.DOTALL
)
old_match = pattern.search(old_code or "")
if not old_match:
return new_code # 旧文件没有该区域,直接返回新代码
old_content = old_match.group(1)
def repl(m):
return m.group(0).replace(m.group(1), old_content)
# 替换新代码中的该区域
return pattern.sub(repl, new_code, count=1)

View File

@ -1,25 +0,0 @@
class IocConfig:
def __init__(self, ioc_path):
self.ioc_path = ioc_path
self.config = {}
self._parse()
def _parse(self):
with open(self.ioc_path, encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
self.config[key.strip()] = value.strip()
def is_freertos_enabled(self):
ip_keys = [k for k in self.config if k.startswith('Mcu.IP')]
for k in ip_keys:
if self.config[k] == 'FREERTOS':
return True
for k in self.config:
if k.startswith('FREERTOS.'):
return True
return False

View File

@ -1,109 +0,0 @@
import os
import yaml
import textwrap
from jinja2 import Template
from .code_utils import preserve_user_region
def generate_task_code(task_list, project_path):
# base_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
template_dir = os.path.join(project_root, "User_code", "task")
output_dir = os.path.join(project_path, "User", "task")
os.makedirs(output_dir, exist_ok=True)
user_task_h_tpl = os.path.join(template_dir, "user_task.h.template")
user_task_c_tpl = os.path.join(template_dir, "user_task.c.template")
init_c_tpl = os.path.join(template_dir, "init.c.template")
task_c_tpl = os.path.join(template_dir, "task.c.template")
freq_tasks = [t for t in task_list if t.get("freq_control", True)]
def render_template(path, context):
with open(path, encoding="utf-8") as f:
tpl = Template(f.read())
return tpl.render(**context)
context_h = {
"thread_definitions": "\n".join([f" osThreadId_t {t['name']};" for t in task_list]),
"freq_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
"stack_definitions": "\n".join([f" UBaseType_t {t['name']};" for t in task_list]),
"last_up_time_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
"task_frequency_definitions": "\n".join([f"#define {t['name'].upper()}_FREQ ({t['frequency']})" for t in freq_tasks]),
"task_init_delay_definitions": "\n".join([f"#define {t['name'].upper()}_INIT_DELAY ({t['delay']})" for t in task_list]),
"task_attr_declarations": "\n".join([f"extern const osThreadAttr_t attr_{t['name']};" for t in task_list]),
"task_function_declarations": "\n".join([f"void {t['function']}(void *argument);" for t in task_list]),
}
# ----------- 生成 user_task.h -----------
user_task_h_path = os.path.join(output_dir, "user_task.h")
new_user_task_h = render_template(user_task_h_tpl, context_h)
if os.path.exists(user_task_h_path):
with open(user_task_h_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER MESSAGE", "USER CONFIG"]:
new_user_task_h = preserve_user_region(new_user_task_h, old_code, region)
with open(user_task_h_path, "w", encoding="utf-8") as f:
f.write(new_user_task_h)
# ----------- 生成 user_task.c -----------
context_c = {
"task_attr_definitions": "\n".join([
f"const osThreadAttr_t attr_{t['name']} = {{\n"
f" .name = \"{t['name']}\",\n"
f" .priority = osPriorityNormal,\n"
f" .stack_size = {t['stack']} * 4,\n"
f"}};"
for t in task_list
])
}
user_task_c = render_template(user_task_c_tpl, context_c)
with open(os.path.join(output_dir, "user_task.c"), "w", encoding="utf-8") as f:
f.write(user_task_c)
# ----------- 生成 init.c -----------
thread_creation_code = "\n".join([
f" task_runtime.thread.{t['name']} = osThreadNew({t['function']}, NULL, &attr_{t['name']});"
for t in task_list
])
context_init = {
"thread_creation_code": thread_creation_code,
}
init_c = render_template(init_c_tpl, context_init)
init_c_path = os.path.join(output_dir, "init.c")
if os.path.exists(init_c_path):
with open(init_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER CODE", "USER CODE INIT"]:
init_c = preserve_user_region(init_c, old_code, region)
with open(init_c_path, "w", encoding="utf-8") as f:
f.write(init_c)
# ----------- 生成 task.c -----------
for t in task_list:
desc = t.get("description", "")
desc_wrapped = "\n ".join(textwrap.wrap(desc, 20))
context_task = {
"task_name": t["name"],
"task_function": t["function"],
"task_frequency": f"{t['name'].upper()}_FREQ" if t.get("freq_control", True) else None,
"task_delay": f"{t['name'].upper()}_INIT_DELAY",
"task_description": desc_wrapped,
"freq_control": t.get("freq_control", True)
}
with open(task_c_tpl, encoding="utf-8") as f:
tpl = Template(f.read())
code = tpl.render(**context_task)
task_c_path = os.path.join(output_dir, f"{t['name']}.c")
if os.path.exists(task_c_path):
with open(task_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER STRUCT", "USER CODE", "USER CODE INIT"]:
code = preserve_user_region(code, old_code, region)
with open(task_c_path, "w", encoding="utf-8") as f:
f.write(code)
# ----------- 保存任务配置到 config.yaml -----------
config_yaml_path = os.path.join(output_dir, "config.yaml")
with open(config_yaml_path, "w", encoding="utf-8") as f:
yaml.safe_dump(task_list, f, allow_unicode=True)

17
test.py Normal file
View File

@ -0,0 +1,17 @@
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QStackedLayout, QFileDialog, QHeaderView
from qfluentwidgets import TitleLabel, BodyLabel, SubtitleLabel, StrongBodyLabel, HorizontalSeparator, PushButton, TreeWidget, InfoBar, FluentIcon, Dialog, FlowLayout, FluentWindow, MSFluentWindow, SplitFluentWindow
class Demo(SplitFluentWindow):
def __init__(self):
super().__init__()
layout = FlowLayout(self, needAni=True) # 启用动画
self.resize(250, 300)
if __name__ == "__main__":
from PyQt5.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
w = Demo()
w.show()
sys.exit(app.exec_())