架构采用python3.8+pyqt5
先来看下原题:
视频中软件的效果
看看解答出的程序效果怎么样?
对应代码已经上传到了gitcode
https://gitcode.com/m0_37662818/fan_protocol_tool/overview
实现中的难点是双悬浮可视化,同时要高亮悬浮对应内容,程序中用的的事件过滤器。也许原题解用的是更好的方法。
代码如下 app.py 主文件
from PyQt5 import QtWidgets,QtCore,QtGui
from ui_main import Ui_MainWindow # 导入主界面类
import sys
import socket
import traceback
from typing import List, Tuple
class MySignal(QtCore.QObject):
update_sheet = QtCore.pyqtSignal(str)
hover_signal = QtCore.pyqtSignal(int)
my_signal = MySignal()
class MainWindow(QtWidgets.QMainWindow):
def __init__(self, parent=None):
super().__init__()
self.ui = Ui_MainWindow()
self.ui.setupUi(self) # 加载主界面
self.socket_fan = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建socket对象
self.ui.btn_connect_server.clicked.connect(self.connect_fan) # 绑定按钮事件
self.ui.stackedWidget.setCurrentIndex(0)
self.ui.msg_list.setFlow(QtWidgets.QListView.Flow.LeftToRight)
self.ui.msg_list.setViewMode(QtWidgets.QListView.ViewMode.IconMode)
self.ui.msg_list.setResizeMode(QtWidgets.QListView.ResizeMode.Adjust)
self.ui.msg_list.setWrapping(True)
self.ui.msg_list.setSpacing(3)
self.ui.msg_head_list.setFlow(QtWidgets.QListView.Flow.LeftToRight)
self.ui.msg_head_list.setWrapping(True)
self.ui.msg_head_list.setViewMode(QtWidgets.QListView.ViewMode.IconMode)
self.ui.msg_head_list.setResizeMode(QtWidgets.QListView.ResizeMode.Adjust)
self.ui.msg_head_list.setFixedHeight(70)
self.ui.msg_list.setDragEnabled(False)
self.ui.msg_head_list.setDragEnabled(False)
self.ui.msg_body_table.horizontalHeader().setSectionResizeMode(QtWidgets.QHeaderView.ResizeMode.Stretch)
self.ui.msg_body_table.verticalHeader().hide()
self.ui.msg_body_table.horizontalHeader().setStyleSheet("color:#00789d;")
self.ui.msg_list.setAttribute(QtCore.Qt.WidgetAttribute.WA_Hover,True)
self.ui.msg_body_table.setAttribute(QtCore.Qt.WidgetAttribute.WA_Hover,True)
self.ui.msg_head_list.setAttribute(QtCore.Qt.WidgetAttribute.WA_Hover,True)
self.ui.msg_list.installEventFilter(self)
self.ui.msg_body_table.installEventFilter(self)
self.ui.msg_head_list.installEventFilter(self)
self.last_hover_index = -1
self.msg_id = 0
self.ui.listWidget.itemClicked.connect(self.list_widget_clicked)
self.ui.btn_set.clicked.connect(self.btn_set_clicked)
self.ui.btn_get_sn.clicked.connect(self.btn_get_sn_clicked)
self.ui.btn_get_state.clicked.connect(self.btn_get_state_clicked)
my_signal.update_sheet.connect(self.update_sheet_cb)
my_signal.hover_signal.connect(self.draw_cb)
def draw_cb(self,index):
def draw_(index:int,color:str):
self.ui.msg_list.item(index).setBackground(QtGui.QColor(color))
if index < 3:
self.ui.msg_head_list.item(index * 2 + 1).setBackground(QtGui.QColor(color))
else:
row = index//3 - 1
col = index%3 + 1
self.ui.msg_body_table.item(row,col).setBackground(QtGui.QColor(color))
if(index == self.last_hover_index):
return
if(self.last_hover_index != index):
if(self.last_hover_index >= 0):
draw_(self.last_hover_index,'white')
if(index >= 0):
draw_(index,'orangered')
if(index < 0 and self.last_hover_index >= 0):
draw_(self.last_hover_index,'white')
self.last_hover_index = index
def eventFilter(self, watched:QtWidgets.QWidget, event:QtCore.QEvent):
if(watched == self.ui.msg_head_list):
if(event.type() == QtCore.QEvent.Type.HoverMove):
index = watched.indexAt(event.pos()).row()
my_signal.hover_signal.emit(index//2)
if(event.type() == QtCore.QEvent.Type.HoverLeave):
my_signal.hover_signal.emit(-1)
if(watched == self.ui.msg_list):
if(event.type() == QtCore.QEvent.Type.HoverMove):
index = watched.indexAt(event.pos()).row()
my_signal.hover_signal.emit(index)
if(event.type() == QtCore.QEvent.Type.HoverLeave):
my_signal.hover_signal.emit(-1)
if(watched == self.ui.msg_body_table):
if(event.type() == QtCore.QEvent.Type.HoverMove):
x = event.pos().x()
y = event.pos().y()
hight = watched.horizontalHeader().height()
point = QtCore.QPoint(x,y - hight)
Model_index = watched.indexAt(point)
p = event.pos()
if Model_index is None:
my_signal.hover_signal.emit(-1)
else:
index = -1
row = Model_index.row()
col = Model_index.column()
if(row >= 0 and col >= 1):
row = Model_index.row()
#print("row:" + str(row))
col = Model_index.column()
#print("col:" + str(col))
index = (row) * 3 + col - 1 + 3
my_signal.hover_signal.emit(index)
if(event.type() == QtCore.QEvent.Type.HoverLeave):
my_signal.hover_signal.emit(-1)
return super().eventFilter( watched, event)
def update_sheet_cb(self,text:str):
def fill_data(infos:List[Tuple[str,str]]):
for index,info in enumerate(infos):
self.ui.msg_list.addItem(info[0])
if index < 3:
if index == 0:
self.ui.msg_head_list.addItem("消息长度:")
if index == 1:
self.ui.msg_head_list.addItem("类型:")
if index == 2:
self.ui.msg_head_list.addItem("消息ID:")
self.ui.msg_head_list.addItem(str(info[1]))
elif index % 3 == 0:
self.ui.msg_body_table.insertRow(index // 3 - 1)
table_item = QtWidgets.QTableWidgetItem(str(infos[index][1]))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 1, table_item)
table_item = QtWidgets.QTableWidgetItem(str(infos[index + 1][1]))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 2, table_item)
table_item = QtWidgets.QTableWidgetItem(str(infos[index + 2][1]))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 3, table_item)
if infos[index][0] == "01":
table_item = QtWidgets.QTableWidgetItem("设备编号")
table_item.setForeground(QtGui.QColor(255,128,0))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 0, table_item)
elif infos[index][0] == "02":
table_item = QtWidgets.QTableWidgetItem("设备状态")
table_item.setForeground(QtGui.QColor(255,128,0))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 0,table_item)
elif infos[index][0] == "a0":
table_item = QtWidgets.QTableWidgetItem("结果码")
table_item.setForeground(QtGui.QColor(255,128,0))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 0,table_item)
elif infos[index][0] == "a1":
table_item = QtWidgets.QTableWidgetItem("结果描述")
table_item.setForeground(QtGui.QColor(255,128,0))
table_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
self.ui.msg_body_table.setItem(index // 3 - 1, 0, table_item)
#上色
for i in range(self.ui.msg_list.count()):
if i < 3:
self.ui.msg_list.item(i).setForeground(QtGui.QColor(0,128,0))
elif i % 3 == 0:
self.ui.msg_list.item(i).setForeground(QtGui.QColor(255,128,0))
elif i % 3 == 1:
self.ui.msg_list.item(i).setForeground(QtGui.QColor(0, 120, 157))
pass
res = bytes.fromhex(text)
infos = self.resolve_response(res)
self.ui.msg_list.clear()
self.ui.msg_head_list.clear()
self.ui.msg_body_table.setRowCount(0)
fill_data(infos)
pass
def connect_fan(self):
ip = self.ui.input_server_ip.text()
port = int(self.ui.input_server_port.text())
print(ip, port)
if not ip or not port:
self.ui.msgWindow.append("请输入风机IP和端口!")
return
if self.ui.btn_connect_server.text() == "连接":
try:
self.socket_fan.connect((ip, port)) # 连接风机
self.ui.msgWindow.append("连接风机成功!")
#self.ui.btn_connect_server.setText("断开")
except Exception as error:
self.ui.msgWindow.append("连接风机失败!")
print(error)
elif self.ui.btn_connect_server.text() == "断开":
self.socket_fan.close() # 断开连接
self.ui.msgWindow.append("断开风机连接!")
self.ui.btn_connect_server.setText("连接")
def btn_set_clicked(self):
text = self.ui.btn_set.text()
if(text == '设置'):
self.set_dev_stats_process()
def btn_get_sn_clicked(self):
self.get_dev_stats_process(state=1)
def btn_get_state_clicked(self):
self.get_dev_stats_process(state=2)
def list_widget_clicked(self,item:QtWidgets.QListWidgetItem):
text = item.text()
if(text == "连接设备"):
self.change_page_connect()
elif(text == "设置风泵状态"):
self.change_page_set()
elif(text == "读取风泵状态"):
self.change_page_get()
def change_page_connect(self):
self.ui.stackedWidget.setCurrentIndex(0)
def change_page_set(self):
self.ui.stackedWidget.setCurrentIndex(1)
def change_page_get(self):
self.ui.stackedWidget.setCurrentIndex(2)
# 获取设备配置请求
def get_dev_config_request(self,msg_id:int,state:int)-> bytes:
body = b''
body += b'\xC1'
body += b'\x03'
body += (state.to_bytes(1,byteorder='big'))
msg_len = 8 + len(body)
msg_type = 0x01F0
msg_id = msg_id
return msg_len.to_bytes(2,byteorder='big') + msg_type.to_bytes(2,byteorder='big') + msg_id.to_bytes(4,byteorder='big') + body
# 获取设备配置通信过程
def get_dev_stats_process(self,state=0):
msg_id = self.msg_id
try:
req = self.get_dev_config_request(msg_id, state)
self.socket_fan.send(req)
self.ui.msgWindow.append("发送消息:")
self.ui.msgWindow.append("<font color=\"#006400\">"+ req.hex() + "</font> ")
res = self.socket_fan.recv(1024)
self.ui.msgWindow.append("接收消息:")
self.ui.msgWindow.append("<font color=\"#006400\">"+ res.hex() + "</font> ")
infos = self.resolve_response(res)
for index,info in enumerate(infos):
if index == 3 or index == 6:
if info[0] == "01":
self.ui.output_devSn.setText(str(infos[index + 2][1]))
elif info[0] == "02":
self.ui.output_devState.setText(str(infos[index + 2][1]))
my_signal.update_sheet.emit(res.hex())
except Exception as e:
traceback.print_exc()
self.msg_id += 1
pass
# 组装设备配置请求
def set_dev_config_request(self,msgid:int,dev:str, stats:int):
body = b''
if dev is not None:
body += b'\x01'
body += (len(dev) + 2).to_bytes(1,byteorder='big')
body += dev.encode()
if stats is not None:
body += b'\x02'
body += (3).to_bytes(1,byteorder='big')
body += stats.to_bytes(1,byteorder='big')
msg_len = 8 + len(body)
msg_type = b'\x02\xF0'
msgid = msgid
return msg_len.to_bytes(2,byteorder='big') + msg_type + msgid.to_bytes(4,byteorder='big') + body
# 设置设备状态通信过程
def set_dev_stats_process(self):
msg_id = self.msg_id
dev_id_text = self.ui.input_devSn.text() if len(self.ui.input_devSn.text()) > 0 else None
dev_stats_text = self.ui.input_devState.text()
dev_stats = None
if dev_stats_text is not None and len(dev_stats_text) != 0 :
dev_stats = int(dev_stats_text)
try:
req = self.set_dev_config_request(msgid=msg_id,dev = dev_id_text,stats=dev_stats)
self.socket_fan.send(req)
self.ui.msgWindow.append("发送消息:")
self.ui.msgWindow.append("<font color=\"#006400\">"+ req.hex() + "</font> ")
res = self.socket_fan.recv(1024)
self.ui.msgWindow.append("接收消息:")
self.ui.msgWindow.append("<font color=\"#006400\">"+ res.hex() + "</font> ")
my_signal.update_sheet.emit(res.hex())
except Exception as e:
traceback.print_exc()
self.msg_id += 1
pass
# 解析响应 bytes版
'''
def resolve_response(self, res:bytes)-> dict:
if len(res) < 8:
return {}
infos = {}
msg_type_bytes = res[2:4]
msg_type = 'unknown massage '
if msg_type_bytes == b'\x01\xF1':
msg_type = 'read state response'
elif msg_type_bytes == b'\x02\xF1':
msg_type = 'set state response'
infos["msg_len"] = int.from_bytes(res[0:2],byteorder='big')
infos["msg_id"] = int.from_bytes(res[4:8],byteorder='big')
infos["msg_type"] = msg_type
pos = 8
while pos < len(res):
code = res[pos:pos + 1]
if(msg_type_bytes == b'\x01\xF1'):
if code == b'\x01':
length = res[pos + 1]
desc = res[pos+2:pos+length].decode()
field = Field(b'\x01',length,desc)
infos['sn'] = field
pos += length
elif code == b'\x02':
length = res[pos + 1]
desc = int.from_bytes(res[pos + 2:pos + length],byteorder='big')
field = Field(b'\x02',length,desc)
infos['state'] = field
pos += length
else:
break
if(msg_type_bytes == b'\x02\xF1'):
if code == b'\xA0':
length = res[pos + 1]
desc = int.from_bytes[pos + 2: pos + length]
field = Field(b'\xA0',length,desc)
infos['code'] = field
pos += length
elif code == b'\xA1':
length = res[pos + 1]
desc = res[pos+2:pos + length].decode()
field = Field(b'\xA1',length,desc)
infos['desc'] = field
pos += length
else:
break
return infos
pass
'''
# 解析响应 str版
def resolve_response(self, res:bytes)-> list:
def resolve_response_head(res_head:bytes)->list:
infos = []
msg_type = 0
msg_len_hex = (res_head[0:2]).hex()
msg_type_hex = (res_head[2:4]).hex()
msg_id_hex = (res_head[4:8]).hex()
msg_type_desc = ""
msg_len = int.from_bytes(res_head[0:2],byteorder='big')
msg_id = int.from_bytes(res_head[4:8],byteorder='big')
if(msg_type_hex.upper() == "01F1"):
msg_type_desc = "get_state_response"
elif(msg_type_hex.upper() == "02F1"):
msg_type_desc = "set_state_response"
infos.append((msg_len_hex,msg_len))
infos.append((msg_type_hex,msg_type_desc))
infos.append((msg_id_hex,msg_id))
return infos
def resolve_response_body_get_state(res_body:bytes)->list:
pos = 0
infos = []
while pos < len(res_body):
code_hex = res_body[pos:pos + 1].hex()
length_hex = res_body[pos + 1:pos + 2].hex()
length = res_body[1]
value_hex = res_body[pos + 2: pos + length].hex()
value = ""
if(code_hex == "01"):
value = res_body[pos + 2:pos + length].decode(encoding="ANSI")
elif(code_hex == "02"):
value = int.from_bytes(res_body[pos + 2:pos + length],byteorder='big')
infos.append((code_hex,code_hex))
infos.append((length_hex,str(length)))
infos.append((value_hex,str(value)))
pos += length
return infos
def resolve_response_body_set_state(res_body:bytes)->list:
pos = 0
infos = []
while pos < len(res_body):
code_hex = res_body[pos:pos + 1].hex()
length_hex = res_body[pos + 1:pos + 2].hex()
length = res_body[pos+1]
value_hex = res_body[pos + 2:pos + length].hex()
value = ""
if(code_hex.upper() == "A0"):
value = res_body[pos+2]
elif(code_hex.upper() == "A1"):
value = res_body[pos + 2:pos + length].decode(encoding="ANSI")
infos.append((code_hex,code_hex))
infos.append((length_hex,str(length)))
infos.append((value_hex,str(value)))
pos += length
return infos
res_len = len(res)
if res_len < 8:
return []
msg_len = int.from_bytes(res[0:2],byteorder='big')
if msg_len > res_len:
return []
res_head = res[0:8]
res_body = res[8:msg_len]
infos = []
infos = resolve_response_head(res_head)
if(infos[1][0].upper() == "01F1"):
infos = infos + resolve_response_body_get_state(res_body)
elif(infos[1][0].upper() == "02F1"):
infos = infos + resolve_response_body_set_state(res_body)
return infos
if __name__ == '__main__':
app = QtWidgets.QApplication([]) # 创建QApplication对象
window = MainWindow() # 创建主界面对象
window.show() # 显示主界面
sys.exit(app.exec_()) # 运行主界面