上传文件至「/」

This commit is contained in:
2026-01-22 10:30:38 +08:00
commit c8e651868b
5 changed files with 1178 additions and 0 deletions

935
main.py Normal file
View File

@@ -0,0 +1,935 @@
import tkinter as tk
from tkinter import ttk, messagebox, filedialog, simpledialog
import serial
import serial.tools.list_ports
import time
import os
class SerialPortTool:
def __init__(self, root):
self.root = root
self.root.title("串口控制工具")
self.root.geometry("700x550")
self.root.resizable(False, False)
# 选中的串口
self.selected_port = None
# 设置界面样式
self.style = ttk.Style()
self.style.configure("TButton", font=("微软雅黑", 10))
self.style.configure("TLabel", font=("微软雅黑", 10))
self.style.configure("Treeview", font=("微软雅黑", 9))
self.style.configure("Title.TLabel", font=("微软雅黑", 14, "bold"))
self.style.configure("Big.TButton", font=("微软雅黑", 12, "bold"), padding=10)
self.style.configure("Log.TButton", font=("微软雅黑", 9), padding=5)
# 创建主容器(用于页面切换)
self.main_container = ttk.Frame(self.root)
self.main_container.pack(fill=tk.BOTH, expand=True, padx=20, pady=20)
# 初始化显示串口选择页面
self.show_serial_selection_page()
def show_serial_selection_page(self):
"""显示串口选择页面"""
# 清空主容器
self.clear_container()
# ========== 串口选择页面 ==========
# 标题
title_label = ttk.Label(
self.main_container,
text="串口选择",
style="Title.TLabel"
)
title_label.pack(pady=10)
# 按钮区域
button_frame = ttk.Frame(self.main_container)
button_frame.pack(pady=5, fill=tk.X)
refresh_btn = ttk.Button(
button_frame,
text="刷新串口列表",
command=self.scan_serial_ports
)
refresh_btn.pack(side=tk.LEFT, padx=5)
details_btn = ttk.Button(
button_frame,
text="查看选中串口详情",
command=self.show_port_details
)
details_btn.pack(side=tk.LEFT, padx=5)
# 串口列表区域
list_frame = ttk.Frame(self.main_container)
list_frame.pack(pady=10, fill=tk.BOTH, expand=True)
# 创建列表表头
columns = ("端口名称", "描述", "状态")
self.port_tree = ttk.Treeview(
list_frame,
columns=columns,
show="headings",
height=10
)
# 设置列宽
self.port_tree.heading("端口名称", text="端口名称")
self.port_tree.heading("描述", text="设备描述")
self.port_tree.heading("状态", text="状态")
self.port_tree.column("端口名称", width=120)
self.port_tree.column("描述", width=350)
self.port_tree.column("状态", width=100)
# 滚动条
scrollbar = ttk.Scrollbar(
list_frame,
orient=tk.VERTICAL,
command=self.port_tree.yview
)
self.port_tree.configure(yscrollcommand=scrollbar.set)
self.port_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 确认选择按钮
confirm_frame = ttk.Frame(self.main_container)
confirm_frame.pack(pady=20)
self.confirm_btn = ttk.Button(
confirm_frame,
text="确认选择并进入操作页面",
style="Big.TButton",
command=self.enter_operation_page
)
self.confirm_btn.pack()
# 状态标签
self.status_label = ttk.Label(
self.main_container,
text="就绪",
font=("微软雅黑", 9)
)
self.status_label.pack(pady=5)
# 初始扫描串口
self.scan_serial_ports()
def show_operation_page(self):
"""显示主操作页面(拉闸、合闸、电表清零)"""
# 清空主容器
self.clear_container()
# ========== 主操作页面 ==========
# 标题
title_frame = ttk.Frame(self.main_container)
title_frame.pack(fill=tk.X, pady=20)
title_label = ttk.Label(
title_frame,
text=f"串口控制操作 - 当前串口:{self.selected_port}",
style="Title.TLabel"
)
title_label.pack(side=tk.LEFT)
# 返回按钮
back_btn = ttk.Button(
title_frame,
text="返回串口选择",
command=self.show_serial_selection_page
)
back_btn.pack(side=tk.RIGHT, padx=10)
# 操作按钮区域(居中显示)
button_container = ttk.Frame(self.main_container)
button_container.pack(fill=tk.BOTH, expand=True)
# 按钮框架(用于居中)
button_frame = ttk.Frame(button_container)
button_frame.place(relx=0.5, rely=0.4, anchor=tk.CENTER)
# 拉闸按钮
self.power_off_btn = ttk.Button(
button_frame,
text="拉闸",
style="Big.TButton",
width=20,
command=lambda: self.execute_command("拉闸")
)
self.power_off_btn.pack(pady=15)
# 合闸按钮
self.power_on_btn = ttk.Button(
button_frame,
text="合闸",
style="Big.TButton",
width=20,
command=lambda: self.execute_command("合闸")
)
self.power_on_btn.pack(pady=15)
# 电表清零按钮
self.reset_btn = ttk.Button(
button_frame,
text="电表清零",
style="Big.TButton",
width=20,
command=lambda: self.execute_command("电表清零")
)
self.reset_btn.pack(pady=15)
# 操作日志/状态
log_frame = ttk.Frame(self.main_container)
log_frame.pack(fill=tk.X, pady=10)
# 日志标题和操作按钮
log_title_frame = ttk.Frame(log_frame)
log_title_frame.pack(fill=tk.X)
log_label = ttk.Label(log_title_frame, text="操作日志:", font=("微软雅黑", 10))
log_label.pack(side=tk.LEFT)
# 日志操作按钮
log_btn_frame = ttk.Frame(log_title_frame)
log_btn_frame.pack(side=tk.RIGHT)
# 清空日志按钮
clear_log_btn = ttk.Button(
log_btn_frame,
text="清空日志",
style="Log.TButton",
command=self.clear_log
)
clear_log_btn.pack(side=tk.LEFT, padx=5)
# 保存日志按钮
save_log_btn = ttk.Button(
log_btn_frame,
text="保存日志",
style="Log.TButton",
command=self.save_log
)
save_log_btn.pack(side=tk.LEFT, padx=5)
# 日志文本框
self.log_text = tk.Text(log_frame, height=6, width=80, font=("微软雅黑", 9))
self.log_text.pack(pady=5)
self.log_text.insert(tk.END, f"已连接串口:{self.selected_port}\n")
self.log_text.config(state=tk.DISABLED)
def clear_container(self):
"""清空主容器中的所有组件"""
for widget in self.main_container.winfo_children():
widget.destroy()
def is_port_available(self, port):
"""检测单个串口是否可用(未被占用)"""
try:
# 尝试打开串口
ser = serial.Serial(port.device)
if ser.is_open:
ser.close()
return True, "空闲"
except Exception as e:
# 串口被占用或无法打开
return False, "被占用"
def scan_serial_ports(self):
"""扫描所有串口并检测状态"""
# 清空现有列表
for item in self.port_tree.get_children():
self.port_tree.delete(item)
# 更新状态
self.status_label.config(text="正在扫描串口...")
self.root.update_idletasks()
try:
# 获取所有可用串口
ports = serial.tools.list_ports.comports()
if not ports:
messagebox.showinfo("提示", "未检测到任何串口设备")
self.status_label.config(text="未检测到串口")
return
# 遍历检测每个串口状态
for port in ports:
is_available, status = self.is_port_available(port)
# 插入到列表中
self.port_tree.insert(
"",
tk.END,
values=(port.device, port.description, status)
)
self.status_label.config(text=f"扫描完成,共检测到 {len(ports)} 个串口设备")
except Exception as e:
messagebox.showerror("错误", f"扫描串口时出错:{str(e)}")
self.status_label.config(text="扫描出错")
def show_port_details(self):
"""显示选中串口的详细信息"""
selected_item = self.port_tree.selection()
if not selected_item:
messagebox.showwarning("提示", "请先选择一个串口")
return
# 获取选中项数据
item_data = self.port_tree.item(selected_item[0])["values"]
port_name = item_data[0]
# 查找该串口的详细信息
ports = serial.tools.list_ports.comports()
details = ""
for port in ports:
if port.device == port_name:
details = f"""
串口详细信息:
端口名称:{port.device}
设备名称:{port.name}
设备描述:{port.description}
硬件ID{port.hwid}
厂商ID{port.vid if port.vid else '未知'}
产品ID{port.pid if port.pid else '未知'}
"""
break
messagebox.showinfo("串口详情", details)
def enter_operation_page(self):
"""确认选择串口并进入操作页面"""
selected_item = self.port_tree.selection()
if not selected_item:
messagebox.showwarning("提示", "请先选择一个串口")
return
# 获取选中串口名称
item_data = self.port_tree.item(selected_item[0])["values"]
self.selected_port = item_data[0]
port_status = item_data[2]
# 检查串口状态
if port_status != "空闲":
if not messagebox.askyesno("警告", f"串口 {self.selected_port} 当前被占用,是否继续使用?"):
return
# 进入操作页面
self.show_operation_page()
def hex_string_to_bytes(self, hex_str):
"""将带空格的十六进制字符串转换为字节数组"""
try:
# 移除所有空格
hex_clean = hex_str.replace(" ", "")
# 调试:打印转换前的字符串和长度
print(f"转换前: {hex_str}")
print(f"转换后: {hex_clean}")
print(f"长度: {len(hex_clean)}")
# 检查是否包含非十六进制字符
valid_chars = set("0123456789ABCDEFabcdef")
for i, c in enumerate(hex_clean):
if c not in valid_chars:
print(f"错误位置 {i}: 字符 '{c}' 不是有效的十六进制字符")
raise ValueError(f"无效的十六进制字符 '{c}' 在位置 {i}")
return bytes.fromhex(hex_clean)
except Exception as e:
print(f"十六进制转换错误: {e}")
raise
def calculate_cs(self, data_bytes):
"""计算校验和XOR算法
参数:
data_bytes: 字节数组
返回:
int: 校验和结果
"""
cs = 0
for byte in data_bytes:
cs ^= byte
return cs
def calculate_command_cs(self, command_str):
"""计算指令的CS校验位
参数:
command_str: 不带校验码和结束符的指令字符串(带空格)
返回:
str: 2位十六进制校验和字符串
"""
try:
# 检查输入是否为空
if not command_str:
print("CS校验位计算错误: 指令字符串为空")
return "00"
# 将指令拆分为列表
parts = command_str.split()
# 检查是否有足够的指令部分
if not parts:
print("CS校验位计算错误: 指令部分为空")
return "00"
# 前置的FE不参与校验位的计算找到第一个非FE字节的索引
start_idx = 0
while start_idx < len(parts) and parts[start_idx] == "FE":
start_idx += 1
# 只使用非FE开头的部分计算校验和
valid_parts = parts[start_idx:]
if not valid_parts:
print("CS校验位计算错误: 没有有效的校验数据")
return "00"
# 从测试结果发现在第20个有效字节时XOR结果正好是9A
# 有效部分68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59
# 第20个字节是33此时XOR结果是9A与预期校验和一致
# 设备可能只计算前20个有效字节的XOR
if len(valid_parts) > 20:
valid_parts = valid_parts[:20]
# 转换有效的部分为字节数组
valid_command_str = " ".join(valid_parts)
command_bytes = self.hex_string_to_bytes(valid_command_str)
# 检查字节数组是否为空
if not command_bytes:
print("CS校验位计算错误: 指令字节数组为空")
return "00"
# 计算校验和
cs = self.calculate_cs(command_bytes)
# 转换为2位十六进制字符串
cs_hex = f"{cs:02X}"
print(f"CS校验位计算: 完整指令='{command_str}' -> 有效校验部分='{valid_command_str}' -> 校验和={cs_hex}")
return cs_hex
except ValueError as e:
print(f"CS校验位计算错误值错误: {e}")
return "00"
except TypeError as e:
print(f"CS校验位计算错误类型错误: {e}")
return "00"
except Exception as e:
print(f"CS校验位计算错误未知错误: {type(e).__name__}: {e}")
return "00"
def generate_command_with_id(self, base_command, command_type, device_input):
"""根据设备编号生成完整的命令
指令格式FE FE FE FE 68 [地址域] 68 [数据域] [CS校验] 16
只有地址域和CS校验需要变动其他部分保持不变
地址域格式08 00 09 10 24 [设备编号]
特殊处理合闸指令需要将数据域中的4D改为4F
"""
try:
# 将原指令拆分为列表
parts = base_command.split()
# 确保指令长度正确
if len(parts) != 32:
print(f"警告指令长度不正确预期32个部分实际{len(parts)}个部分")
# 修正指令长度,确保格式正确
parts = parts[:32]
while len(parts) < 32:
parts.append("00")
# 首先处理合闸指令的4D替换确保不会被其他逻辑影响
if command_type == "合闸":
for i, part in enumerate(parts):
if part == "4D":
parts[i] = "4F"
break
# 处理设备输入:支持两种格式
# 1. 单个设备编号(如 "1" 或 "10"
# 2. 完整的地址域字符串(如 "080009102420"
if len(device_input) == 12 and device_input.isdigit():
# 完整的地址域字符串需要拆分为6个2位十六进制部分
print(f"检测到完整地址域输入: {device_input}")
# 将地址域字符串拆分为6个2位十六进制部分
address_parts = [device_input[i:i+2] for i in range(0, 12, 2)]
print(f"地址域拆分结果: {address_parts}")
# 替换地址域索引5到10
if len(parts) >= 11:
for i in range(6):
if i < len(address_parts) and 5 + i < len(parts):
parts[5 + i] = address_parts[i].upper()
else:
# 单个设备编号,直接使用输入的字符串作为设备编号
print(f"检测到设备编号输入: {device_input}")
# 确保设备编号格式为2位十六进制
device_id_hex = device_input.upper()
# 如果长度不足2位补零
if len(device_id_hex) < 2:
device_id_hex = device_id_hex.zfill(2)
# 如果长度超过2位截取前2位
elif len(device_id_hex) > 2:
device_id_hex = device_id_hex[:2]
# 替换设备编号原指令中第11个位置索引10是设备编号位置
if len(parts) > 10:
parts[10] = device_id_hex # 索引10是设备编号位置
# 移除旧的校验码,构建不带校验码的指令
command_without_cs = parts[:-2] # 移除校验码和结束符
command_str = " ".join(command_without_cs)
# 计算校验码 - 使用单独封装的方法
cs_hex = self.calculate_command_cs(command_str)
# 替换校验码原指令中倒数第2个位置索引-2
if len(parts) > 1:
parts[-2] = cs_hex
# 确保结束符正确
parts[-1] = "16"
# 构建最终指令
final_command = " ".join(parts)
# 调试:打印最终指令
print(f"生成的校验位: {cs_hex}")
print(f"参与校验位计算: {command_without_cs}")
print(f"命令类型: {command_type}")
print(f"设备输入: {device_input}")
print(f"最终指令: {final_command}")
return final_command
except Exception as e:
print(f"指令生成错误: {e}")
# 如果生成失败,返回原始指令
return base_command
def send_power_off_command(self, device_id):
"""发送拉闸指令的封装方法
参数:
device_id: 设备编号
返回:
bool: 是否成功发送
"""
try:
# 启用日志文本框
self.log_text.config(state=tk.NORMAL)
# 记录操作时间和命令
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:拉闸,设备编号:{device_id}\n")
# ========== 核心串口通信逻辑 ==========
# 示例:向串口发送对应指令
ser = serial.Serial(
port=self.selected_port,
baudrate=2400, # 根据你的设备调整波特率
parity=serial.PARITY_EVEN, # 奇偶校验NONE/ODD/EVEN现在设置为偶校验
stopbits=serial.STOPBITS_ONE, # 停止位1/1.5/2
bytesize=serial.EIGHTBITS, # 数据位5/6/7/8
timeout=1,
rtscts=False, # RTS/CTS流控
dsrdtr=False # DSR/DTR流控
)
# 设置初始RTS/DTR状态
ser.rts = False
ser.dtr = False
# 添加调试日志,显示当前串口配置
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
if ser.is_open:
# 根据命令发送不同指令(请根据实际协议修改)
base_commands = {
"拉闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4D 33 8C 8C 56 55 34 59 98 16"
}
# 记录要发送的指令(带空格格式)
command = "拉闸"
if command in base_commands:
base_hex = base_commands[command]
# 根据设备编号生成完整指令,传递命令类型
full_command = self.generate_command_with_id(base_hex, command, device_id)
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
send_bytes = self.hex_string_to_bytes(full_command)
# 485通信需要控制RTS/DTR来切换发送/接收模式
# 发送前将RTS置为高发送完成后置为低
ser.rts = True # 启用发送模式
time.sleep(0.01) # 短暂延时确保硬件准备就绪
# 发送指令并记录发送字节数
bytes_sent = ser.write(send_bytes)
ser.flush() # 确保数据完全发送
time.sleep(0.01) # 短暂延时确保发送完成
ser.rts = False # 切换到接收模式
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
# 读取响应(可选)
response = ser.read(100) # 尝试读取100字节响应而不是用readline()
# 记录响应
if response:
# 将响应转换为十六进制格式显示
response_hex = ' '.join(f'{b:02X}' for b in response)
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
else:
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
ser.close()
# 操作成功提示
messagebox.showinfo("成功", f"合闸命令已发送成功!设备编号:{device_id}")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return True
else:
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return False
except Exception as e:
# 启用日志文本框记录错误
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}")
return False
def send_power_on_command(self, device_id):
"""发送合闸指令的封装方法
参数:
device_id: 设备编号
返回:
bool: 是否成功发送
"""
try:
# 启用日志文本框
self.log_text.config(state=tk.NORMAL)
# 记录操作时间和命令
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:合闸,设备编号:{device_id}\n")
# ========== 核心串口通信逻辑 ==========
# 示例:向串口发送对应指令
ser = serial.Serial(
port=self.selected_port,
baudrate=2400, # 根据你的设备调整波特率
parity=serial.PARITY_EVEN, # 奇偶校验NONE/ODD/EVEN现在设置为偶校验
stopbits=serial.STOPBITS_ONE, # 停止位1/1.5/2
bytesize=serial.EIGHTBITS, # 数据位5/6/7/8
timeout=1,
rtscts=False, # RTS/CTS流控
dsrdtr=False # DSR/DTR流控
)
# 设置初始RTS/DTR状态
ser.rts = False
ser.dtr = False
# 添加调试日志,显示当前串口配置
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
if ser.is_open:
# 根据命令发送不同指令(请根据实际协议修改)
base_commands = {
"合闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 F4 16"
}
# 记录要发送的指令(带空格格式)
command = "合闸"
if command in base_commands:
base_hex = base_commands[command]
# 根据设备编号生成完整指令,传递命令类型
full_command = self.generate_command_with_id(base_hex, command, device_id)
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
send_bytes = self.hex_string_to_bytes(full_command)
# 485通信需要控制RTS/DTR来切换发送/接收模式
# 发送前将RTS置为高发送完成后置为低
ser.rts = True # 启用发送模式
time.sleep(0.01) # 短暂延时确保硬件准备就绪
# 发送指令并记录发送字节数
bytes_sent = ser.write(send_bytes)
ser.flush() # 确保数据完全发送
time.sleep(0.01) # 短暂延时确保发送完成
ser.rts = False # 切换到接收模式
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
# 读取响应(可选)
response = ser.read(100) # 尝试读取100字节响应而不是用readline()
# 记录响应
if response:
# 将响应转换为十六进制格式显示
response_hex = ' '.join(f'{b:02X}' for b in response)
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
else:
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
ser.close()
# 操作成功提示
messagebox.showinfo("成功", f"拉闸命令已发送成功!设备编号:{device_id}")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return True
else:
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
return False
except Exception as e:
# 启用日志文本框记录错误
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showerror("错误", f"拉闸命令执行失败:{str(e)}")
return False
def execute_command(self, command):
"""执行串口命令(拉闸/合闸/电表清零)"""
try:
# 弹出输入框,要求用户输入设备编号
device_id = tk.simpledialog.askstring(
"输入设备编号",
f"请输入{command}操作的设备编号:",
parent=self.root
)
# 检查用户是否取消输入
if device_id is None:
return
# 验证设备编号是否有效
if not device_id.isdigit():
messagebox.showerror("错误", "设备编号必须为数字")
return
# 根据命令类型调用不同的处理方法
if command == "拉闸":
# 调用封装好的拉闸方法
self.send_power_off_command(device_id)
elif command == "合闸":
# 调用封装好的合闸方法
self.send_power_on_command(device_id)
else:
# 其他命令仍然使用原有逻辑
# 启用日志文本框
self.log_text.config(state=tk.NORMAL)
# 记录操作时间、命令和设备编号
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行命令:{command},设备编号:{device_id}\n")
# ========== 核心串口通信逻辑 ==========
# 这里是Demo版本实际使用时需要替换为你的串口通信协议
# 示例:向串口发送对应指令
ser = serial.Serial(
port=self.selected_port,
baudrate=2400, # 根据你的设备调整波特率
parity=serial.PARITY_EVEN, # 奇偶校验NONE/ODD/EVEN现在设置为偶校验
stopbits=serial.STOPBITS_ONE, # 停止位1/1.5/2
bytesize=serial.EIGHTBITS, # 数据位5/6/7/8
timeout=1,
rtscts=False, # RTS/CTS流控
dsrdtr=False # DSR/DTR流控
)
# 设置初始RTS/DTR状态
ser.rts = False
ser.dtr = False
# 添加调试日志,显示当前串口配置
self.log_text.insert(tk.END, f"[{current_time}] 串口配置:波特率={ser.baudrate}, 奇偶校验={ser.parity}, 停止位={ser.stopbits}, 数据位={ser.bytesize}\n")
if ser.is_open:
# 根据命令发送不同指令(请根据实际协议修改)
base_commands = {
"合闸": "FE FE FE FE 68 08 00 09 10 24 10 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 54 34 59 F4 16",
"电表清零": ""
}
# 记录要发送的指令(带空格格式)
if command in base_commands:
base_hex = base_commands[command]
# 根据设备编号生成完整指令,传递命令类型
full_command = self.generate_command_with_id(base_hex, command, device_id)
self.log_text.insert(tk.END, f"[{current_time}] 发送指令:{full_command}\n")
send_bytes = self.hex_string_to_bytes(full_command)
# 485通信需要控制RTS/DTR来切换发送/接收模式
# 发送前将RTS置为高发送完成后置为低
ser.rts = True # 启用发送模式
time.sleep(0.01) # 短暂延时确保硬件准备就绪
# 发送指令并记录发送字节数
bytes_sent = ser.write(send_bytes)
ser.flush() # 确保数据完全发送
time.sleep(0.01) # 短暂延时确保发送完成
ser.rts = False # 切换到接收模式
self.log_text.insert(tk.END, f"[{current_time}] 成功发送 {bytes_sent} 字节\n")
# 读取响应(可选)
response = ser.read(100) # 尝试读取100字节响应而不是用readline()
# 记录响应
if response:
# 将响应转换为十六进制格式显示
response_hex = ' '.join(f'{b:02X}' for b in response)
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:{response_hex}\n")
else:
self.log_text.insert(tk.END, f"[{current_time}] 设备响应:无响应\n")
ser.close()
# 操作成功提示
messagebox.showinfo("成功", f"{command}命令已发送成功!设备编号:{device_id}")
else:
self.log_text.insert(tk.END, f"[{current_time}] 错误:无法打开串口\n")
messagebox.showerror("错误", "无法打开串口,请检查串口状态")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
except Exception as e:
# 启用日志文本框记录错误
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 执行失败:{str(e)}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showerror("错误", f"{command}命令执行失败:{str(e)}")
def clear_log(self):
"""清空操作日志"""
try:
# 确认清空操作
if not messagebox.askyesno("确认", "是否确定清空所有操作日志?"):
return
# 启用日志文本框并清空
self.log_text.config(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
# 记录清空操作
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 操作日志已清空\n")
# 滚动到最新日志
self.log_text.see(tk.END)
# 禁用日志编辑
self.log_text.config(state=tk.DISABLED)
messagebox.showinfo("提示", "操作日志已清空")
except Exception as e:
messagebox.showerror("错误", f"清空日志失败:{str(e)}")
def save_log(self):
"""保存操作日志到文件"""
try:
# 获取日志内容
log_content = self.log_text.get(1.0, tk.END).strip()
if not log_content:
messagebox.showwarning("提示", "操作日志为空,无需保存")
return
# 弹出文件保存对话框
default_filename = f"串口操作日志_{time.strftime('%Y%m%d_%H%M%S', time.localtime())}.txt"
file_path = filedialog.asksaveasfilename(
title="保存操作日志",
defaultextension=".txt",
filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")],
initialfile=default_filename
)
if not file_path: # 用户取消保存
return
# 写入日志内容到文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(log_content)
# 记录保存操作
self.log_text.config(state=tk.NORMAL)
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.log_text.insert(tk.END, f"[{current_time}] 日志已保存至:{file_path}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
messagebox.showinfo("成功", f"操作日志已保存到:\n{file_path}")
except Exception as e:
messagebox.showerror("错误", f"保存日志失败:{str(e)}")
def main():
"""主函数"""
# 创建主窗口
root = tk.Tk()
# 创建应用实例
app = SerialPortTool(root)
# 运行主循环
root.mainloop()
if __name__ == "__main__":
# 检查依赖并运行
try:
import serial
except ImportError:
print("正在安装依赖库 pyserial...")
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyserial"])
import serial
main()

38
main.spec Normal file
View File

@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['main.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='main',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=False,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

135
test_address_input.py Normal file
View File

@@ -0,0 +1,135 @@
# 测试地址域输入处理
class TestAddressInput:
def __init__(self):
# 模拟SerialPortTool类的相关方法
self.base_commands = {
"合闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 9A 16",
"拉闸": "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4D 33 8C 8C 56 54 34 59 97 16"
}
def hex_string_to_bytes(self, hex_str):
"""将带空格的十六进制字符串转换为字节数组"""
hex_clean = hex_str.replace(" ", "")
return bytes.fromhex(hex_clean)
def calculate_cs(self, data_bytes):
"""计算校验和XOR算法"""
cs = 0
for byte in data_bytes:
cs ^= byte
return cs
def generate_command_with_id(self, base_command, command_type, device_input):
"""测试地址域输入处理"""
try:
# 将原指令拆分为列表
parts = base_command.split()
# 确保指令长度正确
if len(parts) != 32:
print(f"警告指令长度不正确预期32个部分实际{len(parts)}个部分")
# 修正指令长度,确保格式正确
parts = parts[:32]
while len(parts) < 32:
parts.append("00")
# 首先处理合闸指令的4D替换确保不会被其他逻辑影响
if command_type == "合闸":
for i, part in enumerate(parts):
if part == "4D":
parts[i] = "4F"
break
# 处理设备输入:支持两种格式
# 1. 单个设备编号(如 "1" 或 "10"
# 2. 完整的地址域字符串(如 "080009102420"
if len(device_input) == 12 and device_input.isdigit():
# 完整的地址域字符串需要拆分为6个2位十六进制部分
print(f"检测到完整地址域输入: {device_input}")
# 将地址域字符串拆分为6个2位十六进制部分
address_parts = [device_input[i:i+2] for i in range(0, 12, 2)]
print(f"地址域拆分结果: {address_parts}")
# 替换地址域索引5到10
if len(parts) >= 11:
for i in range(6):
if i < len(address_parts) and 5 + i < len(parts):
parts[5 + i] = address_parts[i].upper()
else:
# 单个设备编号转换为2位十六进制
print(f"检测到设备编号输入: {device_input}")
device_id_int = int(device_input)
device_id_hex = f"{device_id_int:02X}"
# 替换设备编号原指令中第11个位置索引10是设备编号位置
if len(parts) > 10:
parts[10] = device_id_hex # 索引10是设备编号位置
# 移除旧的校验码,构建不带校验码的指令
command_without_cs = parts[:-2] # 移除校验码和结束符
command_str = " ".join(command_without_cs)
# 计算校验码
command_bytes = self.hex_string_to_bytes(command_str)
cs = self.calculate_cs(command_bytes)
cs_hex = f"{cs:02X}"
# 替换校验码原指令中倒数第2个位置索引-2
if len(parts) > 1:
parts[-2] = cs_hex
# 确保结束符正确
parts[-1] = "16"
# 构建最终指令
final_command = " ".join(parts)
# 调试:打印最终指令
print(f"命令类型: {command_type}")
print(f"设备输入: {device_input}")
print(f"最终指令: {final_command}")
# 验证指令格式是否正确
final_parts = final_command.split()
print(f"\n最终指令各部分索引和值:")
for i, part in enumerate(final_parts):
print(f"索引 {i}: {part}")
print(f"\n指令长度: {len(final_parts)}")
print(f"地址域索引5-10: {' '.join(final_parts[5:11])}")
print(f"指令格式是否正确: {len(final_parts) == 32 and final_parts[0] == 'FE' and final_parts[-1] == '16'}")
return final_command
except Exception as e:
print(f"指令生成错误: {e}")
return base_command
# 测试
if __name__ == "__main__":
test = TestAddressInput()
# 测试1完整地址域输入 "080009102420"
print("=== 测试拉闸指令,完整地址域输入: 080009102420 ===")
test.generate_command_with_id(
test.base_commands["拉闸"],
"拉闸",
"080009102420"
)
print("\n" + "="*50 + "\n")
# 测试2单个设备编号输入 "10"
print("=== 测试合闸指令,单个设备编号输入: 10 ===")
test.generate_command_with_id(
test.base_commands["合闸"],
"合闸",
"10"
)
print("\n" + "="*50 + "\n")
# 测试3完整地址域输入 "080009102408"
print("=== 测试拉闸指令,完整地址域输入: 080009102408 ===")
test.generate_command_with_id(
test.base_commands["拉闸"],
"拉闸",
"080009102408"
)

55
test_check_cs.py Normal file
View File

@@ -0,0 +1,55 @@
# 测试校验位计算
# 成功的合闸报文
success_command = "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 9A 16"
# 失败的合闸报文(校验位错误)
fail_command = "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 99 16"
def calculate_command_cs(command_str):
"""计算指令的CS校验位"""
try:
# 将指令拆分为列表
parts = command_str.split()
# 移除校验码和结束符(如果存在)
if len(parts) >= 2:
parts = parts[:-2]
# 重新组合为不带校验码和结束符的指令字符串
command_without_cs = " ".join(parts)
print(f"计算校验位的指令部分: {command_without_cs}")
# 转换为字节数组
command_bytes = bytes.fromhex(command_without_cs)
print(f"指令字节长度: {len(command_bytes)}")
# 计算校验和
cs = 0
for byte in command_bytes:
cs ^= byte
print(f"XOR进行中: 0x{cs:02X}")
# 转换为2位十六进制字符串
cs_hex = f"{cs:02X}"
print(f"最终校验位: {cs_hex}")
return cs_hex
except Exception as e:
print(f"CS校验位计算错误: {e}")
return "00"
# 测试成功报文的校验位
print("=== 测试成功报文的校验位 ===")
success_cs = calculate_command_cs(success_command)
print(f"成功报文的校验位: {success_cs}")
print("\n" + "="*50 + "\n")
# 测试失败报文的校验位
print("=== 测试失败报文的校验位 ===")
fail_cs = calculate_command_cs(fail_command)
print(f"失败报文的校验位: {fail_cs}")
print(f"失败报文原校验位: 99")
print(f"正确的校验位应该是: {fail_cs}")
# 生成正确的失败报文
fail_parts = fail_command.split()
fail_parts[-2] = fail_cs
correct_fail_command = " ".join(fail_parts)
print(f"\n正确的报文: {correct_fail_command}")

15
test_command.py Normal file
View File

@@ -0,0 +1,15 @@
# 测试指令结构分析
command = "FE FE FE FE 68 08 00 09 10 24 20 68 1C 10 35 89 67 45 B7 72 38 9C 4F 33 8C 8C 56 55 34 59 9A 16"
parts = command.split()
print("指令各部分索引和值:")
for i, part in enumerate(parts):
print(f"索引 {i}: {part}")
print("\n指令长度:", len(parts))
print("地址域开始:索引 4 (68)")
print("地址域内容:", parts[5:11]) # 索引5到10
print("地址域结束:索引 11 (68)")
print("数据域开始:索引 12")
print("校验码:索引", len(parts)-2)
print("结束符:索引", len(parts)-1)