First commit

This commit is contained in:
marcel
2024-02-18 18:22:51 +01:00
commit 711bf9f7dc
20 changed files with 1601 additions and 0 deletions

40
CHANGELOG.md Normal file
View File

@@ -0,0 +1,40 @@
# Changelog
All notable changes to this project will be documented in this file.
Added : for new features.
Changed : for changes in existing functionality.
Deprecated: for soon-to-be removed features.
Removed : for now removed features.
Fixed : for any bug fixes.
Security : in case of vulnerabilities.
## [0.0.1] - pre 2024-02-16
- Had only weather station functionality
## [0.1.0] - 2024-02-16
### Added
- Can now send beacons
- Beacons and weather reports transmit intervals, digi_paths, etc. can be set via a YAML configuration file
- Periodic transmisions are scheduled with +/- one minute randomnes in transmision time
- APRS frames are logged to file (file name can be set in YAML configuration file)
- sending to APRS-IS implemented
## [0.1.1] - 2024-02-17
### Fixed
- Catch errors when connection to APRS-IS fails, so program doesn't crash.
### Added
- connected to APRS-IS feed
## [0.1.1] - 2024-02-17
### Added
- iGate functionality

21
README.md Normal file
View File

@@ -0,0 +1,21 @@
# PE1RXF APRS
A basic Linux APRS iGate and APRS weather station with additional (optional) PE1RXF telemetry support.
## Requirements
- Python3
- minimalmodbus
- retrying
- gps3
- schedule
- aprslib
## License
Copyright (C) 2024 M.T. Konstapel
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

36
ax25.py Normal file
View File

@@ -0,0 +1,36 @@
'''
# A basic APRS iGate and APRS weather station with additional (optional) PE1RXF telemetry support
#
# This program reads the registers of the PE1RXF weather station via ModBus RTU and sends it as
# an APRS WX report over APRS. Additionally, it sends beacons and forwards received APRS messages
# to the APRS-IS network. All configurable via a YAML file called pe1rxf_aprs.yml.
#
# This program also has a PE1RXF APRS telemetry to MQTT bridge, which is configurable via pe1rxf_telemetry.yml
#
# Copyright (C) 2023, 2024 M.T. Konstapel https://meezenest.nl/mees
#
# This file is part of weather_station
#
# weather_station is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weather_station is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with weather_station. If not, see <https://www.gnu.org/licenses/>.
'''
import yaml
from yaml.loader import SafeLoader
class config_reader:
# initiate class: define name configuration files
def __init__(self, main_config_file, telemetry_config_file):
return 0

160
config_reader.py Normal file
View File

@@ -0,0 +1,160 @@
'''
# A basic APRS iGate and APRS weather station with additional (optional) PE1RXF telemetry support
#
# This program reads the registers of the PE1RXF weather station via ModBus RTU and sends it as
# an APRS WX report over APRS. Additionally, it sends beacons and forwards received APRS messages
# to the APRS-IS network. All configurable via a YAML file called pe1rxf_aprs.yml.
#
# This program also has a PE1RXF APRS telemetry to MQTT bridge, which is configurable via pe1rxf_telemetry.yml
#
# Copyright (C) 2023, 2024 M.T. Konstapel https://meezenest.nl/mees
#
# This file is part of weather_station
#
# weather_station is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weather_station is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with weather_station. If not, see <https://www.gnu.org/licenses/>.
'''
import yaml
from yaml.loader import SafeLoader
class config_reader:
# initiate class: define name configuration files
def __init__(self, main_config_file, telemetry_config_file):
self.config_file = main_config_file
self.telemetry_file = telemetry_config_file
self.modbus_settings = []
self.aprsis_settings = []
self.weather_settings = []
self.beacon_settings = []
self.telemetry_settings = []
def read_settings(self):
if self.read_config_file() == 0:
return 0
if self.read_telemetry_file() == 0:
return 0
if self.test_global_settings() == 0:
return 0
if self.test_modbus_settings() == 0:
return 0
if self.test_aprsis_settings() == 0:
return 0
if self.test_weather_settings() == 0:
return 0
if self.test_beacon_settings() == 0:
return 0
if self.test_telemetry_settings() == 0:
return 0
return 1
def read_config_file (self):
try:
with open(self.config_file) as f:
self.config_file_settings = yaml.load(f, Loader=SafeLoader)
except:
print ("Configuration file ./" + self.config_file + " not found or syntax error in file.")
return 0
else:
return 1
def read_telemetry_file (self):
try:
with open(self.telemetry_file) as f:
self.telemetry_settings = yaml.load(f, Loader=SafeLoader)
except:
print ("Telemetry configuration file ./" + self.telemetry_file + " not found or syntax error in file.")
return 0
else:
return 1
# Test if all settings are pressebt
def test_global_settings(self):
# Test is all expected settings are present
try:
tmp = self.config_file_settings['global']['log-rf']
except:
print ("Error in the global section of the configuration file.")
return 0
else:
return 1
# Test if all settings are pressebt
def test_modbus_settings(self):
# Test is all expected settings are present
try:
tmp = self.config_file_settings['modbus']['port']
tmp = self.config_file_settings['modbus']['address']
except:
print ("Error in the modbus section of the configuration file.")
return 0
else:
return 1
def test_aprsis_settings(self):
try:
tmp = self.config_file_settings['aprsis']['call']
tmp = self.config_file_settings['aprsis']['passcode']
tmp = self.config_file_settings['aprsis']['server']
tmp = self.config_file_settings['aprsis']['port']
tmp = self.config_file_settings['aprsis']['filter']
except:
print ("Error in the aprsis section of the configuration file.")
return 0
else:
return 1
def test_weather_settings(self):
for entry in self.config_file_settings['weather']:
try:
tmp = entry['port']
tmp = entry['call']
tmp = entry['destination']
tmp = entry['digi_path']
tmp = entry['position']
tmp = entry['interval']
except:
print ("Error in the weather section of the configuration file.")
return 0
else:
return 1
def test_beacon_settings(self):
for entry in self.config_file_settings['beacon']:
try:
tmp = entry['port']
tmp = entry['call']
tmp = entry['destination']
tmp = entry['digi_path']
tmp = entry['position']
tmp = entry['interval']
tmp = entry['message']
except:
print ("Error in the beacon section of the configuration file.")
return 0
else:
return 1
def test_telemetry_settings(self):
return 1

490
pe1rxf_aprs.py Normal file
View File

@@ -0,0 +1,490 @@
'''
# A basic APRS iGate and APRS weather station with additional (optional) PE1RXF telemetry support
#
# This program reads the registers of the PE1RXF weather station via ModBus RTU and sends it as
# an APRS WX report over APRS. Additionally, it sends beacons and forwards received APRS messages
# to the APRS-IS network. All configurable via a YAML file called pe1rxf_aprs.yml.
#
# This program also has a PE1RXF APRS telemetry to MQTT bridge, which is configurable via pe1rxf_telemetry.yml
#
# Copyright (C) 2023, 2024 M.T. Konstapel https://meezenest.nl/mees
#
# This file is part of weather_station
#
# weather_station is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weather_station is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with weather_station. If not, see <https://www.gnu.org/licenses/>.
'''
import sys
import time
from time import gmtime, strftime
import pythonax25
import yaml
from yaml.loader import SafeLoader
import schedule
import aprslib
import logging
from threading import Thread
from weather_station_modbus import WeatherStation
from config_reader import config_reader
main_config_file = "pe1rxf_aprs.yml"
telemetry_config_file = "pe1rxf_telemetry.yml"
rflog_file = ""
# Make Weather data global so scheduled task can use it
WxData = []
APRSIS = []
axport = []
axdevice = []
axaddress = []
class aprs_status:
nr_of_ports = 0
pass
aprs = aprs_status()
# AX.25 stuff
def setup_ax25():
# Check if there's any active AX25 port
print("\nAvailable AX.25 ports:")
current_port = 0;
port_nr = pythonax25.config_load_ports()
aprs.nr_of_ports = port_nr
if port_nr > 0:
# Get the device name of the first port
axport.append(pythonax25.config_get_first_port())
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
while port_nr - current_port > 0:
axport.append(pythonax25.config_get_next_port(axport[current_port-1]))
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
else:
print("No AX.25 ports found.")
sys.exit()
# Initiate a PF_PACKET socket (RX)
rx_socket = pythonax25.packet_socket()
return rx_socket
def setup_old__ax25():
# Check if there's any active AX25 port
print("\nAvailable AX.25 ports:")
current_port = 0;
port_nr = pythonax25.config_load_ports()
aprs.nr_of_ports = port_nr
if port_nr > 0:
# Get the device name of the first port
axport.append(pythonax25.config_get_first_port())
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
while port_nr - current_port > 0:
axport.append(pythonax25.config_get_next_port(axport[current_port-1]))
axdevice.append(pythonax25.config_get_device(axport[current_port]))
axaddress.append(pythonax25.config_get_address(axport[current_port]))
print (axport[current_port], axdevice[current_port], axaddress[current_port])
current_port = current_port + 1
else:
print("No AX.25 ports found.")
sys.exit()
# Initiate a PF_PACKET socket (RX)
rx_socket = pythonax25.packet_socket()
return rx_socket
def receive_ax25(rx_socket):
# Blocking receive packet, 10 ms timeout
receive = pythonax25.packet_rx(rx_socket,10)
return receive
def parsePacket(string):
# Split the address and payload separated by APRS PID
buffer = string.split(b'\x03\xf0')
address = buffer[0]
# Check if the first byte indicates it is a data packet
if address[0] == 0:
# Cut the first byte and feed it to the address parser
listAddress = getAllAddress(address[1:])
if listAddress != 0:
# Get the source, destination, and digipeaters from the address list
source = listAddress[1]
destination = listAddress[0]
digipeaters = listAddress[2:]
# Occasionally a bad packet is received causng the program to crash with an "IndexError: list index out of range". Fix: check if index IS out of range before copying it to payload
if len(buffer) > 1:
payload = buffer[1]
else:
payload = 'NOT VALID'
else:
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
source = 'NOCALL'
destination = 'NOCALL'
digipeaters = 'NOCALL'
payload = 'NOT VALID'
else:
# If there was an error decoding the address we return save values which will be ignored by the rest of the program
source = 'NOCALL'
destination = 'NOCALL'
digipeaters = 'NOCALL'
payload = 'NOT VALID'
#raise Exception('Not a data packet')
return (source, destination, digipeaters, payload)
def getAllAddress(packetAddress):
allAddress = []
addressSize = 7
# Check if the networked address string is valid
if (len(packetAddress) % 7) == 0:
for i in range(0, len(packetAddress), addressSize):
address = ""
# First extract CALL
for pos in range(6):
address = address + ( chr(packetAddress[i+pos]>>1) )
# Remove spaces
address= address.rstrip()
# Than extract SSID
ssid = packetAddress[i+6] & 0b00011110
ssid = ssid >> 1
if ssid != 0:
address = address + '-' + str(ssid)
# Has been repeated flag set? Check only for digipeaters, not source and destination
if i != 0 and i != 7:
if packetAddress[i+6] & 0b10000000:
address = address + '*'
#print (address)
allAddress.append(address)
# Create a list of all address in ASCII form
#try:
# allAddress = [pythonax25.network_to_ascii(packetAddress[i:i+addressSize])
# for i in range(0, len(packetAddress), addressSize)]
#except:
# allAddress = 0
#print (allAddress)
return allAddress
else:
# Received a non valid address. Fill return value with NULL so we don't crash
allAddress = 0
return allAddress
#raise Exception('Error: Address is not a multiple of 7')
def process_aprsis(packet):
#print("Received APRSIS")
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", gmtime())
if rflog_file == 0:
return 0
string = timestamp + ' ' + 'APRSIS ' + ' R ' + str(packet, 'utf-8') + '\n'
try:
with open(rflog_file, "a") as logfile:
logfile.write(string)
except:
return 0
else:
return 1
def send_aprsis(srcCall, dest, digi, msg):
try:
APRSIS.connect()
except:
print("Could not connect to APRS-IS network.")
else:
if digi == 0 or digi == "":
message = srcCall + '>' + dest + ':' + msg
else:
# Drop those packets we shouldn't send to APRS-IS
if (',TCP' in digi): # drop packets sourced from internet
print(">>> Internet packet not igated: " + packet)
return 1
if ('RFONLY' in digi):
print(">>> RFONLY, not igated.")
return 1
if ('NOGATE' in digi):
print(">>> NOGATE, not igated")
return 1
message = srcCall + '>' + dest + ',' + digi + ':' + msg
# send a single status message
try:
APRSIS.sendall(message)
#print(message)
except:
print("Failed to send message to APRS-IS network.")
else:
log_ax25("APRSIS ", srcCall, dest, digi, msg, 'T')
def send_ax25(portCall, srcCall, dest, digi, msg):
# Initiate a datagram socket (TX)
tx_socket = pythonax25.datagram_socket()
res = pythonax25.datagram_bind(tx_socket, srcCall, portCall)
#print(res)
if digi == 0 or digi == "":
res = pythonax25.datagram_tx(tx_socket, dest, msg)
#print(timestamp + ' ' + portCall + ' T ' + srcCall + '>' + dest + ':' + msg)
else:
res = pythonax25.datagram_tx_digi(tx_socket, dest, digi, msg)
#print(timestamp + ' ' + portCall + ' T ' + srcCall + '>' + dest + ',' + digi + ':' + msg)
#print(res)
pythonax25.close_socket(tx_socket)
log_ax25(portCall, srcCall, dest, digi, msg, 'T')
# If logging is enabled in configuration file (global:log-rf), append APRS frame to log file
def log_ax25(portCall, srcCall, dest, digi, msg, mode):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", gmtime())
if rflog_file == 0:
return 0
if digi == 0 or digi == "":
string = timestamp + ' ' + portCall.ljust(9) + ' ' + mode + ' ' + srcCall + '>' + dest + ':' + msg + '\n'
else:
string = timestamp + ' ' + portCall.ljust(9) + ' ' + mode + ' ' + srcCall + '>' + dest + ',' + digi + ':' + msg + '\n'
try:
with open(rflog_file, "a") as logfile:
logfile.write(string)
except:
return 0
else:
return 1
def send_aprs_weather_report(weather_settings):
global WxData
# Convert sensible SI values to freedom units for APRS weather report
wind_direction = int(WxData['Wind direction'])
wind_speed = int(2.2369 * WxData['Wind speed'])
wind_gust = int(2.2369 * WxData['Wind gust'])
#rain_lasthour = int(3.93700787 * WxData['Rain last hour'])
rain_lasthour = 0
#rain_24hour = int(3.93700787 * WxData['Rain last 24 hours'])
rain_24hour = 0
temperature = int(WxData['Temperature'] * 1.8 + 32)
humidity = int(WxData['Humidity'])
if (humidity == 100):
humidity = 0;
pressure =int(10 * WxData['Pressure'])
# Get date and time
timestamp = time.strftime("%d%H%M", gmtime())
# Construct APRS weather report
aprs_position = weather_settings['position']
aprs_wx_report = '@' + timestamp + 'z' + weather_settings['position'] + "{:03d}".format(wind_direction) + '/' + "{:03d}".format(wind_speed) + 'g' + "{:03d}".format(wind_gust) + 't' + "{:03d}".format(temperature) + 'r' + "{:03d}".format(rain_lasthour) + 'p' + "{:03d}".format(rain_24hour) + 'h' + "{:02d}".format(humidity) + 'b' + "{:05d}".format(pressure)
if weather_settings['port'] == 'aprsis':
send_aprsis(weather_settings['call'], weather_settings['destination'], weather_settings['digi_path'], aprs_wx_report)
else:
# Send it
# Find call of ax25 port
port_call = 0
for position in range(len(axdevice)):
if axdevice[position] == weather_settings['port']:
port_call = axaddress[position]
if (weather_settings['call'] == 0):
src_call = port_call
else:
src_call = weather_settings['call']
send_ax25(port_call, src_call, weather_settings['destination'], weather_settings['digi_path'], aprs_wx_report)
def send_aprs_beacon(beacon_settings):
beacon_message = beacon_settings['position'] + beacon_settings['message']
if beacon_settings['port'] == 'aprsis':
send_aprsis(beacon_settings['call'], beacon_settings['destination'], beacon_settings['digi_path'], beacon_message)
else:
# Send it
# Find call of ax25 port
port_call = 0
for position in range(len(axdevice)):
if axdevice[position] == beacon_settings['port']:
port_call = axaddress[position]
if (beacon_settings['call'] == 0):
src_call = port_call
else:
src_call = beacon_settings['call']
send_ax25(port_call, src_call, beacon_settings['destination'], beacon_settings['digi_path'], beacon_message)
def send_telemetry():
message = ':' + 'PE1RXF-3 ' + ':' + str(WxData['Wind direction']) + ',' + str(WxData['Wind speed']) + ',' + str(WxData['Wind gust']) + ',' + str(WxData['Rain last hour']) + ',' + str(WxData['Rain last 24 hours']) + ',' + str(WxData['Temperature']) + ',' + str(WxData['Humidity']) + ',' + str(WxData['Pressure']) + ',' + str(WxData['Temp backup']) + ',' + str(WxData['Status bits'])
send_ax25('PE1RXF-3', 'PE1RXF-13', "APZMDM", 0, message)
def check_heater(weather_station):
# Check if heater is off, if so, turn it on
if weather_station.wx_data['Status bits'] & 0x4 == 0:
weather_station.enable_heater()
print("Heater was off, turned it back on gain.")
def run():
global WxData
# Debug logging for aprslib
logging.basicConfig(level=logging.DEBUG) # level=10
loop_counter=0
# Read the main configuration file (pe1rxf_aprs.yml)
Configuration = config_reader(main_config_file, telemetry_config_file)
if Configuration.read_settings() == 0:
sys.exit()
global rflog_file
rflog_file = Configuration.config_file_settings['global']['log-rf']
print ("Write APRS frames to: " + rflog_file)
print ("Read configuration files.")
rx_socket = setup_ax25()
# a valid passcode for the callsign is required in order to send
print("Connecting to APRS-IS server")
global APRSIS
APRSIS = aprslib.IS(Configuration.config_file_settings['aprsis']['call'], passwd=Configuration.config_file_settings['aprsis']['passcode'], port=Configuration.config_file_settings['aprsis']['port'])
APRSIS.connect()
print("Trying to connect to the weather station via the RS-485 dongle...")
try:
weather_station = WeatherStation(Configuration.config_file_settings['modbus']['port'], Configuration.config_file_settings['modbus']['address'])
except:
print("No response from the weather station via the ModBus, even after several retries. Disabling the weather station.")
sys.exit(1) # Make program work without weather station
else:
print("Got response from the weather station. Weather information is available.")
# NOTE: Should be done periodically! So when the weather station is unplugged and plugged back in, the heater will be enabled again.
try:
weather_station.enable_heater()
except:
print("No response from the weather station via the ModBus, even after several retries. Disabling the weather station.")
sys.exit(1) # Make program work without weather station!
else:
print("Enabled the heater function on the weather station.")
###
# Schedule all periodic transmissions (use a little randomness)
###
# Schedule all weather report transmisions
for entry in Configuration.config_file_settings['weather']:
interval = entry['interval']
if interval != 0:
print("Scheduled WX report transmission")
interval = interval * 60 # from minutes to seconds
schedule.every(interval - 59).to(interval + 59).seconds.do(send_aprs_weather_report, entry)
# Schedule all beacon transmisions
for entry in Configuration.config_file_settings['beacon']:
interval = entry['interval']
if interval != 0:
print("Scheduled beacon transmission.")
interval = interval * 60 # from minutes to seconds
schedule.every(interval - 59).to(interval + 59).seconds.do(send_aprs_beacon, entry)
# Schedule telemetry transmision
print("Scheduled telemetry transmission.")
schedule.every(10).minutes.do(send_telemetry)
# ScheduleL check if heater is still on
schedule.every(10).minutes.do(check_heater, weather_station)
# Connect to incoming APRS-IS feed
# by default `raw` is False, then each line is ran through aprslib.parse()
# Set filter on incomming feed
APRSIS.set_filter(Configuration.config_file_settings['aprsis']['filter'])
# This is a blocking call, should run as seperate thread
# create a thread
thread = Thread(target=APRSIS.consumer, args=(process_aprsis, True, True, True))
# run the thread
thread.start()
while (1):
#print ("Reading registers of weather station.")
if weather_station.get_weather_data() == 0:
print ('No response from ModBus, even after 5 retries. Keep trying.')
else:
WxData = weather_station.wx_data
schedule.run_pending()
receive = receive_ax25(rx_socket)
for port in range(len(axdevice)):
if receive[0][1] == axdevice[port]:
#print(receive)
source, destination, digipeaters, payload = parsePacket(receive[1])
#print(receive[1])
# bug UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf8 in position 47: invalid start byte
try:
payload = payload.decode()
except:
payload = 'NOT VALID'
#print("Packet Received by = %s"%axaddress[0])
#print("Source Address = %s"%source)
#print("Destination Address = %s"%destination)
#print("Digipeaters =")
#print(digipeaters)
#print("Payload = %s"%payload)
#print("")
#aprs_frame = axaddress[0] + '>' + destination + ',' + ','.join(digipeaters) + ':' + payload
#print (aprs_frame)
log_ax25(axaddress[port], source, destination, ','.join(digipeaters), payload, 'R')
digipeaters.append('qAR')
digipeaters.append(Configuration.config_file_settings['aprsis']['call'])
send_aprsis(source, destination, ','.join(digipeaters), payload)
#telemetry=process_message(source, port, payload, client)
#time.sleep(1) # Short sleep
if __name__ == '__main__':
sys.stdout = sys.stderr = open('/home/marcel/pe1rxf_aprs_debug.log', 'w')
run()

89
pe1rxf_aprs.yml Normal file
View File

@@ -0,0 +1,89 @@
# This is the main configuration file for the PE1RXF APRS digipeater, igate and weather station software
#
# The settings for the PE1RXF telemetry server are configured in file pe1rxf_telemetry.yml
#
# NOTE: At the start, the program randomizes the starting time of every individual periodic transmission,
# so even if all intervals are equal, the transmissions are not at the same time, but rather spread over time.
#
# This section (ax25) is being depricated
ax25:
call: PE1RXF-13 # Call from which transmissions are made
destination: APZMDM # APRS destination
telemetry_port: ax1 # Linux AX.25 port to which telemetry is sent
telemetry_digi_path: 0 # Digipeater path for telemetry messages (0 = no path)
telemetry_interval: 5 # Time between telemetry transmissions
telemetry_server: PE1RXF-3 # PE1RXF telemetry server call
weather_report_port: ax0 # Linux AX.25 port to which telemetry is sent
weather_report_digi_path: WIDE2-2 # Digipeater path for weather reports (0 = no path)
weather_report_interval: 10 # Time between weather report transmissions
# Global settings
global:
log-rf: /home/marcel/test/pe1rxf_aprs-rf.log # Log RF traffic to file (0=no logging)
modbus:
port: /dev/serial/by-path/platform-3f980000.usb-usb-0:1.1:1.0-port0 # USB port to which RS-485 dongle is connected
address: 14 # ModBus address of weather station
# APRS-IS section
aprsis:
call: PE1RXF-1 # Amateur radio call
passcode: 19123 # APRS-IS passcode for the call
server: euro.aprs2.net # APRS-IS server to connect to (if set to 0 forwarding to the APRS-IS network is disabled) NOT YET FUNCTIONING
port: 14580 # APRS-IS server port
filter: "b/PE1RXF* g/PE1RXF*" # APRS-IS incomming feed filter
# APRS weather station section
weather:
- port: ax0 # Linux AX.25 port to which APRS weather report is sent
call: PE1RXF-13 # Call from which transmissions are made (can be a different call from the call assigned to the AX.25 port)
destination: APZMDM # APRS destination
digi_path: WIDE2-2 # Digipeater path for weather reports (0 = no path)
position: 5302.76N/00707.85E_ # The position string for the weather station
interval: 10 # Time between weather report transmissions (0 = disable)
- port: ax1
call: PE1RXF-13
destination: APZMDM
digi_path: WIDE2-2
position: 5302.76N/00707.85E_
interval: 10
- port: aprsis
call: PE1RXF-13
destination: APZMDM
digi_path: 0
position: 5302.76N/00707.85E_
interval: 10
# APRS beacon section
beacon:
- port: ax0 # The AX.25 port on which to transmit (use aprsis for beaconing to the internet via APRS-IS, set to 0 if you want to use the call assigned to the port in /etc/ax25/axports)
call: PE1RXF-1 # Call from which transmissions are made (can be a different call from the call assigned to the AX.25 port)
destination: APRX29 # APRS destination
digi_path: WIDE2-1 # Specifie the digipeater path (best practise is to use WIDE2-1, WIDE2-2 or set to 0 for no path)
position: "!5302.78NR00707.91E&" # The position string for the beacon (better to put this string between parentheses)
message: APRS RX iGATE 144.800MHz # The beacon text
interval: 30 # Beacon interval in minutes
- port: ax1
call: PE1RXF-3
destination: APRX29
digi_path: WIDE2-1
position: "!5302.78NL00707.91E&"
message: LoRa APRS RX iGATE 433.775MHz
interval: 30
- port: aprsis
call: PE1RXF-1
destination: APRX29
digi_path: 0
position: "!5302.78NR00707.91E&"
message: APRS RX iGATE 144.800MHz
interval: 10
- port: aprsis
call: PE1RXF-3
destination: APRX29
digi_path: 0
position: "!5302.78NL00707.91E&"
message: LoRa APRS RX iGATE 433.775MHz
interval: 10

5
pe1rxf_telemetry.yml Normal file
View File

@@ -0,0 +1,5 @@
# The settings for the PE1RXF telemetry server are configured in this file
#
# NOTE: At the start, the program randomizes the starting time of every individual periodic transmission,
# so even if all intervals are equal, the transmissions are not at the same time, but rather spread over time.
#

115
python-ax25/README.md Normal file
View File

@@ -0,0 +1,115 @@
# python-ax25
Python AX.25 Module for Python3
## Introduction
This is a python module designed for Python3 to access AX.25 features. This module is a C extension that can access the AX.25 interface from the Linux kernel.
This C extension is inspired from pyax25 https://github.com/ha5di/pyax25
## Installing the Module
Clone the Git repository
```
$ git clone https://github.com/josefmtd/python-ax25
```
Install the module by running the install script inside the python-ax25 directory
```
$ cd python-ax25
# ./install.sh
```
## Module Functions
Before using any of the functions, make sure to load all the available ports using `config_load_ports()`
```
pythonax25.config_load_ports()
Returns = number of available ports (int)
```
To get the names of available ports, use the `config_get_first_port` and `config_get_next_port`
```
pythonax25.config_get_first_port()
Returns = name of first port (unicode string)
pythonax25.config_get_next_port(portname)
Returns = name of port after 'portname' (unicode string)
```
To retrieve further information for each available port, use these functions:
1. `config_get_port_name(device)`
2. `config_get_address(portname)`
3. `config_get_device(portname)`
4. `config_get_window(portname)`
5. `config_get_packet_length(portname)`
6. `config_get_baudrate(portname)`
7. `config_get_description(portname)`
To change the callsign from ASCII to network format and vice versa, use the functions `ascii_to_network(callsignascii)` and `network_to_ascii(callsignnetwork)`
```
pythonax25.ascii_to_network(callsignascii)
Returns = callsign in network format (byte literal string)
pythonax25.network_to_ascii(callsignnetwork)
Returns = callsign in ascii format (unicode string)
```
For receiving AX.25 packets, the packet socket is mostly used in C programs. Start a socket by using `packet_socket()` and begin receiving by using `packet_rx(fd, timeout)`
```
pythonax25.packet_socket()
Returns = file descriptor (int)
pythonax25.packet_rx(fd, timeout)
Returns = Protocol and Address (tuple of int and string) and packet (byte-literal string)
```
For sending APRS messages, the datagram socket is used. Socket is started by using `datagram_socket()`, bound to a port by using `datagram_bind(fd, srccall, portcall)` and send packets via `datagram_tx(fd, destcall, message)` or `datagram_tx(fd, destcall, digicall, message)`
```
pythonax25.datagram_socket()
Returns = file descriptor (int)
pythonax25.datagram_bind(fd, srccall, destcall)
Returns = result of bind (int)
pythonax25.datagram_tx(fd, destcall, message)
Returns = result of transmission (int)
pythonax25.datagram_tx_digi(fd, destcall, digicall, message)
Returns = result of transmission (int)
```
Closing socket is done by using `close_socket(fd)`
```
pythonax25.close_socket(fd)
Returns = result of close (int)
```
2020 - Josef Matondang

Binary file not shown.

View File

@@ -0,0 +1,65 @@
#!/usr/bin/python3
import pythonax25
def parsePacket(string):
# Split the address and payload separated by APRS PID
buffer = string.split(b'\x03\xf0')
address = buffer[0]
# Check if the first byte indicates it is a data packet
if address[0] is 0:
# Cut the first byte and feed it to the address parser
listAddress = getAllAddress(address[1:])
# Get the source, destination, and digipeaters from the address list
source = listAddress[1]
destination = listAddress[0]
digipeaters = listAddress[2:]
else:
raise Exception('Not a data packet')
payload = buffer[1]
return (source, destination, digipeaters, payload)
def getAllAddress(packetAddress):
addressSize = 7
# Check if the networked address string is valid
if (len(packetAddress) % 7) is 0:
# Create a list of all address in ASCII form
allAddress = [pythonax25.network_to_ascii(packetAddress[i:i+addressSize])
for i in range(0, len(packetAddress), addressSize)]
return allAddress
else:
raise Exception('Error: Address is not a multiple of 7')
def main():
# Check if there's any active AX25 port
if pythonax25.config_load_ports() > 0:
# Get the device name of the first port
axport = pythonax25.config_get_first_port()
axdevice = pythonax25.config_get_device(axport)
axaddress = pythonax25.config_get_address(axport)
else:
exit(0)
# Initiate a PF_PACKET socket
socket = pythonax25.packet_socket()
while True:
# Blocking receive packet, 10 ms timeout
receive = pythonax25.packet_rx(socket,10)
if receive[0][1] == axdevice:
print(receive)
source, destination, digipeaters, payload = parsePacket(receive[1])
print("Packet Received by = %s"%axaddress)
print("Source Address = %s"%source)
print("Destination Address = %s"%destination)
print("Digipeaters =")
print(digipeaters)
print("Payload = %s"%payload)
print("")
else:
continue
main()

View File

@@ -0,0 +1,49 @@
#!/usr/bin/python3
import pythonax25
import time
def main():
# Check if there's any active AX25 port
if pythonax25.config_load_ports() > 0:
# Get the device name of the first port
axport = pythonax25.config_get_first_port()
axdevice = pythonax25.config_get_device(axport)
axaddress = pythonax25.config_get_address(axport)
else:
exit(0)
# Initiate a datagram socket
socket = pythonax25.datagram_socket()
srcCall = 'YD0ABH-13'
portCall = axaddress
res = pythonax25.datagram_bind(socket, srcCall, portCall)
print(res)
dest = 'APZINA'
digi = 'WIDE2-2'
msg = '!0611.08S/10649.35E$ INARad LoRa APRS#CO2=500'
res = pythonax25.datagram_tx_digi(socket, dest, digi, msg)
print(res)
time.sleep(1)
msg = 'T#001,034,034,034,034,000,11111111'
res = pythonax25.datagram_tx_digi(socket, dest, digi, msg)
print(res)
time.sleep(1)
msg = '_07190749c045s055g055t076r001h45b10101'
res = pythonax25.datagram_tx_digi(socket, dest, digi, msg)
print(res)
pythonax25.close_socket(socket)
return res
if __name__ == '__main__':
main()

17
python-ax25/install.sh Executable file
View File

@@ -0,0 +1,17 @@
#!/bin/bash
DIR=`dirname $0`
# Update the system
/usr/bin/apt update
#/usr/bin/apt -y upgrade
# Install the dependencies
/usr/bin/apt -y install libax25 libax25-dev ax25-apps ax25-tools python3-dev
# Install the Python module
${DIR}/setup.py build
${DIR}/setup.py install
# Remove the build
/bin/rm -rf "${DIR}/build"

View File

@@ -0,0 +1,4 @@
Metadata-Version: 2.1
Name: pythonax25
Version: 1.0
Summary: CPython extension for LINUX ax.25 stack

View File

@@ -0,0 +1,7 @@
README.md
pythonax25module.c
setup.py
pythonax25.egg-info/PKG-INFO
pythonax25.egg-info/SOURCES.txt
pythonax25.egg-info/dependency_links.txt
pythonax25.egg-info/top_level.txt

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@
pythonax25

View File

@@ -0,0 +1,352 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/ioctl.h>
#include <netdb.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <syslog.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <poll.h>
// #include <curses.h>
#include <sys/socket.h>
#include <net/if.h>
#include <net/ethernet.h>
#include <netax25/ax25.h>
#include <netax25/axconfig.h>
#include <netax25/axlib.h>
static PyObject * config_load_ports(PyObject* self, PyObject* args) {
int activePort;
activePort = ax25_config_load_ports();
return PyLong_FromLong(activePort);
}
static PyObject * config_get_first_port(PyObject* self, PyObject* args) {
char *portName;
portName = ax25_config_get_next(NULL);
return Py_BuildValue("s", portName);
}
static PyObject * config_get_next_port(PyObject* self, PyObject* args) {
char *portName;
char *nextPort;
PyArg_ParseTuple(args, "s", &portName);
nextPort = ax25_config_get_next(portName);
return Py_BuildValue("s", nextPort);
}
static PyObject * config_get_port_name(PyObject* self, PyObject* args) {
char *device;
char *portName;
PyArg_ParseTuple(args, "s", &device);
portName = ax25_config_get_name(device);
return Py_BuildValue("s", portName);
}
static PyObject * config_get_address(PyObject* self, PyObject* args) {
char *portName;
char *address;
PyArg_ParseTuple(args, "s", &portName);
address = ax25_config_get_addr(portName);
return Py_BuildValue("s", address);
}
static PyObject * config_get_device(PyObject* self, PyObject* args) {
char *device;
char *portName;
PyArg_ParseTuple(args, "s", &portName);
device = ax25_config_get_dev(portName);
return Py_BuildValue("s", device);
}
static PyObject * config_get_window(PyObject* self, PyObject* args) {
int window;
char *portName;
PyArg_ParseTuple(args, "s", &portName);
window = ax25_config_get_window(portName);
return PyLong_FromLong(window);
}
static PyObject * config_get_packet_length(PyObject* self, PyObject* args) {
int packetLength;
char *portName;
PyArg_ParseTuple(args, "s", &portName);
packetLength = ax25_config_get_paclen(portName);
return PyLong_FromLong(packetLength);
}
static PyObject * config_get_baudrate(PyObject* self, PyObject* args) {
int baudRate;
char *portName;
PyArg_ParseTuple(args, "s", &portName);
baudRate = ax25_config_get_baud(portName);
return Py_BuildValue("i", baudRate);
}
static PyObject * config_get_description(PyObject* self, PyObject* args) {
char *description;
char *portName;
PyArg_ParseTuple(args, "s", &portName);
description = ax25_config_get_desc(portName);
return Py_BuildValue("s", description);
}
static PyObject * aton_entry(PyObject* self, PyObject* args) {
char *callsignNetwork = null_ax25_address.ax25_call;
char *callsignString;
int result;
PyArg_ParseTuple(args, "s", &callsignString);
result = ax25_aton_entry(callsignString, callsignNetwork);
return Py_BuildValue("iy", result, callsignNetwork);
}
static PyObject * ntoa(PyObject* self, PyObject* args) {
static PyObject * callsignPython;
char *callsignNetwork;
char *callsignString;
ax25_address *callsign = &null_ax25_address;
if (!PyArg_ParseTuple(args, "y", &callsignNetwork))
fprintf(stderr, "ERROR: CANNOT ASSIGN\n");
strncpy(callsign->ax25_call, callsignNetwork, 7);
callsignString = ax25_ntoa(callsign);
callsignPython = Py_BuildValue("s", callsignString);
return callsignPython;
}
static PyObject * datagram_socket(PyObject* self, PyObject* args) {
int fileDescriptor;
fileDescriptor = socket(AF_AX25, SOCK_DGRAM, 0);
return PyLong_FromLong(fileDescriptor);
}
static PyObject * datagram_bind(PyObject* self, PyObject* args) {
struct full_sockaddr_ax25 src;
char *portcall, *srccall;
int len, sock, result;
PyArg_ParseTuple(args, "iss", &sock, &srccall, &portcall);
char * addr = malloc(sizeof(char*) * (strlen(srccall) + strlen(portcall) + 2));
sprintf(addr, "%s %s", srccall, portcall);
len = ax25_aton(addr, &src);
free(addr);
// Binding the socket to source
if (bind(sock, (struct sockaddr *)&src, len) == -1) {
result = 1;
}
else {
result = 0;
}
return PyLong_FromLong(result);
}
static PyObject * datagram_tx_digi(PyObject* self, PyObject* args) {
struct full_sockaddr_ax25 dest;
char *destcall = NULL, *digicall = NULL;
char *message;
int dlen, sock, result;
PyArg_ParseTuple(args, "isss", &sock, &destcall, &digicall, &message);
char * addr = malloc(sizeof(char*) * (strlen(destcall) + strlen(digicall) + 2));
sprintf(addr, "%s %s", destcall, digicall);
dlen = ax25_aton(addr, &dest);
free(addr);
// Send a datagram packet to socket
if (sendto(sock, message, strlen(message), 0, (struct sockaddr *)&dest, dlen) == -1) {
result = 1;
}
result = 0;
return PyLong_FromLong(result);
}
static PyObject * datagram_tx(PyObject* self, PyObject* args) {
struct full_sockaddr_ax25 dest;
char *destcall = NULL;
char *message;
int dlen, sock, result;
PyArg_ParseTuple(args, "iss", &sock, &destcall, &message);
dlen = ax25_aton(destcall, &dest);
// Send a datagram packet to socket
if (sendto(sock, message, strlen(message), 0, (struct sockaddr *)&dest, dlen) == -1) {
result = 1;
}
result = 0;
return PyLong_FromLong(result);
}
// Using PF_PACKET Socket
static PyObject * packet_socket(PyObject* self, PyObject* args) {
int fileDescriptor;
fileDescriptor = socket(PF_PACKET, SOCK_PACKET, htons(ETH_P_AX25));
return PyLong_FromLong(fileDescriptor);
}
// Close a socket
static PyObject * close_socket(PyObject* self, PyObject* args) {
int fileDescriptor;
int result;
PyArg_ParseTuple(args, "i", &fileDescriptor);
result = close(fileDescriptor);
return PyLong_FromLong(result);
}
static PyObject * packet_tx(PyObject* self, PyObject* args) {
int fileDescriptor;
int result;
int length;
char *buffer;
char *destination;
struct sockaddr socketAddress;
int addressSize = sizeof(socketAddress);
unsigned char newBuffer[1000];
int bufferLength;
int i;
int k;
unsigned char charBuffer;
PyArg_ParseTuple(args, "isis", &fileDescriptor, &buffer, &length, &destination);
bufferLength = strlen(buffer);
i = 0;
k = 0;
while ( i < bufferLength ) {
charBuffer = (buffer[i++] & 0x0f) << 4;
charBuffer = charBuffer | (buffer[i++] & 0x0f);
newBuffer[k++] = charBuffer;
}
strcpy(socketAddress.sa_data, destination);
socketAddress.sa_family = AF_AX25;
result = sendto(fileDescriptor, newBuffer, k, 0, &socketAddress, addressSize);
return Py_BuildValue("i", result);
}
static PyObject * packet_rx(PyObject* self, PyObject* args) {
int fileDescriptor;
int result;
int addressSize;
int packetSize;
int timeout;
struct sockaddr socketAddress;
struct pollfd pollFileDescriptor;
unsigned char receiveBuffer[1024];
PyArg_ParseTuple(args, "ii", &fileDescriptor, &timeout);
// Poll the socket for an available data
pollFileDescriptor.fd = fileDescriptor;
pollFileDescriptor.events = POLLRDNORM;
result = poll(&pollFileDescriptor, 1, timeout);
// Read all packet received
packetSize = 0;
socketAddress.sa_family = AF_UNSPEC;
strcpy(socketAddress.sa_data, "");
if (result == 1) {
addressSize = sizeof(socketAddress);
packetSize = recvfrom(fileDescriptor, receiveBuffer, sizeof(receiveBuffer),
0, &socketAddress, (socklen_t*)&addressSize);
}
return Py_BuildValue("(is)y#", socketAddress.sa_family, socketAddress.sa_data,
receiveBuffer, packetSize);
}
static PyObject *PythonAx25Error;
//////////////////////////////////////////
// Define methods
//////////////////////////////////////////
static PyMethodDef python_ax25_functions[] = {
{"config_load_ports", config_load_ports, METH_VARARGS, ""},
{"config_get_first_port", config_get_first_port, METH_VARARGS, ""},
{"config_get_next_port", config_get_next_port, METH_VARARGS, ""},
{"config_get_port_name", config_get_port_name, METH_VARARGS, ""},
{"config_get_address", config_get_address, METH_VARARGS, ""},
{"config_get_device", config_get_device, METH_VARARGS, ""},
{"config_get_window", config_get_window, METH_VARARGS, ""},
{"config_get_packet_length", config_get_packet_length, METH_VARARGS, ""},
{"config_get_baudrate", config_get_baudrate, METH_VARARGS, ""},
{"config_get_description", config_get_description, METH_VARARGS, ""},
{"network_to_ascii", ntoa, METH_VARARGS, ""},
{"ascii_to_network", aton_entry, METH_VARARGS, ""},
{"datagram_socket", datagram_socket, METH_VARARGS, ""},
{"datagram_bind", datagram_bind, METH_VARARGS, ""},
{"datagram_tx_digi", datagram_tx_digi, METH_VARARGS, ""},
{"datagram_tx", datagram_tx, METH_VARARGS, ""},
{"packet_socket", packet_socket, METH_VARARGS, ""},
{"packet_rx", packet_rx, METH_VARARGS, ""},
{"packet_tx", packet_tx, METH_VARARGS, ""},
{"close_socket", close_socket, METH_VARARGS, ""},
{NULL, NULL, 0, NULL}
};
// Initialize module
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"pythonax25",
"This is a python module for ax.25",
-1,
python_ax25_functions,
NULL,
NULL,
NULL,
NULL,
};
PyMODINIT_FUNC
PyInit_pythonax25(void) {
PyObject * m;
m = PyModule_Create(&moduledef);
if (m == NULL)
return NULL;
PythonAx25Error = PyErr_NewException("pythonax25.error", NULL, NULL);
Py_INCREF(PythonAx25Error);
PyModule_AddObject(m, "error", PythonAx25Error);
return m;
}

10
python-ax25/setup.py Executable file
View File

@@ -0,0 +1,10 @@
#!/usr/bin/python3
from distutils.core import setup, Extension
module1 = Extension('pythonax25', libraries = ['ax25', 'ax25io'], sources = ['pythonax25module.c'])
setup (name = 'pythonax25',
version = '1.0',
description = 'CPython extension for LINUX ax.25 stack',
ext_modules = [module1])

5
start_weater_station.sh Executable file
View File

@@ -0,0 +1,5 @@
#!/bin/bash
# Start weather_station software
/usr/bin/python3 /home/marcel/ham/weather_station/weather_station_rs485_client.py -c /home/marcel/ham/weather_station/config.yml &

134
weather_station_modbus.py Normal file
View File

@@ -0,0 +1,134 @@
""""
ModBus control routines
Copyright (C) 2023 M.T. Konstapel
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import minimalmodbus
import serial
from retrying import retry
#from epevermodbus.extract_bits import extract_bits
class WeatherStation(minimalmodbus.Instrument):
"""Instrument class for Epever Charge Controllers.
Args:
* portname (str): port name
* slaveaddress (int): slave address in the range 1 to 247
"""
@retry(wait_fixed=10000, stop_max_attempt_number=5)
def __init__(self, portname, slaveaddress):
minimalmodbus.Instrument.__init__(self, portname, slaveaddress)
self.serial.baudrate = 9600
self.serial.bytesize = 8
self.serial.parity = serial.PARITY_NONE
self.serial.stopbits = 1
self.serial.timeout = 1
self.mode = minimalmodbus.MODE_RTU
self.clear_buffers_before_each_transaction = True
self.wx_data = []
@retry(wait_fixed=200, stop_max_attempt_number=5)
def retriable_read_register(
self, registeraddress, number_of_decimals, functioncode, signed=False
):
return self.read_register(
registeraddress, number_of_decimals, functioncode, signed
)
@retry(wait_fixed=200, stop_max_attempt_number=5)
def retriable_read_bit(self, registeraddress, functioncode):
return self.read_bit(registeraddress, functioncode)
@retry(wait_fixed=200, stop_max_attempt_number=5)
def retriable_write_bit(self, registeraddress, data, functioncode):
return self.write_bit(registeraddress, data, functioncode)
#Address range 0x3000
def get_id(self):
"""PV array rated voltage"""
return self.retriable_read_register(0, 0, 4)
def get_wind_direction(self):
"""PV array rated current"""
return self.retriable_read_register(1, 1, 4)
def get_wind_speedl(self):
"""PV array rated power (low 16 bits)"""
return self.retriable_read_register(2, 2, 4)
def get_wind_gust(self):
"""PV array rated power (high 16 bits)"""
return self.retriable_read_register(3, 2, 4)
def get_temperature(self):
"""Rated Battery's voltage"""
return self.retriable_read_register(4, 2, 4, True)
def get_rain(self):
"""Rated charging current to battery"""
return self.retriable_read_register(5, 2, 4)
def get_rain_last24(self):
"""Rated charging power to battery (low 16 bits)"""
return self.retriable_read_register(6, 2, 4)
def get_rain_since_midnight(self):
"""Charging equipment rated output power (high 16 bits)"""
return self.retriable_read_register(7, 0, 4)
def get_humidity(self):
"""Charging mode: 0x0001 = PWM"""
return self.retriable_read_register(8, 2, 4)
def get_pressure(self):
"""Charging mode: 0x0001 = PWM"""
return self.retriable_read_register(9, 1, 4)
def get_temperature_backup(self):
"""Charging mode: 0x0001 = PWM"""
return self.retriable_read_register(13, 2, 4,True)
def get_status_bits(self):
"""Charging mode: 0x0001 = PWM"""
return self.retriable_read_register(14, 0, 4)
def enable_heater(self):
self.retriable_write_bit(0, 1, 5)
def disable_heater(self):
self.retriable_write_bit(0, 0, 5)
def get_weather_data(self):
try:
self.wx_data={}
self.wx_data['ID'] = self.get_id()
self.wx_data['Wind direction'] = self.get_wind_direction()
self.wx_data['Wind speed'] = self.get_wind_speedl()
self.wx_data['Wind gust'] = self.get_wind_gust()
self.wx_data['Rain last hour'] = self.get_rain()
self.wx_data['Rain last 24 hours'] = self.get_rain_last24()
self.wx_data['Temperature'] = self.get_temperature()
self.wx_data['Humidity'] = self.get_humidity()
self.wx_data['Pressure'] = self.get_pressure()
self.wx_data['Temp backup'] = self.get_temperature_backup()
self.wx_data['Status bits'] = self.get_status_bits()
return 1
except:
return 0