#!/usr/bin/python3 '''' # A bridge between PE1RXF APRS telemetry messaging and MQTT. # It uses pythonax25 (https://github.com/josefmtd/python-ax25) # # This program reads the registers of the PE1RXF weather station via ModBus RTU and sends it as # an APRS WX report over APRS. Additionally, it sends beacons and forwards received APRS messages # to the APRS-IS network. All configurable via a YAML file called pe1rxf_aprs.yml. # # This program also has a PE1RXF APRS telemetry to MQTT bridge, which is configurable via pe1rxf_telemetry.yml # # Copyright (C) 2023, 2024 M.T. Konstapel https://meezenest.nl/mees # # This file is part of weather_station # # weather_station is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # weather_station is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with weather_station. If not, see . ''' import sys import random import time from time import gmtime, strftime #import os #from pathlib import Path import yaml from yaml.loader import SafeLoader #from paho.mqtt import client as mqtt_client import paho.mqtt.publish as publish import logging import json #import csv logger = logging.getLogger("aprs_telemetry_to_mqtt") class aprs_telemetry_to_mqtt: # initiate class: define name configuration files def __init__(self, telemetry_config_file): self.config_file = telemetry_config_file logger.info("Initializing telemetry to mqtt bridge.") def read_settings(self): if self.read_config_file() == 0: return 0 if self.test_telemetry_settings() == 0: return 0 return 1 def read_config_file (self): try: with open(self.config_file) as f: self.config_file_settings = yaml.load(f, Loader=SafeLoader) except: print ("Telemetry configuration file ./" + self.config_file + " not found or syntax error in file.") return 0 else: return 1 def test_telemetry_settings(self): try: tmp = self.config_file_settings['global']['broker'] tmp = self.config_file_settings['global']['port'] tmp = self.config_file_settings['global']['topic_root'] tmp = self.config_file_settings['global']['publish_messages'] tmp = self.config_file_settings['global']['call'] tmp = self.config_file_settings['global']['weather_report_interval'] tmp = self.config_file_settings['global']['blacklist'] tmp = self.config_file_settings['topics'] except: print ("Error in the telemetry configuration file.") return 0 else: #print (self.config_file_settings['global']['topic_root']) tmp = self.config_file_settings['global']['topic_root'] #mqtt_client_id = f'{self.config_file_settings['global']['topic_root']}-{random.randint(0, 1000)}' self.mqtt_client_id = f'{tmp}-{random.randint(0, 1000)}' return 1 # Publish a single message to the MQTT broker def publish(self, topic, message): try: #publish.single(topic, message, hostname=self.config_file_settings['global']['broker']) publish.single(topic, message, hostname=self.config_file_settings['global']['broker'], port=self.config_file_settings['global']['port'], client_id=self.mqtt_client_id) except: logger.debug("Failed to connect to MQTT broker.") else: logger.debug('Published: ' + topic + '=' + message) # Checks if payload (APRS message) contains valid PE1RXF telemetry data. # If so, it sends the formatted data to the MQTT broker. # Configuration is done via the file pe1rxf_telemetry.yml def publish_telemetry_message(self, source, ax_device, ax_address, payload): values=0 for topics in self.config_file_settings['topics']: # Check source call if source == topics['call']: # Check ax_port if topics['ax_port'] == 'all' or topics['ax_port'] == ax_device: #print('Call found in configuration file') # split payload at colon. If it is a valid reply, we should get three # substrings: the first in empty, the second with the call of the ax25 # interface and the thirth with the status of the outputs split_message=payload.split(":") if len(split_message) == 3: #Remove spaces from destination call and test if message is for the server if split_message[1].replace(" ", "") == ax_address: print ('Received from: ' + source + ' Telemetry: ' + split_message[2]) # The telemetry is available in split_message[2], but we have to check if it contains any valid data # Try to split into seperate values (values should be seperated by a comma) values=split_message[2].split(",") # Test al values: should be numbers and nothing else for field in values: if not self.is_float(field): return 0 # One anoying thing of the PE1RXF telemetry standard is that there is also a message containing the status of the output bits. # These messages are interpreted as valid telemetry data by this program. The message is send after a '06' command. This program # does not request this message, but another program might. So we have to filter these messages output if len(values[0]) == 5: allowed = '0' + '1' # Removes from the original string all the characters that are allowed, leaving us with a set containing either a) nothing, or b) the #offending characters from the string:' if not set(values[0]) - set(allowed): print ("Probably digital status bits. Ignore.") return 0 # Check if number of telemtry values and number of descriptions in yml file are the same. If not make then the same by appending to the shorted list. nr_of_values = len(values) nr_of_descriptions = len(topics['description']) if nr_of_values > nr_of_descriptions: items_to_add = nr_of_values - nr_of_descriptions for x in range(items_to_add): topics['description'].append('NotDefined') print('Added ' + str(items_to_add) + ' to descriptions') elif nr_of_values < nr_of_descriptions: items_to_add = nr_of_descriptions - nr_of_values for x in range(items_to_add): values.append('0.0') print('Added ' + str(items_to_add) + ' to values') else: print('values and description are of equal length: good!') # Loop through descriptions and send values from telemtry file along with it '''' for index, descr in enumerate(topics['description'], start=0): current_topic = self.config_file_settings['global']['topic_root'] + '/' + topics['name'] + '/' + descr #self.publish(client,current_topic,values[index]) try: publish.single(current_topic, values[index], hostname=self.config_file_settings['global']['broker']) except: print("Failed to connect to MQTT broker.") else: print('Published: ' + current_topic + '=' + values[index]) ''' # Loop through descriptions and send values from telemtry file along with it publish_list = [] for index, descr in enumerate(topics['description'], start=0): current_topic = self.config_file_settings['global']['topic_root'] + '/' + topics['name'] + '/' + descr publish_list.append({"topic": current_topic, "payload": values[index]}) try: #publish.multiple(publish_list, hostname=self.config_file_settings['global']['broker']) publish.multiple(publish_list, hostname=self.config_file_settings['global']['broker'], port=self.config_file_settings['global']['port'], client_id=self.mqtt_client_id) except: logger.debug("Failed to connect to MQTT broker.") else: logger.debug('Published telemetry to mqtt broker.') logger.debug(publish_list) return values # Checks if payload (APRS message) contains valid message to us. # If so, it sends the formatted data to the MQTT broker. # Configuration is done via the file pe1rxf_telemetry.yml def publish_aprs_messages(self, source, ax_device, payload): #logger.debug('Check if message is for us.') #logger.debug(self.config_file_settings['global']['publish_messages']) if self.config_file_settings['global']['publish_messages'] == True: #logger.debug('Configured to forward messages.') mqtt_message = {} # Loop through blacklist and check if source is not in it for call in self.config_file_settings['global']['blacklist']: if call == source: print ("Call " + call + " blacklisted. Message not send to mqtt broker.") return 0 # If we come to here the sender is not in the blacklist # But is could well be an ordinary aprs packet, so let's check if it is a message.' # Split payload at colon. If it is a valid message, we should get three # substrings: the first in empty, the second with the call of the ax25 # interface and the thirth with the message itself split_message=payload.split(":") #logger.debug(split_message) if len(split_message) == 3: #Remove spaces from destination call split_message[1] = split_message[1].replace(" ", "") # check if call defined in the configuration file is part of the destination call. This way we can also catch all the prefixes if self.config_file_settings['global']['call'] in split_message[1]: print ('Received from: ' + source + ' payload: ' + split_message[2]) mqtt_message['from'] = source mqtt_message['to'] = split_message[1] mqtt_message['port'] = ax_device mqtt_message['time'] = time.strftime("%Y-%m-%d %H:%M:%S", gmtime()) mqtt_message['message'] = split_message[2] message = json.dumps(mqtt_message) topic = self.config_file_settings['global']['topic_root'] + '/aprs_message' self.publish(topic, message) # Test if string is a number def is_float(self, v): try: f=float(v) except: return False return True