diff --git a/KissHelper_old.py b/KissHelper_old.py deleted file mode 100644 index dde022e..0000000 --- a/KissHelper_old.py +++ /dev/null @@ -1,262 +0,0 @@ -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -# This program provides basic KISS AX.25 APRS frame encoding and decoding. -# Note that only APRS relevant structures are tested. It might not work -# for generic AX.25 frames. -# 11/2019 by Thomas Kottek, OE9TKH -# -# Inspired by: -# * Python script to decode AX.25 from KISS frames over a serial TNC -# https://gist.github.com/mumrah/8fe7597edde50855211e27192cce9f88 -# -# * Sending a raw AX.25 frame with Python -# https://thomask.sdf.org/blog/2018/12/15/sending-raw-ax25-python.html -# -# TODO: remove escapes on decoding -# -# Changes by PE1RXF -# -# 2022-01-23: - in encode_address() added correct handling of has_been_repeated flag '*' -# - -import struct - -KISS_FEND = 0xC0 # Frame start/end marker -KISS_FESC = 0xDB # Escape character -KISS_TFEND = 0xDC # If after an escape, means there was an 0xC0 in the source message -KISS_TFESC = 0xDD # If after an escape, means there was an 0xDB in the source message - - -# Addresses must be 6 bytes plus the SSID byte, each character shifted left by 1 -# If it's the final address in the header, set the low bit to 1 -# Ignoring command/response for simple example -def encode_address(s, final): - if b"-" not in s: - s = s + b"-0" # default to SSID 0 - call, ssid = s.split(b'-') - if len(call) < 6: - call = call + b" "*(6 - len(call)) # pad with spaces - encoded_call = [x << 1 for x in call[0:6]] - - encoded_ssid = 0b00000000 - # If ssid ends with *, the message has been repeated, so we have to set the 'has_been_repeated' flag and remove the * from the ssid - if ssid[-1] == 42: - # print("Message has been repeated") - ssid = ssid[:-1] - encoded_ssid |= 0b10000000 - - # If SSID was not pressent (and we added the default -0 to it), the has_been_repeated flag could be at the end of the call, so check that as well - # Also, there is a lot of bad software around (including this code), ignorance of the specifications (are there any specs for LoRa APRS?), so always check for the has_been_repeated flag - if call[-1] == 42: - call = call[:-1] - encoded_ssid |= 0b10000000 - - encoded_ssid |= (int(ssid) << 1) | 0b01100000 | (0b00000001 if final else 0) - - return encoded_call + [encoded_ssid] - - -def decode_address(data, cursor): - (a1, a2, a3, a4, a5, a6, a7) = struct.unpack("> 5 - ssid = (a7 >> 1) & 0xf - ext = a7 & 0x1 - addr = struct.pack("> 1, a2 >> 1, a3 >> 1, a4 >> 1, a5 >> 1, a6 >> 1) - if ssid != 0: - call = addr.strip() + "-{}".format(ssid).encode() - else: - call = addr - return (call, hrr, ext) - -######################################################################## -# Encode string from LoRa radio to AX.25 over KISS -# -# We must make no assumptions as the incomming frame could be carbage. -# So make sure we think of everthing in order to prevent crashes. -# -# The original code from Thomas Kottek did a good job encoding propper APRS frames. -# But when the frames where not what they should be, the program could crash. -# -######################################################################## -def encode_kiss(frame): - - # First check: do we have a semi column (seperator path field and data field) - # Note that we could still be wrong: for example when the field seperator is corrupted and we now find a semi column from, lets say, an internet address in the data field... - if not b":" in frame: - return None - - # Split the frame in a path field and a data field - path = frame.split(b":")[0] - data_field = frame[frame.find(b":") + 1:] - - # The source address is always followed by a greather than sign, so lets see if its there. - # There is always a change that there is another greather than sign because the frame could be corrupted... - if not b">" in path: - return None - - # Split the path into a source address and a digi-path array (because digis should be seperated by commas, but again, corruption....) - src_addr = path.split(b">")[0] - digis = path[path.find(b">") + 1:].split(b",") - - # destination address - packet = encode_address(digis.pop(0).upper(), False) - # source address - packet += encode_address(src_addr.upper(), len(digis) == 0) - # digipeaters - for digi in digis: - final_addr = digis.index(digi) == len(digis) - 1 - packet += encode_address(digi.upper(), final_addr) - # control field - packet += [0x03] # This is an UI frame - # protocol ID - packet += [0xF0] # No protocol - # information field - packet += frame[frame.find(b":") + 1:] - - # Escape the packet in case either KISS_FEND or KISS_FESC ended up in our stream - packet_escaped = [] - for x in packet: - if x == KISS_FEND: - packet_escaped += [KISS_FESC, KISS_TFEND] - elif x == KISS_FESC: - packet_escaped += [KISS_FESC, KISS_TFESC] - else: - packet_escaped += [x] - - # Build the frame that we will send to Dire Wolf and turn it into a string - kiss_cmd = 0x00 # Two nybbles combined - TNC 0, command 0 (send data) - kiss_frame = [KISS_FEND, kiss_cmd] + packet_escaped + [KISS_FEND] - try: - output = bytearray(kiss_frame) - except ValueError: - print("Invalid value in frame.") - return None - return output - - -def decode_kiss(frame): - result = b"" - pos = 0 - if frame[pos] != 0xC0 or frame[len(frame) - 1] != 0xC0: - print(frame[pos], frame[len(frame) - 1]) - return None - pos += 1 - pos += 1 - - # DST - (dest_addr, dest_hrr, dest_ext) = decode_address(frame, pos) - pos += 7 - # print("DST: ", dest_addr) - - # SRC - (src_addr, src_hrr, src_ext) = decode_address(frame, pos) - pos += 7 - # print("SRC: ", src_addr) - - result += src_addr.strip() - # print(type(result), type(dest_addr.strip())) - result += b">" + dest_addr.strip() - - # REPEATERS - ext = src_ext - while ext == 0: - rpt_addr, rpt_hrr, ext = decode_address(frame, pos) - # print("RPT: ", rpt_addr) - pos += 7 - result += b"," + rpt_addr.strip() - - result += b":" - - # CTRL - # (ctrl,) = struct.unpack("APRS,RELAY,BLA:!4725.51N/00939.86E[322/002/A=001306 Batt=3") - # encoded = encode_kiss("OE9TKH-8>APRS,digi-3,digi-2:!4725.51N/00939.86E[322/002/A=001306 Batt=3") - # print((decode_kiss(encoded))) - - # print((decode_kiss("\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90t\xae\x92\x88\x8ab@\x03\x03\xf0}OE9GHV-10>APMI06,TCPIP,OE9TKH-10*:@110104z4726.55N/00950.63E&WX3in1 op. Holger U=14.2V,T=8.8C\xc0"))) - - def newframe(frame): - print(repr(frame)) - - - two_example_frames = "\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 : [call] [comment]{7ba\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/mylast{7bb\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/last{7bc\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/time(/zone){7bd\xc0" - sp = SerialParser(newframe) - sp.parse(two_example_frames)