diff --git a/software/test_software/__pycache__/config_reader.cpython-311.pyc b/software/test_software/__pycache__/config_reader.cpython-311.pyc index cdfb182..f78461c 100644 Binary files a/software/test_software/__pycache__/config_reader.cpython-311.pyc and b/software/test_software/__pycache__/config_reader.cpython-311.pyc differ diff --git a/software/test_software/__pycache__/mqtt_callbacks.cpython-311.pyc b/software/test_software/__pycache__/mqtt_callbacks.cpython-311.pyc index b1d64e3..e0a8923 100644 Binary files a/software/test_software/__pycache__/mqtt_callbacks.cpython-311.pyc and b/software/test_software/__pycache__/mqtt_callbacks.cpython-311.pyc differ diff --git a/software/test_software/config.yml b/software/test_software/config.yml index 2b0c604..93b241b 100644 --- a/software/test_software/config.yml +++ b/software/test_software/config.yml @@ -7,6 +7,8 @@ global: #program-log: /home/marcel/rs458.log # All program output will be written to this file (0 = do not log to file) data-log: /home/marcel/weather_station_data.log # All sensor data will be written to this file (0 = do not log to file). Every day the current date is added to the end of the filename, so every dat a new file is created. mqtt-server: mqtt.meezenest.nl + mqtt-port: 1883 + mqtt-root-topic: mees_electronics # ModBus hardware settings modbus: diff --git a/software/test_software/config_reader.py b/software/test_software/config_reader.py index 2255fc0..a745daa 100644 --- a/software/test_software/config_reader.py +++ b/software/test_software/config_reader.py @@ -60,6 +60,8 @@ class config_reader: tmp = self.config_file_settings['global']['program-log'] tmp = self.config_file_settings['global']['data-log'] tmp = self.config_file_settings['global']['mqtt-server'] + tmp = self.config_file_settings['global']['mqtt-port'] + tmp = self.config_file_settings['global']['mqtt-root-topic'] except: print ("Error in the global section of the configuration file.") return 0 diff --git a/software/test_software/modbus_registers.yaml b/software/test_software/modbus_registers.yaml index e29d625..df8830e 100644 --- a/software/test_software/modbus_registers.yaml +++ b/software/test_software/modbus_registers.yaml @@ -13,18 +13,18 @@ devices: - device_type: 2 input_registers: 15 # The number of available input registers, starting from offset 40 input_register_names: # Description, unit, scaling 0 = as is, 1 = decimal one position to the left, 2 = decimal two positions to the left, enz. - - [Weater station ID, '', 0] - - [Wind direction, °, 1] - - [Wind speed, 'km/h', 2] - - [Wind gust, 'km/h', 2] - - [Temperature, °C, 2] - - [Rain last hour, 'l/m2', 2] - - [Rain last 24 hours, 'l/m2', 2] - - [Rain since midnight, 'l/m2', 2] - - [Humidity, '%', 2] - - [Barometric pressure, hPa, 1] - - [Luminosity, 'W/m2', 0] - - [Snow fall, NA, 0] - - [Raw rainfall counter, mm, 0] - - [Temperature pressure sensor, °C, 2] - - [Status bits, '', 0] + - [weater_station_id, '', 0] + - [wind_direction, °, 1] + - [wind_speed, 'km/h', 2] + - [wind_gust, 'km/h', 2] + - [temperature, °C, 2] + - [rain_last_hour, 'l/m2', 2] + - [rain_last_24 hours, 'l/m2', 2] + - [rain_since_midnight, 'l/m2', 2] + - [humidity, '%', 2] + - [barometric_pressure, hPa, 1] + - [luminosity, 'W/m2', 0] + - [snow_fall, NA, 0] + - [raw_rainfall_counter, mm, 0] + - [temperature_pressure_sensor, °C, 2] + - [status_bits, '', 0] diff --git a/software/test_software/mqtt_callbacks.py b/software/test_software/mqtt_callbacks.py index 874c7df..ecfaf27 100644 --- a/software/test_software/mqtt_callbacks.py +++ b/software/test_software/mqtt_callbacks.py @@ -1,8 +1,10 @@ +import logging + def on_connect(client, userdata, flags, reason_code, properties=None): - client.subscribe(topic="topic/important") + logging.info("Connected to MQTT broker.") + def on_message(client, userdata, message, properties=None): - print( - f"(Received message {message.payload} on topic '{message.topic}' with QoS {message.qos}" - ) + logging.info(f"(Received message {message.payload} on topic '{message.topic}' with QoS {message.qos}") + def on_subscribe(client, userdata, mid, qos, properties=None): - print(f"Subscribed with QoS {qos}") + logging.info(f"Subscribed with QoS {qos}") diff --git a/software/test_software/mqtt_control.py b/software/test_software/mqtt_control.py deleted file mode 100644 index 306a229..0000000 --- a/software/test_software/mqtt_control.py +++ /dev/null @@ -1,132 +0,0 @@ -# NOTE: does not work, head exploded! -import json -import logging -import random -import time -from paho.mqtt.client import Client -from paho.mqtt.client import CallbackAPIVersion - -BROKER = 'mqtt.meezenest.nl' -PORT = 1883 -TOPIC = "python-mqtt/tcp" -# generate client ID with pub prefix randomly -CLIENT_ID = f'python-mqtt-tcp-pub-sub-{random.randint(0, 1000)}' -USERNAME = 'emqx' -PASSWORD = 'public' - -FIRST_RECONNECT_DELAY = 1 -RECONNECT_RATE = 2 -MAX_RECONNECT_COUNT = 12 -MAX_RECONNECT_DELAY = 60 - -FLAG_EXIT = False - -class MqttController(object): - - def __init__(self, config: dict, message_processor=None): - self.config = config - - self.client = Client( - client_id=self.config['mqtt_client'], - clean_session=self.config['mqtt_clean_session'], - userdata={"client": self.config['mqtt_client']}, - ) - - self.client.username_pw_set(self.config['mqtt_username'], self.config['mqtt_password']) - - if self.config['mqtt_debug']: - self.client.on_log = self._on_log - - self.client.on_connect = self._on_connect - #self.client.on_subscribe = self._on_subscribe - self.client.on_message = self._on_message - #self.client.on_publish = self._on_publish - self.client.on_disconnect = self._on_disconnect - - self.client.connect(self.config['mqtt_host'], self.config['mqtt_port'], 60) - - if message_processor: - self.message_processor = message_processor - - def _on_connect(self, client, userdata, flags, rc, properties): - if rc == 0 and self.client.is_connected(): - print("Connected to MQTT Broker!") - self.client.subscribe(TOPIC) - else: - print(f'Failed to connect, return code {rc}') - - - def _on_disconnect(self, client, userdata, rc): - logging.info("Disconnected with result code: %s", rc) - reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY - while reconnect_count < MAX_RECONNECT_COUNT: - logging.info("Reconnecting in %d seconds...", reconnect_delay) - time.sleep(reconnect_delay) - - try: - self.client.reconnect() - logging.info("Reconnected successfully!") - return - except Exception as err: - logging.error("%s. Reconnect failed. Retrying...", err) - - reconnect_delay *= RECONNECT_RATE - reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) - reconnect_count += 1 - logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count) - global FLAG_EXIT - FLAG_EXIT = True - - def _on_message(self, client, userdata, msg): - print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic') - - def loop_start(self): - self.client.loop_start() - - def publish(self): - msg_count = 0 - while not FLAG_EXIT: - msg_dict = { - 'msg': msg_count - } - msg = json.dumps(msg_dict) - if not self.client.is_connected(): - logging.error("publish: MQTT client is not connected!") - time.sleep(1) - continue - result = self.client.publish(TOPIC, msg) - # result: [0, 1] - status = result[0] - if status == 0: - print(f'Send `{msg}` to topic `{TOPIC}`') - else: - print(f'Failed to send message to topic {TOPIC}') - msg_count += 1 - time.sleep(1) - - -def run(): - mqtt_config = { - "mqtt_client": "MeesElecronics", - "mqtt_clean_session": "", - "mqtt_username": "1964", - "mqtt_password": "NoPasswd", - "mqtt_debug": False, - "mqtt_host": "mqtt.meezenest.nl", - "mqtt_port": 1883 - } - print(mqtt_config) - logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s', - level=logging.DEBUG) - client = MqttController(mqtt_config) - client.loop_start() - time.sleep(1) - client.publish() - #if client.is_connected(): - # publish() - #else: - # client.loop_stop() - - -if __name__ == '__main__': - run() diff --git a/software/test_software/mqtt_control_simple.py b/software/test_software/mqtt_control_simple.py deleted file mode 100644 index bf6c956..0000000 --- a/software/test_software/mqtt_control_simple.py +++ /dev/null @@ -1,89 +0,0 @@ -# python 3.11 - -import random -import time - -from paho.mqtt import client as mqtt_client -from paho.mqtt.client import CallbackAPIVersion - - -broker = 'mqtt.meezenest.nl' -port = 1883 -topic = "python/mqtt" -# Generate a Client ID with the publish prefix. -client_id = f'publish-{random.randint(0, 1000)}' -# username = 'emqx' -# password = 'public' - -msg_count = 1 - -def connect_mqtt(): - def on_connect(client, userdata, flags, rc, properties): - if rc == 0: - print("Connected to MQTT Broker!") - else: - print("Failed to connect, return code %d\n", rc) - - def on_disconnect(client, userdata, rc): - print("Disconnected with result code: %s", rc) - reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY - while reconnect_count < MAX_RECONNECT_COUNT: - print("Reconnecting in %d seconds...", reconnect_delay) - time.sleep(reconnect_delay) - - try: - client.reconnect() - print("Reconnected successfully!") - return - except Exception as err: - logging.error("%s. Reconnect failed. Retrying...", err) - - reconnect_delay *= RECONNECT_RATE - reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) - reconnect_count += 1 - print("Reconnect failed after %s attempts. Exiting...", reconnect_count) - global FLAG_EXIT - FLAG_EXIT = True - - client = mqtt_client.Client(CallbackAPIVersion.VERSION2, client_id) - # client.username_pw_set(username, password) - client.on_connect = on_connect - client.on_disconnect = on_disconnect - client.connect(broker, port, 120) - return client - - -def publish(client): - global msg_count - #while True: - time.sleep(1) - msg = f"messages: {msg_count}" - result = client.publish(topic, msg) - # result: [0, 1] - status = result[0] - if status == 0: - print(f"Send `{msg}` to topic `{topic}`") - else: - print(f"Failed to send message to topic {topic}") - msg_count += 1 - if msg_count > 5: - #break - msg_count=1 - - -def run(): - client = connect_mqtt() - client.loop_start() - while 1: - if client.is_connected(): - print(client.is_connected()) - publish(client) - - #else: - #client.loop_stop() - - client.loop_stop() - - -if __name__ == '__main__': - run() diff --git a/software/test_software/weather_station_rs485_client.py b/software/test_software/weather_station_rs485_client.py index 70ec0c2..5bd9901 100644 --- a/software/test_software/weather_station_rs485_client.py +++ b/software/test_software/weather_station_rs485_client.py @@ -35,52 +35,53 @@ from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes properties=Properties(PacketTypes.PUBLISH) properties.MessageExpiryInterval=30 # in seconds -import ssl +#import ssl -version = '3' # or '5' -mqtt_transport = 'tcp' # or 'websockets' +def start_mqtt(config): -if version == '5': - client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", - transport=mqtt_transport, - protocol=mqtt.MQTTv5) -if version == '3': - client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", - transport=mqtt_transport, - protocol=mqtt.MQTTv311, - clean_session=True) -#client.username_pw_set("user", "password") + version = '3' # or '5' + mqtt_transport = 'tcp' # or 'websockets' -import mqtt_callbacks -client.on_message = mqtt_callbacks.on_message; -client.on_connect = mqtt_callbacks.on_connect; -#client.on_publish = mqtt_callbacks.on_publish; -client.on_subscribe = mqtt_callbacks.on_subscribe; + if version == '5': + client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", + transport=mqtt_transport, + protocol=mqtt.MQTTv5) + if version == '3': + client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy", + transport=mqtt_transport, + protocol=mqtt.MQTTv311, + clean_session=True) + #client.username_pw_set("user", "password") -mqtt_broker = 'mqtt.meezenest.nl' -mqtt_port = 1883 -mqtt_keepalalive=60 -mqtt_topic = 'topic/important' + import mqtt_callbacks + client.on_message = mqtt_callbacks.on_message; + client.on_connect = mqtt_callbacks.on_connect; + #client.on_publish = mqtt_callbacks.on_publish; + client.on_subscribe = mqtt_callbacks.on_subscribe; -if version == '5': - from paho.mqtt.properties import Properties - from paho.mqtt.packettypes import PacketTypes - properties=Properties(PacketTypes.CONNECT) - properties.SessionExpiryInterval=30*60 # in seconds - client.connect(mqtt_broker, - port=mqtt_port, - clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, - properties=properties, - keepalive=60); -elif version == '3': - client.connect(mqtt_broker, port=mqtt_port, keepalive=mqtt_keepalalive); + if version == '5': + from paho.mqtt.properties import Properties + from paho.mqtt.packettypes import PacketTypes + properties=Properties(PacketTypes.CONNECT) + properties.SessionExpiryInterval=30*60 # in seconds + client.connect(config['mqtt-server'], + port=config['mqtt-port'], + clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY, + properties=properties, + keepalive=60); -client.loop_start(); + elif version == '3': + client.connect(config['mqtt-server'], port=config['mqtt-port'], keepalive=60); -while 1: - client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties); - time.sleep(1) + client.loop_start(); + +# Debug code +# while 1: +# client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties); +# time.sleep(1) + + return client def setup(): @@ -118,6 +119,20 @@ def setup(): logging.info("Using RS485 port : " + configuration.config_file_settings['modbus']['port']) logging.info("Publishing sensor data on MQTT broker: " + configuration.config_file_settings['global']['mqtt-server']) + # Start MQTT section + mqtt_connected = False + while not mqtt_connected: + try: + mqtt_client = start_mqtt(configuration.config_file_settings['global']) + mqtt_connected = True + except: + logging.error("Could not connect to MQTT broker. Retry until success (of until CTRL-C is pressed).") + time.sleep(3) # Sleep for 3 seconds + + # End MQTT section + + # Start ModBus section + # Read the ModBus definition file modbus_registers = definition_file_reader(definition_file) @@ -139,7 +154,9 @@ def setup(): logging.error("Could not open serial port " + configuration.config_file_settings['modbus']['port']) sys.exit("Exiting") - return configuration, controller, modbus_addresses, modbus_registers + # End ModBus section + + return configuration, controller, modbus_addresses, modbus_registers, mqtt_client def data_logger(data, configuration): @@ -189,23 +206,46 @@ def send_data_to_aprs(weather_data, configuration): logging.error(f"Command returned: {e}") -def send_data_to_mqtt(data, configuration, modbus_registers): +def send_data_to_mqtt(data, configuration, modbus_registers, mqtt_client): + mqtt_top_topic = [] + mqtt_full_topic = [] #logging.debug(modbus_registers) # Match actual device on ModBus with definition in modbus_registers.yaml for index1, entry1 in enumerate(modbus_registers['devices']): if entry1['device_type'] == data['Type']: + # Format serial number for unique MQTT ID - logging.debug("topic: mees_electronics_" + hex(data['ID'][0])[2:].zfill(4) + hex(data['ID'][1])[2:].zfill(4) + hex(data['ID'][2])[2:].zfill(4) + hex(data['ID'][3])[2:].zfill(4)) - logging.debug("type: " + str(data['Type'])) - logging.debug("type_string: " + data['TypeString']) + mqtt_type_topic = hex(data['ID'][0])[2:].zfill(4) + hex(data['ID'][1])[2:].zfill(4) + hex(data['ID'][2])[2:].zfill(4) + hex(data['ID'][3])[2:].zfill(4) + mqtt_top_topic = configuration.config_file_settings['global']['mqtt-root-topic'] + "/" + mqtt_type_topic + "/" + message_topic = "id" + mqtt_message = "mees_electronics_" + mqtt_type_topic + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) + + message_topic = "type" + mqtt_message = str(data['Type']) + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) + + message_topic = "type_string" + mqtt_message = data['TypeString'] + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) # Go through every input register and match the unit and description with the value for index2, entry2 in enumerate(data['InputRegisters']): - print(entry1['input_register_names'][index2][2]) # Scale values entry2 = entry2/ (10^entry1['input_register_names'][index2][2]) - logging.debug(entry1['input_register_names'][index2][0] + ": " + str(round(entry2,1)) + entry1['input_register_names'][index2][1]) + + message_topic = entry1['input_register_names'][index2][0] + mqtt_message = str(round(entry2,1)) + mqtt_full_topic = mqtt_top_topic + message_topic + MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties); + logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message) logging.debug("Send data to MQTT broker.") @@ -225,13 +265,14 @@ def ReconnectModBus(configuration): logging.info("Reconnected to ModBus dongle.") -Configuration, Controller, ModbusAddresses, ModbusRegisters = setup() +Configuration, Controller, ModbusAddresses, ModbusRegisters, MqttClient = setup() LoopCounter = 0 while (1): time.sleep(3) # Sleep for 3 seconds # Send APRS telemetry every 10 cycles = every 10 minutes + ''' LoopCounter = LoopCounter + 1 if LoopCounter >= 1: @@ -242,7 +283,7 @@ while (1): time.sleep(3) LoopCounter = 0 - + ''' ModBusData={} # Loop through all configured ModBus devices and try to read the sensor data @@ -299,6 +340,6 @@ while (1): data_logger(ModBusData, Configuration) # Send sensor data to MQTT broker - send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data) + send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data, MqttClient)