modbus_MediaPipe/MQTT_project/mqtt_gui.py

229 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# mqtt_gui.py
# pip install paho-mqtt
import json
import queue
import re
import tkinter as tk
from tkinter import ttk, messagebox

from paho.mqtt import client as mqtt
# Default connection settings; they pre-fill the corresponding GUI fields.
# Broker address. 169.254.0.0/16 is the IPv4 link-local range.
DEFAULT_HOST = "169.254.11.130"
# NOTE(review): the IANA-registered MQTT port is 1883 - confirm that 1886 is
# really what the broker listens on and not a typo.
DEFAULT_PORT = 1886
# Topic the GUI subscribes to.
DEFAULT_TOPIC = "topic_plc_and_py_for_AXIS"
# QoS level requested for the subscription (the GUI offers 0, 1 or 2).
DEFAULT_QOS = 1
def robust_json_parse(raw: str):
    """Parse the loosely formatted, JSON-like text that PLCs commonly emit.

    The text is repaired in the following order and then handed to
    ``json.loads``:

    1. Surrounding whitespace and one pair of matching outer quotes are
       removed.
    2. The upper-case literals ``TRUE`` / ``FALSE`` become ``true`` /
       ``false``. Only whole words are rewritten, so identifiers such as
       ``IS_TRUE`` or ``FALSE_ALARM`` keep their spelling.
    3. Bare keys get double quotes: ``{key:`` -> ``{"key":``.
    4. Remaining single quotes become double quotes.

    Example:
        ``'{Forward_RPM:1500,UPDown_RPM:800}'``
        -> ``{'Forward_RPM': 1500, 'UPDown_RPM': 800}``

    Args:
        raw: Payload text to repair and parse.

    Returns:
        The decoded JSON value (normally a ``dict``).

    Raises:
        json.JSONDecodeError: If the text is still not valid JSON after the
            repairs.
    """
    text = raw.strip()

    # Drop one pair of matching outer quotes, e.g. "'{...}'" -> "{...}".
    if len(text) >= 2 and text[0] == text[-1] and text[0] in "'\"":
        text = text[1:-1].strip()

    # Upper-case PLC booleans -> JSON booleans. The word boundaries matter:
    # a plain substring replace would also rewrite keys like IS_TRUE.
    text = re.sub(r"\b(TRUE|FALSE)\b", lambda m: m.group(1).lower(), text)

    # Quote bare keys that directly follow "{" or ",".
    text = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', text)

    # Single-quoted strings -> double-quoted (done after the key quoting).
    text = text.replace("'", '"')

    return json.loads(text)
class MQTTGui:
    """Tkinter front end that subscribes to one MQTT topic and shows PLC values.

    The window has three areas: connection settings, the most recent
    BUTTON_Y / Forward_RPM / UPDown_RPM values, and a scrolling log.

    Threading model: ``loop_start()`` makes paho-mqtt invoke the ``_on_*``
    callbacks on its own network thread, whereas Tk widgets and variables
    may only be used from the thread that runs ``mainloop()``. The callbacks
    therefore never touch Tk; they enqueue work with ``_post`` and
    ``_drain_ui_queue`` executes it on the Tk thread.
    """

    # How often (ms) the Tk thread drains work queued by the network thread.
    _POLL_MS = 50
    _CLIENT_ID = "gui_sub_plc"

    def __init__(self, root: tk.Tk):
        """Create the widgets and start polling for network-thread events.

        Args:
            root: The Tk root window that hosts the GUI.
        """
        self.root = root
        self.root.title("MQTT PLC Subscriber")

        # Connection state
        self.client: mqtt.Client | None = None
        self.connected = False
        # Subscription settings are snapshotted in connect() so that the
        # network-thread callbacks never have to read Tk variables.
        self._topic = DEFAULT_TOPIC
        self._qos = DEFAULT_QOS
        # (callable, args) pairs posted by the network thread.
        self._ui_queue = queue.Queue()
        self._closing = False

        # UI variables
        self.var_host = tk.StringVar(value=DEFAULT_HOST)
        self.var_port = tk.IntVar(value=DEFAULT_PORT)
        self.var_topic = tk.StringVar(value=DEFAULT_TOPIC)
        self.var_qos = tk.IntVar(value=DEFAULT_QOS)
        self.var_button_y = tk.StringVar(value="None")
        self.var_forward_rpm = tk.StringVar(value="None")
        self.var_updown_rpm = tk.StringVar(value="None")
        self.var_status = tk.StringVar(value="Disconnected")

        self._build_ui()

        # Window close event
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)
        self.root.after(self._POLL_MS, self._drain_ui_queue)

    def _build_ui(self):
        """Lay out the connection, current-state and log frames."""
        pad = {"padx": 8, "pady": 6}

        frm_top = ttk.LabelFrame(self.root, text="Connection")
        frm_top.grid(row=0, column=0, sticky="nsew", **pad)
        ttk.Label(frm_top, text="Broker Host").grid(row=0, column=0, sticky="e")
        ttk.Entry(frm_top, textvariable=self.var_host, width=18).grid(row=0, column=1, sticky="w")
        ttk.Label(frm_top, text="Port").grid(row=0, column=2, sticky="e")
        ttk.Spinbox(frm_top, from_=1, to=65535, textvariable=self.var_port, width=8).grid(row=0, column=3, sticky="w")
        ttk.Label(frm_top, text="Topic").grid(row=1, column=0, sticky="e")
        ttk.Entry(frm_top, textvariable=self.var_topic, width=40).grid(row=1, column=1, columnspan=3, sticky="w")
        ttk.Label(frm_top, text="QoS").grid(row=2, column=0, sticky="e")
        ttk.Spinbox(frm_top, values=(0, 1, 2), textvariable=self.var_qos, width=8).grid(row=2, column=1, sticky="w")
        btn_conn = ttk.Button(frm_top, text="Connect", command=self.connect)
        btn_dis = ttk.Button(frm_top, text="Disconnect", command=self.disconnect)
        btn_conn.grid(row=2, column=2, sticky="we")
        btn_dis.grid(row=2, column=3, sticky="we")
        ttk.Label(frm_top, text="Status:").grid(row=3, column=0, sticky="e")
        ttk.Label(frm_top, textvariable=self.var_status).grid(row=3, column=1, columnspan=3, sticky="w")

        frm_vals = ttk.LabelFrame(self.root, text="Current State")
        frm_vals.grid(row=1, column=0, sticky="nsew", **pad)
        ttk.Label(frm_vals, text="BUTTON_Y").grid(row=0, column=0, sticky="e")
        ttk.Label(frm_vals, textvariable=self.var_button_y, width=20).grid(row=0, column=1, sticky="w")
        ttk.Label(frm_vals, text="Forward_RPM").grid(row=1, column=0, sticky="e")
        ttk.Label(frm_vals, textvariable=self.var_forward_rpm, width=20).grid(row=1, column=1, sticky="w")
        ttk.Label(frm_vals, text="UPDown_RPM").grid(row=2, column=0, sticky="e")
        ttk.Label(frm_vals, textvariable=self.var_updown_rpm, width=20).grid(row=2, column=1, sticky="w")

        frm_log = ttk.LabelFrame(self.root, text="Logs")
        frm_log.grid(row=2, column=0, sticky="nsew", **pad)
        frm_log.rowconfigure(0, weight=1)
        frm_log.columnconfigure(0, weight=1)
        self.txt_log = tk.Text(frm_log, height=14)
        self.txt_log.grid(row=0, column=0, sticky="nsew")
        scr = ttk.Scrollbar(frm_log, command=self.txt_log.yview)
        scr.grid(row=0, column=1, sticky="ns")
        self.txt_log.configure(yscrollcommand=scr.set)

        # Let the log frame absorb any extra space when the window is resized.
        self.root.rowconfigure(2, weight=1)
        self.root.columnconfigure(0, weight=1)

    # ---------- MQTT ----------
    def connect(self):
        """Validate the form, then connect to the broker and start the loop.

        The subscription itself happens in ``_on_connect`` once the broker
        has accepted the session. Invalid input produces a warning dialog
        and a failed connection attempt is reported in the log; neither
        raises.
        """
        if self.connected:
            self._log("Already connected.")
            return

        host = self.var_host.get().strip()
        topic = self.var_topic.get().strip()
        if not host or not topic:
            messagebox.showwarning("Warning", "Host 與 Topic 不可為空")
            return

        # IntVar.get() raises TclError when the spinbox holds non-numeric text.
        try:
            port = int(self.var_port.get())
            qos = int(self.var_qos.get())
        except (tk.TclError, ValueError):
            port = qos = -1
        if not (1 <= port <= 65535 and qos in (0, 1, 2)):
            messagebox.showwarning("Warning", "Port 必須為 1-65535, QoS 必須為 0/1/2")
            return

        # A previous client can still be alive here (still connecting, or
        # retrying in the background). Stop it first: two sessions sharing
        # one client_id would keep evicting each other at the broker, and the
        # old network thread would otherwise leak.
        if self.client is not None:
            try:
                self._stop_client()
            except Exception as e:
                self._log(f"Disconnect error: {e}")

        self._topic, self._qos = topic, qos

        client = self._new_client()
        # Back-off used by paho's automatic reconnect.
        client.reconnect_delay_set(min_delay=1, max_delay=10)
        client.on_connect = self._on_connect
        client.on_message = self._on_message
        client.on_disconnect = self._on_disconnect

        try:
            client.connect(host, port, keepalive=60)
        except Exception as e:
            # Nothing was started, so no client is kept around.
            self._log(f"❌ Connect failed: {e}")
            self.var_status.set("Connect failed")
            return

        # Non-blocking background network loop.
        client.loop_start()
        self.client = client
        self.var_status.set("Connecting...")
        self._log(f"🔌 Connecting to {host}:{port}, topic={topic}, qos={qos}")

    def disconnect(self):
        """Disconnect from the broker and stop the network thread, if any."""
        if self.client is None:
            return
        try:
            self._stop_client()
        except Exception as e:
            self._log(f"Disconnect error: {e}")
        self.var_status.set("Disconnected")

    def _new_client(self):
        """Build a paho client that works with both paho-mqtt 1.x and 2.x."""
        options = {"client_id": self._CLIENT_ID, "protocol": mqtt.MQTTv311}
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is None:
            # paho-mqtt 1.x: the constructor has no callback API argument.
            return mqtt.Client(**options)
        # paho-mqtt 2.x: VERSION1 keeps the callback argument lists that the
        # _on_* methods of this class are written for.
        return mqtt.Client(api_versions.VERSION1, **options)

    def _stop_client(self):
        """Tear down the current client and reset the connection state.

        ``disconnect()`` is issued before ``loop_stop()`` so that the network
        thread is still running to send the DISCONNECT packet.
        """
        client, self.client = self.client, None
        self.connected = False
        if client is None:
            return
        try:
            client.disconnect()
        finally:
            client.loop_stop()

    # The three _on_* callbacks below run on paho's network thread. They must
    # not touch Tk; anything UI-related is handed over through _post().
    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """paho callback: subscribe on success, report failure otherwise."""
        if reason_code == 0:
            topic, qos = self._topic, self._qos
            client.subscribe(topic, qos=qos)
            self._post(self._handle_connected, topic, qos)
        else:
            self._post(self._handle_connect_failed, reason_code)

    def _on_disconnect(self, client, userdata, reason_code, properties=None):
        """paho callback: report that the connection was closed."""
        self._post(self._handle_disconnected, reason_code)

    def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
        """paho callback: forward the raw payload to the Tk thread."""
        self._post(self._handle_message, msg.payload)

    # ---------- Tk-thread handlers ----------
    def _handle_connected(self, topic, qos):
        """Reflect a successful connection and subscription in the UI."""
        self.connected = True
        self.var_status.set("Connected")
        self._log(f"✅ Connected. Subscribed: {topic} (QoS={qos})")

    def _handle_connect_failed(self, reason_code):
        """Reflect a connection refused by the broker in the UI."""
        self.connected = False
        self.var_status.set(f"Connect failed ({reason_code})")
        self._log(f"❌ Connect failed. reason_code={reason_code}")

    def _handle_disconnected(self, reason_code):
        """Reflect a closed connection in the UI."""
        self.connected = False
        self.var_status.set(f"Disconnected ({reason_code})")
        self._log(f"🔌 Disconnected. reason_code={reason_code}")

    def _handle_message(self, payload):
        """Decode, parse and display one received payload.

        Strict JSON is tried first and ``robust_json_parse`` is the fallback.
        Only the fields present in the message are updated.
        """
        payload_raw = payload.decode("utf-8", errors="replace")
        self._log(f"📥 Raw: {payload_raw}")

        try:
            data = json.loads(payload_raw)
        except json.JSONDecodeError:
            try:
                data = robust_json_parse(payload_raw)
            except Exception as e:
                self._log(f"❌ Parse error: {e}")
                return
        self._log(f"✅ Parsed: {data}")

        # A bare number, string or list is valid JSON but carries no fields.
        if not isinstance(data, dict):
            self._log("⚠️ Payload is not a JSON object; ignored.")
            return

        fields = (
            ("BUTTON_Y", self.var_button_y),
            ("Forward_RPM", self.var_forward_rpm),
            ("UPDown_RPM", self.var_updown_rpm),
        )
        for key, variable in fields:
            if key in data:
                variable.set(str(data[key]))

    # ---------- Utils ----------
    def _post(self, func, *args):
        """Queue ``func(*args)`` for the Tk thread; callable from any thread."""
        self._ui_queue.put((func, args))

    def _drain_ui_queue(self):
        """Run all queued UI work on the Tk thread, then re-arm the timer."""
        try:
            while True:
                try:
                    func, args = self._ui_queue.get_nowait()
                except queue.Empty:
                    break
                func(*args)
        finally:
            # Re-arm even if a handler raised, so one bad message cannot
            # freeze the UI updates for good.
            if not self._closing:
                self.root.after(self._POLL_MS, self._drain_ui_queue)

    def _log(self, text: str):
        """Append one line to the log box and scroll to it (Tk thread only)."""
        self.txt_log.insert("end", text + "\n")
        self.txt_log.see("end")

    def on_close(self):
        """Stop the MQTT client (best effort) and destroy the window."""
        self._closing = True
        try:
            self._stop_client()
        except Exception:
            # Best effort: a broken socket must not prevent the window
            # from closing.
            pass
        self.root.destroy()
if __name__ == "__main__":
    # On Windows, declare the process system-DPI aware (value 1) so the UI is
    # not bitmap-scaled on high-DPI displays. This has to happen before any
    # window exists, hence before tk.Tk(). Failure is harmless and ignored:
    # ImportError on non-Windows platforms, OSError/AttributeError when
    # shcore.dll or the function is unavailable.
    try:
        from ctypes import windll

        windll.shcore.SetProcessDpiAwareness(1)
    except (ImportError, AttributeError, OSError):
        pass

    root = tk.Tk()
    MQTTGui(root)
    root.mainloop()