337 lines
11 KiB
Python
337 lines
11 KiB
Python
# mqtt_pub_gui_qt.py
|
||
# pip install PyQt6 paho-mqtt
|
||
import time
|
||
import json
|
||
from dataclasses import dataclass
|
||
from typing import Optional
|
||
|
||
from PyQt6 import QtCore, QtGui, QtWidgets
|
||
from paho.mqtt import client as mqtt
|
||
|
||
|
||
# ----------------- Publishing routine (original flow, with extended parameters) -----------------
|
||
def pub_to_plc(
    BROKER_HOST: str,
    BROKER_PORT: int,
    TOPIC: str,
    QOS: int,
    RETAIN: bool,
    move_forward: bool = True,
    forward_velocity: float = 1.2,
    move_updown: bool = True,
    updown_velocity: float = 55.1,
    gap_seconds: float = 3.0,
    log_cb=None
):
    """Publish two JSON messages to one topic, pausing between them.

    Message #1 carries ``Move_Forward`` / ``Move_Forward_Velocity`` and
    message #2 carries ``Move_UPDown`` / ``Move_UPDown_Velocity``.  Each
    publish blocks on ``wait_for_publish()`` before the sequence continues.

    Args:
        BROKER_HOST: Broker host name or address passed to ``connect``.
        BROKER_PORT: Broker TCP port passed to ``connect``.
        TOPIC: Topic both messages are published to.
        QOS: QoS value forwarded to ``publish``.
        RETAIN: Retain flag forwarded to ``publish``.
        move_forward: Value of ``Move_Forward`` (coerced with ``bool``).
        forward_velocity: Value of ``Move_Forward_Velocity`` (coerced with
            ``float``).
        move_updown: Value of ``Move_UPDown`` (coerced with ``bool``).
        updown_velocity: Value of ``Move_UPDown_Velocity`` (coerced with
            ``float``).
        gap_seconds: Seconds to sleep between the two publishes; falsy or
            non-positive values skip the pause.
        log_cb: Optional callable taking one ``str``; receives status
            messages.  Ignored when it is not callable.

    Raises:
        Whatever ``connect``, ``publish`` or ``wait_for_publish`` raise.
        Once the network loop has been started it is always stopped and the
        client disconnected before the exception propagates.
    """
    def _log(msg: str):
        if callable(log_cb):
            log_cb(msg)

    def _publish(index: int, payload: dict):
        # Serialize, publish and block until the client reports completion.
        msg = json.dumps(payload, ensure_ascii=False)
        _log(f"📤 Publish #{index} to {TOPIC} (QoS={QOS}, retain={RETAIN}): {msg}")
        info = client.publish(TOPIC, msg, qos=QOS, retain=RETAIN)
        info.wait_for_publish()
        _log(f"✅ Published #{index}")

    # NOTE(review): newer paho-mqtt releases reportedly changed the Client
    # constructor (callback API version argument) - confirm against the
    # installed version.
    client = mqtt.Client(client_id="py_pub_plc_test", protocol=mqtt.MQTTv311)

    # Configure broker credentials / TLS here if required (not needed now).
    # client.username_pw_set("user", "pass")
    # client.tls_set(ca_certs="ca.pem")  # for TLS

    _log(f"🔌 Connecting to {BROKER_HOST}:{BROKER_PORT} ...")
    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
    client.loop_start()
    try:
        # Fixed grace period for the connection to come up; the connection
        # state itself is not checked before logging.
        time.sleep(0.3)
        _log("✅ Connected")

        # ---- Message 1: forward motion ----
        _publish(1, {
            "Move_Forward": bool(move_forward),               # BOOL
            "Move_Forward_Velocity": float(forward_velocity)  # REAL (float)
        })

        # Pause between the two messages.
        if gap_seconds and gap_seconds > 0:
            _log(f"⏳ Waiting {gap_seconds} s before #2 ...")
            time.sleep(gap_seconds)

        # ---- Message 2: up/down motion ----
        _publish(2, {
            "Move_UPDown": bool(move_updown),               # BOOL
            "Move_UPDown_Velocity": float(updown_velocity)  # REAL (float)
        })

        time.sleep(0.3)
    finally:
        # Always tear down the background loop and the connection, even when
        # a publish fails; otherwise every failed attempt would leave a
        # running network thread behind.
        client.loop_stop()
        client.disconnect()
        _log("🔌 Disconnected")
|
||
|
||
|
||
# ----------------- GUI section -----------------
|
||
@dataclass
class PublishParams:
    """Bundle of values for one publish sequence.

    Field order is part of the interface: instances may be built
    positionally.
    """

    # --- broker / publish options ---
    host: str
    port: int
    topic: str
    qos: int
    retain: bool

    # --- payload #1 ---
    move_forward: bool
    forward_velocity: float

    # --- payload #2 ---
    move_updown: bool
    updown_velocity: float

    # --- delay between payload #1 and #2, in seconds ---
    gap_seconds: float
|
||
|
||
|
||
class PubWorker(QtCore.QThread):
    """Thread that runs ``pub_to_plc`` with a fixed ``PublishParams``.

    Signals:
        sig_log(str): one status line from the publish routine.
        sig_done(bool, str): emitted once at the end of ``run`` with a
            success flag and a human-readable message.
    """

    sig_log = QtCore.pyqtSignal(str)
    sig_done = QtCore.pyqtSignal(bool, str)  # success, message

    def __init__(self, params: PublishParams, parent=None):
        """Store the parameters to publish; ``parent`` goes to QThread."""
        super().__init__(parent)
        self.params = params

    def _log(self, text: str):
        """Forward a status line through ``sig_log``."""
        self.sig_log.emit(text)

    def run(self):
        """Run the publish sequence and report the outcome via ``sig_done``."""
        cfg = self.params
        try:
            pub_to_plc(
                cfg.host,
                cfg.port,
                cfg.topic,
                cfg.qos,
                cfg.retain,
                move_forward=cfg.move_forward,
                forward_velocity=cfg.forward_velocity,
                move_updown=cfg.move_updown,
                updown_velocity=cfg.updown_velocity,
                gap_seconds=cfg.gap_seconds,
                log_cb=self._log,
            )
            self.sig_done.emit(True, "Publish sequence completed.")
        except Exception as exc:
            # Any failure is reported through the signal instead of being
            # raised out of run().
            self.sig_done.emit(False, f"Error: {exc}")
|
||
|
||
|
||
class MainWindow(QtWidgets.QMainWindow):
    """Main window of the publisher GUI.

    Holds the connection and payload input widgets, a publish button and a
    read-only log view.  Clicking the button collects the inputs into a
    ``PublishParams`` and runs them on a ``PubWorker`` thread.
    """

    def __init__(self):
        """Build the window, its widgets and the application palette."""
        super().__init__()
        self.setWindowTitle("MQTT PLC Publisher (PyQt6)")
        self.resize(920, 620)

        # Default values shown in the input widgets.
        self.default_host = "169.254.11.130"
        self.default_port = 1886
        self.default_topic = "topic_plc_and_py_for_AXIS"
        self.default_qos = 1
        self.default_retain = False

        self.default_move_forward = True
        self.default_forward_vel = 1.2
        self.default_move_updown = True
        self.default_updown_vel = 55.1
        self.default_gap_seconds = 3.0

        # Most recently started worker; None until the first publish.
        self.worker: Optional[PubWorker] = None

        central = QtWidgets.QWidget()
        self.setCentralWidget(central)
        vbox = QtWidgets.QVBoxLayout(central)
        vbox.setContentsMargins(12, 12, 12, 12)
        vbox.setSpacing(10)

        vbox.addWidget(self._build_connection_group())
        vbox.addWidget(self._build_payload_group())
        vbox.addLayout(self._build_actions_row())
        vbox.addWidget(self._build_log_group(), 1)  # log view takes spare height

        # Cosmetics
        self._apply_dark_fusion_palette()

    def _build_connection_group(self):
        """Create the "Connection" panel (host, port, topic, QoS, retain)."""
        conn_group = QtWidgets.QGroupBox("Connection")
        grid = QtWidgets.QGridLayout(conn_group)
        grid.setHorizontalSpacing(12)
        grid.setVerticalSpacing(8)

        self.ed_host = QtWidgets.QLineEdit(self.default_host)
        self.ed_port = QtWidgets.QSpinBox()
        self.ed_port.setRange(1, 65535)
        self.ed_port.setValue(self.default_port)

        self.ed_topic = QtWidgets.QLineEdit(self.default_topic)

        self.cb_qos = QtWidgets.QComboBox()
        self.cb_qos.addItems(["0", "1", "2"])
        # The items are "0".."2" in order, so the QoS value doubles as index.
        self.cb_qos.setCurrentIndex(self.default_qos)

        self.chk_retain = QtWidgets.QCheckBox("Retain")
        self.chk_retain.setChecked(self.default_retain)

        grid.addWidget(QtWidgets.QLabel("Broker Host"), 0, 0)
        grid.addWidget(self.ed_host, 0, 1)
        grid.addWidget(QtWidgets.QLabel("Port"), 0, 2)
        grid.addWidget(self.ed_port, 0, 3)

        grid.addWidget(QtWidgets.QLabel("Topic"), 1, 0)
        grid.addWidget(self.ed_topic, 1, 1, 1, 3)

        grid.addWidget(QtWidgets.QLabel("QoS"), 2, 0)
        grid.addWidget(self.cb_qos, 2, 1)
        grid.addWidget(self.chk_retain, 2, 2)
        return conn_group

    @staticmethod
    def _make_velocity_spinbox(value: float):
        """Create a velocity spin box preset to ``value``."""
        box = QtWidgets.QDoubleSpinBox()
        box.setRange(-1e6, 1e6)
        box.setDecimals(3)
        box.setValue(value)
        box.setSuffix(" (Velocity)")
        return box

    def _build_payload_group(self):
        """Create the "Payloads" panel (two payload rows and the gap)."""
        payload_group = QtWidgets.QGroupBox("Payloads")
        pgrid = QtWidgets.QGridLayout(payload_group)
        pgrid.setHorizontalSpacing(12)
        pgrid.setVerticalSpacing(8)

        # Payload #1
        self.chk_forward = QtWidgets.QCheckBox("Move_Forward")
        self.chk_forward.setChecked(self.default_move_forward)
        self.ed_forward_vel = self._make_velocity_spinbox(self.default_forward_vel)

        # Payload #2
        self.chk_updown = QtWidgets.QCheckBox("Move_UPDown")
        self.chk_updown.setChecked(self.default_move_updown)
        self.ed_updown_vel = self._make_velocity_spinbox(self.default_updown_vel)

        # Delay between the two payloads
        self.ed_gap = QtWidgets.QDoubleSpinBox()
        self.ed_gap.setRange(0.0, 600.0)
        self.ed_gap.setDecimals(2)
        self.ed_gap.setValue(self.default_gap_seconds)
        self.ed_gap.setSuffix(" s (delay between #1 and #2)")

        pgrid.addWidget(QtWidgets.QLabel("Payload #1:"), 0, 0)
        pgrid.addWidget(self.chk_forward, 0, 1)
        pgrid.addWidget(self.ed_forward_vel, 0, 2)

        pgrid.addWidget(QtWidgets.QLabel("Payload #2:"), 1, 0)
        pgrid.addWidget(self.chk_updown, 1, 1)
        pgrid.addWidget(self.ed_updown_vel, 1, 2)

        pgrid.addWidget(QtWidgets.QLabel("Gap:"), 2, 0)
        pgrid.addWidget(self.ed_gap, 2, 1, 1, 2)
        return payload_group

    def _build_actions_row(self):
        """Create the row holding the publish button."""
        hbox = QtWidgets.QHBoxLayout()
        self.btn_publish = QtWidgets.QPushButton("🚀 Publish Sequence")
        self.btn_publish.clicked.connect(self.on_publish)
        hbox.addWidget(self.btn_publish)
        hbox.addStretch(1)
        return hbox

    def _build_log_group(self):
        """Create the "Logs" panel with its read-only text view."""
        log_group = QtWidgets.QGroupBox("Logs")
        lvbox = QtWidgets.QVBoxLayout(log_group)
        self.txt_log = QtWidgets.QPlainTextEdit()
        self.txt_log.setReadOnly(True)
        self.txt_log.setMaximumBlockCount(2000)
        lvbox.addWidget(self.txt_log)
        return log_group

    def on_publish(self):
        """Validate the inputs and start a publish sequence on a worker.

        Does nothing except log a notice while a previous worker is still
        running.  An empty host or topic shows a warning box and aborts.
        """
        if self.worker and self.worker.isRunning():
            self._log("⚠️ 正在發佈中,請稍候...")
            return

        try:
            params = PublishParams(
                host=self.ed_host.text().strip(),
                port=int(self.ed_port.value()),
                topic=self.ed_topic.text().strip(),
                qos=int(self.cb_qos.currentText()),
                retain=bool(self.chk_retain.isChecked()),
                move_forward=bool(self.chk_forward.isChecked()),
                forward_velocity=float(self.ed_forward_vel.value()),
                move_updown=bool(self.chk_updown.isChecked()),
                updown_velocity=float(self.ed_updown_vel.value()),
                gap_seconds=float(self.ed_gap.value()),
            )
        except Exception as e:
            self._log(f"❌ 參數錯誤:{e}")
            return

        if not params.host or not params.topic:
            QtWidgets.QMessageBox.warning(self, "Warning", "Host 與 Topic 不可為空")
            return

        self.worker = PubWorker(params, self)
        self.worker.sig_log.connect(self._log)
        self.worker.sig_done.connect(self._on_done)
        self._log("▶️ 開始發佈序列...")
        # Disabled until _on_done re-enables it.
        self.btn_publish.setEnabled(False)
        self.worker.start()

    @QtCore.pyqtSlot(bool, str)
    def _on_done(self, success: bool, message: str):
        """Log the worker's result and re-enable the publish button."""
        self._log(("✅ " if success else "❌ ") + message)
        self.btn_publish.setEnabled(True)

    def _log(self, text: str):
        """Append ``text`` to the log view and scroll to the bottom."""
        self.txt_log.appendPlainText(text)
        sb = self.txt_log.verticalScrollBar()
        sb.setValue(sb.maximum())

    def _apply_dark_fusion_palette(self):
        """Switch the application to the Fusion style with a dark palette."""
        app = QtWidgets.QApplication.instance()
        app.setStyle("Fusion")
        pal = QtGui.QPalette()

        base = QtGui.QColor(30, 30, 30)
        alt = QtGui.QColor(45, 45, 45)
        text = QtGui.QColor(220, 220, 220)
        btn = QtGui.QColor(53, 53, 53)
        hl = QtGui.QColor(42, 130, 218)

        role = QtGui.QPalette.ColorRole
        pal.setColor(role.Window, base)
        pal.setColor(role.WindowText, text)
        pal.setColor(role.Base, QtGui.QColor(25, 25, 25))
        pal.setColor(role.AlternateBase, alt)
        # Tooltip background must differ from the tooltip text colour;
        # using the light text colour for both made tooltips unreadable.
        pal.setColor(role.ToolTipBase, alt)
        pal.setColor(role.ToolTipText, text)
        pal.setColor(role.Text, text)
        pal.setColor(role.Button, btn)
        pal.setColor(role.ButtonText, text)
        pal.setColor(role.BrightText, QtGui.QColor(255, 0, 0))
        pal.setColor(role.Link, hl)
        pal.setColor(role.Highlight, hl)
        pal.setColor(role.HighlightedText, QtGui.QColor(255, 255, 255))

        app.setPalette(pal)
|
||
|
||
|
||
def main():
    """Create the Qt application, show the main window, run the event loop.

    Exits the process through ``sys.exit`` with the event loop's return code.
    """
    import sys

    qt_app = QtWidgets.QApplication(sys.argv)

    # High-DPI opt-in is best effort: any failure (for example when
    # ctypes.windll is unavailable) is deliberately ignored.
    try:
        import ctypes
        ctypes.windll.shcore.SetProcessDpiAwareness(1)
    except Exception:
        pass

    window = MainWindow()
    window.show()

    exit_code = qt_app.exec()
    sys.exit(exit_code)
|
||
|
||
|
||
# Start the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()
|