A dual-band APRS digipeater with enhanced telemetry capabilities.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

93 lines
2.9 KiB

#!/usr/bin/python3
import pythonax25
from time import sleep, perf_counter
from threading import Thread
# Per-port AX.25 configuration, filled in by bind_ax25(). The three lists are
# appended to together, so index i in each of them refers to the same port.
# Port identifiers from config_get_first_port() / config_get_next_port().
axport = []
# Result of pythonax25.config_get_device() for each port.
axdevice = []
# Result of pythonax25.config_get_address() for each port.
axaddress = []
def parsePacket(string):
    """Split a received frame into its addresses and its payload.

    The frame must start with a zero byte (the data-packet marker), followed
    by the address field, the two bytes 0x03 0xF0 (the APRS PID separator)
    and finally the payload.

    Args:
        string: The raw frame as a bytes object.

    Returns:
        A tuple ``(source, destination, digipeaters, payload)``. The first
        decoded address is the destination, the second is the source and any
        remaining ones are returned as the ``digipeaters`` list. ``payload``
        is every byte after the first separator.

    Raises:
        Exception: If the frame is empty or is not a data packet, if the
            separator is missing, if the address field is malformed (raised
            by getAllAddress) or if it holds fewer than two addresses.
    """
    # Cut at the FIRST separator only, so a payload that happens to contain
    # the same two bytes is returned whole instead of being truncated.
    address, separator, payload = string.partition(b'\x03\xf0')
    # Check if the first byte indicates it is a data packet
    if not address or address[0] != 0:
        raise Exception('Not a data packet')
    if not separator:
        raise Exception('Error: APRS PID separator not found')
    # Cut the first byte and feed the rest to the address parser
    listAddress = getAllAddress(address[1:])
    if len(listAddress) < 2:
        raise Exception('Error: Address field lacks source or destination')
    destination = listAddress[0]
    source = listAddress[1]
    digipeaters = listAddress[2:]
    return (source, destination, digipeaters, payload)
def getAllAddress(packetAddress):
    """Decode a packed address field into a list of ASCII addresses.

    The field is walked in consecutive 7-byte slices and every slice is
    converted with pythonax25.network_to_ascii.

    Args:
        packetAddress: The address field as a bytes object.

    Returns:
        A list with one decoded address per 7-byte slice, in field order.
        An empty field yields an empty list.

    Raises:
        Exception: If the field length is not a multiple of 7.
    """
    chunkLength = 7
    # Reject a field that cannot be cut into whole addresses
    if len(packetAddress) % 7 != 0:
        raise Exception('Error: Address is not a multiple of 7')
    decoded = []
    for offset in range(0, len(packetAddress), chunkLength):
        chunk = packetAddress[offset:offset + chunkLength]
        decoded.append(pythonax25.network_to_ascii(chunk))
    return decoded
def bind_ax25():
    """Record every configured AX.25 port and open a packet socket.

    For each port reported by pythonax25 the port, its device and its
    address are appended to the module-level lists ``axport``, ``axdevice``
    and ``axaddress`` (same index in all three) and printed on one line.

    Returns:
        The value returned by pythonax25.packet_socket().

    Raises:
        SystemExit: If pythonax25.config_load_ports() reports no port. The
            message goes to stderr and the process exit status is non-zero.
    """
    # Check if there's any active AX25 port
    port_nr = pythonax25.config_load_ports()
    if port_nr <= 0:
        # SystemExit with a message instead of the site-only exit(0) helper,
        # so the failure is visible and not reported as success.
        raise SystemExit('No active AX.25 port found')
    port = pythonax25.config_get_first_port()
    for remaining in range(port_nr, 0, -1):
        axport.append(port)
        device = pythonax25.config_get_device(port)
        axdevice.append(device)
        address = pythonax25.config_get_address(port)
        axaddress.append(address)
        print(port, device, address)
        # Only ask for a successor while there are ports left to read
        if remaining > 1:
            port = pythonax25.config_get_next_port(port)
    # Initiate a PF_PACKET socket
    sock = pythonax25.packet_socket()
    return sock
def receive_ax25(socket, timeout=10):
    """Receive one packet from *socket* through pythonax25.packet_rx.

    Args:
        socket: The socket to read from, as returned by bind_ax25().
        timeout: Passed unchanged as the second argument of
            pythonax25.packet_rx. Defaults to 10, the value that used to be
            hard-coded here (described in the original comment as 10 ms).

    Returns:
        Whatever pythonax25.packet_rx returns, unmodified.
    """
    # Blocking receive, bounded by the timeout
    return pythonax25.packet_rx(socket, timeout)
def main():
    """Receive packets forever and print those heard on the first port.

    Binds the AX.25 ports, then loops: each received item whose
    ``receive[0][1]`` equals ``axdevice[0]`` is printed raw, parsed with
    parsePacket and printed field by field. A frame that parsePacket rejects
    is reported and skipped instead of ending the loop.
    """
    socket = bind_ax25()
    while True:
        receive = receive_ax25(socket)
        # NOTE(review): only the first port is matched, although bind_ax25
        # records every configured port - confirm this is intended.
        if receive[0][1] != axdevice[0]:
            continue
        print(receive)
        try:
            source, destination, digipeaters, payload = parsePacket(receive[1])
        except Exception as error:
            # parsePacket raises plain Exception for frames it cannot
            # decode; one bad frame must not stop the receiver.
            print("Packet dropped = %s" % error)
            print("")
            continue
        print("Packet Received by = %s"%axaddress[0])
        print("Source Address = %s"%source)
        print("Destination Address = %s"%destination)
        print("Digipeaters =")
        print(digipeaters)
        print("Payload = %s"%payload)
        print("")


if __name__ == "__main__":
    main()