187 lines
8.1 KiB
Python
187 lines
8.1 KiB
Python
""""
|
|
ModBus test routines
|
|
Copyright (C) 2025 M.T. Konstapel
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
"""
|
|
import json
|
|
import time
|
|
import sys
|
|
import logging
|
|
import minimalmodbus
|
|
import datetime
|
|
|
|
from pathlib import Path
|
|
from modbus_control import ModBusController
|
|
from config_reader import config_reader
|
|
from modbus_definition_file_reader import definition_file_reader
|
|
|
|
def setup():
|
|
config_file = "config.yml"
|
|
definition_file = "modbus_registers.yaml"
|
|
|
|
# Read the configuration file
|
|
configuration = config_reader(config_file)
|
|
|
|
if configuration.read_settings() == 0:
|
|
sys.exit()
|
|
|
|
print("Succesfully read the configuration from file: " + config_file)
|
|
|
|
# show values from config file
|
|
if configuration.config_file_settings['global']['program-log'] == 0:
|
|
print("Program output will not be logged to a file.")
|
|
else:
|
|
print("Program output will be logged to file: " + configuration.config_file_settings['global']['program-log'])
|
|
|
|
if configuration.config_file_settings['global']['data-log'] == 0:
|
|
print("Sensor data will not be logged to a file.")
|
|
else:
|
|
print("Sensor data will be logged to file: " + configuration.config_file_settings['global']['data-log'])
|
|
|
|
# Enable the logging module. Log to file if enabled in configuration file, otherwise log to stand i/o (probably the screen)
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format="{asctime} - {levelname} - {message}",
|
|
style="{",
|
|
datefmt="%Y-%m-%d %H:%M",
|
|
filename=configuration.config_file_settings['global']['program-log'],
|
|
)
|
|
|
|
logging.info("Using RS485 port : " + configuration.config_file_settings['modbus']['port'])
|
|
logging.info("Publishing sensor data on MQTT broker: " + configuration.config_file_settings['global']['mqtt-server'])
|
|
|
|
# Read the ModBus definition file
|
|
modbus_registers = definition_file_reader(definition_file)
|
|
|
|
if modbus_registers.read_settings() == 0:
|
|
sys.exit()
|
|
|
|
logging.info("Succesfully read the ModBus register definitions from file: " + definition_file)
|
|
|
|
controller = []
|
|
modbus_addresses = []
|
|
for index, entry in enumerate(configuration.config_file_settings['modbus_servers']):
|
|
modbus_addresses.append(entry['address'])
|
|
logging.info("Using device: " + entry['description'] + " on ModBus address: " + str(entry['address']))
|
|
|
|
# Open serial port and try to connect to all the ModBus devices from the configuration file
|
|
try:
|
|
controller.append(ModBusController(configuration.config_file_settings['modbus']['port'], modbus_addresses[index]))
|
|
except:
|
|
logging.error("Could not open serial port " + configuration.config_file_settings['modbus']['port'])
|
|
sys.exit("Exiting")
|
|
|
|
return configuration, controller, modbus_addresses, modbus_registers
|
|
|
|
def data_logger(data, configuration):
|
|
|
|
if configuration.config_file_settings['global']['data-log'] != 0:
|
|
|
|
# Add date to file name, so every day a new file is created
|
|
filename = Path(configuration.config_file_settings['global']['data-log'])
|
|
new_filename = '_'.join([filename.stem, datetime.datetime.today().strftime('%Y-%m-%d')])
|
|
new_filename = ''.join([new_filename, filename.suffix])
|
|
new_filename = '/'.join([str(filename.parent), new_filename])
|
|
logging.debug(new_filename)
|
|
|
|
try:
|
|
f = open(new_filename, 'a')
|
|
json.dump(data, f)
|
|
f.write("\n")
|
|
f.close()
|
|
except:
|
|
logging.warning("Could not write to file: " + new_filename)
|
|
|
|
def send_data_to_mqtt(data, configuration):
|
|
logging.debug("Send data to MQTT broker.")
|
|
|
|
|
|
# Lost serial port, try to reconnect. We can try to connect to any ModBus address, even if it does not exists. We are only trying to reconnect to the RS485 serial/USB dongle.
|
|
def ReconnectModBus(configuration):
|
|
Retrying = True
|
|
while (Retrying):
|
|
|
|
logging.error("Try to reconnect to ModBus dongle")
|
|
try:
|
|
controller = ModBusController(configuration.config_file_settings['modbus']['port'], 0)
|
|
Retrying = False
|
|
except:
|
|
logging.error("Serial port " + configuration.config_file_settings['modbus']['port'] + " not available. Retry in 10 seconds.")
|
|
time.sleep(10)
|
|
|
|
logging.info("Reconnected to ModBus dongle.")
|
|
|
|
#print("Enable heater function")
|
|
#controller.enable_heater()
|
|
|
|
Configuration, Controller, ModbusAddresses, ModbusRegisters = setup()
|
|
|
|
while (1):
|
|
time.sleep(3) # Sleep for 3 seconds
|
|
ModBusData={}
|
|
|
|
# Loop through all configured ModBus devices and try to read the sensor data
|
|
for index, current_modbus_device in enumerate(ModbusAddresses):
|
|
logging.debug("Reading device on ModBus address: " + str(current_modbus_device))
|
|
|
|
try:
|
|
ModBusData['DateTime'] = datetime.datetime.now().replace(microsecond=0).isoformat()
|
|
ModBusData['ID'] = Controller[index].get_id()
|
|
if ModBusData['ID'][0] == 0x4D45:
|
|
ModBusData['Type'] = Controller[index].get_type()
|
|
ModBusData['TypeString'] = Controller[index].get_type_string()
|
|
ModBusData['InputRegisters'] = Controller[index].read_all_input_registers()
|
|
NoError = True
|
|
except minimalmodbus.NoResponseError:
|
|
# Handle communication timeout
|
|
logging.warning("No response from the instrument on ModBus address: " + str(current_modbus_device))
|
|
NoError = False
|
|
except minimalmodbus.InvalidResponseError:
|
|
# Handle invalid Modbus responses
|
|
logging.warning("Invalid response from the instrument on ModBus address: " + str(current_modbus_device))
|
|
NoError = False
|
|
except minimalmodbus.SlaveReportedException as e:
|
|
# Handle errors reported by the slave device
|
|
logging.error(f"The instrument reported an error: {e}")
|
|
NoError = False
|
|
except minimalmodbus.ModbusException as e:
|
|
# Handle all other Modbus-related errors
|
|
logging.error(f"Modbus error: {e}")
|
|
NoError = False
|
|
except Exception as e:
|
|
# Handle unexpected errors, probably I/O error in USB/serial
|
|
logging.error(f"Unexpected error: {e}" + ", serial port " + Configuration.config_file_settings['modbus']['port'] + " not available while reading device on ModBus address " + str(ModbusAddresses[index]) + ". Try to reconnect.")
|
|
Controller[index].serial.close()
|
|
ReconnectModBus(Configuration)
|
|
NoError = False
|
|
|
|
if NoError == True:
|
|
try:
|
|
DeviceType = ModbusRegisters.definition_file_data['devices'][ModBusData['Type']]
|
|
logging.debug("Found Mees Electronics sensor on ModBus address " + str(current_modbus_device))
|
|
logging.debug("Serial number: " + hex(ModBusData['ID'][1]) + " " + hex(ModBusData['ID'][2]) + " " + hex(ModBusData['ID'][3]))
|
|
logging.debug("Device type: " + str(ModBusData['Type']) + " (" + ModBusData['TypeString'] + ")")
|
|
logging.debug (ModBusData['InputRegisters'])
|
|
logging.debug (json.dumps(ModBusData, indent=1, sort_keys=False))
|
|
|
|
except:
|
|
logging.warning("Modbus device type " + str(ModBusData['Type']) + " not found in register definition file. Ignoring sensor data.")
|
|
|
|
# Log sensor data to file if enabled in configuration file
|
|
data_logger(ModBusData, Configuration)
|
|
|
|
# Send sensor data to MQTT broker
|
|
send_data_to_mqtt(ModBusData, Configuration)
|