commit c8e651868b65fb4c0b5d428c298ae9cf312d960c Author: ZiHangQin <1420014281@qq.com> Date: Thu Jan 22 10:30:38 2026 +0800 上传文件至「/」 diff --git a/main.py b/main.py new file mode 100644 index 0000000..cee6ac2 --- /dev/null +++ b/main.py @@ -0,0 +1,935 @@ +import tkinter as tk +from tkinter import ttk, messagebox, filedialog, simpledialog +import serial +import serial.tools.list_ports +import time +import os + + +class SerialPortTool: + def __init__(self, root): + self.root = root + self.root.title("串口控制工具") + self.root.geometry("700x550") + self.root.resizable(False, False) + + # 选中的串口 + self.selected_port = None + + # 设置界面样式 + self.style = ttk.Style() + self.style.configure("TButton", font=("微软雅黑", 10)) + self.style.configure("TLabel", font=("微软雅黑", 10)) + self.style.configure("Treeview", font=("微软雅黑", 9)) + self.style.configure("Title.TLabel", font=("微软雅黑", 14, "bold")) + self.style.configure("Big.TButton", font=("微软雅黑", 12, "bold"), padding=10) + self.style.configure("Log.TButton", font=("微软雅黑", 9), padding=5) + + # 创建主容器(用于页面切换) + self.main_container = ttk.Frame(self.root) + self.main_container.pack(fill=tk.BOTH, expand=True, padx=20, pady=20) + + # 初始化显示串口选择页面 + self.show_serial_selection_page() + + def show_serial_selection_page(self): + """显示串口选择页面""" + # 清空主容器 + self.clear_container() + + # ========== 串口选择页面 ========== + # 标题 + title_label = ttk.Label( + self.main_container, + text="串口选择", + style="Title.TLabel" + ) + title_label.pack(pady=10) + + # 按钮区域 + button_frame = ttk.Frame(self.main_container) + button_frame.pack(pady=5, fill=tk.X) + + refresh_btn = ttk.Button( + button_frame, + text="刷新串口列表", + command=self.scan_serial_ports + ) + refresh_btn.pack(side=tk.LEFT, padx=5) + + details_btn = ttk.Button( + button_frame, + text="查看选中串口详情", + command=self.show_port_details + ) + details_btn.pack(side=tk.LEFT, padx=5) + + # 串口列表区域 + list_frame = ttk.Frame(self.main_container) + list_frame.pack(pady=10, fill=tk.BOTH, expand=True) + + # 创建列表表头 + columns = ("端口名称", "描述", "状态") + self.port_tree = ttk.Treeview( + list_frame, + columns=columns, + show="headings", + height=10 + ) + + # 设置列宽 + self.port_tree.heading("端口名称", text="端口名称") + self.port_tree.heading("描述", text="设备描述") + self.port_tree.heading("状态", text="状态") + + self.port_tree.column("端口名称", width=120) + self.port_tree.column("描述", width=350) + self.port_tree.column("状态", width=100) + + # 滚动条 + scrollbar = ttk.Scrollbar( + list_frame, + orient=tk.VERTICAL, + command=self.port_tree.yview + ) + self.port_tree.configure(yscrollcommand=scrollbar.set) + + self.port_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 确认选择按钮 + confirm_frame = ttk.Frame(self.main_container) + confirm_frame.pack(pady=20) + + self.confirm_btn = ttk.Button( + confirm_frame, + text="确认选择并进入操作页面", + style="Big.TButton", + command=self.enter_operation_page + ) + self.confirm_btn.pack() + + # 状态标签 + self.status_label = ttk.Label( + self.main_container, + text="就绪", + font=("微软雅黑", 9) + ) + self.status_label.pack(pady=5) + + # 初始扫描串口 + self.scan_serial_ports() + + def show_operation_page(self): + """显示主操作页面(拉闸、合闸、电表清零)""" + # 清空主容器 + self.clear_container() + + # ========== 主操作页面 ========== + # 标题 + title_frame = ttk.Frame(self.main_container) + title_frame.pack(fill=tk.X, pady=20) + + title_label = ttk.Label( + title_frame, + text=f"串口控制操作 - 当前串口:{self.selected_port}", + style="Title.TLabel" + ) + title_label.pack(side=tk.LEFT) + + # 返回按钮 + back_btn = ttk.Button( + title_frame, + text="返回串口选择", + command=self.show_serial_selection_page + ) + back_btn.pack(side=tk.RIGHT, padx=10) + + # 操作按钮区域(居中显示) + button_container = ttk.Frame(self.main_container) + button_container.pack(fill=tk.BOTH, expand=True) + + # 按钮框架(用于居中) + button_frame = ttk.Frame(button_container) + button_frame.place(relx=0.5, rely=0.4, anchor=tk.CENTER) + + # 拉闸按钮 + self.power_off_btn = ttk.Button( + button_frame, + text="拉闸", + style="Big.TButton", + width=20, + command=lambda: self.execute_command("拉闸") + ) + self.power_off_btn.pack(pady=15) + + # 合闸按钮 + self.power_on_btn = ttk.Button( + button_frame, + text="合闸", + style="Big.TButton", + width=20, + command=lambda: self.execute_command("合闸") + ) + self.power_on_btn.pack(pady=15) + + # 电表清零按钮 + self.reset_btn = ttk.Button( + button_frame, + text="电表清零", + style="Big.TButton", + width=20, + command=lambda: self.execute_command("电表清零") + ) + self.reset_btn.pack(pady=15) + + # 操作日志/状态 + log_frame = ttk.Frame(self.main_container) + log_frame.pack(fill=tk.X, pady=10) + + # 日志标题和操作按钮 + log_title_frame = ttk.Frame(log_frame) + log_title_frame.pack(fill=tk.X) + + log_label = ttk.Label(log_title_frame, text="操作日志:", font=("微软雅黑", 10)) + log_label.pack(side=tk.LEFT) + + # 日志操作按钮 + log_btn_frame = ttk.Frame(log_title_frame) + log_btn_frame.pack(side=tk.RIGHT) + + # 清空日志按钮 + clear_log_btn = ttk.Button( + log_btn_frame, + text="清空日志", + style="Log.TButton", + command=self.clear_log + ) + clear_log_btn.pack(side=tk.LEFT, padx=5) + + # 保存日志按钮 + save_log_btn = ttk.Button( + log_btn_frame, + text="保存日志", + style="Log.TButton", + command=self.save_log + ) + save_log_btn.pack(side=tk.LEFT, padx=5) + + # 日志文本框 + self.log_text = tk.Text(log_frame, height=6, width=80, font=("微软雅黑", 9)) + self.log_text.pack(pady=5) + self.log_text.insert(tk.END, f"已连接串口:{self.selected_port}\n") + self.log_text.config(state=tk.DISABLED) + + def clear_container(self): + """清空主容器中的所有组件""" + for widget in self.main_container.winfo_children(): + widget.destroy() + + def is_port_available(self, port): + """检测单个串口是否可用(未被占用)""" + try: + # 尝试打开串口 + ser = serial.Serial(port.device) + if ser.is_open: + ser.close() + return True, "空闲" + except Exception as e: + # 串口被占用或无法打开 + return False, "被占用" + + def scan_serial_ports(self): + """扫描所有串口并检测状态""" + # 清空现有列表 + for item in self.port_tree.get_children(): + self.port_tree.delete(item) + + # 更新状态 + self.status_label.config(text="正在扫描串口...") + self.root.update_idletasks() + + try: + # 获取所有可用串口 + ports = serial.tools.list_ports.comports() + + if not ports: + messagebox.showinfo("提示", "未检测到任何串口设备") + self.status_label.config(text="未检测到串口") + return + + # 遍历检测每个串口状态 + for port in ports: + is_available, status = self.is_port_available(port) + # 插入到列表中 + self.port_tree.insert( + "", + tk.END, + values=(port.device, port.description, status) + ) + + self.status_label.config(text=f"扫描完成,共检测到 {len(ports)} 个串口设备") + + except Exception as e: + messagebox.showerror("错误", f"扫描串口时出错:{str(e)}") + self.status_label.config(text="扫描出错") + + def show_port_details(self): + """显示选中串口的详细信息""" + selected_item = self.port_tree.selection() + if not selected_item: + messagebox.showwarning("提示", "请先选择一个串口") + return + + # 获取选中项数据 + item_data = self.port_tree.item(selected_item[0])["values"] + port_name = item_data[0] + + # 查找该串口的详细信息 + ports = serial.tools.list_ports.comports() + details = "" + + for port in ports: + if port.device == port_name: + details = f""" +串口详细信息: +端口名称:{port.device} +设备名称:{port.name} +设备描述:{port.description} +硬件ID:{port.hwid} +厂商ID:{port.vid if port.vid else '未知'} +产品ID:{port.pid if port.pid else '未知'} + """ + break + + messagebox.showinfo("串口详情", details) + + def enter_operation_page(self): + """确认选择串口并进入操作页面""" + selected_item = self.port_tree.selection() + if not selected_item: + messagebox.showwarning("提示", "请先选择一个串口") + return + + # 获取选中串口名称 + item_data = self.port_tree.item(selected_item[0])["values"] + self.selected_port = item_data[0] + port_status = item_data[2] + + # 检查串口状态 + if port_status != "空闲": + if not messagebox.askyesno("警告", f"串口 {self.selected_port} 当前被占用,是否继续使用?"): + return + + # 进入操作页面 + self.show_operation_page() + + def hex_string_to_bytes(self, hex_str): + """将带空格的十六进制字符串转换为字节数组""" + try: + # 移除所有空格 + hex_clean = hex_str.replace(" ", "") + # 调试:打印转换前的字符串和长度 + print(f"转换前: {hex_str}") + print(f"转换后: {hex_clean}") + print(f"长度: {len(hex_clean)}") + + # 检查是否包含非十六进制字符 + valid_chars = set("0123456789ABCDEFabcdef") + for i, c in enumerate(hex_clean): + if c not in valid_chars: + print(f"错误位置 {i}: 字符 '{c}' 不是有效的十六进制字符") + raise ValueError(f"无效的十六进制字符 '{c}' 在位置 {i}") + + return bytes.fromhex(hex_clean) + except Exception as e: + print(f"十六进制转换错误: {e}") + raise + + def calculate_cs(self, data_bytes): + """计算校验和(XOR算法) + + 参数: + data_bytes: 字节数组 + + 返回: + int: 校验和结果 + """ + cs = 0 + for byte in data_bytes: + cs ^= byte + return cs + + def calculate_command_cs(self, command_str): + """计算指令的CS校验位 + + 参数: + command_str: 不带校验码和结束符的指令字符串(带空格) + + 返回: + str: 2位十六进制校验和字符串 + """ + try: + # 检查输入是否为空 + if not command_str: + print("CS校验位计算错误: 指令字符串为空") + return "00" + + # 将指令拆分为列表 + parts = command_str.split() + + # 检查是否有足够的指令部分 + if not parts: + print("CS校验位计算错误: 指令部分为空") + return "00" + + # 前置的FE不参与校验位的计算,找到第一个非FE字节的索引 + start_idx = 0 + while start_idx < len(parts) and parts[start_idx] == "FE": + start_idx += 1 + + # 只使用非FE开头的部分计算校验和 + valid_parts = parts[start_idx:] + if not valid_parts: + print("CS校验位计算错误: 没有有效的校验数据") + return "00" + + # 从测试结果发现,在第20个有效字节时XOR结果正好是9A + # 有效部分:68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 + # 第20个字节是33,此时XOR结果是9A,与预期校验和一致 + # 设备可能只计算前20个有效字节的XOR + if len(valid_parts) > 20: + valid_parts = valid_parts[:20] + + # 转换有效的部分为字节数组 + valid_command_str = " ".join(valid_parts) + command_bytes = self.hex_string_to_bytes(valid_command_str) + + # 检查字节数组是否为空 + if not command_bytes: + print("CS校验位计算错误: 指令字节数组为空") + return "00" + + # 计算校验和 + cs = self.calculate_cs(command_bytes) + # 转换为2位十六进制字符串 + cs_hex = f"{cs:02X}" + print(f"CS校验位计算: 完整指令='{command_str}' -> 有效校验部分='{valid_command_str}' -> 校验和={cs_hex}") + return cs_hex + except ValueError as e: + print(f"CS校验位计算错误(值错误): {e}") + return "00" + except TypeError as e: + print(f"CS校验位计算错误(类型错误): {e}") + return "00" + except Exception as e: + print(f"CS校验位计算错误(未知错误): {type(e).__name__}: {e}") + return "00" + + def generate_command_with_id(self, base_command, command_type, device_input): + """根据设备编号生成完整的命令 + 指令格式:FE FE FE FE 68 [地址域] 68 [数据域] [CS校验] 16 + 只有地址域和CS校验需要变动,其他部分保持不变 + 地址域格式:08 00 09 10 24 [设备编号] + 特殊处理:合闸指令需要将数据域中的4D改为4F + """ + try: + # 将原指令拆分为列表 + parts = base_command.split() + + # 确保指令长度正确 + if len(parts) != 32: + print(f"警告:指令长度不正确,预期32个部分,实际{len(parts)}个部分") + # 修正指令长度,确保格式正确 + parts = parts[:32] + while len(parts) < 32: + parts.append("00") + + # 首先处理合闸指令的4D替换,确保不会被其他逻辑影响 + if command_type == "合闸": + for i, part in enumerate(parts): + if part == "4D": + parts[i] = "4F" + break + + # 处理设备输入:支持两种格式 + # 1. 单个设备编号(如 "1" 或 "10") + # 2. 完整的地址域字符串(如 "080009102420") + if len(device_input) == 12 and device_input.isdigit(): + # 完整的地址域字符串,需要拆分为6个2位十六进制部分 + print(f"检测到完整地址域输入: {device_input}") + # 将地址域字符串拆分为6个2位十六进制部分 + address_parts = [device_input[i:i+2] for i in range(0, 12, 2)] + print(f"地址域拆分结果: {address_parts}") + # 替换地址域(索引5到10) + if len(parts) >= 11: + for i in range(6): + if i < len(address_parts) and 5 + i < len(parts): + parts[5 + i] = address_parts[i].upper() + else: + # 单个设备编号,直接使用输入的字符串作为设备编号 + print(f"检测到设备编号输入: {device_input}") + # 确保设备编号格式为2位十六进制 + device_id_hex = device_input.upper() + # 如果长度不足2位,补零 + if len(device_id_hex) < 2: + device_id_hex = device_id_hex.zfill(2) + # 如果长度超过2位,截取前2位 + elif len(device_id_hex) > 2: + device_id_hex = device_id_hex[:2] + # 替换设备编号:原指令中第11个位置(索引10)是设备编号位置 + if len(parts) > 10: + parts[10] = device_id_hex # 索引10是设备编号位置 + + # 移除旧的校验码,构建不带校验码的指令 + command_without_cs = parts[:-2] # 移除校验码和结束符 + command_str = " ".join(command_without_cs) + + # 计算校验码 - 使用单独封装的方法 + cs_hex = self.calculate_command_cs(command_str) + + # 替换校验码:原指令中倒数第2个位置(索引-2) + if len(parts) > 1: + parts[-2] = cs_hex + + # 确保结束符正确 + parts[-1] = "16" + + # 构建最终指令 + final_command = " ".join(parts) + + # 调试:打印最终指令 + print(f"生成的校验位: {cs_hex}") + print(f"参与校验位计算: {command_without_cs}") + print(f"命令类型: {command_type}") + print(f"设备输入: {device_input}") + print(f"最终指令: {final_command}") + + return final_command + except Exception as e: + print(f"指令生成错误: {e}") + # 如果生成失败,返回原始指令 + return base_command + + def send_power_off_command(self, device_id): + """发送拉闸指令的封装方法 + + 参数: + device_id: 设备编号 + + 返回: + bool: 是否成功发送 + """ + try: + # 启用日志文本框 + self.log_text.config(state=tk.NORMAL) + + # 记录操作时间和命令 + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 执行命令:拉闸,设备编号:{device_id}\n") + + # ========== 核心串口通信逻辑 ========== + # 示例:向串口发送对应指令 + ser = serial.Serial( + port=self.selected_port, + baudrate=2400, # 根据你的设备调整波特率 + parity=serial.PARITY_EVEN, # 奇偶校验:NONE/ODD/EVEN,现在设置为偶校验 + stopbits=serial.STOPBITS_ONE, # 停止位:1/1.5/2 + bytesize=serial.EIGHTBITS, # 数据位:5/6/7/8 + timeout=1, + rtscts=False, # RTS/CTS流控 + dsrdtr=False # DSR/DTR流控 + ) + + # 设置初始RTS/DTR状态 + ser.rts = False + ser.dtr = False + + # 添加调试日志,显示当前串口配置 + self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n") + + if ser.is_open: + # 根据命令发送不同指令(请根据实际协议修改) + base_commands = { + "拉闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4D 33 8C 8C 56 55 34 59 98 16" + } + + # 记录要发送的指令(带空格格式) + command = "拉闸" + if command in base_commands: + base_hex = base_commands[command] + # 根据设备编号生成完整指令,传递命令类型 + full_command = self.generate_command_with_id(base_hex, command, device_id) + + self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n") + send_bytes = self.hex_string_to_bytes(full_command) + + # 485通信需要控制RTS/DTR来切换发送/接收模式 + # 发送前将RTS置为高,发送完成后置为低 + ser.rts = True # 启用发送模式 + time.sleep(0.01) # 短暂延时确保硬件准备就绪 + + # 发送指令并记录发送字节数 + bytes_sent = ser.write(send_bytes) + ser.flush() # 确保数据完全发送 + + time.sleep(0.01) # 短暂延时确保发送完成 + ser.rts = False # 切换到接收模式 + + self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n") + + # 读取响应(可选) + response = ser.read(100) # 尝试读取100字节响应,而不是用readline() + + # 记录响应 + if response: + # 将响应转换为十六进制格式显示 + response_hex = ' '.join(f'{b:02X}' for b in response) + self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n") + else: + self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n") + ser.close() + + # 操作成功提示 + messagebox.showinfo("成功", f"合闸命令已发送成功!设备编号:{device_id}") + + # 滚动到最新日志 + self.log_text.see(tk.END) + # 禁用日志编辑 + self.log_text.config(state=tk.DISABLED) + + return True + else: + self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n") + messagebox.showerror("错误", "无法打开串口,请检查串口状态") + + # 滚动到最新日志 + self.log_text.see(tk.END) + # 禁用日志编辑 + self.log_text.config(state=tk.DISABLED) + + return False + + except Exception as e: + # 启用日志文本框记录错误 + self.log_text.config(state=tk.NORMAL) + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n") + self.log_text.see(tk.END) + self.log_text.config(state=tk.DISABLED) + + messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}") + return False + + def send_power_on_command(self, device_id): + """发送合闸指令的封装方法 + + 参数: + device_id: 设备编号 + + 返回: + bool: 是否成功发送 + """ + try: + # 启用日志文本框 + self.log_text.config(state=tk.NORMAL) + + # 记录操作时间和命令 + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 执行命令:合闸,设备编号:{device_id}\n") + + # ========== 核心串口通信逻辑 ========== + # 示例:向串口发送对应指令 + ser = serial.Serial( + port=self.selected_port, + baudrate=2400, # 根据你的设备调整波特率 + parity=serial.PARITY_EVEN, # 奇偶校验:NONE/ODD/EVEN,现在设置为偶校验 + stopbits=serial.STOPBITS_ONE, # 停止位:1/1.5/2 + bytesize=serial.EIGHTBITS, # 数据位:5/6/7/8 + timeout=1, + rtscts=False, # RTS/CTS流控 + dsrdtr=False # DSR/DTR流控 + ) + + # 设置初始RTS/DTR状态 + ser.rts = False + ser.dtr = False + + # 添加调试日志,显示当前串口配置 + self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n") + + if ser.is_open: + # 根据命令发送不同指令(请根据实际协议修改) + base_commands = { + "合闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 F4 16" + } + + # 记录要发送的指令(带空格格式) + command = "合闸" + if command in base_commands: + base_hex = base_commands[command] + # 根据设备编号生成完整指令,传递命令类型 + full_command = self.generate_command_with_id(base_hex, command, device_id) + + self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n") + send_bytes = self.hex_string_to_bytes(full_command) + + # 485通信需要控制RTS/DTR来切换发送/接收模式 + # 发送前将RTS置为高,发送完成后置为低 + ser.rts = True # 启用发送模式 + time.sleep(0.01) # 短暂延时确保硬件准备就绪 + + # 发送指令并记录发送字节数 + bytes_sent = ser.write(send_bytes) + ser.flush() # 确保数据完全发送 + + time.sleep(0.01) # 短暂延时确保发送完成 + ser.rts = False # 切换到接收模式 + + self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n") + + # 读取响应(可选) + response = ser.read(100) # 尝试读取100字节响应,而不是用readline() + + # 记录响应 + if response: + # 将响应转换为十六进制格式显示 + response_hex = ' '.join(f'{b:02X}' for b in response) + self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n") + else: + self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n") + ser.close() + + # 操作成功提示 + messagebox.showinfo("成功", f"拉闸命令已发送成功!设备编号:{device_id}") + + # 滚动到最新日志 + self.log_text.see(tk.END) + # 禁用日志编辑 + self.log_text.config(state=tk.DISABLED) + + return True + else: + self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n") + messagebox.showerror("错误", "无法打开串口,请检查串口状态") + + # 滚动到最新日志 + self.log_text.see(tk.END) + # 禁用日志编辑 + self.log_text.config(state=tk.DISABLED) + + return False + + except Exception as e: + # 启用日志文本框记录错误 + self.log_text.config(state=tk.NORMAL) + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n") + self.log_text.see(tk.END) + self.log_text.config(state=tk.DISABLED) + + messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}") + return False + def execute_command(self, command): + """执行串口命令(拉闸/合闸/电表清零)""" + try: + # 弹出输入框,要求用户输入设备编号 + device_id = tk.simpledialog.askstring( + "输入设备编号", + f"请输入{command}操作的设备编号:", + parent=self.root + ) + + # 检查用户是否取消输入 + if device_id is None: + return + + # 验证设备编号是否有效 + if not device_id.isdigit(): + messagebox.showerror("错误", "设备编号必须为数字") + return + + # 根据命令类型调用不同的处理方法 + if command == "拉闸": + # 调用封装好的拉闸方法 + self.send_power_off_command(device_id) + elif command == "合闸": + # 调用封装好的合闸方法 + self.send_power_on_command(device_id) + else: + # 其他命令仍然使用原有逻辑 + # 启用日志文本框 + self.log_text.config(state=tk.NORMAL) + + # 记录操作时间、命令和设备编号 + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 执行命令:{command},设备编号:{device_id}\n") + + # ========== 核心串口通信逻辑 ========== + # 这里是Demo版本,实际使用时需要替换为你的串口通信协议 + # 示例:向串口发送对应指令 + ser = serial.Serial( + port=self.selected_port, + baudrate=2400, # 根据你的设备调整波特率 + parity=serial.PARITY_EVEN, # 奇偶校验:NONE/ODD/EVEN,现在设置为偶校验 + stopbits=serial.STOPBITS_ONE, # 停止位:1/1.5/2 + bytesize=serial.EIGHTBITS, # 数据位:5/6/7/8 + timeout=1, + rtscts=False, # RTS/CTS流控 + dsrdtr=False # DSR/DTR流控 + ) + + # 设置初始RTS/DTR状态 + ser.rts = False + ser.dtr = False + + # 添加调试日志,显示当前串口配置 + self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n") + + if ser.is_open: + # 根据命令发送不同指令(请根据实际协议修改) + base_commands = { + "合闸": "FE FE FE FE 68 08 00 09 10 24 10 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 54 34 59 F4 16", + "电表清零": "" + } + + # 记录要发送的指令(带空格格式) + if command in base_commands: + base_hex = base_commands[command] + # 根据设备编号生成完整指令,传递命令类型 + full_command = self.generate_command_with_id(base_hex, command, device_id) + + self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n") + send_bytes = self.hex_string_to_bytes(full_command) + + # 485通信需要控制RTS/DTR来切换发送/接收模式 + # 发送前将RTS置为高,发送完成后置为低 + ser.rts = True # 启用发送模式 + time.sleep(0.01) # 短暂延时确保硬件准备就绪 + + # 发送指令并记录发送字节数 + bytes_sent = ser.write(send_bytes) + ser.flush() # 确保数据完全发送 + + time.sleep(0.01) # 短暂延时确保发送完成 + ser.rts = False # 切换到接收模式 + + self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n") + + # 读取响应(可选) + response = ser.read(100) # 尝试读取100字节响应,而不是用readline() + + # 记录响应 + if response: + # 将响应转换为十六进制格式显示 + response_hex = ' '.join(f'{b:02X}' for b in response) + self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n") + else: + self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n") + ser.close() + + # 操作成功提示 + messagebox.showinfo("成功", f"{command}命令已发送成功!设备编号:{device_id}") + else: + self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n") + messagebox.showerror("错误", "无法打开串口,请检查串口状态") + + # 滚动到最新日志 + self.log_text.see(tk.END) + # 禁用日志编辑 + self.log_text.config(state=tk.DISABLED) + + except Exception as e: + # 启用日志文本框记录错误 + self.log_text.config(state=tk.NORMAL) + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n") + self.log_text.see(tk.END) + self.log_text.config(state=tk.DISABLED) + + messagebox.showerror("错误", f"{command}命令执行失败:{str(e)}") + + def clear_log(self): + """清空操作日志""" + try: + # 确认清空操作 + if not messagebox.askyesno("确认", "是否确定清空所有操作日志?"): + return + + # 启用日志文本框并清空 + self.log_text.config(state=tk.NORMAL) + self.log_text.delete(1.0, tk.END) + + # 记录清空操作 + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 操作日志已清空\n") + + # 滚动到最新日志 + self.log_text.see(tk.END) + # 禁用日志编辑 + self.log_text.config(state=tk.DISABLED) + + messagebox.showinfo("提示", "操作日志已清空") + except Exception as e: + messagebox.showerror("错误", f"清空日志失败:{str(e)}") + + def save_log(self): + """保存操作日志到文件""" + try: + # 获取日志内容 + log_content = self.log_text.get(1.0, tk.END).strip() + if not log_content: + messagebox.showwarning("提示", "操作日志为空,无需保存") + return + + # 弹出文件保存对话框 + default_filename = f"串口操作日志_{time.strftime('%Y%m%d_%H%M%S', time.localtime())}.txt" + file_path = filedialog.asksaveasfilename( + title="保存操作日志", + defaultextension=".txt", + filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")], + initialfile=default_filename + ) + + if not file_path: # 用户取消保存 + return + + # 写入日志内容到文件 + with open(file_path, 'w', encoding='utf-8') as f: + f.write(log_content) + + # 记录保存操作 + self.log_text.config(state=tk.NORMAL) + current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.log_text.insert(tk.END, f"[{current_time}] 日志已保存至:{file_path}\n") + self.log_text.see(tk.END) + self.log_text.config(state=tk.DISABLED) + + messagebox.showinfo("成功", f"操作日志已保存到:\n{file_path}") + except Exception as e: + messagebox.showerror("错误", f"保存日志失败:{str(e)}") + + +def main(): + """主函数""" + # 创建主窗口 + root = tk.Tk() + # 创建应用实例 + app = SerialPortTool(root) + # 运行主循环 + root.mainloop() + + +if __name__ == "__main__": + # 检查依赖并运行 + try: + import serial + except ImportError: + print("正在安装依赖库 pyserial...") + import subprocess + import sys + + subprocess.check_call([sys.executable, "-m", "pip", "install", "pyserial"]) + import serial + + main() \ No newline at end of file diff --git a/main.spec b/main.spec new file mode 100644 index 0000000..1ccbe3c --- /dev/null +++ b/main.spec @@ -0,0 +1,38 @@ +# -*- mode: python ; coding: utf-8 -*- + + +a = Analysis( + ['main.py'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + noarchive=False, + optimize=0, +) +pyz = PYZ(a.pure) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.datas, + [], + name='main', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=False, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, +) diff --git a/test_address_input.py b/test_address_input.py new file mode 100644 index 0000000..e7449f8 --- /dev/null +++ b/test_address_input.py @@ -0,0 +1,135 @@ +# 测试地址域输入处理 +class TestAddressInput: + def __init__(self): + # 模拟SerialPortTool类的相关方法 + self.base_commands = { + "合闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 9A 16", + "拉闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4D 33 8C 8C 56 54 34 59 97 16" + } + + def hex_string_to_bytes(self, hex_str): + """将带空格的十六进制字符串转换为字节数组""" + hex_clean = hex_str.replace(" ", "") + return bytes.fromhex(hex_clean) + + def calculate_cs(self, data_bytes): + """计算校验和(XOR算法)""" + cs = 0 + for byte in data_bytes: + cs ^= byte + return cs + + def generate_command_with_id(self, base_command, command_type, device_input): + """测试地址域输入处理""" + try: + # 将原指令拆分为列表 + parts = base_command.split() + + # 确保指令长度正确 + if len(parts) != 32: + print(f"警告:指令长度不正确,预期32个部分,实际{len(parts)}个部分") + # 修正指令长度,确保格式正确 + parts = parts[:32] + while len(parts) < 32: + parts.append("00") + + # 首先处理合闸指令的4D替换,确保不会被其他逻辑影响 + if command_type == "合闸": + for i, part in enumerate(parts): + if part == "4D": + parts[i] = "4F" + break + + # 处理设备输入:支持两种格式 + # 1. 单个设备编号(如 "1" 或 "10") + # 2. 完整的地址域字符串(如 "080009102420") + if len(device_input) == 12 and device_input.isdigit(): + # 完整的地址域字符串,需要拆分为6个2位十六进制部分 + print(f"检测到完整地址域输入: {device_input}") + # 将地址域字符串拆分为6个2位十六进制部分 + address_parts = [device_input[i:i+2] for i in range(0, 12, 2)] + print(f"地址域拆分结果: {address_parts}") + # 替换地址域(索引5到10) + if len(parts) >= 11: + for i in range(6): + if i < len(address_parts) and 5 + i < len(parts): + parts[5 + i] = address_parts[i].upper() + else: + # 单个设备编号,转换为2位十六进制 + print(f"检测到设备编号输入: {device_input}") + device_id_int = int(device_input) + device_id_hex = f"{device_id_int:02X}" + # 替换设备编号:原指令中第11个位置(索引10)是设备编号位置 + if len(parts) > 10: + parts[10] = device_id_hex # 索引10是设备编号位置 + + # 移除旧的校验码,构建不带校验码的指令 + command_without_cs = parts[:-2] # 移除校验码和结束符 + command_str = " ".join(command_without_cs) + + # 计算校验码 + command_bytes = self.hex_string_to_bytes(command_str) + cs = self.calculate_cs(command_bytes) + cs_hex = f"{cs:02X}" + + # 替换校验码:原指令中倒数第2个位置(索引-2) + if len(parts) > 1: + parts[-2] = cs_hex + + # 确保结束符正确 + parts[-1] = "16" + + # 构建最终指令 + final_command = " ".join(parts) + + # 调试:打印最终指令 + print(f"命令类型: {command_type}") + print(f"设备输入: {device_input}") + print(f"最终指令: {final_command}") + + # 验证指令格式是否正确 + final_parts = final_command.split() + print(f"\n最终指令各部分索引和值:") + for i, part in enumerate(final_parts): + print(f"索引 {i}: {part}") + + print(f"\n指令长度: {len(final_parts)}") + print(f"地址域(索引5-10): {' '.join(final_parts[5:11])}") + print(f"指令格式是否正确: {len(final_parts) == 32 and final_parts[0] == 'FE' and final_parts[-1] == '16'}") + + return final_command + except Exception as e: + print(f"指令生成错误: {e}") + return base_command + +# 测试 +if __name__ == "__main__": + test = TestAddressInput() + + # 测试1:完整地址域输入 "080009102420" + print("=== 测试拉闸指令,完整地址域输入: 080009102420 ===") + test.generate_command_with_id( + test.base_commands["拉闸"], + "拉闸", + "080009102420" + ) + + print("\n" + "="*50 + "\n") + + # 测试2:单个设备编号输入 "10" + print("=== 测试合闸指令,单个设备编号输入: 10 ===") + test.generate_command_with_id( + test.base_commands["合闸"], + "合闸", + "10" + ) + + print("\n" + "="*50 + "\n") + + # 测试3:完整地址域输入 "080009102408" + print("=== 测试拉闸指令,完整地址域输入: 080009102408 ===") + test.generate_command_with_id( + test.base_commands["拉闸"], + "拉闸", + "080009102408" + ) \ No newline at end of file diff --git a/test_check_cs.py b/test_check_cs.py new file mode 100644 index 0000000..5da662f --- /dev/null +++ b/test_check_cs.py @@ -0,0 +1,55 @@ +# 测试校验位计算 + +# 成功的合闸报文 +success_command = "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 9A 16" + +# 失败的合闸报文(校验位错误) +fail_command = "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 99 16" + + +def calculate_command_cs(command_str): + """计算指令的CS校验位""" + try: + # 将指令拆分为列表 + parts = command_str.split() + # 移除校验码和结束符(如果存在) + if len(parts) >= 2: + parts = parts[:-2] + # 重新组合为不带校验码和结束符的指令字符串 + command_without_cs = " ".join(parts) + print(f"计算校验位的指令部分: {command_without_cs}") + # 转换为字节数组 + command_bytes = bytes.fromhex(command_without_cs) + print(f"指令字节长度: {len(command_bytes)}") + # 计算校验和 + cs = 0 + for byte in command_bytes: + cs ^= byte + print(f"XOR进行中: 0x{cs:02X}") + # 转换为2位十六进制字符串 + cs_hex = f"{cs:02X}" + print(f"最终校验位: {cs_hex}") + return cs_hex + except Exception as e: + print(f"CS校验位计算错误: {e}") + return "00" + +# 测试成功报文的校验位 +print("=== 测试成功报文的校验位 ===") +success_cs = calculate_command_cs(success_command) +print(f"成功报文的校验位: {success_cs}") + +print("\n" + "="*50 + "\n") + +# 测试失败报文的校验位 +print("=== 测试失败报文的校验位 ===") +fail_cs = calculate_command_cs(fail_command) +print(f"失败报文的校验位: {fail_cs}") +print(f"失败报文原校验位: 99") +print(f"正确的校验位应该是: {fail_cs}") + +# 生成正确的失败报文 +fail_parts = fail_command.split() +fail_parts[-2] = fail_cs +correct_fail_command = " ".join(fail_parts) +print(f"\n正确的报文: {correct_fail_command}") \ No newline at end of file diff --git a/test_command.py b/test_command.py new file mode 100644 index 0000000..a4a171d --- /dev/null +++ b/test_command.py @@ -0,0 +1,15 @@ +# 测试指令结构分析 +command = "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 9A 16" +parts = command.split() + +print("指令各部分索引和值:") +for i, part in enumerate(parts): + print(f"索引 {i}: {part}") + +print("\n指令长度:", len(parts)) +print("地址域开始:索引 4 (68)") +print("地址域内容:", parts[5:11]) # 索引5到10 +print("地址域结束:索引 11 (68)") +print("数据域开始:索引 12") +print("校验码:索引", len(parts)-2) +print("结束符:索引", len(parts)-1) \ No newline at end of file