From e39c4a298f4090c873a8df9267d41e08bbdfc354 Mon Sep 17 00:00:00 2001 From: tim33824 Date: Thu, 28 Aug 2025 16:51:18 +0800 Subject: [PATCH] 8/28git --- .gitignore | 2 + .vscode/c_cpp_properties.json | 20 +- .vscode/launch.json | 33 +--- .vscode/settings.json | 6 +- CANopen.py | 35 ++++ MQTT_project/MQTT_main_part.py | 19 ++ MQTT_project/MQTT_pub_Module.py | 38 ++++ MQTT_project/MQTT_sub_Module.py | 116 +++++++++++ MQTT_project/MQTT_sub_main.py | 16 ++ OPCUA_test.py | 45 +++++ canOPENpdo.py | 23 +++ canopen_plc_demo.py | 329 ++++++++++++++++++++++++++++++++ import_can.py | 19 ++ learnpy.py | 3 + mymodule.py | 5 + node_pub.cpp | 0 python_modbus.py | 3 + send_W_value.py | 14 ++ test.cpp | 25 +++ write_sdo_min.py | 31 +++ 20 files changed, 742 insertions(+), 40 deletions(-) create mode 100644 CANopen.py create mode 100644 MQTT_project/MQTT_main_part.py create mode 100644 MQTT_project/MQTT_pub_Module.py create mode 100644 MQTT_project/MQTT_sub_Module.py create mode 100644 MQTT_project/MQTT_sub_main.py create mode 100644 OPCUA_test.py create mode 100644 canOPENpdo.py create mode 100644 canopen_plc_demo.py create mode 100644 import_can.py create mode 100644 learnpy.py create mode 100644 mymodule.py create mode 100644 node_pub.cpp create mode 100644 send_W_value.py create mode 100644 test.cpp create mode 100644 write_sdo_min.py diff --git a/.gitignore b/.gitignore index bf4f51a..bcc04c1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ mp_env .python-version uv.lock +*.pyc +.vscode/tasks.json diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index cb9ae4f..910cbb7 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -1,16 +1,16 @@ { + "version": 4, "configurations": [ { - "name": "windows-gcc-x64", - "includePath": [ - "${workspaceFolder}/**", - "C:/msys64/mingw64/include" - ], - "compilerPath": "C:/msys64/mingw64/bin/gcc.exe", - "cStandard": "c11", + "name": "Win32", + "compilerPath": "C:/msys64/mingw64/bin/g++.exe", + "cStandard": "c17", "cppStandard": "c++17", - "intelliSenseMode": "windows-gcc-x64" + "intelliSenseMode": "windows-gcc-x64", + "includePath": [ + "${workspaceFolder}/**" + ], + "defines": [] } - ], - "version": 4 + ] } diff --git a/.vscode/launch.json b/.vscode/launch.json index f9c0a72..2b9f125 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,43 +2,18 @@ "version": "0.2.0", "configurations": [ { - "name": "C Launch", + "name": "Debug active .exe (gdb)", "type": "cppdbg", "request": "launch", "program": "${fileDirname}/${fileBasenameNoExtension}.exe", "args": [], "stopAtEntry": false, - "cwd": "${fileDirname}", + "cwd": "${workspaceFolder}", "environment": [], "externalConsole": true, "MIMode": "gdb", "miDebuggerPath": "C:/msys64/mingw64/bin/gdb.exe", - "setupCommands": [ - { - "description": "Enable pretty-printing for gdb", - "text": "-enable-pretty-printing", - "ignoreFailures": true - } - ] - }, - { - "name": "C/C++ Runner: Debug Session", - "type": "cppdbg", - "request": "launch", - "args": [], - "stopAtEntry": false, - "externalConsole": true, - "cwd": "c:/Users/huiting/Desktop/py_document", - "program": "c:/Users/huiting/Desktop/py_document/build/Debug/outDebug", - "MIMode": "gdb", - "miDebuggerPath": "gdb", - "setupCommands": [ - { - "description": "Enable pretty-printing for gdb", - "text": "-enable-pretty-printing", - "ignoreFailures": true - } - ] + "preLaunchTask": "g++ build active file" } ] -} \ No newline at end of file +} diff --git a/.vscode/settings.json b/.vscode/settings.json index bb879da..bb30a47 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -55,5 +55,9 @@ "C_Cpp_Runner.useLeakSanitizer": false, "C_Cpp_Runner.showCompilationTime": false, "C_Cpp_Runner.useLinkTimeOptimization": false, - "C_Cpp_Runner.msvcSecureNoWarnings": false + "C_Cpp_Runner.msvcSecureNoWarnings": false, + "files.associations": { + "chrono": "cpp", + "functional": "cpp" + } } \ No newline at end of file diff --git a/CANopen.py b/CANopen.py new file mode 100644 index 0000000..21e2a19 --- /dev/null +++ b/CANopen.py @@ -0,0 +1,35 @@ +import can +import time + +# === 修改成你的介面 === +# 如果你用 socketCAN-tcp (slcand) 綁到 can0: +# bus = can.interface.Bus(bustype="socketcan", channel="can0", bitrate=500000) + +# 如果是 PCAN (Peak): +# bus = can.interface.Bus(bustype="pcan", channel="PCAN_USBBUS1", bitrate=500000) + +# 如果是 Kvaser: +# bus = can.interface.Bus(bustype="kvaser", channel=0, bitrate=500000) + +# 這裡先假設你用 socketcan +bus = can.interface.Bus(bustype="socketcan", channel="can0", bitrate=500000) + +# === 設定參數 === +node_id = 1 +cob_id = 0x200 + node_id # PLC 的 RxPDO1 +value = 1234 # 要送的整數值 (INT16) + +# 轉成小端 (因為 PLC 是 little endian) +data_bytes = value.to_bytes(2, byteorder="little", signed=True) + +# 塞進 CAN 幀 +msg = can.Message(arbitration_id=cob_id, data=data_bytes, is_extended_id=False) + +# 傳送多次測試 +for i in range(10): + try: + bus.send(msg) + print(f"Sent {value} to PLC (COB-ID=0x{cob_id:X})") + except can.CanError: + print("Message NOT sent") + time.sleep(1) diff --git a/MQTT_project/MQTT_main_part.py b/MQTT_project/MQTT_main_part.py new file mode 100644 index 0000000..303bb1b --- /dev/null +++ b/MQTT_project/MQTT_main_part.py @@ -0,0 +1,19 @@ +# pip install paho-mqtt +import json +import time +from paho.mqtt import client as mqtt +from MQTT_pub_Module import pub_to_plc + + + +BROKER_HOST = "169.254.11.130" +BROKER_PORT = 1886 +TOPIC = "topic_plc_and_py_for_AXIS" + +# 建議測試用 QoS=1;retain=False +QOS = 1 +RETAIN = False + + +pub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC, QOS, RETAIN) + diff --git a/MQTT_project/MQTT_pub_Module.py b/MQTT_project/MQTT_pub_Module.py new file mode 100644 index 0000000..2b37063 --- /dev/null +++ b/MQTT_project/MQTT_pub_Module.py @@ -0,0 +1,38 @@ +import time +import json +from paho.mqtt import client as mqtt + +def pub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC, QOS, RETAIN): + client = mqtt.Client(client_id="py_pub_plc_test", protocol=mqtt.MQTTv311) + + # 若 Broker 有帳密、TLS 在這裡設定(目前不需要) + # client.username_pw_set("user", "pass") + # client.tls_set(ca_certs="ca.pem") # 若是 TLS + + client.connect(BROKER_HOST, BROKER_PORT, keepalive=60) + client.loop_start() + time.sleep(0.3) # 等連線建立 + + # ---- 第一組: ---- + payload_forward = { + "Move_Forward": True, # BOOL + "Move_Forward_Velocity": 1.2 # REAL (float) + } + msg1 = json.dumps(payload_forward, ensure_ascii=False) + r1 = client.publish(TOPIC, msg1, qos=QOS, retain=RETAIN) + r1.wait_for_publish() + + time.sleep(3)#第二顆馬達等等再開啟 + + # ---- 第二組: ---- + payload_updown = { + "Move_UPDown": True, # BOOL + "Move_UPDown_Velocity": 55.1 # REAL (float) + } + msg2 = json.dumps(payload_updown, ensure_ascii=False) + r2 = client.publish(TOPIC, msg2, qos=QOS, retain=RETAIN) + r2.wait_for_publish() + + time.sleep(0.3) + client.loop_stop() + client.disconnect() diff --git a/MQTT_project/MQTT_sub_Module.py b/MQTT_project/MQTT_sub_Module.py new file mode 100644 index 0000000..61a68de --- /dev/null +++ b/MQTT_project/MQTT_sub_Module.py @@ -0,0 +1,116 @@ +# pip install paho-mqtt +import json +import re +import time +from paho.mqtt import client as mqtt + +# ========= 使用者設定 ========= +BROKER_HOST = "169.254.11.130" +BROKER_PORT = 1886 +TOPIC = "topic_plc_and_py_for_AXIS" +QOS = 1 # 建議測試用 1 +# ============================ + +# 全域狀態(方便觀察最新值) +state = { + "BUTTON_Y": None, + "Forward_RPM": None, + "UPDown_RPM": None, +} + +def _robust_json_parse(raw: str): + """ + 將 PLC 端常見的非標準 JSON 嘗試修正後解析為 dict。 + 例:'{Forward_RPM:1500,UPDown_RPM:800}' -> {'Forward_RPM':1500,'UPDown_RPM':800} + 也會把 TRUE/FALSE -> true/false + """ + s = raw.strip() + + # 去掉最外層引號(若有) + if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')): + s = s[1:-1].strip() + + # PLC 常見布林大寫 -> JSON 規範小寫 + s = s.replace("TRUE", "true").replace("FALSE", "false") + + # 自動補 key 的雙引號:{key: -> {"key": + s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s) + + # 轉單引號為雙引號(若 payload 用了單引號包字串) + # 放在補 key 後,避免 key 的引號再被替換 + s = s.replace("'", '"') + + return json.loads(s) + +def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties=None): + """ + 連上 Broker 後自動訂閱 Topic。 + paho-mqtt 2.x 的 on_connect 第四個參數是 reason_code(非 rc),名稱略有不同。 + """ + if reason_code == 0: + print(f"✅ 已連線到 MQTT Broker {BROKER_HOST}:{BROKER_PORT}") + client.subscribe(TOPIC, qos=QOS) + print(f"📡 已訂閱 Topic: {TOPIC}(QoS={QOS})") + else: + print(f"❌ 連線失敗,reason_code={reason_code}") + +def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage): + payload_raw = msg.payload.decode("utf-8", errors="replace") + print(f"\n📥 收到原始訊息:{payload_raw}") + + # 先試標準 JSON,失敗再用容錯解析 + try: + data = json.loads(payload_raw) + except json.JSONDecodeError: + try: + data = _robust_json_parse(payload_raw) + except Exception as e: + print(f"❌ 無法解析為 JSON:{payload_raw} ;錯誤:{e}") + return + + # 只更新我們關心的欄位(有傳才改) + for key in ("BUTTON_Y", "Forward_RPM", "UPDown_RPM"): + if key in data: + state[key] = data[key] + + print( + f"✅ 解析後資料:{data}\n" + f"📊 目前狀態:BUTTON_Y={state['BUTTON_Y']}, " + f"Forward_RPM={state['Forward_RPM']}, UPDown_RPM={state['UPDown_RPM']}" + ) + +def on_disconnect(client: mqtt.Client, userdata, reason_code, properties=None): + print(f"🔌 已斷線,reason_code={reason_code}") + +def sub_to_plc(broker_host: str, broker_port: int, topic: str): + client = mqtt.Client(client_id="py_sub_plc_test", protocol=mqtt.MQTTv311) + + # 如需帳密或 TLS,在這裡設定 + # client.username_pw_set("user", "pass") + # client.tls_set(ca_certs="ca.pem") + + # 自動重連延遲(paho-mqtt 2.x) + client.reconnect_delay_set(min_delay=1, max_delay=10) + + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + + # 先連線,再進入 loop + client.connect(broker_host, broker_port, keepalive=60) + + try: + # loop_forever 會阻塞執行緒,內建自動重連 + client.loop_forever() + except KeyboardInterrupt: + print("\n🛑 收到中斷訊號,準備斷線...") + finally: + try: + client.disconnect() + except Exception: + pass + +if __name__ == "__main__": + print("🚀 啟動 MQTT 訂閱器...") + print(f"Broker={BROKER_HOST}:{BROKER_PORT}, Topic={TOPIC}, QoS={QOS}") + sub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC) diff --git a/MQTT_project/MQTT_sub_main.py b/MQTT_project/MQTT_sub_main.py new file mode 100644 index 0000000..c98a7ee --- /dev/null +++ b/MQTT_project/MQTT_sub_main.py @@ -0,0 +1,16 @@ +# pip install paho-mqtt +import json +import time +from paho.mqtt import client as mqtt + +from MQTT_sub_Module import sub_to_plc + + +BROKER_HOST = "169.254.11.130" +BROKER_PORT = 1886 +TOPIC = "topic_plc_and_py_for_AXIS" + +# 建議測試用 QoS=1;retain=False +QOS = 1 +RETAIN = False +sub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC) \ No newline at end of file diff --git a/OPCUA_test.py b/OPCUA_test.py new file mode 100644 index 0000000..a7f0062 --- /dev/null +++ b/OPCUA_test.py @@ -0,0 +1,45 @@ +from opcua import ua, Server +import time +import random + +if __name__ == "__main__": + server = Server() + # 設定端點(本機:4840) + server.set_endpoint("opc.tcp://0.0.0.0:4840") + server.set_server_name("MyUA Server") + + # 新增命名空間(回傳 nsIndex) + idx = server.register_namespace("urn:mycompany:demo") + + # 建立地址空間:一個 Object,底下放一個 Variable + objects = server.get_objects_node() + myobj = objects.add_object(idx, "DemoObject") + + #空間建好,可以設變數了 + temp_var = myobj.add_variable(idx, "Temperature", 25.0) # 初值 25.0 + A = myobj.add_variable(idx, "A_value", 1) + + #變數設好,打開Client寫入功能 + temp_var.set_writable() # 允許 Client 寫入 + A.set_writable() # 允許 Client 寫入 + + # 啟動 Server + server.start() + print("Server started at opc.tcp://localhost:4840") + + x=0 + try: + while True: + # 模擬溫度改變 + new_value = 20.0 + random.random() * 10.0 + temp_var.set_value(ua.Variant(new_value, ua.VariantType.Float)) + x=x+1 + A.set_value(ua.Variant(x, ua.VariantType.Int64)) + print(A.get_value()) + print(temp_var.get_value()) + time.sleep(1) + except KeyboardInterrupt: + print("Server stopped by user") + finally: + server.stop() + print("Resources released, server closed") diff --git a/canOPENpdo.py b/canOPENpdo.py new file mode 100644 index 0000000..825d0a4 --- /dev/null +++ b/canOPENpdo.py @@ -0,0 +1,23 @@ + +import can +import os +os.add_dll_directory(r"C:\Users\huiting\PCAN-Basic\x64") # 這裡放 64bit DLL 的資料夾 + +# 把 DLL 所在資料夾加進搜尋路徑 +DLL_DIR = r"C:\Users\huiting\PCAN-Basic\x64" # 如果 Python 是 64bit 請換成 \x64 +if os.path.isdir(DLL_DIR): + os.add_dll_directory(DLL_DIR) + +bus = can.interface.Bus(interface="pcan", channel="PCAN_USBBUS1", bitrate=500000) + +NODE_ID = 1 +TPDO1_ID = 0x180 + NODE_ID # PLC 送出的 TPDO1 + +print(f"Listening TPDO1 at 0x{TPDO1_ID:X} …") +while True: + msg = bus.recv(timeout=2.0) + if not msg: + continue + if msg.arbitration_id == TPDO1_ID and len(msg.data) >= 2: + val = int.from_bytes(msg.data[0:2], "little", signed=True) + print(f"PLC TPDO1 -> 0x2000:00 = {val}") diff --git a/canopen_plc_demo.py b/canopen_plc_demo.py new file mode 100644 index 0000000..7648929 --- /dev/null +++ b/canopen_plc_demo.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +CANopen ↔ PLC demo (python-can + canopen) +---------------------------------------- +Fill in the CONFIG section to match your PLC / device. +Tested with python-can >= 4 and canopen >= 2. + +Common use cases: +- Python as CANopen "master" talking to a PLC/device node (slave). +- Monitor/drive DS402-like nodes (Controlword/Statusword and Position/Velocity). + +Author: ChatGPT +""" + +import time +import threading +from typing import Optional, Dict, Any + +import can # python-can +import canopen # python-canopen + + +# ===================== +# CONFIG (EDIT ME) +# ===================== +CONFIG: Dict[str, Any] = dict( + # Choose a backend matching your CAN interface: + # Linux SocketCAN example: backend="socketcan", channel="can0" + # PEAK PCAN example: backend="pcan", channel="PCAN_USBBUS1" + # Kvaser example: backend="kvaser", channel="0" + backend="socketcan", + channel="can0", + bitrate=500000, # 125000 / 250000 / 500000 / 1000000 + + # Target node (PLC or device) information + node_id=2, # PLC/device CANopen Node-ID (1..127) + eds_path="device.eds", # Path to the device EDS/DCF file (put the file next to this script) + + # Heartbeat / SYNC (optional) + heartbeat_consumer_time_ms=0, # Set >0 to expect heartbeat from node + heartbeat_producer_time_ms=0, # Requires device support; 0 = don't change + use_sync=True, # Use SYNC object for synchronous PDOs + sync_period_ms=10, # SYNC period if acting as producer + + # PDO mapping (DS402-like defaults) — adjust to your PLC/device map + # RxPDO: what Python sends TO the device (e.g., Controlword, Target position) + #rxpdo_map=( + # {"index": 0x6040, "subindex": 0x00, "size": 16}, # Controlword (UINT16) + # {"index": 0x607A, "subindex": 0x00, "size": 32}, # Target position (INT32) + #), + TxPDO: what Python receives FROM the device (e.g., Statusword, Actual position) + txpdo_map=( + {"index": 0x6041, "subindex": 0x00, "size": 16}, # Statusword (UINT16) + {"index": 0x6064, "subindex": 0x00, "size": 32}, # Position actual value (INT32) + ), + # Transmission types: + # For SYNC: usually 1..240 (1=sub-every SYNC). For async/evented: 255. + rxpdo_trans_type=1, + txpdo_trans_type=1, + + # Demo motion (for DS402 profile position mode) + do_enable_sequence=True, + target_positions=[0, 10000, -5000, 0], + dwell_ms=500, # wait time after each command +) + + +# ============= +# Helpers +# ============= +def make_bus(cfg: Dict[str, Any]) -> can.Bus: + """Create and return a python-can Bus.""" + backend = cfg["backend"].lower() + if backend == "socketcan": + bus = can.interface.Bus(bustype="socketcan", channel=cfg["channel"], bitrate=cfg["bitrate"]) + elif backend == "pcan": + bus = can.interface.Bus(bustype="pcan", channel=cfg["channel"], bitrate=cfg["bitrate"]) + elif backend == "kvaser": + bus = can.interface.Bus(bustype="kvaser", channel=cfg["channel"], bitrate=cfg["bitrate"]) + else: + raise ValueError(f"Unsupported backend: {backend}") + return bus + + +def setup_network(cfg: Dict[str, Any]) -> canopen.Network: + """Attach canopen.Network to an existing python-can Bus.""" + bus = make_bus(cfg) + net = canopen.Network() + # Attach python-can bus + net.connect(bustype="python-can", channel=bus) + return net + + +def add_node(net: canopen.Network, cfg: Dict[str, Any]) -> canopen.RemoteNode: + """Add a remote node using an EDS file.""" + node = canopen.RemoteNode(cfg["node_id"], cfg["eds_path"]) + net.add_node(node) + return node + + +def set_heartbeat(node: canopen.RemoteNode, cfg: Dict[str, Any]) -> None: + """Optionally configure heartbeat via SDO (if supported by device).""" + try: + if cfg.get("heartbeat_producer_time_ms", 0) > 0: + node.sdo[0x1017].raw = int(cfg["heartbeat_producer_time_ms"]) + # Heartbeat consumer is typically configured on the *master* side; for canopen lib, + # you can add a consumer to monitor a node. Here we use basic timeout checks in the loop. + except Exception as e: + print(f"[WARN] Heartbeat config skipped: {e}") + + +def clear_pdos(node: canopen.RemoteNode) -> None: + """Disable and clear all existing PDO mappings before re-mapping.""" + # Disable all TxPDOs and RxPDOs + for pdo in node.tpdo.values(): + pdo.enabled = False + pdo.clear() + pdo.save() + for pdo in node.rpdo.values(): + pdo.enabled = False + pdo.clear() + pdo.save() + + +def map_pdos(node: canopen.RemoteNode, cfg: Dict[str, Any]) -> None: + """Map RxPDO/TxPDO according to CONFIG and enable them.""" + # --- Map RxPDOs (what WE send TO device) --- + rpdo1 = node.rpdo[1] + rpdo1.clear() + for m in cfg["rxpdo_map"]: + rpdo1.add_variable(m["index"], m["subindex"], length=m["size"]) + rpdo1.trans_type = cfg["rxpdo_trans_type"] + rpdo1.enabled = True + rpdo1.save() + + # --- Map TxPDOs (what device sends TO us) --- + tpdo1 = node.tpdo[1] + tpdo1.clear() + for m in cfg["txpdo_map"]: + tpdo1.add_variable(m["index"], m["subindex"], length=m["size"]) + tpdo1.trans_type = cfg["txpdo_trans_type"] + tpdo1.enabled = True + tpdo1.save() + + # Apply configuration on device + node.nmt.state = 'PRE-OPERATIONAL' + node.nmt.state = 'OPERATIONAL' + + +def start_sync_producer(net: canopen.Network, period_ms: int) -> threading.Thread: + """Start a SYNC producer thread (if this controller should emit SYNC).""" + stop_flag = {"stop": False} + + def _loop(): + sync = canopen.sync.SyncProducer(net) + interval = max(0.001, period_ms / 1000.0) + while not stop_flag["stop"]: + sync.transmit() + time.sleep(interval) + + t = threading.Thread(target=_loop, daemon=True) + t.stop_flag = stop_flag # type: ignore[attr-defined] + t.start() + return t + + +# ============= +# DS402 helpers +# ============= +CW_SHUTDOWN = 0x0006 +CW_SWITCH_ON = 0x0007 +CW_ENABLE_OPERATION = 0x000F +CW_DISABLE_VOLTAGE = 0x0000 +CW_QUICK_STOP = 0x0002 +CW_FAULT_RESET = 0x0080 + + +def drive_enable_sequence(node: canopen.RemoteNode) -> None: + """Minimal DS402 enable sequence using SDO Controlword/Statusword.""" + cw = node.sdo[0x6040] + sw = node.sdo[0x6041] + + def _write(value: int): + cw.raw = value + time.sleep(0.05) + + # Fault reset if needed + if sw.raw & 0x0008: # Fault bit + _write(CW_FAULT_RESET) + time.sleep(0.1) + + _write(CW_SHUTDOWN) + _write(CW_SWITCH_ON) + _write(CW_ENABLE_OPERATION) + + +def set_mode_of_operation(node: canopen.RemoteNode, mode: int) -> None: + """Set DS402 mode of operation (e.g., 1=Profile Position, 3=Profile Velocity, 6=Homing, 8=CSP...).""" + node.sdo[0x6060].raw = int(mode) + # Optional: read back + _ = node.sdo[0x6061].raw + + +# ============= +# Main demo +# ============= +def main() -> None: + cfg = CONFIG + print("Connecting CAN...") + net = setup_network(cfg) + print(f"Connected via {cfg['backend']}:{cfg['channel']} @ {cfg['bitrate']}") + + # Add node + node = add_node(net, cfg) + + # NMT reset + pre-op + print("Resetting node...") + node.nmt.reset_node() + time.sleep(0.5) + node.nmt.state = 'PRE-OPERATIONAL' + time.sleep(0.2) + + # Optional: heartbeat + set_heartbeat(node, cfg) + + # PDO mapping + print("Mapping PDOs...") + clear_pdos(node) + map_pdos(node, cfg) + + # Optional: start SYNC producer if we are the controller + sync_thread: Optional[threading.Thread] = None + if cfg.get("use_sync", False): + print(f"Starting SYNC producer @ {cfg['sync_period_ms']} ms") + sync_thread = start_sync_producer(net, cfg["sync_period_ms"]) + + # Go operational + print("Switching to OPERATIONAL") + node.nmt.state = 'OPERATIONAL' + time.sleep(0.1) + + # Enable sequence (DS402) + if cfg.get("do_enable_sequence", True): + try: + print("Setting mode of operation = 1 (Profile Position)") + set_mode_of_operation(node, 1) + except Exception as e: + print(f"[WARN] Skipping mode set: {e}") + print("Running enable sequence...") + try: + drive_enable_sequence(node) + except Exception as e: + print(f"[WARN] Enable sequence failed/skipped: {e}") + + # Shortcut handles to mapped PDO variables + # RxPDO1 variables we control + try: + rpdo1 = node.rpdo[1] + cw_var = rpdo1['Controlword'] if 'Controlword' in rpdo1.map else rpdo1[0] + tgt_pos_var = rpdo1['Target position'] if 'Target position' in rpdo1.map else rpdo1[1] + except Exception: + rpdo1 = None + cw_var = None + tgt_pos_var = None + + # TxPDO1 variables we read + try: + tpdo1 = node.tpdo[1] + sw_var = tpdo1['Statusword'] if 'Statusword' in tpdo1.map else tpdo1[0] + act_pos_var = tpdo1['Position actual value'] if 'Position actual value' in tpdo1.map else tpdo1[1] + except Exception: + tpdo1 = None + sw_var = None + act_pos_var = None + + # Demo loop: write a few target positions and read back status/position + try: + for idx, target in enumerate(cfg.get("target_positions", []), 1): + print(f"\n[{idx}] Command target position = {target}") + if rpdo1 is not None and tgt_pos_var is not None: + tgt_pos_var.raw = int(target) + # Ensure CW has Enable Operation bit set when using RxPDO + if cw_var is not None: + cw_var.raw = CW_ENABLE_OPERATION + rpdo1.transmit() + else: + # Fallback via SDO + print(" (No RxPDO mapped; writing via SDO 0x607A)") + node.sdo[0x607A].raw = int(target) + node.sdo[0x6040].raw = CW_ENABLE_OPERATION + + # Read back + t0 = time.time() + while time.time() - t0 < (cfg.get("dwell_ms", 500) / 1000.0): + if tpdo1 is not None: + # Request update on SYNC; if evented/cyclic, values will refresh automatically + time.sleep(0.01) + try: + sw = sw_var.raw if sw_var is not None else None + pos = act_pos_var.raw if act_pos_var is not None else None + print(f" TPDO: Status=0x{(sw or 0):04X}, Position={pos}") + except Exception: + pass + else: + # Read via SDO occasionally + try: + sw = node.sdo[0x6041].raw + pos = node.sdo[0x6064].raw + print(f" SDO: Status=0x{sw:04X}, Position={pos}") + except Exception: + pass + time.sleep(0.05) + + print("\nDemo completed.") + finally: + # Stop SYNC + if sync_thread is not None: + sync_thread.stop_flag["stop"] = True # type: ignore[attr-defined] + time.sleep(0.05) + # Disconnect + try: + net.disconnect() + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/import_can.py b/import_can.py new file mode 100644 index 0000000..72e5d86 --- /dev/null +++ b/import_can.py @@ -0,0 +1,19 @@ +import can +import time + +bus = can.interface.Bus(bustype="socketcan", channel="can0", bitrate=500000) + +node_id = 1 +cob_id = 0x200 + node_id # 0x201 +value = 1234 + +data_bytes = value.to_bytes(2, byteorder="little", signed=True) +msg = can.Message(arbitration_id=cob_id, data=data_bytes, is_extended_id=False) + +for i in range(10): + try: + bus.send(msg) + print(f"Sent {value} to PLC (COB-ID=0x{cob_id:X})") + except can.CanError: + print("Message NOT sent") + time.sleep(1) diff --git a/learnpy.py b/learnpy.py new file mode 100644 index 0000000..7b2325e --- /dev/null +++ b/learnpy.py @@ -0,0 +1,3 @@ +from mymodule import 範圍 + +print(範圍(60)) \ No newline at end of file diff --git a/mymodule.py b/mymodule.py new file mode 100644 index 0000000..215f7d4 --- /dev/null +++ b/mymodule.py @@ -0,0 +1,5 @@ +def 範圍 (A): + if 10>A: + return "太小" + else: + return "正常" \ No newline at end of file diff --git a/node_pub.cpp b/node_pub.cpp new file mode 100644 index 0000000..e69de29 diff --git a/python_modbus.py b/python_modbus.py index 5a0f391..82fc00a 100644 --- a/python_modbus.py +++ b/python_modbus.py @@ -1,4 +1,7 @@ from pymodbus.client.sync import ModbusTcpClient +import os +os.add_dll_directory(r"C:\Users\huiting\Desktop\PCAN-Basic\x64") # 換成 DLL 的實際路徑 + import time # 設定 PLC IP 和 Port diff --git a/send_W_value.py b/send_W_value.py new file mode 100644 index 0000000..47c5d1f --- /dev/null +++ b/send_W_value.py @@ -0,0 +1,14 @@ +import socket + +PLC_IP = "169.254.11.8" # 換成你的 PLC IP +PLC_PORT = 5000 + +message = "W=99;" + +try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((PLC_IP, PLC_PORT)) + s.sendall(message.encode()) + print(f"[Python] 已傳送: {message}") +except Exception as e: + print(f"[Python] 傳送失敗: {e}") diff --git a/test.cpp b/test.cpp new file mode 100644 index 0000000..e06bad2 --- /dev/null +++ b/test.cpp @@ -0,0 +1,25 @@ +// rclcpp 是 ROS2 的 C++ 客戶端函式庫(ROS Client Library for C++) +// 提供 Node、Publisher/Subscriber、Timer、Logging 等 API。 +#include "rclcpp/rclcpp.hpp" +// std_msgs/msg/string.hpp 是 ROS2 內建的訊息型別套件 std_msgs 裡的 String 訊息定義。 +// 這個訊息就只有一個欄位:std::string data; +#include "std_msgs/msg/string.hpp" +// 讓我們能用 C++ 時間單位(例如 1s),搭配計時器使用。 +#include +// 匯入 chrono 的字面常數,例如 1s, 100ms。 +// 沒有這行的話,程式裡寫 1s 會編譯失敗。 +using namespace std::chrono_literals; + +// 定義一個節點類別,繼承 rclcpp::Node。 +// 一個「節點(Node)」就是一個 ROS2 程式的執行單元,可以包含 Publisher/Subscriber/Service/Action 等。 +class MyfristRos2_pub : public rclcpp::Node{ + public: + MyfristRos2_pub():Node("node_pub"),count_(0){ + publisher_=this->create_publisher("topic_for_vulue",10); + timer_ = this->create_wall_timer( + 1s, std::bind(&MyfristRos2_pub::publish_message, this)); + } + private: + + +} \ No newline at end of file diff --git a/write_sdo_min.py b/write_sdo_min.py new file mode 100644 index 0000000..9d9eec9 --- /dev/null +++ b/write_sdo_min.py @@ -0,0 +1,31 @@ +# file: write_sdo_min.py +# pip install python-can canopen +import time +import can +import canopen + +# === 請依你的硬體改這裡 === +IFACE = "pcan" # pcan | kvaser | ixxat | slcan +CHANNEL = "PCAN_USBBUS1" # kvaser/ixxat 用 0;slcan 用 "COM5" +BITRATE = 500000 +NODE_ID = 1 # 你的 PLC Node-ID +EDS = "PLC.eds" # 你的 EDS/DCF(放同資料夾) +INDEX = 0x2000 # 你在 PLC 端對外開放的 OD 物件 +SUB = 0x00 +VALUE = 1234 # 要寫入的整數(型別需與 EDS 一致) + +# === 建立 bus & network === +bus = can.interface.Bus(bustype=IFACE, channel=CHANNEL, bitrate=BITRATE) +net = canopen.Network() +net.connect(bustype="python-can", channel=bus) + +node = canopen.RemoteNode(NODE_ID, EDS) +net.add_node(node) + +# 建議在 PRE-OP 寫參數 +node.nmt.state = "PRE-OPERATIONAL" +node.sdo[INDEX][SUB].raw = VALUE +time.sleep(0.05) +print(f"SDO write OK -> {INDEX:04X}:{SUB:02X} = {VALUE}") + +net.disconnect()