Last active
November 23, 2022 11:03
-
-
Save wildstray/3bf49a8fad7637e84c54cf018b1c3588 to your computer and use it in GitHub Desktop.
Modbus TCP scan for Contrel EMS meters and print configuration in human readable form
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from pymodbus.client.sync import ModbusTcpClient as ModbusClient | |
from pymodbus.pdu import ExceptionResponse | |
from pymodbus.constants import Endian | |
from pymodbus.payload import BinaryPayloadDecoder | |
import sys | |
from tabulate import tabulate | |
if len(sys.argv) < 2: | |
print("Please specify hostname!") | |
sys.exit() | |
name, host = sys.argv | |
client = ModbusClient(host=host, port=502, timeout=1, retries=1) | |
client.connect() | |
units = set() | |
regs = {0x4000: ['s', 5, 'Serial No.'], 0x4005: ['s', 4, 'HW rev.'], 0x4009: ['s', 4, 'HW custom.'], 0x6008: ['I', 2, 'Node id'], | |
0x600A: ['I', 2, 'Baud rate', {0: 4800, 1: 9600, 2: 19200, 3: 38400, 4: 57600, 5: 115200}], 0x600C: ['I', 2, 'Stop bits'], | |
0x600E: ['I', 2, 'Parity', {0: 'N', 1: 'O', 2: 'E'}], 0x6010: ['I', 2, 'Response delay'], | |
0x50A0: ['I', 2, 'CT primary'], 0x50A2: ['I', 2, 'CT secondary'], 0x50A4: ['I', 2, 'CT-N primary'], | |
0x50A6: ['I', 2, 'CT-N secondary'], 0x50A8: ['I', 2, 'VT primary'], 0x50AA: ['I', 2, 'VT secondary'], | |
0x50B0: ['I', 2, 'Units LMH ', {0: 'Light', 1: 'Medium', 2: 'Heavy'}], 0x5052: ['I', 2, 'Hour'], 0x5054: ['I', 2, 'Minute'], 0x5056: ['I', 2, 'Seconds'], | |
0x5058: ['I', 2, 'Week day'], 0x505A: ['I', 2, 'Day'], 0x505C: ['I', 2, 'Month'], 0x505E: ['I', 2, 'Year'], | |
0x5074: ['I', 2, 'Fundamental freq.', {0: '50Hz', 1: '60Hz'}], 0x507C: ['I', 2, 'Wiring', {0: '3-Phase', 1: 'Aron', 2: '3-Phase Balanced', | |
3: '3-Phase Multi Load Balanced', 4: 'Single-Phase', 5: 'Single-Phase - Multi Load', 6: 'Multi Single-Phase', 7: 'Two-Phase'}], | |
0x507E: ['I', 2, 'Neutral current', {0: 'Computed', 1: 'Measured'}], | |
0x5080: ['I', 2, 'PF convention', {0: 'Sign convention', 1: 'IEC convention', 2: 'IEEE/DIN convention'}]} | |
# Credits: https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console | |
def progressBar(iterable, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"): | |
""" | |
Call in a loop to create terminal progress bar | |
@params: | |
iteration - Required : current iteration (Int) | |
total - Required : total iterations (Int) | |
prefix - Optional : prefix string (Str) | |
suffix - Optional : suffix string (Str) | |
decimals - Optional : positive number of decimals in percent complete (Int) | |
length - Optional : character length of bar (Int) | |
fill - Optional : bar fill character (Str) | |
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) | |
""" | |
total = len(iterable) | |
# Progress Bar Printing Function | |
def printProgressBar (iteration): | |
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) | |
filledLength = int(length * iteration // total) | |
bar = fill * filledLength + '-' * (length - filledLength) | |
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) | |
# Initial Call | |
printProgressBar(0) | |
# Update Progress Bar | |
for i, item in enumerate(iterable): | |
yield item | |
printProgressBar(i + 1) | |
# Print New Line on Complete | |
print() | |
for unit in progressBar(range(63), prefix = 'Progress:', suffix = 'Complete', length = 50): | |
result = client.read_holding_registers(0x4000, 1, unit=unit+1) | |
if hasattr(result, 'registers') and result.registers: | |
units.add(unit) | |
results = {} | |
for unit in units: | |
results[unit] = {} | |
for reg in regs.keys(): | |
result = client.read_holding_registers(reg, regs[reg][1], unit=unit+1) | |
if hasattr(result, 'registers'): | |
bpd = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big) | |
decoders = {'b': bpd.decode_8bit_int, | |
'B': bpd.decode_8bit_uint, | |
'h': bpd.decode_16bit_int, | |
'H': bpd.decode_16bit_uint, | |
'i': bpd.decode_32bit_int, | |
'I': bpd.decode_32bit_uint, | |
'q': bpd.decode_64bit_int, | |
'Q': bpd.decode_64bit_int, | |
'f': bpd.decode_32bit_float, | |
'd': bpd.decode_64bit_float, | |
's': bpd.decode_string} | |
if regs[reg][0] == 's': | |
value = decoders.get(regs[reg][0])(regs[reg][1] << 1).decode() | |
else: | |
value = decoders.get(regs[reg][0])() | |
results[unit].update({reg: value}) | |
rows = [] | |
for reg in regs.keys(): | |
rows.append([regs[reg][2]]) | |
client.close() | |
cols = list(units) | |
cols.insert(0, "SLAVE ID") | |
row = 0 | |
for unit in units: | |
for reg in regs.keys(): | |
value = results[unit][reg] | |
try: | |
value = regs[reg][3][value] | |
except IndexError: | |
pass | |
rows[row].append(value) | |
row += 1 | |
row = 0 | |
print(tabulate(rows, cols, tablefmt="grid")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment