modbus_MediaPipe/MQTT_project/mqtt_gui_qt.py

337 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# mqtt_gui_qt.py
# pip install PyQt6 paho-mqtt
import json
import re
from typing import Optional, Dict, Any
from PyQt6 import QtCore, QtGui, QtWidgets
from paho.mqtt import client as mqtt
DEFAULT_HOST = "169.254.11.130"
DEFAULT_PORT = 1886
DEFAULT_TOPIC = "topic_plc_and_py_for_AXIS"
DEFAULT_QOS = 1
# -------- Robust JSON fixer for PLC-style payloads --------
def robust_json_parse(raw: str) -> Dict[str, Any]:
"""
將 PLC 常見不標準 JSON 修正後解析成 dict。
例:'{Forward_RPM:1500,UPDown_RPM:800}' -> {'Forward_RPM':1500,'UPDown_RPM':800}
也會把 TRUE/FALSE -> true/false補 key 雙引號;去外層引號;單引號轉雙引號。
"""
s = raw.strip()
# 去掉最外層引號(若有)
if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):
s = s[1:-1].strip()
# PLC 常見布林大寫 -> JSON 規範小寫
s = s.replace("TRUE", "true").replace("FALSE", "false")
# 自動補 key 的雙引號:{key: -> {"key":
s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s)
# 可能存在單引號包字串:統一轉成雙引號
s = s.replace("'", '"')
return json.loads(s)
# -------- MQTT client wrapper with Qt signals --------
class MqttClientWrapper(QtCore.QObject):
sig_connected = QtCore.pyqtSignal()
sig_disconnected = QtCore.pyqtSignal(int)
sig_raw_message = QtCore.pyqtSignal(str)
sig_parsed_message = QtCore.pyqtSignal(dict)
sig_log = QtCore.pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.client: Optional[mqtt.Client] = None
self.topic: str = ""
self.qos: int = 0
def connect_and_start(self, host: str, port: int, topic: str, qos: int):
# 若已存在先清理
if self.client is not None:
try:
self.client.loop_stop()
self.client.disconnect()
except Exception:
pass
self.client = None
self.topic = topic
self.qos = qos
# 建立 client
self.client = mqtt.Client(client_id="qt_gui_sub_plc", protocol=mqtt.MQTTv311)
# 自動重連延遲
self.client.reconnect_delay_set(min_delay=1, max_delay=10)
# 綁定回呼
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
try:
self.client.connect(host, port, keepalive=60)
except Exception as e:
self.sig_log.emit(f"❌ Connect failed: {e}")
# 不要 loop_start以免空轉
return
# 背景執行,不阻塞 UI
self.client.loop_start()
self.sig_log.emit(f"🔌 Connecting to {host}:{port} | topic={topic} | qos={qos}")
def disconnect_and_stop(self):
if self.client is None:
return
try:
self.client.loop_stop()
self.client.disconnect()
except Exception as e:
self.sig_log.emit(f"Disconnect error: {e}")
# ---- Callbacks (paho-mqtt 2.x signature) ----
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
# 訂閱
try:
client.subscribe(self.topic, qos=self.qos)
self.sig_log.emit(f"✅ Connected. Subscribed: {self.topic} (QoS={self.qos})")
except Exception as e:
self.sig_log.emit(f"Subscribe error: {e}")
self.sig_connected.emit()
else:
self.sig_log.emit(f"❌ Connect failed. reason_code={reason_code}")
def _on_disconnect(self, client, userdata, reason_code, properties=None):
self.sig_disconnected.emit(reason_code)
self.sig_log.emit(f"🔌 Disconnected. reason_code={reason_code}")
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
payload_raw = msg.payload.decode("utf-8", errors="replace")
self.sig_raw_message.emit(payload_raw)
# 先試標準 JSON失敗再容錯
try:
data = json.loads(payload_raw)
except json.JSONDecodeError:
try:
data = robust_json_parse(payload_raw)
except Exception as e:
self.sig_log.emit(f"❌ Parse error: {e}")
return
self.sig_parsed_message.emit(data)
# -------- Main Window --------
class MainWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("MQTT PLC Subscriber (PyQt6)")
self.resize(900, 620)
# 狀態
self.state: Dict[str, Any] = {
"BUTTON_Y": None,
"Forward_RPM": None,
"UPDown_RPM": None,
}
# 中央 widget + 版面
central = QtWidgets.QWidget(self)
self.setCentralWidget(central)
root_layout = QtWidgets.QVBoxLayout(central)
root_layout.setContentsMargins(12, 12, 12, 12)
root_layout.setSpacing(10)
# ---- Connection Panel ----
panel = QtWidgets.QGroupBox("Connection")
grid = QtWidgets.QGridLayout(panel)
grid.setHorizontalSpacing(12)
grid.setVerticalSpacing(8)
self.ed_host = QtWidgets.QLineEdit(DEFAULT_HOST)
self.ed_port = QtWidgets.QSpinBox()
self.ed_port.setRange(1, 65535)
self.ed_port.setValue(DEFAULT_PORT)
self.ed_topic = QtWidgets.QLineEdit(DEFAULT_TOPIC)
self.ed_qos = QtWidgets.QComboBox()
self.ed_qos.addItems(["0", "1", "2"])
self.ed_qos.setCurrentIndex(DEFAULT_QOS)
self.btn_connect = QtWidgets.QPushButton("Connect")
self.btn_disconnect = QtWidgets.QPushButton("Disconnect")
self.lbl_status = QtWidgets.QLabel("Disconnected")
grid.addWidget(QtWidgets.QLabel("Broker Host"), 0, 0)
grid.addWidget(self.ed_host, 0, 1)
grid.addWidget(QtWidgets.QLabel("Port"), 0, 2)
grid.addWidget(self.ed_port, 0, 3)
grid.addWidget(QtWidgets.QLabel("Topic"), 1, 0)
grid.addWidget(self.ed_topic, 1, 1, 1, 3)
grid.addWidget(QtWidgets.QLabel("QoS"), 2, 0)
grid.addWidget(self.ed_qos, 2, 1)
grid.addWidget(self.btn_connect, 2, 2)
grid.addWidget(self.btn_disconnect, 2, 3)
grid.addWidget(QtWidgets.QLabel("Status"), 3, 0)
grid.addWidget(self.lbl_status, 3, 1, 1, 3)
root_layout.addWidget(panel)
# ---- State Panel ----
state_panel = QtWidgets.QGroupBox("Current State")
sgrid = QtWidgets.QGridLayout(state_panel)
sgrid.setHorizontalSpacing(20)
sgrid.setVerticalSpacing(8)
self.lbl_btn_y = QtWidgets.QLabel("None")
self.lbl_forward = QtWidgets.QLabel("None")
self.lbl_updown = QtWidgets.QLabel("None")
for lab in (self.lbl_btn_y, self.lbl_forward, self.lbl_updown):
lab.setFrameShape(QtWidgets.QFrame.Shape.StyledPanel)
lab.setMinimumWidth(140)
lab.setAlignment(QtCore.Qt.AlignmentFlag.AlignLeft | QtCore.Qt.AlignmentFlag.AlignVCenter)
sgrid.addWidget(QtWidgets.QLabel("BUTTON_Y"), 0, 0)
sgrid.addWidget(self.lbl_btn_y, 0, 1)
sgrid.addWidget(QtWidgets.QLabel("Forward_RPM"), 1, 0)
sgrid.addWidget(self.lbl_forward, 1, 1)
sgrid.addWidget(QtWidgets.QLabel("UPDown_RPM"), 2, 0)
sgrid.addWidget(self.lbl_updown, 2, 1)
root_layout.addWidget(state_panel)
# ---- Logs ----
log_panel = QtWidgets.QGroupBox("Logs")
vbox = QtWidgets.QVBoxLayout(log_panel)
self.txt_log = QtWidgets.QPlainTextEdit()
self.txt_log.setReadOnly(True)
self.txt_log.setMaximumBlockCount(2000)
vbox.addWidget(self.txt_log)
root_layout.addWidget(log_panel, 1)
# MQTT wrapper
self.mqtt = MqttClientWrapper(self)
self.mqtt.sig_connected.connect(self._on_connected)
self.mqtt.sig_disconnected.connect(self._on_disconnected)
self.mqtt.sig_raw_message.connect(self._on_raw_message)
self.mqtt.sig_parsed_message.connect(self._on_parsed_message)
self.mqtt.sig_log.connect(self._log)
# actions
self.btn_connect.clicked.connect(self._do_connect)
self.btn_disconnect.clicked.connect(self._do_disconnect)
# 美化
self._apply_dark_fusion_palette()
# ---- UI Actions ----
def _do_connect(self):
host = self.ed_host.text().strip()
port = int(self.ed_port.value())
topic = self.ed_topic.text().strip()
qos = int(self.ed_qos.currentText())
if not host or not topic:
QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Topic 不可為空")
return
self.lbl_status.setText("Connecting...")
self._log(f"🔌 Connecting to {host}:{port}, topic={topic}, qos={qos}")
self.mqtt.connect_and_start(host, port, topic, qos)
def _do_disconnect(self):
self.mqtt.disconnect_and_stop()
# ---- Signal slots from MQTT wrapper ----
@QtCore.pyqtSlot()
def _on_connected(self):
self.lbl_status.setText("Connected")
@QtCore.pyqtSlot(int)
def _on_disconnected(self, reason_code: int):
self.lbl_status.setText(f"Disconnected ({reason_code})")
@QtCore.pyqtSlot(str)
def _on_raw_message(self, text: str):
self._log(f"📥 Raw: {text}")
@QtCore.pyqtSlot(dict)
def _on_parsed_message(self, data: dict):
self._log(f"✅ Parsed: {data}")
if "BUTTON_Y" in data:
self.state["BUTTON_Y"] = data["BUTTON_Y"]
self.lbl_btn_y.setText(str(data["BUTTON_Y"]))
if "Forward_RPM" in data:
self.state["Forward_RPM"] = data["Forward_RPM"]
self.lbl_forward.setText(str(data["Forward_RPM"]))
if "UPDown_RPM" in data:
self.state["UPDown_RPM"] = data["UPDown_RPM"]
self.lbl_updown.setText(str(data["UPDown_RPM"]))
# ---- Utils ----
def _log(self, text: str):
self.txt_log.appendPlainText(text)
self.txt_log.verticalScrollBar().setValue(self.txt_log.verticalScrollBar().maximum())
def _apply_dark_fusion_palette(self):
# Fusion 深色主題
app = QtWidgets.QApplication.instance()
app.setStyle("Fusion")
pal = QtGui.QPalette()
base = QtGui.QColor(30, 30, 30)
alt = QtGui.QColor(45, 45, 45)
text = QtGui.QColor(220, 220, 220)
btn = QtGui.QColor(53, 53, 53)
hl = QtGui.QColor(42, 130, 218)
pal.setColor(QtGui.QPalette.ColorRole.Window, base)
pal.setColor(QtGui.QPalette.ColorRole.WindowText, text)
pal.setColor(QtGui.QPalette.ColorRole.Base, QtGui.QColor(25, 25, 25))
pal.setColor(QtGui.QPalette.ColorRole.AlternateBase, alt)
pal.setColor(QtGui.QPalette.ColorRole.ToolTipBase, text)
pal.setColor(QtGui.QPalette.ColorRole.ToolTipText, text)
pal.setColor(QtGui.QPalette.ColorRole.Text, text)
pal.setColor(QtGui.QPalette.ColorRole.Button, btn)
pal.setColor(QtGui.QPalette.ColorRole.ButtonText, text)
pal.setColor(QtGui.QPalette.ColorRole.BrightText, QtGui.QColor(255, 0, 0))
pal.setColor(QtGui.QPalette.ColorRole.Link, hl)
pal.setColor(QtGui.QPalette.ColorRole.Highlight, hl)
pal.setColor(QtGui.QPalette.ColorRole.HighlightedText, QtGui.QColor(255, 255, 255))
app.setPalette(pal)
def main():
import sys
app = QtWidgets.QApplication(sys.argv)
# Windows 高 DPI可忽略失敗
try:
import ctypes
ctypes.windll.shcore.SetProcessDpiAwareness(1)
except Exception:
pass
w = MainWindow()
w.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()