Skip to content

Instantly share code, notes, and snippets.

@goran-mahovlic
Last active January 12, 2020 17:17
Show Gist options
  • Save goran-mahovlic/8e8cbba99de24d06450d95844f7a0bf2 to your computer and use it in GitHub Desktop.
Save goran-mahovlic/8e8cbba99de24d06450d95844f7a0bf2 to your computer and use it in GitHub Desktop.
"""STM32 MCU serial firmware loader"""
# MicroPython test >> Should return :DEV_ID: 0410
#import stm32flasher
#address = 0x00
#size = 1
#stm32bl = stm32flasher.Stm32flasher(1, 115200,5)
import time
from machine import UART
from machine import Pin
# GPIO 27 configured as an output. Stm32flasher._reset_mcu drives it low and
# then high again to reset the target (presumably wired to the STM32 reset
# pin -- confirm against the board wiring).
reset=Pin(27,Pin.OUT)
# Tool name and version.
VERSION_STR = "stm32flasher v0.0.0"
# Version banner followed by credits.
# NOTE(review): the e-mail placeholder and the "github.com" host in
# the text below look mangled by web-page extraction -- confirm against the
# upstream project before relying on them.
DESCRIPTION_STR = VERSION_STR + """
(c) 2016 by [email protected]
https://github.com/pavelrevak/stm32flasher
"""
class Stm32flasherException(Exception):
    """General STM32 loader exception.

    Root of the loader's exception hierarchy; catching it catches every
    error class defined below.
    """


class SerialException(Stm32flasherException):
    """Serial communication exception (the serial port could not be opened)."""


class ConnectingException(Stm32flasherException):
    """Connecting to boot-loader exception (no reply to the init byte)."""


class NoAnswerException(Stm32flasherException):
    """No answer exception (nothing was read back after a write)."""


class CommandNotAllowedException(Stm32flasherException):
    """Command not allowed exception (command not in the allowed list)."""


class UnexpectedAnswerException(Stm32flasherException):
    """Unexpected answer exception (reply has the wrong length or content)."""


class NoAckException(Stm32flasherException):
    """General No ACK exception; base class of the two specific ones below."""


class NoAckCommandException(NoAckException):
    """No ACK after command exception."""


class NoAckDataException(NoAckException):
    """No ACK after data exception."""
class Stm32flasher():
    """STM32 firmware loader class.

    Drives the STM32 serial boot-loader through a MicroPython
    ``machine.UART``: resets the target, sends the init byte and then
    offers read / write / erase / protection commands.

    Every frame sent to the target carries a checksum: a single byte is
    followed by its complement, a byte sequence by the XOR of all bytes.
    """

    # handshake byte and the two possible single-byte replies
    CMD_INIT = 0x7f
    CMD_ACK = 0x79
    CMD_NOACK = 0x1f
    # boot-loader command codes
    CMD_GET = 0x00
    CMD_GET_VERSION = 0x01
    CMD_GET_ID = 0x02
    CMD_READ_MEMORY = 0x11
    CMD_GO = 0x21
    CMD_WRITE_MEMORY = 0x31
    CMD_ERASE = 0x43
    CMD_EXTENDED_ERASE = 0x44
    CMD_WRITE_PROTECT = 0x63
    CMD_WRITE_UNPROTECT = 0x73
    CMD_READOUT_PROTECT = 0x82
    CMD_READOUT_UNPROTECT = 0x92
    # start address of the internal Flash memory
    FLASH_START = 0x08000000

    def __init__(self, port, baudrate=115200, verbosity=1):
        """Open the UART, reset the target and query the boot-loader.

        port      -- UART id handed to ``machine.UART``
        baudrate  -- serial speed
        verbosity -- log messages with a level above this are suppressed

        Raises SerialException when the UART cannot be opened and
        ConnectingException when the boot-loader does not answer.
        """
        try:
            # 8 data bits, parity=0 (even per the machine.UART docs), 1 stop
            # bit; TX on GPIO 25, RX on GPIO 26.
            self._serial_port = UART(port, baudrate, bits=8, parity=0, stop=1, tx=25, rx=26)
        except (OSError, ValueError):
            # The original clause named pyserial exception types that are not
            # importable here, which turned every UART error into a NameError.
            raise SerialException("Error opening serial port: %s" % port)
        self._verbosity = verbosity
        self._connect(5)
        # Only GET is allowed until the boot-loader reports its command list.
        self._allowed_commands = [self.CMD_GET, ]
        self._boot_version = self._cmd_get()
        self._option_bytes = self._cmd_get_version()
        self._dev_id = self._cmd_get_id()

    @staticmethod
    def print_buffer(addr, data, bytes_per_line=16):
        """Print a hex + ASCII dump of ``data`` starting at address ``addr``.

        Consecutive identical lines are collapsed into a single '*' line.
        The address following the last byte is printed at the end.
        """
        prev_chunk = []
        same_chunk = False
        for i in range(0, len(data), bytes_per_line):
            chunk = data[i:i + bytes_per_line]
            if prev_chunk != chunk:
                print('%08x %s%s %s' % (
                    addr,
                    ' '.join(['%02x' % d for d in chunk]),
                    # every missing byte would have used three columns
                    # ("xx "), so pad by that much to keep the ASCII column
                    # aligned on a short last line
                    ' ' * (3 * (bytes_per_line - len(chunk))),
                    ''.join([chr(d) if 32 <= d < 127 else '.' for d in chunk]),
                ))
                prev_chunk = chunk
                same_chunk = False
            elif not same_chunk:
                print('*')
                same_chunk = True
            addr += len(chunk)
        print('%08x' % addr)

    def log(self, message, operation=None, level=1):
        """Print ``message`` when ``level`` does not exceed the verbosity.

        The line is prefixed with ``level`` colons and, when given, with
        ``operation`` followed by ': '.
        """
        if self._verbosity < level:
            return
        msg = ''
        if level > 0:
            msg += ':' * level
        if operation:
            msg += '%s: ' % operation
        msg += message
        print(msg)

    def _write(self, data):
        """Write a sequence of byte values to the serial port."""
        self.log(":".join(['%02x' % d for d in data]), 'WR', level=3)
        self._serial_port.write(bytes(data))

    def _read(self, cnt=1, timeout=1):
        """Read up to ``cnt`` bytes from the serial port.

        ``timeout`` is the maximum number of read attempts, not a time.
        Returns the bytes received as a list of ints; the list is empty (or
        shorter than ``cnt``) when the attempts run out.
        """
        self.log("Read started", level=1)
        data = []
        while len(data) < cnt and timeout > 0:
            chunk = self._serial_port.read(cnt - len(data))
            # machine.UART.read returns None on timeout; list(None) used to
            # raise TypeError here.
            if chunk:
                data.extend(chunk)
            timeout -= 1
        self.log(":".join(['%02x' % d for d in data]), 'RD', level=3)
        return data

    def _reset_mcu(self):
        """Reset the MCU by pulsing the module-level ``reset`` pin low."""
        reset.value(0)
        time.sleep(0.1)
        reset.value(1)
        # give the target time to start before talking to it
        time.sleep(0.2)

    def _connect(self, repeat=1):
        """Reset the target and send the init byte up to ``repeat`` times.

        Returns as soon as the target answers with ACK or NOACK; raises
        ConnectingException when no attempt gets such an answer.
        """
        self.log("Connecting to boot-loader", level=1)
        self._reset_mcu()
        self.log("Target Reseted", level=1)
        while repeat:
            self._write([self.CMD_INIT])
            self.log("CMD_INIT write done", level=1)
            ret = self._read()
            self.log("Reading ret", level=1)
            if ret and ret[0] in (self.CMD_ACK, self.CMD_NOACK):
                return
            repeat -= 1
        raise ConnectingException("Can't connect to MCU boot-loader.")

    def exit_bootloader(self):
        """Exit boot-loader and restart MCU (by resetting it)."""
        self._reset_mcu()

    def _talk(self, data_wr, cnt_rd, timeout=1):
        """Send one checksummed frame and read ``cnt_rd`` bytes back.

        ``data_wr`` is either a single byte value (sent followed by its
        complement) or a list/tuple of byte values (sent followed by the XOR
        of all of them). The caller's sequence is not modified.

        Raises NoAnswerException when nothing is read back.
        """
        if isinstance(data_wr, (tuple, list)):
            # work on a copy: appending to the argument mutated the caller's
            # list and failed outright for tuples
            packet = list(data_wr)
            xor = packet[0]
            for value in packet[1:]:
                xor ^= value
            packet.append(xor)
        else:
            packet = [data_wr, data_wr ^ 0xff]
        self._write(packet)
        res = self._read(cnt_rd, timeout=timeout)
        if not res:
            raise NoAnswerException("No answer.")
        return res

    def _send_command(self, cmd, cnt_rd=None):
        """Send a command byte and return the payload of the reply.

        With ``cnt_rd`` None only the ACK byte is read. Otherwise
        ``cnt_rd`` payload bytes framed by a leading and a trailing ACK are
        read and the payload (without both ACKs) is returned.

        Raises CommandNotAllowedException for commands missing from the
        allowed list and NoAckCommandException when an ACK is missing.
        """
        if cmd not in self._allowed_commands:
            raise CommandNotAllowedException("command %02x: is not supported by this device." % cmd)
        if cnt_rd is None:
            cnt_rd = 1
        else:
            cnt_rd += 2
        res = self._talk(cmd, cnt_rd)
        if res[0] != self.CMD_ACK or res[-1] != self.CMD_ACK:
            raise NoAckCommandException("NoACK for command.")
        return res[1:-1]

    def _send_data(self, data, cnt_rd=None, timeout=1):
        """Send a data frame, expect ACK, optionally read a reply.

        Returns the ``cnt_rd`` bytes read after the ACK, or None when
        ``cnt_rd`` is None. Raises NoAckDataException when the frame is not
        acknowledged.
        """
        res = self._talk(data, 1, timeout=timeout)
        if res[0] != self.CMD_ACK:
            raise NoAckDataException("NoACK for data.")
        if cnt_rd is not None:
            return self._read(cnt_rd, timeout=timeout)
        return None

    @staticmethod
    def _convert_version(ver):
        """Format a version byte as 'vMAJOR.MINOR' (high / low nibble)."""
        return 'v%d.%d' % (ver // 16, ver % 16)

    @staticmethod
    def _convert_32bit(val):
        """Split ``val`` into four bytes, most significant first."""
        return [
            val >> 24,
            0xff & (val >> 16),
            0xff & (val >> 8),
            0xff & val,
        ]

    @staticmethod
    def _convert_16bit(val):
        """Split ``val`` into two bytes, most significant first."""
        return [
            val >> 8,
            0xff & val,
        ]

    def _cmd_get(self):
        """Gets the version and the allowed commands supported
        by the current version of the boot-loader.

        Reads a fixed 13-byte payload: a count byte, the version byte and
        11 command codes. Stores the command codes as the allowed command
        list and returns the formatted boot-loader version.
        """
        self.log("CMD_GET", level=2)
        res = self._send_command(self.CMD_GET, 13)
        if len(res) - 2 != res[0]:
            raise UnexpectedAnswerException("CMD_GET command: wrong result length.")
        boot_version = self._convert_version(res[1])
        self.log(boot_version, 'BOOT_VERSION', level=1)
        # update list of allowed commands
        self._allowed_commands = res[2:]
        return boot_version

    def _cmd_get_version(self):
        """Gets the boot-loader version and the Read Protection
        status of the Flash memory.

        Returns the two option bytes; raises UnexpectedAnswerException when
        the version differs from the one reported by CMD_GET.
        """
        self.log("CMD_GET_VERSION", level=2)
        res = self._send_command(self.CMD_GET_VERSION, 3)
        if len(res) != 3:
            raise UnexpectedAnswerException("CMD_GET_VERSION: wrong length of result")
        boot_version = self._convert_version(res[0])
        if boot_version != self._boot_version:
            raise UnexpectedAnswerException("Version between GET and GET_VERSION are different.")
        option_bytes = res[1:]
        self.log(":".join(['%02x' % i for i in option_bytes]), 'OPTION_BYTES', level=1)
        return option_bytes

    def _cmd_get_id(self):
        """Gets the chip ID as a 16-bit integer."""
        self.log("CMD_GET_ID", level=2)
        res = self._send_command(self.CMD_GET_ID, 3)
        if len(res) - 2 != res[0]:
            raise UnexpectedAnswerException("CMD_GET_ID: wrong result length.")
        dev_id = (res[1] << 8) + res[2]
        self.log("%04x" % dev_id, 'DEV_ID', level=1)
        return dev_id

    def _cmd_read_memory(self, address, length):
        """Reads up to 256 bytes of memory starting from an
        address specified by the application.

        Returns the bytes read as a list of ints.
        """
        self.log("CMD_READ_MEMORY(%08x, %d)" % (address, length), level=2)
        self._send_command(self.CMD_READ_MEMORY)
        self._send_data(self._convert_32bit(address))
        # the length is transmitted as (number of bytes - 1)
        return self._send_data(length - 1, length)

    def cmd_go(self, address):
        """Jumps to user application code located in the internal
        Flash memory or in SRAM."""
        self.log("CMD_GO", level=2)
        self._send_command(self.CMD_GO)
        self._send_data(self._convert_32bit(address))

    def _cmd_write_memory(self, address, data):
        """Writes up to 256 bytes to the RAM or Flash memory
        starting from an address specified by the application.

        ``data`` may be any sequence of byte values (list, bytes, ...).
        """
        self.log("CMD_WRITE_MEMORY(%08x, %d)" % (address, len(data)), level=2)
        self._send_command(self.CMD_WRITE_MEMORY)
        self._send_data(self._convert_32bit(address))
        # the frame starts with (number of bytes - 1)
        return self._send_data([len(data) - 1] + list(data))

    def _cmd_erase(self, pages=0xff):
        """Erases from one to all the Flash memory pages.

        ``pages`` is either a list/tuple of page numbers or a single byte
        code (default 0xff, sent with its complement).
        """
        if isinstance(pages, (list, tuple)):
            # '%d' cannot format a list; the old log line raised TypeError
            self.log("CMD_ERASE(%s)" % ",".join([str(p) for p in pages]), level=2)
            data = [len(pages) - 1] + list(pages)
        else:
            self.log("CMD_ERASE(%d)" % pages, level=2)
            data = pages
        self._send_command(self.CMD_ERASE)
        # erasing is slow: allow more read attempts for the ACK
        self._send_data(data, timeout=20)

    def _cmd_extended_erase(self, pages=0xffff):
        """Erases from one to all the Flash memory pages using
        two byte addressing mode (available only for v3.0 usart
        bootloader versions and above).

        ``pages`` is either a list/tuple of page numbers or a single 16-bit
        code (default 0xffff).
        """
        self.log("CMD_EXTENDED_ERASE", level=2)
        self._send_command(self.CMD_EXTENDED_ERASE)
        if isinstance(pages, (list, tuple)):
            data = self._convert_16bit(len(pages) - 1)
            for page in pages:
                data += self._convert_16bit(page)
        else:
            # honour the caller's code instead of always sending 0xffff
            data = self._convert_16bit(pages)
        self._send_data(data, timeout=20)

    def cmd_write_protect(self, sectors):
        """Enables the write protection for some sectors, then reconnects."""
        self.log("CMD_WRITE_PROTECT", level=2)
        # the command byte has to precede the sector list; it was missing
        self._send_command(self.CMD_WRITE_PROTECT)
        data = [len(sectors) - 1] + list(sectors)
        self._send_data(data, timeout=20)
        self._connect(5)

    def cmd_write_unprotect(self):
        """Disables the write protection for all Flash memory sectors,
        then reconnects."""
        self.log("CMD_WRITE_UNPROTECT", level=2)
        self._send_command(self.CMD_WRITE_UNPROTECT, 0)
        self._connect(5)

    def cmd_readout_protect(self):
        """Enables the read protection, then reconnects."""
        self.log("CMD_READOUT_PROTECT", level=2)
        self._send_command(self.CMD_READOUT_PROTECT, 0)
        self.log("Set readout protection, device is restarted", level=1)
        self._connect(5)

    def cmd_readout_unprotect(self):
        """Disables the read protection, then reconnects."""
        self.log("CMD_READOUT_UNPROTECT", level=2)
        self._send_command(self.CMD_READOUT_UNPROTECT, 0)
        self.log("Removed readout protection, device is restarted", level=1)
        self._connect(5)

    def read_memory(self, address, size=None):
        """Read memory starting at ``address`` and return it as a list.

        With ``size`` None, 256-byte blocks are read until the boot-loader
        refuses one (NoAckDataException); otherwise exactly ``size`` bytes
        are requested in blocks of at most 256.
        """
        mem = []
        if size is None:
            self.log("address=0x%08x" % address, 'READ_MEMORY', level=1)
            while True:
                try:
                    mem += self._cmd_read_memory(address, 256)
                except NoAckDataException:
                    # one extra read after the refusal, presumably to drain
                    # a pending reply byte -- TODO confirm
                    self._read()
                    break
                address += 256
            self.log("done (%d Bytes)" % len(mem), 'READ_MEMORY', level=1)
        else:
            self.log("from 0x%08x (%d Bytes)" % (address, size), 'READ_MEMORY', level=1)
            while size > 0:
                _rd_size = min(size, 256)
                size -= _rd_size
                mem += self._cmd_read_memory(address, _rd_size)
                address += _rd_size
            self.log("done", 'READ_MEMORY', level=1)
        return mem

    def write_memory(self, address, data):
        """Write ``data`` to memory at ``address`` in blocks of 256 bytes."""
        self.log("from 0x%08x (%d Bytes)" % (address, len(data)), 'WRITE_MEMORY', level=1)
        # step through the data instead of re-slicing the remainder each time
        for offset in range(0, len(data), 256):
            self._cmd_write_memory(address + offset, data[offset:offset + 256])
        self.log("done", 'WRITE_MEMORY', level=1)

    def write_file(self, address, file_name, verify=False):
        """Write a binary file to memory and optionally verify it.

        The image is zero-padded to a multiple of 4 bytes before writing.
        With ``verify`` set, the memory is read back and compared; the
        first 10 mismatches are logged individually.
        """
        # 'with' closes the file; it used to be left open
        with open(file_name, 'rb') as binfile:
            mem = list(binfile.read())
        remainder = len(mem) % 4
        if remainder:
            # pad up to the next multiple of 4 (the old code added
            # `remainder` bytes, leaving 5- or 7-byte images unaligned)
            mem += [0] * (4 - remainder)
        size = len(mem)
        self.write_memory(address, mem)
        if not verify:
            return
        addr = address
        mem_verify = self.read_memory(address, size)
        _errors = 0
        missing = size - len(mem_verify)
        if missing > 0:
            # zip() below stops at the shorter list, so count the bytes that
            # were never read back as errors instead of ignoring them
            self.log("read back only %d of %d Bytes" % (len(mem_verify), size), 'VERIFY', level=0)
            _errors += missing
        for data_a, data_b in zip(mem, mem_verify):
            if data_a != data_b:
                if _errors < 10:
                    self.log("0x%08x: 0x%02x != 0x%02x" % (addr, data_a, data_b), 'VERIFY', level=0)
                _errors += 1
            addr += 1
        if _errors:
            # any mismatch is a failure; "OK" used to be logged for 1-9 errors
            self.log(".. %d errors" % _errors, 'VERIFY', level=0)
        else:
            self.log("OK", 'VERIFY', level=1)

    def mass_erase(self):
        """Erase the whole Flash memory.

        Uses CMD_ERASE when the device allows it, otherwise the extended
        erase; when that is not acknowledged, falls back to enabling and
        removing the readout protection.
        """
        self.log("MASS_ERASE", level=1)
        if self.CMD_ERASE in self._allowed_commands:
            self._cmd_erase()
            return
        try:
            self._cmd_extended_erase()
        except NoAckException:
            # some chips don't support mass erase
            # protect and unprotect also make chip erase
            try:
                self.cmd_readout_protect()
            except NoAckException:
                # chip is already protected
                pass
            self.cmd_readout_unprotect()

    def erase_blocks(self, blocks):
        """Erase the given Flash blocks (duplicates removed, sorted).

        Does nothing when ``blocks`` is empty.
        """
        blocks = sorted(set(blocks))
        self.log(",".join([str(b) for b in blocks]), 'ERASE_BLOCKS', level=1)
        if not blocks:
            # an empty list would be sent as a count of -1
            return
        if self.CMD_ERASE in self._allowed_commands:
            self._cmd_erase(blocks)
            return
        self._cmd_extended_erase(blocks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment