From 89615caed120e8ba7d31575dd0cd658ae5d2c579 Mon Sep 17 00:00:00 2001 From: marcel Date: Tue, 12 Aug 2025 10:15:01 +0200 Subject: [PATCH] Reliable MQTT implementation. --- .../__pycache__/config_reader.cpython-311.pyc | Bin 4786 -> 4930 bytes .../mqtt_callbacks.cpython-311.pyc | Bin 1092 -> 1134 bytes software/test_software/config.yml | 2 + software/test_software/config_reader.py | 2 + software/test_software/modbus_registers.yaml | 30 ++-- software/test_software/mqtt_callbacks.py | 12 +- software/test_software/mqtt_control.py | 132 ----------------- software/test_software/mqtt_control_simple.py | 89 ------------ .../weather_station_rs485_client.py | 137 ++++++++++++------ 9 files changed, 115 insertions(+), 289 deletions(-) delete mode 100644 software/test_software/mqtt_control.py delete mode 100644 software/test_software/mqtt_control_simple.py diff --git a/software/test_software/__pycache__/config_reader.cpython-311.pyc b/software/test_software/__pycache__/config_reader.cpython-311.pyc index cdfb182788c45e9f3076e6fb355ea15d41544bd3..f78461c8ec3ae7b2b807bb3f0bd5cc4471ad56d7 100644 GIT binary patch delta 308 zcmdm_dPt3LIWI340}x#LG%KT-cO&0ZPR1{j-*M|scIT3uY{g{^q8V8yTXU%@vq^#! zf+H|JglN07~q85$5IWI340}$*Los{vNXCvQJPR13J>v(i0=WCD8^;Gu{9 z4+X2>LDb^ivj_ixE#x2zo;-OAe`jANOX|8muu%ZL4Ta<)h zo6w$W^pDc{td^`Zxd>}zJdRxAB5>9ita_1SRJa`F{hyXa^b9Y#EMGSN42h?33 zR@dFbKzH0NvF+AM(jY6#z|u;e9EF0gw3J;+Xd)W<#FUqxOM|R zbqiS0Z~c>v5hhQj+TzI-`K&pF-BKe8RH%{n}2cG_ssUSEX({GgV_l- literal 1092 zcmZuvPiqrF6rb6hP1B@RsgWK;T@XyS*e>)eg3wzrpb>-~Ot-t!X4~DJ?aai|&_fTN z`~nrL(32wi0X+C2wo83jFb+{b0{jNADs6zqpiR(5fSSm$8opKB18ojxe@S26N5`NqOMt%_pWxZA zFb9H>4reyH*Ck6s-worGap@(}!MUZEz?c-Bb>Z`HojO)t@}rO@GB0-o<$))?+}en| zO_5hQ^#n^=J_~4GNjXa?mmw8G{nS=+zU{VIOx@VyK8@UE=67NUC0rU&Uvid&!f(?S zT-^enChjivWSjDqke&=7^wX6kSFjDa>v8HzDr9Tq#XC~Ad@qXDJ%3v)rh9os|0PLO z=qh|cB`s`#19V0zzew#Tsb#kxW~6pP7EZ}RR*XKJK|WD9lrm_=asLItA(A8h5DM_V zu{H9KP+)*xloEk&g0JpsIYJ*xS5|;Dx-)ansZYb#G_Ye}&D*4Qw`#M*)@88k-D=(5 z4Q1PY#+tT6ax3K_)L|m^_9Es5Ts@MTJ4`r)E6RCEEldn!@k#K6N=j9Nt2WdO727kg zpdyGT00-zdnLI16y?gngm6g|Ee4qVsXJBQ`)zjvyf&JGstkORO&>IIn1yROYRK2Vz zoCo6)g(eVlb!!@pZmn6gUvPl{%&$Qt$GVWK8HWMC0jq;#M}0TQb_)7o)X4l69GieC z?g8k0v&w_xn*cCQNd1)5vtsCM9NchB-AG( 5: - #break - msg_count=1 - - -def run(): - client = connect_mqtt() - client.loop_start() - while 1: - if client.is_connected(): - print(client.is_connected()) - publish(client) - - #else: - #client.loop_stop() - - client.loop_stop() - - -if __name__ == '__main__': - run() diff --git a/software/test_software/weather_station_rs485_client.py b/software/test_software/weather_station_rs485_client.py index 70ec0c2..5bd9901 100644 --- a/software/test_software/weather_station_rs485_client.py +++ b/software/test_software/weather_station_rs485_client.py @@ -35,52 +35,53 @@ from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes properties=Properties(PacketTypes.PUBLISH) properties.MessageExpiryInterval=30 # in seconds -import ssl +#import ssl -version = '3' # or '5' -mqtt_transport = 'tcp' # or 'websockets' +def start_mqtt(config): -if version == '5': - client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", - transport=mqtt_transport, - protocol=mqtt.MQTTv5) -if version == '3': - client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", - transport=mqtt_transport, - protocol=mqtt.MQTTv311, - clean_session=True) -#client.username_pw_set("user", "password") + version = '3' # or '5' + mqtt_transport = 'tcp' # or 'websockets' -import mqtt_callbacks -client.on_message = mqtt_callbacks.on_message; -client.on_connect = mqtt_callbacks.on_connect; -#client.on_publish = mqtt_callbacks.on_publish; -client.on_subscribe = mqtt_callbacks.on_subscribe; + if version == '5': + client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", + transport=mqtt_transport, + protocol=mqtt.MQTTv5) + if version == '3': + client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", + transport=mqtt_transport, + protocol=mqtt.MQTTv311, + clean_session=True) + #client.username_pw_set("user", "password") -mqtt_broker = 'mqtt.meezenest.nl' -mqtt_port = 1883 -mqtt_keepalalive=60 -mqtt_topic = 'topic/important' + import mqtt_callbacks + client.on_message = mqtt_callbacks.on_message; + client.on_connect = mqtt_callbacks.on_connect; + #client.on_publish = mqtt_callbacks.on_publish; + client.on_subscribe = mqtt_callbacks.on_subscribe; -if version == '5': - from paho.mqtt.properties import Properties - from paho.mqtt.packettypes import PacketTypes - properties=Properties(PacketTypes.CONNECT) - properties.SessionExpiryInterval=30*60 # in seconds - client.connect(mqtt_broker, - port=mqtt_port, - clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, - properties=properties, - keepalive=60); -elif version == '3': - client.connect(mqtt_broker, port=mqtt_port, keepalive=mqtt_keepalalive); + if version == '5': + from paho.mqtt.properties import Properties + from paho.mqtt.packettypes import PacketTypes + properties=Properties(PacketTypes.CONNECT) + properties.SessionExpiryInterval=30*60 # in seconds + client.connect(config['mqtt-server'], + port=config['mqtt-port'], + clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, + properties=properties, + keepalive=60); -client.loop_start(); + elif version == '3': + client.connect(config['mqtt-server'], port=config['mqtt-port'], keepalive=60); -while 1: - client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties); - time.sleep(1) + client.loop_start(); + +# Debug code +# while 1: +# client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties); +# time.sleep(1) + + return client def setup(): @@ -118,6 +119,20 @@ def setup(): logging.info("Using RS485 port : " + configuration.config_file_settings['modbus']['port']) logging.info("Publishing sensor data on MQTT broker: " + configuration.config_file_settings['global']['mqtt-server']) + # Start MQTT section + mqtt_connected = False + while not mqtt_connected: + try: + mqtt_client = start_mqtt(configuration.config_file_settings['global']) + mqtt_connected = True + except: + logging.error("Could not connect to MQTT broker. Retry until success (of until CTRL-C is pressed).") + time.sleep(3) # Sleep for 3 seconds + + # End MQTT section + + # Start ModBus section + # Read the ModBus definition file modbus_registers = definition_file_reader(definition_file) @@ -139,7 +154,9 @@ def setup(): logging.error("Could not open serial port " + configuration.config_file_settings['modbus']['port']) sys.exit("Exiting") - return configuration, controller, modbus_addresses, modbus_registers + # End ModBus section + + return configuration, controller, modbus_addresses, modbus_registers, mqtt_client def data_logger(data, configuration): @@ -189,23 +206,46 @@ def send_data_to_aprs(weather_data, configuration): logging.error(f"Command returned: {e}") -def send_data_to_mqtt(data, configuration, modbus_registers): +def send_data_to_mqtt(data, configuration, modbus_registers, mqtt_client): + mqtt_top_topic = [] + mqtt_full_topic = [] #logging.debug(modbus_registers) # Match actual device on ModBus with definition in modbus_registers.yaml for index1, entry1 in enumerate(modbus_registers['devices']): if entry1['device_type'] == data['Type']: + # Format serial number for unique MQTT ID - logging.debug("topic: mees_electronics_" + hex(data['ID'][0])[2:].zfill(4) + hex(data['ID'][1])[2:].zfill(4) + hex(data['ID'][2])[2:].zfill(4) + hex(data['ID'][3])[2:].zfill(4)) - logging.debug("type: " + str(data['Type'])) - logging.debug("type_string: " + data['TypeString']) + mqtt_type_topic = hex(data['ID'][0])[2:].zfill(4) + hex(data['ID'][1])[2:].zfill(4) + hex(data['ID'][2])[2:].zfill(4) + hex(data['ID'][3])[2:].zfill(4) + mqtt_top_topic = configuration.config_file_settings['global']['mqtt-root-topic'] + "/" + mqtt_type_topic + "/" + message_topic = "id" + mqtt_message = "mees_electronics_" + mqtt_type_topic + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) + + message_topic = "type" + mqtt_message = str(data['Type']) + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) + + message_topic = "type_string" + mqtt_message = data['TypeString'] + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) # Go through every input register and match the unit and description with the value for index2, entry2 in enumerate(data['InputRegisters']): - print(entry1['input_register_names'][index2][2]) # Scale values entry2 = entry2/ (10^entry1['input_register_names'][index2][2]) - logging.debug(entry1['input_register_names'][index2][0] + ": " + str(round(entry2,1)) + entry1['input_register_names'][index2][1]) + + message_topic = entry1['input_register_names'][index2][0] + mqtt_message = str(round(entry2,1)) + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) logging.debug("Send data to MQTT broker.") @@ -225,13 +265,14 @@ def ReconnectModBus(configuration): logging.info("Reconnected to ModBus dongle.") -Configuration, Controller, ModbusAddresses, ModbusRegisters = setup() +Configuration, Controller, ModbusAddresses, ModbusRegisters, MqttClient = setup() LoopCounter = 0 while (1): time.sleep(3) # Sleep for 3 seconds # Send APRS telemetry every 10 cycles = every 10 minutes + ''' LoopCounter = LoopCounter + 1 if LoopCounter >= 1: @@ -242,7 +283,7 @@ while (1): time.sleep(3) LoopCounter = 0 - + ''' ModBusData={} # Loop through all configured ModBus devices and try to read the sensor data @@ -299,6 +340,6 @@ while (1): data_logger(ModBusData, Configuration) # Send sensor data to MQTT broker - send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data) + send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data, MqttClient)