#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from asyncio import QueueEmpty
import traceback
sys . path . insert ( 0 , ' ./pySX127x/ ' )
from pySX127x . SX127x . LoRa import LoRa
from pySX127x . SX127x . constants import *
from pySX127x . SX127x . board_config import BOARD
import time
#import KissHelper
class LoraAprsKissTnc ( LoRa ) :
LORA_APRS_HEADER = b " < \xff \x01 "
# APRS data types
DATA_TYPES_POSITION = b " ! ' /@` "
DATA_TYPE_MESSAGE = b " : "
DATA_TYPE_THIRD_PARTY = b " } "
queue = None
server = None
# init has LoRa APRS default config settings - might be initialized different when creating object with parameters
def __init__ ( self , queue , server , frequency = 433.775 , preamble = 8 , spreadingFactor = 12 , bandwidth = BW . BW125 ,
codingrate = CODING_RATE . CR4_5 , appendSignalReport = True , paSelect = 1 , MaxoutputPower = 15 , outputPower = 15 , verbose = False ) :
# Init SX127x
BOARD . setup ( )
super ( LoraAprsKissTnc , self ) . __init__ ( verbose )
self . queue = queue
self . appendSignalReport = appendSignalReport
self . set_mode ( MODE . SLEEP )
self . set_freq ( frequency )
self . set_preamble ( preamble )
self . set_spreading_factor ( spreadingFactor )
if bandwidth == ' BW7_8 ' :
bandwidth = BW . BW7_8
elif bandwidth == ' BW10_4 ' :
bandwidth = BW . BW10_4
elif bandwidth == ' BW15_6 ' :
bandwidth = BW . BW15_6
elif bandwidth == ' BW20_8 ' :
bandwidth = BW . BW20_8
elif bandwidth == ' BW31_25 ' :
bandwidth = BW . BW31_25
elif bandwidth == ' BW41_7 ' :
bandwidth = BW . BW41_7
elif bandwidth == ' BW62_5 ' :
bandwidth = BW . BW62_5
elif bandwidth == ' BW125 ' :
bandwidth = BW . BW125
elif bandwidth == ' BW250 ' :
bandwidth = BW . BW250
elif bandwidth == ' BW500 ' :
bandwidth = BW . BW500
else :
bandwidth = BW . BW125
self . set_bw ( bandwidth )
self . set_low_data_rate_optim ( True )
if codingrate == ' CR4_5 ' :
codingrate = CODING_RATE . CR4_5
elif codingrate == ' CR4_6 ' :
codingrate = CODING_RATE . CR4_6
elif codingrate == ' CR4_7 ' :
codingrate = CODING_RATE . CR4_7
elif codingrate == ' CR4_8 ' :
codingrate = CODING_RATE . CR4_8
else :
codingrate = CODING_RATE . CR4_5
self . set_coding_rate ( codingrate )
self . set_ocp_trim ( 100 )
self . set_pa_config ( paSelect , MaxoutputPower , outputPower )
self . set_max_payload_length ( 255 )
self . set_dio_mapping ( [ 0 ] * 6 )
self . server = server
self . reset_ptr_rx ( )
self . set_mode ( MODE . RXCONT )
def startListening ( self ) :
try :
while True :
# only transmit if no signal is detected to avoid collisions
if not self . get_modem_status ( ) [ " signal_detected " ] :
# print("RSSI: %idBm" % lora.get_rssi_value())
# FIXME: Add noise floor measurement for telemetry
if not self . queue . empty ( ) :
try :
data = self . queue . get ( block = False )
if self . aprs_data_type ( data ) == self . DATA_TYPE_THIRD_PARTY :
# remove third party thing
data = data [ data . find ( self . DATA_TYPE_THIRD_PARTY ) + 1 : ]
# Add LoRa-APRS header (original, this was indented one position further, only executed when above if-statement was true. Think it should be executed at all times.
data = self . LORA_APRS_HEADER + data
print ( " LoRa TX: " + repr ( data ) )
self . transmit ( data )
except QueueEmpty :
pass
time . sleep ( 0.50 )
except KeyboardInterrupt :
BOARD . teardown ( )
def on_rx_done ( self ) :
payload = self . read_payload ( nocheck = True )
if not payload :
print ( " No Payload! " )
return
rssi = self . get_pkt_rssi_value ( )
snr = self . get_pkt_snr_value ( )
data = bytes ( payload )
print ( " LoRa RX[ %i dBm/ %i dB, %i bytes]: %s " % ( rssi , snr , len ( data ) , repr ( data ) ) )
flags = self . get_irq_flags ( )
if any ( [ flags [ s ] for s in [ ' crc_error ' , ' rx_timeout ' ] ] ) :
print ( " Receive Error, discarding frame. " )
# print(self.get_irq_flags())
self . clear_irq_flags ( RxDone = 1 , PayloadCrcError = 1 , RxTimeout = 1 ) # clear rxdone IRQ flag
self . reset_ptr_rx ( )
self . set_mode ( MODE . RXCONT )
return
if self . server :
# remove LoRa-APRS header if present
if data [ 0 : len ( self . LORA_APRS_HEADER ) ] == self . LORA_APRS_HEADER :
data = data [ len ( self . LORA_APRS_HEADER ) : ]
if self . appendSignalReport :
# Signal report only for certain frames, not messages!
if self . aprs_data_type ( data ) in self . DATA_TYPES_POSITION :
data + = b " RSSI= %i dBm SNR= %i dB " % ( rssi , snr )
self . server . send ( data , { " level " : rssi , " snr " : snr } )
self . clear_irq_flags ( RxDone = 1 ) # clear rxdone IRQ flag
self . reset_ptr_rx ( )
self . set_mode ( MODE . RXCONT )
# self.set_mode(MODE.CAD)
def on_tx_done ( self ) :
print ( " TX DONE " )
self . clear_irq_flags ( TxDone = 1 ) # clear txdone IRQ flag
self . set_dio_mapping ( [ 0 ] * 6 )
self . set_mode ( MODE . RXCONT )
def transmit ( self , data ) :
self . write_payload ( [ c for c in data ] )
self . set_dio_mapping ( [ 1 , 0 , 0 , 0 , 0 , 0 ] )
self . set_mode ( MODE . TX )
def aprs_data_type ( self , lora_aprs_frame ) :
delimiter_position = lora_aprs_frame . find ( b " : " )
try :
return lora_aprs_frame [ delimiter_position + 1 ]
except IndexError :
return " "