Skip to content

Instantly share code, notes, and snippets.

@wildstray
Last active November 23, 2022 11:03
Show Gist options
  • Save wildstray/3bf49a8fad7637e84c54cf018b1c3588 to your computer and use it in GitHub Desktop.
Save wildstray/3bf49a8fad7637e84c54cf018b1c3588 to your computer and use it in GitHub Desktop.
Modbus TCP scan for Contrel EMS meters and print configuration in human readable form
#!/usr/bin/env python3
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.pdu import ExceptionResponse
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder
import sys
from tabulate import tabulate
if len(sys.argv) < 2:
print("Please specify hostname!")
sys.exit()
name, host = sys.argv
client = ModbusClient(host=host, port=502, timeout=1, retries=1)
client.connect()
units = set()
regs = {0x4000: ['s', 5, 'Serial No.'], 0x4005: ['s', 4, 'HW rev.'], 0x4009: ['s', 4, 'HW custom.'], 0x6008: ['I', 2, 'Node id'],
0x600A: ['I', 2, 'Baud rate', {0: 4800, 1: 9600, 2: 19200, 3: 38400, 4: 57600, 5: 115200}], 0x600C: ['I', 2, 'Stop bits'],
0x600E: ['I', 2, 'Parity', {0: 'N', 1: 'O', 2: 'E'}], 0x6010: ['I', 2, 'Response delay'],
0x50A0: ['I', 2, 'CT primary'], 0x50A2: ['I', 2, 'CT secondary'], 0x50A4: ['I', 2, 'CT-N primary'],
0x50A6: ['I', 2, 'CT-N secondary'], 0x50A8: ['I', 2, 'VT primary'], 0x50AA: ['I', 2, 'VT secondary'],
0x50B0: ['I', 2, 'Units LMH ', {0: 'Light', 1: 'Medium', 2: 'Heavy'}], 0x5052: ['I', 2, 'Hour'], 0x5054: ['I', 2, 'Minute'], 0x5056: ['I', 2, 'Seconds'],
0x5058: ['I', 2, 'Week day'], 0x505A: ['I', 2, 'Day'], 0x505C: ['I', 2, 'Month'], 0x505E: ['I', 2, 'Year'],
0x5074: ['I', 2, 'Fundamental freq.', {0: '50Hz', 1: '60Hz'}], 0x507C: ['I', 2, 'Wiring', {0: '3-Phase', 1: 'Aron', 2: '3-Phase Balanced',
3: '3-Phase Multi Load Balanced', 4: 'Single-Phase', 5: 'Single-Phase - Multi Load', 6: 'Multi Single-Phase', 7: 'Two-Phase'}],
0x507E: ['I', 2, 'Neutral current', {0: 'Computed', 1: 'Measured'}],
0x5080: ['I', 2, 'PF convention', {0: 'Sign convention', 1: 'IEC convention', 2: 'IEEE/DIN convention'}]}
# Credits: https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
def progressBar(iterable, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
total = len(iterable)
# Progress Bar Printing Function
def printProgressBar (iteration):
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd)
# Initial Call
printProgressBar(0)
# Update Progress Bar
for i, item in enumerate(iterable):
yield item
printProgressBar(i + 1)
# Print New Line on Complete
print()
for unit in progressBar(range(63), prefix = 'Progress:', suffix = 'Complete', length = 50):
result = client.read_holding_registers(0x4000, 1, unit=unit+1)
if hasattr(result, 'registers') and result.registers:
units.add(unit)
results = {}
for unit in units:
results[unit] = {}
for reg in regs.keys():
result = client.read_holding_registers(reg, regs[reg][1], unit=unit+1)
if hasattr(result, 'registers'):
bpd = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
decoders = {'b': bpd.decode_8bit_int,
'B': bpd.decode_8bit_uint,
'h': bpd.decode_16bit_int,
'H': bpd.decode_16bit_uint,
'i': bpd.decode_32bit_int,
'I': bpd.decode_32bit_uint,
'q': bpd.decode_64bit_int,
'Q': bpd.decode_64bit_int,
'f': bpd.decode_32bit_float,
'd': bpd.decode_64bit_float,
's': bpd.decode_string}
if regs[reg][0] == 's':
value = decoders.get(regs[reg][0])(regs[reg][1] << 1).decode()
else:
value = decoders.get(regs[reg][0])()
results[unit].update({reg: value})
rows = []
for reg in regs.keys():
rows.append([regs[reg][2]])
client.close()
cols = list(units)
cols.insert(0, "SLAVE ID")
row = 0
for unit in units:
for reg in regs.keys():
value = results[unit][reg]
try:
value = regs[reg][3][value]
except IndexError:
pass
rows[row].append(value)
row += 1
row = 0
print(tabulate(rows, cols, tablefmt="grid"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment