Files
485ChuanKou/main.py
2026-01-22 10:30:38 +08:00

935 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import tkinter as tk
from tkinter import ttk, messagebox, filedialog, simpledialog
import serial
import serial.tools.list_ports
import time
import os
class SerialPortTool:
def __init__(self, root):
self.root = root
self.root.title("串口控制工具")
self.root.geometry("700x550")
self.root.resizable(False, False)
# 选中的串口
self.selected_port = None
# 设置界面样式
self.style = ttk.Style()
self.style.configure("TButton", font=("微软雅黑", 10))
self.style.configure("TLabel", font=("微软雅黑", 10))
self.style.configure("Treeview", font=("微软雅黑", 9))
self.style.configure("Title.TLabel", font=("微软雅黑", 14, "bold"))
self.style.configure("Big.TButton", font=("微软雅黑", 12, "bold"), padding=10)
self.style.configure("Log.TButton", font=("微软雅黑", 9), padding=5)
# 创建主容器(用于页面切换)
self.main_container = ttk.Frame(self.root)
self.main_container.pack(fill=tk.BOTH, expand=True, padx=20, pady=20)
# 初始化显示串口选择页面
self.show_serial_selection_page()
def show_serial_selection_page(self):
"""显示串口选择页面"""
# 清空主容器
self.clear_container()
# ========== 串口选择页面 ==========
# 标题
title_label = ttk.Label(
self.main_container,
text="串口选择",
style="Title.TLabel"
)
title_label.pack(pady=10)
# 按钮区域
button_frame = ttk.Frame(self.main_container)
button_frame.pack(pady=5, fill=tk.X)
refresh_btn = ttk.Button(
button_frame,
text="刷新串口列表",
command=self.scan_serial_ports
)
refresh_btn.pack(side=tk.LEFT, padx=5)
details_btn = ttk.Button(
button_frame,
text="查看选中串口详情",
command=self.show_port_details
)
details_btn.pack(side=tk.LEFT, padx=5)
# 串口列表区域
list_frame = ttk.Frame(self.main_container)
list_frame.pack(pady=10, fill=tk.BOTH, expand=True)
# 创建列表表头
columns = ("端口名称", "描述", "状态")
self.port_tree = ttk.Treeview(
list_frame,
columns=columns,
show="headings",
height=10
)
# 设置列宽
self.port_tree.heading("端口名称", text="端口名称")
self.port_tree.heading("描述", text="设备描述")
self.port_tree.heading("状态", text="状态")
self.port_tree.column("端口名称", width=120)
self.port_tree.column("描述", width=350)
self.port_tree.column("状态", width=100)
# 滚动条
scrollbar = ttk.Scrollbar(
list_frame,
orient=tk.VERTICAL,
command=self.port_tree.yview
)
self.port_tree.configure(yscrollcommand=scrollbar.set)
self.port_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 确认选择按钮
confirm_frame = ttk.Frame(self.main_container)
confirm_frame.pack(pady=20)
self.confirm_btn = ttk.Button(
confirm_frame,
text="确认选择并进入操作页面",
style="Big.TButton",
command=self.enter_operation_page
)
self.confirm_btn.pack()
# 状态标签
self.status_label = ttk.Label(
self.main_container,
text="就绪",
font=("微软雅黑", 9)
)
self.status_label.pack(pady=5)
# 初始扫描串口
self.scan_serial_ports()
def show_operation_page(self):
"""显示主操作页面(拉闸、合闸、电表清零)"""
# 清空主容器
self.clear_container()
# ========== 主操作页面 ==========
# 标题
title_frame = ttk.Frame(self.main_container)
title_frame.pack(fill=tk.X, pady=20)
title_label = ttk.Label(
title_frame,
text=f"串口控制操作 - 当前串口:{self.selected_port}",
style="Title.TLabel"
)
title_label.pack(side=tk.LEFT)
# 返回按钮
back_btn = ttk.Button(
title_frame,
text="返回串口选择",
command=self.show_serial_selection_page
)
back_btn.pack(side=tk.RIGHT, padx=10)
# 操作按钮区域(居中显示)
button_container = ttk.Frame(self.main_container)
button_container.pack(fill=tk.BOTH, expand=True)
# 按钮框架(用于居中)
button_frame = ttk.Frame(button_container)
button_frame.place(relx=0.5, rely=0.4, anchor=tk.CENTER)
# 拉闸按钮
self.power_off_btn = ttk.Button(
button_frame,
text="拉闸",
style="Big.TButton",
width=20,
command=lambda: self.execute_command("拉闸")
)
self.power_off_btn.pack(pady=15)
# 合闸按钮
self.power_on_btn = ttk.Button(
button_frame,
text="合闸",
style="Big.TButton",
width=20,
command=lambda: self.execute_command("合闸")
)
self.power_on_btn.pack(pady=15)
# 电表清零按钮
self.reset_btn = ttk.Button(
button_frame,
text="电表清零",
style="Big.TButton",
width=20,
command=lambda: self.execute_command("电表清零")
)
self.reset_btn.pack(pady=15)
# 操作日志/状态
log_frame = ttk.Frame(self.main_container)
log_frame.pack(fill=tk.X, pady=10)
# 日志标题和操作按钮
log_title_frame = ttk.Frame(log_frame)
log_title_frame.pack(fill=tk.X)
log_label = ttk.Label(log_title_frame, text="操作日志:", font=("微软雅黑", 10))
log_label.pack(side=tk.LEFT)
# 日志操作按钮
log_btn_frame = ttk.Frame(log_title_frame)
log_btn_frame.pack(side=tk.RIGHT)
# 清空日志按钮
clear_log_btn = ttk.Button(
log_btn_frame,
text="清空日志",
style="Log.TButton",
command=self.clear_log
)
clear_log_btn.pack(side=tk.LEFT, padx=5)
# 保存日志按钮
save_log_btn = ttk.Button(
log_btn_frame,
text="保存日志",
style="Log.TButton",
command=self.save_log
)
save_log_btn.pack(side=tk.LEFT, padx=5)
# 日志文本框
self.log_text = tk.Text(log_frame, height=6, width=80, font=("微软雅黑", 9))
self.log_text.pack(pady=5)
self.log_text.insert(tk.END, f"已连接串口:{self.selected_port}\n")
self.log_text.config(state=tk.DISABLED)
def clear_container(self):
"""清空主容器中的所有组件"""
for widget in self.main_container.winfo_children():
widget.destroy()
def is_port_available(self, port):
"""检测单个串口是否可用(未被占用)"""
try:
# 尝试打开串口
ser = serial.Serial(port.device)
if ser.is_open:
ser.close()
return True, "空闲"
except Exception as e:
# 串口被占用或无法打开
return False, "被占用"
def scan_serial_ports(self):
"""扫描所有串口并检测状态"""
# 清空现有列表
for item in self.port_tree.get_children():
self.port_tree.delete(item)
# 更新状态
self.status_label.config(text="正在扫描串口...")
self.root.update_idletasks()
try:
# 获取所有可用串口
ports = serial.tools.list_ports.comports()
if not ports:
messagebox.showinfo("提示", "未检测到任何串口设备")
self.status_label.config(text="未检测到串口")
return
# 遍历检测每个串口状态
for port in ports:
is_available, status = self.is_port_available(port)
# 插入到列表中
self.port_tree.insert(
"",
tk.END,
values=(port.device, port.description, status)
)
self.status_label.config(text=f"扫描完成,共检测到 {len(ports)} 个串口设备")
except Exception as e:
messagebox.showerror("错误", f"扫描串口时出错:{str(e)}")
self.status_label.config(text="扫描出错")
def show_port_details(self):
"""显示选中串口的详细信息"""
selected_item = self.port_tree.selection()
if not selected_item:
messagebox.showwarning("提示", "请先选择一个串口")
return
# 获取选中项数据
item_data = self.port_tree.item(selected_item[0])["values"]
port_name = item_data[0]
# 查找该串口的详细信息
ports = serial.tools.list_ports.comports()
details = ""
for port in ports:
if port.device == port_name:
details = f"""
串口详细信息:
端口名称:{port.device}
设备名称:{port.name}
设备描述:{port.description}
硬件ID{port.hwid}
厂商ID{port.vid if port.vid else '未知'}
产品ID{port.pid if port.pid else '未知'}
"""
break
messagebox.showinfo("串口详情", details)
def enter_operation_page(self):
"""确认选择串口并进入操作页面"""
selected_item = self.port_tree.selection()
if not selected_item:
messagebox.showwarning("提示", "请先选择一个串口")
return
# 获取选中串口名称
item_data = self.port_tree.item(selected_item[0])["values"]
self.selected_port = item_data[0]
port_status = item_data[2]
# 检查串口状态
if port_status != "空闲":
if not messagebox.askyesno("警告", f"串口 {self.selected_port} 当前被占用,是否继续使用?"):
return
# 进入操作页面
self.show_operation_page()
def hex_string_to_bytes(self, hex_str):
"""将带空格的十六进制字符串转换为字节数组"""
try:
# 移除所有空格
hex_clean = hex_str.replace(" ", "")
# 调试:打印转换前的字符串和长度
print(f"转换前: {hex_str}")
print(f"转换后: {hex_clean}")
print(f"长度: {len(hex_clean)}")
# 检查是否包含非十六进制字符
valid_chars = set("0123456789ABCDEFabcdef")
for i, c in enumerate(hex_clean):
if c not in valid_chars:
print(f"错误位置 {i}: 字符 '{c}' 不是有效的十六进制字符")
raise ValueError(f"无效的十六进制字符 '{c}' 在位置 {i}")
return bytes.fromhex(hex_clean)
except Exception as e:
print(f"十六进制转换错误: {e}")
raise
def calculate_cs(self, data_bytes):
"""计算校验和XOR算法
参数:
data_bytes: 字节数组
返回:
int: 校验和结果
"""
cs = 0
for byte in data_bytes:
cs ^= byte
return cs
def calculate_command_cs(self, command_str):
"""计算指令的CS校验位
参数:
command_str: 不带校验码和结束符的指令字符串(带空格)
返回:
str: 2位十六进制校验和字符串
"""
try:
# 检查输入是否为空
if not command_str:
print("CS校验位计算错误: 指令字符串为空")
return "00"
# 将指令拆分为列表
parts = command_str.split()
# 检查是否有足够的指令部分
if not parts:
print("CS校验位计算错误: 指令部分为空")
return "00"
# 前置的FE不参与校验位的计算找到第一个非FE字节的索引
start_idx = 0
while start_idx < len(parts) and parts[start_idx] == "FE":
start_idx += 1
# 只使用非FE开头的部分计算校验和
valid_parts = parts[start_idx:]
if not valid_parts:
print("CS校验位计算错误: 没有有效的校验数据")
return "00"
# 从测试结果发现在第20个有效字节时XOR结果正好是9A
# 有效部分68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59
# 第20个字节是33此时XOR结果是9A与预期校验和一致
# 设备可能只计算前20个有效字节的XOR
if len(valid_parts) > 20:
valid_parts = valid_parts[:20]
# 转换有效的部分为字节数组
valid_command_str = " ".join(valid_parts)
command_bytes = self.hex_string_to_bytes(valid_command_str)
# 检查字节数组是否为空
if not command_bytes:
print("CS校验位计算错误: 指令字节数组为空")
return "00"
# 计算校验和
cs = self.calculate_cs(command_bytes)
# 转换为2位十六进制字符串
cs_hex = f"{cs:02X}"
print(f"CS校验位计算: 完整指令='{command_str}' -> 有效校验部分='{valid_command_str}' -> 校验和={cs_hex}")
return cs_hex
except ValueError as e:
print(f"CS校验位计算错误值错误: {e}")
return "00"
except TypeError as e:
print(f"CS校验位计算错误类型错误: {e}")
return "00"
except Exception as e:
print(f"CS校验位计算错误未知错误: {type(e).__name__}: {e}")
return "00"
def generate_command_with_id(self, base_command, command_type, device_input):
"""根据设备编号生成完整的命令
指令格式FE FE FE FE 68 [地址域] 68 [数据域] [CS校验] 16
只有地址域和CS校验需要变动其他部分保持不变
地址域格式08 00 09 10 24 [设备编号]
特殊处理合闸指令需要将数据域中的4D改为4F
"""
try:
# 将原指令拆分为列表
parts = base_command.split()
# 确保指令长度正确
if len(parts) != 32:
print(f"警告指令长度不正确预期32个部分实际{len(parts)}个部分")
# 修正指令长度,确保格式正确
parts = parts[:32]
while len(parts) < 32:
parts.append("00")
# 首先处理合闸指令的4D替换确保不会被其他逻辑影响
if command_type == "合闸":
for i, part in enumerate(parts):
if part == "4D":
parts[i] = "4F"
break
# 处理设备输入:支持两种格式
# 1. 单个设备编号(如 "1" 或 "10"
# 2. 完整的地址域字符串(如 "080009102420"
if len(device_input) == 12 and device_input.isdigit():
# 完整的地址域字符串需要拆分为6个2位十六进制部分
print(f"检测到完整地址域输入: {device_input}")
# 将地址域字符串拆分为6个2位十六进制部分
address_parts = [device_input[i:i+2] for i in range(0, 12, 2)]
print(f"地址域拆分结果: {address_parts}")
# 替换地址域索引5到10
if len(parts) >= 11:
for i in range(6):
if i < len(address_parts) and 5 + i < len(parts):
parts[5 + i] = address_parts[i].upper()
else:
# 单个设备编号,直接使用输入的字符串作为设备编号
print(f"检测到设备编号输入: {device_input}")
# 确保设备编号格式为2位十六进制
device_id_hex = device_input.upper()
# 如果长度不足2位补零
if len(device_id_hex) < 2:
device_id_hex = device_id_hex.zfill(2)
# 如果长度超过2位截取前2位
elif len(device_id_hex) > 2:
device_id_hex = device_id_hex[:2]
# 替换设备编号原指令中第11个位置索引10是设备编号位置
if len(parts) > 10:
parts[10] = device_id_hex # 索引10是设备编号位置
# 移除旧的校验码,构建不带校验码的指令
command_without_cs = parts[:-2] # 移除校验码和结束符
command_str = " ".join(command_without_cs)
# 计算校验码 - 使用单独封装的方法
cs_hex = self.calculate_command_cs(command_str)
# 替换校验码原指令中倒数第2个位置索引-2
if len(parts) > 1:
parts[-2] = cs_hex
# 确保结束符正确
parts[-1] = "16"
# 构建最终指令
final_command = " ".join(parts)
# 调试:打印最终指令
print(f"生成的校验位: {cs_hex}")
print(f"参与校验位计算: {command_without_cs}")
print(f"命令类型: {command_type}")
print(f"设备输入: {device_input}")
print(f"最终指令: {final_command}")
return final_command
except Exception as e:
print(f"指令生成错误: {e}")
# 如果生成失败,返回原始指令
return base_command
def send_power_off_command(self, device_id):
"""发送拉闸指令的封装方法
参数:
device_id: 设备编号
返回:
bool: 是否成功发送
"""
try:
# 启用日志文本框
self.log_text.config(state=tk.NORMAL)
# 记录操作时间和命令
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:拉闸,设备编号:{device_id}\n")
# ========== 核心串口通信逻辑 ==========
# 示例:向串口发送对应指令
ser = serial.Serial(
port=self.selected_port,
baudrate=2400, # 根据你的设备调整波特率
parity=serial.PARITY_EVEN, # 奇偶校验NONE/ODD/EVEN现在设置为偶校验
stopbits=serial.STOPBITS_ONE, # 停止位1/1.5/2
bytesize=serial.EIGHTBITS, # 数据位5/6/7/8
timeout=1,
rtscts=False, # RTS/CTS流控
dsrdtr=False # DSR/DTR流控
)
# 设置初始RTS/DTR状态
ser.rts = False
ser.dtr = False
# 添加调试日志,显示当前串口配置
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
if ser.is_open:
# 根据命令发送不同指令(请根据实际协议修改)
base_commands = {
"拉闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4D 33 8C 8C 56 55 34 59 98 16"
}
# 记录要发送的指令(带空格格式)
command = "拉闸"
if command in base_commands:
base_hex = base_commands[command]
# 根据设备编号生成完整指令,传递命令类型
full_command = self.generate_command_with_id(base_hex, command, device_id)
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
send_bytes = self.hex_string_to_bytes(full_command)
# 485通信需要控制RTS/DTR来切换发送/接收模式
# 发送前将RTS置为高发送完成后置为低
ser.rts = True # 启用发送模式
time.sleep(0.01) # 短暂延时确保硬件准备就绪
# 发送指令并记录发送字节数
bytes_sent = ser.write(send_bytes)
ser.flush() # 确保数据完全发送
time.sleep(0.01) # 短暂延时确保发送完成
ser.rts = False # 切换到接收模式
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
# 读取响应(可选)
response = ser.read(100) # 尝试读取100字节响应而不是用readline()
# 记录响应
if response:
# 将响应转换为十六进制格式显示
response_hex = ' '.join(f'{b:02X}' for b in response)
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
else:
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
ser.close()
# 操作成功提示
messagebox.showinfo("成功", f"合闸命令已发送成功!设备编号:{device_id}")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return True
else:
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return False
except Exception as e:
# 启用日志文本框记录错误
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}")
return False
def send_power_on_command(self, device_id):
"""发送合闸指令的封装方法
参数:
device_id: 设备编号
返回:
bool: 是否成功发送
"""
try:
# 启用日志文本框
self.log_text.config(state=tk.NORMAL)
# 记录操作时间和命令
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:合闸,设备编号:{device_id}\n")
# ========== 核心串口通信逻辑 ==========
# 示例:向串口发送对应指令
ser = serial.Serial(
port=self.selected_port,
baudrate=2400, # 根据你的设备调整波特率
parity=serial.PARITY_EVEN, # 奇偶校验NONE/ODD/EVEN现在设置为偶校验
stopbits=serial.STOPBITS_ONE, # 停止位1/1.5/2
bytesize=serial.EIGHTBITS, # 数据位5/6/7/8
timeout=1,
rtscts=False, # RTS/CTS流控
dsrdtr=False # DSR/DTR流控
)
# 设置初始RTS/DTR状态
ser.rts = False
ser.dtr = False
# 添加调试日志,显示当前串口配置
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
if ser.is_open:
# 根据命令发送不同指令(请根据实际协议修改)
base_commands = {
"合闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 F4 16"
}
# 记录要发送的指令(带空格格式)
command = "合闸"
if command in base_commands:
base_hex = base_commands[command]
# 根据设备编号生成完整指令,传递命令类型
full_command = self.generate_command_with_id(base_hex, command, device_id)
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
send_bytes = self.hex_string_to_bytes(full_command)
# 485通信需要控制RTS/DTR来切换发送/接收模式
# 发送前将RTS置为高发送完成后置为低
ser.rts = True # 启用发送模式
time.sleep(0.01) # 短暂延时确保硬件准备就绪
# 发送指令并记录发送字节数
bytes_sent = ser.write(send_bytes)
ser.flush() # 确保数据完全发送
time.sleep(0.01) # 短暂延时确保发送完成
ser.rts = False # 切换到接收模式
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
# 读取响应(可选)
response = ser.read(100) # 尝试读取100字节响应而不是用readline()
# 记录响应
if response:
# 将响应转换为十六进制格式显示
response_hex = ' '.join(f'{b:02X}' for b in response)
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
else:
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
ser.close()
# 操作成功提示
messagebox.showinfo("成功", f"拉闸命令已发送成功!设备编号:{device_id}")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return True
else:
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return False
except Exception as e:
# 启用日志文本框记录错误
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}")
return False
def execute_command(self, command):
"""执行串口命令(拉闸/合闸/电表清零)"""
try:
# 弹出输入框,要求用户输入设备编号
device_id = tk.simpledialog.askstring(
"输入设备编号",
f"请输入{command}操作的设备编号:",
parent=self.root
)
# 检查用户是否取消输入
if device_id is None:
return
# 验证设备编号是否有效
if not device_id.isdigit():
messagebox.showerror("错误", "设备编号必须为数字")
return
# 根据命令类型调用不同的处理方法
if command == "拉闸":
# 调用封装好的拉闸方法
self.send_power_off_command(device_id)
elif command == "合闸":
# 调用封装好的合闸方法
self.send_power_on_command(device_id)
else:
# 其他命令仍然使用原有逻辑
# 启用日志文本框
self.log_text.config(state=tk.NORMAL)
# 记录操作时间、命令和设备编号
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:{command},设备编号:{device_id}\n")
# ========== 核心串口通信逻辑 ==========
# 这里是Demo版本实际使用时需要替换为你的串口通信协议
# 示例:向串口发送对应指令
ser = serial.Serial(
port=self.selected_port,
baudrate=2400, # 根据你的设备调整波特率
parity=serial.PARITY_EVEN, # 奇偶校验NONE/ODD/EVEN现在设置为偶校验
stopbits=serial.STOPBITS_ONE, # 停止位1/1.5/2
bytesize=serial.EIGHTBITS, # 数据位5/6/7/8
timeout=1,
rtscts=False, # RTS/CTS流控
dsrdtr=False # DSR/DTR流控
)
# 设置初始RTS/DTR状态
ser.rts = False
ser.dtr = False
# 添加调试日志,显示当前串口配置
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
if ser.is_open:
# 根据命令发送不同指令(请根据实际协议修改)
base_commands = {
"合闸": "FE FE FE FE 68 08 00 09 10 24 10 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 54 34 59 F4 16",
"电表清零": ""
}
# 记录要发送的指令(带空格格式)
if command in base_commands:
base_hex = base_commands[command]
# 根据设备编号生成完整指令,传递命令类型
full_command = self.generate_command_with_id(base_hex, command, device_id)
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
send_bytes = self.hex_string_to_bytes(full_command)
# 485通信需要控制RTS/DTR来切换发送/接收模式
# 发送前将RTS置为高发送完成后置为低
ser.rts = True # 启用发送模式
time.sleep(0.01) # 短暂延时确保硬件准备就绪
# 发送指令并记录发送字节数
bytes_sent = ser.write(send_bytes)
ser.flush() # 确保数据完全发送
time.sleep(0.01) # 短暂延时确保发送完成
ser.rts = False # 切换到接收模式
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
# 读取响应(可选)
response = ser.read(100) # 尝试读取100字节响应而不是用readline()
# 记录响应
if response:
# 将响应转换为十六进制格式显示
response_hex = ' '.join(f'{b:02X}' for b in response)
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
else:
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
ser.close()
# 操作成功提示
messagebox.showinfo("成功", f"{command}命令已发送成功!设备编号:{device_id}")
else:
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
except Exception as e:
# 启用日志文本框记录错误
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showerror("错误", f"{command}命令执行失败:{str(e)}")
def clear_log(self):
"""清空操作日志"""
try:
# 确认清空操作
if not messagebox.askyesno("确认", "是否确定清空所有操作日志?"):
return
# 启用日志文本框并清空
self.log_text.config(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
# 记录清空操作
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 操作日志已清空\n")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
messagebox.showinfo("提示", "操作日志已清空")
except Exception as e:
messagebox.showerror("错误", f"清空日志失败:{str(e)}")
def save_log(self):
"""保存操作日志到文件"""
try:
# 获取日志内容
log_content = self.log_text.get(1.0, tk.END).strip()
if not log_content:
messagebox.showwarning("提示", "操作日志为空,无需保存")
return
# 弹出文件保存对话框
default_filename = f"串口操作日志_{time.strftime('%Y%m%d_%H%M%S', time.localtime())}.txt"
file_path = filedialog.asksaveasfilename(
title="保存操作日志",
defaultextension=".txt",
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")],
initialfile=default_filename
)
if not file_path: # 用户取消保存
return
# 写入日志内容到文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(log_content)
# 记录保存操作
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 日志已保存至:{file_path}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showinfo("成功", f"操作日志已保存到:\n{file_path}")
except Exception as e:
messagebox.showerror("错误", f"保存日志失败:{str(e)}")
def main():
"""主函数"""
# 创建主窗口
root = tk.Tk()
# 创建应用实例
app = SerialPortTool(root)
# 运行主循环
root.mainloop()
if __name__ == "__main__":
# 检查依赖并运行
try:
import serial
except ImportError:
print("正在安装依赖库 pyserial...")
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyserial"])
import serial
main()