You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
496 lines
18 KiB
496 lines
18 KiB
1 year ago
|
#!/usr/bin/python3
|
||
|
"""
|
||
|
A bridge between APRS messaging and MQTT, designed to control my lora_aprs_node_pico
|
||
|
|
||
|
(C)2022-2023 M.T. Konstapel https://meezenest.nl/mees
|
||
|
|
||
|
This file is part of aprs-mqtt-bridge.
|
||
|
|
||
|
aprs-mqtt-bridge is free software: you can redistribute it and/or modify
|
||
|
it under the terms of the GNU General Public License as published by
|
||
|
the Free Software Foundation, either version 3 of the License, or
|
||
|
(at your option) any later version.
|
||
|
|
||
|
aprs-mqtt-bridge is distributed in the hope that it will be useful,
|
||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
GNU General Public License for more details.
|
||
|
|
||
|
You should have received a copy of the GNU General Public License
|
||
|
along with aprs-mqtt-bridge. If not, see <https://www.gnu.org/licenses/>.
|
||
|
"""
|
||
|
import sys
|
||
|
import random
|
||
|
import time
|
||
|
import os
|
||
|
from pathlib import Path
|
||
|
import yaml
|
||
|
from yaml.loader import SafeLoader
|
||
|
from paho.mqtt import client as mqtt_client
|
||
|
import pythonax25
|
||
|
|
||
|
configuration_file = "aprs-mqtt-bridge.yml"
|
||
|
|
||
|
# This is where we keep our settings
|
||
|
class mqtt_settings:
|
||
|
#broker
|
||
|
#topic_root
|
||
|
#port
|
||
|
#client_id
|
||
|
#transmit_rate
|
||
|
#retry
|
||
|
#topics
|
||
|
#poll
|
||
|
state = 'ready'
|
||
|
aprs_state = 'idle'
|
||
|
pass
|
||
|
mqtt = mqtt_settings()
|
||
|
|
||
|
axport = []
|
||
|
axdevice = []
|
||
|
axaddress = []
|
||
|
class aprs_status:
|
||
|
nr_of_ports = 0
|
||
|
busy = 0
|
||
|
wait_for_ack = 0
|
||
|
call_of_wait_for_ack = 0
|
||
|
time_out_timer = 0
|
||
|
retry_counter = 0
|
||
|
selected_port = 0
|
||
|
request_to_send = 0
|
||
|
pass
|
||
|
aprs = aprs_status()
|
||
|
|
||
|
def parsePacket(string):
|
||
|
# Split the address and payload separated by APRS PID
|
||
|
buffer = string.split(b'\x03\xf0')
|
||
|
address = buffer[0]
|
||
|
|
||
|
# Check if the first byte indicates it is a data packet
|
||
|
if address[0] == 0:
|
||
|
# Cut the first byte and feed it to the address parser
|
||
|
listAddress = getAllAddress(address[1:])
|
||
|
|
||
|
if listAddress != 0:
|
||
|
# Get the source, destination, and digipeaters from the address list
|
||
|
source = listAddress[1]
|
||
|
destination = listAddress[0]
|
||
|
digipeaters = listAddress[2:]
|
||
|
payload = buffer[1]
|
||
|
else:
|
||
|
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
|
||
|
source = 'NOCALL'
|
||
|
destination = 'NOCALL'
|
||
|
digipeaters = 'NOCALL'
|
||
|
payload = 'NOT VALID'
|
||
|
else:
|
||
|
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
|
||
|
source = 'NOCALL'
|
||
|
destination = 'NOCALL'
|
||
|
digipeaters = 'NOCALL'
|
||
|
payload = 'NOT VALID'
|
||
|
#raise Exception('Not a data packet')
|
||
|
|
||
|
return (source, destination, digipeaters, payload)
|
||
|
|
||
|
def getAllAddress(packetAddress):
|
||
|
addressSize = 7
|
||
|
# Check if the networked address string is valid
|
||
|
if (len(packetAddress) % 7) == 0:
|
||
|
# Create a list of all address in ASCII form
|
||
|
try:
|
||
|
allAddress = [pythonax25.network_to_ascii(packetAddress[i:i+addressSize])
|
||
|
for i in range(0, len(packetAddress), addressSize)]
|
||
|
except:
|
||
|
allAddress = 0
|
||
|
|
||
|
return allAddress
|
||
|
else:
|
||
|
# Received a non valid address. Fill return value with NULL so we don't crash
|
||
|
allAddress = 0
|
||
|
return allAddress
|
||
|
#raise Exception('Error: Address is not a multiple of 7')
|
||
|
|
||
|
def bind_ax25():
|
||
|
# Check if there's any active AX25 port
|
||
|
current_port = 0;
|
||
|
port_nr = pythonax25.config_load_ports()
|
||
|
aprs.nr_of_ports = port_nr
|
||
|
if port_nr > 0:
|
||
|
# Get the device name of the first port
|
||
|
axport.append(pythonax25.config_get_first_port())
|
||
|
axdevice.append(pythonax25.config_get_device(axport[current_port]))
|
||
|
axaddress.append(pythonax25.config_get_address(axport[current_port]))
|
||
|
print (axport[current_port], axdevice[current_port], axaddress[current_port])
|
||
|
current_port = current_port + 1
|
||
|
|
||
|
while port_nr - current_port > 0:
|
||
|
axport.append(pythonax25.config_get_next_port(axport[current_port-1]))
|
||
|
axdevice.append(pythonax25.config_get_device(axport[current_port]))
|
||
|
axaddress.append(pythonax25.config_get_address(axport[current_port]))
|
||
|
print (axport[current_port], axdevice[current_port], axaddress[current_port])
|
||
|
current_port = current_port + 1
|
||
|
|
||
|
else:
|
||
|
exit(0)
|
||
|
|
||
|
# Initiate a PF_PACKET socket (RX)
|
||
|
rx_socket = pythonax25.packet_socket()
|
||
|
|
||
|
return rx_socket
|
||
|
|
||
|
def receive_ax25(rx_socket):
|
||
|
# Blocking receive packet, 10 ms timeout
|
||
|
receive = pythonax25.packet_rx(rx_socket,10)
|
||
|
return receive
|
||
|
|
||
|
def send_ax25(portCall, srcCall, dest, digi, msg):
|
||
|
# Initiate a datagram socket (TX)
|
||
|
tx_socket = pythonax25.datagram_socket()
|
||
|
res = pythonax25.datagram_bind(tx_socket, srcCall, portCall)
|
||
|
#print(res)
|
||
|
|
||
|
if digi == 0:
|
||
|
res = pythonax25.datagram_tx(tx_socket, dest, msg)
|
||
|
else:
|
||
|
res = pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
|
||
|
#print(res)
|
||
|
pythonax25.close_socket(tx_socket)
|
||
|
|
||
|
def connect_mqtt():
|
||
|
def on_connect(client, userdata, flags, rc):
|
||
|
if rc == 0:
|
||
|
print("Connected to MQTT Broker!")
|
||
|
else:
|
||
|
print("Failed to connect, return code %d\n", rc)
|
||
|
# Set Connecting Client ID
|
||
|
client = mqtt_client.Client(mqtt.client_id)
|
||
|
#client.username_pw_set(username, password)
|
||
|
client.on_connect = on_connect
|
||
|
client.connect(mqtt.broker, mqtt.port)
|
||
|
return client
|
||
|
|
||
|
def publish(client, topic, message):
|
||
|
result = client.publish(topic, message)
|
||
|
status = result[0]
|
||
|
if status == 0:
|
||
|
print(f"Send `{message}` to topic `{topic}`")
|
||
|
else:
|
||
|
print(f"Failed to send message to topic {topic}")
|
||
|
|
||
|
def subscribe(client: mqtt_client, topic):
|
||
|
def on_message(client, userdata, message):
|
||
|
received_payload = message.payload.decode()
|
||
|
received_topic = Path(message.topic).name
|
||
|
print(f"Received `{received_payload}` from `{message.topic}` topic")
|
||
|
# Find corresponding topic in configuration-file and send this to the next function
|
||
|
for topics in mqtt.topics:
|
||
|
if received_topic == topics['name']:
|
||
|
#print ('Found topic in list!')
|
||
|
#print (topics)
|
||
|
process_message(topics, received_payload)
|
||
|
break
|
||
|
# print(topic['name'])
|
||
|
# print(topic['command'])
|
||
|
|
||
|
client.subscribe(topic)
|
||
|
client.on_message = on_message
|
||
|
|
||
|
def read_config():
|
||
|
try:
|
||
|
with open(configuration_file) as f:
|
||
|
cfg = yaml.load(f, Loader=SafeLoader)
|
||
|
|
||
|
mqtt.topics = cfg['topics']
|
||
|
mqtt.poll = cfg['poll']
|
||
|
#print(mqtt.topics)
|
||
|
#for topic in mqtt.topics:
|
||
|
# print(topic['name'])
|
||
|
# print(topic['command'])
|
||
|
except:
|
||
|
print ("Configuration file ./" + configuration_file + " not found.")
|
||
|
sys.exit(1)
|
||
|
|
||
|
try:
|
||
|
mqtt.broker = cfg['global']['broker']
|
||
|
except:
|
||
|
print ("Error in configuration file: no broker defined.")
|
||
|
sys.exit(1)
|
||
|
|
||
|
try:
|
||
|
mqtt.port = cfg['global']['port']
|
||
|
except:
|
||
|
print ("Error in configuration file: no port defined.")
|
||
|
sys.exit(1)
|
||
|
try:
|
||
|
mqtt.topic_root = cfg['global']['topic_root']
|
||
|
except:
|
||
|
print ("Error in configuration file: no topic defined.")
|
||
|
sys.exit(1)
|
||
|
try:
|
||
|
mqtt.transmit_rate = cfg['global']['transmit_rate']
|
||
|
except:
|
||
|
print ("Error in configuration file: no transmit_rate defined.")
|
||
|
sys.exit(1)
|
||
|
try:
|
||
|
mqtt.retry = cfg['global']['retry']
|
||
|
except:
|
||
|
print ("Error in configuration file: no retry defined.")
|
||
|
sys.exit(1)
|
||
|
try:
|
||
|
mqtt.destination = cfg['global']['destination']
|
||
|
except:
|
||
|
print ("Error in configuration file: no retry defined.")
|
||
|
sys.exit(1)
|
||
|
try:
|
||
|
mqtt.poll_rate = cfg['global']['poll_rate']
|
||
|
except:
|
||
|
print ("Error in configuration file: no poll_rate defined.")
|
||
|
sys.exit(1)
|
||
|
|
||
|
mqtt.client_id = f'{mqtt.topic_root}-{random.randint(0, 1000)}'
|
||
|
|
||
|
print (mqtt.broker)
|
||
|
print (mqtt.topic_root)
|
||
|
print (mqtt.port)
|
||
|
print (mqtt.client_id)
|
||
|
|
||
|
# Loop through all topics and activate them
|
||
|
def add_subscribtions_from_configfile(client):
|
||
|
for topics in mqtt.topics:
|
||
|
current_topic = mqtt.topic_root + '/' + topics['name']
|
||
|
subscribe(client,current_topic)
|
||
|
print('Topic ' + topics['name'] + ' added')
|
||
|
|
||
|
# Loop through poll list and activate then
|
||
|
def poll_clients(count=[0]):
|
||
|
# How many clients do we have to poll?
|
||
|
nr_of_clients = len(mqtt.poll)
|
||
|
|
||
|
current_call = mqtt.poll[count[0]]['call']
|
||
|
print ('Polling ' + current_call)
|
||
|
|
||
|
# Now we have to figure out the other paramters of the client. These are defined in the topics section of the configuration file
|
||
|
for topics in mqtt.topics:
|
||
|
if current_call == topics['call']:
|
||
|
current_port = topics['port']
|
||
|
source_call = topics['server']
|
||
|
#print ('Server: ' + source_call)
|
||
|
#print ('AX.25 port: ' + current_port)
|
||
|
break
|
||
|
|
||
|
# Find call of ax25 port
|
||
|
for position in range(len(axdevice)):
|
||
|
if axdevice[position] == current_port:
|
||
|
port_call = axaddress[position]
|
||
|
#print (port_call)
|
||
|
|
||
|
message = ':' + topics['call'].ljust(9) + ':' + '06'
|
||
|
|
||
|
#print (port_call + ' ' + source_call + ' ' + mqtt.destination + ' ' + message)
|
||
|
send_ax25(port_call, source_call, mqtt.destination, 0, message)
|
||
|
|
||
|
# Every time this functtion is called it moves up one call and wraps around
|
||
|
count[0] += 1
|
||
|
|
||
|
if count[0] >= nr_of_clients:
|
||
|
count[0] = 0
|
||
|
|
||
|
# Decode response of clients to sensible MQTT messages
|
||
|
def process_polling(call, data, mqtt_client):
|
||
|
print ('Received status: ' + call + " " + data)
|
||
|
|
||
|
if len(data) != 5:
|
||
|
return
|
||
|
|
||
|
for cnt in range(len(data)):
|
||
|
if data[-1*cnt] != '0' and data[-1*cnt] != '1':
|
||
|
print('Invalid response')
|
||
|
return
|
||
|
|
||
|
# Now we have to figure out the other paramters of the client. These are defined in the topics section of the configuration file
|
||
|
for topics in mqtt.topics:
|
||
|
if call == topics['call']:
|
||
|
current_name = topics['name']
|
||
|
current_poll_bit = topics['poll_bit']
|
||
|
print ('MQTT topic: ' + current_name)
|
||
|
#print (current_poll_bit)
|
||
|
|
||
|
inverted_poll_bit = -1*current_poll_bit
|
||
|
status_bit = data[inverted_poll_bit]
|
||
|
if status_bit == '1':
|
||
|
print ('ON')
|
||
|
state = 'ON'
|
||
|
else:
|
||
|
print('OFF')
|
||
|
state = 'OFF'
|
||
|
|
||
|
topic = mqtt.topic_root + '/' + current_name + '/state'
|
||
|
|
||
|
publish(mqtt_client,topic,state)
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
def process_message(data, payload):
|
||
|
#print ('Payload: '+ payload)
|
||
|
#print (data['call'])
|
||
|
#print (data['port'])
|
||
|
|
||
|
if aprs.busy == 0:
|
||
|
# find payload in configuration file
|
||
|
for commands in data['command']:
|
||
|
if payload == commands['payload']:
|
||
|
aprs.time_out_timer = time.time() # Start timeout timer
|
||
|
aprs.busy = 1
|
||
|
aprs.selected_port = data['port']
|
||
|
|
||
|
# Find call of ax25 port
|
||
|
for position in range(len(axdevice)):
|
||
|
if axdevice[position] == aprs.selected_port:
|
||
|
aprs.port_call = axaddress[position]
|
||
|
|
||
|
aprs.source_call = data['server']
|
||
|
aprs.wait_for_ack = commands['response']
|
||
|
aprs.call_of_wait_for_ack = data['call']
|
||
|
aprs.message = ':' + data['call'].ljust(9) + ':' + commands['cmd']
|
||
|
arguments = '-d \"APRX29\" -s ' + data['port'] + ' \"' + aprs.message + '\"'
|
||
|
beacon_program = "/usr/sbin/beacon"
|
||
|
aprs.beacon_program_with_arguments = beacon_program + " " + arguments
|
||
|
#os.system(aprs.beacon_program_with_arguments)
|
||
|
print ('APRS message ' + aprs.message + ' send to ' + aprs.call_of_wait_for_ack + '.')
|
||
|
mqtt.state = 'busy'
|
||
|
mqtt.aprs_state = 'sending message'
|
||
|
aprs.request_to_send = 1;
|
||
|
else:
|
||
|
mqtt.state = 'busy'
|
||
|
|
||
|
def run():
|
||
|
read_config()
|
||
|
|
||
|
rx_socket = bind_ax25()
|
||
|
|
||
|
client = connect_mqtt()
|
||
|
|
||
|
add_subscribtions_from_configfile(client)
|
||
|
#topic = mqtt.topic_root + '/set'
|
||
|
#subscribe(client,topic)
|
||
|
|
||
|
client.loop_start()
|
||
|
|
||
|
# Send ready to MQTT broker to indicate we are meaning business
|
||
|
mqtt.aprs_state = 'ready'
|
||
|
#topic = mqtt.topic_root + '/aprs_status'
|
||
|
#publish(client,topic,'ready')
|
||
|
mqtt.state = 'busy'
|
||
|
|
||
|
aprs.time_out_timer = time.time()
|
||
|
aprs.poll_timer = time.time()
|
||
|
|
||
|
while True:
|
||
|
|
||
|
if aprs.request_to_send == 1:
|
||
|
send_ax25(aprs.port_call, aprs.source_call, mqtt.destination, 0, aprs.message)
|
||
|
aprs.time_out_timer = time.time() # Restart timeout timer
|
||
|
#print(aprs.selected_port)
|
||
|
#print(aprs.message)
|
||
|
aprs.request_to_send = 0
|
||
|
|
||
|
receive = receive_ax25(rx_socket)
|
||
|
for port in range(len(axdevice)):
|
||
|
if receive[0][1] == axdevice[port]:
|
||
|
#print(receive)
|
||
|
source, destination, digipeaters, payload = parsePacket(receive[1])
|
||
|
# bug UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf8 in position 47: invalid start byte
|
||
|
try:
|
||
|
payload = payload.decode()
|
||
|
except:
|
||
|
payload = 'NOT VALID'
|
||
|
|
||
|
#print("Packet Received by = %s"%axaddress[0])
|
||
|
#print("Source Address = %s"%source)
|
||
|
#print("Destination Address = %s"%destination)
|
||
|
#print("Digipeaters =")
|
||
|
#print(digipeaters)
|
||
|
#print("Payload = %s"%payload)
|
||
|
#print("")
|
||
|
|
||
|
# Test if received packet is from a polled station
|
||
|
for poll in mqtt.poll:
|
||
|
if poll['call'] == source:
|
||
|
#print ('Received packet from polled station: ' + payload)
|
||
|
# split payload at colon. If it is a valid reply, we should get three
|
||
|
# substrings: the first in empty, the second with the call of the ax25
|
||
|
# interface and the thirth with the status of the outputs
|
||
|
split_message=payload.split(":")
|
||
|
if len(split_message) == 3:
|
||
|
if split_message[1].replace(" ", "") == axaddress[port]:
|
||
|
if len(split_message[2]) == 5:
|
||
|
#print ('Received status: ' + source + " " + split_message[2])
|
||
|
process_polling(source, split_message[2], client)
|
||
|
|
||
|
# Waiting for acknowledge...
|
||
|
if aprs.wait_for_ack != 0:
|
||
|
if source == aprs.call_of_wait_for_ack:
|
||
|
# split payload at colon. If it is a valid acknowledge, we should get three
|
||
|
# substrings: the first in empty, the second with the call of the ax25
|
||
|
# interface and the thirth with the acknowledge
|
||
|
|
||
|
split_message=payload.split(":")
|
||
|
if len(split_message) == 3:
|
||
|
if split_message[1].replace(" ", "") == axaddress[port]:
|
||
|
if split_message[2] == aprs.wait_for_ack:
|
||
|
print ('Received acknowledge ' + aprs.wait_for_ack + ' from ' + aprs.call_of_wait_for_ack + ".")
|
||
|
aprs.time_out_timer = time.time() # Restart timeout timer
|
||
|
aprs.wait_for_ack = 0
|
||
|
aprs.busy = 0
|
||
|
aprs.retry_counter = 0
|
||
|
mqtt.aprs_state = 'message sent'
|
||
|
#topic = mqtt.topic_root + '/aprs_status'
|
||
|
#publish(client,topic,mqtt.aprs_state)
|
||
|
mqtt.state = 'busy'
|
||
|
|
||
|
|
||
|
# Time out waiting for acknowledge
|
||
|
if aprs.wait_for_ack != 0:
|
||
|
if time.time() - aprs.time_out_timer > mqtt.transmit_rate:
|
||
|
aprs.retry_counter = aprs.retry_counter + 1
|
||
|
if aprs.retry_counter < mqtt.retry:
|
||
|
# Try again
|
||
|
aprs.time_out_timer = time.time() # Restart timeout timer
|
||
|
aprs.request_to_send = 1;
|
||
|
#os.system(aprs.beacon_program_with_arguments)
|
||
|
print ('Retry: APRS ' + aprs.message + ' message send to ' + aprs.call_of_wait_for_ack + '.')
|
||
|
mqtt.aprs_state = 'sending message (retry ' + str(aprs.retry_counter) + ')'
|
||
|
mqtt.state = 'busy'
|
||
|
else:
|
||
|
# Give up
|
||
|
print ('No acknowledge received from ' + aprs.call_of_wait_for_ack + '. Giving up.')
|
||
|
aprs.time_out_timer = time.time() # Restart timeout timer
|
||
|
aprs.wait_for_ack = 0
|
||
|
aprs.busy = 0
|
||
|
aprs.retry_counter = 0
|
||
|
mqtt.aprs_state = 'sending message failed'
|
||
|
#topic = mqtt.topic_root + '/aprs_status'
|
||
|
#publish(client,topic,mqtt.aprs_state)
|
||
|
mqtt.state = 'busy'
|
||
|
# If APRS system is transmitting, retrying and still waiting for acknowledge, keep on waiting and send an MQTT update
|
||
|
if mqtt.state == 'busy':
|
||
|
topic = mqtt.topic_root + '/aprs_status'
|
||
|
publish(client,topic,mqtt.aprs_state)
|
||
|
mqtt.state = 'ready'
|
||
|
|
||
|
if time.time() - aprs.poll_timer > mqtt.poll_rate:
|
||
|
poll_clients()
|
||
|
# Reset poll timer
|
||
|
aprs.poll_timer = time.time()
|
||
|
#print('Poll clients')
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
#sys.stdout = sys.stderr = open('debug.log', 'w')
|
||
|
run()
|
||
|
|