-
-
Save kura/21a8512ef73d17dab4cdfbe8a31a05a4 to your computer and use it in GitHub Desktop.
import datetime | |
import json | |
import os | |
import sys | |
import requests | |
# --- API settings (left blank here) ---
apiKey = ""
stationID = ""
weatherUnits = ""  # sent as the "units" query parameter of the API call

# --- Files used by the script ---
rainfallLog = "rainfallLog.csv"  # CSV that the main routine appends a row to
lastTotalLog = "lastTotal"  # holds the precipitation total from the last run

# Local wall-clock time, captured once when the module is loaded.
now = datetime.datetime.now().time()
def calc(precipTotal):
    """Return ``precipTotal`` minus the stored previous total, rounded to 2 places.

    While ``now`` is in hour 0 the previous total is taken as 0.0; otherwise
    it is read from the first line of the ``lastTotalLog`` file.
    """
    baseline = 0.0
    if now.hour != 0:
        with open(lastTotalLog, "r") as handle:
            baseline = float(handle.readline())
    return round(precipTotal - baseline, 2)
def create_empty_files():
    """Create the two data files with their initial content if they are missing.

    ``lastTotalLog`` is seeded with "0" and ``rainfallLog`` with the CSV
    header row. Files that already exist are left untouched.
    """
    seeds = (
        (lastTotalLog, "0"),
        (rainfallLog, "Time,Hourly Rainfall (mm)\n"),
    )
    for path, initial_text in seeds:
        if os.path.isfile(path):
            continue
        with open(path, "w") as out:
            out.write(initial_text)
def get_weather_data_from_api():
    """Fetch the current observation for ``stationID`` from api.weather.com.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.RequestException: on connection problems, a timeout, or an
            HTTP error status (raised by ``raise_for_status``).
    """
    r = requests.get(
        "https://api.weather.com/v2/pws/observations/current",
        # Passing the query as ``params`` lets requests URL-encode each value,
        # and the station ID is sent exactly as configured.
        params={
            "stationId": stationID,
            "format": "json",
            "units": weatherUnits,
            "apiKey": apiKey,
        },
        timeout=30,  # do not hang forever on a stalled connection
    )
    r.raise_for_status()
    return json.loads(r.text)
if __name__ == "__main__":
    # Make sure both data files exist before anything reads them.
    create_empty_files()

    try:
        weather = get_weather_data_from_api()
    except requests.RequestException as e:
        print(e)
        sys.exit(1)

    # Only the first observation in the response is used.
    observation = weather["observations"][0]
    dataTime = observation["obsTimeLocal"]
    precipTotal = observation["uk_hybrid"]["precipTotal"]
    if weatherUnits == "h":
        precipTotal *= 25.4  # API bug shows inch so convert to mm

    hourlyRain = calc(precipTotal)

    with open(rainfallLog, "a") as f:
        f.write(f"{dataTime},{hourlyRain}\n")

    # Store the running total for the next run; write() only accepts str,
    # so the float has to be converted explicitly.
    with open(lastTotalLog, "w") as f:
        f.write(str(precipTotal))
import datetime | |
import json | |
import os | |
from http.client import HTTPSConnection | |
# Values interpolated into the api.weather.com request path.
API_KEY = ""
STATION_ID = ""
WEATHER_UNITS = "h"

# CSV file holding one "timestamp,value" row per run.
CSV = "rainfall.csv"
def calc(current_value, last_value):
    """Return how much ``current_value`` has increased over ``last_value``.

    Args:
        current_value: The newest reading.
        last_value: The previous reading to subtract.

    Returns:
        ``current_value`` unchanged when either argument is 0, otherwise
        ``current_value - last_value`` rounded to 2 decimal places.
    """
    if current_value == 0 or last_value == 0:
        return current_value
    # Subtract the older reading from the newer one so that a rising total
    # gives a positive amount.
    return round(current_value - last_value, 2)
def create_empty_files():
    """Create the CSV file with a header and one seed row if it does not exist.

    The seed row is stamped one hour before the current UTC time and carries
    the value 0.0. An existing file is left untouched.
    """
    if not os.path.exists(CSV):
        seed_time = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
        with open(CSV, "w") as out:
            out.writelines(
                [
                    "Time,Hourly Rainfall (mm)\n",
                    f"{seed_time.isoformat()},0.0\n",
                ]
            )
def write_data(data):
    """Append a ``<UTC ISO timestamp>,<data>`` row to the CSV file."""
    timestamp = datetime.datetime.utcnow().isoformat()
    with open(CSV, "a") as log:
        row = f"{timestamp},{data}\n"
        log.write(row)
def get_last_entry():
    """Return the value column of the last row in the CSV file as a float.

    Everything after the first comma on the final line is parsed as the value.
    """
    with open(CSV) as log:
        rows = log.readlines()
    final_row = rows[-1].strip()
    _, _, value = final_row.partition(",")
    return float(value)
# API Call | |
def get_weather_data_from_api():
    """Fetch the current observation for ``STATION_ID`` from api.weather.com.

    Returns:
        The first element of the ``"observations"`` list in the JSON response.

    Raises:
        OSError: on connection problems or a socket timeout.
        json.JSONDecodeError: if the response body is not valid JSON.
    """
    # The timeout keeps a stalled connection from hanging the script forever.
    conn = HTTPSConnection("api.weather.com", timeout=30)
    try:
        conn.request(
            "GET",
            f"/v2/pws/observations/current?stationId={STATION_ID}&format=json&units={WEATHER_UNITS}&apiKey={API_KEY}",
        )
        response = conn.getresponse()
        return json.loads(response.read())["observations"][0]
    finally:
        # Always release the socket, including when the request or parsing fails.
        conn.close()
if __name__ == "__main__":
    # Ensure the CSV exists, then read the value logged by the previous run.
    create_empty_files()
    previous_value = get_last_entry()

    try:
        observation = get_weather_data_from_api()
        total = float(observation["uk_hybrid"]["precipTotal"])
        write_data(calc(total, previous_value))
    except (OSError, json.JSONDecodeError) as err:
        # Network and parse failures are reported and nothing is written.
        print(err)
JSON is pretty easy. The Python json
module will basically turn anything into a JSON object, but it is probably annoying and overkill for this. I'd guess you'd need something like:
{
"last_value": 0,
"data_points": [
{"datetime": "2023-10-31 00:00:00", "rainfall": 0.0},
{"datetime": "2023-10-31 01:00:00", "rainfall": 5.0},
# etc
]
}
And then whenever you want to add to it, you'd need to load the whole thing in Python, add your record, and write the whole thing again:
# Load previous data
with open("data.json") as f:
    data = json.load(f)
# Add new entry to previous data (the list key is "data_points", matching
# the example document)
data["last_value"] = 11.0
data["data_points"].append({"datetime": "2023-11-01 12:00:00", "rainfall": 11.0})
# Overwrite old data + new entry in file
with open("data.json", "w") as f:
    json.dump(data, f)
The zeroing isn't working. It's not using '0' as lastTotal at midnight.
Latest reading with the 0ing removed shows the same negative value, so if hour == 0 isn't being used...
2023-11-01 22:55:21,0.25
Hourly Rain: 0.25
Precip Total: 2.2859999999999996
Last Total: 2.032
2023-11-01 23:55:21,0.76
Hourly Rain: 0.76
Precip Total: 3.0479999999999996
Last Total: 2.2859999999999996
2023-11-02 00:55:21,-1.78
Hourly Rain: -1.78
Precip Total: 1.27
Last Total: 3.0479999999999996
2023-11-02 01:55:21,2.03
Hourly Rain: 2.03
Precip Total: 3.302
Last Total: 1.27
No, Precip Total is the data straight from the API.
2023-11-02 00:55:21,-1.78
Hourly Rain: -1.78
Precip Total: 1.27
Last Total: 3.0479999999999996
It should pull 1.27 as that's the total rainfall for the first hour of the day. Maybe skip the math if it's hour 0 and instead log precip total?
This was the temp code I used to get those readings:
# import datetime
import json
import os
import sys
import requests
# Settings (credentials left blank here)
apiKey = ""
stationID = ""
weatherUnits = "h"  # sent as the "units" query parameter of the API call
# File locations
rainfallLog = "rainfallLog.csv"  # CSV that receives the "time,hourly rainfall" rows
lastTotalLog = "lastTotal"  # precipitation total saved for the next run
# Get current time as variable 'now'
#now = datetime.datetime.now().time()
# Calculate the hourly rainfall by subtracting lastTotal from precipTotal
def calc(precipTotal):
    """Return ``precipTotal`` minus the stored previous total, rounded to 2 places.

    The previous total is always read from the first line of ``lastTotalLog``.
    """
    # Midnight zeroing is switched off in this test version:
    # if now.hour == 0:
    #     lastTotal = 0.0
    # else:
    with open(lastTotalLog, "r") as f:
        lastTotal = float(f.readline())
    return round(precipTotal - lastTotal, 2)
# Create files
def create_empty_files():
    """Create the two data files if they are missing.

    ``lastTotalLog`` is seeded with "0"; ``rainfallLog`` is created empty.
    Files that already exist are left untouched.
    """
    for path, seed in ((lastTotalLog, "0"), (rainfallLog, "")):
        if not os.path.isfile(path):
            with open(path, "w") as out:
                out.write(seed)
# API Call
def get_weather_data_from_api():
    """Fetch the current observation for ``stationID`` from api.weather.com.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.RequestException: on connection problems, a timeout, or an
            HTTP error status (raised by ``raise_for_status``).
    """
    r = requests.get(
        "https://api.weather.com/v2/pws/observations/current",
        # Passing the query as ``params`` lets requests URL-encode each value.
        params={
            "stationId": stationID,
            "format": "json",
            "units": weatherUnits,
            "apiKey": apiKey,
        },
        timeout=30,  # do not hang forever on a stalled connection
    )
    r.raise_for_status()
    return json.loads(r.text)
if __name__ == "__main__":  # The main routine
    create_empty_files()  # Make sure both data files exist
    try:
        weather = get_weather_data_from_api()
    except requests.RequestException as e:
        print(e)
        sys.exit(1)
    # Collect only the needed data (first observation in the response)
    observation = weather["observations"][0]
    dataTime = observation["obsTimeLocal"]
    precipTotal = observation["uk_hybrid"]["precipTotal"]
    if weatherUnits == "h":
        precipTotal *= 25.4  # API bug shows inch so convert to mm
    # Rainfall since the previous run
    hourlyRain = calc(precipTotal)
    # # Testing
    # with open(lastTotalLog, "r") as f:
    #     lastLog = float(f.readline())
    # Write the header only when the file is still empty, but always log the
    # reading so the first run's row is kept.
    needs_header = os.path.getsize(rainfallLog) == 0
    with open(rainfallLog, "a") as f:
        if needs_header:
            f.write("Time,Hourly Rainfall (mm)\n")
        f.write(f"{dataTime},{hourlyRain}\n")
        # f.write(f"Hourly Rain: {hourlyRain}\n")
        # f.write(f"Precip Total: {precipTotal}\n")
        # f.write(f"Last Total: {lastLog}\n")
    # Store precipTotal to call next time for math
    with open(lastTotalLog, "w") as f:
        f.write(str(precipTotal))
You put your API keys in that comment. Might wanna delete that. :P
I did realise my api key is public on the site I use it on... but that's for another day lol
def calc(precipTotal):
    """Return the rainfall to log for this reading, rounded to 2 places.

    While ``now`` is in hour 0, ``precipTotal`` itself is returned. Otherwise
    the previous total, read from the first line of ``lastTotalLog``, is
    subtracted first.
    """
    # bail early if midnight
    if now.hour == 0:
        return round(precipTotal, 2)
    with open(lastTotalLog, "r") as f:
        lastTotal = float(f.readline())
    return round(precipTotal - lastTotal, 2)
With an else in that gap correct?
OK! I think we have something. A quick test worked... time for midnight again.
Btw, thank you so much for all this help. It would have driven me crazy doing it all alone.
No need for an else, that func call bails early so the with is never hit.
No worries. :)
You are well versed in python. It's these weird little rules that are fun to learn.
Most programming languages allow you to return early from a function if a condition isn't met tbh. Sometimes it just looks odd, or people don't think about doing it that way and wrap it in an else for no real gain.
Fantastic thanks! Added it. Not sure if we even need
if not os.path.isfile(rainfallLog):
open(rainfallLog, "w").close()
as it's added later anyway.
I'm happy with my basic csv for this task. I looked into creating json files and decided the scope of the project wasn't worth it.