#!/usr/bin/python # -*- coding: utf-8 -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import sys from asyncio import QueueEmpty import traceback sys.path.insert(0, './pySX127x/') from pySX127x.SX127x.LoRa import LoRa from pySX127x.SX127x.constants import * from pySX127x.SX127x.board_config import BOARD import time #import KissHelper class LoraAprsKissTnc(LoRa): LORA_APRS_HEADER = b"<\xff\x01" # APRS data types DATA_TYPES_POSITION = b"!'/@`" DATA_TYPE_MESSAGE = b":" DATA_TYPE_THIRD_PARTY = b"}" queue = None server = None # init has LoRa APRS default config settings - might be initialized different when creating object with parameters def __init__(self, queue, server, frequency=433.775, preamble=8, spreadingFactor=12, bandwidth=BW.BW125, codingrate=CODING_RATE.CR4_5, appendSignalReport = True, paSelect = 1, MaxoutputPower = 15, outputPower = 15, verbose=False): # Init SX127x BOARD.setup() super(LoraAprsKissTnc, self).__init__(verbose) self.queue = queue if appendSignalReport == 'False': appendSignalReport = False self.appendSignalReport = appendSignalReport self.set_mode(MODE.SLEEP) self.set_freq(frequency) self.set_preamble(preamble) self.set_spreading_factor(spreadingFactor) if bandwidth == 'BW7_8': bandwidth = BW.BW7_8 elif bandwidth == 'BW10_4': bandwidth = BW.BW10_4 elif bandwidth == 'BW15_6': bandwidth = BW.BW15_6 elif bandwidth == 'BW20_8': bandwidth = BW.BW20_8 elif bandwidth == 'BW31_25': bandwidth = BW.BW31_25 elif bandwidth == 'BW41_7': bandwidth = BW.BW41_7 elif bandwidth == 'BW62_5': bandwidth = BW.BW62_5 elif bandwidth == 'BW125': bandwidth = BW.BW125 elif bandwidth == 'BW250': bandwidth = BW.BW250 elif bandwidth == 'BW500': bandwidth = BW.BW500 else: bandwidth = BW.BW125 self.set_bw(bandwidth) self.set_low_data_rate_optim(True) if codingrate == 'CR4_5': codingrate = CODING_RATE.CR4_5 elif codingrate == 'CR4_6': codingrate = CODING_RATE.CR4_6 elif codingrate == 'CR4_7': codingrate = CODING_RATE.CR4_7 elif codingrate == 'CR4_8': codingrate = CODING_RATE.CR4_8 else: codingrate = CODING_RATE.CR4_5 self.set_coding_rate(codingrate) self.set_ocp_trim(100) self.set_pa_config(paSelect, MaxoutputPower, outputPower) self.set_max_payload_length(255) self.set_dio_mapping([0] * 6) self.server = server self.reset_ptr_rx() self.set_mode(MODE.RXCONT) def startListening(self): try: while True: # only transmit if no signal is detected to avoid collisions if not self.get_modem_status()["signal_detected"]: # print("RSSI: %idBm" % lora.get_rssi_value()) # FIXME: Add noise floor measurement for telemetry if not self.queue.empty(): try: data = self.queue.get(block=False) if self.aprs_data_type(data) == self.DATA_TYPE_THIRD_PARTY: # remove third party thing data = data[data.find(self.DATA_TYPE_THIRD_PARTY) + 1:] # Add LoRa-APRS header (original, this was indented one position further, only executed when above if-statement was true. Think it should be executed at all times. data = self.LORA_APRS_HEADER + data print("LoRa TX: " + repr(data)) self.transmit(data) except QueueEmpty: pass time.sleep(0.50) except KeyboardInterrupt: BOARD.teardown() def on_rx_done(self): payload = self.read_payload(nocheck=True) if not payload: print("No Payload!") return rssi = self.get_pkt_rssi_value() snr = self.get_pkt_snr_value() data = bytes(payload) print("LoRa RX[%idBm/%idB, %ibytes]: %s" %(rssi, snr, len(data), repr(data))) flags = self.get_irq_flags() if any([flags[s] for s in ['crc_error', 'rx_timeout']]): print("Receive Error, discarding frame.") # print(self.get_irq_flags()) self.clear_irq_flags(RxDone=1, PayloadCrcError=1, RxTimeout=1) # clear rxdone IRQ flag self.reset_ptr_rx() self.set_mode(MODE.RXCONT) return if self.server: # remove LoRa-APRS header if present if data[0:len(self.LORA_APRS_HEADER)] == self.LORA_APRS_HEADER: data = data[len(self.LORA_APRS_HEADER):] if self.appendSignalReport: # Signal report only for certain frames, not messages! if self.aprs_data_type(data) in self.DATA_TYPES_POSITION: data += b" RSSI=%idBm SNR=%idB" % (rssi, snr) self.server.send(data, {"level":rssi, "snr":snr}) self.clear_irq_flags(RxDone=1) # clear rxdone IRQ flag self.reset_ptr_rx() self.set_mode(MODE.RXCONT) # self.set_mode(MODE.CAD) def on_tx_done(self): print("TX DONE") self.clear_irq_flags(TxDone=1) # clear txdone IRQ flag self.set_dio_mapping([0] * 6) self.set_mode(MODE.RXCONT) def transmit(self, data): self.write_payload([c for c in data]) self.set_dio_mapping([1, 0, 0, 0, 0, 0]) self.set_mode(MODE.TX) def aprs_data_type(self, lora_aprs_frame): delimiter_position = lora_aprs_frame.find(b":") try: return lora_aprs_frame[delimiter_position + 1] except IndexError: return ""