#!/usr/bin/python3
"""
A bridge between APRS messaging and MQTT, designed to control my lora_aprs_node_pico

(C)2022-2023 M.T. Konstapel https://meezenest.nl/mees

This file is part of aprs-mqtt-bridge.

    aprs-mqtt-bridge is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    aprs-mqtt-bridge is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with aprs-mqtt-bridge. If not, see <https://www.gnu.org/licenses/>.
"""
import sys
import random
import time
import os
from pathlib import Path
import yaml
from yaml.loader import SafeLoader
from paho.mqtt import client as mqtt_client
import pythonax25

# Configuration file, looked up relative to the current working directory.
configuration_file = "aprs-mqtt-bridge.yml"

# File that receives stdout and stderr when the script is run directly.
debug_log_file = '/home/marcel/aprs-mqtt-bridge_debug.log'


# This is where we keep our settings
class mqtt_settings:
    # The following attributes are filled in by read_config():
    #   broker, port, topic_root, client_id, transmit_rate, retry,
    #   destination, poll_rate, topics, poll

    # 'busy' asks the main loop to publish aprs_state once; the main loop
    # then sets it back to 'ready'.
    state = 'ready'
    # Text that is published on <topic_root>/aprs_status.
    aprs_state = 'idle'


mqtt = mqtt_settings()

# Parallel lists filled by bind_ax25(): entry i of each list describes the
# same AX.25 port (port name, device name and port call).
axport = []
axdevice = []
axaddress = []


class aprs_status:
    nr_of_ports = 0             # value returned by pythonax25.config_load_ports()
    busy = 0                    # 1 while a command is waiting for its acknowledge
    wait_for_ack = 0            # expected acknowledge text, 0 when not waiting
    call_of_wait_for_ack = 0    # call of the station that has to acknowledge
    time_out_timer = 0          # time.time() of the last (re)transmission
    retry_counter = 0           # number of retries done for the current command
    selected_port = 0           # AX.25 device used for the current command
    request_to_send = 0         # set to 1 to make the main loop transmit `message`
    port_call = ''              # call of the AX.25 port used for the current command
    source_call = ''            # source call used for the current command
    message = ''                # APRS message text of the current command
    poll_timer = 0              # time.time() of the last poll


aprs = aprs_status()

# Values returned by parsePacket() for frames that cannot be decoded. They
# match no configured station, so the rest of the program ignores them.
_INVALID_PACKET = ('NOCALL', 'NOCALL', 'NOCALL', 'NOT VALID')


def parsePacket(string):
    """Split a raw AX.25 frame into source, destination, digipeaters and payload.

    Args:
        string: the received frame as bytes.

    Returns:
        A tuple (source, destination, digipeaters, payload). For a frame that
        cannot be decoded the tuple is ('NOCALL', 'NOCALL', 'NOCALL',
        'NOT VALID'). For a decodable frame without payload part, payload is
        the string 'NOT VALID'; otherwise it is bytes.
    """
    # Split the address and payload separated by APRS PID
    buffer = string.split(b'\x03\xf0')
    address = buffer[0]

    # An empty address field cannot be indexed, and only a first byte of 0
    # indicates a data packet.
    if not address or address[0] != 0:
        return _INVALID_PACKET

    # Cut the first byte and feed the rest to the address parser
    listAddress = getAllAddress(address[1:])

    # A usable frame carries at least a destination and a source address.
    # Without this check a frame with an empty or single address crashed the
    # program with "IndexError: list index out of range".
    if listAddress == 0 or len(listAddress) < 2:
        return _INVALID_PACKET

    destination = listAddress[0]
    source = listAddress[1]
    digipeaters = listAddress[2:]

    # Occasionally a bad packet without payload part is received.
    payload = buffer[1] if len(buffer) > 1 else 'NOT VALID'

    return (source, destination, digipeaters, payload)


def getAllAddress(packetAddress):
    """Convert the address field of a frame to a list of ASCII addresses.

    Args:
        packetAddress: address field as bytes, 7 bytes per address.

    Returns:
        A list with one entry per 7-byte address, or 0 when the length is not
        a multiple of 7 or the conversion fails.
    """
    addressSize = 7

    # Check if the networked address string is valid
    if len(packetAddress) % addressSize != 0:
        # Received a non valid address: return 0 so we don't crash
        return 0

    try:
        return [pythonax25.network_to_ascii(packetAddress[i:i + addressSize])
                for i in range(0, len(packetAddress), addressSize)]
    except Exception:
        # The conversion is done by the pythonax25 extension; any failure
        # marks the address field as invalid.
        return 0


def bind_ax25():
    """Collect all configured AX.25 ports and open the receive socket.

    Fills the module-level lists axport, axdevice and axaddress and prints
    one line per port. Exits the program with status 1 when no AX.25 port is
    available.

    Returns:
        The socket returned by pythonax25.packet_socket().
    """
    port_nr = pythonax25.config_load_ports()
    aprs.nr_of_ports = port_nr

    if port_nr <= 0:
        # Without a port there is nothing to bridge; report it as a failure.
        print("No active AX.25 port found.")
        sys.exit(1)

    port = None
    for index in range(port_nr):
        if index == 0:
            port = pythonax25.config_get_first_port()
        else:
            port = pythonax25.config_get_next_port(port)
        device = pythonax25.config_get_device(port)
        address = pythonax25.config_get_address(port)
        axport.append(port)
        axdevice.append(device)
        axaddress.append(address)
        print(port, device, address)

    # Initiate a PF_PACKET socket (RX)
    return pythonax25.packet_socket()


def receive_ax25(rx_socket):
    """Return the result of pythonax25.packet_rx() for rx_socket.

    The second argument (10) is the receive timeout; the original comment
    states it is 10 ms.
    """
    return pythonax25.packet_rx(rx_socket, 10)


def send_ax25(portCall, srcCall, dest, digi, msg):
    """Transmit one message through a freshly opened datagram socket.

    Args:
        portCall: call of the AX.25 port to transmit on.
        srcCall: source call of the message.
        dest: destination call.
        digi: digipeater path, or 0 to transmit without digipeaters.
        msg: message text.
    """
    # Initiate a datagram socket (TX)
    tx_socket = pythonax25.datagram_socket()
    try:
        pythonax25.datagram_bind(tx_socket, srcCall, portCall)
        if digi == 0:
            pythonax25.datagram_tx(tx_socket, dest, msg)
        else:
            pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
    finally:
        # Close the socket even when binding or transmitting raised, so a
        # failing transmission does not leak a socket per attempt.
        pythonax25.close_socket(tx_socket)


def connect_mqtt():
    """Create the MQTT client and connect it to the configured broker.

    Returns:
        The paho MQTT client object.
    """
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n" % rc)

    # After a reconnect, messages from the MQTT broker were not received.
    # clean_session: a boolean that determines the client type.
    # If True, the broker will remove all information about this client
    # when it disconnects. If False, the client is a durable client and
    # subscription information and queued messages will be retained when
    # the client disconnects.
    client = mqtt_client.Client(mqtt.client_id, clean_session=False)
    client.on_connect = on_connect
    client.connect(mqtt.broker, mqtt.port)
    return client


def publish(client, topic, message):
    """Publish message on topic and print whether that succeeded."""
    result = client.publish(topic, message)
    status = result[0]
    if status == 0:
        print(f"Send `{message}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")


def subscribe(client: mqtt_client.Client, topic):
    """Subscribe to topic and install the handler for incoming messages."""
    def on_message(client, userdata, message):
        received_payload = message.payload.decode()
        # Only the last part of the topic is compared with the topic names
        # in the configuration file.
        received_topic = Path(message.topic).name
        print(f"Received `{received_payload}` from `{message.topic}` topic")

        # Find corresponding topic in configuration file and send this to
        # the next function
        for topics in mqtt.topics:
            if received_topic == topics['name']:
                process_message(topics, received_payload)
                break

    client.subscribe(topic)
    client.on_message = on_message


def read_config():
    """Read the configuration file into the module-level mqtt object.

    Prints an error message and exits the program with status 1 when the
    file cannot be opened, is not valid YAML or lacks a required setting.
    """
    try:
        with open(configuration_file) as f:
            cfg = yaml.load(f, Loader=SafeLoader)
    except OSError:
        print("Configuration file ./" + configuration_file + " not found.")
        sys.exit(1)
    except yaml.YAMLError as err:
        print("Error in configuration file: " + str(err))
        sys.exit(1)

    # (attribute of mqtt, section, key, name used in the error message).
    # section None means the key lives at the top level of the file.
    required = (
        ('topics', None, 'topics', 'topics'),
        ('poll', None, 'poll', 'poll'),
        ('broker', 'global', 'broker', 'broker'),
        ('port', 'global', 'port', 'port'),
        ('topic_root', 'global', 'topic_root', 'topic'),
        ('transmit_rate', 'global', 'transmit_rate', 'transmit_rate'),
        ('retry', 'global', 'retry', 'retry'),
        ('destination', 'global', 'destination', 'destination'),
        ('poll_rate', 'global', 'poll_rate', 'poll_rate'),
    )
    for attribute, section, key, label in required:
        try:
            source = cfg if section is None else cfg[section]
            setattr(mqtt, attribute, source[key])
        except (KeyError, TypeError):
            # TypeError covers an empty file (cfg is None) and a section
            # that is not a mapping.
            print("Error in configuration file: no " + label + " defined.")
            sys.exit(1)

    mqtt.client_id = f'{mqtt.topic_root}-{random.randint(0, 1000)}'

    print(mqtt.broker)
    print(mqtt.topic_root)
    print(mqtt.port)
    print(mqtt.client_id)


# Loop through all topics and activate them
def add_subscribtions_from_configfile(client):
    """Subscribe client to <topic_root>/<name> for every configured topic."""
    for topics in mqtt.topics:
        current_topic = mqtt.topic_root + '/' + topics['name']
        subscribe(client, current_topic)
        print('Topic ' + topics['name'] + ' added')


def _find_port_call(device):
    """Return the call of the AX.25 port with this device name, or None."""
    port_call = None
    for name, address in zip(axdevice, axaddress):
        if name == device:
            port_call = address
    return port_call


# Loop through poll list and poll one client per call
def poll_clients(count=[0]):
    """Send a status request to the next station in the poll list.

    Args:
        count: one-element list holding the index of the station to poll.
            The mutable default is deliberate: it is shared between calls
            and so remembers the position in the poll list.
    """
    # How many clients do we have to poll?
    nr_of_clients = len(mqtt.poll)
    if nr_of_clients == 0:
        return

    current_call = mqtt.poll[count[0]]['call']
    print('Polling ' + current_call)

    # Now we have to figure out the other parameters of the client. These
    # are defined in the topics section of the configuration file.
    client_topic = next(
        (topics for topics in mqtt.topics if topics['call'] == current_call),
        None)

    if client_topic is None:
        print('No topic defined for polled station ' + current_call)
    else:
        source_call = client_topic['server']
        # Find call of ax25 port
        port_call = _find_port_call(client_topic['port'])
        if port_call is None:
            print('No AX.25 port found for polled station ' + current_call)
        else:
            message = ':' + current_call.ljust(9) + ':' + '06'
            send_ax25(port_call, source_call, mqtt.destination, 0, message)

    # Every time this function is called it moves up one call and wraps
    # around. This also happens for a station that could not be polled, so
    # one bad entry does not block the others.
    count[0] += 1
    if count[0] >= nr_of_clients:
        count[0] = 0


# Decode response of clients to sensible MQTT messages
def process_polling(call, data, mqtt_client):
    """Publish the state of every topic that belongs to a polled station.

    Args:
        call: call of the station that sent the status.
        data: the status, a string of exactly five '0'/'1' characters;
            anything else is ignored.
        mqtt_client: MQTT client used for publishing.
    """
    print('Received status: ' + call + " " + data)

    if len(data) != 5:
        return

    if any(character not in ('0', '1') for character in data):
        print('Invalid response')
        return

    # Now we have to figure out the other parameters of the client. These
    # are defined in the topics section of the configuration file.
    for topics in mqtt.topics:
        if call != topics['call']:
            continue

        current_name = topics['name']
        current_poll_bit = topics['poll_bit']
        print('MQTT topic: ' + current_name)

        # poll_bit counts from the end of the status string.
        try:
            status_bit = data[-1 * current_poll_bit]
        except (IndexError, TypeError):
            print('Invalid poll_bit for topic ' + current_name)
            continue

        if status_bit == '1':
            print('ON')
            state = 'ON'
        else:
            print('OFF')
            state = 'OFF'

        topic = mqtt.topic_root + '/' + current_name + '/state'
        publish(mqtt_client, topic, state)


def process_message(data, payload):
    """Translate a received MQTT payload into an APRS command.

    The command is not transmitted here: the function stores it in the
    module-level aprs object and sets aprs.request_to_send, after which the
    main loop in run() transmits it.

    Args:
        data: the entry of the topics section that matches the MQTT topic.
        payload: the received MQTT payload as text.
    """
    if aprs.busy != 0:
        # A command is still in progress: only ask for a status update.
        mqtt.state = 'busy'
        return

    # find payload in configuration file
    for commands in data['command']:
        if payload != commands['payload']:
            continue

        # Find call of ax25 port
        port_call = _find_port_call(data['port'])
        if port_call is None:
            # Without a matching port the message cannot be transmitted.
            print('No AX.25 port found for ' + data['call'] + '.')
            mqtt.aprs_state = 'sending message failed'
            mqtt.state = 'busy'
            continue

        aprs.time_out_timer = time.time()   # Start timeout timer
        aprs.busy = 1
        aprs.selected_port = data['port']
        aprs.port_call = port_call
        aprs.source_call = data['server']
        aprs.wait_for_ack = commands['response']
        aprs.call_of_wait_for_ack = data['call']
        aprs.message = ':' + data['call'].ljust(9) + ':' + commands['cmd']

        # Command line for the external beacon program. It is only stored;
        # nothing in this file executes it.
        arguments = '-d "APRX29" -s ' + data['port'] + ' "' + aprs.message + '"'
        beacon_program = "/usr/sbin/beacon"
        aprs.beacon_program_with_arguments = beacon_program + " " + arguments

        print('APRS message ' + aprs.message + ' send to ' + aprs.call_of_wait_for_ack + '.')
        mqtt.state = 'busy'
        mqtt.aprs_state = 'sending message'
        # Set last: the main loop transmits as soon as it sees this flag.
        aprs.request_to_send = 1


def _decode_payload(payload):
    """Return payload as text, or 'NOT VALID' when it cannot be decoded."""
    # parsePacket() returns the string 'NOT VALID' instead of bytes for
    # frames it could not decode.
    if not isinstance(payload, (bytes, bytearray)):
        return 'NOT VALID'
    try:
        return payload.decode()
    except UnicodeDecodeError:
        # e.g. 'utf-8' codec can't decode byte 0xf8: invalid start byte
        return 'NOT VALID'


def _finish_transaction(aprs_state):
    """End the current command and request publication of aprs_state."""
    aprs.time_out_timer = time.time()   # Restart timeout timer
    aprs.wait_for_ack = 0
    aprs.busy = 0
    aprs.retry_counter = 0
    mqtt.aprs_state = aprs_state
    mqtt.state = 'busy'


def _handle_received_packet(frame, port_address, client):
    """Process one received frame: poll responses and acknowledges."""
    source, _destination, _digipeaters, payload = parsePacket(frame)
    payload = _decode_payload(payload)

    # Split payload at colon. If it is a valid message, we should get three
    # substrings: the first is empty, the second holds the call of the ax25
    # interface and the third the status of the outputs or the acknowledge.
    split_message = payload.split(":")
    addressed_to_port = (
        len(split_message) == 3
        and split_message[1].replace(" ", "") == port_address)
    if not addressed_to_port:
        return

    # Test if received packet is a status from a polled station
    if len(split_message[2]) == 5:
        for poll in mqtt.poll:
            if poll['call'] == source:
                process_polling(source, split_message[2], client)

    # Waiting for acknowledge...
    if (aprs.wait_for_ack != 0
            and source == aprs.call_of_wait_for_ack
            and split_message[2] == aprs.wait_for_ack):
        print('Received acknowledge ' + aprs.wait_for_ack + ' from ' + aprs.call_of_wait_for_ack + ".")
        _finish_transaction('message sent')


def _handle_ack_timeout():
    """Retransmit or give up when an acknowledge did not arrive in time."""
    if aprs.wait_for_ack == 0:
        return
    if time.time() - aprs.time_out_timer <= mqtt.transmit_rate:
        return

    aprs.retry_counter = aprs.retry_counter + 1
    if aprs.retry_counter < mqtt.retry:
        # Try again
        aprs.time_out_timer = time.time()   # Restart timeout timer
        aprs.request_to_send = 1
        print('Retry: APRS ' + aprs.message + ' message send to ' + aprs.call_of_wait_for_ack + '.')
        mqtt.aprs_state = 'sending message (retry ' + str(aprs.retry_counter) + ')'
        mqtt.state = 'busy'
    else:
        # Give up
        print('No acknowledge received from ' + aprs.call_of_wait_for_ack + '. Giving up.')
        _finish_transaction('sending message failed')


def run():
    """Read the configuration, connect to AX.25 and MQTT and bridge forever."""
    read_config()
    rx_socket = bind_ax25()
    client = connect_mqtt()
    add_subscribtions_from_configfile(client)
    client.loop_start()

    # Send ready to MQTT broker to indicate we are meaning business
    mqtt.aprs_state = 'ready'
    mqtt.state = 'busy'

    aprs.time_out_timer = time.time()
    aprs.poll_timer = time.time()

    while True:
        # Transmit a command prepared by process_message() or a retry
        if aprs.request_to_send == 1:
            send_ax25(aprs.port_call, aprs.source_call, mqtt.destination, 0, aprs.message)
            aprs.time_out_timer = time.time()   # Restart timeout timer
            aprs.request_to_send = 0

        receive = receive_ax25(rx_socket)
        # receive[0][1] is compared with the device names of our ports and
        # receive[1] is the frame itself.
        for device, address in zip(axdevice, axaddress):
            if receive[0][1] == device:
                _handle_received_packet(receive[1], address, client)

        # Time out waiting for acknowledge
        _handle_ack_timeout()

        # Something changed in the APRS state: send an MQTT update
        if mqtt.state == 'busy':
            topic = mqtt.topic_root + '/aprs_status'
            publish(client, topic, mqtt.aprs_state)
            mqtt.state = 'ready'

        if time.time() - aprs.poll_timer > mqtt.poll_rate:
            poll_clients()
            # Reset poll timer
            aprs.poll_timer = time.time()


if __name__ == '__main__':
    # Line buffering makes every printed line reach the log file directly,
    # so the log is complete when the program crashes or is killed.
    sys.stdout = sys.stderr = open(debug_log_file, 'w', buffering=1)
    run()