MQTT GUI SUB
This commit is contained in:
parent
38f3d606d0
commit
fbde79a57b
|
@ -0,0 +1,228 @@
|
||||||
|
# mqtt_gui.py
|
||||||
|
# pip install paho-mqtt
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import tkinter as tk
|
||||||
|
from tkinter import ttk, messagebox
|
||||||
|
from paho.mqtt import client as mqtt
|
||||||
|
|
||||||
|
DEFAULT_HOST = "169.254.11.130"
|
||||||
|
DEFAULT_PORT = 1886
|
||||||
|
DEFAULT_TOPIC = "topic_plc_and_py_for_AXIS"
|
||||||
|
DEFAULT_QOS = 1
|
||||||
|
|
||||||
|
def robust_json_parse(raw: str):
|
||||||
|
"""
|
||||||
|
將 PLC 常見不標準 JSON 修正後解析成 dict。
|
||||||
|
例:'{Forward_RPM:1500,UPDown_RPM:800}' -> {'Forward_RPM':1500,'UPDown_RPM':800}
|
||||||
|
也會把 TRUE/FALSE -> true/false;補 key 雙引號;去外層引號。
|
||||||
|
"""
|
||||||
|
s = raw.strip()
|
||||||
|
|
||||||
|
# 去掉最外層引號(若有)
|
||||||
|
if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):
|
||||||
|
s = s[1:-1].strip()
|
||||||
|
|
||||||
|
# PLC 常見布林大寫 -> JSON 規範小寫
|
||||||
|
s = s.replace("TRUE", "true").replace("FALSE", "false")
|
||||||
|
|
||||||
|
# 自動補 key 的雙引號:{key: -> {"key":
|
||||||
|
s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s)
|
||||||
|
|
||||||
|
# 將單引號字串轉雙引號(在補 key 之後)
|
||||||
|
s = s.replace("'", '"')
|
||||||
|
|
||||||
|
return json.loads(s)
|
||||||
|
|
||||||
|
class MQTTGui:
|
||||||
|
def __init__(self, root: tk.Tk):
|
||||||
|
self.root = root
|
||||||
|
self.root.title("MQTT PLC Subscriber")
|
||||||
|
|
||||||
|
# 狀態
|
||||||
|
self.client: mqtt.Client | None = None
|
||||||
|
self.connected = False
|
||||||
|
|
||||||
|
# UI 變數
|
||||||
|
self.var_host = tk.StringVar(value=DEFAULT_HOST)
|
||||||
|
self.var_port = tk.IntVar(value=DEFAULT_PORT)
|
||||||
|
self.var_topic = tk.StringVar(value=DEFAULT_TOPIC)
|
||||||
|
self.var_qos = tk.IntVar(value=DEFAULT_QOS)
|
||||||
|
|
||||||
|
self.var_button_y = tk.StringVar(value="None")
|
||||||
|
self.var_forward_rpm = tk.StringVar(value="None")
|
||||||
|
self.var_updown_rpm = tk.StringVar(value="None")
|
||||||
|
self.var_status = tk.StringVar(value="Disconnected")
|
||||||
|
|
||||||
|
self._build_ui()
|
||||||
|
|
||||||
|
# 關閉事件
|
||||||
|
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
|
||||||
|
|
||||||
|
def _build_ui(self):
|
||||||
|
pad = {"padx": 8, "pady": 6}
|
||||||
|
|
||||||
|
frm_top = ttk.LabelFrame(self.root, text="Connection")
|
||||||
|
frm_top.grid(row=0, column=0, sticky="nsew", **pad)
|
||||||
|
|
||||||
|
ttk.Label(frm_top, text="Broker Host").grid(row=0, column=0, sticky="e")
|
||||||
|
ttk.Entry(frm_top, textvariable=self.var_host, width=18).grid(row=0, column=1, sticky="w")
|
||||||
|
|
||||||
|
ttk.Label(frm_top, text="Port").grid(row=0, column=2, sticky="e")
|
||||||
|
ttk.Spinbox(frm_top, from_=1, to=65535, textvariable=self.var_port, width=8).grid(row=0, column=3, sticky="w")
|
||||||
|
|
||||||
|
ttk.Label(frm_top, text="Topic").grid(row=1, column=0, sticky="e")
|
||||||
|
ttk.Entry(frm_top, textvariable=self.var_topic, width=40).grid(row=1, column=1, columnspan=3, sticky="w")
|
||||||
|
|
||||||
|
ttk.Label(frm_top, text="QoS").grid(row=2, column=0, sticky="e")
|
||||||
|
ttk.Spinbox(frm_top, values=(0,1,2), textvariable=self.var_qos, width=8).grid(row=2, column=1, sticky="w")
|
||||||
|
|
||||||
|
btn_conn = ttk.Button(frm_top, text="Connect", command=self.connect)
|
||||||
|
btn_dis = ttk.Button(frm_top, text="Disconnect", command=self.disconnect)
|
||||||
|
btn_conn.grid(row=2, column=2, sticky="we")
|
||||||
|
btn_dis.grid(row=2, column=3, sticky="we")
|
||||||
|
|
||||||
|
ttk.Label(frm_top, text="Status:").grid(row=3, column=0, sticky="e")
|
||||||
|
ttk.Label(frm_top, textvariable=self.var_status).grid(row=3, column=1, columnspan=3, sticky="w")
|
||||||
|
|
||||||
|
frm_vals = ttk.LabelFrame(self.root, text="Current State")
|
||||||
|
frm_vals.grid(row=1, column=0, sticky="nsew", **pad)
|
||||||
|
|
||||||
|
ttk.Label(frm_vals, text="BUTTON_Y").grid(row=0, column=0, sticky="e")
|
||||||
|
ttk.Label(frm_vals, textvariable=self.var_button_y, width=20).grid(row=0, column=1, sticky="w")
|
||||||
|
|
||||||
|
ttk.Label(frm_vals, text="Forward_RPM").grid(row=1, column=0, sticky="e")
|
||||||
|
ttk.Label(frm_vals, textvariable=self.var_forward_rpm, width=20).grid(row=1, column=1, sticky="w")
|
||||||
|
|
||||||
|
ttk.Label(frm_vals, text="UPDown_RPM").grid(row=2, column=0, sticky="e")
|
||||||
|
ttk.Label(frm_vals, textvariable=self.var_updown_rpm, width=20).grid(row=2, column=1, sticky="w")
|
||||||
|
|
||||||
|
frm_log = ttk.LabelFrame(self.root, text="Logs")
|
||||||
|
frm_log.grid(row=2, column=0, sticky="nsew", **pad)
|
||||||
|
frm_log.rowconfigure(0, weight=1)
|
||||||
|
frm_log.columnconfigure(0, weight=1)
|
||||||
|
|
||||||
|
self.txt_log = tk.Text(frm_log, height=14)
|
||||||
|
self.txt_log.grid(row=0, column=0, sticky="nsew")
|
||||||
|
scr = ttk.Scrollbar(frm_log, command=self.txt_log.yview)
|
||||||
|
scr.grid(row=0, column=1, sticky="ns")
|
||||||
|
self.txt_log.configure(yscrollcommand=scr.set)
|
||||||
|
|
||||||
|
# 讓整體可縮放
|
||||||
|
self.root.rowconfigure(2, weight=1)
|
||||||
|
self.root.columnconfigure(0, weight=1)
|
||||||
|
|
||||||
|
# ---------- MQTT ----------
|
||||||
|
def connect(self):
|
||||||
|
if self.connected:
|
||||||
|
self._log("Already connected.")
|
||||||
|
return
|
||||||
|
|
||||||
|
host = self.var_host.get().strip()
|
||||||
|
port = int(self.var_port.get())
|
||||||
|
topic = self.var_topic.get().strip()
|
||||||
|
qos = int(self.var_qos.get())
|
||||||
|
|
||||||
|
if not host or not topic:
|
||||||
|
messagebox.showwarning("Warning", "Host 與 Topic 不可為空")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 建立 client
|
||||||
|
self.client = mqtt.Client(client_id="gui_sub_plc", protocol=mqtt.MQTTv311)
|
||||||
|
|
||||||
|
# 自動重連延遲
|
||||||
|
self.client.reconnect_delay_set(min_delay=1, max_delay=10)
|
||||||
|
|
||||||
|
# Callbacks
|
||||||
|
self.client.on_connect = self._on_connect
|
||||||
|
self.client.on_message = self._on_message
|
||||||
|
self.client.on_disconnect = self._on_disconnect
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.client.connect(host, port, keepalive=60)
|
||||||
|
except Exception as e:
|
||||||
|
self._log(f"❌ Connect failed: {e}")
|
||||||
|
self.var_status.set("Connect failed")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 非阻塞背景 loop
|
||||||
|
self.client.loop_start()
|
||||||
|
self.var_status.set("Connecting...")
|
||||||
|
self._log(f"🔌 Connecting to {host}:{port}, topic={topic}, qos={qos}")
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
if self.client is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.client.loop_stop()
|
||||||
|
self.client.disconnect()
|
||||||
|
except Exception as e:
|
||||||
|
self._log(f"Disconnect error: {e}")
|
||||||
|
|
||||||
|
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
|
||||||
|
if reason_code == 0:
|
||||||
|
self.connected = True
|
||||||
|
self.var_status.set("Connected")
|
||||||
|
topic = self.var_topic.get().strip()
|
||||||
|
qos = int(self.var_qos.get())
|
||||||
|
client.subscribe(topic, qos=qos)
|
||||||
|
self._log(f"✅ Connected. Subscribed: {topic} (QoS={qos})")
|
||||||
|
else:
|
||||||
|
self.connected = False
|
||||||
|
self.var_status.set(f"Connect failed ({reason_code})")
|
||||||
|
self._log(f"❌ Connect failed. reason_code={reason_code}")
|
||||||
|
|
||||||
|
def _on_disconnect(self, client, userdata, reason_code, properties=None):
|
||||||
|
self.connected = False
|
||||||
|
self.var_status.set(f"Disconnected ({reason_code})")
|
||||||
|
self._log(f"🔌 Disconnected. reason_code={reason_code}")
|
||||||
|
|
||||||
|
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
|
||||||
|
payload_raw = msg.payload.decode("utf-8", errors="replace")
|
||||||
|
self._log(f"📥 Raw: {payload_raw}")
|
||||||
|
|
||||||
|
# 先試標準 JSON
|
||||||
|
try:
|
||||||
|
data = json.loads(payload_raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
# 失敗再容錯解析
|
||||||
|
try:
|
||||||
|
data = robust_json_parse(payload_raw)
|
||||||
|
except Exception as e:
|
||||||
|
self._log(f"❌ Parse error: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._log(f"✅ Parsed: {data}")
|
||||||
|
|
||||||
|
# 更新顯示(有傳才改)
|
||||||
|
if "BUTTON_Y" in data:
|
||||||
|
self.var_button_y.set(str(data["BUTTON_Y"]))
|
||||||
|
if "Forward_RPM" in data:
|
||||||
|
self.var_forward_rpm.set(str(data["Forward_RPM"]))
|
||||||
|
if "UPDown_RPM" in data:
|
||||||
|
self.var_updown_rpm.set(str(data["UPDown_RPM"]))
|
||||||
|
|
||||||
|
# ---------- Utils ----------
|
||||||
|
def _log(self, text: str):
|
||||||
|
self.txt_log.insert("end", text + "\n")
|
||||||
|
self.txt_log.see("end")
|
||||||
|
|
||||||
|
def on_close(self):
|
||||||
|
try:
|
||||||
|
if self.client:
|
||||||
|
self.client.loop_stop()
|
||||||
|
self.client.disconnect()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self.root.destroy()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
root = tk.Tk()
|
||||||
|
# Windows 讓標準字體稍微大一點
|
||||||
|
try:
|
||||||
|
from ctypes import windll
|
||||||
|
windll.shcore.SetProcessDpiAwareness(1) # 可忽略失敗
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
MQTTGui(root)
|
||||||
|
root.mainloop()
|
|
@ -0,0 +1,336 @@
|
||||||
|
# mqtt_gui_qt.py
|
||||||
|
# pip install PyQt6 paho-mqtt
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
from typing import Optional, Dict, Any
|
||||||
|
|
||||||
|
from PyQt6 import QtCore, QtGui, QtWidgets
|
||||||
|
from paho.mqtt import client as mqtt
|
||||||
|
|
||||||
|
DEFAULT_HOST = "169.254.11.130"
|
||||||
|
DEFAULT_PORT = 1886
|
||||||
|
DEFAULT_TOPIC = "topic_plc_and_py_for_AXIS"
|
||||||
|
DEFAULT_QOS = 1
|
||||||
|
|
||||||
|
|
||||||
|
# -------- Robust JSON fixer for PLC-style payloads --------
|
||||||
|
def robust_json_parse(raw: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
將 PLC 常見不標準 JSON 修正後解析成 dict。
|
||||||
|
例:'{Forward_RPM:1500,UPDown_RPM:800}' -> {'Forward_RPM':1500,'UPDown_RPM':800}
|
||||||
|
也會把 TRUE/FALSE -> true/false;補 key 雙引號;去外層引號;單引號轉雙引號。
|
||||||
|
"""
|
||||||
|
s = raw.strip()
|
||||||
|
|
||||||
|
# 去掉最外層引號(若有)
|
||||||
|
if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):
|
||||||
|
s = s[1:-1].strip()
|
||||||
|
|
||||||
|
# PLC 常見布林大寫 -> JSON 規範小寫
|
||||||
|
s = s.replace("TRUE", "true").replace("FALSE", "false")
|
||||||
|
|
||||||
|
# 自動補 key 的雙引號:{key: -> {"key":
|
||||||
|
s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s)
|
||||||
|
|
||||||
|
# 可能存在單引號包字串:統一轉成雙引號
|
||||||
|
s = s.replace("'", '"')
|
||||||
|
|
||||||
|
return json.loads(s)
|
||||||
|
|
||||||
|
|
||||||
|
# -------- MQTT client wrapper with Qt signals --------
|
||||||
|
class MqttClientWrapper(QtCore.QObject):
|
||||||
|
sig_connected = QtCore.pyqtSignal()
|
||||||
|
sig_disconnected = QtCore.pyqtSignal(int)
|
||||||
|
sig_raw_message = QtCore.pyqtSignal(str)
|
||||||
|
sig_parsed_message = QtCore.pyqtSignal(dict)
|
||||||
|
sig_log = QtCore.pyqtSignal(str)
|
||||||
|
|
||||||
|
def __init__(self, parent=None):
|
||||||
|
super().__init__(parent)
|
||||||
|
self.client: Optional[mqtt.Client] = None
|
||||||
|
self.topic: str = ""
|
||||||
|
self.qos: int = 0
|
||||||
|
|
||||||
|
def connect_and_start(self, host: str, port: int, topic: str, qos: int):
|
||||||
|
# 若已存在先清理
|
||||||
|
if self.client is not None:
|
||||||
|
try:
|
||||||
|
self.client.loop_stop()
|
||||||
|
self.client.disconnect()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self.client = None
|
||||||
|
|
||||||
|
self.topic = topic
|
||||||
|
self.qos = qos
|
||||||
|
|
||||||
|
# 建立 client
|
||||||
|
self.client = mqtt.Client(client_id="qt_gui_sub_plc", protocol=mqtt.MQTTv311)
|
||||||
|
# 自動重連延遲
|
||||||
|
self.client.reconnect_delay_set(min_delay=1, max_delay=10)
|
||||||
|
|
||||||
|
# 綁定回呼
|
||||||
|
self.client.on_connect = self._on_connect
|
||||||
|
self.client.on_message = self._on_message
|
||||||
|
self.client.on_disconnect = self._on_disconnect
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.client.connect(host, port, keepalive=60)
|
||||||
|
except Exception as e:
|
||||||
|
self.sig_log.emit(f"❌ Connect failed: {e}")
|
||||||
|
# 不要 loop_start,以免空轉
|
||||||
|
return
|
||||||
|
|
||||||
|
# 背景執行,不阻塞 UI
|
||||||
|
self.client.loop_start()
|
||||||
|
self.sig_log.emit(f"🔌 Connecting to {host}:{port} | topic={topic} | qos={qos}")
|
||||||
|
|
||||||
|
def disconnect_and_stop(self):
|
||||||
|
if self.client is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.client.loop_stop()
|
||||||
|
self.client.disconnect()
|
||||||
|
except Exception as e:
|
||||||
|
self.sig_log.emit(f"Disconnect error: {e}")
|
||||||
|
|
||||||
|
# ---- Callbacks (paho-mqtt 2.x signature) ----
|
||||||
|
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
|
||||||
|
if reason_code == 0:
|
||||||
|
# 訂閱
|
||||||
|
try:
|
||||||
|
client.subscribe(self.topic, qos=self.qos)
|
||||||
|
self.sig_log.emit(f"✅ Connected. Subscribed: {self.topic} (QoS={self.qos})")
|
||||||
|
except Exception as e:
|
||||||
|
self.sig_log.emit(f"Subscribe error: {e}")
|
||||||
|
self.sig_connected.emit()
|
||||||
|
else:
|
||||||
|
self.sig_log.emit(f"❌ Connect failed. reason_code={reason_code}")
|
||||||
|
|
||||||
|
def _on_disconnect(self, client, userdata, reason_code, properties=None):
|
||||||
|
self.sig_disconnected.emit(reason_code)
|
||||||
|
self.sig_log.emit(f"🔌 Disconnected. reason_code={reason_code}")
|
||||||
|
|
||||||
|
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
|
||||||
|
payload_raw = msg.payload.decode("utf-8", errors="replace")
|
||||||
|
self.sig_raw_message.emit(payload_raw)
|
||||||
|
|
||||||
|
# 先試標準 JSON,失敗再容錯
|
||||||
|
try:
|
||||||
|
data = json.loads(payload_raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
try:
|
||||||
|
data = robust_json_parse(payload_raw)
|
||||||
|
except Exception as e:
|
||||||
|
self.sig_log.emit(f"❌ Parse error: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.sig_parsed_message.emit(data)
|
||||||
|
|
||||||
|
|
||||||
|
# -------- Main Window --------
|
||||||
|
class MainWindow(QtWidgets.QMainWindow):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
self.setWindowTitle("MQTT PLC Subscriber (PyQt6)")
|
||||||
|
self.resize(900, 620)
|
||||||
|
|
||||||
|
# 狀態
|
||||||
|
self.state: Dict[str, Any] = {
|
||||||
|
"BUTTON_Y": None,
|
||||||
|
"Forward_RPM": None,
|
||||||
|
"UPDown_RPM": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# 中央 widget + 版面
|
||||||
|
central = QtWidgets.QWidget(self)
|
||||||
|
self.setCentralWidget(central)
|
||||||
|
root_layout = QtWidgets.QVBoxLayout(central)
|
||||||
|
root_layout.setContentsMargins(12, 12, 12, 12)
|
||||||
|
root_layout.setSpacing(10)
|
||||||
|
|
||||||
|
# ---- Connection Panel ----
|
||||||
|
panel = QtWidgets.QGroupBox("Connection")
|
||||||
|
grid = QtWidgets.QGridLayout(panel)
|
||||||
|
grid.setHorizontalSpacing(12)
|
||||||
|
grid.setVerticalSpacing(8)
|
||||||
|
|
||||||
|
self.ed_host = QtWidgets.QLineEdit(DEFAULT_HOST)
|
||||||
|
self.ed_port = QtWidgets.QSpinBox()
|
||||||
|
self.ed_port.setRange(1, 65535)
|
||||||
|
self.ed_port.setValue(DEFAULT_PORT)
|
||||||
|
|
||||||
|
self.ed_topic = QtWidgets.QLineEdit(DEFAULT_TOPIC)
|
||||||
|
|
||||||
|
self.ed_qos = QtWidgets.QComboBox()
|
||||||
|
self.ed_qos.addItems(["0", "1", "2"])
|
||||||
|
self.ed_qos.setCurrentIndex(DEFAULT_QOS)
|
||||||
|
|
||||||
|
self.btn_connect = QtWidgets.QPushButton("Connect")
|
||||||
|
self.btn_disconnect = QtWidgets.QPushButton("Disconnect")
|
||||||
|
self.lbl_status = QtWidgets.QLabel("Disconnected")
|
||||||
|
|
||||||
|
grid.addWidget(QtWidgets.QLabel("Broker Host"), 0, 0)
|
||||||
|
grid.addWidget(self.ed_host, 0, 1)
|
||||||
|
grid.addWidget(QtWidgets.QLabel("Port"), 0, 2)
|
||||||
|
grid.addWidget(self.ed_port, 0, 3)
|
||||||
|
|
||||||
|
grid.addWidget(QtWidgets.QLabel("Topic"), 1, 0)
|
||||||
|
grid.addWidget(self.ed_topic, 1, 1, 1, 3)
|
||||||
|
|
||||||
|
grid.addWidget(QtWidgets.QLabel("QoS"), 2, 0)
|
||||||
|
grid.addWidget(self.ed_qos, 2, 1)
|
||||||
|
grid.addWidget(self.btn_connect, 2, 2)
|
||||||
|
grid.addWidget(self.btn_disconnect, 2, 3)
|
||||||
|
|
||||||
|
grid.addWidget(QtWidgets.QLabel("Status"), 3, 0)
|
||||||
|
grid.addWidget(self.lbl_status, 3, 1, 1, 3)
|
||||||
|
|
||||||
|
root_layout.addWidget(panel)
|
||||||
|
|
||||||
|
# ---- State Panel ----
|
||||||
|
state_panel = QtWidgets.QGroupBox("Current State")
|
||||||
|
sgrid = QtWidgets.QGridLayout(state_panel)
|
||||||
|
sgrid.setHorizontalSpacing(20)
|
||||||
|
sgrid.setVerticalSpacing(8)
|
||||||
|
|
||||||
|
self.lbl_btn_y = QtWidgets.QLabel("None")
|
||||||
|
self.lbl_forward = QtWidgets.QLabel("None")
|
||||||
|
self.lbl_updown = QtWidgets.QLabel("None")
|
||||||
|
|
||||||
|
for lab in (self.lbl_btn_y, self.lbl_forward, self.lbl_updown):
|
||||||
|
lab.setFrameShape(QtWidgets.QFrame.Shape.StyledPanel)
|
||||||
|
lab.setMinimumWidth(140)
|
||||||
|
lab.setAlignment(QtCore.Qt.AlignmentFlag.AlignLeft | QtCore.Qt.AlignmentFlag.AlignVCenter)
|
||||||
|
|
||||||
|
sgrid.addWidget(QtWidgets.QLabel("BUTTON_Y"), 0, 0)
|
||||||
|
sgrid.addWidget(self.lbl_btn_y, 0, 1)
|
||||||
|
sgrid.addWidget(QtWidgets.QLabel("Forward_RPM"), 1, 0)
|
||||||
|
sgrid.addWidget(self.lbl_forward, 1, 1)
|
||||||
|
sgrid.addWidget(QtWidgets.QLabel("UPDown_RPM"), 2, 0)
|
||||||
|
sgrid.addWidget(self.lbl_updown, 2, 1)
|
||||||
|
|
||||||
|
root_layout.addWidget(state_panel)
|
||||||
|
|
||||||
|
# ---- Logs ----
|
||||||
|
log_panel = QtWidgets.QGroupBox("Logs")
|
||||||
|
vbox = QtWidgets.QVBoxLayout(log_panel)
|
||||||
|
self.txt_log = QtWidgets.QPlainTextEdit()
|
||||||
|
self.txt_log.setReadOnly(True)
|
||||||
|
self.txt_log.setMaximumBlockCount(2000)
|
||||||
|
vbox.addWidget(self.txt_log)
|
||||||
|
root_layout.addWidget(log_panel, 1)
|
||||||
|
|
||||||
|
# MQTT wrapper
|
||||||
|
self.mqtt = MqttClientWrapper(self)
|
||||||
|
self.mqtt.sig_connected.connect(self._on_connected)
|
||||||
|
self.mqtt.sig_disconnected.connect(self._on_disconnected)
|
||||||
|
self.mqtt.sig_raw_message.connect(self._on_raw_message)
|
||||||
|
self.mqtt.sig_parsed_message.connect(self._on_parsed_message)
|
||||||
|
self.mqtt.sig_log.connect(self._log)
|
||||||
|
|
||||||
|
# actions
|
||||||
|
self.btn_connect.clicked.connect(self._do_connect)
|
||||||
|
self.btn_disconnect.clicked.connect(self._do_disconnect)
|
||||||
|
|
||||||
|
# 美化
|
||||||
|
self._apply_dark_fusion_palette()
|
||||||
|
|
||||||
|
# ---- UI Actions ----
|
||||||
|
def _do_connect(self):
|
||||||
|
host = self.ed_host.text().strip()
|
||||||
|
port = int(self.ed_port.value())
|
||||||
|
topic = self.ed_topic.text().strip()
|
||||||
|
qos = int(self.ed_qos.currentText())
|
||||||
|
|
||||||
|
if not host or not topic:
|
||||||
|
QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Topic 不可為空")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.lbl_status.setText("Connecting...")
|
||||||
|
self._log(f"🔌 Connecting to {host}:{port}, topic={topic}, qos={qos}")
|
||||||
|
self.mqtt.connect_and_start(host, port, topic, qos)
|
||||||
|
|
||||||
|
def _do_disconnect(self):
|
||||||
|
self.mqtt.disconnect_and_stop()
|
||||||
|
|
||||||
|
# ---- Signal slots from MQTT wrapper ----
|
||||||
|
@QtCore.pyqtSlot()
|
||||||
|
def _on_connected(self):
|
||||||
|
self.lbl_status.setText("Connected")
|
||||||
|
|
||||||
|
@QtCore.pyqtSlot(int)
|
||||||
|
def _on_disconnected(self, reason_code: int):
|
||||||
|
self.lbl_status.setText(f"Disconnected ({reason_code})")
|
||||||
|
|
||||||
|
@QtCore.pyqtSlot(str)
|
||||||
|
def _on_raw_message(self, text: str):
|
||||||
|
self._log(f"📥 Raw: {text}")
|
||||||
|
|
||||||
|
@QtCore.pyqtSlot(dict)
|
||||||
|
def _on_parsed_message(self, data: dict):
|
||||||
|
self._log(f"✅ Parsed: {data}")
|
||||||
|
|
||||||
|
if "BUTTON_Y" in data:
|
||||||
|
self.state["BUTTON_Y"] = data["BUTTON_Y"]
|
||||||
|
self.lbl_btn_y.setText(str(data["BUTTON_Y"]))
|
||||||
|
if "Forward_RPM" in data:
|
||||||
|
self.state["Forward_RPM"] = data["Forward_RPM"]
|
||||||
|
self.lbl_forward.setText(str(data["Forward_RPM"]))
|
||||||
|
if "UPDown_RPM" in data:
|
||||||
|
self.state["UPDown_RPM"] = data["UPDown_RPM"]
|
||||||
|
self.lbl_updown.setText(str(data["UPDown_RPM"]))
|
||||||
|
|
||||||
|
# ---- Utils ----
|
||||||
|
def _log(self, text: str):
|
||||||
|
self.txt_log.appendPlainText(text)
|
||||||
|
self.txt_log.verticalScrollBar().setValue(self.txt_log.verticalScrollBar().maximum())
|
||||||
|
|
||||||
|
def _apply_dark_fusion_palette(self):
|
||||||
|
# Fusion 深色主題
|
||||||
|
app = QtWidgets.QApplication.instance()
|
||||||
|
app.setStyle("Fusion")
|
||||||
|
pal = QtGui.QPalette()
|
||||||
|
|
||||||
|
base = QtGui.QColor(30, 30, 30)
|
||||||
|
alt = QtGui.QColor(45, 45, 45)
|
||||||
|
text = QtGui.QColor(220, 220, 220)
|
||||||
|
btn = QtGui.QColor(53, 53, 53)
|
||||||
|
hl = QtGui.QColor(42, 130, 218)
|
||||||
|
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.Window, base)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.WindowText, text)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.Base, QtGui.QColor(25, 25, 25))
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.AlternateBase, alt)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.ToolTipBase, text)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.ToolTipText, text)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.Text, text)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.Button, btn)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.ButtonText, text)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.BrightText, QtGui.QColor(255, 0, 0))
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.Link, hl)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.Highlight, hl)
|
||||||
|
pal.setColor(QtGui.QPalette.ColorRole.HighlightedText, QtGui.QColor(255, 255, 255))
|
||||||
|
|
||||||
|
app.setPalette(pal)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
import sys
|
||||||
|
app = QtWidgets.QApplication(sys.argv)
|
||||||
|
|
||||||
|
# Windows 高 DPI(可忽略失敗)
|
||||||
|
try:
|
||||||
|
import ctypes
|
||||||
|
ctypes.windll.shcore.SetProcessDpiAwareness(1)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
w = MainWindow()
|
||||||
|
w.show()
|
||||||
|
sys.exit(app.exec())
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -1,50 +0,0 @@
|
||||||
# pip install paho-mqtt
|
|
||||||
import json
|
|
||||||
import time
|
|
||||||
from paho.mqtt import client as mqtt
|
|
||||||
|
|
||||||
BROKER_HOST = "169.254.11.130"
|
|
||||||
BROKER_PORT = 1886
|
|
||||||
TOPIC = "topic_plc_and_py_for_AXIS"
|
|
||||||
|
|
||||||
# 建議測試用 QoS=1;retain=False
|
|
||||||
QOS = 1
|
|
||||||
RETAIN = False
|
|
||||||
|
|
||||||
def main():
|
|
||||||
client = mqtt.Client(client_id="py_pub_plc_test", protocol=mqtt.MQTTv311)
|
|
||||||
|
|
||||||
# 若 Broker 有帳密、TLS 在這裡設定(目前不需要)
|
|
||||||
# client.username_pw_set("user", "pass")
|
|
||||||
# client.tls_set(ca_certs="ca.pem") # 若是 TLS
|
|
||||||
|
|
||||||
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
|
|
||||||
client.loop_start()
|
|
||||||
time.sleep(0.3) # 等連線建立
|
|
||||||
|
|
||||||
# ---- 第一組: ----
|
|
||||||
payload_forward = {
|
|
||||||
"Move_Forward": False, # BOOL
|
|
||||||
"Move_Forward_Velocity": 6.2 # REAL (float)
|
|
||||||
}
|
|
||||||
msg1 = json.dumps(payload_forward, ensure_ascii=False)
|
|
||||||
r1 = client.publish(TOPIC, msg1, qos=QOS, retain=RETAIN)
|
|
||||||
r1.wait_for_publish()
|
|
||||||
|
|
||||||
time.sleep(3)#第二顆馬達等等再開啟
|
|
||||||
|
|
||||||
# ---- 第二組: ----
|
|
||||||
payload_updown = {
|
|
||||||
"Move_UPDown": False, # BOOL
|
|
||||||
"Move_UPDown_Velocity": 5.1 # REAL (float)
|
|
||||||
}
|
|
||||||
msg2 = json.dumps(payload_updown, ensure_ascii=False)
|
|
||||||
r2 = client.publish(TOPIC, msg2, qos=QOS, retain=RETAIN)
|
|
||||||
r2.wait_for_publish()
|
|
||||||
|
|
||||||
time.sleep(0.3)
|
|
||||||
client.loop_stop()
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
Loading…
Reference in New Issue