First sort of working mqtt routines

This commit is contained in:
marcel
2025-08-11 21:31:31 +02:00
parent f722f8aec1
commit 8afdc890f1
15 changed files with 443 additions and 31 deletions

View File

@@ -15,7 +15,19 @@ modbus:
# Modbus servers settings
modbus_servers:
- address: 14 # ModBus address of weather station
- address: 14 # ModBus address of weather station
description: Dual temperature sensor
- address: 1
description: Dummy
aprs:
- port: ax0 # Linux AX.25 port to which APRS weather report is sent
call: PE1RXF-13 # Call from which transmissions are made (can be a different call from the call assigned to the AX.25 port)
destination: APZMDM # APRS destination
digi_path: 0 # Digipeater path for weather reports (0 = no path)
interval: 30
- port: ax1 # Linux AX.25 port to which APRS weather report is sent
call: PE1RXF-13 # Call from which transmissions are made (can be a different call from the call assigned to the AX.25 port)
destination: APZMDM # APRS destination
digi_path: WIDE2-1 # Digipeater path for weather reports (0 = no path)
interval: 30

View File

@@ -38,6 +38,9 @@ class config_reader:
if self.test_modbus_servers_settings() == 0:
return 0
if self.aprs_settings() == 0:
return 0
return 1
def read_config_file (self):
@@ -50,7 +53,7 @@ class config_reader:
else:
return 1
# Test if all settings are pressebt
# Test if all settings are pressent
def test_global_settings(self):
# Test is all expected settings are present
try:
@@ -63,7 +66,7 @@ class config_reader:
else:
return 1
# Test if all settings are pressebt
# Test if all settings are pressent
def test_modbus_settings(self):
# Test is all expected settings are present
try:
@@ -84,3 +87,17 @@ class config_reader:
return 0
else:
return 1
def aprs_settings(self):
for entry in self.config_file_settings['aprs']:
try:
tmp = tmp = entry['port']
tmp = entry['call']
tmp = entry['destination']
tmp = entry['digi_path']
tmp = entry['interval']
except:
print ("Error in the aprs section of the configuration file.")
return 0
else:
return 1

View File

@@ -39,25 +39,28 @@ class ModBusController(minimalmodbus.Instrument):
self.clear_buffers_before_each_transaction = True
self.close_port_after_each_call = False
#Address range 0x3000
self.watchdog_toggle_bit = 1
# Read the device ID
def get_id(self):
device_id = [None]*4
device_id[0] = self.read_register(0, 0, 4, False)
device_id[1] = self.read_register(1, 0, 4, False)
device_id[2] = self.read_register(2, 0, 4, False)
device_id[3] = self.read_register(3, 0, 4, False)
device_id[0] = self.read_register(1000, 0, 4, False)
device_id[1] = self.read_register(1001, 0, 4, False)
device_id[2] = self.read_register(1002, 0, 4, False)
device_id[3] = self.read_register(1003, 0, 4, False)
return device_id
# Read the device type
def get_type(self):
device_type = self.read_register(4, 0, 4, False)
device_type = self.read_register(1004, 0, 4, False)
return device_type
# Get the 60 character long description string from ModBus device
# Get the 40 character long description string from ModBus device
def get_type_string(self):
register_offset = 6 # ModBus register for text string starts at 6
device_type_register = [None]*30 # ModBus register for text string is 30 integers long
register_offset = 1006 # ModBus register for text string starts at 1006
device_type_register = [None]*20 # ModBus register for text string is 20 integers long
device_type_string = "" # Store the decoded string here
for index, entry in enumerate(device_type_register):
@@ -69,24 +72,36 @@ class ModBusController(minimalmodbus.Instrument):
return device_type_string.strip()
# Ask the device how many input registers it has
def get_number_of_input_registers(self):
device_number_of_input_registers = self.read_register(36, 0, 4, False)
device_number_of_input_registers = self.read_register(1026, 0, 4, False)
return device_number_of_input_registers
# Read all the input registers
def read_all_input_registers(self):
number_of_input_registers = self.get_number_of_input_registers()
offset = 40
input_register = [None]*number_of_input_registers
for index in range(number_of_input_registers):
input_register[index] = self.read_register(index + offset, 0, 4, False)
input_register[index] = self.read_register(index, 0, 4, False)
return input_register
def enable_heater(self):
def set_watchdog(self):
self.write_bit(0, 1, 5)
def disable_heater(self):
def reset_watchdog(self):
self.write_bit(0, 0, 5)
# Toggle the devices watchdog
def toggle_watchdog(self):
if self.watchdog_toggle_bit:
self.set_watchdog()
self.watchdog_toggle_bit = 0
else:
self.reset_watchdog()
self.watchdog_toggle_bit = 1

View File

@@ -11,11 +11,20 @@ devices:
- [Maximum temperature A, °C]
- [Maximum temperature B, °C]
- device_type: 2
input_registers: 6 # The number of available input registers, starting from offset 40
input_register_names: # Description, unit
- [Temperature A, °C]
- [Temperature B , °C]
- [Minimum temperature A, °C]
- [Minimum temperature B, °C]
- [Maximum temperature A, °C]
- [Maximum temperature B, °C]
input_registers: 15 # The number of available input registers, starting from offset 40
input_register_names: # Description, unit, scaling 0 = as is, 1 = decimal one position to the left, 2 = decimal two positions to the left, enz.
- [Weater station ID, '', 0]
- [Wind direction, °, 1]
- [Wind speed, 'km/h', 2]
- [Wind gust, 'km/h', 2]
- [Temperature, °C, 2]
- [Rain last hour, 'l/m2', 2]
- [Rain last 24 hours, 'l/m2', 2]
- [Rain since midnight, 'l/m2', 2]
- [Humidity, '%', 2]
- [Barometric pressure, hPa, 1]
- [Luminosity, 'W/m2', 0]
- [Snow fall, NA, 0]
- [Raw rainfall counter, mm, 0]
- [Temperature pressure sensor, °C, 2]
- [Status bits, '', 0]

View File

@@ -0,0 +1,8 @@
def on_connect(client, userdata, flags, reason_code, properties=None):
client.subscribe(topic="topic/important")
def on_message(client, userdata, message, properties=None):
print(
f"(Received message {message.payload} on topic '{message.topic}' with QoS {message.qos}"
)
def on_subscribe(client, userdata, mid, qos, properties=None):
print(f"Subscribed with QoS {qos}")

View File

@@ -0,0 +1,132 @@
# NOTE: does not work, head exploded!
import json
import logging
import random
import time
from paho.mqtt.client import Client
from paho.mqtt.client import CallbackAPIVersion
BROKER = 'mqtt.meezenest.nl'
PORT = 1883
TOPIC = "python-mqtt/tcp"
# generate client ID with pub prefix randomly
CLIENT_ID = f'python-mqtt-tcp-pub-sub-{random.randint(0, 1000)}'
USERNAME = 'emqx'
PASSWORD = 'public'
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
FLAG_EXIT = False
class MqttController(object):
def __init__(self, config: dict, message_processor=None):
self.config = config
self.client = Client(
client_id=self.config['mqtt_client'],
clean_session=self.config['mqtt_clean_session'],
userdata={"client": self.config['mqtt_client']},
)
self.client.username_pw_set(self.config['mqtt_username'], self.config['mqtt_password'])
if self.config['mqtt_debug']:
self.client.on_log = self._on_log
self.client.on_connect = self._on_connect
#self.client.on_subscribe = self._on_subscribe
self.client.on_message = self._on_message
#self.client.on_publish = self._on_publish
self.client.on_disconnect = self._on_disconnect
self.client.connect(self.config['mqtt_host'], self.config['mqtt_port'], 60)
if message_processor:
self.message_processor = message_processor
def _on_connect(self, client, userdata, flags, rc, properties):
if rc == 0 and self.client.is_connected():
print("Connected to MQTT Broker!")
self.client.subscribe(TOPIC)
else:
print(f'Failed to connect, return code {rc}')
def _on_disconnect(self, client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
self.client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
def _on_message(self, client, userdata, msg):
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
def loop_start(self):
self.client.loop_start()
def publish(self):
msg_count = 0
while not FLAG_EXIT:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
if not self.client.is_connected():
logging.error("publish: MQTT client is not connected!")
time.sleep(1)
continue
result = self.client.publish(TOPIC, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f'Send `{msg}` to topic `{TOPIC}`')
else:
print(f'Failed to send message to topic {TOPIC}')
msg_count += 1
time.sleep(1)
def run():
mqtt_config = {
"mqtt_client": "MeesElecronics",
"mqtt_clean_session": "",
"mqtt_username": "1964",
"mqtt_password": "NoPasswd",
"mqtt_debug": False,
"mqtt_host": "mqtt.meezenest.nl",
"mqtt_port": 1883
}
print(mqtt_config)
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
level=logging.DEBUG)
client = MqttController(mqtt_config)
client.loop_start()
time.sleep(1)
client.publish()
#if client.is_connected():
# publish()
#else:
# client.loop_stop()
if __name__ == '__main__':
run()

View File

@@ -0,0 +1,89 @@
# python 3.11
import random
import time
from paho.mqtt import client as mqtt_client
from paho.mqtt.client import CallbackAPIVersion
broker = 'mqtt.meezenest.nl'
port = 1883
topic = "python/mqtt"
# Generate a Client ID with the publish prefix.
client_id = f'publish-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'
msg_count = 1
def connect_mqtt():
def on_connect(client, userdata, flags, rc, properties):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
def on_disconnect(client, userdata, rc):
print("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
print("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
print("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
print("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
client = mqtt_client.Client(CallbackAPIVersion.VERSION2, client_id)
# client.username_pw_set(username, password)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect(broker, port, 120)
return client
def publish(client):
global msg_count
#while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
if msg_count > 5:
#break
msg_count=1
def run():
client = connect_mqtt()
client.loop_start()
while 1:
if client.is_connected():
print(client.is_connected())
publish(client)
#else:
#client.loop_stop()
client.loop_stop()
if __name__ == '__main__':
run()

View File

@@ -21,12 +21,68 @@ import sys
import logging
import minimalmodbus
import datetime
import subprocess
import os
from pathlib import Path
from modbus_control import ModBusController
from config_reader import config_reader
from modbus_definition_file_reader import definition_file_reader
import paho.mqtt.client as mqtt
from paho.mqtt.client import CallbackAPIVersion
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
properties=Properties(PacketTypes.PUBLISH)
properties.MessageExpiryInterval=30 # in seconds
import ssl
version = '3' # or '5'
mqtt_transport = 'tcp' # or 'websockets'
if version == '5':
client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy",
transport=mqtt_transport,
protocol=mqtt.MQTTv5)
if version == '3':
client = mqtt.Client(CallbackAPIVersion.VERSION2, client_id="myPy",
transport=mqtt_transport,
protocol=mqtt.MQTTv311,
clean_session=True)
#client.username_pw_set("user", "password")
import mqtt_callbacks
client.on_message = mqtt_callbacks.on_message;
client.on_connect = mqtt_callbacks.on_connect;
#client.on_publish = mqtt_callbacks.on_publish;
client.on_subscribe = mqtt_callbacks.on_subscribe;
mqtt_broker = 'mqtt.meezenest.nl'
mqtt_port = 1883
mqtt_keepalalive=60
mqtt_topic = 'topic/important'
if version == '5':
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
properties=Properties(PacketTypes.CONNECT)
properties.SessionExpiryInterval=30*60 # in seconds
client.connect(mqtt_broker,
port=mqtt_port,
clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
properties=properties,
keepalive=60);
elif version == '3':
client.connect(mqtt_broker, port=mqtt_port, keepalive=mqtt_keepalalive);
client.loop_start();
while 1:
client.publish(mqtt_topic,'Cedalo Mosquitto is awesome',2,properties=properties);
time.sleep(1)
def setup():
config_file = "config.yml"
definition_file = "modbus_registers.yaml"
@@ -104,6 +160,35 @@ def data_logger(data, configuration):
except:
logging.warning("Could not write to file: " + new_filename)
# It is not possible to send multiple beacons in a short time. Call this function not faster than every 3 seconds or so.
def send_data_to_aprs(weather_data, configuration):
# Define the Bash script and its arguments as a list
script = "/usr/sbin/beacon"
if configuration['digi_path'] == 0:
arguments = ["-d", f"{configuration['destination']}", "-s", "ax1", ":PE1RXF-3 :test2"]
else:
arguments = ["-d", f"{configuration['destination']} {configuration['digi_path']}", "-s", "ax1", ":PE1RXF-3 :test3"]
# Combine the script and its arguments into a single list
command = [script] + arguments
# Run the script
logging.debug(command)
try:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Check the result
if result.returncode == 0:
logging.info("Send data to APRS radio.")
else:
logging.error("Failed to send data to APRS radio.")
logging.error(f"Reason: {result.stderr}")
except Exception as e:
logging.error("Failed to send data to APRS radio.")
logging.error(f"Command returned: {e}")
def send_data_to_mqtt(data, configuration, modbus_registers):
#logging.debug(modbus_registers)
@@ -117,7 +202,10 @@ def send_data_to_mqtt(data, configuration, modbus_registers):
# Go through every input register and match the unit and description with the value
for index2, entry2 in enumerate(data['InputRegisters']):
logging.debug(entry1['input_register_names'][index2][0] + ": " + str(entry2) + entry1['input_register_names'][index2][1])
print(entry1['input_register_names'][index2][2])
# Scale values
entry2 = entry2/ (10^entry1['input_register_names'][index2][2])
logging.debug(entry1['input_register_names'][index2][0] + ": " + str(round(entry2,1)) + entry1['input_register_names'][index2][1])
logging.debug("Send data to MQTT broker.")
@@ -137,13 +225,24 @@ def ReconnectModBus(configuration):
logging.info("Reconnected to ModBus dongle.")
#print("Enable heater function")
#controller.enable_heater()
Configuration, Controller, ModbusAddresses, ModbusRegisters = setup()
LoopCounter = 0
while (1):
time.sleep(3) # Sleep for 3 seconds
# Send APRS telemetry every 10 cycles = every 10 minutes
LoopCounter = LoopCounter + 1
if LoopCounter >= 1:
# Send data to LoRa radio via external program (/usr/sbin/beacon). Make sure we use all radios defined in the configuration file.
for entry in Configuration.config_file_settings['aprs']:
send_data_to_aprs(1, entry)
# We cannot send multiple APRS messages in a short period of time, so we wait 3 deconds between messages.
time.sleep(3)
LoopCounter = 0
ModBusData={}
# Loop through all configured ModBus devices and try to read the sensor data
@@ -157,6 +256,10 @@ while (1):
ModBusData['Type'] = Controller[index].get_type()
ModBusData['TypeString'] = Controller[index].get_type_string()
ModBusData['InputRegisters'] = Controller[index].read_all_input_registers()
# Keep the watchdog from resetting the ModBusdevice
#Controller[index].toggle_watchdog()
NoError = True
except minimalmodbus.NoResponseError:
# Handle communication timeout
@@ -183,7 +286,6 @@ while (1):
if NoError == True:
try:
DeviceType = ModbusRegisters.definition_file_data['devices'][ModBusData['Type']]
logging.debug("Found Mees Electronics sensor on ModBus address " + str(current_modbus_device))
logging.debug("Serial number: " + hex(ModBusData['ID'][1]) + " " + hex(ModBusData['ID'][2]) + " " + hex(ModBusData['ID'][3]))
logging.debug("Device type: " + str(ModBusData['Type']) + " (" + ModBusData['TypeString'] + ")")
@@ -198,3 +300,5 @@ while (1):
# Send sensor data to MQTT broker
send_data_to_mqtt(ModBusData, Configuration, ModbusRegisters.definition_file_data)