# Origin: modbus_MediaPipe/MQTT_project/mqtt_pubsub_gui.py
# (434 lines, 17 KiB, Python)
#
# NOTE: The repository viewer flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can safely be ignored.

# mqtt_pubsub_split.py
# pip install PyQt6 paho-mqtt
import json
import re
from typing import Optional, Dict, Any, List, Tuple
from PyQt6 import QtCore, QtGui, QtWidgets
from paho.mqtt import client as mqtt
# ---------- PLC 風格 Payload 的容錯解析(選用) ----------
def robust_json_parse(raw: str) -> Dict[str, Any]:
    """Parse a loosely formatted, PLC-style payload as JSON.

    The text is normalised in this order before it is handed to
    ``json.loads``:

    1. Surrounding whitespace is stripped, then one pair of matching outer
       quotes (single or double) is removed.
    2. Standalone ``TRUE`` / ``FALSE`` tokens become ``true`` / ``false``.
    3. Bare identifiers used as object keys are wrapped in double quotes.
    4. Every remaining single quote is replaced by a double quote.

    Args:
        raw: Payload text as received.

    Returns:
        Whatever ``json.loads`` produces for the normalised text.

    Raises:
        json.JSONDecodeError: If the normalised text is still not valid JSON.
    """
    text = raw.strip()

    # Drop one pair of wrapping quotes, e.g. '{...}' or "{...}".
    if text[:1] in ("'", '"') and text.endswith(text[:1]):
        text = text[1:-1].strip()

    # Only whole tokens are rewritten: a plain str.replace would also mangle
    # identifiers that merely contain the letters (IS_TRUE_FLAG, FALSE_ALARM).
    text = re.sub(r"\bTRUE\b", "true", text)
    text = re.sub(r"\bFALSE\b", "false", text)

    # Quote bare keys that directly follow '{' or ','.
    text = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', text)

    # JSON only knows double-quoted strings.
    text = text.replace("'", '"')
    return json.loads(text)
# ---------- MQTT 控制器 ----------
class MqttController(QtCore.QObject):
    """Qt wrapper around a paho-mqtt client.

    paho callbacks are forwarded as Qt signals so UI code can react through
    ordinary signal/slot connections.

    Signals:
        sig_connected(): the broker accepted the connection (reason code 0).
        sig_disconnected(int): paho reported a disconnect; carries its code.
        sig_log(str): human-readable status line.
        sig_message_raw(str, str): topic and UTF-8 decoded payload.
    """

    sig_connected = QtCore.pyqtSignal()
    sig_disconnected = QtCore.pyqtSignal(int)
    sig_log = QtCore.pyqtSignal(str)
    sig_message_raw = QtCore.pyqtSignal(str, str)  # topic, raw

    def __init__(self, parent=None):
        super().__init__(parent)
        # None whenever there is no usable client (never connected, connect
        # failed, or explicitly disconnected).
        self.client: Optional[mqtt.Client] = None

    @staticmethod
    def _new_client(client_id: str):
        """Create a paho client that uses the version-1 callback signatures."""
        options = {"client_id": client_id or None, "protocol": mqtt.MQTTv311}
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            # paho-mqtt 2.x: the callback API version has to be stated; the
            # callbacks of this class follow the version-1 argument lists.
            return mqtt.Client(api_versions.VERSION1, **options)
        return mqtt.Client(**options)

    def _shutdown_client(self):
        """Stop the network loop, disconnect and forget the current client."""
        client, self.client = self.client, None
        if client is None:
            return
        try:
            client.loop_stop()
            client.disconnect()
        except Exception as e:
            self.sig_log.emit(f"Disconnect error: {e}")

    def connect(self, host: str, port: int, client_id: str = ""):
        """Replace any existing client and connect to ``host:port``.

        Failures are reported through ``sig_log``; nothing is raised. The
        client is only kept when the initial connect call succeeded.
        """
        self._shutdown_client()
        self.sig_log.emit(f"🔌 Connecting to {host}:{port} ...")
        try:
            # Construction is inside the try as well: an invalid client id or
            # an incompatible paho version must not escape into the Qt slot.
            client = self._new_client(client_id)
            client.reconnect_delay_set(min_delay=1, max_delay=10)
            client.on_connect = self._on_connect
            client.on_disconnect = self._on_disconnect
            client.on_message = self._on_message
            client.connect(host, port, keepalive=60)
        except Exception as e:
            self.sig_log.emit(f"❌ Connect failed: {e}")
            return
        self.client = client
        # Runs paho's network loop in a background thread.
        client.loop_start()

    # NOTE(review): this name shadows QObject.disconnect(); it is kept because
    # callers already use it.
    def disconnect(self):
        """Disconnect and drop the current client; no-op without a client."""
        self._shutdown_client()

    def subscribe(self, topic: str, qos: int):
        """Subscribe to ``topic`` and log the outcome via ``sig_log``."""
        if not self.client:
            self.sig_log.emit("⚠️ Not connected.")
            return
        try:
            result, _mid = self.client.subscribe(topic, qos=qos)
        except Exception as e:
            self.sig_log.emit(f"Subscribe error: {e}")
            return
        # paho signals e.g. "no connection" through the return code.
        if result != mqtt.MQTT_ERR_SUCCESS:
            self.sig_log.emit(f"⚠️ Subscribe failed: {mqtt.error_string(result)}")
            return
        self.sig_log.emit(f"📡 Subscribed: {topic} (QoS={qos})")

    def publish(self, topic: str, payload: str, qos: int, retain: bool):
        """Publish ``payload`` on ``topic`` and log the outcome via ``sig_log``."""
        if not self.client:
            self.sig_log.emit("⚠️ Not connected.")
            return
        try:
            info = self.client.publish(topic, payload=payload, qos=qos, retain=retain)
        except Exception as e:
            self.sig_log.emit(f"Publish error: {e}")
            return
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            self.sig_log.emit(f"⚠️ Publish not sent: {mqtt.error_string(info.rc)}")
            return
        self.sig_log.emit(f"📤 Publish -> topic={topic}, qos={qos}, retain={retain}\n{payload}")

    # paho callbacks
    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """Log the result and emit ``sig_connected`` on reason code 0."""
        if reason_code == 0:
            self.sig_log.emit("✅ Connected")
            self.sig_connected.emit()
        else:
            self.sig_log.emit(f"❌ Connect failed (reason_code={reason_code})")

    def _on_disconnect(self, client, userdata, reason_code, properties=None):
        """Log the disconnect and forward its reason code."""
        self.sig_log.emit(f"🔌 Disconnected (reason_code={reason_code})")
        self.sig_disconnected.emit(reason_code)

    def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
        """Forward topic and payload; undecodable bytes are replaced."""
        self.sig_message_raw.emit(msg.topic, msg.payload.decode("utf-8", errors="replace"))
# ---------- 共同小元件:動態 Key/Value 表 ----------
class KeyValueTable(QtWidgets.QWidget):
    """Dynamic key/value grid.

    - editable_values = True  -> value cells are editable (publisher side)
    - editable_values = False -> value cells are read-only (subscriber side)
    """

    def __init__(self, editable_values: bool, parent=None):
        super().__init__(parent)
        self.editable_values = editable_values
        # NOTE(review): this attribute hides QWidget.layout(); the name is
        # kept so existing users of the attribute keep working.
        self.layout = QtWidgets.QGridLayout(self)
        self.layout.setHorizontalSpacing(8)
        self.layout.setVerticalSpacing(6)
        self.layout.addWidget(QtWidgets.QLabel("Key"), 0, 0)
        self.layout.addWidget(QtWidgets.QLabel("Value"), 0, 1)
        # One (key edit, value widget, delete button) tuple per visible row.
        self.rows: List[Tuple[QtWidgets.QLineEdit, QtWidgets.QWidget, QtWidgets.QPushButton]] = []
        # Start with three empty rows.
        for _ in range(3):
            self.add_row()

    def add_row(self):
        """Append an empty row with its own delete button."""
        key_edit = QtWidgets.QLineEdit()
        key_edit.setPlaceholderText("key例如Move_Forward")

        val_widget = QtWidgets.QLineEdit()
        if self.editable_values:
            val_widget.setPlaceholderText("value例如true / 1.2 / text")
        else:
            val_widget.setReadOnly(True)
            val_widget.setPlaceholderText("等待資料...")

        # NOTE(review): the button label is empty in this copy of the file;
        # a glyph may have been lost - confirm against the repository.
        btn_del = QtWidgets.QPushButton("")
        btn_del.setFixedWidth(28)

        row = (key_edit, val_widget, btn_del)

        def _remove():
            for widget in row:
                widget.deleteLater()
            self.rows.remove(row)
            self._reflow()

        btn_del.clicked.connect(_remove)
        self.rows.append(row)
        self._reflow()

    def _reflow(self):
        """Re-place every row widget below the header row."""
        # Detach everything except the header row (row 0).
        for i in reversed(range(1, self.layout.rowCount())):
            for j in range(self.layout.columnCount()):
                item = self.layout.itemAtPosition(i, j)
                if item:
                    w = item.widget()
                    if w:
                        self.layout.removeWidget(w)
        # Lay the remaining rows out again, starting at grid row 1.
        for r, (k, v, b) in enumerate(self.rows, start=1):
            self.layout.addWidget(k, r, 0)
            self.layout.addWidget(v, r, 1)
            self.layout.addWidget(b, r, 2)

    def as_dict(self) -> Dict[str, Any]:
        """Publisher side: collect the rows into a dict with inferred types.

        Rows with an empty key are skipped. Values become bool for
        "true"/"false" (any case), float when they contain "." or "e", int
        otherwise, and stay text when none of these conversions applies.
        """
        def _coerce(s: str):
            t = s.strip()
            lowered = t.lower()
            if lowered == "true":
                return True
            if lowered == "false":
                return False
            try:
                if "." in t or "e" in lowered:
                    number = float(t)
                    # inf/nan (e.g. from "1e999") have no JSON representation,
                    # so such input is kept as text.
                    if number != number or abs(number) == float("inf"):
                        return t
                    return number
                return int(t)
            except ValueError:
                return t

        data = {}
        for k_edit, v_edit, _ in self.rows:
            key = k_edit.text().strip()
            if not key:
                continue
            if isinstance(v_edit, QtWidgets.QLineEdit):
                data[key] = _coerce(v_edit.text())
        return data

    def update_values_from_dict(self, data: Dict[str, Any]):
        """Subscriber side: show ``data[key]`` in every row whose key is in ``data``."""
        for k_edit, v_edit, _ in self.rows:
            key = k_edit.text().strip()
            if not key:
                continue
            if key in data and isinstance(v_edit, QtWidgets.QLineEdit):
                v_edit.setText(str(data[key]))
# ---------- 主視窗 ----------
class MainWindow(QtWidgets.QMainWindow):
    """Main window: connection bar, subscriber/publisher panels and a raw log."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("MQTT Pub/Sub Split GUI (PyQt6)")
        self.resize(1100, 720)

        central = QtWidgets.QWidget(self)
        self.setCentralWidget(central)
        root = QtWidgets.QVBoxLayout(central)
        root.setContentsMargins(12, 12, 12, 12)
        root.setSpacing(10)

        # ------- Connection (shared, top) -------
        conn_box = QtWidgets.QGroupBox("Connection")
        grid = QtWidgets.QGridLayout(conn_box)
        grid.setHorizontalSpacing(12)
        grid.setVerticalSpacing(8)
        self.ed_host = QtWidgets.QLineEdit()
        self.ed_host.setPlaceholderText("Broker Host (e.g. 192.168.1.10)")
        self.ed_port = QtWidgets.QLineEdit()
        self.ed_port.setPlaceholderText("Port (e.g. 1883)")
        self.ed_client_id = QtWidgets.QLineEdit()
        self.ed_client_id.setPlaceholderText("Client ID (optional)")
        self.btn_connect = QtWidgets.QPushButton("Connect")
        self.btn_disconnect = QtWidgets.QPushButton("Disconnect")
        self.lbl_status = QtWidgets.QLabel("Disconnected")
        grid.addWidget(QtWidgets.QLabel("Host"), 0, 0)
        grid.addWidget(self.ed_host, 0, 1)
        grid.addWidget(QtWidgets.QLabel("Port"), 0, 2)
        grid.addWidget(self.ed_port, 0, 3)
        grid.addWidget(QtWidgets.QLabel("Client ID"), 1, 0)
        grid.addWidget(self.ed_client_id, 1, 1, 1, 3)
        grid.addWidget(self.btn_connect, 2, 2)
        grid.addWidget(self.btn_disconnect, 2, 3)
        grid.addWidget(QtWidgets.QLabel("Status"), 3, 0)
        grid.addWidget(self.lbl_status, 3, 1, 1, 3)
        root.addWidget(conn_box)

        # ------- Left/right split -------
        splitter = QtWidgets.QSplitter(QtCore.Qt.Orientation.Horizontal)

        # ---- Left: Subscriber ----
        sub_panel = QtWidgets.QWidget()
        sub_layout = QtWidgets.QVBoxLayout(sub_panel)
        sub_layout.setSpacing(8)
        sub_box = QtWidgets.QGroupBox("Subscriber")
        sgrid = QtWidgets.QGridLayout(sub_box)
        sgrid.setHorizontalSpacing(12)
        sgrid.setVerticalSpacing(8)
        self.ed_sub_topic = QtWidgets.QLineEdit()
        self.ed_sub_topic.setPlaceholderText("Topic to subscribe")
        self.ed_sub_qos = QtWidgets.QLineEdit()
        self.ed_sub_qos.setPlaceholderText("QoS (0, 1, or 2)")
        self.chk_plc_parse = QtWidgets.QCheckBox("Try PLC-style parsing if JSON fails")
        self.btn_sub = QtWidgets.QPushButton("Subscribe")
        sgrid.addWidget(QtWidgets.QLabel("Topic"), 0, 0)
        sgrid.addWidget(self.ed_sub_topic, 0, 1, 1, 3)
        sgrid.addWidget(QtWidgets.QLabel("QoS"), 1, 0)
        sgrid.addWidget(self.ed_sub_qos, 1, 1)
        sgrid.addWidget(self.chk_plc_parse, 1, 2)
        sgrid.addWidget(self.btn_sub, 1, 3)
        self.sub_table = KeyValueTable(editable_values=False)
        btn_add_sub = QtWidgets.QPushButton("+ Add field")
        btn_add_sub.clicked.connect(self.sub_table.add_row)
        sub_layout.addWidget(sub_box)
        sub_layout.addWidget(self.sub_table)
        sub_layout.addWidget(btn_add_sub)

        # ---- Right: Publisher ----
        pub_panel = QtWidgets.QWidget()
        pub_layout = QtWidgets.QVBoxLayout(pub_panel)
        pub_layout.setSpacing(8)
        pub_box = QtWidgets.QGroupBox("Publisher")
        pgrid = QtWidgets.QGridLayout(pub_box)
        pgrid.setHorizontalSpacing(12)
        pgrid.setVerticalSpacing(8)
        self.ed_pub_topic = QtWidgets.QLineEdit()
        self.ed_pub_topic.setPlaceholderText("Topic to publish")
        self.ed_pub_qos = QtWidgets.QLineEdit()
        self.ed_pub_qos.setPlaceholderText("QoS (0, 1, or 2)")
        self.chk_retain = QtWidgets.QCheckBox("Retain")
        self.btn_publish = QtWidgets.QPushButton("Publish")
        pgrid.addWidget(QtWidgets.QLabel("Topic"), 0, 0)
        pgrid.addWidget(self.ed_pub_topic, 0, 1, 1, 3)
        pgrid.addWidget(QtWidgets.QLabel("QoS"), 1, 0)
        pgrid.addWidget(self.ed_pub_qos, 1, 1)
        pgrid.addWidget(self.chk_retain, 1, 2)
        pgrid.addWidget(self.btn_publish, 1, 3)
        self.pub_table = KeyValueTable(editable_values=True)
        btn_add_pub = QtWidgets.QPushButton("+ Add field")
        btn_add_pub.clicked.connect(self.pub_table.add_row)
        pub_layout.addWidget(pub_box)
        pub_layout.addWidget(self.pub_table)
        pub_layout.addWidget(btn_add_pub)

        splitter.addWidget(sub_panel)
        splitter.addWidget(pub_panel)
        splitter.setSizes([550, 550])
        root.addWidget(splitter, 1)

        # ------- Logs -------
        logs = QtWidgets.QGroupBox("Logs (Raw)")
        v = QtWidgets.QVBoxLayout(logs)
        self.txt_raw = QtWidgets.QPlainTextEdit()
        self.txt_raw.setReadOnly(True)
        self.txt_raw.setMaximumBlockCount(2000)
        v.addWidget(self.txt_raw)
        root.addWidget(logs)

        # Controller and event wiring
        self.mqtt = MqttController(self)
        self.mqtt.sig_connected.connect(self._on_connected)
        self.mqtt.sig_disconnected.connect(self._on_disconnected)
        self.mqtt.sig_log.connect(self._log)
        self.mqtt.sig_message_raw.connect(self._on_msg_raw)
        self.btn_connect.clicked.connect(self._do_connect)
        self.btn_disconnect.clicked.connect(self.mqtt.disconnect)
        self.btn_sub.clicked.connect(self._do_subscribe)
        self.btn_publish.clicked.connect(self._do_publish)

        # Appearance
        self._apply_dark_fusion_palette()

    def closeEvent(self, event):
        """Release the MQTT client before the window closes."""
        self.mqtt.disconnect()
        super().closeEvent(event)

    # ---- UI handlers ----
    def _parse_qos(self, qos_txt: str) -> Optional[int]:
        """Return the QoS as an int, or None after warning about invalid input.

        0 is a valid result, so callers must compare against None.
        """
        try:
            qos = int(qos_txt)
        except ValueError:
            qos = None
        if qos not in (0, 1, 2):
            QtWidgets.QMessageBox.warning(self, "Warning", "QoS 必須是 0、1 或 2")
            return None
        return qos

    def _do_connect(self):
        """Validate host/port input and start a connection attempt."""
        host = self.ed_host.text().strip()
        port_txt = self.ed_port.text().strip()
        client_id = self.ed_client_id.text().strip()
        if not host or not port_txt:
            QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Port 皆不可為空")
            return
        try:
            port = int(port_txt)
            if not (1 <= port <= 65535):
                raise ValueError
        except ValueError:
            QtWidgets.QMessageBox.warning(self, "Warning", "Port 必須是 1~65535 的整數")
            return
        self.lbl_status.setText("Connecting...")
        self.mqtt.connect(host, port, client_id)

    def _do_subscribe(self):
        """Validate the subscriber inputs and subscribe."""
        topic = self.ed_sub_topic.text().strip()
        qos_txt = self.ed_sub_qos.text().strip()
        if not topic or not qos_txt:
            QtWidgets.QMessageBox.warning(self, "Warning", "Subscribe 的 Topic 與 QoS 不可為空")
            return
        qos = self._parse_qos(qos_txt)
        if qos is None:
            return
        self.mqtt.subscribe(topic, qos)

    def _do_publish(self):
        """Validate the publisher inputs and publish the table as JSON."""
        topic = self.ed_pub_topic.text().strip()
        qos_txt = self.ed_pub_qos.text().strip()
        if not topic or not qos_txt:
            QtWidgets.QMessageBox.warning(self, "Warning", "Publish 的 Topic 與 QoS 不可為空")
            return
        qos = self._parse_qos(qos_txt)
        if qos is None:
            return
        retain = self.chk_retain.isChecked()
        payload_dict = self.pub_table.as_dict()
        if not payload_dict:
            QtWidgets.QMessageBox.warning(self, "Warning", "至少輸入一個非空的 Key")
            return
        payload = json.dumps(payload_dict, ensure_ascii=False)
        self.mqtt.publish(topic, payload, qos, retain)

    # ---- MQTT slots ----
    @QtCore.pyqtSlot()
    def _on_connected(self):
        self.lbl_status.setText("Connected")

    @QtCore.pyqtSlot(int)
    def _on_disconnected(self, reason_code: int):
        self.lbl_status.setText(f"Disconnected ({reason_code})")

    @QtCore.pyqtSlot(str, str)
    def _on_msg_raw(self, topic: str, raw: str):
        """Log the raw message and mirror matching keys into the subscriber table."""
        self._append(self.txt_raw, f"[{topic}] {raw}")
        # Strict JSON first; the tolerant parser is only tried when enabled.
        parsed = None
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            if self.chk_plc_parse.isChecked():
                try:
                    parsed = robust_json_parse(raw)
                except Exception as e:
                    self._append(self.txt_raw, f"❌ Parse error: {e}")
        if isinstance(parsed, dict):
            # Only rows whose key appears in the payload are updated.
            self.sub_table.update_values_from_dict(parsed)

    # ---- Utils ----
    def _append(self, widget: QtWidgets.QPlainTextEdit, text: str):
        """Append ``text`` and scroll the widget to the bottom."""
        widget.appendPlainText(text)
        widget.verticalScrollBar().setValue(widget.verticalScrollBar().maximum())

    def _log(self, text: str):
        self._append(self.txt_raw, text)

    def _apply_dark_fusion_palette(self):
        """Switch the application to the Fusion style with a dark palette."""
        app = QtWidgets.QApplication.instance()
        app.setStyle("Fusion")
        pal = QtGui.QPalette()
        base = QtGui.QColor(30, 30, 30)
        alt = QtGui.QColor(45, 45, 45)
        text = QtGui.QColor(220, 220, 220)
        btn = QtGui.QColor(53, 53, 53)
        hl = QtGui.QColor(42, 130, 218)
        pal.setColor(QtGui.QPalette.ColorRole.Window, base)
        pal.setColor(QtGui.QPalette.ColorRole.WindowText, text)
        pal.setColor(QtGui.QPalette.ColorRole.Base, QtGui.QColor(25, 25, 25))
        pal.setColor(QtGui.QPalette.ColorRole.AlternateBase, alt)
        # Tooltip background must differ from the tooltip text colour,
        # otherwise tooltips are unreadable.
        pal.setColor(QtGui.QPalette.ColorRole.ToolTipBase, alt)
        pal.setColor(QtGui.QPalette.ColorRole.ToolTipText, text)
        pal.setColor(QtGui.QPalette.ColorRole.Text, text)
        pal.setColor(QtGui.QPalette.ColorRole.Button, btn)
        pal.setColor(QtGui.QPalette.ColorRole.ButtonText, text)
        pal.setColor(QtGui.QPalette.ColorRole.Highlight, hl)
        pal.setColor(QtGui.QPalette.ColorRole.HighlightedText, QtGui.QColor(255, 255, 255))
        app.setPalette(pal)
def main():
    """Create the Qt application, show the main window and run the event loop."""
    import sys

    qt_app = QtWidgets.QApplication(sys.argv)

    # Windows high-DPI hint; any failure is ignored on purpose.
    try:
        import ctypes

        shcore = ctypes.windll.shcore
        shcore.SetProcessDpiAwareness(1)
    except Exception:
        pass

    window = MainWindow()
    window.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()