奇怪的bug

This commit is contained in:
Robofish 2025-07-31 02:22:58 +08:00
parent 7951dae760
commit 08193b8093
6 changed files with 334 additions and 330 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -1,7 +1,12 @@
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QStackedLayout, QFileDialog, QHeaderView
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QTreeWidgetItem as TreeItem
from qfluentwidgets import TitleLabel, BodyLabel, SubtitleLabel, StrongBodyLabel, HorizontalSeparator, PushButton, TreeWidget, InfoBar,FluentIcon
from qfluentwidgets import TitleLabel, BodyLabel, SubtitleLabel, StrongBodyLabel, HorizontalSeparator, PushButton, TreeWidget, InfoBar,FluentIcon, Dialog
from PyQt5.QtWidgets import QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QSpinBox, QPushButton, QTableWidget, QTableWidgetItem, QHeaderView, QCheckBox
from qfluentwidgets import CardWidget, LineEdit, SpinBox, CheckBox, TextEdit, PrimaryPushButton, PushButton, InfoBar
from qfluentwidgets import HeaderCardWidget
from PyQt5.QtWidgets import QScrollArea, QWidget
import os
import requests
import zipfile
@ -12,6 +17,38 @@ import yaml
import textwrap
from jinja2 import Template
def preserve_all_user_regions(new_code, old_code):
"""
自动保留所有 /* USER XXX BEGIN */ ... /* USER XXX END */ 区域内容
new_code: 模板生成的新代码
old_code: 旧代码
返回合并后的代码
"""
import re
pattern = re.compile(
r"/\*\s*(USER [A-Z0-9_ ]+)\s*BEGIN\s*\*/(.*?)/\*\s*\1\s*END\s*\*/",
re.DOTALL
)
old_regions = {m.group(1): m.group(2) for m in pattern.finditer(old_code or "")}
def repl(m):
region = m.group(1)
old_content = old_regions.get(region)
if old_content is not None:
return m.group(0).replace(m.group(2), old_content)
return m.group(0)
return pattern.sub(repl, new_code)
def save_with_preserve(path, new_code):
"""
写入文件自动保留所有用户自定义区域
"""
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
old_code = f.read()
new_code = preserve_all_user_regions(new_code, old_code)
with open(path, "w", encoding="utf-8") as f:
f.write(new_code)
class IocConfig:
def __init__(self, ioc_path):
self.ioc_path = ioc_path
@ -322,9 +359,8 @@ class DataInterface(QWidget):
self.file_tree.setHeaderLabels(["模块名", "描述"])
self.file_tree.setSelectionMode(self.file_tree.ExtendedSelection)
self.file_tree.header().setSectionResizeMode(0, QHeaderView.ResizeToContents)
self.file_tree.header().setSectionResizeMode(1, QHeaderView.Stretch) # 描述列自适应
self.file_tree.setCheckedColor("#0078d4", "#2d7d9a")
self.file_tree.header().setSectionResizeMode(0, QHeaderView.Interactive)
self.file_tree.header().setSectionResizeMode(1, QHeaderView.Interactive)
self.file_tree.setBorderRadius(8)
self.file_tree.setBorderVisible(True)
@ -528,111 +564,6 @@ class DataInterface(QWidget):
)
def open_task_config_dialog(self):
from PyQt5.QtWidgets import QDialog, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QSpinBox, QPushButton, QTableWidget, QTableWidgetItem, QHeaderView, QCheckBox
import yaml
import os
class TaskConfigDialog(QDialog):
def __init__(self, parent=None, config_path=None):
super().__init__(parent)
self.setWindowTitle("任务配置")
self.resize(900, 420)
layout = QVBoxLayout(self)
self.table = QTableWidget(0, 6)
self.table.setHorizontalHeaderLabels(["任务名称", "运行频率", "初始化延迟", "堆栈大小", "任务描述", "频率控制"])
self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(4, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeToContents)
self.table.setColumnWidth(4, 320) # 任务描述更宽
layout.addWidget(self.table)
btn_layout = QHBoxLayout()
add_btn = QPushButton("添加任务")
del_btn = QPushButton("删除选中")
ok_btn = QPushButton("生成")
cancel_btn = QPushButton("取消")
btn_layout.addWidget(add_btn)
btn_layout.addWidget(del_btn)
btn_layout.addStretch()
btn_layout.addWidget(ok_btn)
btn_layout.addWidget(cancel_btn)
layout.addLayout(btn_layout)
add_btn.clicked.connect(self.add_row)
del_btn.clicked.connect(self.del_row)
ok_btn.clicked.connect(self.accept)
cancel_btn.clicked.connect(self.reject)
# 自动读取配置文件
if config_path and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
tasks = yaml.safe_load(f)
if tasks:
for t in tasks:
row = self.table.rowCount()
self.table.insertRow(row)
for col, key in enumerate(["name", "frequency", "delay", "stack", "description"]):
item = QTableWidgetItem(str(t.get(key, "")))
item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, col, item)
# 新增频率控制复选框
freq_ctrl = QCheckBox()
freq_ctrl.setChecked(t.get("freq_control", True))
self.table.setCellWidget(row, 5, freq_ctrl)
except Exception as e:
pass # 配置文件损坏时忽略
def add_row(self):
row = self.table.rowCount()
self.table.insertRow(row)
default_values = [
f"Task{row+1}", "500", "0", "256", "不要偷懒,请写清楚每个任务的作用!(如果你看到任务上面是这句话,说明作者是个懒蛋)"
]
for col, val in enumerate(default_values):
item = QTableWidgetItem(val)
item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, col, item)
freq_ctrl = QCheckBox()
freq_ctrl.setChecked(True)
self.table.setCellWidget(row, 5, freq_ctrl)
def del_row(self):
rows = set([i.row() for i in self.table.selectedItems()])
for r in sorted(rows, reverse=True):
self.table.removeRow(r)
def get_tasks(self):
tasks = []
for row in range(self.table.rowCount()):
name = self.table.item(row, 0).text().strip()
freq = self.table.item(row, 1).text()
delay = int(self.table.item(row, 2).text())
stack = int(self.table.item(row, 3).text())
desc = self.table.item(row, 4).text().strip()
freq_ctrl = self.table.cellWidget(row, 5).isChecked()
# 校验 stack 必须为 128*2^n
if stack < 128 or (stack & (stack - 1)) != 0 or stack % 128 != 0:
raise ValueError(f"{row+1}行任务“{name}”的堆栈大小必须为128、256、512、1024等128*2^n")
task = {
"name": name,
"function": f"Task_{name}",
"delay": delay,
"stack": stack,
"description": desc,
"freq_control": freq_ctrl
}
if freq_ctrl:
task["frequency"] = int(freq)
tasks.append(task)
return tasks
config_path = os.path.join(self.project_path, "User", "task", "config.yaml")
dlg = TaskConfigDialog(self, config_path=config_path)
if dlg.exec() == QDialog.Accepted:
@ -670,44 +601,26 @@ class DataInterface(QWidget):
duration=3000
)
def preserve_user_region(self, new_code, old_code, region_name):
"""
替换 new_code region_name 区域为 old_code 中的内容如果有
region_name: 'USER INCLUDE'
"""
pattern = re.compile(
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
re.DOTALL
)
old_match = pattern.search(old_code or "")
if not old_match:
return new_code # 旧文件没有该区域,直接返回新代码
old_content = old_match.group(1)
def repl(m):
return m.group(0).replace(m.group(1), old_content)
# 替换新代码中的该区域
return pattern.sub(repl, new_code, count=1)
def generate_task_code(self, task_list):
base_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(base_dir, "../assets/User_code/task")
output_dir = os.path.join(self.project_path, "User", "task")
os.makedirs(output_dir, exist_ok=True)
user_task_h_tpl = os.path.join(template_dir, "user_task.h.template")
user_task_c_tpl = os.path.join(template_dir, "user_task.c.template")
init_c_tpl = os.path.join(template_dir, "init.c.template")
task_c_tpl = os.path.join(template_dir, "task.c.template")
freq_tasks = [t for t in task_list if t.get("freq_control", True)]
def render_template(path, context):
with open(path, encoding="utf-8") as f:
tpl = Template(f.read())
return tpl.render(**context)
# ----------- 生成 user_task.h -----------
context_h = {
"thread_definitions": "\n".join([f" osThreadId_t {t['name']};" for t in task_list]),
"freq_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
@ -718,27 +631,10 @@ class DataInterface(QWidget):
"task_attr_declarations": "\n".join([f"extern const osThreadAttr_t attr_{t['name']};" for t in task_list]),
"task_function_declarations": "\n".join([f"void {t['function']}(void *argument);" for t in task_list]),
}
# ----------- 生成 user_task.h -----------
user_task_h_path = os.path.join(output_dir, "user_task.h")
new_user_task_h = render_template(user_task_h_tpl, context_h)
if os.path.exists(user_task_h_path):
with open(user_task_h_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER MESSAGE", "USER CONFIG"]:
pattern = re.compile(
rf"/\*\s*{region}\s*BEGIN\s*\*/(.*?)/\*\s*{region}\s*END\s*\*/",
re.DOTALL
)
old_match = pattern.search(old_code)
if old_match and old_match.group(1).strip():
new_user_task_h = self.preserve_user_region(
new_user_task_h, old_code, region
)
with open(user_task_h_path, "w", encoding="utf-8") as f:
f.write(new_user_task_h)
save_with_preserve(user_task_h_path, new_user_task_h)
# ----------- 生成 user_task.c -----------
context_c = {
"task_attr_definitions": "\n".join([
@ -750,10 +646,10 @@ class DataInterface(QWidget):
for t in task_list
])
}
user_task_c_path = os.path.join(output_dir, "user_task.c")
user_task_c = render_template(user_task_c_tpl, context_c)
with open(os.path.join(output_dir, "user_task.c"), "w", encoding="utf-8") as f:
f.write(user_task_c)
save_with_preserve(user_task_c_path, user_task_c)
# ----------- 生成 init.c -----------
thread_creation_code = "\n".join([
f" task_runtime.thread.{t['name']} = osThreadNew({t['function']}, NULL, &attr_{t['name']});"
@ -762,18 +658,10 @@ class DataInterface(QWidget):
context_init = {
"thread_creation_code": thread_creation_code,
}
init_c = render_template(init_c_tpl, context_init)
init_c_path = os.path.join(output_dir, "init.c")
if os.path.exists(init_c_path):
with open(init_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER CODE", "USER CODE INIT"]:
init_c = self.preserve_user_region(
init_c, old_code, region
)
with open(init_c_path, "w", encoding="utf-8") as f:
f.write(init_c)
init_c = render_template(init_c_tpl, context_init)
save_with_preserve(init_c_path, init_c)
# ----------- 生成 task.c -----------
for t in task_list:
desc = t.get("description", "")
@ -790,18 +678,271 @@ class DataInterface(QWidget):
tpl = Template(f.read())
code = tpl.render(**context_task)
task_c_path = os.path.join(output_dir, f"{t['name']}.c")
if os.path.exists(task_c_path):
with open(task_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER STRUCT", "USER CODE", "USER CODE INIT"]:
code = self.preserve_user_region(
code, old_code, region
)
with open(task_c_path, "w", encoding="utf-8") as f:
f.write(code)
save_with_preserve(task_c_path, code)
# ----------- 保存任务配置到 config.yaml -----------
config_yaml_path = os.path.join(output_dir, "config.yaml")
with open(config_yaml_path, "w", encoding="utf-8") as f:
yaml.safe_dump(task_list, f, allow_unicode=True)
class TaskConfigDialog(QDialog):
def __init__(self, parent=None, config_path=None):
super().__init__(parent)
self.setWindowTitle("任务配置")
self.resize(900, 520)
# 主布局
main_layout = QVBoxLayout(self)
main_layout.setContentsMargins(16, 16, 16, 16)
main_layout.setSpacing(12)
# 顶部横向分栏
self.top_layout = QHBoxLayout() # 注意:改为 self.top_layout
self.top_layout.setSpacing(16)
# 左侧:任务列表(按钮)
left_widget = QWidget()
left_vbox = QVBoxLayout(left_widget)
left_vbox.setContentsMargins(0, 0, 0, 0)
left_vbox.setSpacing(8)
self.add_btn = PrimaryPushButton("添加任务")
self.add_btn.clicked.connect(self.add_task)
left_vbox.addWidget(self.add_btn)
self.task_btn_area = QScrollArea()
self.task_btn_area.setWidgetResizable(True)
self.task_btn_area.setFrameShape(QScrollArea.NoFrame)
self.task_btn_container = QWidget()
self.task_btn_layout = QVBoxLayout(self.task_btn_container)
self.task_btn_layout.setContentsMargins(0, 0, 0, 0)
self.task_btn_layout.setSpacing(4)
self.task_btn_layout.addStretch()
self.task_btn_area.setWidget(self.task_btn_container)
left_vbox.addWidget(self.task_btn_area, stretch=1)
left_widget.setFixedWidth(220)
self.top_layout.addWidget(left_widget, stretch=0)
main_layout.addLayout(self.top_layout, stretch=1)
# 下方按钮区
btn_layout = QHBoxLayout()
btn_layout.addStretch()
self.ok_btn = PrimaryPushButton("生成")
self.cancel_btn = PushButton("取消")
btn_layout.addWidget(self.ok_btn)
btn_layout.addWidget(self.cancel_btn)
main_layout.addLayout(btn_layout)
self.ok_btn.clicked.connect(self.accept)
self.cancel_btn.clicked.connect(self.reject)
self.tasks = []
self.current_index = -1
if config_path and os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
tasks = yaml.safe_load(f)
if tasks:
for t in tasks:
self.tasks.append(self._make_task_obj(t))
except Exception:
pass
if not self.tasks:
self.tasks.append(self._make_task_obj())
self.refresh_task_btns()
self.select_task(0)
def show_task_form(self, task):
# 先移除旧的 form_widget
if hasattr(self, "form_widget") and self.form_widget is not None:
self.top_layout.removeWidget(self.form_widget)
self.form_widget.deleteLater()
self.form_widget = None
# 新建 form_widget 和 form_layout
self.form_widget = QWidget()
self.form_layout = QVBoxLayout(self.form_widget)
self.form_layout.setContentsMargins(0, 0, 0, 0)
self.form_layout.setSpacing(12)
# 添加到右侧
self.top_layout.addWidget(self.form_widget, stretch=1)
if not task:
label = QLabel("选择左侧任务。")
label.setAlignment(Qt.AlignCenter)
self.form_layout.addWidget(label)
self.form_layout.addStretch()
return
# 任务名称
row1 = QHBoxLayout()
label_name = QLabel("任务名称")
self.name_edit = LineEdit()
self.name_edit.setText(task["name"])
self.name_edit.setPlaceholderText("任务名称")
row1.addWidget(label_name)
row1.addWidget(self.name_edit)
self.form_layout.addLayout(row1)
# 频率
row2 = QHBoxLayout()
label_freq = QLabel("频率")
self.freq_spin = SpinBox()
self.freq_spin.setRange(1, 10000)
self.freq_spin.setSuffix(" Hz")
self.freq_spin.setValue(task.get("frequency", 500))
row2.addWidget(label_freq)
row2.addWidget(self.freq_spin)
self.form_layout.addLayout(row2)
# 延迟
row3 = QHBoxLayout()
label_delay = QLabel("延迟")
self.delay_spin = SpinBox()
self.delay_spin.setRange(0, 10000)
self.delay_spin.setSuffix(" ms")
self.delay_spin.setValue(task.get("delay", 0))
row3.addWidget(label_delay)
row3.addWidget(self.delay_spin)
self.form_layout.addLayout(row3)
# 堆栈
row4 = QHBoxLayout()
label_stack = QLabel("堆栈")
self.stack_spin = SpinBox()
self.stack_spin.setRange(128, 8192)
self.stack_spin.setSingleStep(128)
self.stack_spin.setValue(task.get("stack", 256))
row4.addWidget(label_stack)
row4.addWidget(self.stack_spin)
self.form_layout.addLayout(row4)
# 频率控制
row5 = QHBoxLayout()
self.freq_ctrl = CheckBox("启用频率控制")
self.freq_ctrl.setChecked(task.get("freq_control", True))
row5.addWidget(self.freq_ctrl)
self.form_layout.addLayout(row5)
# 描述
label_desc = QLabel("任务描述")
self.desc_edit = TextEdit()
self.desc_edit.setText(task.get("description", ""))
self.desc_edit.setPlaceholderText("任务描述")
self.form_layout.addWidget(label_desc)
self.form_layout.addWidget(self.desc_edit)
self.form_layout.addStretch()
def _make_task_obj(self, task=None):
# 生成一个任务数据结构
return {
"name": task["name"] if task else f"Task{len(self.tasks)+1}",
"frequency": task.get("frequency", 500) if task else 500,
"delay": task.get("delay", 0) if task else 0,
"stack": task.get("stack", 256) if task else 256,
"description": task.get("description", "") if task else "",
"freq_control": task.get("freq_control", True) if task else True,
}
def refresh_task_btns(self):
# 清空所有控件和stretch
while self.task_btn_layout.count():
item = self.task_btn_layout.takeAt(0)
widget = item.widget()
if widget is not None:
widget.deleteLater()
else:
# 是 QSpacerItem 也要删除
del item
# 重新添加任务按钮
for idx, t in enumerate(self.tasks):
row = QWidget()
hbox = QHBoxLayout(row)
hbox.setContentsMargins(0, 0, 0, 0)
hbox.setSpacing(4)
btn = PushButton(t["name"])
btn.setCheckable(True)
btn.setChecked(idx == self.current_index)
btn.clicked.connect(lambda _, i=idx: self.select_task(i))
hbox.addWidget(btn, stretch=1)
row.setLayout(hbox)
self.task_btn_layout.addWidget(row)
self.task_btn_layout.addStretch()
def select_task(self, idx):
if idx < 0 or idx >= len(self.tasks):
# 没有可选任务时,右侧显示提示
self.show_task_form(None)
return
# 保存当前表单内容
self.save_form()
self.current_index = idx
self.refresh_task_btns()
self.show_task_form(self.tasks[idx])
def add_task(self):
self.save_form()
self.tasks.append(self._make_task_obj())
self.refresh_task_btns()
self.select_task(len(self.tasks) - 1)
def delete_task(self, idx):
if len(self.tasks) <= 1:
return # 至少保留一个
if idx == self.current_index:
# 删除当前,切换到前一个
self.tasks.pop(idx)
self.current_index = max(0, idx - 1)
else:
self.tasks.pop(idx)
if self.current_index > idx:
self.current_index -= 1
self.refresh_task_btns()
self.select_task(self.current_index)
def save_form(self):
# 保存当前表单内容到 self.tasks[self.current_index]
if self.current_index < 0 or self.current_index >= len(self.tasks):
return
t = self.tasks[self.current_index]
t["name"] = self.name_edit.text().strip()
t["frequency"] = self.freq_spin.value()
t["delay"] = self.delay_spin.value()
t["stack"] = self.stack_spin.value()
t["description"] = self.desc_edit.toPlainText().strip()
t["freq_control"] = self.freq_ctrl.isChecked()
def get_tasks(self):
self.save_form()
tasks = []
for idx, t in enumerate(self.tasks):
name = t["name"].strip()
freq = t["frequency"]
delay = t["delay"]
stack = t["stack"]
desc = t["description"].strip()
freq_ctrl = t["freq_control"]
# 校验 stack 必须为 128*2^n
if stack < 128 or (stack & (stack - 1)) != 0 or stack % 128 != 0:
raise ValueError(f"{idx+1}个任务“{name}”的堆栈大小必须为128、256、512、1024等128*2^n")
task = {
"name": name,
"function": f"Task_{name}",
"delay": delay,
"stack": stack,
"description": desc,
"freq_control": freq_ctrl
}
if freq_ctrl:
task["frequency"] = freq
tasks.append(task)
return tasks

View File

@ -1,20 +0,0 @@
import re
def preserve_user_region(new_code, old_code, region_name):
"""
替换 new_code region_name 区域为 old_code 中的内容如果有
region_name: 'USER INCLUDE'
"""
pattern = re.compile(
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
re.DOTALL
)
old_match = pattern.search(old_code or "")
if not old_match:
return new_code # 旧文件没有该区域,直接返回新代码
old_content = old_match.group(1)
def repl(m):
return m.group(0).replace(m.group(1), old_content)
# 替换新代码中的该区域
return pattern.sub(repl, new_code, count=1)

View File

@ -1,25 +0,0 @@
class IocConfig:
def __init__(self, ioc_path):
self.ioc_path = ioc_path
self.config = {}
self._parse()
def _parse(self):
with open(self.ioc_path, encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
self.config[key.strip()] = value.strip()
def is_freertos_enabled(self):
ip_keys = [k for k in self.config if k.startswith('Mcu.IP')]
for k in ip_keys:
if self.config[k] == 'FREERTOS':
return True
for k in self.config:
if k.startswith('FREERTOS.'):
return True
return False

View File

@ -1,109 +0,0 @@
import os
import yaml
import textwrap
from jinja2 import Template
from .code_utils import preserve_user_region
def generate_task_code(task_list, project_path):
# base_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
template_dir = os.path.join(project_root, "User_code", "task")
output_dir = os.path.join(project_path, "User", "task")
os.makedirs(output_dir, exist_ok=True)
user_task_h_tpl = os.path.join(template_dir, "user_task.h.template")
user_task_c_tpl = os.path.join(template_dir, "user_task.c.template")
init_c_tpl = os.path.join(template_dir, "init.c.template")
task_c_tpl = os.path.join(template_dir, "task.c.template")
freq_tasks = [t for t in task_list if t.get("freq_control", True)]
def render_template(path, context):
with open(path, encoding="utf-8") as f:
tpl = Template(f.read())
return tpl.render(**context)
context_h = {
"thread_definitions": "\n".join([f" osThreadId_t {t['name']};" for t in task_list]),
"freq_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
"stack_definitions": "\n".join([f" UBaseType_t {t['name']};" for t in task_list]),
"last_up_time_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
"task_frequency_definitions": "\n".join([f"#define {t['name'].upper()}_FREQ ({t['frequency']})" for t in freq_tasks]),
"task_init_delay_definitions": "\n".join([f"#define {t['name'].upper()}_INIT_DELAY ({t['delay']})" for t in task_list]),
"task_attr_declarations": "\n".join([f"extern const osThreadAttr_t attr_{t['name']};" for t in task_list]),
"task_function_declarations": "\n".join([f"void {t['function']}(void *argument);" for t in task_list]),
}
# ----------- 生成 user_task.h -----------
user_task_h_path = os.path.join(output_dir, "user_task.h")
new_user_task_h = render_template(user_task_h_tpl, context_h)
if os.path.exists(user_task_h_path):
with open(user_task_h_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER MESSAGE", "USER CONFIG"]:
new_user_task_h = preserve_user_region(new_user_task_h, old_code, region)
with open(user_task_h_path, "w", encoding="utf-8") as f:
f.write(new_user_task_h)
# ----------- 生成 user_task.c -----------
context_c = {
"task_attr_definitions": "\n".join([
f"const osThreadAttr_t attr_{t['name']} = {{\n"
f" .name = \"{t['name']}\",\n"
f" .priority = osPriorityNormal,\n"
f" .stack_size = {t['stack']} * 4,\n"
f"}};"
for t in task_list
])
}
user_task_c = render_template(user_task_c_tpl, context_c)
with open(os.path.join(output_dir, "user_task.c"), "w", encoding="utf-8") as f:
f.write(user_task_c)
# ----------- 生成 init.c -----------
thread_creation_code = "\n".join([
f" task_runtime.thread.{t['name']} = osThreadNew({t['function']}, NULL, &attr_{t['name']});"
for t in task_list
])
context_init = {
"thread_creation_code": thread_creation_code,
}
init_c = render_template(init_c_tpl, context_init)
init_c_path = os.path.join(output_dir, "init.c")
if os.path.exists(init_c_path):
with open(init_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER CODE", "USER CODE INIT"]:
init_c = preserve_user_region(init_c, old_code, region)
with open(init_c_path, "w", encoding="utf-8") as f:
f.write(init_c)
# ----------- 生成 task.c -----------
for t in task_list:
desc = t.get("description", "")
desc_wrapped = "\n ".join(textwrap.wrap(desc, 20))
context_task = {
"task_name": t["name"],
"task_function": t["function"],
"task_frequency": f"{t['name'].upper()}_FREQ" if t.get("freq_control", True) else None,
"task_delay": f"{t['name'].upper()}_INIT_DELAY",
"task_description": desc_wrapped,
"freq_control": t.get("freq_control", True)
}
with open(task_c_tpl, encoding="utf-8") as f:
tpl = Template(f.read())
code = tpl.render(**context_task)
task_c_path = os.path.join(output_dir, f"{t['name']}.c")
if os.path.exists(task_c_path):
with open(task_c_path, "r", encoding="utf-8") as f:
old_code = f.read()
for region in ["USER INCLUDE", "USER STRUCT", "USER CODE", "USER CODE INIT"]:
code = preserve_user_region(code, old_code, region)
with open(task_c_path, "w", encoding="utf-8") as f:
f.write(code)
# ----------- 保存任务配置到 config.yaml -----------
config_yaml_path = os.path.join(output_dir, "config.yaml")
with open(config_yaml_path, "w", encoding="utf-8") as f:
yaml.safe_dump(task_list, f, allow_unicode=True)

17
test.py Normal file
View File

@ -0,0 +1,17 @@
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QHBoxLayout, QStackedLayout, QFileDialog, QHeaderView
from qfluentwidgets import TitleLabel, BodyLabel, SubtitleLabel, StrongBodyLabel, HorizontalSeparator, PushButton, TreeWidget, InfoBar, FluentIcon, Dialog, FlowLayout, FluentWindow, MSFluentWindow, SplitFluentWindow
class Demo(SplitFluentWindow):
def __init__(self):
super().__init__()
layout = FlowLayout(self, needAni=True) # 启用动画
self.resize(250, 300)
if __name__ == "__main__":
from PyQt5.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
w = Demo()
w.show()
sys.exit(app.exec_())