添加显示原始值和计算值切换

This commit is contained in:
Robofish 2024-11-02 18:14:18 +08:00
parent 8cf01bfe56
commit 9ec5e7336d

307
src/monitor.py Normal file
View File

@ -0,0 +1,307 @@
import sys
import serial
import serial.tools.list_ports
import threading
import time
from collections import deque
from PyQt5.QtWidgets import QApplication, QMainWindow, QHBoxLayout, QVBoxLayout, QWidget, QPushButton, QComboBox, QLabel, QFileDialog, QProgressBar, QGridLayout, QGroupBox, QCheckBox
from PyQt5.QtCore import QTimer, Qt
import pyqtgraph as pg
import pandas as pd
from datetime import datetime
import pyqtgraph.exporters
class VoltageReaderApp(QMainWindow):
NUM_CHANNELS = 20 # 通道数量
PACKET_SIZE = 43 # 数据包大小
PACKET_HEADER = [0xFE, 0xEE] # 数据包头
PACKET_FOOTER = 0xAA # 数据包尾
BAUD_RATES = ['9600', '19200', '38400', '57600', '115200'] # 波特率选项
PLOT_UPDATE_INTERVAL = 100 # 图表更新间隔(毫秒)
def __init__(self):
super().__init__()
self.initUI()
self.serial_port = None
self.data = [deque() for _ in range(self.NUM_CHANNELS)]
self.raw_data = [deque() for _ in range(self.NUM_CHANNELS)]
self.timestamps = deque()
self.reading_event = threading.Event()
self.start_time = None
self.buffer = bytearray()
self.timer = QTimer()
self.timer.timeout.connect(self.update_plot)
self.auto_follow = False
self.show_raw = False
def initUI(self):
self.setWindowTitle("Voltage Monitor")
self.setGeometry(100, 100, 1600, 700)
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QHBoxLayout(central_widget)
control_layout = QVBoxLayout()
control_widget = QWidget()
control_widget.setLayout(control_layout)
control_widget.setFixedWidth(250)
main_layout.addWidget(control_widget)
self.setup_port_group(control_layout)
self.setup_action_group(control_layout)
self.setup_channel_group(control_layout)
self.setup_plot_widget(main_layout)
self.update_ports()
def setup_port_group(self, layout):
port_group = QGroupBox("串口设置")
port_layout = QVBoxLayout()
port_group.setLayout(port_layout)
layout.addWidget(port_group)
self.port_label = QLabel('选择串口:')
port_layout.addWidget(self.port_label)
self.port_combo = QComboBox()
port_layout.addWidget(self.port_combo)
self.refresh_button = QPushButton('刷新串口')
self.refresh_button.clicked.connect(self.update_ports)
port_layout.addWidget(self.refresh_button)
self.baud_label = QLabel('选择波特率:')
port_layout.addWidget(self.baud_label)
self.baud_combo = QComboBox()
self.baud_combo.addItems(self.BAUD_RATES)
self.baud_combo.setCurrentText('115200')
port_layout.addWidget(self.baud_combo)
self.connect_button = QPushButton('连接')
self.connect_button.clicked.connect(self.connect_serial)
port_layout.addWidget(self.connect_button)
def setup_action_group(self, layout):
action_group = QGroupBox("操作")
action_layout = QVBoxLayout()
action_group.setLayout(action_layout)
layout.addWidget(action_group)
self.start_button = QPushButton('开始读取')
self.start_button.clicked.connect(self.start_reading)
self.start_button.setEnabled(False)
action_layout.addWidget(self.start_button)
self.stop_button = QPushButton('停止读取')
self.stop_button.clicked.connect(self.stop_reading)
self.stop_button.setEnabled(False)
action_layout.addWidget(self.stop_button)
self.auto_button = QPushButton('自动跟随')
self.auto_button.setCheckable(True)
self.auto_button.clicked.connect(self.toggle_auto_follow)
action_layout.addWidget(self.auto_button)
self.show_raw_button = QPushButton('显示原始值')
self.show_raw_button.setCheckable(True)
self.show_raw_button.clicked.connect(self.toggle_show_raw)
action_layout.addWidget(self.show_raw_button)
self.save_button = QPushButton('保存数据')
self.save_button.clicked.connect(self.save_data)
action_layout.addWidget(self.save_button)
self.load_button = QPushButton('加载数据')
self.load_button.clicked.connect(self.load_data)
action_layout.addWidget(self.load_button)
self.message_label = QLabel()
self.message_label.setWordWrap(True)
layout.addWidget(self.message_label)
self.progress_bar = QProgressBar()
self.progress_bar.setAlignment(Qt.AlignCenter)
self.progress_bar.setVisible(False)
layout.addWidget(self.progress_bar)
def setup_channel_group(self, layout):
channel_group = QGroupBox("通道选择")
channel_layout = QGridLayout()
channel_group.setLayout(channel_layout)
layout.addWidget(channel_group)
self.checkboxes = []
for i in range(self.NUM_CHANNELS):
color = (i*12, 255-i*12, 150)
checkbox = QCheckBox(f'adc{i+1}')
checkbox.setChecked(True)
checkbox.setStyleSheet(f'color: rgb({color[0]}, {color[1]}, {color[2]})')
checkbox.stateChanged.connect(lambda state, idx=i: self.toggle_curve_visibility(state, idx))
row = i // 2
col = i % 2
channel_layout.addWidget(checkbox, row, col)
self.checkboxes.append(checkbox)
def setup_plot_widget(self, layout):
self.plot_widget = pg.PlotWidget()
self.plot_widget.showGrid(x=True, y=True, alpha=0.3)
layout.addWidget(self.plot_widget)
self.plot_data = [self.plot_widget.plot([], [], pen=pg.mkPen(color=(i*12, 255-i*12, 150))) for i in range(self.NUM_CHANNELS)]
def update_ports(self):
self.port_combo.clear()
ports = [port.device for port in serial.tools.list_ports.comports()]
self.port_combo.addItems(ports)
self.message_label.setText("串口列表已更新")
def connect_serial(self):
port = self.port_combo.currentText()
baudrate = self.baud_combo.currentText()
if port and baudrate:
try:
self.serial_port = serial.Serial(port, int(baudrate), timeout=1)
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.message_label.setText(f"已连接到 {port},波特率 {baudrate}")
except serial.SerialException as e:
self.message_label.setText(f"连接失败: {e}")
def start_reading(self):
if self.serial_port:
self.reading_event.set()
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
if not self.start_time:
self.start_time = time.time()
threading.Thread(target=self.read_data, daemon=True).start()
self.timer.start(self.PLOT_UPDATE_INTERVAL)
self.message_label.setText("开始读取数据")
else:
self.message_label.setText("错误: 串口未打开,无法读取数据")
def stop_reading(self):
self.reading_event.clear()
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.timer.stop()
self.message_label.setText("停止读取数据")
def read_data(self):
while self.reading_event.is_set() and self.serial_port:
try:
while self.serial_port.in_waiting > 0:
self.buffer.extend(self.serial_port.read(self.serial_port.in_waiting))
while len(self.buffer) >= self.PACKET_SIZE:
if self.buffer[:2] == bytearray(self.PACKET_HEADER) and self.buffer[self.PACKET_SIZE-1] == self.PACKET_FOOTER:
packet = self.buffer[:self.PACKET_SIZE]
self.buffer = self.buffer[self.PACKET_SIZE:]
raw_values = [packet[2 + i*2] | (packet[3 + i*2] << 8) for i in range(self.NUM_CHANNELS)]
voltages = [(raw / 4096.0) * 3.3 for raw in raw_values]
for i in range(self.NUM_CHANNELS):
self.raw_data[i].append(raw_values[i])
self.data[i].append(voltages[i])
if self.start_time is not None:
elapsed_time = time.time() - self.start_time
self.timestamps.append(elapsed_time)
else:
self.buffer.pop(0)
time.sleep(0.01)
except serial.SerialException as e:
self.message_label.setText(f"读取数据失败: {e}")
self.stop_reading()
except Exception as e:
self.message_label.setText(f"未知错误: {e}")
self.stop_reading()
def update_plot(self):
if self.timestamps:
for i in range(self.NUM_CHANNELS):
if self.checkboxes[i].isChecked():
if self.show_raw:
self.plot_data[i].setData(list(self.timestamps), list(self.raw_data[i]), clear=True)
else:
self.plot_data[i].setData(list(self.timestamps), list(self.data[i]), clear=True)
else:
self.plot_data[i].clear()
if self.auto_follow:
self.plot_widget.setXRange(self.timestamps[-1] - 5, self.timestamps[-1], padding=0)
def toggle_curve_visibility(self, state, index):
if state == 0:
self.plot_data[index].clear()
else:
if self.show_raw:
self.plot_data[index].setData(list(self.timestamps), list(self.raw_data[index]))
else:
self.plot_data[index].setData(list(self.timestamps), list(self.data[index]))
def toggle_auto_follow(self):
self.auto_follow = not self.auto_follow
self.auto_button.setText('取消自动跟随' if self.auto_follow else '自动跟随')
def toggle_show_raw(self):
self.show_raw = not self.show_raw
self.show_raw_button.setText('显示计算值' if self.show_raw else '显示原始值')
self.update_plot()
def save_data(self):
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
default_file_name = f"data_{current_time}.xlsx"
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
file_name, _ = QFileDialog.getSaveFileName(self, "保存数据文件", default_file_name, "Excel Files (*.xlsx);;All Files (*)", options=options)
if file_name:
if not file_name.endswith('.xlsx'):
file_name += '.xlsx'
data_dict = {"Timestamp": list(self.timestamps)}
for i in range(self.NUM_CHANNELS):
data_dict[f"Voltage_{i+1}"] = list(self.data[i])
data_dict[f"Raw_{i+1}"] = list(self.raw_data[i])
df = pd.DataFrame(data_dict)
self.progress_bar.setVisible(True)
self.progress_bar.setValue(0)
QApplication.processEvents()
try:
df.to_excel(file_name, index=False, engine='openpyxl')
self.progress_bar.setValue(50)
QApplication.processEvents()
image_filename = file_name.replace('.xlsx', '.png')
exporter = pg.exporters.ImageExporter(self.plot_widget.plotItem)
exporter.parameters()['width'] = 1600
exporter.export(image_filename)
self.progress_bar.setValue(100)
QApplication.processEvents()
self.message_label.setText(f"数据已保存为 {file_name}{image_filename}")
except Exception as e:
self.message_label.setText(f"保存数据失败: {e}")
finally:
self.progress_bar.setVisible(False)
def load_data(self):
options = QFileDialog.Options()
file_name, _ = QFileDialog.getOpenFileName(self, "加载数据文件", "", "Excel Files (*.xlsx);;All Files (*)", options=options)
if file_name:
df = pd.read_excel(file_name)
self.timestamps = deque(df["Timestamp"].tolist())
for i in range(self.NUM_CHANNELS):
self.data[i] = deque(df[f"Voltage_{i+1}"].tolist())
self.raw_data[i] = deque(df[f"Raw_{i+1}"].tolist())
self.update_plot()
self.message_label.setText(f"数据已加载自 {file_name}")
if __name__ == '__main__':
app = QApplication(sys.argv)
window = VoltageReaderApp()
window.show()
sys.exit(app.exec_())