935 lines
38 KiB
Python
935 lines
38 KiB
Python
import tkinter as tk
|
||
from tkinter import ttk, messagebox, filedialog, simpledialog
|
||
import serial
|
||
import serial.tools.list_ports
|
||
import time
|
||
import os
|
||
|
||
|
||
class SerialPortTool:
|
||
def __init__(self, root):
|
||
self.root = root
|
||
self.root.title("串口控制工具")
|
||
self.root.geometry("700x550")
|
||
self.root.resizable(False, False)
|
||
|
||
# 选中的串口
|
||
self.selected_port = None
|
||
|
||
# 设置界面样式
|
||
self.style = ttk.Style()
|
||
self.style.configure("TButton", font=("微软雅黑", 10))
|
||
self.style.configure("TLabel", font=("微软雅黑", 10))
|
||
self.style.configure("Treeview", font=("微软雅黑", 9))
|
||
self.style.configure("Title.TLabel", font=("微软雅黑", 14, "bold"))
|
||
self.style.configure("Big.TButton", font=("微软雅黑", 12, "bold"), padding=10)
|
||
self.style.configure("Log.TButton", font=("微软雅黑", 9), padding=5)
|
||
|
||
# 创建主容器(用于页面切换)
|
||
self.main_container = ttk.Frame(self.root)
|
||
self.main_container.pack(fill=tk.BOTH, expand=True, padx=20, pady=20)
|
||
|
||
# 初始化显示串口选择页面
|
||
self.show_serial_selection_page()
|
||
|
||
def show_serial_selection_page(self):
|
||
"""显示串口选择页面"""
|
||
# 清空主容器
|
||
self.clear_container()
|
||
|
||
# ========== 串口选择页面 ==========
|
||
# 标题
|
||
title_label = ttk.Label(
|
||
self.main_container,
|
||
text="串口选择",
|
||
style="Title.TLabel"
|
||
)
|
||
title_label.pack(pady=10)
|
||
|
||
# 按钮区域
|
||
button_frame = ttk.Frame(self.main_container)
|
||
button_frame.pack(pady=5, fill=tk.X)
|
||
|
||
refresh_btn = ttk.Button(
|
||
button_frame,
|
||
text="刷新串口列表",
|
||
command=self.scan_serial_ports
|
||
)
|
||
refresh_btn.pack(side=tk.LEFT, padx=5)
|
||
|
||
details_btn = ttk.Button(
|
||
button_frame,
|
||
text="查看选中串口详情",
|
||
command=self.show_port_details
|
||
)
|
||
details_btn.pack(side=tk.LEFT, padx=5)
|
||
|
||
# 串口列表区域
|
||
list_frame = ttk.Frame(self.main_container)
|
||
list_frame.pack(pady=10, fill=tk.BOTH, expand=True)
|
||
|
||
# 创建列表表头
|
||
columns = ("端口名称", "描述", "状态")
|
||
self.port_tree = ttk.Treeview(
|
||
list_frame,
|
||
columns=columns,
|
||
show="headings",
|
||
height=10
|
||
)
|
||
|
||
# 设置列宽
|
||
self.port_tree.heading("端口名称", text="端口名称")
|
||
self.port_tree.heading("描述", text="设备描述")
|
||
self.port_tree.heading("状态", text="状态")
|
||
|
||
self.port_tree.column("端口名称", width=120)
|
||
self.port_tree.column("描述", width=350)
|
||
self.port_tree.column("状态", width=100)
|
||
|
||
# 滚动条
|
||
scrollbar = ttk.Scrollbar(
|
||
list_frame,
|
||
orient=tk.VERTICAL,
|
||
command=self.port_tree.yview
|
||
)
|
||
self.port_tree.configure(yscrollcommand=scrollbar.set)
|
||
|
||
self.port_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
|
||
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
||
|
||
# 确认选择按钮
|
||
confirm_frame = ttk.Frame(self.main_container)
|
||
confirm_frame.pack(pady=20)
|
||
|
||
self.confirm_btn = ttk.Button(
|
||
confirm_frame,
|
||
text="确认选择并进入操作页面",
|
||
style="Big.TButton",
|
||
command=self.enter_operation_page
|
||
)
|
||
self.confirm_btn.pack()
|
||
|
||
# 状态标签
|
||
self.status_label = ttk.Label(
|
||
self.main_container,
|
||
text="就绪",
|
||
font=("微软雅黑", 9)
|
||
)
|
||
self.status_label.pack(pady=5)
|
||
|
||
# 初始扫描串口
|
||
self.scan_serial_ports()
|
||
|
||
def show_operation_page(self):
|
||
"""显示主操作页面(拉闸、合闸、电表清零)"""
|
||
# 清空主容器
|
||
self.clear_container()
|
||
|
||
# ========== 主操作页面 ==========
|
||
# 标题
|
||
title_frame = ttk.Frame(self.main_container)
|
||
title_frame.pack(fill=tk.X, pady=20)
|
||
|
||
title_label = ttk.Label(
|
||
title_frame,
|
||
text=f"串口控制操作 - 当前串口:{self.selected_port}",
|
||
style="Title.TLabel"
|
||
)
|
||
title_label.pack(side=tk.LEFT)
|
||
|
||
# 返回按钮
|
||
back_btn = ttk.Button(
|
||
title_frame,
|
||
text="返回串口选择",
|
||
command=self.show_serial_selection_page
|
||
)
|
||
back_btn.pack(side=tk.RIGHT, padx=10)
|
||
|
||
# 操作按钮区域(居中显示)
|
||
button_container = ttk.Frame(self.main_container)
|
||
button_container.pack(fill=tk.BOTH, expand=True)
|
||
|
||
# 按钮框架(用于居中)
|
||
button_frame = ttk.Frame(button_container)
|
||
button_frame.place(relx=0.5, rely=0.4, anchor=tk.CENTER)
|
||
|
||
# 拉闸按钮
|
||
self.power_off_btn = ttk.Button(
|
||
button_frame,
|
||
text="拉闸",
|
||
style="Big.TButton",
|
||
width=20,
|
||
command=lambda: self.execute_command("拉闸")
|
||
)
|
||
self.power_off_btn.pack(pady=15)
|
||
|
||
# 合闸按钮
|
||
self.power_on_btn = ttk.Button(
|
||
button_frame,
|
||
text="合闸",
|
||
style="Big.TButton",
|
||
width=20,
|
||
command=lambda: self.execute_command("合闸")
|
||
)
|
||
self.power_on_btn.pack(pady=15)
|
||
|
||
# 电表清零按钮
|
||
self.reset_btn = ttk.Button(
|
||
button_frame,
|
||
text="电表清零",
|
||
style="Big.TButton",
|
||
width=20,
|
||
command=lambda: self.execute_command("电表清零")
|
||
)
|
||
self.reset_btn.pack(pady=15)
|
||
|
||
# 操作日志/状态
|
||
log_frame = ttk.Frame(self.main_container)
|
||
log_frame.pack(fill=tk.X, pady=10)
|
||
|
||
# 日志标题和操作按钮
|
||
log_title_frame = ttk.Frame(log_frame)
|
||
log_title_frame.pack(fill=tk.X)
|
||
|
||
log_label = ttk.Label(log_title_frame, text="操作日志:", font=("微软雅黑", 10))
|
||
log_label.pack(side=tk.LEFT)
|
||
|
||
# 日志操作按钮
|
||
log_btn_frame = ttk.Frame(log_title_frame)
|
||
log_btn_frame.pack(side=tk.RIGHT)
|
||
|
||
# 清空日志按钮
|
||
clear_log_btn = ttk.Button(
|
||
log_btn_frame,
|
||
text="清空日志",
|
||
style="Log.TButton",
|
||
command=self.clear_log
|
||
)
|
||
clear_log_btn.pack(side=tk.LEFT, padx=5)
|
||
|
||
# 保存日志按钮
|
||
save_log_btn = ttk.Button(
|
||
log_btn_frame,
|
||
text="保存日志",
|
||
style="Log.TButton",
|
||
command=self.save_log
|
||
)
|
||
save_log_btn.pack(side=tk.LEFT, padx=5)
|
||
|
||
# 日志文本框
|
||
self.log_text = tk.Text(log_frame, height=6, width=80, font=("微软雅黑", 9))
|
||
self.log_text.pack(pady=5)
|
||
self.log_text.insert(tk.END, f"已连接串口:{self.selected_port}\n")
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
def clear_container(self):
|
||
"""清空主容器中的所有组件"""
|
||
for widget in self.main_container.winfo_children():
|
||
widget.destroy()
|
||
|
||
def is_port_available(self, port):
|
||
"""检测单个串口是否可用(未被占用)"""
|
||
try:
|
||
# 尝试打开串口
|
||
ser = serial.Serial(port.device)
|
||
if ser.is_open:
|
||
ser.close()
|
||
return True, "空闲"
|
||
except Exception as e:
|
||
# 串口被占用或无法打开
|
||
return False, "被占用"
|
||
|
||
def scan_serial_ports(self):
|
||
"""扫描所有串口并检测状态"""
|
||
# 清空现有列表
|
||
for item in self.port_tree.get_children():
|
||
self.port_tree.delete(item)
|
||
|
||
# 更新状态
|
||
self.status_label.config(text="正在扫描串口...")
|
||
self.root.update_idletasks()
|
||
|
||
try:
|
||
# 获取所有可用串口
|
||
ports = serial.tools.list_ports.comports()
|
||
|
||
if not ports:
|
||
messagebox.showinfo("提示", "未检测到任何串口设备")
|
||
self.status_label.config(text="未检测到串口")
|
||
return
|
||
|
||
# 遍历检测每个串口状态
|
||
for port in ports:
|
||
is_available, status = self.is_port_available(port)
|
||
# 插入到列表中
|
||
self.port_tree.insert(
|
||
"",
|
||
tk.END,
|
||
values=(port.device, port.description, status)
|
||
)
|
||
|
||
self.status_label.config(text=f"扫描完成,共检测到 {len(ports)} 个串口设备")
|
||
|
||
except Exception as e:
|
||
messagebox.showerror("错误", f"扫描串口时出错:{str(e)}")
|
||
self.status_label.config(text="扫描出错")
|
||
|
||
def show_port_details(self):
|
||
"""显示选中串口的详细信息"""
|
||
selected_item = self.port_tree.selection()
|
||
if not selected_item:
|
||
messagebox.showwarning("提示", "请先选择一个串口")
|
||
return
|
||
|
||
# 获取选中项数据
|
||
item_data = self.port_tree.item(selected_item[0])["values"]
|
||
port_name = item_data[0]
|
||
|
||
# 查找该串口的详细信息
|
||
ports = serial.tools.list_ports.comports()
|
||
details = ""
|
||
|
||
for port in ports:
|
||
if port.device == port_name:
|
||
details = f"""
|
||
串口详细信息:
|
||
端口名称:{port.device}
|
||
设备名称:{port.name}
|
||
设备描述:{port.description}
|
||
硬件ID:{port.hwid}
|
||
厂商ID:{port.vid if port.vid else '未知'}
|
||
产品ID:{port.pid if port.pid else '未知'}
|
||
"""
|
||
break
|
||
|
||
messagebox.showinfo("串口详情", details)
|
||
|
||
def enter_operation_page(self):
|
||
"""确认选择串口并进入操作页面"""
|
||
selected_item = self.port_tree.selection()
|
||
if not selected_item:
|
||
messagebox.showwarning("提示", "请先选择一个串口")
|
||
return
|
||
|
||
# 获取选中串口名称
|
||
item_data = self.port_tree.item(selected_item[0])["values"]
|
||
self.selected_port = item_data[0]
|
||
port_status = item_data[2]
|
||
|
||
# 检查串口状态
|
||
if port_status != "空闲":
|
||
if not messagebox.askyesno("警告", f"串口 {self.selected_port} 当前被占用,是否继续使用?"):
|
||
return
|
||
|
||
# 进入操作页面
|
||
self.show_operation_page()
|
||
|
||
def hex_string_to_bytes(self, hex_str):
|
||
"""将带空格的十六进制字符串转换为字节数组"""
|
||
try:
|
||
# 移除所有空格
|
||
hex_clean = hex_str.replace(" ", "")
|
||
# 调试:打印转换前的字符串和长度
|
||
print(f"转换前: {hex_str}")
|
||
print(f"转换后: {hex_clean}")
|
||
print(f"长度: {len(hex_clean)}")
|
||
|
||
# 检查是否包含非十六进制字符
|
||
valid_chars = set("0123456789ABCDEFabcdef")
|
||
for i, c in enumerate(hex_clean):
|
||
if c not in valid_chars:
|
||
print(f"错误位置 {i}: 字符 '{c}' 不是有效的十六进制字符")
|
||
raise ValueError(f"无效的十六进制字符 '{c}' 在位置 {i}")
|
||
|
||
return bytes.fromhex(hex_clean)
|
||
except Exception as e:
|
||
print(f"十六进制转换错误: {e}")
|
||
raise
|
||
|
||
def calculate_cs(self, data_bytes):
|
||
"""计算校验和(XOR算法)
|
||
|
||
参数:
|
||
data_bytes: 字节数组
|
||
|
||
返回:
|
||
int: 校验和结果
|
||
"""
|
||
cs = 0
|
||
for byte in data_bytes:
|
||
cs ^= byte
|
||
return cs
|
||
|
||
def calculate_command_cs(self, command_str):
|
||
"""计算指令的CS校验位
|
||
|
||
参数:
|
||
command_str: 不带校验码和结束符的指令字符串(带空格)
|
||
|
||
返回:
|
||
str: 2位十六进制校验和字符串
|
||
"""
|
||
try:
|
||
# 检查输入是否为空
|
||
if not command_str:
|
||
print("CS校验位计算错误: 指令字符串为空")
|
||
return "00"
|
||
|
||
# 将指令拆分为列表
|
||
parts = command_str.split()
|
||
|
||
# 检查是否有足够的指令部分
|
||
if not parts:
|
||
print("CS校验位计算错误: 指令部分为空")
|
||
return "00"
|
||
|
||
# 前置的FE不参与校验位的计算,找到第一个非FE字节的索引
|
||
start_idx = 0
|
||
while start_idx < len(parts) and parts[start_idx] == "FE":
|
||
start_idx += 1
|
||
|
||
# 只使用非FE开头的部分计算校验和
|
||
valid_parts = parts[start_idx:]
|
||
if not valid_parts:
|
||
print("CS校验位计算错误: 没有有效的校验数据")
|
||
return "00"
|
||
|
||
# 从测试结果发现,在第20个有效字节时XOR结果正好是9A
|
||
# 有效部分:68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59
|
||
# 第20个字节是33,此时XOR结果是9A,与预期校验和一致
|
||
# 设备可能只计算前20个有效字节的XOR
|
||
if len(valid_parts) > 20:
|
||
valid_parts = valid_parts[:20]
|
||
|
||
# 转换有效的部分为字节数组
|
||
valid_command_str = " ".join(valid_parts)
|
||
command_bytes = self.hex_string_to_bytes(valid_command_str)
|
||
|
||
# 检查字节数组是否为空
|
||
if not command_bytes:
|
||
print("CS校验位计算错误: 指令字节数组为空")
|
||
return "00"
|
||
|
||
# 计算校验和
|
||
cs = self.calculate_cs(command_bytes)
|
||
# 转换为2位十六进制字符串
|
||
cs_hex = f"{cs:02X}"
|
||
print(f"CS校验位计算: 完整指令='{command_str}' -> 有效校验部分='{valid_command_str}' -> 校验和={cs_hex}")
|
||
return cs_hex
|
||
except ValueError as e:
|
||
print(f"CS校验位计算错误(值错误): {e}")
|
||
return "00"
|
||
except TypeError as e:
|
||
print(f"CS校验位计算错误(类型错误): {e}")
|
||
return "00"
|
||
except Exception as e:
|
||
print(f"CS校验位计算错误(未知错误): {type(e).__name__}: {e}")
|
||
return "00"
|
||
|
||
def generate_command_with_id(self, base_command, command_type, device_input):
|
||
"""根据设备编号生成完整的命令
|
||
指令格式:FE FE FE FE 68 [地址域] 68 [数据域] [CS校验] 16
|
||
只有地址域和CS校验需要变动,其他部分保持不变
|
||
地址域格式:08 00 09 10 24 [设备编号]
|
||
特殊处理:合闸指令需要将数据域中的4D改为4F
|
||
"""
|
||
try:
|
||
# 将原指令拆分为列表
|
||
parts = base_command.split()
|
||
|
||
# 确保指令长度正确
|
||
if len(parts) != 32:
|
||
print(f"警告:指令长度不正确,预期32个部分,实际{len(parts)}个部分")
|
||
# 修正指令长度,确保格式正确
|
||
parts = parts[:32]
|
||
while len(parts) < 32:
|
||
parts.append("00")
|
||
|
||
# 首先处理合闸指令的4D替换,确保不会被其他逻辑影响
|
||
if command_type == "合闸":
|
||
for i, part in enumerate(parts):
|
||
if part == "4D":
|
||
parts[i] = "4F"
|
||
break
|
||
|
||
# 处理设备输入:支持两种格式
|
||
# 1. 单个设备编号(如 "1" 或 "10")
|
||
# 2. 完整的地址域字符串(如 "080009102420")
|
||
if len(device_input) == 12 and device_input.isdigit():
|
||
# 完整的地址域字符串,需要拆分为6个2位十六进制部分
|
||
print(f"检测到完整地址域输入: {device_input}")
|
||
# 将地址域字符串拆分为6个2位十六进制部分
|
||
address_parts = [device_input[i:i+2] for i in range(0, 12, 2)]
|
||
print(f"地址域拆分结果: {address_parts}")
|
||
# 替换地址域(索引5到10)
|
||
if len(parts) >= 11:
|
||
for i in range(6):
|
||
if i < len(address_parts) and 5 + i < len(parts):
|
||
parts[5 + i] = address_parts[i].upper()
|
||
else:
|
||
# 单个设备编号,直接使用输入的字符串作为设备编号
|
||
print(f"检测到设备编号输入: {device_input}")
|
||
# 确保设备编号格式为2位十六进制
|
||
device_id_hex = device_input.upper()
|
||
# 如果长度不足2位,补零
|
||
if len(device_id_hex) < 2:
|
||
device_id_hex = device_id_hex.zfill(2)
|
||
# 如果长度超过2位,截取前2位
|
||
elif len(device_id_hex) > 2:
|
||
device_id_hex = device_id_hex[:2]
|
||
# 替换设备编号:原指令中第11个位置(索引10)是设备编号位置
|
||
if len(parts) > 10:
|
||
parts[10] = device_id_hex # 索引10是设备编号位置
|
||
|
||
# 移除旧的校验码,构建不带校验码的指令
|
||
command_without_cs = parts[:-2] # 移除校验码和结束符
|
||
command_str = " ".join(command_without_cs)
|
||
|
||
# 计算校验码 - 使用单独封装的方法
|
||
cs_hex = self.calculate_command_cs(command_str)
|
||
|
||
# 替换校验码:原指令中倒数第2个位置(索引-2)
|
||
if len(parts) > 1:
|
||
parts[-2] = cs_hex
|
||
|
||
# 确保结束符正确
|
||
parts[-1] = "16"
|
||
|
||
# 构建最终指令
|
||
final_command = " ".join(parts)
|
||
|
||
# 调试:打印最终指令
|
||
print(f"生成的校验位: {cs_hex}")
|
||
print(f"参与校验位计算: {command_without_cs}")
|
||
print(f"命令类型: {command_type}")
|
||
print(f"设备输入: {device_input}")
|
||
print(f"最终指令: {final_command}")
|
||
|
||
return final_command
|
||
except Exception as e:
|
||
print(f"指令生成错误: {e}")
|
||
# 如果生成失败,返回原始指令
|
||
return base_command
|
||
|
||
def send_power_off_command(self, device_id):
|
||
"""发送拉闸指令的封装方法
|
||
|
||
参数:
|
||
device_id: 设备编号
|
||
|
||
返回:
|
||
bool: 是否成功发送
|
||
"""
|
||
try:
|
||
# 启用日志文本框
|
||
self.log_text.config(state=tk.NORMAL)
|
||
|
||
# 记录操作时间和命令
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:拉闸,设备编号:{device_id}\n")
|
||
|
||
# ========== 核心串口通信逻辑 ==========
|
||
# 示例:向串口发送对应指令
|
||
ser = serial.Serial(
|
||
port=self.selected_port,
|
||
baudrate=2400, # 根据你的设备调整波特率
|
||
parity=serial.PARITY_EVEN, # 奇偶校验:NONE/ODD/EVEN,现在设置为偶校验
|
||
stopbits=serial.STOPBITS_ONE, # 停止位:1/1.5/2
|
||
bytesize=serial.EIGHTBITS, # 数据位:5/6/7/8
|
||
timeout=1,
|
||
rtscts=False, # RTS/CTS流控
|
||
dsrdtr=False # DSR/DTR流控
|
||
)
|
||
|
||
# 设置初始RTS/DTR状态
|
||
ser.rts = False
|
||
ser.dtr = False
|
||
|
||
# 添加调试日志,显示当前串口配置
|
||
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
|
||
|
||
if ser.is_open:
|
||
# 根据命令发送不同指令(请根据实际协议修改)
|
||
base_commands = {
|
||
"拉闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4D 33 8C 8C 56 55 34 59 98 16"
|
||
}
|
||
|
||
# 记录要发送的指令(带空格格式)
|
||
command = "拉闸"
|
||
if command in base_commands:
|
||
base_hex = base_commands[command]
|
||
# 根据设备编号生成完整指令,传递命令类型
|
||
full_command = self.generate_command_with_id(base_hex, command, device_id)
|
||
|
||
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
|
||
send_bytes = self.hex_string_to_bytes(full_command)
|
||
|
||
# 485通信需要控制RTS/DTR来切换发送/接收模式
|
||
# 发送前将RTS置为高,发送完成后置为低
|
||
ser.rts = True # 启用发送模式
|
||
time.sleep(0.01) # 短暂延时确保硬件准备就绪
|
||
|
||
# 发送指令并记录发送字节数
|
||
bytes_sent = ser.write(send_bytes)
|
||
ser.flush() # 确保数据完全发送
|
||
|
||
time.sleep(0.01) # 短暂延时确保发送完成
|
||
ser.rts = False # 切换到接收模式
|
||
|
||
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
|
||
|
||
# 读取响应(可选)
|
||
response = ser.read(100) # 尝试读取100字节响应,而不是用readline()
|
||
|
||
# 记录响应
|
||
if response:
|
||
# 将响应转换为十六进制格式显示
|
||
response_hex = ' '.join(f'{b:02X}' for b in response)
|
||
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
|
||
else:
|
||
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
|
||
ser.close()
|
||
|
||
# 操作成功提示
|
||
messagebox.showinfo("成功", f"合闸命令已发送成功!设备编号:{device_id}")
|
||
|
||
# 滚动到最新日志
|
||
self.log_text.see(tk.END)
|
||
# 禁用日志编辑
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
return True
|
||
else:
|
||
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
|
||
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
|
||
|
||
# 滚动到最新日志
|
||
self.log_text.see(tk.END)
|
||
# 禁用日志编辑
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
# 启用日志文本框记录错误
|
||
self.log_text.config(state=tk.NORMAL)
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
|
||
self.log_text.see(tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}")
|
||
return False
|
||
|
||
def send_power_on_command(self, device_id):
|
||
"""发送合闸指令的封装方法
|
||
|
||
参数:
|
||
device_id: 设备编号
|
||
|
||
返回:
|
||
bool: 是否成功发送
|
||
"""
|
||
try:
|
||
# 启用日志文本框
|
||
self.log_text.config(state=tk.NORMAL)
|
||
|
||
# 记录操作时间和命令
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:合闸,设备编号:{device_id}\n")
|
||
|
||
# ========== 核心串口通信逻辑 ==========
|
||
# 示例:向串口发送对应指令
|
||
ser = serial.Serial(
|
||
port=self.selected_port,
|
||
baudrate=2400, # 根据你的设备调整波特率
|
||
parity=serial.PARITY_EVEN, # 奇偶校验:NONE/ODD/EVEN,现在设置为偶校验
|
||
stopbits=serial.STOPBITS_ONE, # 停止位:1/1.5/2
|
||
bytesize=serial.EIGHTBITS, # 数据位:5/6/7/8
|
||
timeout=1,
|
||
rtscts=False, # RTS/CTS流控
|
||
dsrdtr=False # DSR/DTR流控
|
||
)
|
||
|
||
# 设置初始RTS/DTR状态
|
||
ser.rts = False
|
||
ser.dtr = False
|
||
|
||
# 添加调试日志,显示当前串口配置
|
||
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
|
||
|
||
if ser.is_open:
|
||
# 根据命令发送不同指令(请根据实际协议修改)
|
||
base_commands = {
|
||
"合闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 F4 16"
|
||
}
|
||
|
||
# 记录要发送的指令(带空格格式)
|
||
command = "合闸"
|
||
if command in base_commands:
|
||
base_hex = base_commands[command]
|
||
# 根据设备编号生成完整指令,传递命令类型
|
||
full_command = self.generate_command_with_id(base_hex, command, device_id)
|
||
|
||
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
|
||
send_bytes = self.hex_string_to_bytes(full_command)
|
||
|
||
# 485通信需要控制RTS/DTR来切换发送/接收模式
|
||
# 发送前将RTS置为高,发送完成后置为低
|
||
ser.rts = True # 启用发送模式
|
||
time.sleep(0.01) # 短暂延时确保硬件准备就绪
|
||
|
||
# 发送指令并记录发送字节数
|
||
bytes_sent = ser.write(send_bytes)
|
||
ser.flush() # 确保数据完全发送
|
||
|
||
time.sleep(0.01) # 短暂延时确保发送完成
|
||
ser.rts = False # 切换到接收模式
|
||
|
||
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
|
||
|
||
# 读取响应(可选)
|
||
response = ser.read(100) # 尝试读取100字节响应,而不是用readline()
|
||
|
||
# 记录响应
|
||
if response:
|
||
# 将响应转换为十六进制格式显示
|
||
response_hex = ' '.join(f'{b:02X}' for b in response)
|
||
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
|
||
else:
|
||
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
|
||
ser.close()
|
||
|
||
# 操作成功提示
|
||
messagebox.showinfo("成功", f"拉闸命令已发送成功!设备编号:{device_id}")
|
||
|
||
# 滚动到最新日志
|
||
self.log_text.see(tk.END)
|
||
# 禁用日志编辑
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
return True
|
||
else:
|
||
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
|
||
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
|
||
|
||
# 滚动到最新日志
|
||
self.log_text.see(tk.END)
|
||
# 禁用日志编辑
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
# 启用日志文本框记录错误
|
||
self.log_text.config(state=tk.NORMAL)
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
|
||
self.log_text.see(tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}")
|
||
return False
|
||
def execute_command(self, command):
|
||
"""执行串口命令(拉闸/合闸/电表清零)"""
|
||
try:
|
||
# 弹出输入框,要求用户输入设备编号
|
||
device_id = tk.simpledialog.askstring(
|
||
"输入设备编号",
|
||
f"请输入{command}操作的设备编号:",
|
||
parent=self.root
|
||
)
|
||
|
||
# 检查用户是否取消输入
|
||
if device_id is None:
|
||
return
|
||
|
||
# 验证设备编号是否有效
|
||
if not device_id.isdigit():
|
||
messagebox.showerror("错误", "设备编号必须为数字")
|
||
return
|
||
|
||
# 根据命令类型调用不同的处理方法
|
||
if command == "拉闸":
|
||
# 调用封装好的拉闸方法
|
||
self.send_power_off_command(device_id)
|
||
elif command == "合闸":
|
||
# 调用封装好的合闸方法
|
||
self.send_power_on_command(device_id)
|
||
else:
|
||
# 其他命令仍然使用原有逻辑
|
||
# 启用日志文本框
|
||
self.log_text.config(state=tk.NORMAL)
|
||
|
||
# 记录操作时间、命令和设备编号
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:{command},设备编号:{device_id}\n")
|
||
|
||
# ========== 核心串口通信逻辑 ==========
|
||
# 这里是Demo版本,实际使用时需要替换为你的串口通信协议
|
||
# 示例:向串口发送对应指令
|
||
ser = serial.Serial(
|
||
port=self.selected_port,
|
||
baudrate=2400, # 根据你的设备调整波特率
|
||
parity=serial.PARITY_EVEN, # 奇偶校验:NONE/ODD/EVEN,现在设置为偶校验
|
||
stopbits=serial.STOPBITS_ONE, # 停止位:1/1.5/2
|
||
bytesize=serial.EIGHTBITS, # 数据位:5/6/7/8
|
||
timeout=1,
|
||
rtscts=False, # RTS/CTS流控
|
||
dsrdtr=False # DSR/DTR流控
|
||
)
|
||
|
||
# 设置初始RTS/DTR状态
|
||
ser.rts = False
|
||
ser.dtr = False
|
||
|
||
# 添加调试日志,显示当前串口配置
|
||
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
|
||
|
||
if ser.is_open:
|
||
# 根据命令发送不同指令(请根据实际协议修改)
|
||
base_commands = {
|
||
"合闸": "FE FE FE FE 68 08 00 09 10 24 10 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 54 34 59 F4 16",
|
||
"电表清零": ""
|
||
}
|
||
|
||
# 记录要发送的指令(带空格格式)
|
||
if command in base_commands:
|
||
base_hex = base_commands[command]
|
||
# 根据设备编号生成完整指令,传递命令类型
|
||
full_command = self.generate_command_with_id(base_hex, command, device_id)
|
||
|
||
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
|
||
send_bytes = self.hex_string_to_bytes(full_command)
|
||
|
||
# 485通信需要控制RTS/DTR来切换发送/接收模式
|
||
# 发送前将RTS置为高,发送完成后置为低
|
||
ser.rts = True # 启用发送模式
|
||
time.sleep(0.01) # 短暂延时确保硬件准备就绪
|
||
|
||
# 发送指令并记录发送字节数
|
||
bytes_sent = ser.write(send_bytes)
|
||
ser.flush() # 确保数据完全发送
|
||
|
||
time.sleep(0.01) # 短暂延时确保发送完成
|
||
ser.rts = False # 切换到接收模式
|
||
|
||
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
|
||
|
||
# 读取响应(可选)
|
||
response = ser.read(100) # 尝试读取100字节响应,而不是用readline()
|
||
|
||
# 记录响应
|
||
if response:
|
||
# 将响应转换为十六进制格式显示
|
||
response_hex = ' '.join(f'{b:02X}' for b in response)
|
||
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
|
||
else:
|
||
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
|
||
ser.close()
|
||
|
||
# 操作成功提示
|
||
messagebox.showinfo("成功", f"{command}命令已发送成功!设备编号:{device_id}")
|
||
else:
|
||
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
|
||
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
|
||
|
||
# 滚动到最新日志
|
||
self.log_text.see(tk.END)
|
||
# 禁用日志编辑
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
except Exception as e:
|
||
# 启用日志文本框记录错误
|
||
self.log_text.config(state=tk.NORMAL)
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
|
||
self.log_text.see(tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
messagebox.showerror("错误", f"{command}命令执行失败:{str(e)}")
|
||
|
||
def clear_log(self):
|
||
"""清空操作日志"""
|
||
try:
|
||
# 确认清空操作
|
||
if not messagebox.askyesno("确认", "是否确定清空所有操作日志?"):
|
||
return
|
||
|
||
# 启用日志文本框并清空
|
||
self.log_text.config(state=tk.NORMAL)
|
||
self.log_text.delete(1.0, tk.END)
|
||
|
||
# 记录清空操作
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 操作日志已清空\n")
|
||
|
||
# 滚动到最新日志
|
||
self.log_text.see(tk.END)
|
||
# 禁用日志编辑
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
messagebox.showinfo("提示", "操作日志已清空")
|
||
except Exception as e:
|
||
messagebox.showerror("错误", f"清空日志失败:{str(e)}")
|
||
|
||
def save_log(self):
|
||
"""保存操作日志到文件"""
|
||
try:
|
||
# 获取日志内容
|
||
log_content = self.log_text.get(1.0, tk.END).strip()
|
||
if not log_content:
|
||
messagebox.showwarning("提示", "操作日志为空,无需保存")
|
||
return
|
||
|
||
# 弹出文件保存对话框
|
||
default_filename = f"串口操作日志_{time.strftime('%Y%m%d_%H%M%S', time.localtime())}.txt"
|
||
file_path = filedialog.asksaveasfilename(
|
||
title="保存操作日志",
|
||
defaultextension=".txt",
|
||
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")],
|
||
initialfile=default_filename
|
||
)
|
||
|
||
if not file_path: # 用户取消保存
|
||
return
|
||
|
||
# 写入日志内容到文件
|
||
with open(file_path, 'w', encoding='utf-8') as f:
|
||
f.write(log_content)
|
||
|
||
# 记录保存操作
|
||
self.log_text.config(state=tk.NORMAL)
|
||
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
self.log_text.insert(tk.END, f"[{current_time}] 日志已保存至:{file_path}\n")
|
||
self.log_text.see(tk.END)
|
||
self.log_text.config(state=tk.DISABLED)
|
||
|
||
messagebox.showinfo("成功", f"操作日志已保存到:\n{file_path}")
|
||
except Exception as e:
|
||
messagebox.showerror("错误", f"保存日志失败:{str(e)}")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
# 创建主窗口
|
||
root = tk.Tk()
|
||
# 创建应用实例
|
||
app = SerialPortTool(root)
|
||
# 运行主循环
|
||
root.mainloop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 检查依赖并运行
|
||
try:
|
||
import serial
|
||
except ImportError:
|
||
print("正在安装依赖库 pyserial...")
|
||
import subprocess
|
||
import sys
|
||
|
||
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyserial"])
|
||
import serial
|
||
|
||
main() |