This commit is contained in:
tim33824 2025-08-28 16:51:18 +08:00
parent 5fc732b01c
commit e39c4a298f
20 changed files with 742 additions and 40 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
mp_env
.python-version
uv.lock
*.pyc
.vscode/tasks.json

View File

@ -1,16 +1,16 @@
{
"version": 4,
"configurations": [
{
"name": "windows-gcc-x64",
"includePath": [
"${workspaceFolder}/**",
"C:/msys64/mingw64/include"
],
"compilerPath": "C:/msys64/mingw64/bin/gcc.exe",
"cStandard": "c11",
"name": "Win32",
"compilerPath": "C:/msys64/mingw64/bin/g++.exe",
"cStandard": "c17",
"cppStandard": "c++17",
"intelliSenseMode": "windows-gcc-x64"
"intelliSenseMode": "windows-gcc-x64",
"includePath": [
"${workspaceFolder}/**"
],
"defines": []
}
],
"version": 4
]
}

31
.vscode/launch.json vendored
View File

@ -2,43 +2,18 @@
"version": "0.2.0",
"configurations": [
{
"name": "C Launch",
"name": "Debug active .exe (gdb)",
"type": "cppdbg",
"request": "launch",
"program": "${fileDirname}/${fileBasenameNoExtension}.exe",
"args": [],
"stopAtEntry": false,
"cwd": "${fileDirname}",
"cwd": "${workspaceFolder}",
"environment": [],
"externalConsole": true,
"MIMode": "gdb",
"miDebuggerPath": "C:/msys64/mingw64/bin/gdb.exe",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
},
{
"name": "C/C++ Runner: Debug Session",
"type": "cppdbg",
"request": "launch",
"args": [],
"stopAtEntry": false,
"externalConsole": true,
"cwd": "c:/Users/huiting/Desktop/py_document",
"program": "c:/Users/huiting/Desktop/py_document/build/Debug/outDebug",
"MIMode": "gdb",
"miDebuggerPath": "gdb",
"setupCommands": [
{
"description": "Enable pretty-printing for gdb",
"text": "-enable-pretty-printing",
"ignoreFailures": true
}
]
"preLaunchTask": "g++ build active file"
}
]
}

View File

@ -55,5 +55,9 @@
"C_Cpp_Runner.useLeakSanitizer": false,
"C_Cpp_Runner.showCompilationTime": false,
"C_Cpp_Runner.useLinkTimeOptimization": false,
"C_Cpp_Runner.msvcSecureNoWarnings": false
"C_Cpp_Runner.msvcSecureNoWarnings": false,
"files.associations": {
"chrono": "cpp",
"functional": "cpp"
}
}

35
CANopen.py Normal file
View File

@ -0,0 +1,35 @@
import can
import time
# === 修改成你的介面 ===
# 如果你用 socketCAN-tcp (slcand) 綁到 can0
# bus = can.interface.Bus(bustype="socketcan", channel="can0", bitrate=500000)
# 如果是 PCAN (Peak)
# bus = can.interface.Bus(bustype="pcan", channel="PCAN_USBBUS1", bitrate=500000)
# 如果是 Kvaser
# bus = can.interface.Bus(bustype="kvaser", channel=0, bitrate=500000)
# 這裡先假設你用 socketcan
bus = can.interface.Bus(bustype="socketcan", channel="can0", bitrate=500000)
# === 設定參數 ===
node_id = 1
cob_id = 0x200 + node_id # PLC 的 RxPDO1
value = 1234 # 要送的整數值 (INT16)
# 轉成小端 (因為 PLC 是 little endian)
data_bytes = value.to_bytes(2, byteorder="little", signed=True)
# 塞進 CAN 幀
msg = can.Message(arbitration_id=cob_id, data=data_bytes, is_extended_id=False)
# 傳送多次測試
for i in range(10):
try:
bus.send(msg)
print(f"Sent {value} to PLC (COB-ID=0x{cob_id:X})")
except can.CanError:
print("Message NOT sent")
time.sleep(1)

View File

@ -0,0 +1,19 @@
# pip install paho-mqtt
import json
import time
from paho.mqtt import client as mqtt
from MQTT_pub_Module import pub_to_plc
BROKER_HOST = "169.254.11.130"
BROKER_PORT = 1886
TOPIC = "topic_plc_and_py_for_AXIS"
# 建議測試用 QoS=1retain=False
QOS = 1
RETAIN = False
pub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC, QOS, RETAIN)

View File

@ -0,0 +1,38 @@
import time
import json
from paho.mqtt import client as mqtt
def pub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC, QOS, RETAIN):
client = mqtt.Client(client_id="py_pub_plc_test", protocol=mqtt.MQTTv311)
# 若 Broker 有帳密、TLS 在這裡設定(目前不需要)
# client.username_pw_set("user", "pass")
# client.tls_set(ca_certs="ca.pem") # 若是 TLS
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
client.loop_start()
time.sleep(0.3) # 等連線建立
# ---- 第一組: ----
payload_forward = {
"Move_Forward": True, # BOOL
"Move_Forward_Velocity": 1.2 # REAL (float)
}
msg1 = json.dumps(payload_forward, ensure_ascii=False)
r1 = client.publish(TOPIC, msg1, qos=QOS, retain=RETAIN)
r1.wait_for_publish()
time.sleep(3)#第二顆馬達等等再開啟
# ---- 第二組: ----
payload_updown = {
"Move_UPDown": True, # BOOL
"Move_UPDown_Velocity": 55.1 # REAL (float)
}
msg2 = json.dumps(payload_updown, ensure_ascii=False)
r2 = client.publish(TOPIC, msg2, qos=QOS, retain=RETAIN)
r2.wait_for_publish()
time.sleep(0.3)
client.loop_stop()
client.disconnect()

View File

@ -0,0 +1,116 @@
# pip install paho-mqtt
import json
import re
import time
from paho.mqtt import client as mqtt
# ========= 使用者設定 =========
BROKER_HOST = "169.254.11.130"
BROKER_PORT = 1886
TOPIC = "topic_plc_and_py_for_AXIS"
QOS = 1 # 建議測試用 1
# ============================
# 全域狀態(方便觀察最新值)
state = {
"BUTTON_Y": None,
"Forward_RPM": None,
"UPDown_RPM": None,
}
def _robust_json_parse(raw: str):
"""
PLC 端常見的非標準 JSON 嘗試修正後解析為 dict
'{Forward_RPM:1500,UPDown_RPM:800}' -> {'Forward_RPM':1500,'UPDown_RPM':800}
也會把 TRUE/FALSE -> true/false
"""
s = raw.strip()
# 去掉最外層引號(若有)
if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):
s = s[1:-1].strip()
# PLC 常見布林大寫 -> JSON 規範小寫
s = s.replace("TRUE", "true").replace("FALSE", "false")
# 自動補 key 的雙引號:{key: -> {"key":
s = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', s)
# 轉單引號為雙引號(若 payload 用了單引號包字串)
# 放在補 key 後,避免 key 的引號再被替換
s = s.replace("'", '"')
return json.loads(s)
def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
"""
連上 Broker 後自動訂閱 Topic
paho-mqtt 2.x on_connect 第四個參數是 reason_code rc名稱略有不同
"""
if reason_code == 0:
print(f"✅ 已連線到 MQTT Broker {BROKER_HOST}:{BROKER_PORT}")
client.subscribe(TOPIC, qos=QOS)
print(f"📡 已訂閱 Topic: {TOPIC}QoS={QOS}")
else:
print(f"❌ 連線失敗reason_code={reason_code}")
def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
payload_raw = msg.payload.decode("utf-8", errors="replace")
print(f"\n📥 收到原始訊息:{payload_raw}")
# 先試標準 JSON失敗再用容錯解析
try:
data = json.loads(payload_raw)
except json.JSONDecodeError:
try:
data = _robust_json_parse(payload_raw)
except Exception as e:
print(f"❌ 無法解析為 JSON{payload_raw} ;錯誤:{e}")
return
# 只更新我們關心的欄位(有傳才改)
for key in ("BUTTON_Y", "Forward_RPM", "UPDown_RPM"):
if key in data:
state[key] = data[key]
print(
f"✅ 解析後資料:{data}\n"
f"📊 目前狀態BUTTON_Y={state['BUTTON_Y']}, "
f"Forward_RPM={state['Forward_RPM']}, UPDown_RPM={state['UPDown_RPM']}"
)
def on_disconnect(client: mqtt.Client, userdata, reason_code, properties=None):
print(f"🔌 已斷線reason_code={reason_code}")
def sub_to_plc(broker_host: str, broker_port: int, topic: str):
client = mqtt.Client(client_id="py_sub_plc_test", protocol=mqtt.MQTTv311)
# 如需帳密或 TLS在這裡設定
# client.username_pw_set("user", "pass")
# client.tls_set(ca_certs="ca.pem")
# 自動重連延遲paho-mqtt 2.x
client.reconnect_delay_set(min_delay=1, max_delay=10)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# 先連線,再進入 loop
client.connect(broker_host, broker_port, keepalive=60)
try:
# loop_forever 會阻塞執行緒,內建自動重連
client.loop_forever()
except KeyboardInterrupt:
print("\n🛑 收到中斷訊號,準備斷線...")
finally:
try:
client.disconnect()
except Exception:
pass
if __name__ == "__main__":
print("🚀 啟動 MQTT 訂閱器...")
print(f"Broker={BROKER_HOST}:{BROKER_PORT}, Topic={TOPIC}, QoS={QOS}")
sub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC)

View File

@ -0,0 +1,16 @@
# pip install paho-mqtt
import json
import time
from paho.mqtt import client as mqtt
from MQTT_sub_Module import sub_to_plc
BROKER_HOST = "169.254.11.130"
BROKER_PORT = 1886
TOPIC = "topic_plc_and_py_for_AXIS"
# 建議測試用 QoS=1retain=False
QOS = 1
RETAIN = False
sub_to_plc(BROKER_HOST, BROKER_PORT, TOPIC)

45
OPCUA_test.py Normal file
View File

@ -0,0 +1,45 @@
from opcua import ua, Server
import time
import random
if __name__ == "__main__":
server = Server()
# 設定端點本機4840
server.set_endpoint("opc.tcp://0.0.0.0:4840")
server.set_server_name("MyUA Server")
# 新增命名空間(回傳 nsIndex
idx = server.register_namespace("urn:mycompany:demo")
# 建立地址空間:一個 Object底下放一個 Variable
objects = server.get_objects_node()
myobj = objects.add_object(idx, "DemoObject")
#空間建好,可以設變數了
temp_var = myobj.add_variable(idx, "Temperature", 25.0) # 初值 25.0
A = myobj.add_variable(idx, "A_value", 1)
#變數設好打開Client寫入功能
temp_var.set_writable() # 允許 Client 寫入
A.set_writable() # 允許 Client 寫入
# 啟動 Server
server.start()
print("Server started at opc.tcp://localhost:4840")
x=0
try:
while True:
# 模擬溫度改變
new_value = 20.0 + random.random() * 10.0
temp_var.set_value(ua.Variant(new_value, ua.VariantType.Float))
x=x+1
A.set_value(ua.Variant(x, ua.VariantType.Int64))
print(A.get_value())
print(temp_var.get_value())
time.sleep(1)
except KeyboardInterrupt:
print("Server stopped by user")
finally:
server.stop()
print("Resources released, server closed")

23
canOPENpdo.py Normal file
View File

@ -0,0 +1,23 @@
import can
import os
os.add_dll_directory(r"C:\Users\huiting\PCAN-Basic\x64") # 這裡放 64bit DLL 的資料夾
# 把 DLL 所在資料夾加進搜尋路徑
DLL_DIR = r"C:\Users\huiting\PCAN-Basic\x64" # 如果 Python 是 64bit 請換成 \x64
if os.path.isdir(DLL_DIR):
os.add_dll_directory(DLL_DIR)
bus = can.interface.Bus(interface="pcan", channel="PCAN_USBBUS1", bitrate=500000)
NODE_ID = 1
TPDO1_ID = 0x180 + NODE_ID # PLC 送出的 TPDO1
print(f"Listening TPDO1 at 0x{TPDO1_ID:X}")
while True:
msg = bus.recv(timeout=2.0)
if not msg:
continue
if msg.arbitration_id == TPDO1_ID and len(msg.data) >= 2:
val = int.from_bytes(msg.data[0:2], "little", signed=True)
print(f"PLC TPDO1 -> 0x2000:00 = {val}")

329
canopen_plc_demo.py Normal file
View File

@ -0,0 +1,329 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CANopen PLC demo (python-can + canopen)
----------------------------------------
Fill in the CONFIG section to match your PLC / device.
Tested with python-can >= 4 and canopen >= 2.
Common use cases:
- Python as CANopen "master" talking to a PLC/device node (slave).
- Monitor/drive DS402-like nodes (Controlword/Statusword and Position/Velocity).
Author: ChatGPT
"""
import time
import threading
from typing import Optional, Dict, Any
import can # python-can
import canopen # python-canopen
# =====================
# CONFIG (EDIT ME)
# =====================
CONFIG: Dict[str, Any] = dict(
# Choose a backend matching your CAN interface:
# Linux SocketCAN example: backend="socketcan", channel="can0"
# PEAK PCAN example: backend="pcan", channel="PCAN_USBBUS1"
# Kvaser example: backend="kvaser", channel="0"
backend="socketcan",
channel="can0",
bitrate=500000, # 125000 / 250000 / 500000 / 1000000
# Target node (PLC or device) information
node_id=2, # PLC/device CANopen Node-ID (1..127)
eds_path="device.eds", # Path to the device EDS/DCF file (put the file next to this script)
# Heartbeat / SYNC (optional)
heartbeat_consumer_time_ms=0, # Set >0 to expect heartbeat from node
heartbeat_producer_time_ms=0, # Requires device support; 0 = don't change
use_sync=True, # Use SYNC object for synchronous PDOs
sync_period_ms=10, # SYNC period if acting as producer
# PDO mapping (DS402-like defaults) — adjust to your PLC/device map
# RxPDO: what Python sends TO the device (e.g., Controlword, Target position)
#rxpdo_map=(
# {"index": 0x6040, "subindex": 0x00, "size": 16}, # Controlword (UINT16)
# {"index": 0x607A, "subindex": 0x00, "size": 32}, # Target position (INT32)
#),
TxPDO: what Python receives FROM the device (e.g., Statusword, Actual position)
txpdo_map=(
{"index": 0x6041, "subindex": 0x00, "size": 16}, # Statusword (UINT16)
{"index": 0x6064, "subindex": 0x00, "size": 32}, # Position actual value (INT32)
),
# Transmission types:
# For SYNC: usually 1..240 (1=sub-every SYNC). For async/evented: 255.
rxpdo_trans_type=1,
txpdo_trans_type=1,
# Demo motion (for DS402 profile position mode)
do_enable_sequence=True,
target_positions=[0, 10000, -5000, 0],
dwell_ms=500, # wait time after each command
)
# =============
# Helpers
# =============
def make_bus(cfg: Dict[str, Any]) -> can.Bus:
"""Create and return a python-can Bus."""
backend = cfg["backend"].lower()
if backend == "socketcan":
bus = can.interface.Bus(bustype="socketcan", channel=cfg["channel"], bitrate=cfg["bitrate"])
elif backend == "pcan":
bus = can.interface.Bus(bustype="pcan", channel=cfg["channel"], bitrate=cfg["bitrate"])
elif backend == "kvaser":
bus = can.interface.Bus(bustype="kvaser", channel=cfg["channel"], bitrate=cfg["bitrate"])
else:
raise ValueError(f"Unsupported backend: {backend}")
return bus
def setup_network(cfg: Dict[str, Any]) -> canopen.Network:
"""Attach canopen.Network to an existing python-can Bus."""
bus = make_bus(cfg)
net = canopen.Network()
# Attach python-can bus
net.connect(bustype="python-can", channel=bus)
return net
def add_node(net: canopen.Network, cfg: Dict[str, Any]) -> canopen.RemoteNode:
"""Add a remote node using an EDS file."""
node = canopen.RemoteNode(cfg["node_id"], cfg["eds_path"])
net.add_node(node)
return node
def set_heartbeat(node: canopen.RemoteNode, cfg: Dict[str, Any]) -> None:
"""Optionally configure heartbeat via SDO (if supported by device)."""
try:
if cfg.get("heartbeat_producer_time_ms", 0) > 0:
node.sdo[0x1017].raw = int(cfg["heartbeat_producer_time_ms"])
# Heartbeat consumer is typically configured on the *master* side; for canopen lib,
# you can add a consumer to monitor a node. Here we use basic timeout checks in the loop.
except Exception as e:
print(f"[WARN] Heartbeat config skipped: {e}")
def clear_pdos(node: canopen.RemoteNode) -> None:
"""Disable and clear all existing PDO mappings before re-mapping."""
# Disable all TxPDOs and RxPDOs
for pdo in node.tpdo.values():
pdo.enabled = False
pdo.clear()
pdo.save()
for pdo in node.rpdo.values():
pdo.enabled = False
pdo.clear()
pdo.save()
def map_pdos(node: canopen.RemoteNode, cfg: Dict[str, Any]) -> None:
"""Map RxPDO/TxPDO according to CONFIG and enable them."""
# --- Map RxPDOs (what WE send TO device) ---
rpdo1 = node.rpdo[1]
rpdo1.clear()
for m in cfg["rxpdo_map"]:
rpdo1.add_variable(m["index"], m["subindex"], length=m["size"])
rpdo1.trans_type = cfg["rxpdo_trans_type"]
rpdo1.enabled = True
rpdo1.save()
# --- Map TxPDOs (what device sends TO us) ---
tpdo1 = node.tpdo[1]
tpdo1.clear()
for m in cfg["txpdo_map"]:
tpdo1.add_variable(m["index"], m["subindex"], length=m["size"])
tpdo1.trans_type = cfg["txpdo_trans_type"]
tpdo1.enabled = True
tpdo1.save()
# Apply configuration on device
node.nmt.state = 'PRE-OPERATIONAL'
node.nmt.state = 'OPERATIONAL'
def start_sync_producer(net: canopen.Network, period_ms: int) -> threading.Thread:
"""Start a SYNC producer thread (if this controller should emit SYNC)."""
stop_flag = {"stop": False}
def _loop():
sync = canopen.sync.SyncProducer(net)
interval = max(0.001, period_ms / 1000.0)
while not stop_flag["stop"]:
sync.transmit()
time.sleep(interval)
t = threading.Thread(target=_loop, daemon=True)
t.stop_flag = stop_flag # type: ignore[attr-defined]
t.start()
return t
# =============
# DS402 helpers
# =============
CW_SHUTDOWN = 0x0006
CW_SWITCH_ON = 0x0007
CW_ENABLE_OPERATION = 0x000F
CW_DISABLE_VOLTAGE = 0x0000
CW_QUICK_STOP = 0x0002
CW_FAULT_RESET = 0x0080
def drive_enable_sequence(node: canopen.RemoteNode) -> None:
"""Minimal DS402 enable sequence using SDO Controlword/Statusword."""
cw = node.sdo[0x6040]
sw = node.sdo[0x6041]
def _write(value: int):
cw.raw = value
time.sleep(0.05)
# Fault reset if needed
if sw.raw & 0x0008: # Fault bit
_write(CW_FAULT_RESET)
time.sleep(0.1)
_write(CW_SHUTDOWN)
_write(CW_SWITCH_ON)
_write(CW_ENABLE_OPERATION)
def set_mode_of_operation(node: canopen.RemoteNode, mode: int) -> None:
"""Set DS402 mode of operation (e.g., 1=Profile Position, 3=Profile Velocity, 6=Homing, 8=CSP...)."""
node.sdo[0x6060].raw = int(mode)
# Optional: read back
_ = node.sdo[0x6061].raw
# =============
# Main demo
# =============
def main() -> None:
cfg = CONFIG
print("Connecting CAN...")
net = setup_network(cfg)
print(f"Connected via {cfg['backend']}:{cfg['channel']} @ {cfg['bitrate']}")
# Add node
node = add_node(net, cfg)
# NMT reset + pre-op
print("Resetting node...")
node.nmt.reset_node()
time.sleep(0.5)
node.nmt.state = 'PRE-OPERATIONAL'
time.sleep(0.2)
# Optional: heartbeat
set_heartbeat(node, cfg)
# PDO mapping
print("Mapping PDOs...")
clear_pdos(node)
map_pdos(node, cfg)
# Optional: start SYNC producer if we are the controller
sync_thread: Optional[threading.Thread] = None
if cfg.get("use_sync", False):
print(f"Starting SYNC producer @ {cfg['sync_period_ms']} ms")
sync_thread = start_sync_producer(net, cfg["sync_period_ms"])
# Go operational
print("Switching to OPERATIONAL")
node.nmt.state = 'OPERATIONAL'
time.sleep(0.1)
# Enable sequence (DS402)
if cfg.get("do_enable_sequence", True):
try:
print("Setting mode of operation = 1 (Profile Position)")
set_mode_of_operation(node, 1)
except Exception as e:
print(f"[WARN] Skipping mode set: {e}")
print("Running enable sequence...")
try:
drive_enable_sequence(node)
except Exception as e:
print(f"[WARN] Enable sequence failed/skipped: {e}")
# Shortcut handles to mapped PDO variables
# RxPDO1 variables we control
try:
rpdo1 = node.rpdo[1]
cw_var = rpdo1['Controlword'] if 'Controlword' in rpdo1.map else rpdo1[0]
tgt_pos_var = rpdo1['Target position'] if 'Target position' in rpdo1.map else rpdo1[1]
except Exception:
rpdo1 = None
cw_var = None
tgt_pos_var = None
# TxPDO1 variables we read
try:
tpdo1 = node.tpdo[1]
sw_var = tpdo1['Statusword'] if 'Statusword' in tpdo1.map else tpdo1[0]
act_pos_var = tpdo1['Position actual value'] if 'Position actual value' in tpdo1.map else tpdo1[1]
except Exception:
tpdo1 = None
sw_var = None
act_pos_var = None
# Demo loop: write a few target positions and read back status/position
try:
for idx, target in enumerate(cfg.get("target_positions", []), 1):
print(f"\n[{idx}] Command target position = {target}")
if rpdo1 is not None and tgt_pos_var is not None:
tgt_pos_var.raw = int(target)
# Ensure CW has Enable Operation bit set when using RxPDO
if cw_var is not None:
cw_var.raw = CW_ENABLE_OPERATION
rpdo1.transmit()
else:
# Fallback via SDO
print(" (No RxPDO mapped; writing via SDO 0x607A)")
node.sdo[0x607A].raw = int(target)
node.sdo[0x6040].raw = CW_ENABLE_OPERATION
# Read back
t0 = time.time()
while time.time() - t0 < (cfg.get("dwell_ms", 500) / 1000.0):
if tpdo1 is not None:
# Request update on SYNC; if evented/cyclic, values will refresh automatically
time.sleep(0.01)
try:
sw = sw_var.raw if sw_var is not None else None
pos = act_pos_var.raw if act_pos_var is not None else None
print(f" TPDO: Status=0x{(sw or 0):04X}, Position={pos}")
except Exception:
pass
else:
# Read via SDO occasionally
try:
sw = node.sdo[0x6041].raw
pos = node.sdo[0x6064].raw
print(f" SDO: Status=0x{sw:04X}, Position={pos}")
except Exception:
pass
time.sleep(0.05)
print("\nDemo completed.")
finally:
# Stop SYNC
if sync_thread is not None:
sync_thread.stop_flag["stop"] = True # type: ignore[attr-defined]
time.sleep(0.05)
# Disconnect
try:
net.disconnect()
except Exception:
pass
if __name__ == "__main__":
main()

19
import_can.py Normal file
View File

@ -0,0 +1,19 @@
import can
import time
bus = can.interface.Bus(bustype="socketcan", channel="can0", bitrate=500000)
node_id = 1
cob_id = 0x200 + node_id # 0x201
value = 1234
data_bytes = value.to_bytes(2, byteorder="little", signed=True)
msg = can.Message(arbitration_id=cob_id, data=data_bytes, is_extended_id=False)
for i in range(10):
try:
bus.send(msg)
print(f"Sent {value} to PLC (COB-ID=0x{cob_id:X})")
except can.CanError:
print("Message NOT sent")
time.sleep(1)

3
learnpy.py Normal file
View File

@ -0,0 +1,3 @@
from mymodule import 範圍
print(範圍(60))

5
mymodule.py Normal file
View File

@ -0,0 +1,5 @@
def 範圍 (A):
if 10>A:
return "太小"
else:
return "正常"

0
node_pub.cpp Normal file
View File

View File

@ -1,4 +1,7 @@
from pymodbus.client.sync import ModbusTcpClient
import os
os.add_dll_directory(r"C:\Users\huiting\Desktop\PCAN-Basic\x64") # 換成 DLL 的實際路徑
import time
# 設定 PLC IP 和 Port

14
send_W_value.py Normal file
View File

@ -0,0 +1,14 @@
import socket
PLC_IP = "169.254.11.8" # 換成你的 PLC IP
PLC_PORT = 5000
message = "W=99;"
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((PLC_IP, PLC_PORT))
s.sendall(message.encode())
print(f"[Python] 已傳送: {message}")
except Exception as e:
print(f"[Python] 傳送失敗: {e}")

25
test.cpp Normal file
View File

@ -0,0 +1,25 @@
// rclcpp 是 ROS2 的 C++ 客戶端函式庫ROS Client Library for C++
// 提供 Node、Publisher/Subscriber、Timer、Logging 等 API。
#include "rclcpp/rclcpp.hpp"
// std_msgs/msg/string.hpp 是 ROS2 內建的訊息型別套件 std_msgs 裡的 String 訊息定義。
// 這個訊息就只有一個欄位std::string data;
#include "std_msgs/msg/string.hpp"
// <chrono> 讓我們能用 C++ 時間單位(例如 1s搭配計時器使用。
#include <chrono>
// 匯入 chrono 的字面常數,例如 1s, 100ms。
// 沒有這行的話,程式裡寫 1s 會編譯失敗。
using namespace std::chrono_literals;
// 定義一個節點類別,繼承 rclcpp::Node。
// 一個「節點Node」就是一個 ROS2 程式的執行單元,可以包含 Publisher/Subscriber/Service/Action 等。
class MyfristRos2_pub : public rclcpp::Node{
public:
MyfristRos2_pub():Node("node_pub"),count_(0){
publisher_=this->create_publisher<std_msgs::msg::String>("topic_for_vulue",10);
timer_ = this->create_wall_timer(
1s, std::bind(&MyfristRos2_pub::publish_message, this));
}
private:
}

31
write_sdo_min.py Normal file
View File

@ -0,0 +1,31 @@
# file: write_sdo_min.py
# pip install python-can canopen
import time
import can
import canopen
# === 請依你的硬體改這裡 ===
IFACE = "pcan" # pcan | kvaser | ixxat | slcan
CHANNEL = "PCAN_USBBUS1" # kvaser/ixxat 用 0slcan 用 "COM5"
BITRATE = 500000
NODE_ID = 1 # 你的 PLC Node-ID
EDS = "PLC.eds" # 你的 EDS/DCF放同資料夾
INDEX = 0x2000 # 你在 PLC 端對外開放的 OD 物件
SUB = 0x00
VALUE = 1234 # 要寫入的整數(型別需與 EDS 一致)
# === 建立 bus & network ===
bus = can.interface.Bus(bustype=IFACE, channel=CHANNEL, bitrate=BITRATE)
net = canopen.Network()
net.connect(bustype="python-can", channel=bus)
node = canopen.RemoteNode(NODE_ID, EDS)
net.add_node(node)
# 建議在 PRE-OP 寫參數
node.nmt.state = "PRE-OPERATIONAL"
node.sdo[INDEX][SUB].raw = VALUE
time.sleep(0.05)
print(f"SDO write OK -> {INDEX:04X}:{SUB:02X} = {VALUE}")
net.disconnect()