A dual band aprs digipeater with enhanced telemetry capabilities.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

280 lines
10 KiB

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# This program provides basic KISS AX.25 APRS frame encoding and decoding.
# Note that only APRS relevant structures are tested. It might not work
# for generic AX.25 frames.
# 11/2019 by Thomas Kottek, OE9TKH
#
# Inspired by:
# * Python script to decode AX.25 from KISS frames over a serial TNC
# https://gist.github.com/mumrah/8fe7597edde50855211e27192cce9f88
#
# * Sending a raw AX.25 frame with Python
# https://thomask.sdf.org/blog/2018/12/15/sending-raw-ax25-python.html
#
# TODO: remove escapes on decoding
#
# Changes by PE1RXF
#
# 2022-01-23: - in encode_address() added correct handling of has_been_repeated flag '*'
# 2022-01-28: - in encode_kiss() and encode_address(): better exeption handling for corrupted or mal-formatted APRS frames
#
import struct
KISS_FEND = 0xC0 # Frame start/end marker
KISS_FESC = 0xDB # Escape character
KISS_TFEND = 0xDC # If after an escape, means there was an 0xC0 in the source message
KISS_TFESC = 0xDD # If after an escape, means there was an 0xDB in the source message
# Addresses must be 6 bytes plus the SSID byte, each character shifted left by 1
# If it's the final address in the header, set the low bit to 1
# Ignoring command/response for simple example
def encode_address(s, final):
if b"-" not in s:
s = s + b"-0" # default to SSID 0
call, ssid = s.split(b'-')
if len(call) < 6:
call = call + b" "*(6 - len(call)) # pad with spaces
encoded_call = [x << 1 for x in call[0:6]]
encoded_ssid = 0b00000000
# If ssid ends with *, the message has been repeated, so we have to set the 'has_been_repeated' flag and remove the * from the ssid
if ssid[-1] == 42:
# print("Message has been repeated")
ssid = ssid[:-1]
encoded_ssid |= 0b10000000
# If SSID was not pressent (and we added the default -0 to it), the has_been_repeated flag could be at the end of the call, so check that as well
# Also, there is a lot of bad software around (including this code) and ignorance of the specifications (are there any specs for LoRa APRS?), so always check for the has_been_repeated flag
if call[-1] == 42:
call = call[:-1]
encoded_ssid |= 0b10000000
# SSID should now be one or two postions long and contain a number (idealy between 0 and 15).
if len(ssid) is 1 and ssid[0] > 47 and ssid[0] < 58:
encoded_ssid |= (int(ssid) << 1) | 0b01100000 | (0b00000001 if final else 0)
elif len(ssid) is 2 and ssid[0] > 47 and ssid[0] < 58 and ssid[1] > 47 and ssid[2] < 58:
encoded_ssid |= (int(ssid) << 1) | 0b01100000 | (0b00000001 if final else 0)
else:
return None
return encoded_call + [encoded_ssid]
def decode_address(data, cursor):
(a1, a2, a3, a4, a5, a6, a7) = struct.unpack("<BBBBBBB", data[cursor:cursor + 7])
hrr = a7 >> 5
ssid = (a7 >> 1) & 0xf
ext = a7 & 0x1
addr = struct.pack("<BBBBBB", a1 >> 1, a2 >> 1, a3 >> 1, a4 >> 1, a5 >> 1, a6 >> 1)
if ssid != 0:
call = addr.strip() + "-{}".format(ssid).encode()
else:
call = addr
return (call, hrr, ext)
########################################################################
# Encode string from LoRa radio to AX.25 over KISS
#
# We must make no assumptions as the incomming frame could be carbage.
# So make sure we think of everthing in order to prevent crashes.
#
# The original code from Thomas Kottek did a good job encoding propper APRS frames.
# But when the frames where not what they should be, the program could crash.
#
########################################################################
def encode_kiss(frame):
# First check: do we have a semi column (seperator path field and data field)
# Note that we could still be wrong: for example when the field seperator is corrupted and we now find a semi column from, lets say, an internet address in the data field...
if not b":" in frame:
return None
# Split the frame in a path field and a data field
path = frame.split(b":")[0]
data_field = frame[frame.find(b":") + 1:]
# The source address is always followed by a greather than sign, so lets see if its there.
# There is always a change that there is another greather than sign because the frame could be corrupted...
if not b">" in path:
return None
# Split the path into a source address and a digi-path array (because digis should be seperated by commas, but again, corruption....)
src_addr = path.split(b">")[0]
digis = path[path.find(b">") + 1:].split(b",")
# destination address
return_value = encode_address(digis.pop(0).upper(), False)
if return_value is None:
return None
packet = return_value
# source address
return_value = encode_address(src_addr.upper(), len(digis) == 0)
if return_value is None:
return None
packet += return_value
# digipeaters
for digi in digis:
final_addr = digis.index(digi) == len(digis) - 1
return_value = encode_address(digi.upper(), final_addr)
if return_value is None:
return None
packet += return_value
# control field
packet += [0x03] # This is an UI frame
# protocol ID
packet += [0xF0] # No protocol
# information field
packet += frame[frame.find(b":") + 1:]
# Escape the packet in case either KISS_FEND or KISS_FESC ended up in our stream
packet_escaped = []
for x in packet:
if x == KISS_FEND:
packet_escaped += [KISS_FESC, KISS_TFEND]
elif x == KISS_FESC:
packet_escaped += [KISS_FESC, KISS_TFESC]
else:
packet_escaped += [x]
# Build the frame that we will send to Dire Wolf and turn it into a string
kiss_cmd = 0x00 # Two nybbles combined - TNC 0, command 0 (send data)
kiss_frame = [KISS_FEND, kiss_cmd] + packet_escaped + [KISS_FEND]
try:
output = bytearray(kiss_frame)
except ValueError:
print("Invalid value in frame.")
return None
return output
def decode_kiss(frame):
result = b""
pos = 0
if frame[pos] != 0xC0 or frame[len(frame) - 1] != 0xC0:
print(frame[pos], frame[len(frame) - 1])
return None
pos += 1
pos += 1
# DST
(dest_addr, dest_hrr, dest_ext) = decode_address(frame, pos)
pos += 7
# print("DST: ", dest_addr)
# SRC
(src_addr, src_hrr, src_ext) = decode_address(frame, pos)
pos += 7
# print("SRC: ", src_addr)
result += src_addr.strip()
# print(type(result), type(dest_addr.strip()))
result += b">" + dest_addr.strip()
# REPEATERS
ext = src_ext
while ext == 0:
rpt_addr, rpt_hrr, ext = decode_address(frame, pos)
# print("RPT: ", rpt_addr)
pos += 7
result += b"," + rpt_addr.strip()
result += b":"
# CTRL
# (ctrl,) = struct.unpack("<B", frame[pos])
ctrl = frame[pos]
pos += 1
if (ctrl & 0x3) == 0x3:
#(pid,) = struct.unpack("<B", frame[pos])
pid = frame[pos]
# print("PID="+str(pid))
pos += 1
result += frame[pos:len(frame) - 1]
elif (ctrl & 0x3) == 0x1:
# decode_sframe(ctrl, frame, pos)
print("SFRAME")
return None
elif (ctrl & 0x1) == 0x0:
# decode_iframe(ctrl, frame, pos)
print("IFRAME")
return None
return result
class SerialParser():
'''Simple parser for KISS frames. It handles multiple frames in one packet
and calls the callback function on each frame'''
STATE_IDLE = 0
STATE_FEND = 1
STATE_DATA = 2
KISS_FEND = KISS_FEND
def __init__(self, frame_cb=None):
self.frame_cb = frame_cb
self.reset()
def reset(self):
self.state = self.STATE_IDLE
self.cur_frame = bytearray()
def parse(self, data):
'''Call parse with a string of one or more characters'''
for c in data:
if self.state == self.STATE_IDLE:
if c == self.KISS_FEND:
self.cur_frame.append(c)
self.state = self.STATE_FEND
elif self.state == self.STATE_FEND:
if c == self.KISS_FEND:
self.reset()
else:
self.cur_frame.append(c)
self.state = self.STATE_DATA
elif self.state == self.STATE_DATA:
self.cur_frame.append(c)
if c == self.KISS_FEND:
# frame complete
if self.frame_cb:
self.frame_cb(self.cur_frame)
self.reset()
if __name__ == "__main__":
# Playground for testing
# frame = "\xc0\x00\x82\xa0\xa4\xb0dr`\x9e\x8ar\xa8\x96\x90u\x03\xf0!4725.73NR00939.61E&Experimental LoRa iGate\xc0"
frame = "\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90q\x03\xf0!4725.51N/00939.86E[322/002/A=001306 Batt=3.99V\xc0"
# print(decode_kiss(frame))
# encoded = encode_kiss("OE9TKH-8>APRS,RELAY,BLA:!4725.51N/00939.86E[322/002/A=001306 Batt=3")
# encoded = encode_kiss("OE9TKH-8>APRS,digi-3,digi-2:!4725.51N/00939.86E[322/002/A=001306 Batt=3")
# print((decode_kiss(encoded)))
# print((decode_kiss("\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90t\xae\x92\x88\x8ab@\x03\x03\xf0}OE9GHV-10>APMI06,TCPIP,OE9TKH-10*:@110104z4726.55N/00950.63E&WX3in1 op. Holger U=14.2V,T=8.8C\xc0")))
def newframe(frame):
print(repr(frame))
two_example_frames = "\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :<Ass/Ref> <Freq> <Mode> [call] [comment]{7ba\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/mylast{7bb\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/last{7bc\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/time(/zone){7bd\xc0"
sp = SerialParser(newframe)
sp.parse(two_example_frames)