Forwards PE1RXF APRS telemetry to MQTT broker
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

386 lines
14 KiB

#!/usr/bin/python3
"""
A bridge between PE1RXF APRS telemetry messaging and MQTT.
It uses pythonax25 (https://github.com/josefmtd/python-ax25)
(C)2023 M.T. Konstapel https://meezenest.nl/mees
This file is part of aprs_telemetry_to_mqtt.
aprs_telemetry_to_mqtt is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
aprs_telemetry_to_mqtt is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with aprs_telemetry_to_mqtt. If not, see <https://www.gnu.org/licenses/>.
"""
import sys
import random
import time
import os
from pathlib import Path
import yaml
from yaml.loader import SafeLoader
from paho.mqtt import client as mqtt_client
import csv
import pythonax25
axport = []
axdevice = []
axaddress = []
class aprs_status:
nr_of_ports = 0
busy = 0
wait_for_ack = 0
call_of_wait_for_ack = 0
time_out_timer = 0
retry_counter = 0
selected_port = 0
request_to_send = 0
pass
aprs = aprs_status()
configuration_file = "aprs_telemetry_to_mqtt.yml"
# This is where we keep our settings
class mqtt_settings:
#broker
#topic_root
#port
#client_id
#transmit_rate
#retry
#topics
pass
mqtt = mqtt_settings()
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(mqtt.client_id)
#client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(mqtt.broker, mqtt.port)
return client
def publish(client, topic, message):
result = client.publish(topic, message)
status = result[0]
if status == 0:
print(f"Send `{message}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def subscribe(client: mqtt_client, topic):
def on_message(client, userdata, message):
received_payload = message.payload.decode()
received_topic = Path(message.topic).name
print(f"Received `{received_payload}` from `{message.topic}` topic")
# Find corresponding topic in configuration-file and send this to the next function
for topics in mqtt.topics:
if received_topic == topics['name']:
#print ('Found topic in list!')
#print (topics)
process_message(topics, received_payload)
break
# print(topic['name'])
# print(topic['command'])
client.subscribe(topic)
client.on_message = on_message
def read_config():
try:
with open(configuration_file) as f:
cfg = yaml.load(f, Loader=SafeLoader)
mqtt.topics = cfg['topics']
#print(mqtt.topics)
#for topic in mqtt.topics:
# print(topic['name'])
# print(topic['command'])
except:
print ("Configuration file ./" + configuration_file + " not found.")
sys.exit(1)
try:
mqtt.broker = cfg['global']['broker']
except:
print ("Error in configuration file: no broker defined.")
sys.exit(1)
try:
mqtt.port = cfg['global']['port']
except:
print ("Error in configuration file: no port defined.")
sys.exit(1)
try:
mqtt.topic_root = cfg['global']['topic_root']
except:
print ("Error in configuration file: no topic defined.")
sys.exit(1)
mqtt.client_id = f'{mqtt.topic_root}-{random.randint(0, 1000)}'
print (mqtt.broker)
print (mqtt.topic_root)
print (mqtt.port)
print (mqtt.client_id)
# Loop through all topics and activate them
def add_subscribtions_from_configfile(client):
for topics in mqtt.topics:
current_topic = mqtt.topic_root + '/' + topics['name']
subscribe(client,current_topic)
print('Topic ' + topics['name'] + ' added')
# Loop through all topics and send telemtry to broker
def send_telemetry_to_broker(client):
for topics in mqtt.topics:
#for topics in topics.description:
#print('Description ' + topics['description'] + ' added')
#print(topics['description'])
# Loop through descriptions and send values from telemtry file along with it
#for descr in topics['description']:
for index, descr in enumerate(topics['description'], start=0):
#print(descr)
# Read telemetry data
with open(topics['telemetry_file'], newline='') as csvfile:
telemetry_reader = csv.reader(csvfile, delimiter=',', quotechar='|')
# there should only be one row in the telemetry file, but try to read all lines anyway
for row in telemetry_reader:
current_topic = mqtt.topic_root + '/' + topics['name'] + '/' + descr
publish(client,current_topic,row[index])
#print(current_topic + '=' + row[index])
# AX.25 stuff
def parsePacket(string):
# Split the address and payload separated by APRS PID
buffer = string.split(b'\x03\xf0')
address = buffer[0]
# Check if the first byte indicates it is a data packet
if address[0] == 0:
# Cut the first byte and feed it to the address parser
listAddress = getAllAddress(address[1:])
if listAddress != 0:
# Get the source, destination, and digipeaters from the address list
source = listAddress[1]
destination = listAddress[0]
digipeaters = listAddress[2:]
payload = buffer[1]
else:
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
source = 'NOCALL'
destination = 'NOCALL'
digipeaters = 'NOCALL'
payload = 'NOT VALID'
else:
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
source = 'NOCALL'
destination = 'NOCALL'
digipeaters = 'NOCALL'
payload = 'NOT VALID'
#raise Exception('Not a data packet')
return (source, destination, digipeaters, payload)
def getAllAddress(packetAddress):
addressSize = 7
# Check if the networked address string is valid
if (len(packetAddress) % 7) == 0:
# Create a list of all address in ASCII form
try:
allAddress = [pythonax25.network_to_ascii(packetAddress[i:i+addressSize])
for i in range(0, len(packetAddress), addressSize)]
except:
allAddress = 0
return allAddress
else:
# Received a non valid address. Fill return value with NULL so we don't crash
allAddress = 0
return allAddress
#raise Exception('Error: Address is not a multiple of 7')
def bind_ax25():
# Check if there's any active AX25 port
current_port = 0;
port_nr = pythonax25.config_load_ports()
aprs.nr_of_ports = port_nr
if port_nr > 0:
# Get the device name of the first port
axport.append(pythonax25.config_get_first_port())
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
while port_nr - current_port > 0:
axport.append(pythonax25.config_get_next_port(axport[current_port-1]))
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
else:
exit(0)
# Initiate a PF_PACKET socket (RX)
rx_socket = pythonax25.packet_socket()
return rx_socket
def receive_ax25(rx_socket):
# Blocking receive packet, 10 ms timeout
receive = pythonax25.packet_rx(rx_socket,10)
return receive
def send_ax25(portCall, srcCall, dest, digi, msg):
# Initiate a datagram socket (TX)
tx_socket = pythonax25.datagram_socket()
res = pythonax25.datagram_bind(tx_socket, srcCall, portCall)
#print(res)
if digi == 0:
res = pythonax25.datagram_tx(tx_socket, dest, msg)
else:
res = pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
#print(res)
pythonax25.close_socket(tx_socket)
def process_message(source, ax_port, payload, mqtt_client):
#print(source)
#print(axdevice[ax_port])
#print(payload)
#print(axaddress[ax_port])
values=0
for topics in mqtt.topics:
# Check source call
if source == topics['call']:
# Check ax_port
if topics['ax_port'] == 'all' or topics['ax_port'] == axdevice[ax_port]:
#print('Call found in configuration file')
# split payload at colon. If it is a valid reply, we should get three
# substrings: the first in empty, the second with the call of the ax25
# interface and the thirth with the status of the outputs
split_message=payload.split(":")
if len(split_message) == 3:
#Remove spaces from destination call and test if message is for the server
if split_message[1].replace(" ", "") == axaddress[ax_port]:
print ('Received from: ' + source + ' Telemetry: ' + split_message[2])
# The telemetry is available in split_message[2], but we have to check if it contains any valid data
# Try to split into seperate values (values should be seperated by a comma)
values=split_message[2].split(",")
# Test al values: should be numbers and nothing else
for field in values:
if not is_float(field):
return 0
# One anoying thing of the PE1RXF telemetry standard is that there is also a message containing the status of the output bits.
# These messages are interpreted as valid telemetry data by this program. The message is send after a '06' command. This program
# does not request this message, but another program might. So we have to filter these messages output
if len(values[0]) == 5:
allowed = '0' + '1'
# Removes from the original string all the characters that are allowed, leaving us with a set containing either a) nothing, or b) the #offending characters from the string:'
if not set(values[0]) - set(allowed):
print ("Probably digital status bits. Ignore.")
return 0
# Check if number of teleemtry values and number of descriptions in yml file are the same. If not make then the same by appending to the shorted list.
nr_of_values = len(values)
nr_of_descriptions = len(topics['description'])
if nr_of_values > nr_of_descriptions:
items_to_add = nr_of_values - nr_of_descriptions
for x in range(items_to_add):
topics['description'].append('NotDefined')
print('Added ' + str(items_to_add) + ' to descriptions')
elif nr_of_values < nr_of_descriptions:
items_to_add = nr_of_descriptions - nr_of_values
for x in range(items_to_add):
values.append('0.0')
print('Added ' + str(items_to_add) + ' to values')
else:
print('values and description are of equal length: good!')
# Loop through descriptions and send values from telemtry file along with it
for index, descr in enumerate(topics['description'], start=0):
current_topic = mqtt.topic_root + '/' + topics['name'] + '/' + descr
publish(mqtt_client,current_topic,values[index])
#print('Publish ' + current_topic + '=' + values[index])
return values
# End AX.25 stuff
# Test if string is a number
def is_float(v):
try:
f=float(v)
except:
return False
return True
def run():
read_config()
rx_socket = bind_ax25()
client = connect_mqtt()
add_subscribtions_from_configfile(client)
#topic = mqtt.topic_root + '/set'
#subscribe(client,topic)
client.loop_start()
while True:
receive = receive_ax25(rx_socket)
for port in range(len(axdevice)):
if receive[0][1] == axdevice[port]:
#print(receive)
source, destination, digipeaters, payload = parsePacket(receive[1])
# bug UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf8 in position 47: invalid start byte
try:
payload = payload.decode()
except:
payload = 'NOT VALID'
#print("Packet Received by = %s"%axaddress[0])
#print("Source Address = %s"%source)
#print("Destination Address = %s"%destination)
#print("Digipeaters =")
#print(digipeaters)
#print("Payload = %s"%payload)
#print("")
telemetry=process_message(source, port, payload, client)
if __name__ == '__main__':
#sys.stdout = sys.stderr = open('debug.log', 'w')
run()