A bridge between MQTT and PE1RXF telemetry nodes.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

507 lines
19 KiB

#!/usr/bin/python3
"""
A bridge between APRS messaging and MQTT, designed to control my lora_aprs_node_pico
(C)2022-2023 M.T. Konstapel https://meezenest.nl/mees
This file is part of aprs-mqtt-bridge.
aprs-mqtt-bridge is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
aprs-mqtt-bridge is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with aprs-mqtt-bridge. If not, see <https://www.gnu.org/licenses/>.
"""
import sys
import random
import time
import os
from pathlib import Path
import yaml
from yaml.loader import SafeLoader
from paho.mqtt import client as mqtt_client
import pythonax25
configuration_file = "aprs-mqtt-bridge.yml"
# This is where we keep our settings
class mqtt_settings:
#broker
#topic_root
#port
#client_id
#transmit_rate
#retry
#topics
#poll
state = 'ready'
aprs_state = 'idle'
pass
mqtt = mqtt_settings()
axport = []
axdevice = []
axaddress = []
class aprs_status:
nr_of_ports = 0
busy = 0
wait_for_ack = 0
call_of_wait_for_ack = 0
time_out_timer = 0
retry_counter = 0
selected_port = 0
request_to_send = 0
pass
aprs = aprs_status()
def parsePacket(string):
# Split the address and payload separated by APRS PID
buffer = string.split(b'\x03\xf0')
address = buffer[0]
# Check if the first byte indicates it is a data packet
if address[0] == 0:
# Cut the first byte and feed it to the address parser
listAddress = getAllAddress(address[1:])
if listAddress != 0:
# Get the source, destination, and digipeaters from the address list
source = listAddress[1]
destination = listAddress[0]
digipeaters = listAddress[2:]
# Occasionally a bad packet is received causng the program to crash with an "IndexError: list index out of range". Fix: check if index IS out of range before copying it to payload
if len(buffer) > 1:
payload = buffer[1]
else:
payload = 'NOT VALID'
else:
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
source = 'NOCALL'
destination = 'NOCALL'
digipeaters = 'NOCALL'
payload = 'NOT VALID'
else:
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
source = 'NOCALL'
destination = 'NOCALL'
digipeaters = 'NOCALL'
payload = 'NOT VALID'
#raise Exception('Not a data packet')
return (source, destination, digipeaters, payload)
def getAllAddress(packetAddress):
addressSize = 7
# Check if the networked address string is valid
if (len(packetAddress) % 7) == 0:
# Create a list of all address in ASCII form
try:
allAddress = [pythonax25.network_to_ascii(packetAddress[i:i+addressSize])
for i in range(0, len(packetAddress), addressSize)]
except:
allAddress = 0
return allAddress
else:
# Received a non valid address. Fill return value with NULL so we don't crash
allAddress = 0
return allAddress
#raise Exception('Error: Address is not a multiple of 7')
def bind_ax25():
# Check if there's any active AX25 port
current_port = 0;
port_nr = pythonax25.config_load_ports()
aprs.nr_of_ports = port_nr
if port_nr > 0:
# Get the device name of the first port
axport.append(pythonax25.config_get_first_port())
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
while port_nr - current_port > 0:
axport.append(pythonax25.config_get_next_port(axport[current_port-1]))
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
else:
exit(0)
# Initiate a PF_PACKET socket (RX)
rx_socket = pythonax25.packet_socket()
return rx_socket
def receive_ax25(rx_socket):
# Blocking receive packet, 10 ms timeout
receive = pythonax25.packet_rx(rx_socket,10)
return receive
def send_ax25(portCall, srcCall, dest, digi, msg):
# Initiate a datagram socket (TX)
tx_socket = pythonax25.datagram_socket()
res = pythonax25.datagram_bind(tx_socket, srcCall, portCall)
#print(res)
if digi == 0:
res = pythonax25.datagram_tx(tx_socket, dest, msg)
else:
res = pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
#print(res)
pythonax25.close_socket(tx_socket)
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
# After reconnect messages from the MQTT broker where not received.
# clean_session: a boolean that determines the client type.
# If True, the broker will remove all information about this client
# when it disconnects. If False, the client is a durable client and
# subscription information and queued messages will be retained when
# the client disconnects.
# client = mqtt_client.Client(mqtt.client_id)
client = mqtt_client.Client(mqtt.client_id, clean_session=False)
#client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(mqtt.broker, mqtt.port)
return client
def publish(client, topic, message):
result = client.publish(topic, message)
status = result[0]
if status == 0:
print(f"Send `{message}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
def subscribe(client: mqtt_client, topic):
def on_message(client, userdata, message):
received_payload = message.payload.decode()
received_topic = Path(message.topic).name
print(f"Received `{received_payload}` from `{message.topic}` topic")
# Find corresponding topic in configuration-file and send this to the next function
for topics in mqtt.topics:
if received_topic == topics['name']:
#print ('Found topic in list!')
#print (topics)
process_message(topics, received_payload)
break
# print(topic['name'])
# print(topic['command'])
client.subscribe(topic)
client.on_message = on_message
def read_config():
try:
with open(configuration_file) as f:
cfg = yaml.load(f, Loader=SafeLoader)
mqtt.topics = cfg['topics']
mqtt.poll = cfg['poll']
#print(mqtt.topics)
#for topic in mqtt.topics:
# print(topic['name'])
# print(topic['command'])
except:
print ("Configuration file ./" + configuration_file + " not found.")
sys.exit(1)
try:
mqtt.broker = cfg['global']['broker']
except:
print ("Error in configuration file: no broker defined.")
sys.exit(1)
try:
mqtt.port = cfg['global']['port']
except:
print ("Error in configuration file: no port defined.")
sys.exit(1)
try:
mqtt.topic_root = cfg['global']['topic_root']
except:
print ("Error in configuration file: no topic defined.")
sys.exit(1)
try:
mqtt.transmit_rate = cfg['global']['transmit_rate']
except:
print ("Error in configuration file: no transmit_rate defined.")
sys.exit(1)
try:
mqtt.retry = cfg['global']['retry']
except:
print ("Error in configuration file: no retry defined.")
sys.exit(1)
try:
mqtt.destination = cfg['global']['destination']
except:
print ("Error in configuration file: no retry defined.")
sys.exit(1)
try:
mqtt.poll_rate = cfg['global']['poll_rate']
except:
print ("Error in configuration file: no poll_rate defined.")
sys.exit(1)
mqtt.client_id = f'{mqtt.topic_root}-{random.randint(0, 1000)}'
print (mqtt.broker)
print (mqtt.topic_root)
print (mqtt.port)
print (mqtt.client_id)
# Loop through all topics and activate them
def add_subscribtions_from_configfile(client):
for topics in mqtt.topics:
current_topic = mqtt.topic_root + '/' + topics['name']
subscribe(client,current_topic)
print('Topic ' + topics['name'] + ' added')
# Loop through poll list and activate then
def poll_clients(count=[0]):
# How many clients do we have to poll?
nr_of_clients = len(mqtt.poll)
current_call = mqtt.poll[count[0]]['call']
print ('Polling ' + current_call)
# Now we have to figure out the other paramters of the client. These are defined in the topics section of the configuration file
for topics in mqtt.topics:
if current_call == topics['call']:
current_port = topics['port']
source_call = topics['server']
#print ('Server: ' + source_call)
#print ('AX.25 port: ' + current_port)
break
# Find call of ax25 port
for position in range(len(axdevice)):
if axdevice[position] == current_port:
port_call = axaddress[position]
#print (port_call)
message = ':' + topics['call'].ljust(9) + ':' + '06'
#print (port_call + ' ' + source_call + ' ' + mqtt.destination + ' ' + message)
send_ax25(port_call, source_call, mqtt.destination, 0, message)
# Every time this functtion is called it moves up one call and wraps around
count[0] += 1
if count[0] >= nr_of_clients:
count[0] = 0
# Decode response of clients to sensible MQTT messages
def process_polling(call, data, mqtt_client):
print ('Received status: ' + call + " " + data)
if len(data) != 5:
return
for cnt in range(len(data)):
if data[-1*cnt] != '0' and data[-1*cnt] != '1':
print('Invalid response')
return
# Now we have to figure out the other paramters of the client. These are defined in the topics section of the configuration file
for topics in mqtt.topics:
if call == topics['call']:
current_name = topics['name']
current_poll_bit = topics['poll_bit']
print ('MQTT topic: ' + current_name)
#print (current_poll_bit)
inverted_poll_bit = -1*current_poll_bit
status_bit = data[inverted_poll_bit]
if status_bit == '1':
print ('ON')
state = 'ON'
else:
print('OFF')
state = 'OFF'
topic = mqtt.topic_root + '/' + current_name + '/state'
publish(mqtt_client,topic,state)
def process_message(data, payload):
#print ('Payload: '+ payload)
#print (data['call'])
#print (data['port'])
if aprs.busy == 0:
# find payload in configuration file
for commands in data['command']:
if payload == commands['payload']:
aprs.time_out_timer = time.time() # Start timeout timer
aprs.busy = 1
aprs.selected_port = data['port']
# Find call of ax25 port
for position in range(len(axdevice)):
if axdevice[position] == aprs.selected_port:
aprs.port_call = axaddress[position]
aprs.source_call = data['server']
aprs.wait_for_ack = commands['response']
aprs.call_of_wait_for_ack = data['call']
aprs.message = ':' + data['call'].ljust(9) + ':' + commands['cmd']
arguments = '-d \"APRX29\" -s ' + data['port'] + ' \"' + aprs.message + '\"'
beacon_program = "/usr/sbin/beacon"
aprs.beacon_program_with_arguments = beacon_program + " " + arguments
#os.system(aprs.beacon_program_with_arguments)
print ('APRS message ' + aprs.message + ' send to ' + aprs.call_of_wait_for_ack + '.')
mqtt.state = 'busy'
mqtt.aprs_state = 'sending message'
aprs.request_to_send = 1;
else:
mqtt.state = 'busy'
def run():
read_config()
rx_socket = bind_ax25()
client = connect_mqtt()
add_subscribtions_from_configfile(client)
#topic = mqtt.topic_root + '/set'
#subscribe(client,topic)
client.loop_start()
# Send ready to MQTT broker to indicate we are meaning business
mqtt.aprs_state = 'ready'
#topic = mqtt.topic_root + '/aprs_status'
#publish(client,topic,'ready')
mqtt.state = 'busy'
aprs.time_out_timer = time.time()
aprs.poll_timer = time.time()
while True:
if aprs.request_to_send == 1:
send_ax25(aprs.port_call, aprs.source_call, mqtt.destination, 0, aprs.message)
aprs.time_out_timer = time.time() # Restart timeout timer
#print(aprs.selected_port)
#print(aprs.message)
aprs.request_to_send = 0
receive = receive_ax25(rx_socket)
for port in range(len(axdevice)):
if receive[0][1] == axdevice[port]:
#print(receive)
source, destination, digipeaters, payload = parsePacket(receive[1])
# bug UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf8 in position 47: invalid start byte
try:
payload = payload.decode()
except:
payload = 'NOT VALID'
#print("Packet Received by = %s"%axaddress[0])
#print("Source Address = %s"%source)
#print("Destination Address = %s"%destination)
#print("Digipeaters =")
#print(digipeaters)
#print("Payload = %s"%payload)
#print("")
# Test if received packet is from a polled station
for poll in mqtt.poll:
if poll['call'] == source:
#print ('Received packet from polled station: ' + payload)
# split payload at colon. If it is a valid reply, we should get three
# substrings: the first in empty, the second with the call of the ax25
# interface and the thirth with the status of the outputs
split_message=payload.split(":")
if len(split_message) == 3:
if split_message[1].replace(" ", "") == axaddress[port]:
if len(split_message[2]) == 5:
#print ('Received status: ' + source + " " + split_message[2])
process_polling(source, split_message[2], client)
# Waiting for acknowledge...
if aprs.wait_for_ack != 0:
if source == aprs.call_of_wait_for_ack:
# split payload at colon. If it is a valid acknowledge, we should get three
# substrings: the first in empty, the second with the call of the ax25
# interface and the thirth with the acknowledge
split_message=payload.split(":")
if len(split_message) == 3:
if split_message[1].replace(" ", "") == axaddress[port]:
if split_message[2] == aprs.wait_for_ack:
print ('Received acknowledge ' + aprs.wait_for_ack + ' from ' + aprs.call_of_wait_for_ack + ".")
aprs.time_out_timer = time.time() # Restart timeout timer
aprs.wait_for_ack = 0
aprs.busy = 0
aprs.retry_counter = 0
mqtt.aprs_state = 'message sent'
#topic = mqtt.topic_root + '/aprs_status'
#publish(client,topic,mqtt.aprs_state)
mqtt.state = 'busy'
# Time out waiting for acknowledge
if aprs.wait_for_ack != 0:
if time.time() - aprs.time_out_timer > mqtt.transmit_rate:
aprs.retry_counter = aprs.retry_counter + 1
if aprs.retry_counter < mqtt.retry:
# Try again
aprs.time_out_timer = time.time() # Restart timeout timer
aprs.request_to_send = 1;
#os.system(aprs.beacon_program_with_arguments)
print ('Retry: APRS ' + aprs.message + ' message send to ' + aprs.call_of_wait_for_ack + '.')
mqtt.aprs_state = 'sending message (retry ' + str(aprs.retry_counter) + ')'
mqtt.state = 'busy'
else:
# Give up
print ('No acknowledge received from ' + aprs.call_of_wait_for_ack + '. Giving up.')
aprs.time_out_timer = time.time() # Restart timeout timer
aprs.wait_for_ack = 0
aprs.busy = 0
aprs.retry_counter = 0
mqtt.aprs_state = 'sending message failed'
#topic = mqtt.topic_root + '/aprs_status'
#publish(client,topic,mqtt.aprs_state)
mqtt.state = 'busy'
# If APRS system is transmitting, retrying and still waiting for acknowledge, keep on waiting and send an MQTT update
if mqtt.state == 'busy':
topic = mqtt.topic_root + '/aprs_status'
publish(client,topic,mqtt.aprs_state)
mqtt.state = 'ready'
if time.time() - aprs.poll_timer > mqtt.poll_rate:
poll_clients()
# Reset poll timer
aprs.poll_timer = time.time()
#print('Poll clients')
if __name__ == '__main__':
sys.stdout = sys.stderr = open('/home/marcel/aprs-mqtt-bridge_debug.log', 'w')
run()