Reliable MQTT implementation.
This commit is contained in:
@@ -35,52 +35,53 @@ from paho.mqtt.properties import Properties
|
||||
from paho.mqtt.packettypes import PacketTypes
|
||||
properties=Properties(PacketTypes.PUBLISH)
|
||||
properties.MessageExpiryInterval=30 # in seconds
|
||||
import ssl
|
||||
#import ssl
|
||||
|
||||
version = '3' # or '5'
|
||||
mqtt_transport = 'tcp' # or 'websockets'
|
||||
def start_mqtt(config):
|
||||
|
||||
if version == '5':
|
||||
client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy",
|
||||
transport=mqtt_transport,
|
||||
protocol=mqtt.MQTTv5)
|
||||
if version == '3':
|
||||
client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy",
|
||||
transport=mqtt_transport,
|
||||
protocol=mqtt.MQTTv311,
|
||||
clean_session=True)
|
||||
#client.username_pw_set("user", "password")
|
||||
version = '3' # or '5'
|
||||
mqtt_transport = 'tcp' # or 'websockets'
|
||||
|
||||
import mqtt_callbacks
|
||||
client.on_message = mqtt_callbacks.on_message;
|
||||
client.on_connect = mqtt_callbacks.on_connect;
|
||||
#client.on_publish = mqtt_callbacks.on_publish;
|
||||
client.on_subscribe = mqtt_callbacks.on_subscribe;
|
||||
if version == '5':
|
||||
client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy",
|
||||
transport=mqtt_transport,
|
||||
protocol=mqtt.MQTTv5)
|
||||
if version == '3':
|
||||
client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy",
|
||||
transport=mqtt_transport,
|
||||
protocol=mqtt.MQTTv311,
|
||||
clean_session=True)
|
||||
#client.username_pw_set("user", "password")
|
||||
|
||||
mqtt_broker = 'mqtt.meezenest.nl'
|
||||
mqtt_port = 1883
|
||||
mqtt_keepalalive=60
|
||||
mqtt_topic = 'topic/important'
|
||||
import mqtt_callbacks
|
||||
client.on_message = mqtt_callbacks.on_message;
|
||||
client.on_connect = mqtt_callbacks.on_connect;
|
||||
#client.on_publish = mqtt_callbacks.on_publish;
|
||||
client.on_subscribe = mqtt_callbacks.on_subscribe;
|
||||
|
||||
if version == '5':
|
||||
from paho.mqtt.properties import Properties
|
||||
from paho.mqtt.packettypes import PacketTypes
|
||||
properties=Properties(PacketTypes.CONNECT)
|
||||
properties.SessionExpiryInterval=30*60 # in seconds
|
||||
client.connect(mqtt_broker,
|
||||
port=mqtt_port,
|
||||
clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
|
||||
properties=properties,
|
||||
keepalive=60);
|
||||
|
||||
elif version == '3':
|
||||
client.connect(mqtt_broker, port=mqtt_port, keepalive=mqtt_keepalalive);
|
||||
if version == '5':
|
||||
from paho.mqtt.properties import Properties
|
||||
from paho.mqtt.packettypes import PacketTypes
|
||||
properties=Properties(PacketTypes.CONNECT)
|
||||
properties.SessionExpiryInterval=30*60 # in seconds
|
||||
client.connect(config['mqtt-server'],
|
||||
port=config['mqtt-port'],
|
||||
clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
|
||||
properties=properties,
|
||||
keepalive=60);
|
||||
|
||||
client.loop_start();
|
||||
elif version == '3':
|
||||
client.connect(config['mqtt-server'], port=config['mqtt-port'], keepalive=60);
|
||||
|
||||
while 1:
|
||||
client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties);
|
||||
time.sleep(1)
|
||||
client.loop_start();
|
||||
|
||||
# Debug code
|
||||
# while 1:
|
||||
# client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties);
|
||||
# time.sleep(1)
|
||||
|
||||
return client
|
||||
|
||||
|
||||
def setup():
|
||||
@@ -118,6 +119,20 @@ def setup():
|
||||
logging.info("Using RS485 port : " + configuration.config_file_settings['modbus']['port'])
|
||||
logging.info("Publishing sensor data on MQTT broker: " + configuration.config_file_settings['global']['mqtt-server'])
|
||||
|
||||
# Start MQTT section
|
||||
mqtt_connected = False
|
||||
while not mqtt_connected:
|
||||
try:
|
||||
mqtt_client = start_mqtt(configuration.config_file_settings['global'])
|
||||
mqtt_connected = True
|
||||
except:
|
||||
logging.error("Could not connect to MQTT broker. Retry until success (of until CTRL-C is pressed).")
|
||||
time.sleep(3) # Sleep for 3 seconds
|
||||
|
||||
# End MQTT section
|
||||
|
||||
# Start ModBus section
|
||||
|
||||
# Read the ModBus definition file
|
||||
modbus_registers = definition_file_reader(definition_file)
|
||||
|
||||
@@ -139,7 +154,9 @@ def setup():
|
||||
logging.error("Could not open serial port " + configuration.config_file_settings['modbus']['port'])
|
||||
sys.exit("Exiting")
|
||||
|
||||
return configuration, controller, modbus_addresses, modbus_registers
|
||||
# End ModBus section
|
||||
|
||||
return configuration, controller, modbus_addresses, modbus_registers, mqtt_client
|
||||
|
||||
def data_logger(data, configuration):
|
||||
|
||||
@@ -189,23 +206,46 @@ def send_data_to_aprs(weather_data, configuration):
|
||||
logging.error(f"Command returned: {e}")
|
||||
|
||||
|
||||
def send_data_to_mqtt(data, configuration, modbus_registers):
|
||||
def send_data_to_mqtt(data, configuration, modbus_registers, mqtt_client):
|
||||
|
||||
mqtt_top_topic = []
|
||||
mqtt_full_topic = []
|
||||
#logging.debug(modbus_registers)
|
||||
# Match actual device on ModBus with definition in modbus_registers.yaml
|
||||
for index1, entry1 in enumerate(modbus_registers['devices']):
|
||||
if entry1['device_type'] == data['Type']:
|
||||
|
||||
# Format serial number for unique MQTT ID
|
||||
logging.debug("topic: mees_electronics_" + hex(data['ID'][0])[2:].zfill(4) + hex(data['ID'][1])[2:].zfill(4) + hex(data['ID'][2])[2:].zfill(4) + hex(data['ID'][3])[2:].zfill(4))
|
||||
logging.debug("type: " + str(data['Type']))
|
||||
logging.debug("type_string: " + data['TypeString'])
|
||||
mqtt_type_topic = hex(data['ID'][0])[2:].zfill(4) + hex(data['ID'][1])[2:].zfill(4) + hex(data['ID'][2])[2:].zfill(4) + hex(data['ID'][3])[2:].zfill(4)
|
||||
mqtt_top_topic = configuration.config_file_settings['global']['mqtt-root-topic'] + "/" + mqtt_type_topic + "/"
|
||||
message_topic = "id"
|
||||
mqtt_message = "mees_electronics_" + mqtt_type_topic
|
||||
mqtt_full_topic = mqtt_top_topic + message_topic
|
||||
MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties);
|
||||
logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message)
|
||||
|
||||
message_topic = "type"
|
||||
mqtt_message = str(data['Type'])
|
||||
mqtt_full_topic = mqtt_top_topic + message_topic
|
||||
MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties);
|
||||
logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message)
|
||||
|
||||
message_topic = "type_string"
|
||||
mqtt_message = data['TypeString']
|
||||
mqtt_full_topic = mqtt_top_topic + message_topic
|
||||
MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties);
|
||||
logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message)
|
||||
|
||||
# Go through every input register and match the unit and description with the value
|
||||
for index2, entry2 in enumerate(data['InputRegisters']):
|
||||
print(entry1['input_register_names'][index2][2])
|
||||
# Scale values
|
||||
entry2 = entry2/ (10^entry1['input_register_names'][index2][2])
|
||||
logging.debug(entry1['input_register_names'][index2][0] + ": " + str(round(entry2,1)) + entry1['input_register_names'][index2][1])
|
||||
|
||||
message_topic = entry1['input_register_names'][index2][0]
|
||||
mqtt_message = str(round(entry2,1))
|
||||
mqtt_full_topic = mqtt_top_topic + message_topic
|
||||
MqttClient.publish(mqtt_full_topic, mqtt_message, 2, properties = properties);
|
||||
logging.debug("Published message to MQTT broker: " + mqtt_full_topic + " = " + mqtt_message)
|
||||
|
||||
logging.debug("Send data to MQTT broker.")
|
||||
|
||||
@@ -225,13 +265,14 @@ def ReconnectModBus(configuration):
|
||||
|
||||
logging.info("Reconnected to ModBus dongle.")
|
||||
|
||||
Configuration, Controller, ModbusAddresses, ModbusRegisters = setup()
|
||||
Configuration, Controller, ModbusAddresses, ModbusRegisters, MqttClient = setup()
|
||||
LoopCounter = 0
|
||||
|
||||
while (1):
|
||||
time.sleep(3) # Sleep for 3 seconds
|
||||
|
||||
# Send APRS telemetry every 10 cycles = every 10 minutes
|
||||
'''
|
||||
LoopCounter = LoopCounter + 1
|
||||
if LoopCounter >= 1:
|
||||
|
||||
@@ -242,7 +283,7 @@ while (1):
|
||||
time.sleep(3)
|
||||
|
||||
LoopCounter = 0
|
||||
|
||||
'''
|
||||
ModBusData={}
|
||||
|
||||
# Loop through all configured ModBus devices and try to read the sensor data
|
||||
@@ -299,6 +340,6 @@ while (1):
|
||||
data_logger(ModBusData, Configuration)
|
||||
|
||||
# Send sensor data to MQTT broker
|
||||
send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data)
|
||||
send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data, MqttClient)
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user