#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CANopen ↔ PLC demo (python-can + canopen) ---------------------------------------- Fill in the CONFIG section to match your PLC / device. Tested with python-can >= 4 and canopen >= 2. Common use cases: - Python as CANopen "master" talking to a PLC/device node (slave). - Monitor/drive DS402-like nodes (Controlword/Statusword and Position/Velocity). Author: ChatGPT """ import time import threading from typing import Optional, Dict, Any import can # python-can import canopen # python-canopen # ===================== # CONFIG (EDIT ME) # ===================== CONFIG: Dict[str, Any] = dict( # Choose a backend matching your CAN interface: # Linux SocketCAN example: backend="socketcan", channel="can0" # PEAK PCAN example: backend="pcan", channel="PCAN_USBBUS1" # Kvaser example: backend="kvaser", channel="0" backend="socketcan", channel="can0", bitrate=500000, # 125000 / 250000 / 500000 / 1000000 # Target node (PLC or device) information node_id=2, # PLC/device CANopen Node-ID (1..127) eds_path="device.eds", # Path to the device EDS/DCF file (put the file next to this script) # Heartbeat / SYNC (optional) heartbeat_consumer_time_ms=0, # Set >0 to expect heartbeat from node heartbeat_producer_time_ms=0, # Requires device support; 0 = don't change use_sync=True, # Use SYNC object for synchronous PDOs sync_period_ms=10, # SYNC period if acting as producer # PDO mapping (DS402-like defaults) — adjust to your PLC/device map # RxPDO: what Python sends TO the device (e.g., Controlword, Target position) #rxpdo_map=( # {"index": 0x6040, "subindex": 0x00, "size": 16}, # Controlword (UINT16) # {"index": 0x607A, "subindex": 0x00, "size": 32}, # Target position (INT32) #), TxPDO: what Python receives FROM the device (e.g., Statusword, Actual position) txpdo_map=( {"index": 0x6041, "subindex": 0x00, "size": 16}, # Statusword (UINT16) {"index": 0x6064, "subindex": 0x00, "size": 32}, # Position actual value (INT32) ), # Transmission types: # For SYNC: usually 1..240 (1=sub-every SYNC). For async/evented: 255. rxpdo_trans_type=1, txpdo_trans_type=1, # Demo motion (for DS402 profile position mode) do_enable_sequence=True, target_positions=[0, 10000, -5000, 0], dwell_ms=500, # wait time after each command ) # ============= # Helpers # ============= def make_bus(cfg: Dict[str, Any]) -> can.Bus: """Create and return a python-can Bus.""" backend = cfg["backend"].lower() if backend == "socketcan": bus = can.interface.Bus(bustype="socketcan", channel=cfg["channel"], bitrate=cfg["bitrate"]) elif backend == "pcan": bus = can.interface.Bus(bustype="pcan", channel=cfg["channel"], bitrate=cfg["bitrate"]) elif backend == "kvaser": bus = can.interface.Bus(bustype="kvaser", channel=cfg["channel"], bitrate=cfg["bitrate"]) else: raise ValueError(f"Unsupported backend: {backend}") return bus def setup_network(cfg: Dict[str, Any]) -> canopen.Network: """Attach canopen.Network to an existing python-can Bus.""" bus = make_bus(cfg) net = canopen.Network() # Attach python-can bus net.connect(bustype="python-can", channel=bus) return net def add_node(net: canopen.Network, cfg: Dict[str, Any]) -> canopen.RemoteNode: """Add a remote node using an EDS file.""" node = canopen.RemoteNode(cfg["node_id"], cfg["eds_path"]) net.add_node(node) return node def set_heartbeat(node: canopen.RemoteNode, cfg: Dict[str, Any]) -> None: """Optionally configure heartbeat via SDO (if supported by device).""" try: if cfg.get("heartbeat_producer_time_ms", 0) > 0: node.sdo[0x1017].raw = int(cfg["heartbeat_producer_time_ms"]) # Heartbeat consumer is typically configured on the *master* side; for canopen lib, # you can add a consumer to monitor a node. Here we use basic timeout checks in the loop. except Exception as e: print(f"[WARN] Heartbeat config skipped: {e}") def clear_pdos(node: canopen.RemoteNode) -> None: """Disable and clear all existing PDO mappings before re-mapping.""" # Disable all TxPDOs and RxPDOs for pdo in node.tpdo.values(): pdo.enabled = False pdo.clear() pdo.save() for pdo in node.rpdo.values(): pdo.enabled = False pdo.clear() pdo.save() def map_pdos(node: canopen.RemoteNode, cfg: Dict[str, Any]) -> None: """Map RxPDO/TxPDO according to CONFIG and enable them.""" # --- Map RxPDOs (what WE send TO device) --- rpdo1 = node.rpdo[1] rpdo1.clear() for m in cfg["rxpdo_map"]: rpdo1.add_variable(m["index"], m["subindex"], length=m["size"]) rpdo1.trans_type = cfg["rxpdo_trans_type"] rpdo1.enabled = True rpdo1.save() # --- Map TxPDOs (what device sends TO us) --- tpdo1 = node.tpdo[1] tpdo1.clear() for m in cfg["txpdo_map"]: tpdo1.add_variable(m["index"], m["subindex"], length=m["size"]) tpdo1.trans_type = cfg["txpdo_trans_type"] tpdo1.enabled = True tpdo1.save() # Apply configuration on device node.nmt.state = 'PRE-OPERATIONAL' node.nmt.state = 'OPERATIONAL' def start_sync_producer(net: canopen.Network, period_ms: int) -> threading.Thread: """Start a SYNC producer thread (if this controller should emit SYNC).""" stop_flag = {"stop": False} def _loop(): sync = canopen.sync.SyncProducer(net) interval = max(0.001, period_ms / 1000.0) while not stop_flag["stop"]: sync.transmit() time.sleep(interval) t = threading.Thread(target=_loop, daemon=True) t.stop_flag = stop_flag # type: ignore[attr-defined] t.start() return t # ============= # DS402 helpers # ============= CW_SHUTDOWN = 0x0006 CW_SWITCH_ON = 0x0007 CW_ENABLE_OPERATION = 0x000F CW_DISABLE_VOLTAGE = 0x0000 CW_QUICK_STOP = 0x0002 CW_FAULT_RESET = 0x0080 def drive_enable_sequence(node: canopen.RemoteNode) -> None: """Minimal DS402 enable sequence using SDO Controlword/Statusword.""" cw = node.sdo[0x6040] sw = node.sdo[0x6041] def _write(value: int): cw.raw = value time.sleep(0.05) # Fault reset if needed if sw.raw & 0x0008: # Fault bit _write(CW_FAULT_RESET) time.sleep(0.1) _write(CW_SHUTDOWN) _write(CW_SWITCH_ON) _write(CW_ENABLE_OPERATION) def set_mode_of_operation(node: canopen.RemoteNode, mode: int) -> None: """Set DS402 mode of operation (e.g., 1=Profile Position, 3=Profile Velocity, 6=Homing, 8=CSP...).""" node.sdo[0x6060].raw = int(mode) # Optional: read back _ = node.sdo[0x6061].raw # ============= # Main demo # ============= def main() -> None: cfg = CONFIG print("Connecting CAN...") net = setup_network(cfg) print(f"Connected via {cfg['backend']}:{cfg['channel']} @ {cfg['bitrate']}") # Add node node = add_node(net, cfg) # NMT reset + pre-op print("Resetting node...") node.nmt.reset_node() time.sleep(0.5) node.nmt.state = 'PRE-OPERATIONAL' time.sleep(0.2) # Optional: heartbeat set_heartbeat(node, cfg) # PDO mapping print("Mapping PDOs...") clear_pdos(node) map_pdos(node, cfg) # Optional: start SYNC producer if we are the controller sync_thread: Optional[threading.Thread] = None if cfg.get("use_sync", False): print(f"Starting SYNC producer @ {cfg['sync_period_ms']} ms") sync_thread = start_sync_producer(net, cfg["sync_period_ms"]) # Go operational print("Switching to OPERATIONAL") node.nmt.state = 'OPERATIONAL' time.sleep(0.1) # Enable sequence (DS402) if cfg.get("do_enable_sequence", True): try: print("Setting mode of operation = 1 (Profile Position)") set_mode_of_operation(node, 1) except Exception as e: print(f"[WARN] Skipping mode set: {e}") print("Running enable sequence...") try: drive_enable_sequence(node) except Exception as e: print(f"[WARN] Enable sequence failed/skipped: {e}") # Shortcut handles to mapped PDO variables # RxPDO1 variables we control try: rpdo1 = node.rpdo[1] cw_var = rpdo1['Controlword'] if 'Controlword' in rpdo1.map else rpdo1[0] tgt_pos_var = rpdo1['Target position'] if 'Target position' in rpdo1.map else rpdo1[1] except Exception: rpdo1 = None cw_var = None tgt_pos_var = None # TxPDO1 variables we read try: tpdo1 = node.tpdo[1] sw_var = tpdo1['Statusword'] if 'Statusword' in tpdo1.map else tpdo1[0] act_pos_var = tpdo1['Position actual value'] if 'Position actual value' in tpdo1.map else tpdo1[1] except Exception: tpdo1 = None sw_var = None act_pos_var = None # Demo loop: write a few target positions and read back status/position try: for idx, target in enumerate(cfg.get("target_positions", []), 1): print(f"\n[{idx}] Command target position = {target}") if rpdo1 is not None and tgt_pos_var is not None: tgt_pos_var.raw = int(target) # Ensure CW has Enable Operation bit set when using RxPDO if cw_var is not None: cw_var.raw = CW_ENABLE_OPERATION rpdo1.transmit() else: # Fallback via SDO print(" (No RxPDO mapped; writing via SDO 0x607A)") node.sdo[0x607A].raw = int(target) node.sdo[0x6040].raw = CW_ENABLE_OPERATION # Read back t0 = time.time() while time.time() - t0 < (cfg.get("dwell_ms", 500) / 1000.0): if tpdo1 is not None: # Request update on SYNC; if evented/cyclic, values will refresh automatically time.sleep(0.01) try: sw = sw_var.raw if sw_var is not None else None pos = act_pos_var.raw if act_pos_var is not None else None print(f" TPDO: Status=0x{(sw or 0):04X}, Position={pos}") except Exception: pass else: # Read via SDO occasionally try: sw = node.sdo[0x6041].raw pos = node.sdo[0x6064].raw print(f" SDO: Status=0x{sw:04X}, Position={pos}") except Exception: pass time.sleep(0.05) print("\nDemo completed.") finally: # Stop SYNC if sync_thread is not None: sync_thread.stop_flag["stop"] = True # type: ignore[attr-defined] time.sleep(0.05) # Disconnect try: net.disconnect() except Exception: pass if __name__ == "__main__": main()