434 lines
17 KiB
Python
434 lines
17 KiB
Python
# mqtt_pubsub_split.py
|
||
# pip install PyQt6 paho-mqtt
|
||
import json
|
||
import re
|
||
from typing import Optional, Dict, Any, List, Tuple
|
||
|
||
from PyQt6 import QtCore, QtGui, QtWidgets
|
||
from paho.mqtt import client as mqtt
|
||
|
||
|
||
# ---------- PLC 風格 Payload 的容錯解析(選用) ----------
|
||
def robust_json_parse(raw: str) -> Dict[str, Any]:
|
||
s = raw.strip()
|
||
if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):
|
||
s = s[1:-1].strip()
|
||
s = s.replace("TRUE", "true").replace("FALSE", "false")
|
||
s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s)
|
||
s = s.replace("'", '"')
|
||
return json.loads(s)
|
||
|
||
|
||
# ---------- MQTT 控制器 ----------
|
||
class MqttController(QtCore.QObject):
|
||
sig_connected = QtCore.pyqtSignal()
|
||
sig_disconnected = QtCore.pyqtSignal(int)
|
||
sig_log = QtCore.pyqtSignal(str)
|
||
sig_message_raw = QtCore.pyqtSignal(str, str) # topic, raw
|
||
|
||
def __init__(self, parent=None):
|
||
super().__init__(parent)
|
||
self.client: Optional[mqtt.Client] = None
|
||
|
||
def connect(self, host: str, port: int, client_id: str = ""):
|
||
if self.client:
|
||
try:
|
||
self.client.loop_stop()
|
||
self.client.disconnect()
|
||
except Exception:
|
||
pass
|
||
self.client = None
|
||
|
||
self.client = mqtt.Client(client_id=client_id or None, protocol=mqtt.MQTTv311)
|
||
self.client.reconnect_delay_set(min_delay=1, max_delay=10)
|
||
|
||
self.client.on_connect = self._on_connect
|
||
self.client.on_disconnect = self._on_disconnect
|
||
self.client.on_message = self._on_message
|
||
|
||
try:
|
||
self.sig_log.emit(f"🔌 Connecting to {host}:{port} ...")
|
||
self.client.connect(host, port, keepalive=60)
|
||
except Exception as e:
|
||
self.sig_log.emit(f"❌ Connect failed: {e}")
|
||
return
|
||
|
||
self.client.loop_start()
|
||
|
||
def disconnect(self):
|
||
if not self.client:
|
||
return
|
||
try:
|
||
self.client.loop_stop()
|
||
self.client.disconnect()
|
||
except Exception as e:
|
||
self.sig_log.emit(f"Disconnect error: {e}")
|
||
|
||
def subscribe(self, topic: str, qos: int):
|
||
if not self.client:
|
||
self.sig_log.emit("⚠️ Not connected.")
|
||
return
|
||
try:
|
||
self.client.subscribe(topic, qos=qos)
|
||
self.sig_log.emit(f"📡 Subscribed: {topic} (QoS={qos})")
|
||
except Exception as e:
|
||
self.sig_log.emit(f"Subscribe error: {e}")
|
||
|
||
def publish(self, topic: str, payload: str, qos: int, retain: bool):
|
||
if not self.client:
|
||
self.sig_log.emit("⚠️ Not connected.")
|
||
return
|
||
try:
|
||
self.client.publish(topic, payload=payload, qos=qos, retain=retain)
|
||
self.sig_log.emit(f"📤 Publish -> topic={topic}, qos={qos}, retain={retain}\n{payload}")
|
||
except Exception as e:
|
||
self.sig_log.emit(f"Publish error: {e}")
|
||
|
||
# paho callbacks
|
||
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
|
||
if reason_code == 0:
|
||
self.sig_log.emit("✅ Connected")
|
||
self.sig_connected.emit()
|
||
else:
|
||
self.sig_log.emit(f"❌ Connect failed (reason_code={reason_code})")
|
||
|
||
def _on_disconnect(self, client, userdata, reason_code, properties=None):
|
||
self.sig_log.emit(f"🔌 Disconnected (reason_code={reason_code})")
|
||
self.sig_disconnected.emit(reason_code)
|
||
|
||
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
|
||
self.sig_message_raw.emit(msg.topic, msg.payload.decode("utf-8", errors="replace"))
|
||
|
||
|
||
# ---------- 共同小元件:動態 Key/Value 表 ----------
|
||
class KeyValueTable(QtWidgets.QWidget):
|
||
"""
|
||
動態 Key/Value 表。
|
||
- editable_values = True → Value 可編輯(Publisher)
|
||
- editable_values = False → Value 只讀(Subscriber)
|
||
"""
|
||
def __init__(self, editable_values: bool, parent=None):
|
||
super().__init__(parent)
|
||
self.editable_values = editable_values
|
||
self.layout = QtWidgets.QGridLayout(self)
|
||
self.layout.setHorizontalSpacing(8)
|
||
self.layout.setVerticalSpacing(6)
|
||
self.layout.addWidget(QtWidgets.QLabel("Key"), 0, 0)
|
||
self.layout.addWidget(QtWidgets.QLabel("Value"), 0, 1)
|
||
self.rows: List[Tuple[QtWidgets.QLineEdit, QtWidgets.QWidget, QtWidgets.QPushButton]] = []
|
||
|
||
# 預先 3 列
|
||
for _ in range(3):
|
||
self.add_row()
|
||
|
||
def add_row(self):
|
||
row_index = len(self.rows) + 1
|
||
key_edit = QtWidgets.QLineEdit()
|
||
key_edit.setPlaceholderText("key(例如:Move_Forward)")
|
||
|
||
if self.editable_values:
|
||
val_widget: QtWidgets.QWidget = QtWidgets.QLineEdit()
|
||
val_widget.setPlaceholderText("value(例如:true / 1.2 / text)")
|
||
else:
|
||
val_widget = QtWidgets.QLineEdit()
|
||
val_widget.setReadOnly(True)
|
||
val_widget.setPlaceholderText("等待資料...")
|
||
|
||
btn_del = QtWidgets.QPushButton("✕")
|
||
btn_del.setFixedWidth(28)
|
||
|
||
def _remove():
|
||
key_edit.deleteLater()
|
||
val_widget.deleteLater()
|
||
btn_del.deleteLater()
|
||
self.rows.remove((key_edit, val_widget, btn_del))
|
||
self._reflow()
|
||
|
||
btn_del.clicked.connect(_remove)
|
||
|
||
self.rows.append((key_edit, val_widget, btn_del))
|
||
self._reflow()
|
||
|
||
def _reflow(self):
|
||
# 清除標題列以外的 widget 排版
|
||
for i in reversed(range(1, self.layout.rowCount())):
|
||
for j in range(self.layout.columnCount()):
|
||
item = self.layout.itemAtPosition(i, j)
|
||
if item:
|
||
w = item.widget()
|
||
if w:
|
||
self.layout.removeWidget(w)
|
||
# 重新佈局
|
||
for r, (k, v, b) in enumerate(self.rows, start=1):
|
||
self.layout.addWidget(k, r, 0)
|
||
self.layout.addWidget(v, r, 1)
|
||
self.layout.addWidget(b, r, 2)
|
||
|
||
def as_dict(self) -> Dict[str, Any]:
|
||
"""Publisher 使用:將 Key/Value 組為 dict(自動判斷型別)。"""
|
||
def _coerce(s: str):
|
||
t = s.strip()
|
||
if t.lower() == "true": return True
|
||
if t.lower() == "false": return False
|
||
try:
|
||
if "." in t or "e" in t.lower():
|
||
return float(t)
|
||
return int(t)
|
||
except Exception:
|
||
return t
|
||
data = {}
|
||
for k_edit, v_edit, _ in self.rows:
|
||
key = k_edit.text().strip()
|
||
if not key:
|
||
continue
|
||
if isinstance(v_edit, QtWidgets.QLineEdit):
|
||
data[key] = _coerce(v_edit.text())
|
||
return data
|
||
|
||
def update_values_from_dict(self, data: Dict[str, Any]):
|
||
"""Subscriber 使用:根據 data 更新對應 Key 的 Value。"""
|
||
for k_edit, v_edit, _ in self.rows:
|
||
key = k_edit.text().strip()
|
||
if not key:
|
||
continue
|
||
if key in data and isinstance(v_edit, QtWidgets.QLineEdit):
|
||
v_edit.setText(str(data[key]))
|
||
|
||
|
||
# ---------- 主視窗 ----------
|
||
class MainWindow(QtWidgets.QMainWindow):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.setWindowTitle("MQTT Pub/Sub Split GUI (PyQt6)")
|
||
self.resize(1100, 720)
|
||
|
||
central = QtWidgets.QWidget(self)
|
||
self.setCentralWidget(central)
|
||
root = QtWidgets.QVBoxLayout(central)
|
||
root.setContentsMargins(12, 12, 12, 12)
|
||
root.setSpacing(10)
|
||
|
||
# ------- Connection (上方通用) -------
|
||
conn_box = QtWidgets.QGroupBox("Connection")
|
||
grid = QtWidgets.QGridLayout(conn_box)
|
||
grid.setHorizontalSpacing(12); grid.setVerticalSpacing(8)
|
||
|
||
self.ed_host = QtWidgets.QLineEdit(); self.ed_host.setPlaceholderText("Broker Host (e.g. 192.168.1.10)")
|
||
self.ed_port = QtWidgets.QLineEdit(); self.ed_port.setPlaceholderText("Port (e.g. 1883)")
|
||
self.ed_client_id = QtWidgets.QLineEdit(); self.ed_client_id.setPlaceholderText("Client ID (optional)")
|
||
self.btn_connect = QtWidgets.QPushButton("Connect")
|
||
self.btn_disconnect = QtWidgets.QPushButton("Disconnect")
|
||
self.lbl_status = QtWidgets.QLabel("Disconnected")
|
||
|
||
grid.addWidget(QtWidgets.QLabel("Host"), 0, 0); grid.addWidget(self.ed_host, 0, 1)
|
||
grid.addWidget(QtWidgets.QLabel("Port"), 0, 2); grid.addWidget(self.ed_port, 0, 3)
|
||
grid.addWidget(QtWidgets.QLabel("Client ID"), 1, 0); grid.addWidget(self.ed_client_id, 1, 1, 1, 3)
|
||
grid.addWidget(self.btn_connect, 2, 2); grid.addWidget(self.btn_disconnect, 2, 3)
|
||
grid.addWidget(QtWidgets.QLabel("Status"), 3, 0); grid.addWidget(self.lbl_status, 3, 1, 1, 3)
|
||
root.addWidget(conn_box)
|
||
|
||
# ------- 左右分欄 -------
|
||
splitter = QtWidgets.QSplitter(QtCore.Qt.Orientation.Horizontal)
|
||
|
||
# ---- 左:Subscriber ----
|
||
sub_panel = QtWidgets.QWidget()
|
||
sub_layout = QtWidgets.QVBoxLayout(sub_panel); sub_layout.setSpacing(8)
|
||
|
||
sub_box = QtWidgets.QGroupBox("Subscriber")
|
||
sgrid = QtWidgets.QGridLayout(sub_box); sgrid.setHorizontalSpacing(12); sgrid.setVerticalSpacing(8)
|
||
self.ed_sub_topic = QtWidgets.QLineEdit(); self.ed_sub_topic.setPlaceholderText("Topic to subscribe")
|
||
self.ed_sub_qos = QtWidgets.QLineEdit(); self.ed_sub_qos.setPlaceholderText("QoS (0, 1, or 2)")
|
||
self.chk_plc_parse = QtWidgets.QCheckBox("Try PLC-style parsing if JSON fails")
|
||
self.btn_sub = QtWidgets.QPushButton("Subscribe")
|
||
sgrid.addWidget(QtWidgets.QLabel("Topic"), 0, 0); sgrid.addWidget(self.ed_sub_topic, 0, 1, 1, 3)
|
||
sgrid.addWidget(QtWidgets.QLabel("QoS"), 1, 0); sgrid.addWidget(self.ed_sub_qos, 1, 1)
|
||
sgrid.addWidget(self.chk_plc_parse, 1, 2); sgrid.addWidget(self.btn_sub, 1, 3)
|
||
|
||
self.sub_table = KeyValueTable(editable_values=False)
|
||
btn_add_sub = QtWidgets.QPushButton("+ Add field")
|
||
btn_add_sub.clicked.connect(self.sub_table.add_row)
|
||
|
||
sub_layout.addWidget(sub_box)
|
||
sub_layout.addWidget(self.sub_table)
|
||
sub_layout.addWidget(btn_add_sub)
|
||
|
||
# ---- 右:Publisher ----
|
||
pub_panel = QtWidgets.QWidget()
|
||
pub_layout = QtWidgets.QVBoxLayout(pub_panel); pub_layout.setSpacing(8)
|
||
|
||
pub_box = QtWidgets.QGroupBox("Publisher")
|
||
pgrid = QtWidgets.QGridLayout(pub_box); pgrid.setHorizontalSpacing(12); pgrid.setVerticalSpacing(8)
|
||
self.ed_pub_topic = QtWidgets.QLineEdit(); self.ed_pub_topic.setPlaceholderText("Topic to publish")
|
||
self.ed_pub_qos = QtWidgets.QLineEdit(); self.ed_pub_qos.setPlaceholderText("QoS (0, 1, or 2)")
|
||
self.chk_retain = QtWidgets.QCheckBox("Retain")
|
||
self.btn_publish = QtWidgets.QPushButton("Publish")
|
||
pgrid.addWidget(QtWidgets.QLabel("Topic"), 0, 0); pgrid.addWidget(self.ed_pub_topic, 0, 1, 1, 3)
|
||
pgrid.addWidget(QtWidgets.QLabel("QoS"), 1, 0); pgrid.addWidget(self.ed_pub_qos, 1, 1)
|
||
pgrid.addWidget(self.chk_retain, 1, 2); pgrid.addWidget(self.btn_publish, 1, 3)
|
||
|
||
self.pub_table = KeyValueTable(editable_values=True)
|
||
btn_add_pub = QtWidgets.QPushButton("+ Add field")
|
||
btn_add_pub.clicked.connect(self.pub_table.add_row)
|
||
|
||
pub_layout.addWidget(pub_box)
|
||
pub_layout.addWidget(self.pub_table)
|
||
pub_layout.addWidget(btn_add_pub)
|
||
|
||
splitter.addWidget(sub_panel)
|
||
splitter.addWidget(pub_panel)
|
||
splitter.setSizes([550, 550])
|
||
root.addWidget(splitter, 1)
|
||
|
||
# ------- Logs -------
|
||
logs = QtWidgets.QGroupBox("Logs (Raw)")
|
||
v = QtWidgets.QVBoxLayout(logs)
|
||
self.txt_raw = QtWidgets.QPlainTextEdit(); self.txt_raw.setReadOnly(True); self.txt_raw.setMaximumBlockCount(2000)
|
||
v.addWidget(self.txt_raw)
|
||
root.addWidget(logs)
|
||
|
||
# 控制器與事件
|
||
self.mqtt = MqttController(self)
|
||
self.mqtt.sig_connected.connect(self._on_connected)
|
||
self.mqtt.sig_disconnected.connect(self._on_disconnected)
|
||
self.mqtt.sig_log.connect(self._log)
|
||
self.mqtt.sig_message_raw.connect(self._on_msg_raw)
|
||
|
||
self.btn_connect.clicked.connect(self._do_connect)
|
||
self.btn_disconnect.clicked.connect(self.mqtt.disconnect)
|
||
self.btn_sub.clicked.connect(self._do_subscribe)
|
||
self.btn_publish.clicked.connect(self._do_publish)
|
||
|
||
# 美化
|
||
self._apply_dark_fusion_palette()
|
||
|
||
# ---- UI handlers ----
|
||
def _do_connect(self):
|
||
host = self.ed_host.text().strip()
|
||
port_txt = self.ed_port.text().strip()
|
||
client_id = self.ed_client_id.text().strip()
|
||
|
||
if not host or not port_txt:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Port 皆不可為空")
|
||
return
|
||
try:
|
||
port = int(port_txt)
|
||
if not (1 <= port <= 65535): raise ValueError
|
||
except Exception:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "Port 必須是 1~65535 的整數")
|
||
return
|
||
|
||
self.lbl_status.setText("Connecting...")
|
||
self.mqtt.connect(host, port, client_id)
|
||
|
||
def _do_subscribe(self):
|
||
topic = self.ed_sub_topic.text().strip()
|
||
qos_txt = self.ed_sub_qos.text().strip()
|
||
if not topic or not qos_txt:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "Subscribe 的 Topic 與 QoS 不可為空")
|
||
return
|
||
try:
|
||
qos = int(qos_txt)
|
||
if qos not in (0, 1, 2): raise ValueError
|
||
except Exception:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "QoS 必須是 0、1 或 2")
|
||
return
|
||
self.mqtt.subscribe(topic, qos)
|
||
|
||
def _do_publish(self):
|
||
topic = self.ed_pub_topic.text().strip()
|
||
qos_txt = self.ed_pub_qos.text().strip()
|
||
if not topic or not qos_txt:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "Publish 的 Topic 與 QoS 不可為空")
|
||
return
|
||
try:
|
||
qos = int(qos_txt)
|
||
if qos not in (0, 1, 2): raise ValueError
|
||
except Exception:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "QoS 必須是 0、1 或 2")
|
||
return
|
||
retain = self.chk_retain.isChecked()
|
||
|
||
payload_dict = self.pub_table.as_dict()
|
||
if not payload_dict:
|
||
QtWidgets.QMessageBox.warning(self, "Warning", "至少輸入一個非空的 Key")
|
||
return
|
||
|
||
payload = json.dumps(payload_dict, ensure_ascii=False)
|
||
self.mqtt.publish(topic, payload, qos, retain)
|
||
|
||
# ---- MQTT slots ----
|
||
@QtCore.pyqtSlot()
|
||
def _on_connected(self):
|
||
self.lbl_status.setText("Connected")
|
||
|
||
@QtCore.pyqtSlot(int)
|
||
def _on_disconnected(self, reason_code: int):
|
||
self.lbl_status.setText(f"Disconnected ({reason_code})")
|
||
|
||
@QtCore.pyqtSlot(str, str)
|
||
def _on_msg_raw(self, topic: str, raw: str):
|
||
self._append(self.txt_raw, f"[{topic}] {raw}")
|
||
|
||
# 嘗試解析
|
||
parsed = None
|
||
try:
|
||
parsed = json.loads(raw)
|
||
except json.JSONDecodeError:
|
||
if self.chk_plc_parse.isChecked():
|
||
try:
|
||
parsed = robust_json_parse(raw)
|
||
except Exception as e:
|
||
self._append(self.txt_raw, f"❌ Parse error: {e}")
|
||
|
||
if isinstance(parsed, dict):
|
||
# 只更新左邊 Subscriber 表中對應的 key
|
||
self.sub_table.update_values_from_dict(parsed)
|
||
|
||
# ---- Utils ----
|
||
def _append(self, widget: QtWidgets.QPlainTextEdit, text: str):
|
||
widget.appendPlainText(text)
|
||
widget.verticalScrollBar().setValue(widget.verticalScrollBar().maximum())
|
||
|
||
def _log(self, text: str):
|
||
self._append(self.txt_raw, text)
|
||
|
||
def _apply_dark_fusion_palette(self):
|
||
app = QtWidgets.QApplication.instance()
|
||
app.setStyle("Fusion")
|
||
pal = QtGui.QPalette()
|
||
base = QtGui.QColor(30, 30, 30)
|
||
alt = QtGui.QColor(45, 45, 45)
|
||
text = QtGui.QColor(220, 220, 220)
|
||
btn = QtGui.QColor(53, 53, 53)
|
||
hl = QtGui.QColor(42, 130, 218)
|
||
pal.setColor(QtGui.QPalette.ColorRole.Window, base)
|
||
pal.setColor(QtGui.QPalette.ColorRole.WindowText, text)
|
||
pal.setColor(QtGui.QPalette.ColorRole.Base, QtGui.QColor(25, 25, 25))
|
||
pal.setColor(QtGui.QPalette.ColorRole.AlternateBase, alt)
|
||
pal.setColor(QtGui.QPalette.ColorRole.ToolTipBase, text)
|
||
pal.setColor(QtGui.QPalette.ColorRole.ToolTipText, text)
|
||
pal.setColor(QtGui.QPalette.ColorRole.Text, text)
|
||
pal.setColor(QtGui.QPalette.ColorRole.Button, btn)
|
||
pal.setColor(QtGui.QPalette.ColorRole.ButtonText, text)
|
||
pal.setColor(QtGui.QPalette.ColorRole.Highlight, hl)
|
||
pal.setColor(QtGui.QPalette.ColorRole.HighlightedText, QtGui.QColor(255, 255, 255))
|
||
app.setPalette(pal)
|
||
|
||
|
||
def main():
|
||
import sys
|
||
app = QtWidgets.QApplication(sys.argv)
|
||
# Windows 高 DPI(可忽略失敗)
|
||
try:
|
||
import ctypes
|
||
ctypes.windll.shcore.SetProcessDpiAwareness(1)
|
||
except Exception:
|
||
pass
|
||
w = MainWindow()
|
||
w.show()
|
||
sys.exit(app.exec())
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|