229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
# mqtt_gui.py
|
||
# pip install paho-mqtt
|
||
import json
|
||
import re
|
||
import tkinter as tk
|
||
from tkinter import ttk, messagebox
|
||
from paho.mqtt import client as mqtt
|
||
|
||
DEFAULT_HOST = "169.254.11.130"
|
||
DEFAULT_PORT = 1886
|
||
DEFAULT_TOPIC = "topic_plc_and_py_for_AXIS"
|
||
DEFAULT_QOS = 1
|
||
|
||
def robust_json_parse(raw: str):
|
||
"""
|
||
將 PLC 常見不標準 JSON 修正後解析成 dict。
|
||
例:'{Forward_RPM:1500,UPDown_RPM:800}' -> {'Forward_RPM':1500,'UPDown_RPM':800}
|
||
也會把 TRUE/FALSE -> true/false;補 key 雙引號;去外層引號。
|
||
"""
|
||
s = raw.strip()
|
||
|
||
# 去掉最外層引號(若有)
|
||
if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):
|
||
s = s[1:-1].strip()
|
||
|
||
# PLC 常見布林大寫 -> JSON 規範小寫
|
||
s = s.replace("TRUE", "true").replace("FALSE", "false")
|
||
|
||
# 自動補 key 的雙引號:{key: -> {"key":
|
||
s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s)
|
||
|
||
# 將單引號字串轉雙引號(在補 key 之後)
|
||
s = s.replace("'", '"')
|
||
|
||
return json.loads(s)
|
||
|
||
class MQTTGui:
|
||
def __init__(self, root: tk.Tk):
|
||
self.root = root
|
||
self.root.title("MQTT PLC Subscriber")
|
||
|
||
# 狀態
|
||
self.client: mqtt.Client | None = None
|
||
self.connected = False
|
||
|
||
# UI 變數
|
||
self.var_host = tk.StringVar(value=DEFAULT_HOST)
|
||
self.var_port = tk.IntVar(value=DEFAULT_PORT)
|
||
self.var_topic = tk.StringVar(value=DEFAULT_TOPIC)
|
||
self.var_qos = tk.IntVar(value=DEFAULT_QOS)
|
||
|
||
self.var_button_y = tk.StringVar(value="None")
|
||
self.var_forward_rpm = tk.StringVar(value="None")
|
||
self.var_updown_rpm = tk.StringVar(value="None")
|
||
self.var_status = tk.StringVar(value="Disconnected")
|
||
|
||
self._build_ui()
|
||
|
||
# 關閉事件
|
||
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
|
||
|
||
def _build_ui(self):
|
||
pad = {"padx": 8, "pady": 6}
|
||
|
||
frm_top = ttk.LabelFrame(self.root, text="Connection")
|
||
frm_top.grid(row=0, column=0, sticky="nsew", **pad)
|
||
|
||
ttk.Label(frm_top, text="Broker Host").grid(row=0, column=0, sticky="e")
|
||
ttk.Entry(frm_top, textvariable=self.var_host, width=18).grid(row=0, column=1, sticky="w")
|
||
|
||
ttk.Label(frm_top, text="Port").grid(row=0, column=2, sticky="e")
|
||
ttk.Spinbox(frm_top, from_=1, to=65535, textvariable=self.var_port, width=8).grid(row=0, column=3, sticky="w")
|
||
|
||
ttk.Label(frm_top, text="Topic").grid(row=1, column=0, sticky="e")
|
||
ttk.Entry(frm_top, textvariable=self.var_topic, width=40).grid(row=1, column=1, columnspan=3, sticky="w")
|
||
|
||
ttk.Label(frm_top, text="QoS").grid(row=2, column=0, sticky="e")
|
||
ttk.Spinbox(frm_top, values=(0,1,2), textvariable=self.var_qos, width=8).grid(row=2, column=1, sticky="w")
|
||
|
||
btn_conn = ttk.Button(frm_top, text="Connect", command=self.connect)
|
||
btn_dis = ttk.Button(frm_top, text="Disconnect", command=self.disconnect)
|
||
btn_conn.grid(row=2, column=2, sticky="we")
|
||
btn_dis.grid(row=2, column=3, sticky="we")
|
||
|
||
ttk.Label(frm_top, text="Status:").grid(row=3, column=0, sticky="e")
|
||
ttk.Label(frm_top, textvariable=self.var_status).grid(row=3, column=1, columnspan=3, sticky="w")
|
||
|
||
frm_vals = ttk.LabelFrame(self.root, text="Current State")
|
||
frm_vals.grid(row=1, column=0, sticky="nsew", **pad)
|
||
|
||
ttk.Label(frm_vals, text="BUTTON_Y").grid(row=0, column=0, sticky="e")
|
||
ttk.Label(frm_vals, textvariable=self.var_button_y, width=20).grid(row=0, column=1, sticky="w")
|
||
|
||
ttk.Label(frm_vals, text="Forward_RPM").grid(row=1, column=0, sticky="e")
|
||
ttk.Label(frm_vals, textvariable=self.var_forward_rpm, width=20).grid(row=1, column=1, sticky="w")
|
||
|
||
ttk.Label(frm_vals, text="UPDown_RPM").grid(row=2, column=0, sticky="e")
|
||
ttk.Label(frm_vals, textvariable=self.var_updown_rpm, width=20).grid(row=2, column=1, sticky="w")
|
||
|
||
frm_log = ttk.LabelFrame(self.root, text="Logs")
|
||
frm_log.grid(row=2, column=0, sticky="nsew", **pad)
|
||
frm_log.rowconfigure(0, weight=1)
|
||
frm_log.columnconfigure(0, weight=1)
|
||
|
||
self.txt_log = tk.Text(frm_log, height=14)
|
||
self.txt_log.grid(row=0, column=0, sticky="nsew")
|
||
scr = ttk.Scrollbar(frm_log, command=self.txt_log.yview)
|
||
scr.grid(row=0, column=1, sticky="ns")
|
||
self.txt_log.configure(yscrollcommand=scr.set)
|
||
|
||
# 讓整體可縮放
|
||
self.root.rowconfigure(2, weight=1)
|
||
self.root.columnconfigure(0, weight=1)
|
||
|
||
# ---------- MQTT ----------
|
||
def connect(self):
|
||
if self.connected:
|
||
self._log("Already connected.")
|
||
return
|
||
|
||
host = self.var_host.get().strip()
|
||
port = int(self.var_port.get())
|
||
topic = self.var_topic.get().strip()
|
||
qos = int(self.var_qos.get())
|
||
|
||
if not host or not topic:
|
||
messagebox.showwarning("Warning", "Host 與 Topic 不可為空")
|
||
return
|
||
|
||
# 建立 client
|
||
self.client = mqtt.Client(client_id="gui_sub_plc", protocol=mqtt.MQTTv311)
|
||
|
||
# 自動重連延遲
|
||
self.client.reconnect_delay_set(min_delay=1, max_delay=10)
|
||
|
||
# Callbacks
|
||
self.client.on_connect = self._on_connect
|
||
self.client.on_message = self._on_message
|
||
self.client.on_disconnect = self._on_disconnect
|
||
|
||
try:
|
||
self.client.connect(host, port, keepalive=60)
|
||
except Exception as e:
|
||
self._log(f"❌ Connect failed: {e}")
|
||
self.var_status.set("Connect failed")
|
||
return
|
||
|
||
# 非阻塞背景 loop
|
||
self.client.loop_start()
|
||
self.var_status.set("Connecting...")
|
||
self._log(f"🔌 Connecting to {host}:{port}, topic={topic}, qos={qos}")
|
||
|
||
def disconnect(self):
|
||
if self.client is None:
|
||
return
|
||
try:
|
||
self.client.loop_stop()
|
||
self.client.disconnect()
|
||
except Exception as e:
|
||
self._log(f"Disconnect error: {e}")
|
||
|
||
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
|
||
if reason_code == 0:
|
||
self.connected = True
|
||
self.var_status.set("Connected")
|
||
topic = self.var_topic.get().strip()
|
||
qos = int(self.var_qos.get())
|
||
client.subscribe(topic, qos=qos)
|
||
self._log(f"✅ Connected. Subscribed: {topic} (QoS={qos})")
|
||
else:
|
||
self.connected = False
|
||
self.var_status.set(f"Connect failed ({reason_code})")
|
||
self._log(f"❌ Connect failed. reason_code={reason_code}")
|
||
|
||
def _on_disconnect(self, client, userdata, reason_code, properties=None):
|
||
self.connected = False
|
||
self.var_status.set(f"Disconnected ({reason_code})")
|
||
self._log(f"🔌 Disconnected. reason_code={reason_code}")
|
||
|
||
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
|
||
payload_raw = msg.payload.decode("utf-8", errors="replace")
|
||
self._log(f"📥 Raw: {payload_raw}")
|
||
|
||
# 先試標準 JSON
|
||
try:
|
||
data = json.loads(payload_raw)
|
||
except json.JSONDecodeError:
|
||
# 失敗再容錯解析
|
||
try:
|
||
data = robust_json_parse(payload_raw)
|
||
except Exception as e:
|
||
self._log(f"❌ Parse error: {e}")
|
||
return
|
||
|
||
self._log(f"✅ Parsed: {data}")
|
||
|
||
# 更新顯示(有傳才改)
|
||
if "BUTTON_Y" in data:
|
||
self.var_button_y.set(str(data["BUTTON_Y"]))
|
||
if "Forward_RPM" in data:
|
||
self.var_forward_rpm.set(str(data["Forward_RPM"]))
|
||
if "UPDown_RPM" in data:
|
||
self.var_updown_rpm.set(str(data["UPDown_RPM"]))
|
||
|
||
# ---------- Utils ----------
|
||
def _log(self, text: str):
|
||
self.txt_log.insert("end", text + "\n")
|
||
self.txt_log.see("end")
|
||
|
||
def on_close(self):
|
||
try:
|
||
if self.client:
|
||
self.client.loop_stop()
|
||
self.client.disconnect()
|
||
except Exception:
|
||
pass
|
||
self.root.destroy()
|
||
|
||
if __name__ == "__main__":
|
||
root = tk.Tk()
|
||
# Windows 讓標準字體稍微大一點
|
||
try:
|
||
from ctypes import windll
|
||
windll.shcore.SetProcessDpiAwareness(1) # 可忽略失敗
|
||
except Exception:
|
||
pass
|
||
MQTTGui(root)
|
||
root.mainloop()
|