Created
August 2, 2021 15:49
-
-
Save artizirk/e9f5e6f1d4d495b2f2d95f87e25f079c to your computer and use it in GitHub Desktop.
Minimal CANOpen in plain standalone Python3 using only standard lib
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
"""Minimal CANOpen in plain standalone Python3 using only standard lib | |
This is incomplete but works well enough to read and write small Objects using SDO protocol | |
""" | |
import ctypes | |
import enum | |
import socket | |
import struct | |
import time | |
# CAN frame packing/unpacking (see 'struct can_frame' in <linux/can.h>) | |
can_frame_fmt = "=IB3x8s" | |
can_frame_size = struct.calcsize(can_frame_fmt) | |
NODE_ID = 3 | |
# Broadcast function codes | |
NMT = 0b0000 # 0x0 | |
SYNC = 0b0001 # 0x1 | |
TIME = 0b0010 # 0x2 | |
# Peer to Peer object function codes | |
EMCY = 0b0001 # 0x1 | |
PDO1_TX = 0b0011 # 0x3 3 From device to Rpi | |
PDO1_RX = 0b0100 # 0x4 4 From Rpi to Device | |
PDO2_TX = 0b0101 # 0x5 5 | |
PDO2_RX = 0b0110 # 0x6 6 | |
SDO_TX = 0b1011 # 0xB 11 | |
SDO_RX = 0b1100 # 0xC 12 | |
NMT_ERR = 0b1110 # 0xE 14 | |
P2POFC = { | |
EMCY: "EMCY", | |
PDO1_TX: "PDO1_TX", | |
PDO1_RX: "PDO1_RX", | |
PDO2_TX: "PDO2_TX", | |
PDO2_RX: "PDO2_RX", | |
SDO_TX: "SDO_TX", | |
SDO_RX: "SDO_RX", | |
NMT_ERR: "NMT_ERR", | |
} | |
class SDO_CS(enum.IntEnum): | |
"""Service Data Object Command Specifier""" | |
DOWNLOAD_INIT_REQ = 1 | |
DOWNLOAD_INIT_RESP = 3 | |
DOWNLOAD_SEGMENT_REQ = 0 | |
DOWNLOAD_SEGMENT_RESP = 1 | |
UPLOAD_INIT_REQ = 2 | |
UPLOAD_INIT_RESP = 2 | |
UPLOAD_SEGMENT_REQ = 3 | |
UPLOAD_SEGMENT_RESP = 0 | |
ABORT = 4 | |
class PrettyStructure(ctypes.BigEndianStructure): | |
def __repr__(self) -> str: | |
values = ", ".join(f"{name}={value}" for name, value in self._asdict().items()) | |
return f"<{self.__class__.__name__}: {values}>" | |
def _asdict(self) -> dict: | |
return {field[0]: getattr(self, field[0]) for field in self._fields_} | |
def mkcob(func, node=NODE_ID): | |
"""Build COB-ID form Function Code and Node ID | |
COB-ID is a fancy name for can_id in the can packet""" | |
return (func & 0xf) << 7 | (node & 0x7f) | |
def parscob(cob): | |
"""Return Function Code and Node ID tuble form COB-ID""" | |
return (cob >> 7) & 0xf, cob & 0x7f | |
def build_can_frame(can_id, data): | |
can_dlc = len(data) | |
data = data.ljust(8, b'\x00') | |
return struct.pack(can_frame_fmt, can_id, can_dlc, data) | |
def dissect_can_frame(frame): | |
can_id, can_dlc, data = struct.unpack(can_frame_fmt, frame) | |
return can_id, can_dlc, data[:can_dlc] | |
def build_nmt(cs, node): | |
"""Build a Network Management packet form command specifier (cs) and node id""" | |
return build_can_frame(mkcob(NMT, 0), cs.to_bytes(1, 'little')+node.to_bytes(1, 'little')) | |
class SDOCmd(PrettyStructure): | |
"""SDO Protocol Initiate Command from CiA301 7.2.4.3.3 | |
Used as Download Request or as Upload Response""" | |
_pack_ = 1 | |
_fields_ = ( | |
("cs", ctypes.c_uint8, 3), # command specifier in SDO_CS | |
("x", ctypes.c_uint8, 1), # not used, always 0 | |
("n", ctypes.c_uint8, 2), # size of data if transfer is expedited and `s` is set, else 0 | |
("e", ctypes.c_uint8, 1), # transfer type, 0: normal (use segmented transfer), 1: expedited (aka data is in last 4 bytes) | |
("s", ctypes.c_uint8, 1), # size indicator, 0: size is not indicated, 1: size is indicated | |
) | |
# SDO Packet format | |
# | 0 | 1 2 | 3 | 4 5 6 7| | |
# |CMD|OBJID|IDX| DATA | | |
# Where OBJID and IDX combo is often called multiplexer | |
def build_sdo_upload_init(node, object_id, index=0): | |
"""Read object_id from device""" | |
cmd = SDOCmd(cs=SDO_CS.UPLOAD_INIT_REQ) | |
multiplexer = object_id.to_bytes(2, "little") + index.to_bytes(1, "little") | |
return build_can_frame(mkcob(SDO_RX, node), bytes(cmd) + multiplexer + b'\x00\x00\x00\x00') | |
def build_sdo_download_init(node, object_id, index=0, data=b''): | |
"""Write data to object_id on device""" | |
cmd = SDOCmd(cs=SDO_CS.DOWNLOAD_INIT_REQ, e=1, s=1, n=4-len(data)) | |
multiplexer = object_id.to_bytes(2, "little") + index.to_bytes(1, "little") | |
return build_can_frame(mkcob(SDO_RX, node), bytes(cmd) + multiplexer + data + (b"\x00"*cmd.n)) | |
def recv(s): | |
"""Receive can messages from socket and pretty print them""" | |
cf, addr = s.recvfrom(can_frame_size) | |
cobid, l, data = dissect_can_frame(cf) | |
func_id, node = parscob(cobid) | |
print(f"From node {node} func {P2POFC.get(func_id)}({func_id}) with data {' '.join(['{:02x}'.format(x) for x in data])}") | |
if func_id in {SDO_TX, SDO_RX}: | |
sdo_cs = SDOCmd.from_buffer_copy(data[0:1]) | |
object_id = int.from_bytes(data[1:3], 'little') | |
print(f" {str(SDO_CS(sdo_cs.cs))}:{sdo_cs} {hex(object_id)}:0x{data[3:4].hex()} {data[4:7].hex()}") | |
def main(): | |
s = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW) | |
s.bind(('can0',)) | |
print(f"Sending NMT reset node to {NODE_ID}") | |
s.send(build_nmt(129, NODE_ID)) | |
recv(s) | |
# print("Sending NMT start remote node") | |
# s.send(build_nmt(1, NODE_ID)) | |
print("get device type") | |
s.send(build_sdo_upload_init(NODE_ID, 0x1000)) | |
recv(s) | |
print("get errors") | |
s.send(build_sdo_upload_init(NODE_ID, 0x1001)) | |
recv(s) | |
print("Sending NMT start remote node") | |
s.send(build_nmt(1, NODE_ID)) | |
recv(s) | |
recv(s) | |
print("get errors") | |
s.send(build_sdo_upload_init(NODE_ID, 0x1001)) | |
recv(s) | |
print("get current mode of operation") | |
s.send(build_sdo_upload_init(NODE_ID, 0x6061)) | |
recv(s) | |
print("set mode of operation to velocity mode") | |
s.send(build_sdo_download_init(NODE_ID, 0x6060, data=b'\x03')) | |
recv(s) | |
recv(s) | |
print("get current profile acceleration") | |
s.send(build_sdo_upload_init(NODE_ID, 0x6083)) | |
recv(s) | |
print("set acceleration") | |
s.send(build_sdo_download_init(NODE_ID, 0x6083, data=(1000000).to_bytes(4, 'little'))) | |
print("Switch statemachine to READY_TO_SWITCH_ON state") | |
s.send(build_sdo_download_init(NODE_ID, 0x6040, data=b'\x06\x00')) | |
recv(s) | |
recv(s) | |
print("Switch statemachine to SWITCHED_ON state") | |
s.send(build_sdo_download_init(NODE_ID, 0x6040, data=b'\x07\x00')) | |
recv(s) | |
recv(s) | |
print("Switch statemachine to OPERATION_ENABLED state") | |
s.send(build_sdo_download_init(NODE_ID, 0x6040, data=b'\x0f\x00')) | |
recv(s) | |
recv(s) | |
print("Set target velocity") | |
s.send(build_sdo_download_init(NODE_ID, 0x60ff, data=(5000000).to_bytes(4, 'little'))) | |
time.sleep(3) | |
print("Set target velocity") | |
s.send(build_sdo_download_init(NODE_ID, 0x60ff, data=(0).to_bytes(4, 'little'))) | |
while True: | |
recv(s) | |
# try: | |
# s.send(cf) | |
# except OSError: | |
# print('Error sending CAN frame') | |
# | |
# try: | |
# s.send(build_can_frame(0x01, b'\x01\x02\x03')) | |
# except OSError: | |
# print('Error sending CAN frame') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment