#!/usr/bin/python3
'''
# A basic APRS iGate and APRS weather station with additional (optional) PE1RXF telemetry support
#
# This program reads the registers of the PE1RXF weather station via ModBus RTU and sends it as
# an APRS WX report over APRS. Additionally, it sends beacons and forwards received APRS messages
# to the APRS-IS network. All configurable via a YAML file called pe1rxf_aprs.yml.
#
# This program also has a PE1RXF APRS telemetry to MQTT bridge, which is configurable via pe1rxf_telemetry.yml
#
# Copyright (C) 2023-2025 M.T. Konstapel https://meezenest.nl/mees
#
# This file is part of weather_station
#
# weather_station is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weather_station is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with weather_station. If not, see https://www.gnu.org/licenses/.
'''
import sys
import time
from time import gmtime, strftime
import pythonax25
import yaml
from yaml.loader import SafeLoader
import schedule
import aprslib
import logging
from threading import Thread

from weather_station_modbus import WeatherStation
from config_reader import config_reader
from aprs_telemetry_to_mqtt import aprs_telemetry_to_mqtt

main_config_file = "pe1rxf_aprs.yml"
telemetry_config_file = "pe1rxf_telemetry.yml"

# Path of the RF log file. Set by run() from global:log-rf in the main
# configuration file; a falsy value (0 or "") means "do not log".
rflog_file = ""

# Make Weather data global so scheduled tasks can use it
WxData = {}

# Replaced by an aprslib.IS instance in run()
APRSIS = []

# Global variable for storing received APRS-IS frames. Shared between threads.
# [0]=from_call (header split at '>'), [1]=payload,
# [2]=token (set by the consumer thread, reset by the main loop)
aprsis_data = ['', '', False]

# Parallel lists describing the AX.25 ports found by setup_ax25():
# port name, device name and address (call) of each port.
axport = []
axdevice = []
axaddress = []

# Byte pair that separates the address field from the payload in a received
# frame (the original code calls this the "APRS PID").
_ADDRESS_PAYLOAD_SEPARATOR = b'\x03\xf0'

# Return value of parsePacket() for frames that cannot be decoded.
_INVALID_PACKET = ('NOCALL', 'NOCALL', 'NOCALL', 'NOT VALID')

# WxData keys sent as PE1RXF telemetry, in transmission order.
_TELEMETRY_KEYS = (
    'Wind direction',
    'Wind speed',
    'Wind gust',
    'Rain last hour',
    'Rain last 24 hours',
    'Temperature',
    'Humidity',
    'Pressure',
    'Temp backup',
    'Status bits',
)


class aprs_status:
    """Holds the number of AX.25 ports reported by pythonax25."""
    nr_of_ports = 0


aprs = aprs_status()


# AX.25 stuff
def setup_ax25():
    """Enumerate the configured AX.25 ports and open the receive socket.

    Appends the name, device and address of every port to the global lists
    axport, axdevice and axaddress, prints them, and stores the port count in
    aprs.nr_of_ports. Exits the program when no port is configured.

    Returns:
        The packet socket created by pythonax25.packet_socket().
    """
    print("\nAvailable AX.25 ports:")

    port_count = pythonax25.config_load_ports()
    aprs.nr_of_ports = port_count

    if port_count <= 0:
        print("No AX.25 ports found.")
        sys.exit()

    port_name = None
    for index in range(port_count):
        # The first port has its own accessor, later ones are chained
        # from their predecessor.
        if index == 0:
            port_name = pythonax25.config_get_first_port()
        else:
            port_name = pythonax25.config_get_next_port(port_name)
        device = pythonax25.config_get_device(port_name)
        address = pythonax25.config_get_address(port_name)
        axport.append(port_name)
        axdevice.append(device)
        axaddress.append(address)
        print(port_name, device, address)

    # Initiate a PF_PACKET socket (RX)
    return pythonax25.packet_socket()


def setup_old__ax25():
    """Legacy name, kept for compatibility; identical to setup_ax25()."""
    return setup_ax25()


def receive_ax25(rx_socket):
    """Receive one packet from rx_socket (blocking, 10 ms timeout).

    Returns whatever pythonax25.packet_rx() returns. The main loop reads it as
    receive[0][1] = device name and receive[1] = raw frame bytes.
    """
    return pythonax25.packet_rx(rx_socket, 10)


def parsePacket(string):
    """Split a raw received frame into source, destination, path and payload.

    Args:
        string: raw frame bytes as delivered by receive_ax25().

    Returns:
        Tuple (source, destination, digipeaters, payload). On success
        digipeaters is a list of strings and payload is a bytes object.
        If the address field cannot be decoded the tuple is
        ('NOCALL', 'NOCALL', 'NOCALL', 'NOT VALID'). If only the payload is
        missing, the addresses are returned with payload 'NOT VALID'.
    """
    # Everything before the first separator is the address field, everything
    # after it (including any later occurrence of the separator) is payload.
    address, separator, payload = string.partition(_ADDRESS_PAYLOAD_SEPARATOR)

    # The first byte must be 0 to indicate a data packet. An empty address
    # field used to crash here with an IndexError.
    if not address or address[0] != 0:
        return _INVALID_PACKET

    # Cut the first byte and feed the rest to the address parser
    listAddress = getAllAddress(address[1:])

    # A usable frame carries at least a destination and a source.
    if listAddress == 0 or len(listAddress) < 2:
        return _INVALID_PACKET

    destination = listAddress[0]
    source = listAddress[1]
    digipeaters = listAddress[2:]

    # Occasionally a bad packet without a payload part is received.
    if not separator:
        payload = 'NOT VALID'

    return (source, destination, digipeaters, payload)


def getAllAddress(packetAddress):
    """Decode a packed AX.25 address field into a list of call strings.

    Each address occupies 7 bytes: six characters shifted left by one bit
    followed by a byte holding the SSID in bits 1-4. For digipeater entries
    (third address onwards) bit 7 of that byte marks "has been repeated" and
    is rendered as a trailing '*'.

    Args:
        packetAddress: bytes-like object containing only the address field.

    Returns:
        List of strings such as 'PE1RXF-3' or 'WIDE1-1*', or 0 when the
        length is not a multiple of 7.
    """
    addressSize = 7

    # Received a non valid address: return 0 so the caller can ignore it
    if len(packetAddress) % addressSize != 0:
        return 0

    allAddress = []
    for offset in range(0, len(packetAddress), addressSize):
        field = packetAddress[offset:offset + addressSize]

        # First extract the CALL and remove the padding spaces
        call = ''.join(chr(byte >> 1) for byte in field[:6]).rstrip()

        # Then extract the SSID
        ssid = (field[6] & 0b00011110) >> 1
        if ssid != 0:
            call = call + '-' + str(ssid)

        # Has-been-repeated flag: check only for digipeaters, not for
        # destination (offset 0) and source (offset 7)
        if offset >= 2 * addressSize and field[6] & 0b10000000:
            call = call + '*'

        allAddress.append(call)

    return allAddress


def process_aprsis(packet):
    """Callback of the APRS-IS consumer thread for every received raw frame.

    Splits the frame at the first ':' into header and payload and hands both
    to the main loop through the shared aprsis_data list. When RF logging is
    enabled the frame is also appended to the log file.

    Args:
        packet: raw frame as bytes (it is decoded as latin-1).

    Returns:
        1 when the frame was written to the log file, 0 when logging is
        disabled or the write failed.
    """
    frame = str(packet, 'latin-1')

    # Split packet at first occurrence of ':', which separates the header from
    # the payload. This happens before the logging check so that messages
    # still reach the main loop when logging is switched off.
    header, separator, payload = frame.partition(':')
    if separator:
        # [0] becomes [from_call, rest of header]
        aprsis_data[0] = header.split('>', 1)
        aprsis_data[1] = payload
        # Set the token last, the main loop polls it
        aprsis_data[2] = True

    if not rflog_file:
        return 0

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", gmtime())
    string = timestamp + ' ' + 'APRSIS ' + ' R ' + frame + '\n'

    # Write APRS-IS string to log file (best effort)
    try:
        with open(rflog_file, "a") as logfile:
            logfile.write(string)
    except (OSError, UnicodeError):
        return 0
    return 1


def send_aprsis(srcCall, dest, digi, msg):
    """Send one frame to the APRS-IS network, applying the iGate drop rules.

    Args:
        srcCall: source call of the frame.
        dest: destination call.
        digi: comma separated path, or 0 / "" for no path.
        msg: payload of the frame.

    Returns:
        1 when the frame was deliberately not sent (empty, internet sourced,
        RFONLY, NOGATE, third party or query), otherwise None.
    """
    # Once the program crashed on msg[0] with an empty message although
    # parsePacket should prevent that, so check it here as well.
    if not msg:
        print("Empty APRS message, nothing to do.")
        return 1

    try:
        APRSIS.connect()
    except Exception:
        print("Could not connect to APRS-IS network.")
        return None

    if digi == 0 or digi == "":
        message = srcCall + '>' + dest + ':' + msg
    else:
        # Drop those packets we shouldn't send to APRS-IS
        if ',TCP' in digi:
            # Drop packets sourced from internet. The original printed an
            # undefined variable here, which raised a NameError.
            print(">>> Internet packet not igated: " + srcCall + '>' + dest + ',' + digi + ':' + msg)
            return 1
        if 'RFONLY' in digi:
            print(">>> RFONLY, not igated.")
            return 1
        if 'NOGATE' in digi:
            print(">>> NOGATE, not igated")
            return 1
        if msg[0] == '}':
            # TODO: strip third party header and send to igate
            print(">>> Third party packet, not igated")
            return 1
        if msg[0] == '?':
            print(">>> Query, not igated")
            return 1

        message = srcCall + '>' + dest + ',' + digi + ':' + msg

    # Send a single message
    try:
        APRSIS.sendall(message)
    except Exception:
        print("Failed to send message to APRS-IS network.")
    else:
        log_ax25("APRSIS ", srcCall, dest, digi, msg, 'T')
    return None


def send_ax25(portCall, srcCall, dest, digi, msg):
    """Transmit one UI frame on an AX.25 port and log it.

    Args:
        portCall: address (call) of the AX.25 port to transmit on.
        srcCall: source call of the frame.
        dest: destination call.
        digi: digipeater path, or 0 / "" for a direct frame.
        msg: payload of the frame.
    """
    # Initiate a datagram socket (TX)
    tx_socket = pythonax25.datagram_socket()
    try:
        pythonax25.datagram_bind(tx_socket, srcCall, portCall)
        if digi == 0 or digi == "":
            pythonax25.datagram_tx(tx_socket, dest, msg)
        else:
            pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
    finally:
        # Close the socket even when binding or transmitting raises
        pythonax25.close_socket(tx_socket)

    log_ax25(portCall, srcCall, dest, digi, msg, 'T')


# If logging is enabled in configuration file (global:log-rf), append APRS frame to log file
def log_ax25(portCall, srcCall, dest, digi, msg, mode):
    """Append one frame to the RF log file.

    Args:
        portCall: name shown in the port column (padded to 9 characters).
        srcCall: source call.
        dest: destination call.
        digi: path, or 0 / "" when the frame has none.
        msg: payload.
        mode: direction marker, 'T' for transmitted or 'R' for received.

    Returns:
        1 when the line was written, 0 when logging is disabled or failed.
    """
    if not rflog_file:
        return 0

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", gmtime())
    prefix = timestamp + ' ' + portCall.ljust(9) + ' ' + mode + ' ' + srcCall + '>' + dest

    if digi == 0 or digi == "":
        string = prefix + ':' + msg + '\n'
    else:
        string = prefix + ',' + digi + ':' + msg + '\n'

    try:
        with open(rflog_file, "a") as logfile:
            logfile.write(string)
    except (OSError, UnicodeError):
        return 0
    return 1


def _find_port_call(port_name):
    """Return the address of the AX.25 port whose device is port_name, or 0."""
    port_call = 0
    for device, address in zip(axdevice, axaddress):
        if device == port_name:
            port_call = address
    return port_call


def _transmit_frame(settings, message):
    """Send message via APRS-IS or the AX.25 port named in settings['port'].

    settings is one entry of the 'weather' or 'beacon' configuration list and
    supplies 'port', 'call', 'destination' and 'digi_path'.
    """
    if settings['port'] == 'aprsis':
        send_aprsis(settings['call'], settings['destination'], settings['digi_path'], message)
        return

    # Find call of ax25 port
    port_call = _find_port_call(settings['port'])
    if port_call == 0:
        # Without this guard the integer 0 was handed to pythonax25 as a call
        print("AX.25 port " + str(settings['port']) + " not found, frame not sent.")
        return

    # A call of 0 in the configuration means: use the call of the port
    src_call = port_call if settings['call'] == 0 else settings['call']

    send_ax25(port_call, src_call, settings['destination'], settings['digi_path'], message)


def send_aprs_weather_report(weather_settings):
    """Build an APRS weather report from WxData and transmit it.

    Args:
        weather_settings: one entry of the 'weather' configuration list
            ('position', 'port', 'call', 'destination', 'digi_path').
    """
    # Convert sensible SI values to freedom units for APRS weather report
    wind_direction = int(WxData['Wind direction'])
    wind_speed = int(2.2369 * WxData['Wind speed'])
    wind_gust = int(2.2369 * WxData['Wind gust'])
    rain_lasthour = int(3.93700787 * WxData['Rain last hour'])
    rain_24hour = int(3.93700787 * WxData['Rain last 24 hours'])
    temperature = int(WxData['Temperature'] * 1.8 + 32)
    pressure = int(10 * WxData['Pressure'])

    # The humidity field has two digits only, 100 % is sent as 00
    humidity = int(WxData['Humidity'])
    if humidity == 100:
        humidity = 0

    # W/m2: various sources give 0.0079 or 0.0083 as an approximation for sunlight
    luminosity = int(0.0079 * WxData['Luminosity'])
    if luminosity <= 999:
        APRS_Lum = 'L'
    else:
        # 'l' carries the value minus 1000 so it still fits in three digits
        APRS_Lum = 'l'
        luminosity = luminosity - 1000

    # Get date and time
    timestamp = time.strftime("%d%H%M", gmtime())

    # Construct APRS weather report
    measurements = "{:03d}/{:03d}g{:03d}t{:03d}r{:03d}p{:03d}h{:02d}b{:05d}".format(
        wind_direction,
        wind_speed,
        wind_gust,
        temperature,
        rain_lasthour,
        rain_24hour,
        humidity,
        pressure,
    )
    aprs_wx_report = (
        '@' + timestamp + 'z' + weather_settings['position']
        + measurements + APRS_Lum + "{:03d}".format(luminosity)
    )

    _transmit_frame(weather_settings, aprs_wx_report)


def send_aprs_beacon(beacon_settings):
    """Transmit a beacon consisting of the configured position and message.

    Args:
        beacon_settings: one entry of the 'beacon' configuration list
            ('position', 'message', 'port', 'call', 'destination',
            'digi_path').
    """
    beacon_message = beacon_settings['position'] + beacon_settings['message']
    _transmit_frame(beacon_settings, beacon_message)


def _telemetry_message(keys):
    """Return ':PE1RXF-3 :' followed by the comma separated WxData values."""
    return ':' + 'PE1RXF-3 ' + ':' + ','.join(str(WxData[key]) for key in keys)


def send_telemetry():
    """Send the weather values as a telemetry message over AX.25."""
    message = _telemetry_message(_TELEMETRY_KEYS)
    send_ax25('PE1RXF-3', 'PE1RXF-13', "APZMDM", 0, message)


def publish_weather_data(mqtt_client):
    """Publish the weather values (including luminosity) through mqtt_client.

    Returns:
        Always 1.
    """
    payload = _telemetry_message(_TELEMETRY_KEYS + ('Luminosity',))
    mqtt_client.publish_telemetry_message("PE1RXF-13", "ax1", "PE1RXF-3", payload)
    return 1


def read_weather_station(weather_station):
    """Read the weather station and store the result in the global WxData.

    WxData keeps its previous content when the station does not respond.
    """
    global WxData

    if weather_station.get_weather_data() == 0:
        print('No response from ModBus, even after 5 retries. Keep trying.')
        return

    WxData = weather_station.wx_data
    print(WxData)


def check_heater(weather_station):
    """Turn the heater of the weather station back on when it is off.

    The heater is considered off when bit 2 (0x4) of 'Status bits' is clear.
    """
    heater_is_on = (weather_station.wx_data['Status bits'] & 0x4) != 0
    if not heater_is_on:
        weather_station.enable_heater()
        print("Heater was off, turned it back on gain.")


def check_thread_alive(thr):
    """Return True if the thread is still running and False otherwise."""
    thr.join(timeout=0.0)
    return thr.is_alive()


def _start_aprsis_consumer(filter_text):
    """Set the APRS-IS filter and start the consumer in its own thread.

    APRSIS.consumer is a blocking call, so it runs as a separate thread and
    reports frames through process_aprsis. The thread is a daemon so that it
    cannot keep the process alive after the main loop has terminated.

    Returns:
        The started Thread.
    """
    # Set filter on incoming feed
    APRSIS.set_filter(filter_text)

    # Positional arguments after the callback: blocking, immortal, raw
    consumer_thread = Thread(
        target=APRSIS.consumer,
        args=(process_aprsis, True, True, True),
        daemon=True,
    )
    consumer_thread.start()
    return consumer_thread


def _schedule_with_jitter(entries, job, notice):
    """Schedule job for every entry with a non-zero 'interval' (in minutes).

    A little randomness (plus or minus 59 seconds) is applied to the interval.
    """
    for entry in entries:
        interval = entry['interval']
        if interval != 0:
            print(notice)
            interval = interval * 60  # from minutes to seconds
            schedule.every(interval - 59).to(interval + 59).seconds.do(job, entry)


def run():
    """Read the configuration, set everything up and run the main loop.

    The main loop runs the scheduled jobs, keeps the APRS-IS consumer thread
    alive, forwards frames received on the AX.25 ports to APRS-IS and MQTT and
    hands messages received from APRS-IS to MQTT. It does not return.
    """
    global rflog_file
    global APRSIS

    # Fill WxData with some sensible data to start
    for key in ('ID', 'Wind direction', 'Wind speed', 'Wind gust',
                'Rain last hour', 'Rain last 24 hours', 'Temperature',
                'Humidity', 'Pressure', 'Luminosity', 'Temp backup',
                'Status bits'):
        WxData[key] = 0.0

    # Debug logging for aprslib
    logging.basicConfig(level=logging.DEBUG)  # level=10

    # Read the main configuration file (pe1rxf_aprs.yml)
    Configuration = config_reader(main_config_file, telemetry_config_file)
    if Configuration.read_settings() == 0:
        sys.exit()
    settings = Configuration.config_file_settings
    aprsis_settings = settings['aprsis']

    rflog_file = settings['global']['log-rf']
    if rflog_file:
        # str() because a non-string value must not crash the start-up
        print("Write APRS frames to: " + str(rflog_file))
    else:
        print("Logging of APRS frames is disabled.")

    print("Read configuration files.")

    # Setup MQTT connection
    mqtt_connection = aprs_telemetry_to_mqtt(telemetry_config_file)
    mqtt_connection.read_settings()

    rx_socket = setup_ax25()

    # A valid passcode for the callsign is required in order to send
    print("Connecting to APRS-IS server")
    APRSIS = aprslib.IS(
        aprsis_settings['call'],
        passwd=aprsis_settings['passcode'],
        port=aprsis_settings['port'],
    )
    APRSIS.connect()

    print("Trying to connect to the weather station via the RS-485 dongle...")
    try:
        weather_station = WeatherStation(settings['modbus']['port'], settings['modbus']['address'])
    except Exception:
        print("No response from the weather station via the ModBus, even after several retries. Disabling the weather station.")
        sys.exit(1)  # TODO: make program work without weather station
    else:
        print("Got response from the weather station. Weather information is available.")

    ###
    # Schedule all periodic transmissions (use a little randomness)
    ###
    _schedule_with_jitter(settings['weather'], send_aprs_weather_report, "Scheduled WX report transmission")
    _schedule_with_jitter(settings['beacon'], send_aprs_beacon, "Scheduled beacon transmission.")

    # Schedule check if heater is still on.
    # So when the weather station is unplugged and plugged back in, the heater will be enabled again.
    schedule.every(10).minutes.do(check_heater, weather_station)

    # Schedule readout of weather station
    print("Scheduled readout of weather station.")
    schedule.every(1).minutes.do(read_weather_station, weather_station)

    print("Schedule mqtt weather publisher.")
    interval = mqtt_connection.config_file_settings['global']['weather_report_interval']
    if interval != 0:
        schedule.every(interval).minutes.do(publish_weather_data, mqtt_connection)

    # Connect to incoming APRS-IS feed. Data from APRS-IS is stored in aprsis_data.
    thread = _start_aprsis_consumer(aprsis_settings['filter'])

    while True:
        # Scheduler
        schedule.run_pending()

        # Check if APRS-IS thread is still running, if not restart it
        if not check_thread_alive(thread):
            thread = _start_aprsis_consumer(aprsis_settings['filter'])

        # Listen on all ax25 ports and send to APRS-IS
        receive = receive_ax25(rx_socket)
        for port, device in enumerate(axdevice):
            if receive[0][1] != device:
                continue

            source, destination, digipeaters, payload = parsePacket(receive[1])

            # Convert byte string to normal string. parsePacket returns the
            # string 'NOT VALID' instead of bytes for undecodable frames.
            if isinstance(payload, (bytes, bytearray)):
                payload = payload.decode('latin-1')
            else:
                payload = 'NOT VALID'

            if payload == 'NOT VALID':
                print(">>> Packet not valid, ignored.")
                continue

            log_ax25(axaddress[port], source, destination, ','.join(digipeaters), payload, 'R')

            # Add the q construct and our own call before igating
            digipeaters.append('qAR')
            digipeaters.append(aprsis_settings['call'])
            send_aprsis(source, destination, ','.join(digipeaters), payload)

            # Check if APRS frame is PE1RXF telemetry
            mqtt_connection.publish_telemetry_message(source, device, axaddress[port], payload)

            # Check if APRS frame is a message to us
            mqtt_connection.publish_aprs_messages(source, device, payload)

        # If APRSIS read thread receives message for us, send it to MQTT
        if aprsis_data[2]:
            aprsis_data[2] = False
            print("In loop:")
            print(aprsis_data[0])
            print(aprsis_data[1])
            mqtt_connection.publish_aprs_messages(aprsis_data[0][0], 'APRSIS', aprsis_data[1])


if __name__ == '__main__':
    # Line buffered, so the debug log is complete when the program crashes
    sys.stdout = sys.stderr = open('/home/marcel/pe1rxf_aprs_debug.log', 'w', buffering=1)
    run()