Files
weather_station_MK2/software/test_software/weather_station_rs485_client.py
2025-08-01 11:32:57 +02:00

122 lines
4.9 KiB
Python

# Copyright 2025 Marcel Konstapel
# https://www.meezenest.nl/mees
#
# Testprogram for weather_station_MK2 sensors
#
# V0.1 2025-07-22
# - First working program
# V0.1.1 2025-07-28
# - Added auto detect, reconnect and other fail safe features
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import json
import time
import sys
import logging
import minimalmodbus
from modbus_control import ModBusController
from config_reader import config_reader
def setup():
    """Configure logging, load the configuration and open every ModBus device.

    Reads ``config.yml`` through ``config_reader`` and creates one
    ``ModBusController`` for each entry of the ``modbus_servers`` section,
    all on the serial port named in the ``modbus`` section.

    Returns:
        A tuple ``(configuration, controller, modbus_addresses)``.
        ``controller`` and ``modbus_addresses`` are parallel lists: the
        controller at position ``i`` was created for the address at
        position ``i``.

    Raises:
        SystemExit: If the settings cannot be read, or if a controller
            cannot be created for one of the configured addresses.
    """
    config_file = "config.yml"

    logging.basicConfig(
        level=logging.DEBUG,
        format="{asctime} - {levelname} - {message}",
        style="{",
        datefmt="%Y-%m-%d %H:%M",
    )

    # Read the configuration file. A return value of 0 is treated as failure;
    # exit with a message (and therefore a non-zero status) so the failure is
    # visible to whoever started the program.
    configuration = config_reader(config_file)
    if configuration.read_settings() == 0:
        sys.exit("Could not read settings from " + config_file)

    settings = configuration.config_file_settings
    port = settings['modbus']['port']

    # Show values from config file. %-style arguments are used so that
    # non-string values coming from the YAML file cannot break the log call.
    logging.info("Using RS485 port : %s", port)

    controller = []
    modbus_addresses = []

    for entry in settings['modbus_servers']:
        address = entry['address']
        modbus_addresses.append(address)
        logging.info(
            "Using device: %s on ModBus address: %s",
            entry['description'],
            address,
        )

        # Open the serial port for this device. Only the constructor call is
        # guarded, and only ordinary errors are caught, so Ctrl-C still works.
        try:
            controller.append(ModBusController(port, address))
        except Exception as err:
            logging.error("Could not open serial port %s: %s", port, err)
            sys.exit("Exiting")

    return configuration, controller, modbus_addresses
# Lost serial port, try to reconnect. We can try to connect to any ModBus address, even if it does not exist: we are only trying to reconnect to the RS485 serial/USB dongle.
def ReconnectModBus(configuration):
    """Block until a ModBusController can be created on the configured port.

    Intended to be called after the serial port was lost. A new
    ``ModBusController`` is created on the configured port over and over,
    with a 10 second pause after each failed attempt, until creation
    succeeds. ModBus address 0 is used for the probe; the created object is
    deliberately discarded.

    NOTE(review): nothing here hands the new connection to the controllers
    that already exist; presumably they share the reopened port. Verify
    against ``ModBusController``.

    Args:
        configuration: Object whose
            ``config_file_settings['modbus']['port']`` names the serial port.
    """
    port = configuration.config_file_settings['modbus']['port']

    while True:
        logging.warning("Try to reconnect to ModBus dongle")
        try:
            # Throwaway probe: only the act of opening the port matters.
            ModBusController(port, 0)
        except Exception:
            # Catch ordinary errors only, so KeyboardInterrupt and SystemExit
            # can still stop the program while it is retrying.
            logging.error("Serial port " + port + " not available. Retry in 10 seconds.")
            time.sleep(10)
        else:
            break

    logging.warning("Reconnected to ModBus dongle.")
#print("Enable heater function")
#controller.enable_heater()
# One-time initialisation: configuration object, one controller per configured
# device, and the matching list of ModBus addresses.
Configuration, Controller, ModbusAddresses = setup()

# Poll every configured device, forever.
while True:
    time.sleep(3)  # Sleep for 3 seconds before each polling round
    ModBusData = {}

    # Loop through all configured ModBus devices
    for index, current_modbus_device in enumerate(ModbusAddresses):
        address_text = str(current_modbus_device)
        logging.info("Reading device on ModBus address: " + address_text)

        try:
            identity = Controller[index].get_id()
            ModBusData['ID'] = identity

            # 0x4D45 is the ASCII pair "ME"; the first ID word carrying it is
            # reported as a Mees Electronics sensor.
            if identity[0] == 0x4D45:
                logging.info("Found Mees Electronics sensor on ModBus address " + address_text)
                serial_words = " ".join(hex(identity[position]) for position in (1, 2, 3))
                logging.info("Serial number: " + serial_words)

            # Dump everything collected so far as JSON.
            dump = json.dumps(ModBusData, indent=1, sort_keys=False)
            logging.info(dump)

        except minimalmodbus.NoResponseError:
            # Communication timeout
            logging.error("No response from the instrument")

        except minimalmodbus.InvalidResponseError:
            # Malformed ModBus response
            logging.error("Invalid response from the instrument")

        except minimalmodbus.SlaveReportedException as err:
            # The device itself reported an error
            logging.error(f"The instrument reported an error: {err}")

        except minimalmodbus.ModbusException as err:
            # Any other ModBus-related error
            logging.error(f"Modbus error: {err}")

        except Exception as err:
            # Anything else is treated as an I/O failure of the USB/serial
            # link: close this controller's port and wait for it to return.
            port_name = Configuration.config_file_settings['modbus']['port']
            message = (
                f"Unexpected error: {err}"
                + ", serial port "
                + port_name
                + " not available while reading device on ModBus address "
                + str(ModbusAddresses[index])
                + ". Try to reconnect."
            )
            logging.error(message)
            Controller[index].serial.close()
            ReconnectModBus(Configuration)