modbus_MediaPipe/MQTT_project/mqtt_pub_gui_qt.py

337 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# mqtt_pub_gui_qt.py
# pip install PyQt6 paho-mqtt
import time
import json
from dataclasses import dataclass
from typing import Optional
from PyQt6 import QtCore, QtGui, QtWidgets
from paho.mqtt import client as mqtt
# ----------------- Original function (parameters slightly extended; original flow and comments preserved) -----------------
def pub_to_plc(
    BROKER_HOST: str,
    BROKER_PORT: int,
    TOPIC: str,
    QOS: int,
    RETAIN: bool,
    move_forward: bool = True,
    forward_velocity: float = 1.2,
    move_updown: bool = True,
    updown_velocity: float = 55.1,
    gap_seconds: float = 3.0,
    log_cb=None,
    connect_timeout: float = 5.0,
):
    """Publish two JSON messages in sequence, pausing between them.

    The first message carries ``Move_Forward`` / ``Move_Forward_Velocity``,
    the second ``Move_UPDown`` / ``Move_UPDown_Velocity``. Both are sent to
    the same topic with the same QoS and retain flag.

    Args:
        BROKER_HOST: Host name or IP address of the MQTT broker.
        BROKER_PORT: TCP port of the MQTT broker.
        TOPIC: Topic both messages are published to.
        QOS: MQTT quality-of-service level passed to ``publish``.
        RETAIN: Retain flag passed to ``publish``.
        move_forward: Value of the ``Move_Forward`` field (coerced to bool).
        forward_velocity: Value of ``Move_Forward_Velocity`` (coerced to float).
        move_updown: Value of the ``Move_UPDown`` field (coerced to bool).
        updown_velocity: Value of ``Move_UPDown_Velocity`` (coerced to float).
        gap_seconds: Seconds to sleep between the two publishes; skipped
            when zero, negative or falsy.
        log_cb: Optional callable taking one ``str``; receives progress
            messages so a GUI can display them. Ignored if not callable.
        connect_timeout: Maximum seconds to wait for the client to report
            an established session after ``connect()`` returns.

    Raises:
        TimeoutError: The client did not report a connection in time.
        RuntimeError: ``publish`` returned a non-success result code.
        OSError: Propagated from ``connect()`` when the broker is unreachable.
    """
    def _log(msg: str):
        if callable(log_cb):
            log_cb(msg)

    def _publish(index: int, payload: dict):
        # Serialize, send, and block until the client reports the message
        # as published.
        text = json.dumps(payload, ensure_ascii=False)
        _log(f"📤 Publish #{index} to {TOPIC} (QoS={QOS}, retain={RETAIN}): {text}")
        info = client.publish(TOPIC, text, qos=QOS, retain=RETAIN)
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            raise RuntimeError(
                f"Publish #{index} failed: {mqtt.error_string(info.rc)} (rc={info.rc})"
            )
        info.wait_for_publish()
        _log(f"✅ Published #{index}")

    # paho-mqtt 2.x added a leading callback_api_version constructor
    # argument; pass it when the installed library defines it so the same
    # code runs on both 1.x and 2.x. No callbacks are registered here, so
    # the chosen callback version does not affect behavior.
    client_kwargs = {"client_id": "py_pub_plc_test", "protocol": mqtt.MQTTv311}
    api_version = getattr(mqtt, "CallbackAPIVersion", None)
    if api_version is not None:
        client = mqtt.Client(api_version.VERSION2, **client_kwargs)
    else:
        client = mqtt.Client(**client_kwargs)

    # If the broker requires credentials or TLS, configure them here
    # (not needed at present).
    # client.username_pw_set("user", "pass")
    # client.tls_set(ca_certs="ca.pem")  # when using TLS

    _log(f"🔌 Connecting to {BROKER_HOST}:{BROKER_PORT} ...")
    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
    client.loop_start()
    try:
        # Wait until the client actually reports a connection instead of
        # sleeping a fixed interval and assuming success.
        deadline = time.monotonic() + connect_timeout
        while not client.is_connected():
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"No MQTT connection to {BROKER_HOST}:{BROKER_PORT} "
                    f"within {connect_timeout} s"
                )
            time.sleep(0.05)
        _log("✅ Connected")

        # ---- Message 1 ----
        _publish(1, {
            "Move_Forward": bool(move_forward),  # BOOL
            "Move_Forward_Velocity": float(forward_velocity),  # REAL (float)
        })

        # Delay between the two messages.
        if gap_seconds and gap_seconds > 0:
            _log(f"⏳ Waiting {gap_seconds} s before #2 ...")
            time.sleep(gap_seconds)

        # ---- Message 2 ----
        _publish(2, {
            "Move_UPDown": bool(move_updown),  # BOOL
            "Move_UPDown_Velocity": float(updown_velocity),  # REAL (float)
        })

        # Short settle delay kept from the original flow before tearing down.
        time.sleep(0.3)
    finally:
        # Always stop the network thread and close the session, including
        # when connecting or publishing failed part-way through.
        client.loop_stop()
        client.disconnect()
        _log("🔌 Disconnected")
# ----------------- GUI section -----------------
@dataclass
class PublishParams:
    """Bundle of settings for one publish sequence.

    Field order matters: instances may be built positionally.
    """

    # --- Broker connection and MQTT publish options ---
    host: str
    port: int
    topic: str
    qos: int
    retain: bool

    # --- Payload #1 (forward move) ---
    move_forward: bool
    forward_velocity: float

    # --- Payload #2 (up/down move) ---
    move_updown: bool
    updown_velocity: float

    # --- Delay, in seconds, between payload #1 and payload #2 ---
    gap_seconds: float
class PubWorker(QtCore.QThread):
    """QThread that runs one ``pub_to_plc`` sequence and reports via signals."""

    # Carries each progress line produced during the publish sequence.
    sig_log = QtCore.pyqtSignal(str)
    # Emitted once at the end of run(): (success flag, message).
    sig_done = QtCore.pyqtSignal(bool, str)

    def __init__(self, params: PublishParams, parent=None):
        """Store the publish settings to use when the thread runs."""
        super().__init__(parent)
        self.params = params

    def _log(self, text: str):
        """Forward a progress line through ``sig_log``."""
        self.sig_log.emit(text)

    def run(self):
        """Run the publish sequence and emit ``sig_done`` with the outcome.

        Any exception raised by the sequence is reported as a failure
        message rather than propagated out of the thread.
        """
        cfg = self.params
        try:
            pub_to_plc(
                BROKER_HOST=cfg.host,
                BROKER_PORT=cfg.port,
                TOPIC=cfg.topic,
                QOS=cfg.qos,
                RETAIN=cfg.retain,
                move_forward=cfg.move_forward,
                forward_velocity=cfg.forward_velocity,
                move_updown=cfg.move_updown,
                updown_velocity=cfg.updown_velocity,
                gap_seconds=cfg.gap_seconds,
                log_cb=self._log,
            )
            self.sig_done.emit(True, "Publish sequence completed.")
        except Exception as exc:
            self.sig_done.emit(False, f"Error: {exc}")
class MainWindow(QtWidgets.QMainWindow):
    """Main window: connection/payload form, publish button and log view."""

    def __init__(self):
        """Build the window, its panels, and apply the dark palette."""
        super().__init__()
        self.setWindowTitle("MQTT PLC Publisher (PyQt6)")
        self.resize(920, 620)

        # Default values shown in the form.
        self.default_host = "169.254.11.130"
        self.default_port = 1886
        self.default_topic = "topic_plc_and_py_for_AXIS"
        self.default_qos = 1
        self.default_retain = False
        self.default_move_forward = True
        self.default_forward_vel = 1.2
        self.default_move_updown = True
        self.default_updown_vel = 55.1
        self.default_gap_seconds = 3.0

        # The worker thread of the most recently started publish sequence.
        self.worker: Optional[PubWorker] = None

        central = QtWidgets.QWidget()
        self.setCentralWidget(central)
        vbox = QtWidgets.QVBoxLayout(central)
        vbox.setContentsMargins(12, 12, 12, 12)
        vbox.setSpacing(10)

        vbox.addWidget(self._build_connection_group())
        vbox.addWidget(self._build_payload_group())
        vbox.addLayout(self._build_action_row())
        # Stretch factor 1 so the log view takes the remaining height.
        vbox.addWidget(self._build_log_group(), 1)

        # Styling
        self._apply_dark_fusion_palette()

    def _build_connection_group(self):
        """Create the "Connection" panel (host, port, topic, QoS, retain)."""
        conn_group = QtWidgets.QGroupBox("Connection")
        grid = QtWidgets.QGridLayout(conn_group)
        grid.setHorizontalSpacing(12)
        grid.setVerticalSpacing(8)

        self.ed_host = QtWidgets.QLineEdit(self.default_host)
        self.ed_port = QtWidgets.QSpinBox()
        self.ed_port.setRange(1, 65535)
        self.ed_port.setValue(self.default_port)
        self.ed_topic = QtWidgets.QLineEdit(self.default_topic)
        self.cb_qos = QtWidgets.QComboBox()
        self.cb_qos.addItems(["0", "1", "2"])
        # Item index equals the QoS value because items are "0", "1", "2".
        self.cb_qos.setCurrentIndex(self.default_qos)
        self.chk_retain = QtWidgets.QCheckBox("Retain")
        self.chk_retain.setChecked(self.default_retain)

        grid.addWidget(QtWidgets.QLabel("Broker Host"), 0, 0)
        grid.addWidget(self.ed_host, 0, 1)
        grid.addWidget(QtWidgets.QLabel("Port"), 0, 2)
        grid.addWidget(self.ed_port, 0, 3)
        grid.addWidget(QtWidgets.QLabel("Topic"), 1, 0)
        grid.addWidget(self.ed_topic, 1, 1, 1, 3)
        grid.addWidget(QtWidgets.QLabel("QoS"), 2, 0)
        grid.addWidget(self.cb_qos, 2, 1)
        grid.addWidget(self.chk_retain, 2, 2)
        return conn_group

    def _build_payload_group(self):
        """Create the "Payloads" panel (two payload rows and the gap)."""
        payload_group = QtWidgets.QGroupBox("Payloads")
        pgrid = QtWidgets.QGridLayout(payload_group)
        pgrid.setHorizontalSpacing(12)
        pgrid.setVerticalSpacing(8)

        # Payload #1
        self.chk_forward = QtWidgets.QCheckBox("Move_Forward")
        self.chk_forward.setChecked(self.default_move_forward)
        self.ed_forward_vel = QtWidgets.QDoubleSpinBox()
        self.ed_forward_vel.setRange(-1e6, 1e6)
        self.ed_forward_vel.setDecimals(3)
        self.ed_forward_vel.setValue(self.default_forward_vel)
        self.ed_forward_vel.setSuffix(" (Velocity)")

        # Payload #2
        self.chk_updown = QtWidgets.QCheckBox("Move_UPDown")
        self.chk_updown.setChecked(self.default_move_updown)
        self.ed_updown_vel = QtWidgets.QDoubleSpinBox()
        self.ed_updown_vel.setRange(-1e6, 1e6)
        self.ed_updown_vel.setDecimals(3)
        self.ed_updown_vel.setValue(self.default_updown_vel)
        self.ed_updown_vel.setSuffix(" (Velocity)")

        # Delay between the two payloads
        self.ed_gap = QtWidgets.QDoubleSpinBox()
        self.ed_gap.setRange(0.0, 600.0)
        self.ed_gap.setDecimals(2)
        self.ed_gap.setValue(self.default_gap_seconds)
        self.ed_gap.setSuffix(" s (delay between #1 and #2)")

        pgrid.addWidget(QtWidgets.QLabel("Payload #1:"), 0, 0)
        pgrid.addWidget(self.chk_forward, 0, 1)
        pgrid.addWidget(self.ed_forward_vel, 0, 2)
        pgrid.addWidget(QtWidgets.QLabel("Payload #2:"), 1, 0)
        pgrid.addWidget(self.chk_updown, 1, 1)
        pgrid.addWidget(self.ed_updown_vel, 1, 2)
        pgrid.addWidget(QtWidgets.QLabel("Gap:"), 2, 0)
        pgrid.addWidget(self.ed_gap, 2, 1, 1, 2)
        return payload_group

    def _build_action_row(self):
        """Create the row holding the publish button."""
        hbox = QtWidgets.QHBoxLayout()
        self.btn_publish = QtWidgets.QPushButton("🚀 Publish Sequence")
        self.btn_publish.clicked.connect(self.on_publish)
        hbox.addWidget(self.btn_publish)
        hbox.addStretch(1)
        return hbox

    def _build_log_group(self):
        """Create the "Logs" panel with a read-only, size-capped text view."""
        log_group = QtWidgets.QGroupBox("Logs")
        lvbox = QtWidgets.QVBoxLayout(log_group)
        self.txt_log = QtWidgets.QPlainTextEdit()
        self.txt_log.setReadOnly(True)
        # Cap the number of retained log blocks at 2000.
        self.txt_log.setMaximumBlockCount(2000)
        lvbox.addWidget(self.txt_log)
        return log_group

    def on_publish(self):
        """Read the form, validate it, and start a publish worker thread.

        Does nothing except log a notice when a previous sequence is still
        running. Shows a warning dialog when host or topic is empty.
        """
        if self.worker and self.worker.isRunning():
            self._log("⚠️ 正在發佈中,請稍候...")
            return

        try:
            params = PublishParams(
                host=self.ed_host.text().strip(),
                port=int(self.ed_port.value()),
                topic=self.ed_topic.text().strip(),
                qos=int(self.cb_qos.currentText()),
                retain=bool(self.chk_retain.isChecked()),
                move_forward=bool(self.chk_forward.isChecked()),
                forward_velocity=float(self.ed_forward_vel.value()),
                move_updown=bool(self.chk_updown.isChecked()),
                updown_velocity=float(self.ed_updown_vel.value()),
                gap_seconds=float(self.ed_gap.value()),
            )
        except Exception as e:
            self._log(f"❌ 參數錯誤:{e}")
            return

        if not params.host or not params.topic:
            QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Topic 不可為空")
            return

        self.worker = PubWorker(params, self)
        self.worker.sig_log.connect(self._log)
        self.worker.sig_done.connect(self._on_done)
        self._log("▶️ 開始發佈序列...")
        # Disabled until the worker reports completion in _on_done.
        self.btn_publish.setEnabled(False)
        self.worker.start()

    @QtCore.pyqtSlot(bool, str)
    def _on_done(self, success: bool, message: str):
        """Log the worker's final result and re-enable the publish button."""
        # Prefix the outcome so success and failure are distinguishable in
        # the log, matching the markers used by the other log lines.
        self._log(("✅ " if success else "❌ ") + message)
        self.btn_publish.setEnabled(True)

    def _log(self, text: str):
        """Append one line to the log view and scroll to the bottom."""
        self.txt_log.appendPlainText(text)
        sb = self.txt_log.verticalScrollBar()
        sb.setValue(sb.maximum())

    def _apply_dark_fusion_palette(self):
        """Switch the application to the Fusion style with a dark palette."""
        app = QtWidgets.QApplication.instance()
        app.setStyle("Fusion")

        base = QtGui.QColor(30, 30, 30)
        alt = QtGui.QColor(45, 45, 45)
        text = QtGui.QColor(220, 220, 220)
        btn = QtGui.QColor(53, 53, 53)
        hl = QtGui.QColor(42, 130, 218)

        role = QtGui.QPalette.ColorRole
        pal = QtGui.QPalette()
        pal.setColor(role.Window, base)
        pal.setColor(role.WindowText, text)
        pal.setColor(role.Base, QtGui.QColor(25, 25, 25))
        pal.setColor(role.AlternateBase, alt)
        # Tooltip background must differ from the tooltip text colour,
        # otherwise tooltip text is unreadable.
        pal.setColor(role.ToolTipBase, alt)
        pal.setColor(role.ToolTipText, text)
        pal.setColor(role.Text, text)
        pal.setColor(role.Button, btn)
        pal.setColor(role.ButtonText, text)
        pal.setColor(role.BrightText, QtGui.QColor(255, 0, 0))
        pal.setColor(role.Link, hl)
        pal.setColor(role.Highlight, hl)
        pal.setColor(role.HighlightedText, QtGui.QColor(255, 255, 255))
        app.setPalette(pal)
def main():
    """Create the Qt application, show the main window, and run the event loop.

    Exits the process with the event loop's return code.
    """
    import sys

    qt_app = QtWidgets.QApplication(sys.argv)

    # High-DPI opt-in via the Windows shcore API. Best effort only: any
    # failure (for example when the API is not available) is ignored.
    try:
        import ctypes
        ctypes.windll.shcore.SetProcessDpiAwareness(1)
    except Exception:
        pass

    window = MainWindow()
    window.show()

    exit_code = qt_app.exec()
    sys.exit(exit_code)


# Run the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()