"""
Readers for point data that is published as flat files, such as csv files.
"""
import os
import pandas as pd
from metloom.pointdata import PointData
from metloom.variables import SensorDescription
from enum import Enum
from pathlib import Path
from datetime import datetime, timezone, timedelta
import logging
import requests
from typing import List
import geopandas as gpd
import numpy as np
LOG = logging.getLogger(__name__)
class StationInfo(Enum):
    """
    Base enum of the stations that a flat-file (e.g. csv) reader can serve.

    There is not enough info available via an API, so csv readers rely on a
    list of stations, and the info associated with each one, to build their
    requests. This allows a common dataset containing multiple stations to be
    verified and each station to be isolated.

    Subclasses add one member per station. Each member value is a sequence of:
    (station name, station id, latitude, longitude, elevation, sub path to file)
    """

    # --- per-station fields, read positionally from the member value ---

    @property
    def station_name(self):
        """Station name (position 0 of the member value)."""
        return self.value[0]

    @property
    def station_id(self):
        """Station id (position 1 of the member value)."""
        return self.value[1]

    @property
    def latitude(self):
        """Latitude (position 2 of the member value)."""
        return self.value[2]

    @property
    def longitude(self):
        """Longitude (position 3 of the member value)."""
        return self.value[3]

    @property
    def elevation(self):
        """Elevation (position 4 of the member value)."""
        return self.value[4]

    @property
    def path(self):
        """Sub path to the station file (position 5), as a ``Path``."""
        return Path(self.value[5])

    @property
    def point(self):
        """
        The station location as a single point geometry with
        x=longitude, y=latitude and z=elevation.
        """
        xs, ys, zs = [self.longitude], [self.latitude], [self.elevation]
        return gpd.points_from_xy(xs, ys, zs)[0]

    # --- lookups across every station in the enum ---

    @classmethod
    def all_station_ids(cls):
        """Return the station id of every member, in iteration order."""
        return [station.station_id for station in cls]

    @classmethod
    def all_points(cls):
        """Return the point geometry of every member, in iteration order."""
        return [station.point for station in cls]

    @classmethod
    def from_station_id(cls, station_id):
        """
        Return the first member whose station id equals ``station_id``,
        or None when no member matches.
        """
        matches = (station for station in cls
                   if station_id == station.station_id)
        return next(matches, None)
class CSVPointData(PointData):
    """
    Base class for met station data that is stored in flat csv files.

    This class manages downloading (and caching) those files while still
    exposing an interface similar to the other PointData readers. Subclasses
    are expected to set ``ALLOWED_STATIONS`` to a populated ``StationInfo``
    enum and to implement ``_file_urls`` and ``_assign_datetime``.
    """
    ALLOWED_STATIONS = StationInfo
    UTC_OFFSET_HOURS = 0  # Allows users to specify the timezone of the datasets

    def __init__(self, station_id, name=None, metadata=None, cache='./cache'):
        """
        See docstring for PointData.__init__

        Args:
            cache: directory where downloaded csv files are stored and reused
        """
        super(CSVPointData, self).__init__(
            station_id, name,
            metadata=metadata
        )
        self._raw_metadata = None
        self._tzinfo = timezone(timedelta(hours=self.UTC_OFFSET_HOURS))
        self._cache = Path(cache)
        self._station_info = None
        self.valid = False

    def _verify_station(self):
        """
        Verify the station id against ``ALLOWED_STATIONS``.

        On success the matching enum member is stored on
        ``self._station_info`` and ``self.name`` is filled in if it was None.

        Returns:
            bool: True if the station id is allowed, False otherwise
        """
        self._station_info = self.ALLOWED_STATIONS.from_station_id(self.id)
        if self._station_info is None:
            # str() so non-string ids cannot break the error message itself
            allowed = ', '.join(
                str(s) for s in self.ALLOWED_STATIONS.all_station_ids()
            )
            LOG.error(f"Station ID {self.id} is not valid, allowed id's are "
                      f"{allowed}")
            return False
        # Auto assign name
        if self.name is None:
            self.name = self._station_info.station_name
        return True

    def _verify_sensor(self, resp_df, variable: SensorDescription):
        """
        Check whether the variable's column (``variable.code``) is present
        in the downloaded data.

        Returns:
            bool: True if the column exists, False otherwise
        """
        if variable.code in resp_df.columns:
            return True
        LOG.debug(f"{variable.name} not found in {self.id} data")
        return False

    def _file_urls(self, *args):
        """
        Return the url(s) of the file(s) containing the station data.

        ``_get_data`` calls this as
        ``_file_urls(station_id, start_date, end_date)``. Subclasses must
        implement it.
        """
        raise NotImplementedError('CSVPointData._file_urls() must be implemented to '
                                  'download csv station data.')

    def _assign_datetime(self, resp_df):
        """
        Return ``resp_df`` with a datetime-like index built from its raw
        columns (``_get_data`` compares the index to timestamps and resamples
        it). Subclasses must implement it.
        """
        raise NotImplementedError('CSVPointData._assign_datetime() must be implemented '
                                  'to download csv station data.')

    def _download(self, urls):
        """
        Download each url into the cache directory, skipping files that are
        already cached.

        Each file is first written to a ``.part`` file and only renamed into
        place once complete. An interrupted or failed download is therefore
        never mistaken for a cached file on a later call.

        Args:
            urls: iterable of urls; the cached file name is the last path
                component of the url

        Returns:
            list of Paths to the local files, in the same order as ``urls``

        Raises:
            requests.HTTPError: if the server responds with an error status
        """
        filenames = []
        for url in urls:
            filename = self._cache.joinpath(Path(url).name)
            filenames.append(filename)
            if filename.exists():
                continue
            LOG.info(f'Downloading {Path(url).name}...')
            partial = filename.with_name(filename.name + '.part')
            with requests.get(url, stream=True) as r:
                # Without this an error page would be written and cached forever
                r.raise_for_status()
                # Explicit utf-8 to match the decode below (and pandas' default
                # read encoding) rather than the platform's locale encoding
                with open(partial, mode='w', encoding='utf-8') as fp:
                    for line in r.iter_lines():
                        fp.write(line.decode('utf-8') + '\n')
            partial.replace(filename)
        return filenames

    def _get_one_variable(self, resp_df, period, variable: SensorDescription):
        """
        Retrieve a single variable and resample it to ``period``.

        Accumulated variables are summed over each period; all others are
        averaged. Periods without any valid sample are left as NaN.

        Args:
            resp_df: raw csv data with a datetime-like index (needed by
                ``resample``)
            period: pandas offset alias for the resampling (e.g. "D", "h")
            variable: sensor to extract, looked up by ``variable.code``

        Returns:
            pandas Series of resampled values, or None if the column is missing
        """
        if not self._verify_sensor(resp_df, variable):
            LOG.debug(f"No data returned for {variable.name}")
            return None
        raw = resp_df[variable.code]
        # TODO: The -9999 no-data flag may only be true for SNOWEX.
        # mask() builds a new series instead of assigning NaN in place, which
        # avoids chained-assignment and int->float setitem problems in pandas.
        raw = raw.mask(raw == -9999)
        resampled = raw.resample(period)
        if variable.accumulated:
            # min_count=1 keeps periods without valid samples as NaN; the
            # default (0) would report missing data as a total of 0
            return resampled.sum(min_count=1)
        return resampled.mean()

    def _get_data(self, start_date, end_date, variables: List[SensorDescription],
                  period):
        """
        Utilizes cached data or downloads the data, then resamples each
        requested variable to ``period``.

        Args:
            start_date: start of the requested window (inclusive)
            end_date: end of the requested window; raw samples at or after it
                are excluded
            variables: sensors to return
            period: pandas offset alias for the output index and resampling

        Returns:
            GeoDataFrame indexed by (datetime, site). It has one column per
            variable that has data, each with a matching ``<name>_units``
            column. Returns None if the station is invalid or no data was found.
        """
        self.valid = self._verify_station()
        if not self.valid:
            return None

        urls = self._file_urls(self._station_info.station_id,
                               start_date, end_date)
        # Make the cache dir, including any missing parent directories
        self._cache.mkdir(parents=True, exist_ok=True)
        # Download data if it doesn't exist locally.
        files = self._download(urls)
        if not files:
            # pd.concat would raise on an empty list
            LOG.debug(f"No files to read for {self.id}")
            return None

        resp_df = pd.concat(
            [pd.read_csv(f, index_col=False, low_memory=False) for f in files]
        )
        resp_df = self._assign_datetime(resp_df)

        # Use a predefined index so gaps in patchy data show up as NaN.
        # NOTE(review): date_range includes end_date while the filter below
        # excludes samples at/after it, so the last row may stay NaN - confirm
        # this is intended.
        df = pd.DataFrame(index=pd.date_range(start_date, end_date, freq=period,
                                              name='datetime'),
                          columns=[v.name for v in variables])
        # Boolean mask instead of .loc label slicing, so patchy or unsorted
        # indexes do not raise
        in_window = (resp_df.index >= pd.Timestamp(start_date)) & (
            resp_df.index < pd.Timestamp(end_date)
        )
        isolated = resp_df.loc[in_window, resp_df.columns]

        for variable in variables:
            df_var = self._get_one_variable(isolated, period, variable)
            if df_var is None or df_var.isnull().all():
                # Missing sensor and all-NaN data are treated the same way, so
                # no variable column is left without its units column
                df = df.drop(columns=[variable.name], errors='ignore')
                continue
            df.loc[df_var.index, variable.name] = df_var
            df.loc[:, f"{variable.name}_units"] = variable.units

        # All nan data suggests no matching data
        if np.all(df.isnull()):
            return None

        df["site"] = [self.id] * len(df)
        df["datasource"] = [self.DATASOURCE] * len(df)
        # Make this a geodataframe
        df = gpd.GeoDataFrame(df, geometry=[self.metadata] * len(df))
        df = df.reset_index().set_index(["datetime", "site"])
        self.validate_sensor_df(df)
        return df

    def get_daily_data(self, start_date: datetime, end_date: datetime,
                       variables: List[SensorDescription]):
        """
        Get data resampled to daily ("D") periods.

        Returns the same structure as ``_get_data``, or None if there is no data.
        """
        return self._get_data(
            start_date, end_date, variables, "D"
        )

    def get_hourly_data(self, start_date: datetime, end_date: datetime,
                        variables: List[SensorDescription]):
        """
        Get data resampled to hourly ("h") periods.

        Returns the same structure as ``_get_data``, or None if there is no data.
        """
        return self._get_data(
            start_date, end_date, variables, "h"
        )

    def _get_metadata(self):
        """
        Return the station location point from the ``ALLOWED_STATIONS`` entry.

        Raises:
            ValueError: if the station id is not in ``ALLOWED_STATIONS``
        """
        # Metadata may be requested before any data call has verified the
        # station, in which case _station_info is still None
        if self._station_info is None and not self._verify_station():
            raise ValueError(f"Station ID {self.id} is not a valid station for "
                             f"{type(self).__name__}")
        return self._station_info.point

    @classmethod
    def points_from_geometry(cls, geometry: gpd.GeoDataFrame,
                             variables: List[SensorDescription], within_geometry=True,
                             buffer=0.0, **kwargs):
        """
        Find the allowed station locations that fall inside a geometry.

        Args:
            geometry: area of interest; multiple polygons are dissolved into one
            variables: not used to filter stations in this implementation
            within_geometry: if True, use the exact (buffered) geometry;
                otherwise use its bounding envelope
            buffer: buffer distance applied before reprojecting, in the units
                of ``geometry``'s crs
            **kwargs: accepted for interface compatibility; ignored

        Returns:
            GeoDataFrame (EPSG:4326) of the station points inside the search area
        """
        # Avoid multiple polys and use a buffer if requested.
        projected_geom = geometry.dissolve().buffer(buffer).to_crs(4326)
        gdf = gpd.GeoDataFrame(geometry=cls.ALLOWED_STATIONS.all_points(), data=[],
                               crs=4326)
        # Use the exact geometry to filter, otherwise use the bounds of the poly
        if within_geometry:
            search_area = projected_geom.iloc[0]
        else:
            search_area = projected_geom.envelope.iloc[0]
        filtered_gdf = gdf[gdf.within(search_area)]
        return filtered_gdf