154 lines
4.5 KiB
Python
154 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
UDP JSON waveform receiver - used for real-time tuning of auto_aim_debug_mpc.
Listens on 127.0.0.1:9870 and plots the MPC planning data as live waveforms.

Usage: python3 tools/plot_receiver.py
|
||
"""
|
||
|
||
import socket
|
||
import json
|
||
import threading
|
||
import time
|
||
from collections import deque
|
||
|
||
import matplotlib
|
||
matplotlib.use('TkAgg')
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.animation import FuncAnimation
|
||
|
||
# ========== Configuration ==========
HOST = "127.0.0.1"
PORT = 9870
MAX_POINTS = 500  # maximum number of data points kept and displayed

# Subplots to draw and the curves inside each of them.
# Each entry: (subplot title, [(data key, legend label, color), ...])
PLOT_CONFIG = [
    (
        "Yaw (rad)",
        [
            ("gimbal_yaw", "gimbal_yaw", "tab:blue"),
            ("target_yaw", "target_yaw", "tab:orange"),
            ("plan_yaw", "plan_yaw", "tab:red"),
        ],
    ),
    (
        "Yaw Velocity (rad/s)",
        [
            ("gimbal_yaw_vel", "gimbal_yaw_vel", "tab:blue"),
            ("plan_yaw_vel", "plan_yaw_vel", "tab:red"),
        ],
    ),
    (
        "Yaw Acceleration (rad/s²)",
        [
            ("plan_yaw_acc", "plan_yaw_acc", "tab:red"),
        ],
    ),
    (
        "Pitch (rad)",
        [
            ("gimbal_pitch", "gimbal_pitch", "tab:blue"),
            ("target_pitch", "target_pitch", "tab:orange"),
            ("plan_pitch", "plan_pitch", "tab:red"),
        ],
    ),
    (
        "Pitch Velocity (rad/s)",
        [
            ("gimbal_pitch_vel", "gimbal_pitch_vel", "tab:blue"),
            ("plan_pitch_vel", "plan_pitch_vel", "tab:red"),
        ],
    ),
    (
        "Pitch Acceleration (rad/s²)",
        [
            ("plan_pitch_acc", "plan_pitch_acc", "tab:red"),
        ],
    ),
    (
        "Fire & Target",
        [
            ("fire", "fire_cmd", "tab:green"),
            ("fired", "fired", "tab:red"),
        ],
    ),
    (
        "Target State",
        [
            ("w", "omega (rad/s)", "tab:purple"),
            ("target_z", "z (m)", "tab:cyan"),
            ("target_vz", "vz (m/s)", "tab:orange"),
        ],
    ),
]
|
||
|
||
# ========== Data storage ==========
# These buffers are written by the UDP listener and read by the plot
# update callback; both sides hold data_lock while touching them.
data_lock = threading.Lock()
time_buf = deque(maxlen=MAX_POINTS)
buffers = {}  # data key -> deque of the most recent samples for that key

# One bounded deque per distinct data key; a key listed in several
# subplots shares a single buffer.
for _title, curve_specs in PLOT_CONFIG:
    for data_key, _label, _color in curve_specs:
        buffers.setdefault(data_key, deque(maxlen=MAX_POINTS))
|
||
|
||
|
||
def _to_float(value, default=float("nan")):
    """Return ``value`` converted to float, or ``default`` if that fails.

    JSON null, non-numeric strings, lists, objects and integers too large
    for a float all map to ``default`` so every buffered sample is a float.
    """
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return default


def udp_listener():
    """Background thread body: receive UDP JSON datagrams and buffer them.

    Binds a UDP socket to HOST:PORT and loops forever. Each datagram is
    decoded as UTF-8 JSON and must be a JSON object. Its "t" field
    (0.0 when missing or not numeric) is appended to ``time_buf``, and for
    every key in ``buffers`` the matching field (NaN when missing or not
    numeric) is appended to that key's deque, all under ``data_lock``.

    Datagrams that fail to decode, or that are not JSON objects, are
    reported on stdout and skipped so that one bad packet cannot stop the
    listener.
    """
    # The context manager closes the socket if the loop is ever left
    # through an exception (for example a failing bind).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((HOST, PORT))
        # Time out once a second instead of blocking in recvfrom forever.
        sock.settimeout(1.0)
        print(f"[plot_receiver] 监听 {HOST}:{PORT} ...")

        while True:
            try:
                raw, _ = sock.recvfrom(65536)
                msg = json.loads(raw.decode("utf-8"))
            except socket.timeout:
                continue
            except Exception as e:
                # Top-level boundary of the thread: report and keep going.
                print(f"[plot_receiver] 解析错误: {e}")
                continue

            # Valid JSON is not necessarily an object; a list or a bare
            # number has no .get() and would otherwise kill this thread.
            if not isinstance(msg, dict):
                print(
                    "[plot_receiver] 解析错误: "
                    f"expected JSON object, got {type(msg).__name__}"
                )
                continue

            # Convert outside the lock so the critical section only appends.
            t = _to_float(msg.get("t", 0.0), 0.0)
            samples = {key: _to_float(msg.get(key)) for key in buffers}

            with data_lock:
                time_buf.append(t)
                for key, buf in buffers.items():
                    buf.append(samples[key])
|
||
|
||
|
||
# ========== 绘图 ==========
|
||
def main():
    """Start the UDP listener thread and show the live plot window.

    Builds a grid of subplots, one per PLOT_CONFIG entry, with one line
    per configured curve. A FuncAnimation callback then copies the shared
    buffers every 50 ms, pushes the data into the lines and rescales each
    axis. Blocks in ``plt.show()`` until the window is closed.
    """
    # Start the UDP receiver. daemon=True means the thread does not keep
    # the process alive once main() returns.
    listener = threading.Thread(target=udp_listener, daemon=True)
    listener.start()

    n_plots = len(PLOT_CONFIG)
    n_cols = 2
    # Ceiling division by n_cols (not a hard-coded 2) keeps the grid large
    # enough if the column count is changed. At least one row is needed
    # because plt.subplots rejects an empty grid.
    n_rows = max(1, (n_plots + n_cols - 1) // n_cols)

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 3 * n_rows), squeeze=False)
    fig.suptitle("auto_aim_debug_mpc Real-time Plot", fontsize=14)
    plt.subplots_adjust(hspace=0.45, wspace=0.25)

    def axis_at(idx):
        """Return the axes for subplot ``idx``, counted row by row."""
        row, col = divmod(idx, n_cols)
        return axes[row][col]

    lines = {}  # (subplot_idx, key) -> line object

    for idx, (title, curves) in enumerate(PLOT_CONFIG):
        ax = axis_at(idx)
        ax.set_title(title, fontsize=10)
        ax.set_xlabel("t (s)", fontsize=8)
        ax.grid(True, alpha=0.3)
        for key, label, color in curves:
            ln, = ax.plot([], [], label=label, color=color, linewidth=1.2)
            lines[(idx, key)] = ln
        ax.legend(fontsize=7, loc="upper left")

    # Hide the unused cells of the grid.
    for idx in range(n_plots, n_rows * n_cols):
        axis_at(idx).set_visible(False)

    def update(_frame):
        """Animation callback: refresh every line from the shared buffers."""
        # Copy under the lock so drawing never races with the listener.
        with data_lock:
            t_list = list(time_buf)
            snap = {k: list(v) for k, v in buffers.items()}

        # Nothing to draw until at least two samples have arrived.
        if len(t_list) < 2:
            return lines.values()

        for idx, (_, curves) in enumerate(PLOT_CONFIG):
            ax = axis_at(idx)
            for key, _, _ in curves:
                y = snap.get(key, [])
                lines[(idx, key)].set_data(t_list[:len(y)], y)
            # Recompute the data limits so the view follows the new data.
            ax.relim()
            ax.autoscale_view()

        return lines.values()

    # Keep a reference to the animation: matplotlib stops animating once
    # the FuncAnimation object is garbage-collected.
    _ani = FuncAnimation(fig, update, interval=50, blit=False, cache_frame_data=False)
    plt.show()
|
||
|
||
|
||
# Run the plotter only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
|