mirror of
https://github.com/goldenfishs/MRobot.git
synced 2025-07-05 06:54:17 +08:00
修改保留用户区域逻辑
This commit is contained in:
parent
f2fedac360
commit
62b4b07912
124
MRobot.py
124
MRobot.py
@ -48,9 +48,15 @@ from qfluentwidgets import (
|
|||||||
SettingCardGroup, ExpandSettingCard, SubtitleLabel, BodyLabel, HorizontalSeparator, FluentIcon, InfoBar
|
SettingCardGroup, ExpandSettingCard, SubtitleLabel, BodyLabel, HorizontalSeparator, FluentIcon, InfoBar
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
import os
|
||||||
|
from jinja2 import Template
|
||||||
|
import yaml
|
||||||
|
import re
|
||||||
|
import textwrap
|
||||||
# 添加quote
|
# 添加quote
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
|
import re
|
||||||
from packaging.version import parse as vparse
|
from packaging.version import parse as vparse
|
||||||
__version__ = "1.0.1"
|
__version__ = "1.0.1"
|
||||||
|
|
||||||
@ -749,8 +755,6 @@ class DataInterface(BaseInterface):
|
|||||||
cancel_btn.clicked.connect(self.reject)
|
cancel_btn.clicked.connect(self.reject)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 自动读取配置文件
|
# 自动读取配置文件
|
||||||
if config_path and os.path.exists(config_path):
|
if config_path and os.path.exists(config_path):
|
||||||
try:
|
try:
|
||||||
@ -855,26 +859,38 @@ class DataInterface(BaseInterface):
|
|||||||
parent=self,
|
parent=self,
|
||||||
duration=3000
|
duration=3000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def preserve_user_region(self, new_code, old_code, region_name):
|
||||||
|
"""
|
||||||
|
替换 new_code 中 region_name 区域为 old_code 中的内容(如果有)
|
||||||
|
region_name: 如 'USER INCLUDE'
|
||||||
|
"""
|
||||||
|
pattern = re.compile(
|
||||||
|
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
|
||||||
|
re.DOTALL
|
||||||
|
)
|
||||||
|
old_match = pattern.search(old_code or "")
|
||||||
|
if not old_match:
|
||||||
|
return new_code # 旧文件没有该区域,直接返回新代码
|
||||||
|
|
||||||
|
old_content = old_match.group(1)
|
||||||
|
def repl(m):
|
||||||
|
return m.group(0).replace(m.group(1), old_content)
|
||||||
|
# 替换新代码中的该区域
|
||||||
|
return pattern.sub(repl, new_code, count=1)
|
||||||
|
|
||||||
def generate_task_code(self, task_list):
|
def generate_task_code(self, task_list):
|
||||||
import os
|
|
||||||
from jinja2 import Template
|
|
||||||
import yaml
|
|
||||||
import re
|
|
||||||
import textwrap
|
|
||||||
|
|
||||||
base_dir = os.path.dirname(os.path.abspath(__file__))
|
base_dir = os.path.dirname(os.path.abspath(__file__))
|
||||||
template_dir = os.path.join(base_dir, "User_code", "task")
|
template_dir = os.path.join(base_dir, "User_code", "task")
|
||||||
output_dir = os.path.join(self.project_path, "User", "task")
|
output_dir = os.path.join(self.project_path, "User", "task")
|
||||||
os.makedirs(output_dir, exist_ok=True)
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
|
||||||
# 模板路径
|
|
||||||
user_task_h_tpl = os.path.join(template_dir, "user_task.h.template")
|
user_task_h_tpl = os.path.join(template_dir, "user_task.h.template")
|
||||||
user_task_c_tpl = os.path.join(template_dir, "user_task.c.template")
|
user_task_c_tpl = os.path.join(template_dir, "user_task.c.template")
|
||||||
init_c_tpl = os.path.join(template_dir, "init.c.template")
|
init_c_tpl = os.path.join(template_dir, "init.c.template")
|
||||||
task_c_tpl = os.path.join(template_dir, "task.c.template")
|
task_c_tpl = os.path.join(template_dir, "task.c.template")
|
||||||
|
|
||||||
# 只统计需要频率控制的任务
|
|
||||||
freq_tasks = [t for t in task_list if t.get("freq_control", True)]
|
freq_tasks = [t for t in task_list if t.get("freq_control", True)]
|
||||||
|
|
||||||
def render_template(path, context):
|
def render_template(path, context):
|
||||||
@ -882,7 +898,6 @@ class DataInterface(BaseInterface):
|
|||||||
tpl = Template(f.read())
|
tpl = Template(f.read())
|
||||||
return tpl.render(**context)
|
return tpl.render(**context)
|
||||||
|
|
||||||
# 构造模板上下文
|
|
||||||
context_h = {
|
context_h = {
|
||||||
"thread_definitions": "\n".join([f" osThreadId_t {t['name']};" for t in task_list]),
|
"thread_definitions": "\n".join([f" osThreadId_t {t['name']};" for t in task_list]),
|
||||||
"freq_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
|
"freq_definitions": "\n".join([f" float {t['name']};" for t in freq_tasks]),
|
||||||
@ -893,49 +908,27 @@ class DataInterface(BaseInterface):
|
|||||||
"task_attr_declarations": "\n".join([f"extern const osThreadAttr_t attr_{t['name']};" for t in task_list]),
|
"task_attr_declarations": "\n".join([f"extern const osThreadAttr_t attr_{t['name']};" for t in task_list]),
|
||||||
"task_function_declarations": "\n".join([f"void {t['function']}(void *argument);" for t in task_list]),
|
"task_function_declarations": "\n".join([f"void {t['function']}(void *argument);" for t in task_list]),
|
||||||
}
|
}
|
||||||
|
|
||||||
# ----------- 用户区域保护函数 -----------
|
|
||||||
def preserve_user_region(new_code, old_code, region_name):
|
|
||||||
"""
|
|
||||||
替换 new_code 中 region_name 区域为 old_code 中的内容(如果有)
|
|
||||||
region_name: 如 'USER INCLUDE'
|
|
||||||
"""
|
|
||||||
pattern = re.compile(
|
|
||||||
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
|
|
||||||
re.DOTALL
|
|
||||||
)
|
|
||||||
old_match = pattern.search(old_code or "")
|
|
||||||
if not old_match:
|
|
||||||
return new_code # 旧文件没有该区域,直接返回新代码
|
|
||||||
|
|
||||||
old_content = old_match.group(1)
|
|
||||||
def repl(m):
|
|
||||||
return m.group(0).replace(m.group(1), old_content)
|
|
||||||
# 替换新代码中的该区域
|
|
||||||
return pattern.sub(repl, new_code, count=1)
|
|
||||||
|
|
||||||
# ----------- 生成 user_task.h -----------
|
# ----------- 生成 user_task.h -----------
|
||||||
user_task_h_path = os.path.join(output_dir, "user_task.h")
|
user_task_h_path = os.path.join(output_dir, "user_task.h")
|
||||||
new_user_task_h = render_template(user_task_h_tpl, context_h)
|
new_user_task_h = render_template(user_task_h_tpl, context_h)
|
||||||
|
|
||||||
# 检查并保留所有用户区域
|
|
||||||
if os.path.exists(user_task_h_path):
|
if os.path.exists(user_task_h_path):
|
||||||
with open(user_task_h_path, "r", encoding="utf-8") as f:
|
with open(user_task_h_path, "r", encoding="utf-8") as f:
|
||||||
old_code = f.read()
|
old_code = f.read()
|
||||||
# 只保留有内容的用户区域
|
|
||||||
for region in ["USER INCLUDE", "USER MESSAGE", "USER CONFIG"]:
|
for region in ["USER INCLUDE", "USER MESSAGE", "USER CONFIG"]:
|
||||||
# 如果旧文件该区域有内容,则保留
|
|
||||||
pattern = re.compile(
|
pattern = re.compile(
|
||||||
rf"/\*\s*{region}\s*BEGIN\s*\*/(.*?)/\*\s*{region}\s*END\s*\*/",
|
rf"/\*\s*{region}\s*BEGIN\s*\*/(.*?)/\*\s*{region}\s*END\s*\*/",
|
||||||
re.DOTALL
|
re.DOTALL
|
||||||
)
|
)
|
||||||
old_match = pattern.search(old_code)
|
old_match = pattern.search(old_code)
|
||||||
if old_match and old_match.group(1).strip():
|
if old_match and old_match.group(1).strip():
|
||||||
new_user_task_h = preserve_user_region(new_user_task_h, old_code, region)
|
new_user_task_h = self.preserve_user_region(
|
||||||
# 写入
|
new_user_task_h, old_code, region
|
||||||
|
)
|
||||||
with open(user_task_h_path, "w", encoding="utf-8") as f:
|
with open(user_task_h_path, "w", encoding="utf-8") as f:
|
||||||
f.write(new_user_task_h)
|
f.write(new_user_task_h)
|
||||||
|
|
||||||
# ----------- 生成 user_task.c -----------
|
# ----------- 生成 user_task.c -----------
|
||||||
context_c = {
|
context_c = {
|
||||||
"task_attr_definitions": "\n".join([
|
"task_attr_definitions": "\n".join([
|
||||||
@ -950,44 +943,27 @@ class DataInterface(BaseInterface):
|
|||||||
user_task_c = render_template(user_task_c_tpl, context_c)
|
user_task_c = render_template(user_task_c_tpl, context_c)
|
||||||
with open(os.path.join(output_dir, "user_task.c"), "w", encoding="utf-8") as f:
|
with open(os.path.join(output_dir, "user_task.c"), "w", encoding="utf-8") as f:
|
||||||
f.write(user_task_c)
|
f.write(user_task_c)
|
||||||
|
|
||||||
# ----------- 生成 init.c -----------
|
# ----------- 生成 init.c -----------
|
||||||
# 线程创建代码
|
|
||||||
thread_creation_code = "\n".join([
|
thread_creation_code = "\n".join([
|
||||||
f" task_runtime.thread.{t['name']} = osThreadNew({t['function']}, NULL, &attr_{t['name']});"
|
f" task_runtime.thread.{t['name']} = osThreadNew({t['function']}, NULL, &attr_{t['name']});"
|
||||||
for t in task_list
|
for t in task_list
|
||||||
])
|
])
|
||||||
|
|
||||||
context_init = {
|
context_init = {
|
||||||
"thread_creation_code": thread_creation_code,
|
"thread_creation_code": thread_creation_code,
|
||||||
}
|
}
|
||||||
# 渲染模板
|
|
||||||
init_c = render_template(init_c_tpl, context_init)
|
init_c = render_template(init_c_tpl, context_init)
|
||||||
|
|
||||||
# 保留 USER MESSAGE 区域
|
|
||||||
def preserve_user_region(new_code, old_code, region_name):
|
|
||||||
pattern = re.compile(
|
|
||||||
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
|
|
||||||
re.DOTALL
|
|
||||||
)
|
|
||||||
old_match = pattern.search(old_code or "")
|
|
||||||
if not old_match:
|
|
||||||
return new_code
|
|
||||||
old_content = old_match.group(1)
|
|
||||||
def repl(m):
|
|
||||||
return m.group(0).replace(m.group(1), old_content)
|
|
||||||
return pattern.sub(repl, new_code, count=1)
|
|
||||||
|
|
||||||
init_c_path = os.path.join(output_dir, "init.c")
|
init_c_path = os.path.join(output_dir, "init.c")
|
||||||
if os.path.exists(init_c_path):
|
if os.path.exists(init_c_path):
|
||||||
with open(init_c_path, "r", encoding="utf-8") as f:
|
with open(init_c_path, "r", encoding="utf-8") as f:
|
||||||
old_code = f.read()
|
old_code = f.read()
|
||||||
# 保留 USER MESSAGE 区域
|
for region in ["USER INCLUDE", "USER CODE", "USER CODE INIT"]:
|
||||||
init_c = preserve_user_region(init_c, old_code, "USER MESSAGE")
|
init_c = self.preserve_user_region(
|
||||||
|
init_c, old_code, region
|
||||||
|
)
|
||||||
with open(init_c_path, "w", encoding="utf-8") as f:
|
with open(init_c_path, "w", encoding="utf-8") as f:
|
||||||
f.write(init_c)
|
f.write(init_c)
|
||||||
|
|
||||||
# ----------- 生成 task.c -----------
|
# ----------- 生成 task.c -----------
|
||||||
for t in task_list:
|
for t in task_list:
|
||||||
desc = t.get("description", "")
|
desc = t.get("description", "")
|
||||||
@ -1000,29 +976,17 @@ class DataInterface(BaseInterface):
|
|||||||
"task_description": desc_wrapped,
|
"task_description": desc_wrapped,
|
||||||
"freq_control": t.get("freq_control", True)
|
"freq_control": t.get("freq_control", True)
|
||||||
}
|
}
|
||||||
# 渲染模板
|
|
||||||
with open(task_c_tpl, encoding="utf-8") as f:
|
with open(task_c_tpl, encoding="utf-8") as f:
|
||||||
tpl = Template(f.read())
|
tpl = Template(f.read())
|
||||||
code = tpl.render(**context_task)
|
code = tpl.render(**context_task)
|
||||||
# 保留USER区域
|
|
||||||
task_c_path = os.path.join(output_dir, f"{t['name']}.c")
|
task_c_path = os.path.join(output_dir, f"{t['name']}.c")
|
||||||
if os.path.exists(task_c_path):
|
if os.path.exists(task_c_path):
|
||||||
with open(task_c_path, "r", encoding="utf-8") as f:
|
with open(task_c_path, "r", encoding="utf-8") as f:
|
||||||
old_code = f.read()
|
old_code = f.read()
|
||||||
def preserve_user_region(new_code, old_code, region_name):
|
for region in ["USER INCLUDE", "USER STRUCT", "USER CODE", "USER CODE INIT"]:
|
||||||
pattern = re.compile(
|
code = self.preserve_user_region(
|
||||||
rf"/\*\s*{region_name}\s*BEGIN\s*\*/(.*?)/\*\s*{region_name}\s*END\s*\*/",
|
code, old_code, region
|
||||||
re.DOTALL
|
|
||||||
)
|
)
|
||||||
old_match = pattern.search(old_code or "")
|
|
||||||
if not old_match:
|
|
||||||
return new_code
|
|
||||||
old_content = old_match.group(1)
|
|
||||||
def repl(m):
|
|
||||||
return m.group(0).replace(m.group(1), old_content)
|
|
||||||
return pattern.sub(repl, new_code, count=1)
|
|
||||||
for region in ["USER INCLUDE", "USER STRUCT", "USER CODE", "USER INIT CODE"]:
|
|
||||||
code = preserve_user_region(code, old_code, region)
|
|
||||||
with open(task_c_path, "w", encoding="utf-8") as f:
|
with open(task_c_path, "w", encoding="utf-8") as f:
|
||||||
f.write(code)
|
f.write(code)
|
||||||
|
|
||||||
@ -1030,6 +994,10 @@ class DataInterface(BaseInterface):
|
|||||||
config_yaml_path = os.path.join(output_dir, "config.yaml")
|
config_yaml_path = os.path.join(output_dir, "config.yaml")
|
||||||
with open(config_yaml_path, "w", encoding="utf-8") as f:
|
with open(config_yaml_path, "w", encoding="utf-8") as f:
|
||||||
yaml.safe_dump(task_list, f, allow_unicode=True)
|
yaml.safe_dump(task_list, f, allow_unicode=True)
|
||||||
|
|
||||||
|
# ...existing code...
|
||||||
|
|
||||||
|
|
||||||
# ===================== 串口终端界面 =====================
|
# ===================== 串口终端界面 =====================
|
||||||
class SerialReadThread(QThread):
|
class SerialReadThread(QThread):
|
||||||
data_received = pyqtSignal(str)
|
data_received = pyqtSignal(str)
|
||||||
|
@ -24,9 +24,9 @@
|
|||||||
*/
|
*/
|
||||||
void Task_Init(void *argument) {
|
void Task_Init(void *argument) {
|
||||||
(void)argument; /* 未使用argument,消除警告 */
|
(void)argument; /* 未使用argument,消除警告 */
|
||||||
/* USER CODE BEGIN Task_Init */
|
/* USER CODE INIT BEGIN */
|
||||||
|
|
||||||
/* USER CODE END Task_Init */
|
/* USER CODE INIT END */
|
||||||
osKernelLock(); /* 锁定内核,防止任务切换 */
|
osKernelLock(); /* 锁定内核,防止任务切换 */
|
||||||
|
|
||||||
/* 创建任务线程 */
|
/* 创建任务线程 */
|
||||||
|
Loading…
Reference in New Issue
Block a user