from datetime import datetime, date
from typing import List
import geopandas as gpd
import numpy as np
import pandas as pd
import requests
import logging
from geopandas import GeoDataFrame
from .base import PointData
from ..variables import (
GeoSphereCurrentVariables, SensorDescription, GeoSphereHistVariables
)
from ..dataframe_utils import merge_df, resample_whole_df, \
shp_to_box
# Conversion factor from meters to feet; station altitudes are multiplied
# by this before being stored as the z coordinate of a point.
M_TO_FT = 3.28084

# Module-level logger for the GeoSphere Austria point data classes.
LOG = logging.getLogger("metloom.pointdata.geosphere_austria")
class GeoSpherePointDataBase(PointData):
    """
    Implement PointData methods for GeoSphere Austria data source

    API documentation here
    https://dataset.api.hub.geosphere.at/v1/docs/index.html
    https://dataset.api.hub.geosphere.at/v1/docs/user-guide/resource.html

    Datasets available here https://data.hub.geosphere.at/dataset/

    We could either use the verified klima-v1 data or the
    raw tawes-v1 data. Klima has hourly and daily, tawes is 10 minute
    and most current

    This base class leaves ALLOWED_VARIABLES and META_EXTENSION unset and
    does not implement _data_request; concrete subclasses provide them.
    """
    ALLOWED_VARIABLES = None
    URL = "https://dataset.api.hub.geosphere.at"
    DATASOURCE = "GEOSPHERE"
    # Path (appended to URL) of the station metadata endpoint
    META_EXTENSION = None

    def __init__(self, station_id, name, metadata=None):
        """
        See docstring for PointData.__init__
        """
        super(GeoSpherePointDataBase, self).__init__(
            station_id, name, metadata=metadata
        )
        # Lazily populated by _get_all_metadata
        self._raw_metadata = None
        self._tzinfo = None

    @classmethod
    def _retrieve_all_metadata(cls):
        """
        Get the metadata we can search through for stations. The assumption
        is that we ONLY WANT TAWES stations

        The endpoint returns a json object with both `parameters` and
        `stations`. Parameters maps to ALL VARIABLES and stations
        maps to ALL STATIONS

        Returns:
            DataFrame built from the `stations` entry of the response

        Raises:
            requests.HTTPError: if the metadata request fails
        """
        url = cls.URL + cls.META_EXTENSION
        resp = requests.get(url)
        resp.raise_for_status()
        obj = resp.json()["stations"]
        df = pd.DataFrame.from_dict(obj)
        return df

    def _get_all_metadata(self):
        """
        Get the raw metadata record for this station. The result is cached
        on the instance after the first successful lookup

        Returns:
            A dictionary holding the first metadata record whose `id`
            matches this station

        Raises:
            RuntimeError: if no record matches this station id
        """
        if self._raw_metadata is None:
            all_meta = self._retrieve_all_metadata()
            meta_df = all_meta[all_meta["id"] == self.id]
            if len(meta_df) == 0:
                raise RuntimeError(f"No matching metadata for {self.id}")
            self._raw_metadata = meta_df.to_dict(orient="records")[0]
        return self._raw_metadata

    def _get_metadata(self):
        """
        See docstring for PointData._get_metadata
        """
        data = self._get_all_metadata()
        # TODO: gridded coords are EPSG:4325, are these also?
        return gpd.points_from_xy(
            [data["lon"]],
            [data["lat"]],
            # Convert elevation to feet
            z=[data["altitude"] * M_TO_FT],
        )[0]

    def _data_request(self, params):
        """
        Make get request and return JSON

        Args:
            params: dictionary of request parameters

        Returns:
            dictionary of response values

        Raises:
            NotImplementedError: always, subclasses must override
        """
        raise NotImplementedError("Need to implement")

    def _handle_df_tz(self, val):
        """
        Convert one datetime entry from a df to self.desired_tzinfo.
        Missing values are returned unchanged
        """
        if pd.isna(val):
            return val
        return val.tz_convert(self.desired_tzinfo)

    def _sensor_response_to_df(self, response_data, sensor, final_columns,
                               resample_duration=None):
        """
        Convert the response data from the API to a GeoDataFrame
        Format and map columns in the dataframe

        Args:
            response_data: JSON response from API, read through its
                `timestamps` and `features` entries
            sensor: SensorDescription obj
            final_columns: List of columns used for filtering
            resample_duration: duration to resample to. Resampling is
                skipped when this is None or otherwise falsy

        Returns:
            GeoDataFrame indexed on datetime, or None when every value
            for the sensor is missing
        """
        dt_values = response_data["timestamps"]
        params = response_data["features"][0]["properties"]["parameters"]
        values = params[sensor.code]["data"]
        unit = params[sensor.code]["unit"]

        # Build the dataframe
        sensor_df = pd.DataFrame.from_dict(
            {
                "datetime": dt_values,
                sensor.name: values,
                f"{sensor.name}_units": [unit] * len(values),
                "site": [self.id] * len(values)
            }
        )
        # Nothing usable for this sensor
        if all(pd.isna(sensor_df[sensor.name].values)):
            return None

        # Blank out every column of rows without a value; these rows are
        # dropped again at the end of this method
        sensor_df.loc[pd.isna(sensor_df[sensor.name])] = np.nan
        sensor_df = gpd.GeoDataFrame(
            sensor_df,
            geometry=[self.metadata] * len(values),
        )
        final_columns = [sensor.name, f"{sensor.name}_units"] + final_columns
        sensor_df["datetime"] = pd.to_datetime(sensor_df["datetime"])

        # resample if necessary
        if resample_duration:
            sensor_df = resample_whole_df(
                sensor_df.set_index("datetime"), sensor,
                interval=resample_duration
            ).reset_index()
            sensor_df = GeoDataFrame(
                sensor_df, geometry=sensor_df["geometry"]
            )

        sensor_df["datetime"] = sensor_df["datetime"].apply(
            self._handle_df_tz
        )
        # set index so joining works
        sensor_df.set_index("datetime", inplace=True)
        sensor_df = sensor_df.filter(final_columns)
        sensor_df = sensor_df.loc[pd.notna(sensor_df[sensor.name])]
        return sensor_df

    def _get_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
        desired_duration: str,
    ):
        """
        Request data for all variables in one call and merge the
        per-sensor results

        Args:
            start_date: datetime object for start of data collection period
            end_date: datetime object for end of data collection period
            variables: List of metloom.variables.SensorDescription object
                from self.ALLOWED_VARIABLES
            desired_duration: duration code ['D', 'H', 'E'] used for
                resampling, or None to skip resampling

        Returns:
            GeoDataFrame of data, indexed on datetime, site. None when
            no data was found
        """
        params = {
            "parameters": ",".join([v.code for v in variables]),
            "station_ids": self.id,
            "start": start_date.isoformat(),
            "end": end_date.isoformat(),
        }
        df = None
        final_columns = ["geometry", "site"]
        response_data = self._data_request(params)
        if response_data:
            for sensor in variables:
                sensor_df = self._sensor_response_to_df(
                    response_data, sensor, final_columns,
                    resample_duration=desired_duration
                )
                df = merge_df(df, sensor_df)

        if df is not None:
            if len(df.index) > 0:
                # Set the datasource
                df["datasource"] = [self.DATASOURCE] * len(df.index)
                df.reset_index(inplace=True)
                df.set_index(keys=["datetime", "site"], inplace=True)
                df.index.set_names(["datetime", "site"], inplace=True)
            else:
                df = None
        self.validate_sensor_df(df)
        return df

    @classmethod
    def points_from_geometry(
        cls,
        geometry: gpd.GeoDataFrame,
        variables: List[SensorDescription],
        **kwargs
    ):
        """
        See docstring for PointData.points_from_geometry

        The Austria Geosphere API does not allow filtering by variable.
        As a result, we do not filter according to which points have specific
        variables. The function arguments allow variables to be passed in
        to keep consistency with the same function from other classes.

        Args:
            geometry: GeoDataFrame for shapefile from gpd.read_file
            variables: List of SensorDescription. NOT USED FOR THIS CLASS
            within_geometry: filter the points to within the shapefile
                instead of just the extents. Default True
            buffer: buffer added to search box,
            filter_to_active: filter to active stations

        Returns:
            PointDataCollection
        """
        # assign defaults
        kwargs = cls._add_default_kwargs(kwargs)

        # Assume station search result is in 4326
        projected_geom = geometry.to_crs("EPSG:4326")
        # add buffer to geometry
        search_geom = projected_geom.buffer(kwargs["buffer"])

        # get metadata for all stations
        all_meta = cls._retrieve_all_metadata()
        # return empty collection if we didn't find any points. An empty
        # frame has no lon/lat columns, so it must be caught here as well
        if all_meta is None or all_meta.empty:
            return cls.ITERATOR_CLASS([])

        # convert to a geodataframe
        gdf = gpd.GeoDataFrame(
            all_meta,
            geometry=gpd.points_from_xy(
                all_meta["lon"], all_meta["lat"],
                all_meta["altitude"] * M_TO_FT
            )
        )
        # TODO: is this correct?
        gdf = gdf.set_crs("EPSG:4326")

        # filter to points within shapefile
        if kwargs['within_geometry']:
            filtered_gdf = gdf[gdf.within(projected_geom.iloc[0]["geometry"])]
        # filter to the overall bounding box
        else:
            box_df = shp_to_box(search_geom)
            filtered_gdf = gdf[gdf.within(box_df.iloc[0]["geometry"])]

        # filter to active stations
        if kwargs["filter_to_active"]:
            # The flag is parsed from JSON, so it may be a real boolean
            # rather than the text "true". Compare on lowercase text so
            # that both representations match
            is_active = (
                filtered_gdf["is_active"].astype(str).str.lower() == "true"
            )
            filtered_gdf = filtered_gdf.loc[is_active]

        points = [
            cls(station_id, station_name, metadata=station_geom)
            for station_id, station_name, station_geom in zip(
                filtered_gdf["id"],
                filtered_gdf["name"],
                filtered_gdf["geometry"],
            )
        ]
        return cls.ITERATOR_CLASS(points)
class GeoSphereCurrentPointData(GeoSpherePointDataBase):
    """
    Implement PointData methods for GeoSphere Austria data source

    API documentation here
    https://dataset.api.hub.geosphere.at/v1/docs/index.html
    https://dataset.api.hub.geosphere.at/v1/docs/user-guide/resource.html

    Datasets available here https://data.hub.geosphere.at/dataset/

    We use tawes-v1 data which consists of data from the last 3 months
    in 10 minute increment
    """
    ALLOWED_VARIABLES = GeoSphereCurrentVariables
    URL = "https://dataset.api.hub.geosphere.at"
    DATASOURCE = "GEOSPHERE"
    META_EXTENSION = "/v1/station/current/tawes-v1-10min/metadata"

    def _data_request(self, params):
        """
        Make get request and return JSON

        Args:
            params: dictionary of request parameters

        Returns:
            dictionary of response values

        Raises:
            requests.HTTPError: if the request fails
        """
        url = self.URL + "/v1/station/historical/tawes-v1-10min"
        resp = requests.get(url, params=params)
        resp.raise_for_status()
        return resp.json()

    def _validate_dates(self, end_date):
        """
        Validate that the dates will work

        Args:
            end_date: datetime object for the end of the request

        Raises:
            ValueError: if end_date is more than 3 months before today
        """
        # Subtract calendar months with DateOffset. Replacing the month
        # number directly fails in January through March (month <= 0) and
        # when today's day does not exist three months earlier
        data_valid_start = (
            pd.Timestamp(date.today()) - pd.DateOffset(months=3)
        )
        end_ts = pd.to_datetime(end_date)
        # The cutoff is timezone naive; drop any timezone so the
        # comparison below cannot raise on mixed naive/aware values
        if end_ts.tzinfo is not None:
            end_ts = end_ts.tz_localize(None)
        if end_ts < data_valid_start:
            raise ValueError(
                "This datasource does not have data older than 3 months. We "
                "cannot fetch data for dates before"
                f" {data_valid_start.isoformat()}"
            )

    def get_daily_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        """
        See docstring for PointData.get_daily_data

        Example query:
        https://dataset.api.hub.geosphere.at/v1/station/historical/
        tawes-v1-10min?parameters=TL&station_ids=11035
        (start and end parameters are added from the requested dates)

        Raises:
            ValueError: if end_date is more than 3 months before today
        """
        self._validate_dates(end_date)
        return self._get_data(start_date, end_date, variables, "D")

    def get_hourly_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        """
        See docstring for PointData.get_hourly_data

        Raises:
            ValueError: if end_date is more than 3 months before today
        """
        self._validate_dates(end_date)
        return self._get_data(start_date, end_date, variables, "H")
class GeoSphereHistPointData(GeoSpherePointDataBase):
    """
    PointData implementation backed by the GeoSphere Austria historical
    daily dataset

    API documentation:
    https://dataset.api.hub.geosphere.at/v1/docs/index.html
    https://dataset.api.hub.geosphere.at/v1/docs/user-guide/resource.html

    Dataset catalogue: https://data.hub.geosphere.at/dataset/

    The klima-v1-1d dataset (historical daily data) is used. Historical
    hourly data also exists, but its parameter names are different and
    as such it has not been implemented
    """
    ALLOWED_VARIABLES = GeoSphereHistVariables
    URL = "https://dataset.api.hub.geosphere.at"
    DATASOURCE = "GEOSPHERE"
    META_EXTENSION = "/v1/station/historical/klima-v1-1d/metadata"

    def _data_request(self, params):
        """
        Send a GET request to the daily historical endpoint

        Args:
            params: dictionary of request parameters

        Returns:
            dictionary decoded from the JSON response

        Raises:
            requests.HTTPError: if the request fails
        """
        endpoint = "/v1/station/historical/klima-v1-1d"
        response = requests.get(self.URL + endpoint, params=params)
        response.raise_for_status()
        payload = response.json()
        return payload

    def get_daily_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        """
        See docstring for PointData.get_daily_data

        No resampling duration is passed on, so the values are used at
        the resolution the API returns them

        Example query:
        https://dataset.api.hub.geosphere.at/v1/station/historical/klima-v1-1d
        ?station_ids=11401&start=2023-04-12&end=2023-04-14&parameters=schnee
        """
        # None as the duration skips the resampling step
        no_resample = None
        return self._get_data(start_date, end_date, variables, no_resample)