diff --git a/MQTT_project/mqtt_pubsub_gui.py b/MQTT_project/mqtt_pubsub_gui.py new file mode 100644 index 0000000..cac4394 --- /dev/null +++ b/MQTT_project/mqtt_pubsub_gui.py @@ -0,0 +1,433 @@ +# mqtt_pubsub_split.py +# pip install PyQt6 paho-mqtt +import json +import re +from typing import Optional, Dict, Any, List, Tuple + +from PyQt6 import QtCore, QtGui, QtWidgets +from paho.mqtt import client as mqtt + + +# ---------- PLC 風格 Payload 的容錯解析(選用) ---------- +def robust_json_parse(raw: str) -> Dict[str, Any]: + s = raw.strip() + if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')): + s = s[1:-1].strip() + s = s.replace("TRUE", "true").replace("FALSE", "false") + s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s) + s = s.replace("'", '"') + return json.loads(s) + + +# ---------- MQTT 控制器 ---------- +class MqttController(QtCore.QObject): + sig_connected = QtCore.pyqtSignal() + sig_disconnected = QtCore.pyqtSignal(int) + sig_log = QtCore.pyqtSignal(str) + sig_message_raw = QtCore.pyqtSignal(str, str) # topic, raw + + def __init__(self, parent=None): + super().__init__(parent) + self.client: Optional[mqtt.Client] = None + + def connect(self, host: str, port: int, client_id: str = ""): + if self.client: + try: + self.client.loop_stop() + self.client.disconnect() + except Exception: + pass + self.client = None + + self.client = mqtt.Client(client_id=client_id or None, protocol=mqtt.MQTTv311) + self.client.reconnect_delay_set(min_delay=1, max_delay=10) + + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = self._on_message + + try: + self.sig_log.emit(f"🔌 Connecting to {host}:{port} ...") + self.client.connect(host, port, keepalive=60) + except Exception as e: + self.sig_log.emit(f"❌ Connect failed: {e}") + return + + self.client.loop_start() + + def disconnect(self): + if not self.client: + return + try: + self.client.loop_stop() + self.client.disconnect() + except Exception as e: + self.sig_log.emit(f"Disconnect error: {e}") + + def subscribe(self, topic: str, qos: int): + if not self.client: + self.sig_log.emit("⚠️ Not connected.") + return + try: + self.client.subscribe(topic, qos=qos) + self.sig_log.emit(f"📡 Subscribed: {topic} (QoS={qos})") + except Exception as e: + self.sig_log.emit(f"Subscribe error: {e}") + + def publish(self, topic: str, payload: str, qos: int, retain: bool): + if not self.client: + self.sig_log.emit("⚠️ Not connected.") + return + try: + self.client.publish(topic, payload=payload, qos=qos, retain=retain) + self.sig_log.emit(f"📤 Publish -> topic={topic}, qos={qos}, retain={retain}\n{payload}") + except Exception as e: + self.sig_log.emit(f"Publish error: {e}") + + # paho callbacks + def _on_connect(self, client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + self.sig_log.emit("✅ Connected") + self.sig_connected.emit() + else: + self.sig_log.emit(f"❌ Connect failed (reason_code={reason_code})") + + def _on_disconnect(self, client, userdata, reason_code, properties=None): + self.sig_log.emit(f"🔌 Disconnected (reason_code={reason_code})") + self.sig_disconnected.emit(reason_code) + + def _on_message(self, client, userdata, msg: mqtt.MQTTMessage): + self.sig_message_raw.emit(msg.topic, msg.payload.decode("utf-8", errors="replace")) + + +# ---------- 共同小元件:動態 Key/Value 表 ---------- +class KeyValueTable(QtWidgets.QWidget): + """ + 動態 Key/Value 表。 + - editable_values = True → Value 可編輯(Publisher) + - editable_values = False → Value 只讀(Subscriber) + """ + def __init__(self, editable_values: bool, parent=None): + super().__init__(parent) + self.editable_values = editable_values + self.layout = QtWidgets.QGridLayout(self) + self.layout.setHorizontalSpacing(8) + self.layout.setVerticalSpacing(6) + self.layout.addWidget(QtWidgets.QLabel("Key"), 0, 0) + self.layout.addWidget(QtWidgets.QLabel("Value"), 0, 1) + self.rows: List[Tuple[QtWidgets.QLineEdit, QtWidgets.QWidget, QtWidgets.QPushButton]] = [] + + # 預先 3 列 + for _ in range(3): + self.add_row() + + def add_row(self): + row_index = len(self.rows) + 1 + key_edit = QtWidgets.QLineEdit() + key_edit.setPlaceholderText("key(例如:Move_Forward)") + + if self.editable_values: + val_widget: QtWidgets.QWidget = QtWidgets.QLineEdit() + val_widget.setPlaceholderText("value(例如:true / 1.2 / text)") + else: + val_widget = QtWidgets.QLineEdit() + val_widget.setReadOnly(True) + val_widget.setPlaceholderText("等待資料...") + + btn_del = QtWidgets.QPushButton("✕") + btn_del.setFixedWidth(28) + + def _remove(): + key_edit.deleteLater() + val_widget.deleteLater() + btn_del.deleteLater() + self.rows.remove((key_edit, val_widget, btn_del)) + self._reflow() + + btn_del.clicked.connect(_remove) + + self.rows.append((key_edit, val_widget, btn_del)) + self._reflow() + + def _reflow(self): + # 清除標題列以外的 widget 排版 + for i in reversed(range(1, self.layout.rowCount())): + for j in range(self.layout.columnCount()): + item = self.layout.itemAtPosition(i, j) + if item: + w = item.widget() + if w: + self.layout.removeWidget(w) + # 重新佈局 + for r, (k, v, b) in enumerate(self.rows, start=1): + self.layout.addWidget(k, r, 0) + self.layout.addWidget(v, r, 1) + self.layout.addWidget(b, r, 2) + + def as_dict(self) -> Dict[str, Any]: + """Publisher 使用:將 Key/Value 組為 dict(自動判斷型別)。""" + def _coerce(s: str): + t = s.strip() + if t.lower() == "true": return True + if t.lower() == "false": return False + try: + if "." in t or "e" in t.lower(): + return float(t) + return int(t) + except Exception: + return t + data = {} + for k_edit, v_edit, _ in self.rows: + key = k_edit.text().strip() + if not key: + continue + if isinstance(v_edit, QtWidgets.QLineEdit): + data[key] = _coerce(v_edit.text()) + return data + + def update_values_from_dict(self, data: Dict[str, Any]): + """Subscriber 使用:根據 data 更新對應 Key 的 Value。""" + for k_edit, v_edit, _ in self.rows: + key = k_edit.text().strip() + if not key: + continue + if key in data and isinstance(v_edit, QtWidgets.QLineEdit): + v_edit.setText(str(data[key])) + + +# ---------- 主視窗 ---------- +class MainWindow(QtWidgets.QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle("MQTT Pub/Sub Split GUI (PyQt6)") + self.resize(1100, 720) + + central = QtWidgets.QWidget(self) + self.setCentralWidget(central) + root = QtWidgets.QVBoxLayout(central) + root.setContentsMargins(12, 12, 12, 12) + root.setSpacing(10) + + # ------- Connection (上方通用) ------- + conn_box = QtWidgets.QGroupBox("Connection") + grid = QtWidgets.QGridLayout(conn_box) + grid.setHorizontalSpacing(12); grid.setVerticalSpacing(8) + + self.ed_host = QtWidgets.QLineEdit(); self.ed_host.setPlaceholderText("Broker Host (e.g. 192.168.1.10)") + self.ed_port = QtWidgets.QLineEdit(); self.ed_port.setPlaceholderText("Port (e.g. 1883)") + self.ed_client_id = QtWidgets.QLineEdit(); self.ed_client_id.setPlaceholderText("Client ID (optional)") + self.btn_connect = QtWidgets.QPushButton("Connect") + self.btn_disconnect = QtWidgets.QPushButton("Disconnect") + self.lbl_status = QtWidgets.QLabel("Disconnected") + + grid.addWidget(QtWidgets.QLabel("Host"), 0, 0); grid.addWidget(self.ed_host, 0, 1) + grid.addWidget(QtWidgets.QLabel("Port"), 0, 2); grid.addWidget(self.ed_port, 0, 3) + grid.addWidget(QtWidgets.QLabel("Client ID"), 1, 0); grid.addWidget(self.ed_client_id, 1, 1, 1, 3) + grid.addWidget(self.btn_connect, 2, 2); grid.addWidget(self.btn_disconnect, 2, 3) + grid.addWidget(QtWidgets.QLabel("Status"), 3, 0); grid.addWidget(self.lbl_status, 3, 1, 1, 3) + root.addWidget(conn_box) + + # ------- 左右分欄 ------- + splitter = QtWidgets.QSplitter(QtCore.Qt.Orientation.Horizontal) + + # ---- 左:Subscriber ---- + sub_panel = QtWidgets.QWidget() + sub_layout = QtWidgets.QVBoxLayout(sub_panel); sub_layout.setSpacing(8) + + sub_box = QtWidgets.QGroupBox("Subscriber") + sgrid = QtWidgets.QGridLayout(sub_box); sgrid.setHorizontalSpacing(12); sgrid.setVerticalSpacing(8) + self.ed_sub_topic = QtWidgets.QLineEdit(); self.ed_sub_topic.setPlaceholderText("Topic to subscribe") + self.ed_sub_qos = QtWidgets.QLineEdit(); self.ed_sub_qos.setPlaceholderText("QoS (0, 1, or 2)") + self.chk_plc_parse = QtWidgets.QCheckBox("Try PLC-style parsing if JSON fails") + self.btn_sub = QtWidgets.QPushButton("Subscribe") + sgrid.addWidget(QtWidgets.QLabel("Topic"), 0, 0); sgrid.addWidget(self.ed_sub_topic, 0, 1, 1, 3) + sgrid.addWidget(QtWidgets.QLabel("QoS"), 1, 0); sgrid.addWidget(self.ed_sub_qos, 1, 1) + sgrid.addWidget(self.chk_plc_parse, 1, 2); sgrid.addWidget(self.btn_sub, 1, 3) + + self.sub_table = KeyValueTable(editable_values=False) + btn_add_sub = QtWidgets.QPushButton("+ Add field") + btn_add_sub.clicked.connect(self.sub_table.add_row) + + sub_layout.addWidget(sub_box) + sub_layout.addWidget(self.sub_table) + sub_layout.addWidget(btn_add_sub) + + # ---- 右:Publisher ---- + pub_panel = QtWidgets.QWidget() + pub_layout = QtWidgets.QVBoxLayout(pub_panel); pub_layout.setSpacing(8) + + pub_box = QtWidgets.QGroupBox("Publisher") + pgrid = QtWidgets.QGridLayout(pub_box); pgrid.setHorizontalSpacing(12); pgrid.setVerticalSpacing(8) + self.ed_pub_topic = QtWidgets.QLineEdit(); self.ed_pub_topic.setPlaceholderText("Topic to publish") + self.ed_pub_qos = QtWidgets.QLineEdit(); self.ed_pub_qos.setPlaceholderText("QoS (0, 1, or 2)") + self.chk_retain = QtWidgets.QCheckBox("Retain") + self.btn_publish = QtWidgets.QPushButton("Publish") + pgrid.addWidget(QtWidgets.QLabel("Topic"), 0, 0); pgrid.addWidget(self.ed_pub_topic, 0, 1, 1, 3) + pgrid.addWidget(QtWidgets.QLabel("QoS"), 1, 0); pgrid.addWidget(self.ed_pub_qos, 1, 1) + pgrid.addWidget(self.chk_retain, 1, 2); pgrid.addWidget(self.btn_publish, 1, 3) + + self.pub_table = KeyValueTable(editable_values=True) + btn_add_pub = QtWidgets.QPushButton("+ Add field") + btn_add_pub.clicked.connect(self.pub_table.add_row) + + pub_layout.addWidget(pub_box) + pub_layout.addWidget(self.pub_table) + pub_layout.addWidget(btn_add_pub) + + splitter.addWidget(sub_panel) + splitter.addWidget(pub_panel) + splitter.setSizes([550, 550]) + root.addWidget(splitter, 1) + + # ------- Logs ------- + logs = QtWidgets.QGroupBox("Logs (Raw)") + v = QtWidgets.QVBoxLayout(logs) + self.txt_raw = QtWidgets.QPlainTextEdit(); self.txt_raw.setReadOnly(True); self.txt_raw.setMaximumBlockCount(2000) + v.addWidget(self.txt_raw) + root.addWidget(logs) + + # 控制器與事件 + self.mqtt = MqttController(self) + self.mqtt.sig_connected.connect(self._on_connected) + self.mqtt.sig_disconnected.connect(self._on_disconnected) + self.mqtt.sig_log.connect(self._log) + self.mqtt.sig_message_raw.connect(self._on_msg_raw) + + self.btn_connect.clicked.connect(self._do_connect) + self.btn_disconnect.clicked.connect(self.mqtt.disconnect) + self.btn_sub.clicked.connect(self._do_subscribe) + self.btn_publish.clicked.connect(self._do_publish) + + # 美化 + self._apply_dark_fusion_palette() + + # ---- UI handlers ---- + def _do_connect(self): + host = self.ed_host.text().strip() + port_txt = self.ed_port.text().strip() + client_id = self.ed_client_id.text().strip() + + if not host or not port_txt: + QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Port 皆不可為空") + return + try: + port = int(port_txt) + if not (1 <= port <= 65535): raise ValueError + except Exception: + QtWidgets.QMessageBox.warning(self, "Warning", "Port 必須是 1~65535 的整數") + return + + self.lbl_status.setText("Connecting...") + self.mqtt.connect(host, port, client_id) + + def _do_subscribe(self): + topic = self.ed_sub_topic.text().strip() + qos_txt = self.ed_sub_qos.text().strip() + if not topic or not qos_txt: + QtWidgets.QMessageBox.warning(self, "Warning", "Subscribe 的 Topic 與 QoS 不可為空") + return + try: + qos = int(qos_txt) + if qos not in (0, 1, 2): raise ValueError + except Exception: + QtWidgets.QMessageBox.warning(self, "Warning", "QoS 必須是 0、1 或 2") + return + self.mqtt.subscribe(topic, qos) + + def _do_publish(self): + topic = self.ed_pub_topic.text().strip() + qos_txt = self.ed_pub_qos.text().strip() + if not topic or not qos_txt: + QtWidgets.QMessageBox.warning(self, "Warning", "Publish 的 Topic 與 QoS 不可為空") + return + try: + qos = int(qos_txt) + if qos not in (0, 1, 2): raise ValueError + except Exception: + QtWidgets.QMessageBox.warning(self, "Warning", "QoS 必須是 0、1 或 2") + return + retain = self.chk_retain.isChecked() + + payload_dict = self.pub_table.as_dict() + if not payload_dict: + QtWidgets.QMessageBox.warning(self, "Warning", "至少輸入一個非空的 Key") + return + + payload = json.dumps(payload_dict, ensure_ascii=False) + self.mqtt.publish(topic, payload, qos, retain) + + # ---- MQTT slots ---- + @QtCore.pyqtSlot() + def _on_connected(self): + self.lbl_status.setText("Connected") + + @QtCore.pyqtSlot(int) + def _on_disconnected(self, reason_code: int): + self.lbl_status.setText(f"Disconnected ({reason_code})") + + @QtCore.pyqtSlot(str, str) + def _on_msg_raw(self, topic: str, raw: str): + self._append(self.txt_raw, f"[{topic}] {raw}") + + # 嘗試解析 + parsed = None + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + if self.chk_plc_parse.isChecked(): + try: + parsed = robust_json_parse(raw) + except Exception as e: + self._append(self.txt_raw, f"❌ Parse error: {e}") + + if isinstance(parsed, dict): + # 只更新左邊 Subscriber 表中對應的 key + self.sub_table.update_values_from_dict(parsed) + + # ---- Utils ---- + def _append(self, widget: QtWidgets.QPlainTextEdit, text: str): + widget.appendPlainText(text) + widget.verticalScrollBar().setValue(widget.verticalScrollBar().maximum()) + + def _log(self, text: str): + self._append(self.txt_raw, text) + + def _apply_dark_fusion_palette(self): + app = QtWidgets.QApplication.instance() + app.setStyle("Fusion") + pal = QtGui.QPalette() + base = QtGui.QColor(30, 30, 30) + alt = QtGui.QColor(45, 45, 45) + text = QtGui.QColor(220, 220, 220) + btn = QtGui.QColor(53, 53, 53) + hl = QtGui.QColor(42, 130, 218) + pal.setColor(QtGui.QPalette.ColorRole.Window, base) + pal.setColor(QtGui.QPalette.ColorRole.WindowText, text) + pal.setColor(QtGui.QPalette.ColorRole.Base, QtGui.QColor(25, 25, 25)) + pal.setColor(QtGui.QPalette.ColorRole.AlternateBase, alt) + pal.setColor(QtGui.QPalette.ColorRole.ToolTipBase, text) + pal.setColor(QtGui.QPalette.ColorRole.ToolTipText, text) + pal.setColor(QtGui.QPalette.ColorRole.Text, text) + pal.setColor(QtGui.QPalette.ColorRole.Button, btn) + pal.setColor(QtGui.QPalette.ColorRole.ButtonText, text) + pal.setColor(QtGui.QPalette.ColorRole.Highlight, hl) + pal.setColor(QtGui.QPalette.ColorRole.HighlightedText, QtGui.QColor(255, 255, 255)) + app.setPalette(pal) + + +def main(): + import sys + app = QtWidgets.QApplication(sys.argv) + # Windows 高 DPI(可忽略失敗) + try: + import ctypes + ctypes.windll.shcore.SetProcessDpiAwareness(1) + except Exception: + pass + w = MainWindow() + w.show() + sys.exit(app.exec()) + + +if __name__ == "__main__": + main()