parent
c79c7c4855
commit
417c6d8919
10 changed files with 444 additions and 23 deletions
@ -0,0 +1,21 @@ |
|||||||
|
# Changelog |
||||||
|
|
||||||
|
All notable changes to this project will be documented in this file. |
||||||
|
|
||||||
|
Added : for new features. |
||||||
|
Changed : for changes in existing functionality. |
||||||
|
Deprecated: for soon-to-be removed features. |
||||||
|
Removed : for now removed features. |
||||||
|
Fixed : for any bug fixes. |
||||||
|
Security : in case of vulnerabilities. |
||||||
|
|
||||||
|
## [0.0.1] - 2022-01-31 |
||||||
|
|
||||||
|
### Added |
||||||
|
- Lora and TCP settings are now configurable via configuration file RPi-LoRa-KISS-TNC.ini |
||||||
|
|
||||||
|
### Changed |
||||||
|
- Better encoding of AX.25 frames: less/no crashes due to corrupted incomming frames |
||||||
|
|
||||||
|
### Deprecated |
||||||
|
- Configuration via config.py |
@ -0,0 +1,262 @@ |
|||||||
|
# This program is free software: you can redistribute it and/or modify |
||||||
|
# it under the terms of the GNU General Public License as published by |
||||||
|
# the Free Software Foundation, either version 3 of the License, or |
||||||
|
# (at your option) any later version. |
||||||
|
# |
||||||
|
# This program is distributed in the hope that it will be useful, |
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||||||
|
# GNU General Public License for more details. |
||||||
|
# |
||||||
|
# You should have received a copy of the GNU General Public License |
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
||||||
|
|
||||||
|
|
||||||
|
# This program provides basic KISS AX.25 APRS frame encoding and decoding. |
||||||
|
# Note that only APRS relevant structures are tested. It might not work |
||||||
|
# for generic AX.25 frames. |
||||||
|
# 11/2019 by Thomas Kottek, OE9TKH |
||||||
|
# |
||||||
|
# Inspired by: |
||||||
|
# * Python script to decode AX.25 from KISS frames over a serial TNC |
||||||
|
# https://gist.github.com/mumrah/8fe7597edde50855211e27192cce9f88 |
||||||
|
# |
||||||
|
# * Sending a raw AX.25 frame with Python |
||||||
|
# https://thomask.sdf.org/blog/2018/12/15/sending-raw-ax25-python.html |
||||||
|
# |
||||||
|
# TODO: remove escapes on decoding |
||||||
|
# |
||||||
|
# Changes by PE1RXF |
||||||
|
# |
||||||
|
# 2022-01-23: - in encode_address() added correct handling of has_been_repeated flag '*' |
||||||
|
# |
||||||
|
|
||||||
|
import struct |
||||||
|
|
||||||
|
KISS_FEND = 0xC0 # Frame start/end marker |
||||||
|
KISS_FESC = 0xDB # Escape character |
||||||
|
KISS_TFEND = 0xDC # If after an escape, means there was an 0xC0 in the source message |
||||||
|
KISS_TFESC = 0xDD # If after an escape, means there was an 0xDB in the source message |
||||||
|
|
||||||
|
|
||||||
|
# Addresses must be 6 bytes plus the SSID byte, each character shifted left by 1 |
||||||
|
# If it's the final address in the header, set the low bit to 1 |
||||||
|
# Ignoring command/response for simple example |
||||||
|
def encode_address(s, final): |
||||||
|
if b"-" not in s: |
||||||
|
s = s + b"-0" # default to SSID 0 |
||||||
|
call, ssid = s.split(b'-') |
||||||
|
if len(call) < 6: |
||||||
|
call = call + b" "*(6 - len(call)) # pad with spaces |
||||||
|
encoded_call = [x << 1 for x in call[0:6]] |
||||||
|
|
||||||
|
encoded_ssid = 0b00000000 |
||||||
|
# If ssid ends with *, the message has been repeated, so we have to set the 'has_been_repeated' flag and remove the * from the ssid |
||||||
|
if ssid[-1] == 42: |
||||||
|
# print("Message has been repeated") |
||||||
|
ssid = ssid[:-1] |
||||||
|
encoded_ssid |= 0b10000000 |
||||||
|
|
||||||
|
# If SSID was not pressent (and we added the default -0 to it), the has_been_repeated flag could be at the end of the call, so check that as well |
||||||
|
# Also, there is a lot of bad software around (including this code), ignorance of the specifications (are there any specs for LoRa APRS?), so always check for the has_been_repeated flag |
||||||
|
if call[-1] == 42: |
||||||
|
call = call[:-1] |
||||||
|
encoded_ssid |= 0b10000000 |
||||||
|
|
||||||
|
encoded_ssid |= (int(ssid) << 1) | 0b01100000 | (0b00000001 if final else 0) |
||||||
|
|
||||||
|
return encoded_call + [encoded_ssid] |
||||||
|
|
||||||
|
|
||||||
|
def decode_address(data, cursor): |
||||||
|
(a1, a2, a3, a4, a5, a6, a7) = struct.unpack("<BBBBBBB", data[cursor:cursor + 7]) |
||||||
|
hrr = a7 >> 5 |
||||||
|
ssid = (a7 >> 1) & 0xf |
||||||
|
ext = a7 & 0x1 |
||||||
|
addr = struct.pack("<BBBBBB", a1 >> 1, a2 >> 1, a3 >> 1, a4 >> 1, a5 >> 1, a6 >> 1) |
||||||
|
if ssid != 0: |
||||||
|
call = addr.strip() + "-{}".format(ssid).encode() |
||||||
|
else: |
||||||
|
call = addr |
||||||
|
return (call, hrr, ext) |
||||||
|
|
||||||
|
######################################################################## |
||||||
|
# Encode string from LoRa radio to AX.25 over KISS |
||||||
|
# |
||||||
|
# We must make no assumptions as the incomming frame could be carbage. |
||||||
|
# So make sure we think of everthing in order to prevent crashes. |
||||||
|
# |
||||||
|
# The original code from Thomas Kottek did a good job encoding propper APRS frames. |
||||||
|
# But when the frames where not what they should be, the program could crash. |
||||||
|
# |
||||||
|
######################################################################## |
||||||
|
def encode_kiss(frame): |
||||||
|
|
||||||
|
# First check: do we have a semi column (seperator path field and data field) |
||||||
|
# Note that we could still be wrong: for example when the field seperator is corrupted and we now find a semi column from, lets say, an internet address in the data field... |
||||||
|
if not b":" in frame: |
||||||
|
return None |
||||||
|
|
||||||
|
# Split the frame in a path field and a data field |
||||||
|
path = frame.split(b":")[0] |
||||||
|
data_field = frame[frame.find(b":") + 1:] |
||||||
|
|
||||||
|
# The source address is always followed by a greather than sign, so lets see if its there. |
||||||
|
# There is always a change that there is another greather than sign because the frame could be corrupted... |
||||||
|
if not b">" in path: |
||||||
|
return None |
||||||
|
|
||||||
|
# Split the path into a source address and a digi-path array (because digis should be seperated by commas, but again, corruption....) |
||||||
|
src_addr = path.split(b">")[0] |
||||||
|
digis = path[path.find(b">") + 1:].split(b",") |
||||||
|
|
||||||
|
# destination address |
||||||
|
packet = encode_address(digis.pop(0).upper(), False) |
||||||
|
# source address |
||||||
|
packet += encode_address(src_addr.upper(), len(digis) == 0) |
||||||
|
# digipeaters |
||||||
|
for digi in digis: |
||||||
|
final_addr = digis.index(digi) == len(digis) - 1 |
||||||
|
packet += encode_address(digi.upper(), final_addr) |
||||||
|
# control field |
||||||
|
packet += [0x03] # This is an UI frame |
||||||
|
# protocol ID |
||||||
|
packet += [0xF0] # No protocol |
||||||
|
# information field |
||||||
|
packet += frame[frame.find(b":") + 1:] |
||||||
|
|
||||||
|
# Escape the packet in case either KISS_FEND or KISS_FESC ended up in our stream |
||||||
|
packet_escaped = [] |
||||||
|
for x in packet: |
||||||
|
if x == KISS_FEND: |
||||||
|
packet_escaped += [KISS_FESC, KISS_TFEND] |
||||||
|
elif x == KISS_FESC: |
||||||
|
packet_escaped += [KISS_FESC, KISS_TFESC] |
||||||
|
else: |
||||||
|
packet_escaped += [x] |
||||||
|
|
||||||
|
# Build the frame that we will send to Dire Wolf and turn it into a string |
||||||
|
kiss_cmd = 0x00 # Two nybbles combined - TNC 0, command 0 (send data) |
||||||
|
kiss_frame = [KISS_FEND, kiss_cmd] + packet_escaped + [KISS_FEND] |
||||||
|
try: |
||||||
|
output = bytearray(kiss_frame) |
||||||
|
except ValueError: |
||||||
|
print("Invalid value in frame.") |
||||||
|
return None |
||||||
|
return output |
||||||
|
|
||||||
|
|
||||||
|
def decode_kiss(frame): |
||||||
|
result = b"" |
||||||
|
pos = 0 |
||||||
|
if frame[pos] != 0xC0 or frame[len(frame) - 1] != 0xC0: |
||||||
|
print(frame[pos], frame[len(frame) - 1]) |
||||||
|
return None |
||||||
|
pos += 1 |
||||||
|
pos += 1 |
||||||
|
|
||||||
|
# DST |
||||||
|
(dest_addr, dest_hrr, dest_ext) = decode_address(frame, pos) |
||||||
|
pos += 7 |
||||||
|
# print("DST: ", dest_addr) |
||||||
|
|
||||||
|
# SRC |
||||||
|
(src_addr, src_hrr, src_ext) = decode_address(frame, pos) |
||||||
|
pos += 7 |
||||||
|
# print("SRC: ", src_addr) |
||||||
|
|
||||||
|
result += src_addr.strip() |
||||||
|
# print(type(result), type(dest_addr.strip())) |
||||||
|
result += b">" + dest_addr.strip() |
||||||
|
|
||||||
|
# REPEATERS |
||||||
|
ext = src_ext |
||||||
|
while ext == 0: |
||||||
|
rpt_addr, rpt_hrr, ext = decode_address(frame, pos) |
||||||
|
# print("RPT: ", rpt_addr) |
||||||
|
pos += 7 |
||||||
|
result += b"," + rpt_addr.strip() |
||||||
|
|
||||||
|
result += b":" |
||||||
|
|
||||||
|
# CTRL |
||||||
|
# (ctrl,) = struct.unpack("<B", frame[pos]) |
||||||
|
ctrl = frame[pos] |
||||||
|
pos += 1 |
||||||
|
if (ctrl & 0x3) == 0x3: |
||||||
|
#(pid,) = struct.unpack("<B", frame[pos]) |
||||||
|
pid = frame[pos] |
||||||
|
# print("PID="+str(pid)) |
||||||
|
pos += 1 |
||||||
|
result += frame[pos:len(frame) - 1] |
||||||
|
elif (ctrl & 0x3) == 0x1: |
||||||
|
# decode_sframe(ctrl, frame, pos) |
||||||
|
print("SFRAME") |
||||||
|
return None |
||||||
|
elif (ctrl & 0x1) == 0x0: |
||||||
|
# decode_iframe(ctrl, frame, pos) |
||||||
|
print("IFRAME") |
||||||
|
return None |
||||||
|
|
||||||
|
return result |
||||||
|
|
||||||
|
|
||||||
|
class SerialParser(): |
||||||
|
'''Simple parser for KISS frames. It handles multiple frames in one packet |
||||||
|
and calls the callback function on each frame''' |
||||||
|
STATE_IDLE = 0 |
||||||
|
STATE_FEND = 1 |
||||||
|
STATE_DATA = 2 |
||||||
|
KISS_FEND = KISS_FEND |
||||||
|
|
||||||
|
def __init__(self, frame_cb=None): |
||||||
|
self.frame_cb = frame_cb |
||||||
|
self.reset() |
||||||
|
|
||||||
|
def reset(self): |
||||||
|
self.state = self.STATE_IDLE |
||||||
|
self.cur_frame = bytearray() |
||||||
|
|
||||||
|
def parse(self, data): |
||||||
|
'''Call parse with a string of one or more characters''' |
||||||
|
for c in data: |
||||||
|
if self.state == self.STATE_IDLE: |
||||||
|
if c == self.KISS_FEND: |
||||||
|
self.cur_frame.append(c) |
||||||
|
self.state = self.STATE_FEND |
||||||
|
elif self.state == self.STATE_FEND: |
||||||
|
if c == self.KISS_FEND: |
||||||
|
self.reset() |
||||||
|
else: |
||||||
|
self.cur_frame.append(c) |
||||||
|
self.state = self.STATE_DATA |
||||||
|
elif self.state == self.STATE_DATA: |
||||||
|
self.cur_frame.append(c) |
||||||
|
if c == self.KISS_FEND: |
||||||
|
# frame complete |
||||||
|
if self.frame_cb: |
||||||
|
self.frame_cb(self.cur_frame) |
||||||
|
self.reset() |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
# Playground for testing |
||||||
|
|
||||||
|
# frame = "\xc0\x00\x82\xa0\xa4\xb0dr`\x9e\x8ar\xa8\x96\x90u\x03\xf0!4725.73NR00939.61E&Experimental LoRa iGate\xc0" |
||||||
|
frame = "\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90q\x03\xf0!4725.51N/00939.86E[322/002/A=001306 Batt=3.99V\xc0" |
||||||
|
|
||||||
|
|
||||||
|
# print(decode_kiss(frame)) |
||||||
|
# encoded = encode_kiss("OE9TKH-8>APRS,RELAY,BLA:!4725.51N/00939.86E[322/002/A=001306 Batt=3") |
||||||
|
# encoded = encode_kiss("OE9TKH-8>APRS,digi-3,digi-2:!4725.51N/00939.86E[322/002/A=001306 Batt=3") |
||||||
|
# print((decode_kiss(encoded))) |
||||||
|
|
||||||
|
# print((decode_kiss("\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90t\xae\x92\x88\x8ab@\x03\x03\xf0}OE9GHV-10>APMI06,TCPIP,OE9TKH-10*:@110104z4726.55N/00950.63E&WX3in1 op. Holger U=14.2V,T=8.8C\xc0"))) |
||||||
|
|
||||||
|
def newframe(frame): |
||||||
|
print(repr(frame)) |
||||||
|
|
||||||
|
|
||||||
|
two_example_frames = "\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :<Ass/Ref> <Freq> <Mode> [call] [comment]{7ba\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/mylast{7bb\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/last{7bc\xc0\xc0\x00\x82\xa0\xa4\xa6@@`\x9e\x8ar\xa8\x96\x90u\x03\xf0}SOTA>APZS16,TCPIP,OE9TKH-10*::OE9TKH-8 :/time(/zone){7bd\xc0" |
||||||
|
sp = SerialParser(newframe) |
||||||
|
sp.parse(two_example_frames) |
@ -0,0 +1,33 @@ |
|||||||
|
[LoRaSettings] |
||||||
|
# Settings for LoRa module |
||||||
|
frequency=433.775 |
||||||
|
preamble=8 |
||||||
|
spreadingFactor=12 |
||||||
|
# Bandwidth: |
||||||
|
# BW7_8 |
||||||
|
# BW10_4 |
||||||
|
# BW15_6 |
||||||
|
# BW20_8 |
||||||
|
# BW31_25 |
||||||
|
# BW41_7 |
||||||
|
# BW62_5 |
||||||
|
# BW125 |
||||||
|
# BW250 |
||||||
|
# BW500 |
||||||
|
bandwidth=BW125 |
||||||
|
# Coding Rate: |
||||||
|
# CR4_5 |
||||||
|
# CR4_6 |
||||||
|
# CR4_7 |
||||||
|
# CR4_8 |
||||||
|
codingrate=CR4_5 |
||||||
|
appendSignalReport=True |
||||||
|
paSelect = 1 |
||||||
|
MaxoutputPower = 15 |
||||||
|
outputPower = 15 |
||||||
|
|
||||||
|
[KISS] |
||||||
|
# Settings for KISS |
||||||
|
TCP_HOST=0.0.0.0 |
||||||
|
TCP_PORT_AX25=10001 |
||||||
|
TCP_PORT_RAW =10002 |
Binary file not shown.
@ -1,7 +1,7 @@ |
|||||||
#!/bin/bash |
#!/bin/bash |
||||||
|
|
||||||
python3 Start_lora-tnc.py & |
/usr/bin/python3 /home/marcel/ham/RPi-LoRa-KISS-TNC/Start_lora-tnc.py & |
||||||
sleep 1 |
sleep 10 |
||||||
sudo socat PTY,raw,echo=0,link=/tmp/kisstnc TCP4:127.0.0.1:10001 & |
sudo /usr/bin/socat PTY,raw,echo=0,link=/tmp/lorakisstnc TCP4:127.0.0.1:10001 & |
||||||
sleep 1 |
sleep 3 |
||||||
sudo kissattach /tmp/kisstnc ax0 & |
sudo /usr/sbin/kissattach /tmp/lorakisstnc ax2 & |
||||||
|
Loading…
Reference in new issue