MOVE_AI/tools/plot_receiver.py
2026-03-15 00:38:03 +08:00

154 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
UDP JSON 波形接收器 - 用于 auto_aim_debug_mpc 实时调参
监听 127.0.0.1:9870实时绘制 MPC 规划数据波形
用法: python3 tools/plot_receiver.py
"""
import socket
import json
import threading
import time
from collections import deque
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# ========== Configuration ==========
HOST = "127.0.0.1"  # address the UDP listener socket binds to
PORT = 9870  # UDP port the listener socket binds to
MAX_POINTS = 500  # maximum number of data points kept and displayed per curve
# Subplots and the curves drawn in each of them.
# One entry per subplot: (title, [(data key, legend label, color), ...])
PLOT_CONFIG = [
    (
        "Yaw (rad)",
        [
            ("gimbal_yaw", "gimbal_yaw", "tab:blue"),
            ("target_yaw", "target_yaw", "tab:orange"),
            ("plan_yaw", "plan_yaw", "tab:red"),
        ],
    ),
    (
        "Yaw Velocity (rad/s)",
        [
            ("gimbal_yaw_vel", "gimbal_yaw_vel", "tab:blue"),
            ("plan_yaw_vel", "plan_yaw_vel", "tab:red"),
        ],
    ),
    (
        "Yaw Acceleration (rad/s²)",
        [
            ("plan_yaw_acc", "plan_yaw_acc", "tab:red"),
        ],
    ),
    (
        "Pitch (rad)",
        [
            ("gimbal_pitch", "gimbal_pitch", "tab:blue"),
            ("target_pitch", "target_pitch", "tab:orange"),
            ("plan_pitch", "plan_pitch", "tab:red"),
        ],
    ),
    (
        "Pitch Velocity (rad/s)",
        [
            ("gimbal_pitch_vel", "gimbal_pitch_vel", "tab:blue"),
            ("plan_pitch_vel", "plan_pitch_vel", "tab:red"),
        ],
    ),
    (
        "Pitch Acceleration (rad/s²)",
        [
            ("plan_pitch_acc", "plan_pitch_acc", "tab:red"),
        ],
    ),
    (
        "Fire & Target",
        [
            ("fire", "fire_cmd", "tab:green"),
            ("fired", "fired", "tab:red"),
        ],
    ),
    (
        "Target State",
        [
            ("w", "omega (rad/s)", "tab:purple"),
            ("target_z", "z (m)", "tab:cyan"),
            ("target_vz", "vz (m/s)", "tab:orange"),
        ],
    ),
]
# ========== Data storage ==========
# Guards time_buf and buffers, which the listener thread writes and the
# plotting code reads.
data_lock = threading.Lock()
# Timestamps of the most recent samples (at most MAX_POINTS of them).
time_buf = deque(maxlen=MAX_POINTS)
# Data key -> bounded sample history; one deque per distinct key in PLOT_CONFIG.
buffers = {
    data_key: deque(maxlen=MAX_POINTS)
    for _title, curve_specs in PLOT_CONFIG
    for data_key, _label, _color in curve_specs
}
def udp_listener():
    """Receive UDP JSON datagrams forever and append them to the plot buffers.

    Meant to run in a background thread. Binds a UDP socket to ``HOST:PORT``
    and, for every datagram that decodes to a JSON object, appends its ``"t"``
    value (``0.0`` if absent) to ``time_buf`` and one value per configured key
    to ``buffers``. Keys missing from a message are stored as NaN, so every
    buffer stays the same length as ``time_buf``.

    Datagrams that fail to be received or decoded, and datagrams whose
    top-level JSON value is not an object, are reported on stdout and skipped.
    The function never returns normally; the socket is closed if an exception
    propagates out of it (for example when ``bind`` fails).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((HOST, PORT))
        # Time out once a second instead of blocking indefinitely in recvfrom().
        sock.settimeout(1.0)
        print(f"[plot_receiver] 监听 {HOST}:{PORT} ...")
        while True:
            try:
                raw, _ = sock.recvfrom(65536)
                msg = json.loads(raw.decode("utf-8"))
            except socket.timeout:
                continue
            except Exception as e:
                print(f"[plot_receiver] 解析错误: {e}")
                continue
            if not isinstance(msg, dict):
                # Valid JSON that is not an object (list, number, string,
                # null) has no .get(); without this guard the resulting
                # AttributeError would terminate the listener thread.
                print(
                    "[plot_receiver] 解析错误: "
                    f"expected a JSON object, got {type(msg).__name__}"
                )
                continue
            with data_lock:
                # Timestamp and samples are appended under one lock so a
                # reader never sees buffers of different lengths.
                time_buf.append(msg.get("t", 0.0))
                for key, buf in buffers.items():
                    buf.append(msg.get(key, float('nan')))
# ========== Plotting ==========
def main():
    """Start the UDP listener thread and show the live matplotlib figure.

    Creates one subplot per ``PLOT_CONFIG`` entry on a grid ``n_cols`` wide,
    hides any unused grid cells, and registers an animation callback that
    redraws every curve from a snapshot of the shared buffers every 50 ms.
    Blocks in ``plt.show()`` until the window is closed.
    """
    # Start the UDP receiver; as a daemon thread it does not keep the process
    # alive once the plot window has been closed.
    listener = threading.Thread(target=udp_listener, daemon=True)
    listener.start()

    n_plots = len(PLOT_CONFIG)
    n_cols = 2
    # Ceiling division by n_cols (rather than a literal 2) so the grid stays
    # large enough if the column count is ever changed.
    n_rows = (n_plots + n_cols - 1) // n_cols
    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 3 * n_rows), squeeze=False)
    fig.suptitle("auto_aim_debug_mpc Real-time Plot", fontsize=14)
    plt.subplots_adjust(hspace=0.45, wspace=0.25)

    lines = {}  # (subplot index, data key) -> Line2D object
    for idx, (title, curves) in enumerate(PLOT_CONFIG):
        ax = axes[idx // n_cols][idx % n_cols]
        ax.set_title(title, fontsize=10)
        ax.set_xlabel("t (s)", fontsize=8)
        ax.grid(True, alpha=0.3)
        for key, label, color in curves:
            ln, = ax.plot([], [], label=label, color=color, linewidth=1.2)
            lines[(idx, key)] = ln
        ax.legend(fontsize=7, loc="upper left")

    # Hide the grid cells that have no subplot assigned.
    for idx in range(n_plots, n_rows * n_cols):
        axes[idx // n_cols][idx % n_cols].set_visible(False)

    def update(_frame):
        """Redraw every curve from a consistent snapshot of the buffers."""
        # Copy under the lock so the listener thread cannot modify the deques
        # while they are being read.
        with data_lock:
            t_list = list(time_buf)
            snap = {k: list(v) for k, v in buffers.items()}
        # Nothing is drawn until at least two samples have arrived.
        if len(t_list) < 2:
            return lines.values()
        for idx, (_, curves) in enumerate(PLOT_CONFIG):
            ax = axes[idx // n_cols][idx % n_cols]
            for key, _, _ in curves:
                ln = lines[(idx, key)]
                y = snap.get(key, [])
                ln.set_data(t_list[:len(y)], y)
            # Recompute the data limits and rescale the axes to the new data.
            ax.relim()
            ax.autoscale_view()
        return lines.values()

    # Keep a reference to the animation for as long as the window is shown.
    _ani = FuncAnimation(fig, update, interval=50, blit=False, cache_frame_data=False)
    plt.show()
# Run the plotter only when this file is executed as a script.
if __name__ == "__main__":
    main()