MQtt gui for pub

This commit is contained in:
tim33824 2025-08-15 16:05:01 +08:00
parent fbde79a57b
commit aa74ffb9d9
1 changed files with 336 additions and 0 deletions

View File

@ -0,0 +1,336 @@
# mqtt_pub_gui_qt.py
# pip install PyQt6 paho-mqtt
import time
import json
from dataclasses import dataclass
from typing import Optional
from PyQt6 import QtCore, QtGui, QtWidgets
from paho.mqtt import client as mqtt
# ----------------- 原函式(稍作參數擴充,保留你的流程與註解) -----------------
def pub_to_plc(
BROKER_HOST: str,
BROKER_PORT: int,
TOPIC: str,
QOS: int,
RETAIN: bool,
move_forward: bool = True,
forward_velocity: float = 1.2,
move_updown: bool = True,
updown_velocity: float = 55.1,
gap_seconds: float = 3.0,
log_cb=None
):
"""
依序發佈兩筆訊息中間等待 gap_seconds
加上 log_cb(str) GUI 可收到狀態訊息
"""
def _log(msg: str):
if callable(log_cb):
log_cb(msg)
client = mqtt.Client(client_id="py_pub_plc_test", protocol=mqtt.MQTTv311)
# 若 Broker 有帳密、TLS 在這裡設定(目前不需要)
# client.username_pw_set("user", "pass")
# client.tls_set(ca_certs="ca.pem") # 若是 TLS
_log(f"🔌 Connecting to {BROKER_HOST}:{BROKER_PORT} ...")
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
client.loop_start()
time.sleep(0.3) # 等連線建立
_log("✅ Connected")
# ---- 第一組: ----
payload_forward = {
"Move_Forward": bool(move_forward), # BOOL
"Move_Forward_Velocity": float(forward_velocity) # REAL (float)
}
msg1 = json.dumps(payload_forward, ensure_ascii=False)
_log(f"📤 Publish #1 to {TOPIC} (QoS={QOS}, retain={RETAIN}): {msg1}")
r1 = client.publish(TOPIC, msg1, qos=QOS, retain=RETAIN)
r1.wait_for_publish()
_log("✅ Published #1")
# 中間延遲
if gap_seconds and gap_seconds > 0:
_log(f"⏳ Waiting {gap_seconds} s before #2 ...")
time.sleep(gap_seconds)
# ---- 第二組: ----
payload_updown = {
"Move_UPDown": bool(move_updown), # BOOL
"Move_UPDown_Velocity": float(updown_velocity) # REAL (float)
}
msg2 = json.dumps(payload_updown, ensure_ascii=False)
_log(f"📤 Publish #2 to {TOPIC} (QoS={QOS}, retain={RETAIN}): {msg2}")
r2 = client.publish(TOPIC, msg2, qos=QOS, retain=RETAIN)
r2.wait_for_publish()
_log("✅ Published #2")
time.sleep(0.3)
client.loop_stop()
client.disconnect()
_log("🔌 Disconnected")
# ----------------- GUI 部分 -----------------
@dataclass
class PublishParams:
host: str
port: int
topic: str
qos: int
retain: bool
move_forward: bool
forward_velocity: float
move_updown: bool
updown_velocity: float
gap_seconds: float
class PubWorker(QtCore.QThread):
sig_log = QtCore.pyqtSignal(str)
sig_done = QtCore.pyqtSignal(bool, str) # success, message
def __init__(self, params: PublishParams, parent=None):
super().__init__(parent)
self.params = params
def _log(self, text: str):
self.sig_log.emit(text)
def run(self):
try:
pub_to_plc(
self.params.host,
self.params.port,
self.params.topic,
self.params.qos,
self.params.retain,
self.params.move_forward,
self.params.forward_velocity,
self.params.move_updown,
self.params.updown_velocity,
self.params.gap_seconds,
log_cb=self._log
)
self.sig_done.emit(True, "Publish sequence completed.")
except Exception as e:
self.sig_done.emit(False, f"Error: {e}")
class MainWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("MQTT PLC Publisher (PyQt6)")
self.resize(920, 620)
# 預設值
self.default_host = "169.254.11.130"
self.default_port = 1886
self.default_topic = "topic_plc_and_py_for_AXIS"
self.default_qos = 1
self.default_retain = False
self.default_move_forward = True
self.default_forward_vel = 1.2
self.default_move_updown = True
self.default_updown_vel = 55.1
self.default_gap_seconds = 3.0
self.worker: Optional[PubWorker] = None
central = QtWidgets.QWidget()
self.setCentralWidget(central)
vbox = QtWidgets.QVBoxLayout(central)
vbox.setContentsMargins(12, 12, 12, 12)
vbox.setSpacing(10)
# ---- Connection Panel ----
conn_group = QtWidgets.QGroupBox("Connection")
grid = QtWidgets.QGridLayout(conn_group)
grid.setHorizontalSpacing(12)
grid.setVerticalSpacing(8)
self.ed_host = QtWidgets.QLineEdit(self.default_host)
self.ed_port = QtWidgets.QSpinBox()
self.ed_port.setRange(1, 65535)
self.ed_port.setValue(self.default_port)
self.ed_topic = QtWidgets.QLineEdit(self.default_topic)
self.cb_qos = QtWidgets.QComboBox()
self.cb_qos.addItems(["0", "1", "2"])
self.cb_qos.setCurrentIndex(self.default_qos)
self.chk_retain = QtWidgets.QCheckBox("Retain")
self.chk_retain.setChecked(self.default_retain)
grid.addWidget(QtWidgets.QLabel("Broker Host"), 0, 0)
grid.addWidget(self.ed_host, 0, 1)
grid.addWidget(QtWidgets.QLabel("Port"), 0, 2)
grid.addWidget(self.ed_port, 0, 3)
grid.addWidget(QtWidgets.QLabel("Topic"), 1, 0)
grid.addWidget(self.ed_topic, 1, 1, 1, 3)
grid.addWidget(QtWidgets.QLabel("QoS"), 2, 0)
grid.addWidget(self.cb_qos, 2, 1)
grid.addWidget(self.chk_retain, 2, 2)
vbox.addWidget(conn_group)
# ---- Payload Panel ----
payload_group = QtWidgets.QGroupBox("Payloads")
pgrid = QtWidgets.QGridLayout(payload_group)
pgrid.setHorizontalSpacing(12)
pgrid.setVerticalSpacing(8)
# 第一筆
self.chk_forward = QtWidgets.QCheckBox("Move_Forward")
self.chk_forward.setChecked(self.default_move_forward)
self.ed_forward_vel = QtWidgets.QDoubleSpinBox()
self.ed_forward_vel.setRange(-1e6, 1e6)
self.ed_forward_vel.setDecimals(3)
self.ed_forward_vel.setValue(self.default_forward_vel)
self.ed_forward_vel.setSuffix(" (Velocity)")
# 第二筆
self.chk_updown = QtWidgets.QCheckBox("Move_UPDown")
self.chk_updown.setChecked(self.default_move_updown)
self.ed_updown_vel = QtWidgets.QDoubleSpinBox()
self.ed_updown_vel.setRange(-1e6, 1e6)
self.ed_updown_vel.setDecimals(3)
self.ed_updown_vel.setValue(self.default_updown_vel)
self.ed_updown_vel.setSuffix(" (Velocity)")
# 延遲
self.ed_gap = QtWidgets.QDoubleSpinBox()
self.ed_gap.setRange(0.0, 600.0)
self.ed_gap.setDecimals(2)
self.ed_gap.setValue(self.default_gap_seconds)
self.ed_gap.setSuffix(" s (delay between #1 and #2)")
pgrid.addWidget(QtWidgets.QLabel("Payload #1:"), 0, 0)
pgrid.addWidget(self.chk_forward, 0, 1)
pgrid.addWidget(self.ed_forward_vel, 0, 2)
pgrid.addWidget(QtWidgets.QLabel("Payload #2:"), 1, 0)
pgrid.addWidget(self.chk_updown, 1, 1)
pgrid.addWidget(self.ed_updown_vel, 1, 2)
pgrid.addWidget(QtWidgets.QLabel("Gap:"), 2, 0)
pgrid.addWidget(self.ed_gap, 2, 1, 1, 2)
vbox.addWidget(payload_group)
# ---- Actions ----
hbox = QtWidgets.QHBoxLayout()
self.btn_publish = QtWidgets.QPushButton("🚀 Publish Sequence")
self.btn_publish.clicked.connect(self.on_publish)
hbox.addWidget(self.btn_publish)
hbox.addStretch(1)
vbox.addLayout(hbox)
# ---- Logs ----
log_group = QtWidgets.QGroupBox("Logs")
lvbox = QtWidgets.QVBoxLayout(log_group)
self.txt_log = QtWidgets.QPlainTextEdit()
self.txt_log.setReadOnly(True)
self.txt_log.setMaximumBlockCount(2000)
lvbox.addWidget(self.txt_log)
vbox.addWidget(log_group, 1)
# 美化
self._apply_dark_fusion_palette()
def on_publish(self):
if self.worker and self.worker.isRunning():
self._log("⚠️ 正在發佈中,請稍候...")
return
try:
params = PublishParams(
host=self.ed_host.text().strip(),
port=int(self.ed_port.value()),
topic=self.ed_topic.text().strip(),
qos=int(self.cb_qos.currentText()),
retain=bool(self.chk_retain.isChecked()),
move_forward=bool(self.chk_forward.isChecked()),
forward_velocity=float(self.ed_forward_vel.value()),
move_updown=bool(self.chk_updown.isChecked()),
updown_velocity=float(self.ed_updown_vel.value()),
gap_seconds=float(self.ed_gap.value()),
)
except Exception as e:
self._log(f"❌ 參數錯誤:{e}")
return
if not params.host or not params.topic:
QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Topic 不可為空")
return
self.worker = PubWorker(params, self)
self.worker.sig_log.connect(self._log)
self.worker.sig_done.connect(self._on_done)
self._log("▶️ 開始發佈序列...")
self.btn_publish.setEnabled(False)
self.worker.start()
@QtCore.pyqtSlot(bool, str)
def _on_done(self, success: bool, message: str):
self._log(("" if success else "") + message)
self.btn_publish.setEnabled(True)
def _log(self, text: str):
self.txt_log.appendPlainText(text)
sb = self.txt_log.verticalScrollBar()
sb.setValue(sb.maximum())
def _apply_dark_fusion_palette(self):
app = QtWidgets.QApplication.instance()
app.setStyle("Fusion")
pal = QtGui.QPalette()
base = QtGui.QColor(30, 30, 30)
alt = QtGui.QColor(45, 45, 45)
text = QtGui.QColor(220, 220, 220)
btn = QtGui.QColor(53, 53, 53)
hl = QtGui.QColor(42, 130, 218)
pal.setColor(QtGui.QPalette.ColorRole.Window, base)
pal.setColor(QtGui.QPalette.ColorRole.WindowText, text)
pal.setColor(QtGui.QPalette.ColorRole.Base, QtGui.QColor(25, 25, 25))
pal.setColor(QtGui.QPalette.ColorRole.AlternateBase, alt)
pal.setColor(QtGui.QPalette.ColorRole.ToolTipBase, text)
pal.setColor(QtGui.QPalette.ColorRole.ToolTipText, text)
pal.setColor(QtGui.QPalette.ColorRole.Text, text)
pal.setColor(QtGui.QPalette.ColorRole.Button, btn)
pal.setColor(QtGui.QPalette.ColorRole.ButtonText, text)
pal.setColor(QtGui.QPalette.ColorRole.BrightText, QtGui.QColor(255, 0, 0))
pal.setColor(QtGui.QPalette.ColorRole.Link, hl)
pal.setColor(QtGui.QPalette.ColorRole.Highlight, hl)
pal.setColor(QtGui.QPalette.ColorRole.HighlightedText, QtGui.QColor(255, 255, 255))
app.setPalette(pal)
def main():
import sys
app = QtWidgets.QApplication(sys.argv)
# 高 DPIWindows 可忽略失敗)
try:
import ctypes
ctypes.windll.shcore.SetProcessDpiAwareness(1)
except Exception:
pass
w = MainWindow()
w.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()