# pyqt_monitor/src/test.py
# 2024-12-18 15:28:55 +08:00
#
# 347 lines
# 14 KiB
# Python

import sys
import serial
import serial.tools.list_ports
import threading
import time
import random
from collections import deque
from PyQt5.QtWidgets import QApplication, QMainWindow, QHBoxLayout, QVBoxLayout, QWidget, QPushButton, QComboBox, QLabel, QFileDialog, QProgressBar, QGridLayout, QGroupBox, QCheckBox, QLineEdit
from PyQt5.QtCore import QTimer, Qt
import pyqtgraph as pg
import pandas as pd
from datetime import datetime
import pyqtgraph.exporters
class VoltageReaderApp(QMainWindow):
    """Main window that acquires ADC samples over a serial port and plots them.

    Samples arrive as fixed-size binary packets (see ``PACKET_*``). A daemon
    worker thread parses them into raw values and voltages; a ``QTimer``
    running in the GUI thread redraws the curves. Recorded data can be saved
    to / loaded from an ``.xlsx`` workbook, and a test mode substitutes random
    samples for the serial source.

    Threading rules used throughout the class:
      * the worker thread never touches a Qt object; it only appends samples
        (under ``_data_lock``) and records an error string for the GUI thread;
      * the GUI thread takes a snapshot of the sample containers under the
        same lock before plotting or saving.
    """

    NUM_CHANNELS = 2  # number of ADC channels carried in each packet
    PACKET_SIZE = 7  # packet size in bytes (2 header + 2 per channel + 1 footer)
    PACKET_HEADER = [0xFE, 0xEE]  # packet header bytes
    PACKET_FOOTER = 0xAA  # packet footer byte
    BAUD_RATES = ['9600', '19200', '38400', '57600', '115200']  # baud rate choices
    PLOT_UPDATE_INTERVAL = 100  # plot refresh interval in milliseconds
    ADC_FULL_SCALE = 4096.0  # divisor applied to a raw reading
    ADC_REFERENCE_VOLTAGE = 3.3  # multiplier that turns the ratio into volts
    AUTO_FOLLOW_WINDOW = 5  # width (seconds) of the x range shown when following
    SERIAL_POLL_INTERVAL = 0.01  # pause (seconds) between serial polls
    TEST_SAMPLE_INTERVAL = 0.1  # pause (seconds) between simulated samples
    READER_JOIN_TIMEOUT = 1.0  # longest time (seconds) to wait for the worker

    def __init__(self):
        """Create the acquisition state, the refresh timer and the widgets."""
        super().__init__()
        # State is created before the widgets so that every slot finds the
        # attributes it reads, whenever it happens to fire.
        self.serial_port = None
        self.data = [deque() for _ in range(self.NUM_CHANNELS)]
        self.raw_data = [deque() for _ in range(self.NUM_CHANNELS)]
        self.timestamps = deque()
        self.reading_event = threading.Event()
        self.start_time = None
        self.buffer = bytearray()
        self.auto_follow = False
        self.show_raw = False
        self.test_mode = False  # True while random samples replace the serial source
        self._data_lock = threading.Lock()  # guards data / raw_data / timestamps
        self._reader_thread = None
        self._reader_error = None  # message left by the worker for the GUI thread
        self.timer = QTimer()
        self.timer.timeout.connect(self._on_timer_tick)
        self.initUI()

    # ------------------------------------------------------------------ UI

    def initUI(self):
        """Build the window: a fixed-width control column next to the plot."""
        self.setWindowTitle("Voltage Monitor")
        self.setGeometry(100, 100, 1600, 700)
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QHBoxLayout(central_widget)
        control_layout = QVBoxLayout()
        control_widget = QWidget()
        control_widget.setLayout(control_layout)
        control_widget.setFixedWidth(250)
        main_layout.addWidget(control_widget)
        self.setup_port_group(control_layout)
        self.setup_action_group(control_layout)
        self.setup_channel_group(control_layout)
        self.setup_plot_widget(main_layout)
        # Needs port_combo and message_label, both created above.
        self.update_ports()

    def setup_port_group(self, layout):
        """Add the serial-port selection, baud-rate and connect controls to *layout*."""
        port_group = QGroupBox("串口设置")
        port_layout = QVBoxLayout()
        port_group.setLayout(port_layout)
        layout.addWidget(port_group)

        self.port_label = QLabel('选择串口:')
        port_layout.addWidget(self.port_label)
        self.port_combo = QComboBox()
        port_layout.addWidget(self.port_combo)

        self.refresh_button = QPushButton('刷新串口')
        self.refresh_button.clicked.connect(self.update_ports)
        port_layout.addWidget(self.refresh_button)

        self.baud_label = QLabel('选择波特率:')
        port_layout.addWidget(self.baud_label)
        self.baud_combo = QComboBox()
        self.baud_combo.addItems(self.BAUD_RATES)
        self.baud_combo.setCurrentText('115200')
        port_layout.addWidget(self.baud_combo)

        self.connect_button = QPushButton('连接')
        self.connect_button.clicked.connect(self.connect_serial)
        port_layout.addWidget(self.connect_button)

    def setup_action_group(self, layout):
        """Add the action buttons, remark field, message label and progress bar."""
        action_group = QGroupBox("操作")
        action_layout = QVBoxLayout()
        action_group.setLayout(action_layout)
        layout.addWidget(action_group)

        self.start_button = QPushButton('开始读取')
        self.start_button.clicked.connect(self.start_reading)
        self.start_button.setEnabled(False)
        action_layout.addWidget(self.start_button)

        self.stop_button = QPushButton('停止读取')
        self.stop_button.clicked.connect(self.stop_reading)
        self.stop_button.setEnabled(False)
        action_layout.addWidget(self.stop_button)

        self.auto_button = QPushButton('自动跟随')
        self.auto_button.setCheckable(True)
        self.auto_button.clicked.connect(self.toggle_auto_follow)
        action_layout.addWidget(self.auto_button)

        self.show_raw_button = QPushButton('显示原始值')
        self.show_raw_button.setCheckable(True)
        self.show_raw_button.clicked.connect(self.toggle_show_raw)
        action_layout.addWidget(self.show_raw_button)

        self.save_button = QPushButton('保存数据')
        self.save_button.clicked.connect(self.save_data)
        action_layout.addWidget(self.save_button)

        self.load_button = QPushButton('加载数据')
        self.load_button.clicked.connect(self.load_data)
        action_layout.addWidget(self.load_button)

        self.test_button = QPushButton('测试模式')
        self.test_button.setCheckable(True)
        self.test_button.clicked.connect(self.toggle_test_mode)
        action_layout.addWidget(self.test_button)

        self.remark_label = QLabel('备注(学生姓名):')
        action_layout.addWidget(self.remark_label)
        self.remark_input = QLineEdit()
        action_layout.addWidget(self.remark_input)

        # The status line and progress bar sit below the group box, directly
        # in the outer control column.
        self.message_label = QLabel()
        self.message_label.setWordWrap(True)
        layout.addWidget(self.message_label)

        self.progress_bar = QProgressBar()
        self.progress_bar.setAlignment(Qt.AlignCenter)
        self.progress_bar.setVisible(False)
        layout.addWidget(self.progress_bar)

    @staticmethod
    def _channel_color(index):
        """Return the RGB tuple shared by channel *index*'s checkbox and curve."""
        return (index * 12, 255 - index * 12, 150)

    def setup_channel_group(self, layout):
        """Add one colour-coded visibility checkbox per channel to *layout*."""
        channel_group = QGroupBox("通道选择")
        channel_layout = QVBoxLayout()
        channel_group.setLayout(channel_layout)
        layout.addWidget(channel_group)

        self.checkboxes = []
        for i in range(self.NUM_CHANNELS):
            color = self._channel_color(i)
            checkbox = QCheckBox(f'adc{i+1}')
            # Checked before the signal is connected, so no slot fires here.
            checkbox.setChecked(True)
            checkbox.setStyleSheet(
                f'color: black; background-color: rgb({color[0]}, {color[1]}, {color[2]})'
            )
            # idx=i binds the current channel; a plain closure would see the
            # final value of i for every checkbox.
            checkbox.stateChanged.connect(
                lambda state, idx=i: self.toggle_curve_visibility(state, idx)
            )
            channel_layout.addWidget(checkbox)
            self.checkboxes.append(checkbox)

    def setup_plot_widget(self, layout):
        """Create the plot area and one (initially empty) curve per channel."""
        self.plot_widget = pg.PlotWidget()
        self.plot_widget.showGrid(x=True, y=True, alpha=0.3)
        layout.addWidget(self.plot_widget)
        self.plot_data = [
            self.plot_widget.plot([], [], pen=pg.mkPen(color=self._channel_color(i)))
            for i in range(self.NUM_CHANNELS)
        ]

    # -------------------------------------------------------------- serial

    def update_ports(self):
        """Refill the port combo box with the serial devices currently present."""
        self.port_combo.clear()
        ports = [port.device for port in serial.tools.list_ports.comports()]
        self.port_combo.addItems(ports)
        self.message_label.setText("串口列表已更新")

    def _close_serial_port(self):
        """Close the open serial port, if any, and forget it."""
        port, self.serial_port = self.serial_port, None
        if port is not None:
            port.close()

    def connect_serial(self):
        """Open the selected port at the selected baud rate.

        Any acquisition in progress is stopped and a previously opened port is
        closed first, so reconnecting neither leaks the old handle nor leaves
        a worker reading from it. Does nothing when no port is selected.
        """
        port = self.port_combo.currentText()
        baudrate = self.baud_combo.currentText()
        if not (port and baudrate):
            return
        if self.reading_event.is_set():
            self.stop_reading()
        self._close_serial_port()
        try:
            self.serial_port = serial.Serial(port, int(baudrate), timeout=1)
        except serial.SerialException as e:
            # Without a port, starting is only possible in test mode.
            self.start_button.setEnabled(self.test_mode)
            self.message_label.setText(f"连接失败: {e}")
            return
        self.start_button.setEnabled(True)
        self.stop_button.setEnabled(False)
        self.message_label.setText(f"已连接到 {port},波特率 {baudrate}")

    # --------------------------------------------------------- acquisition

    def _join_reader(self):
        """Wait (bounded) for the worker thread to finish after a stop request."""
        thread = self._reader_thread
        if thread is None or thread is threading.current_thread():
            return
        thread.join(self.READER_JOIN_TIMEOUT)
        if not thread.is_alive():
            self._reader_thread = None

    def start_reading(self):
        """Start the worker thread and the plot timer.

        Requires an open serial port or test mode; otherwise an error message
        is shown. Calling it while acquisition is already running is a no-op,
        so at most one worker thread ever feeds the sample containers.
        """
        if not (self.serial_port or self.test_mode):
            self.message_label.setText("错误: 串口未打开,无法读取数据")
            return
        if self.reading_event.is_set():
            return
        # Make sure a worker that was asked to stop has really exited before
        # the event is set again; otherwise it would simply keep running.
        self._join_reader()
        self._reader_error = None
        self.reading_event.set()
        self.start_button.setEnabled(False)
        self.stop_button.setEnabled(True)
        if self.start_time is None:
            # Set only once, so timestamps keep increasing across stop/start.
            self.start_time = time.time()
        self._reader_thread = threading.Thread(target=self.read_data, daemon=True)
        self._reader_thread.start()
        self.timer.start(self.PLOT_UPDATE_INTERVAL)
        self.message_label.setText("开始读取数据")

    def stop_reading(self):
        """Stop the worker thread and the plot timer. Must run in the GUI thread."""
        self.reading_event.clear()
        self._join_reader()
        self.timer.stop()
        # Offer "start" only when there is something to read from.
        self.start_button.setEnabled(self.serial_port is not None or self.test_mode)
        self.stop_button.setEnabled(False)
        self.message_label.setText("停止读取数据")

    def _raw_to_voltage(self, raw):
        """Convert a raw ADC reading to volts."""
        return (raw / self.ADC_FULL_SCALE) * self.ADC_REFERENCE_VOLTAGE

    def _store_samples(self, samples):
        """Append *samples* (one list of per-channel raw values each) atomically.

        All containers are extended under the lock, so a GUI-side snapshot
        always sees the same number of timestamps and values per channel.
        """
        with self._data_lock:
            if self.start_time is None:
                self.start_time = time.time()
            for raw_values in samples:
                for channel, raw in enumerate(raw_values):
                    self.raw_data[channel].append(raw)
                    self.data[channel].append(self._raw_to_voltage(raw))
                self.timestamps.append(time.time() - self.start_time)

    def _extract_samples(self):
        """Consume every complete packet in ``self.buffer``.

        A packet is ``PACKET_SIZE`` bytes: the header, then one little-endian
        16-bit value per channel, then the footer. Bytes that do not start a
        valid packet are skipped one at a time to resynchronise. Trailing
        bytes of an incomplete packet stay in the buffer.

        Returns:
            A list with one ``[raw_ch1, raw_ch2, ...]`` list per packet.
        """
        header = bytes(self.PACKET_HEADER)
        payload_start = len(header)
        size = self.PACKET_SIZE
        buf = self.buffer
        samples = []
        pos = 0
        while len(buf) - pos >= size:
            framed = (
                buf[pos:pos + payload_start] == header
                and buf[pos + size - 1] == self.PACKET_FOOTER
            )
            if framed:
                base = pos + payload_start
                samples.append([
                    buf[base + 2 * ch] | (buf[base + 2 * ch + 1] << 8)
                    for ch in range(self.NUM_CHANNELS)
                ])
                pos += size
            else:
                pos += 1
        # Drop everything consumed in a single operation.
        del buf[:pos]
        return samples

    def _report_reader_error(self, message):
        """Hand *message* to the GUI thread and end acquisition (worker side)."""
        self._reader_error = message
        self.reading_event.clear()

    def read_data(self):
        """Worker-thread body: feed samples until ``reading_event`` is cleared.

        In test mode random samples are generated; otherwise bytes are pulled
        from the serial port and parsed. Failures are not shown from here,
        because Qt widgets may only be used from the GUI thread. They are
        recorded and displayed on the next timer tick.
        """
        if self.test_mode:
            self.generate_random_waveform()
            return
        port = self.serial_port
        if port is None:
            return
        try:
            while self.reading_event.is_set():
                pending = port.in_waiting
                while pending > 0:
                    self.buffer.extend(port.read(pending))
                    pending = port.in_waiting
                samples = self._extract_samples()
                if samples:
                    self._store_samples(samples)
                time.sleep(self.SERIAL_POLL_INTERVAL)
        except serial.SerialException as e:
            self._report_reader_error(f"读取数据失败: {e}")
        except Exception as e:  # thread boundary: report instead of dying silently
            self._report_reader_error(f"未知错误: {e}")

    def generate_random_waveform(self):
        """Worker-thread body for test mode: store one random sample per interval."""
        while self.reading_event.is_set():
            sample = [random.randint(0, 4095) for _ in range(self.NUM_CHANNELS)]
            self._store_samples([sample])
            time.sleep(self.TEST_SAMPLE_INTERVAL)

    # ------------------------------------------------------------ plotting

    def _snapshot(self):
        """Return consistent copies ``(timestamps, voltages, raw_values)``."""
        with self._data_lock:
            times = list(self.timestamps)
            voltages = [list(channel) for channel in self.data]
            raws = [list(channel) for channel in self.raw_data]
        return times, voltages, raws

    def _on_timer_tick(self):
        """Timer slot: surface a worker error, if any, then redraw."""
        error = self._reader_error
        if error is not None:
            self._reader_error = None
            self.stop_reading()
            # Set after stop_reading() so its own status text does not hide it.
            self.message_label.setText(error)
        self.update_plot()

    def update_plot(self):
        """Redraw every visible curve from the current samples.

        Does nothing while no sample has been recorded. With auto-follow on,
        the x range is moved to the last ``AUTO_FOLLOW_WINDOW`` seconds.
        """
        times, voltages, raws = self._snapshot()
        if not times:
            return
        series = raws if self.show_raw else voltages
        for i, curve in enumerate(self.plot_data):
            if self.checkboxes[i].isChecked():
                curve.setData(times, series[i])
            else:
                curve.clear()
        if self.auto_follow:
            latest = times[-1]
            self.plot_widget.setXRange(latest - self.AUTO_FOLLOW_WINDOW, latest, padding=0)

    def toggle_curve_visibility(self, state, index):
        """Checkbox slot: hide channel *index* when unchecked, else redraw it."""
        if state == Qt.Unchecked:
            self.plot_data[index].clear()
            return
        times, voltages, raws = self._snapshot()
        series = raws if self.show_raw else voltages
        self.plot_data[index].setData(times, series[index])

    def toggle_auto_follow(self):
        """Flip auto-follow and relabel its button."""
        self.auto_follow = not self.auto_follow
        self.auto_button.setText('取消自动跟随' if self.auto_follow else '自动跟随')

    def toggle_show_raw(self):
        """Switch between raw readings and voltages, then redraw."""
        self.show_raw = not self.show_raw
        self.show_raw_button.setText('显示计算值' if self.show_raw else '显示原始值')
        self.update_plot()

    def toggle_test_mode(self):
        """Enter or leave test mode.

        A running acquisition is stopped first, so switching the source never
        leaves two worker threads feeding the same containers. Entering test
        mode starts generating samples immediately.
        """
        if self.reading_event.is_set():
            self.stop_reading()
        self.test_mode = not self.test_mode
        self.test_button.setText('退出测试模式' if self.test_mode else '测试模式')
        if self.test_mode:
            self.start_reading()
        else:
            # Also refreshes the start button for the new mode.
            self.stop_reading()

    # ---------------------------------------------------------- save / load

    def save_data(self):
        """Ask for a file name and save the samples plus a plot screenshot.

        The workbook has the remark in row 1 and a table (``Timestamp``,
        ``Voltage_<n>``, ``Raw_<n>``) starting at row 2. A PNG of the plot is
        written next to it with the same base name. Any failure is reported
        in the message label.
        """
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        default_file_name = f"data_{current_time}.xlsx"
        options = QFileDialog.Options()
        options |= QFileDialog.DontUseNativeDialog
        file_name, _ = QFileDialog.getSaveFileName(
            self, "保存数据文件", default_file_name,
            "Excel Files (*.xlsx);;All Files (*)", options=options,
        )
        if not file_name:
            return

        extension = '.xlsx'
        if not file_name.lower().endswith(extension):
            file_name += extension
        # Swap only the trailing extension; str.replace would also rewrite
        # any '.xlsx' that happens to occur earlier in the path.
        image_filename = file_name[:-len(extension)] + '.png'
        student_name = self.remark_input.text()

        self.progress_bar.setVisible(True)
        self.progress_bar.setValue(0)
        QApplication.processEvents()
        try:
            times, voltages, raws = self._snapshot()
            data_dict = {"Timestamp": times}
            for i in range(self.NUM_CHANNELS):
                data_dict[f"Voltage_{i+1}"] = voltages[i]
                data_dict[f"Raw_{i+1}"] = raws[i]
            df = pd.DataFrame(data_dict)

            with pd.ExcelWriter(file_name, engine='openpyxl') as writer:
                df.to_excel(writer, index=False, startrow=1)
                worksheet = writer.sheets['Sheet1']
                # The remark spans the full width of the table.
                worksheet.merge_cells(
                    start_row=1, start_column=1, end_row=1, end_column=len(df.columns)
                )
                worksheet['A1'] = f"备注: {student_name}"
            self.progress_bar.setValue(50)
            QApplication.processEvents()

            exporter = pg.exporters.ImageExporter(self.plot_widget.plotItem)
            exporter.parameters()['width'] = 1600
            exporter.export(image_filename)
            self.progress_bar.setValue(100)
            QApplication.processEvents()
            self.message_label.setText(f"数据已保存为 {file_name} 和 {image_filename}")
        except Exception as e:  # UI boundary: show the failure instead of crashing
            self.message_label.setText(f"保存数据失败: {e}")
        finally:
            self.progress_bar.setVisible(False)

    @staticmethod
    def _read_data_frame(file_name):
        """Read a workbook whose header is in row 1 or, as saved here, in row 2."""
        df = pd.read_excel(file_name)
        if "Timestamp" not in df.columns:
            # Files written by save_data() carry the remark in the first row,
            # so the column names are one row further down.
            df = pd.read_excel(file_name, header=1)
        return df

    def load_data(self):
        """Ask for a workbook, replace the current samples with it and redraw.

        The file must provide ``Timestamp`` and, for every channel,
        ``Voltage_<n>`` and ``Raw_<n>`` columns. Unreadable files or missing
        columns are reported in the message label; the current data is kept.
        """
        options = QFileDialog.Options()
        file_name, _ = QFileDialog.getOpenFileName(
            self, "加载数据文件", "", "Excel Files (*.xlsx);;All Files (*)", options=options,
        )
        if not file_name:
            return
        try:
            df = self._read_data_frame(file_name)
            times = df["Timestamp"].tolist()
            voltages = [df[f"Voltage_{i+1}"].tolist() for i in range(self.NUM_CHANNELS)]
            raws = [df[f"Raw_{i+1}"].tolist() for i in range(self.NUM_CHANNELS)]
        except Exception as e:  # UI boundary: show the failure instead of crashing
            self.message_label.setText(f"加载数据失败: {e}")
            return
        # Everything is parsed before anything is replaced, so a bad file
        # cannot leave the containers half-updated.
        with self._data_lock:
            self.timestamps = deque(times)
            for i in range(self.NUM_CHANNELS):
                self.data[i] = deque(voltages[i])
                self.raw_data[i] = deque(raws[i])
        self.update_plot()
        self.message_label.setText(f"数据已加载自 {file_name}")

    def closeEvent(self, event):
        """Stop the worker and release the serial port when the window closes."""
        self.reading_event.clear()
        self._join_reader()
        self.timer.stop()
        self._close_serial_port()
        super().closeEvent(event)
if __name__ == '__main__':
    # Script entry point: create the Qt application and the main window,
    # show it, run the event loop, and exit with the loop's return code.
    application = QApplication(sys.argv)
    main_window = VoltageReaderApp()
    main_window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)