#!/usr/bin/python3
"""
A bridge between APRS messaging and MQTT , designed to control my lora_aprs_node_pico
( C ) 2022 - 2023 M . T . Konstapel https : / / meezenest . nl / mees
This file is part of aprs - mqtt - bridge .
aprs - mqtt - bridge is free software : you can redistribute it and / or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation , either version 3 of the License , or
( at your option ) any later version .
aprs - mqtt - bridge is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU General Public License for more details .
You should have received a copy of the GNU General Public License
along with aprs - mqtt - bridge . If not , see < https : / / www . gnu . org / licenses / > .
"""
import sys
import random
import time
import os
from pathlib import Path
import yaml
from yaml . loader import SafeLoader
from paho . mqtt import client as mqtt_client
import pythonax25
# Path of the YAML configuration file; read_config() passes it unchanged to open()
configuration_file = " aprs-mqtt-bridge.yml "
# This is where we keep our settings
class mqtt_settings:
    """Namespace for the MQTT settings and the bridge's publish-state flags.

    Only the two status strings are declared here. The remaining settings
    (broker, topic_root, port, client_id, transmit_rate, retry, destination,
    poll_rate, topics, poll) are attached as attributes by read_config().
    """

    # Compared against ' busy ' in the main loop to decide whether to publish
    state = ' ready '
    # Text that the main loop publishes on the aprs_status topic
    aprs_state = ' idle '


# Single shared settings object used throughout the file
mqtt = mqtt_settings()
# Parallel lists filled by bind_ax25(): index i of each list describes the same AX.25 port
# (port name, device, address).
axport, axdevice, axaddress = [], [], []
class aprs_status:
    """Mutable status record for the APRS side of the bridge.

    Every field starts at 0. Other attributes (port_call, source_call,
    message, poll_timer, beacon_program_with_arguments) are attached at
    runtime by process_message() and run().
    """

    nr_of_ports = 0           # number of AX.25 ports, stored by bind_ax25()
    busy = 0                  # 1 while a command is being sent / awaited
    wait_for_ack = 0          # expected acknowledge text, 0 when not waiting
    call_of_wait_for_ack = 0  # call that has to send the acknowledge
    time_out_timer = 0        # time.time() stamp of the last timer (re)start
    retry_counter = 0         # retries done for the current command
    selected_port = 0         # port taken from the topic configuration
    request_to_send = 0       # set to 1 to make the main loop transmit


# Single shared status object used throughout the file
aprs = aprs_status()
def parsePacket(string):
    """Split a received frame into source, destination, digipeaters and payload.

    Args:
        string: Raw frame as a bytes object.

    Returns:
        A tuple ``(source, destination, digipeaters, payload)``. For a valid
        frame the addresses come from getAllAddress() and the payload is the
        bytes after the delimiter. For any frame that cannot be decoded the
        placeholder tuple ``(' NOCALL ', ' NOCALL ', ' NOCALL ', ' NOT VALID ')``
        is returned, which the rest of the program ignores.
    """
    invalid = (' NOCALL ', ' NOCALL ', ' NOCALL ', ' NOT VALID ')

    # Split the address and payload separated by APRS PID
    # NOTE(review): the delimiter literal is reproduced exactly as it appears
    # in the source, including the spaces - confirm against upstream.
    buffer = string.split(b' \x03 \xf0 ')
    address = buffer[0]

    # An empty frame has no first byte to inspect; a non-zero first byte means
    # it is not a data packet. Both are reported as invalid instead of crashing.
    if len(address) == 0 or address[0] != 0:
        return invalid

    # Cut the first byte and feed the rest to the address parser
    listAddress = getAllAddress(address[1:])

    # getAllAddress() returns 0 on a decoding error. A list with fewer than
    # two entries has no source address, so indexing it would raise IndexError.
    if listAddress == 0 or len(listAddress) < 2:
        return invalid

    destination = listAddress[0]
    source = listAddress[1]
    digipeaters = listAddress[2:]

    # Occasionally a bad packet arrives without a payload part; only take the
    # payload when the split actually produced one.
    if len(buffer) > 1:
        payload = buffer[1]
    else:
        payload = ' NOT VALID '

    return (source, destination, digipeaters, payload)
def getAllAddress(packetAddress):
    """Convert a packed address field into a list of ASCII addresses.

    Args:
        packetAddress: Bytes holding zero or more addresses of
            ``addressSize`` bytes each.

    Returns:
        A list with one entry per address, converted with
        ``pythonax25.network_to_ascii``. Returns 0 when the length is not a
        multiple of ``addressSize`` or when the conversion fails, so callers
        can skip the packet instead of crashing.
    """
    addressSize = 7

    # Received a non valid address field: report failure with 0
    if len(packetAddress) % addressSize != 0:
        return 0

    try:
        # Walk the field in addressSize-byte steps and convert each chunk
        return [
            pythonax25.network_to_ascii(packetAddress[i:i + addressSize])
            for i in range(0, len(packetAddress), addressSize)
        ]
    except Exception:
        # Any conversion error marks the whole address field as invalid.
        # KeyboardInterrupt and SystemExit are deliberately not caught.
        return 0
def bind_ax25():
    """Load the AX.25 port configuration and open the receive socket.

    For every configured port the port name, device and address are appended
    to the module-level lists ``axport``, ``axdevice`` and ``axaddress`` (same
    index in each list) and printed. The port count is stored in
    ``aprs.nr_of_ports``.

    Returns:
        The socket returned by ``pythonax25.packet_socket()``.

    Raises:
        SystemExit: with status 1 when no AX.25 port is configured.
    """
    port_nr = pythonax25.config_load_ports()
    aprs.nr_of_ports = port_nr

    # Without any port the bridge cannot work: report it and stop with an
    # error status, like the configuration errors in read_config().
    if port_nr <= 0:
        print(' No active AX.25 port found. ')
        sys.exit(1)

    for current_port in range(port_nr):
        if current_port == 0:
            # Get the name of the first port
            axport.append(pythonax25.config_get_first_port())
        else:
            # Every following port is looked up from the previous one
            axport.append(pythonax25.config_get_next_port(axport[current_port - 1]))
        axdevice.append(pythonax25.config_get_device(axport[current_port]))
        axaddress.append(pythonax25.config_get_address(axport[current_port]))
        print(axport[current_port], axdevice[current_port], axaddress[current_port])

    # Initiate a PF_PACKET socket (RX)
    return pythonax25.packet_socket()
def receive_ax25(rx_socket):
    """Return the result of one ``pythonax25.packet_rx`` call on ``rx_socket``.

    The second argument (10) is the timeout; the original comment describes
    this as a blocking receive with a 10 ms timeout.
    """
    return pythonax25.packet_rx(rx_socket, 10)
def send_ax25(portCall, srcCall, dest, digi, msg):
    """Transmit one message through a freshly opened datagram socket.

    Args:
        portCall: Call of the AX.25 port, passed to ``datagram_bind``.
        srcCall: Source call, passed to ``datagram_bind``.
        dest: Destination passed to the transmit call.
        digi: Digipeater argument; 0 selects the transmit call without one.
        msg: Message to transmit.

    The socket is closed again before returning, also when binding or
    transmitting raises.
    """
    # Initiate a datagram socket (TX)
    tx_socket = pythonax25.datagram_socket()
    try:
        pythonax25.datagram_bind(tx_socket, srcCall, portCall)
        if digi == 0:
            pythonax25.datagram_tx(tx_socket, dest, msg)
        else:
            pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
    finally:
        # Always release the socket so a failed transmit does not leak it
        pythonax25.close_socket(tx_socket)
def connect_mqtt():
    """Create the MQTT client, connect it to the configured broker and return it.

    Uses ``mqtt.client_id``, ``mqtt.broker`` and ``mqtt.port``, which
    read_config() fills in.

    Returns:
        The connected ``mqtt_client.Client`` instance.
    """
    def on_connect(client, userdata, flags, rc):
        # Report the outcome of the connection attempt; rc 0 is success
        if rc == 0:
            print(" Connected to MQTT Broker! ")
        else:
            # Apply the format explicitly: passing rc as a second print()
            # argument would leave the %d placeholder in the output.
            print(" Failed to connect, return code %d \n " % rc)

    # Set Connecting Client ID
    # After a reconnect, messages from the MQTT broker were not received.
    # clean_session: a boolean that determines the client type.
    # If True, the broker will remove all information about this client
    # when it disconnects. If False, the client is a durable client and
    # subscription information and queued messages will be retained when
    # the client disconnects.
    client = mqtt_client.Client(mqtt.client_id, clean_session=False)
    #client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(mqtt.broker, mqtt.port)
    return client
def publish(client, topic, message):
    """Publish ``message`` on ``topic`` and print whether it was accepted.

    The first element of the value returned by ``client.publish`` is used as
    the status: 0 is reported as sent, anything else as a failure.
    """
    status = client.publish(topic, message)[0]
    if status != 0:
        print(f" Failed to send message to topic {topic} ")
        return
    print(f" Send ` {message} ` to topic ` {topic} ` ")
def subscribe(client: mqtt_client.Client, topic):
    """Subscribe ``client`` to ``topic`` and install the message handler.

    The handler takes the last path component of the received topic, looks it
    up in the ``name`` field of the configured topics and forwards the first
    match, together with the decoded payload, to process_message().

    Args:
        client: MQTT client to subscribe with.
        topic: Full topic string to subscribe to.
    """
    def on_message(client, userdata, message):
        # A payload that is not valid text cannot match any configured
        # command; drop it instead of letting the error escape the callback.
        try:
            received_payload = message.payload.decode()
        except UnicodeDecodeError:
            print(f" Ignoring undecodable payload from ` {message.topic} ` topic ")
            return

        received_topic = Path(message.topic).name
        print(f" Received ` {received_payload} ` from ` {message.topic} ` topic ")

        # Find corresponding topic in configuration-file and send this to the next function
        for topics in mqtt.topics:
            if received_topic == topics[' name ']:
                process_message(topics, received_payload)
                break

    client.subscribe(topic)
    client.on_message = on_message
def read_config():
    """Load the YAML configuration file into the shared ``mqtt`` settings object.

    Reads ``configuration_file``, stores the ``topics`` and ``poll`` sections
    and the entries of the ``global`` section (broker, port, topic_root,
    transmit_rate, retry, destination, poll_rate) as attributes of ``mqtt``,
    derives a client id from the topic root plus a random number, and prints
    broker, topic root, port and client id.

    Raises:
        SystemExit: with status 1 when the file cannot be opened, cannot be
            parsed, or a required entry is missing. A message naming the
            problem is printed first.
    """
    try:
        with open(configuration_file) as f:
            cfg = yaml.load(f, Loader=SafeLoader)
    except OSError:
        print(" Configuration file ./ " + configuration_file + " not found. ")
        sys.exit(1)
    except yaml.YAMLError as error:
        # A file that exists but is not valid YAML is not a "not found" case
        print(" Error in configuration file: " + str(error))
        sys.exit(1)

    # TypeError covers an empty file (cfg is None) or a section of the wrong type
    try:
        mqtt.topics = cfg[' topics ']
        mqtt.poll = cfg[' poll ']
    except (KeyError, TypeError):
        print(" Error in configuration file: no topics or poll defined. ")
        sys.exit(1)

    # (attribute on mqtt, key in the global section, message when it is missing)
    global_settings = (
        ('broker', ' broker ', " Error in configuration file: no broker defined. "),
        ('port', ' port ', " Error in configuration file: no port defined. "),
        ('topic_root', ' topic_root ', " Error in configuration file: no topic defined. "),
        ('transmit_rate', ' transmit_rate ', " Error in configuration file: no transmit_rate defined. "),
        ('retry', ' retry ', " Error in configuration file: no retry defined. "),
        ('destination', ' destination ', " Error in configuration file: no destination defined. "),
        ('poll_rate', ' poll_rate ', " Error in configuration file: no poll_rate defined. "),
    )
    for attribute, key, error_message in global_settings:
        try:
            value = cfg[' global '][key]
        except (KeyError, TypeError):
            print(error_message)
            sys.exit(1)
        setattr(mqtt, attribute, value)

    # The random suffix makes the id differ between runs
    mqtt.client_id = f' {mqtt.topic_root} - {random.randint(0, 1000)} '

    print(mqtt.broker)
    print(mqtt.topic_root)
    print(mqtt.port)
    print(mqtt.client_id)
# Loop through all topics and activate them
def add_subscribtions_from_configfile(client):
    """Subscribe ``client`` to every topic listed in ``mqtt.topics``.

    The subscribed topic is the topic root joined with the entry's name; a
    confirmation line is printed for each entry.
    """
    for entry in mqtt.topics:
        name = entry[' name ']
        subscribe(client, mqtt.topic_root + ' / ' + name)
        print(' Topic ' + name + ' added ')
# Loop through the poll list and poll one client per call
def poll_clients(count=[0]):
    """Send a status request to the next client in ``mqtt.poll``.

    Each call handles one entry of the poll list and then moves on to the
    next, wrapping around at the end of the list.

    Args:
        count: One-element list holding the index of the next client to poll.
            The mutable default is deliberate: it is shared between calls and
            acts as the function's persistent counter.

    A poll entry without a matching topic, or whose port is not one of the
    bound AX.25 devices, is reported and skipped instead of raising.
    """
    # Nothing to do with an empty or missing poll list
    if not mqtt.poll:
        return

    # How many clients do we have to poll?
    nr_of_clients = len(mqtt.poll)

    # Guard against an index left over from a longer list
    if count[0] >= nr_of_clients:
        count[0] = 0
    current_call = mqtt.poll[count[0]][' call ']

    # Move up one call and wrap around. Done before sending so that a
    # misconfigured entry cannot keep the rotation stuck on itself.
    count[0] = (count[0] + 1) % nr_of_clients

    print(' Polling ' + current_call)

    # The other parameters of the client are defined in the topics section
    # of the configuration file; the first topic with this call is used.
    matching_topic = None
    for topics in mqtt.topics:
        if current_call == topics[' call ']:
            matching_topic = topics
            break
    if matching_topic is None:
        print(' No topic defined for ' + current_call + ' , not polling. ')
        return
    current_port = matching_topic[' port ']
    source_call = matching_topic[' server ']

    # Find call of ax25 port (the last matching device wins, as before)
    port_call = None
    for device, address in zip(axdevice, axaddress):
        if device == current_port:
            port_call = address
    if port_call is None:
        print(' No AX.25 port found for ' + current_call + ' , not polling. ')
        return

    message = ' : ' + current_call.ljust(9) + ' : ' + ' 06 '
    send_ax25(port_call, source_call, mqtt.destination, 0, message)
# Decode response of clients to sensible MQTT messages
def process_polling(call, data, mqtt_client):
    """Translate a client's status reply into MQTT state messages.

    Args:
        call: Call of the station that sent the reply.
        data: Status string; it must have length 5 and every element must
            equal one of the two accepted bit literals, otherwise the reply
            is ignored.
        mqtt_client: Client object handed on to publish().

    For every configured topic whose call matches, the element of ``data``
    selected by the topic's poll_bit (counted from the end) decides between
    the ON and OFF state, which is published on the topic's state subtopic.
    """
    print(' Received status: ' + call + " " + data)

    if len(data) != 5:
        return

    if any(bit != ' 0 ' and bit != ' 1 ' for bit in data):
        print(' Invalid response ')
        return

    # The other parameters of the client are defined in the topics section of the configuration file
    for topics in mqtt.topics:
        if call != topics[' call ']:
            continue

        current_name = topics[' name ']
        current_poll_bit = topics[' poll_bit ']
        print(' MQTT topic: ' + current_name)

        # NOTE(review): a topic without poll_bit, or a poll_bit outside the
        # reply length, raises here - confirm the configuration guarantees it.
        status_bit = data[-1 * current_poll_bit]
        state = ' ON ' if status_bit == ' 1 ' else ' OFF '
        print(state)

        publish(mqtt_client, mqtt.topic_root + ' / ' + current_name + ' /state ', state)
def process_message(data, payload):
    """Prepare the APRS command that belongs to a received MQTT payload.

    Args:
        data: Topic entry from the configuration (keys used here: command,
            port, server, call).
        payload: Decoded MQTT payload to look up in the entry's commands.

    When the APRS side is idle and a command matches the payload, the shared
    ``aprs`` object is filled with everything the main loop needs and
    ``aprs.request_to_send`` is raised. When a command is already in flight,
    only ``mqtt.state`` is set so the current status gets published again.
    """
    # Busy: do not start a second command, just trigger a status update
    if aprs.busy != 0:
        mqtt.state = ' busy '
        return

    # Find payload in configuration file
    for commands in data[' command ']:
        if payload != commands[' payload ']:
            continue

        aprs.time_out_timer = time.time()  # Start timeout timer
        aprs.busy = 1
        aprs.selected_port = data[' port ']

        # Find call of ax25 port
        for device, address in zip(axdevice, axaddress):
            if device == aprs.selected_port:
                aprs.port_call = address

        aprs.source_call = data[' server ']
        aprs.wait_for_ack = commands[' response ']
        aprs.call_of_wait_for_ack = data[' call ']
        aprs.message = ' : ' + data[' call '].ljust(9) + ' : ' + commands[' cmd ']

        # Command line for the external beacon program; it is only stored,
        # the call that would execute it is disabled.
        beacon_program = " /usr/sbin/beacon "
        arguments = ' -d \" APRX29 \" -s ' + data[' port '] + ' \" ' + aprs.message + ' \" '
        aprs.beacon_program_with_arguments = beacon_program + " " + arguments
        #os.system(aprs.beacon_program_with_arguments)

        print(' APRS message ' + aprs.message + ' send to ' + aprs.call_of_wait_for_ack + ' . ')

        mqtt.state = ' busy '
        mqtt.aprs_state = ' sending message '
        # Raised last, after all fields the main loop reads have been set
        aprs.request_to_send = 1
def run():
    """Start the bridge and run its main loop forever.

    Start-up: read the configuration, bind the AX.25 ports, connect to the
    MQTT broker, subscribe to the configured topics and start the MQTT
    client loop.

    Each pass of the main loop then:
      1. transmits a pending message when ``aprs.request_to_send`` is set,
      2. receives a packet and, when it arrived on a known device, handles
         poll replies and acknowledges,
      3. retries or gives up when an acknowledge is overdue,
      4. publishes ``mqtt.aprs_state`` when ``mqtt.state`` was set to busy,
      5. polls the next client when the poll interval has elapsed.

    This function does not return.
    """
    read_config()
    rx_socket = bind_ax25()
    client = connect_mqtt()
    add_subscribtions_from_configfile(client)
    #topic = mqtt.topic_root + '/set'
    #subscribe(client,topic)
    client.loop_start()
    # Send ready to MQTT broker to indicate we are meaning business
    mqtt.aprs_state = ' ready '
    #topic = mqtt.topic_root + '/aprs_status'
    #publish(client,topic,'ready')
    # Setting the state to busy makes the first loop pass publish aprs_state
    mqtt.state = ' busy '
    aprs.time_out_timer = time.time()
    aprs.poll_timer = time.time()
    while True:
        # A message was prepared (by process_message or by a retry below): transmit it
        if aprs.request_to_send == 1:
            send_ax25(aprs.port_call, aprs.source_call, mqtt.destination, 0, aprs.message)
            aprs.time_out_timer = time.time() # Restart timeout timer
            #print(aprs.selected_port)
            #print(aprs.message)
            aprs.request_to_send = 0
        receive = receive_ax25(rx_socket)
        # Only handle packets that arrived on one of our bound devices
        # NOTE(review): assumes receive[0][1] holds the device name and receive[1] the raw frame - confirm against pythonax25
        for port in range(len(axdevice)):
            if receive[0][1] == axdevice[port]:
                #print(receive)
                source, destination, digipeaters, payload = parsePacket(receive[1])
                # bug UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf8 in position 47: invalid start byte
                # The bare except also covers the placeholder payload from parsePacket, which is a str without decode()
                try:
                    payload = payload.decode()
                except:
                    payload = ' NOT VALID '
                #print("Packet Received by = %s"%axaddress[0])
                #print("Source Address = %s"%source)
                #print("Destination Address = %s"%destination)
                #print("Digipeaters =")
                #print(digipeaters)
                #print("Payload = %s"%payload)
                #print("")
                # Test if received packet is from a polled station
                for poll in mqtt.poll:
                    if poll[' call '] == source:
                        #print ('Received packet from polled station: ' + payload)
                        # Split payload at colon. If it is a valid reply, we should get three
                        # substrings: the first is empty, the second holds the call of the ax25
                        # interface and the third the status of the outputs
                        split_message = payload.split(" : ")
                        if len(split_message) == 3:
                            if split_message[1].replace(" ", " ") == axaddress[port]:
                                if len(split_message[2]) == 5:
                                    #print ('Received status: ' + source + " " + split_message[2])
                                    process_polling(source, split_message[2], client)
                # Waiting for acknowledge...
                if aprs.wait_for_ack != 0:
                    if source == aprs.call_of_wait_for_ack:
                        # Split payload at colon. If it is a valid acknowledge, we should get three
                        # substrings: the first is empty, the second holds the call of the ax25
                        # interface and the third the acknowledge
                        split_message = payload.split(" : ")
                        if len(split_message) == 3:
                            if split_message[1].replace(" ", " ") == axaddress[port]:
                                if split_message[2] == aprs.wait_for_ack:
                                    print(' Received acknowledge ' + aprs.wait_for_ack + ' from ' + aprs.call_of_wait_for_ack + " . ")
                                    # Command completed: clear the pending state so a new command can be accepted
                                    aprs.time_out_timer = time.time() # Restart timeout timer
                                    aprs.wait_for_ack = 0
                                    aprs.busy = 0
                                    aprs.retry_counter = 0
                                    mqtt.aprs_state = ' message sent '
                                    #topic = mqtt.topic_root + '/aprs_status'
                                    #publish(client,topic,mqtt.aprs_state)
                                    mqtt.state = ' busy '
        # Time out waiting for acknowledge
        if aprs.wait_for_ack != 0:
            if time.time() - aprs.time_out_timer > mqtt.transmit_rate:
                aprs.retry_counter = aprs.retry_counter + 1
                if aprs.retry_counter < mqtt.retry:
                    # Try again
                    aprs.time_out_timer = time.time() # Restart timeout timer
                    aprs.request_to_send = 1;
                    #os.system(aprs.beacon_program_with_arguments)
                    print(' Retry: APRS ' + aprs.message + ' message send to ' + aprs.call_of_wait_for_ack + ' . ')
                    mqtt.aprs_state = ' sending message (retry ' + str(aprs.retry_counter) + ' ) '
                    mqtt.state = ' busy '
                else:
                    # Give up
                    print(' No acknowledge received from ' + aprs.call_of_wait_for_ack + ' . Giving up. ')
                    aprs.time_out_timer = time.time() # Restart timeout timer
                    aprs.wait_for_ack = 0
                    aprs.busy = 0
                    aprs.retry_counter = 0
                    mqtt.aprs_state = ' sending message failed '
                    #topic = mqtt.topic_root + '/aprs_status'
                    #publish(client,topic,mqtt.aprs_state)
                    mqtt.state = ' busy '
        # Whenever something set the state to busy (sending, retrying, acknowledged, failed), send an MQTT status update once
        if mqtt.state == ' busy ':
            topic = mqtt.topic_root + ' /aprs_status '
            publish(client, topic, mqtt.aprs_state)
            mqtt.state = ' ready '
        # Poll the next client once the poll interval has passed
        if time.time() - aprs.poll_timer > mqtt.poll_rate:
            poll_clients()
            # Reset poll timer
            aprs.poll_timer = time.time()
            #print('Poll clients')
# Script entry point: redirect everything printed (stdout and stderr) to a debug log file, then start the bridge
if __name__ == ' __main__ ':
    # NOTE(review): the log path is hard-coded and the file object is never closed explicitly.
    # The string literals are kept exactly as they appear in the source - confirm them against upstream.
    sys.stdout = sys.stderr = open(' /home/marcel/aprs-mqtt-bridge_debug.log ', ' w ')
    run()