from datetime import datetime, timezone, timedelta
from typing import List
import geopandas as gpd
import numpy as np
import pandas as pd
import requests
import logging
from io import StringIO
from bs4 import BeautifulSoup
from geopandas import GeoDataFrame
from .base import PointData
from ..variables import USGSVariables, SensorDescription
from ..dataframe_utils import merge_df, append_df, resample_whole_df
LOG = logging.getLogger(__name__)
class USGSPointData(PointData):
"""
Implement PointData methods for USGS data source.
APIs and help
https://waterservices.usgs.gov/rest/DV-Service.html#Service
https://waterservices.usgs.gov/rest/IV-Service.html#Service
https://waterservices.usgs.gov/rest/Site-Test-Tool.html
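
    Example (a minimal sketch; the station id, name, and DISCHARGE variable
    are illustrative assumptions):

        from datetime import datetime
        pt = USGSPointData("11266500", "Merced River at Happy Isles")
        df = pt.get_daily_data(
            datetime(2021, 1, 1), datetime(2021, 1, 31),
            [pt.ALLOWED_VARIABLES.DISCHARGE],
        )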
"""
ALLOWED_VARIABLES = USGSVariables
USGS_URL = "https://waterservices.usgs.gov/nwis/"
META_URL = USGS_URL + "site/"
DATASOURCE = "USGS"
def __init__(self, station_id, name, metadata=None, duration=None):
"""
See docstring for PointData.__init__
"""
super(USGSPointData, self).__init__(
str(station_id), name, metadata=metadata
)
self._tzinfo = None
self._raw_metadata = None
self.duration = duration
def _get_all_metadata(self):
if self._raw_metadata is None:
params = {
"format": "rdb",
"sites": self.id,
"siteOutput": "expanded",
"siteStatus": "all"
}
resp = self._get_url_response(self.META_URL, params=params, parse='text')
if resp:
df = pd.read_csv(
StringIO(resp), delimiter="\t", skip_blank_lines=True,
comment="#"
)
df.drop(df[df['agency_cd'] != "USGS"].index, inplace=True)
self._raw_metadata = df
return self._raw_metadata
def _get_metadata(self):
"""
Get metadata
"""
raw_meta = self._get_all_metadata()
data = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(
raw_meta["dec_long_va"],
raw_meta["dec_lat_va"],
z=raw_meta["alt_va"],
)
)
data = data.set_crs("EPSG:4269").to_crs("EPSG:4326")
return data.iloc[0]["geometry"]
@staticmethod
def _check_dates(start_date: datetime, end_date: datetime):
"""
        Ensure that datetimes are date only, without hour and minute, and that
        end_date is not earlier than start_date.
Args:
start_date: datetime
end_date: datetime
Returns:
start_date: datetime, date only
end_date: datetime, date only
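
        Example (illustrative):

            _check_dates(datetime(2021, 1, 1, 12, 30), datetime(2021, 1, 2))
            # -> (date(2021, 1, 1), date(2021, 1, 2))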
"""
if hasattr(start_date, 'hour'):
start_date = start_date.date()
if hasattr(end_date, 'hour'):
end_date = end_date.date()
        if not end_date >= start_date:
            LOG.error(
                f" end_date '{end_date}' must not be earlier than start_date"
                f" '{start_date}'"
            )
            raise ValueError("end_date must not be earlier than start_date")
return start_date, end_date
@property
def tzinfo(self):
if self._tzinfo is None:
self._tzinfo = self._get_tzinfo()
return self._tzinfo
def _get_tzinfo(self):
raw_meta = self._get_all_metadata()
tz_abbrev = raw_meta.iloc[0]["tz_cd"]
tz_map = {
"PST": timedelta(hours=-8.0),
"PDT": timedelta(hours=-7.0),
"MST": timedelta(hours=-7.0),
"MDT": timedelta(hours=-6.0),
"CST": timedelta(hours=-6.0),
"CDT": timedelta(hours=-5.0),
"EST": timedelta(hours=-5.0),
"EDT": timedelta(hours=-4.0),
}
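        # Abbreviations missing from the map (e.g. AKST or HST) fall back
        # to UTC below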
default = timedelta(hours=0)
return timezone(tz_map.get(tz_abbrev, default))
def _data_request(self, params, duration="dv"):
"""
Make request to USGS and return JSON
Args:
params: dictionary of request parameters
duration: daily ("dv") or instantaneous ("iv") values
Returns:
            data: dict of response values, or an empty list if no data returned
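
        Example params (an illustrative sketch mirroring what _get_data
        builds; the site and parameter code are placeholders):

            {"startDT": "2021-01-01", "endDT": "2021-01-31",
             "sites": "11266500", "format": "json",
             "siteStatus": "all", "parameterCd": "00060"}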
"""
data = []
contains_data = True
url = self.USGS_URL + duration + "/"
resp = self._get_url_response(url, params=params, parse='json')
if "value" not in resp:
LOG.warning(" Empty response from request")
contains_data = False
if len(resp["value"]["timeSeries"]) < 1:
LOG.warning(
" Requested site, sensor(s), and date range resulted in no data "
"returned."
)
contains_data = False
if contains_data:
data = resp["value"]["timeSeries"][0]
if not data["values"][0]["value"]:
LOG.warning(
" Requested site, sensor(s), and date range resulted in no data "
"returned."
)
data = []
return data
def _sensor_response_to_df(
self, response_data, sensor, final_columns, site_id,
resample_duration=None
):
"""
Convert the response data from the API to a GeoDataFrame Format and map columns
in the dataframe
Args:
response_data: JSON list response from API
sensor: SensorDescription obj
final_columns: List of columns used for filtering
site_id: site id
            resample_duration: optional pandas offset alias to resample to
                (e.g. None, "24H", "H")
Returns:
GeoDataFrame
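
        Expected response_data shape (an illustrative sketch covering only
        the fields read below; the values are placeholders):

            {
                "variable": {"noDataValue": -999999.0,
                             "unit": {"unitCode": "ft3/s"}},
                "values": [{"value": [
                    {"dateTime": "2021-01-01T00:00:00", "value": "42.0"}
                ]}]
            }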
"""
if "values" not in response_data:
LOG.warning(" Response does not contain expected data")
raise ValueError("Failed parsing response for data")
no_data_value = response_data["variable"]["noDataValue"]
units = response_data["variable"]["unit"]["unitCode"]
sensor_df = gpd.GeoDataFrame.from_dict(
response_data["values"][0]["value"],
geometry=[self.metadata] * len(response_data["values"][0]["value"]),
)
if sensor_df.empty:
LOG.warning(" Data request resulted in no data returned")
return []
sensor_df.replace(no_data_value, np.nan, inplace=True)
sensor_df["site"] = site_id
sensor_df[f"{sensor.name}_units"] = units
        # cast values to float32
sensor_df["value"] = sensor_df["value"].astype('float32')
final_columns += [sensor.name, f"{sensor.name}_units"]
column_map = {"dateTime": "datetime", "value": sensor.name}
sensor_df.rename(columns=column_map, inplace=True)
# Daily data doesn't have timezone built in
if pd.to_datetime(sensor_df["datetime"])[0].tzinfo is None:
sensor_df["datetime"] = pd.DatetimeIndex(
pd.to_datetime(sensor_df["datetime"])
)
# We will handle the timezone after resample to avoid
# the case where resampling resets values to 0 hour of the day
# for daily data
handle_tz_after_resample = True
# convert hourly or instantaneous data to utc prior to resample
else:
sensor_df["datetime"] = pd.to_datetime(
sensor_df["datetime"], utc=True
)
handle_tz_after_resample = False
if resample_duration:
sensor_df = resample_whole_df(
sensor_df.set_index("datetime"), sensor,
interval=resample_duration
).reset_index()
sensor_df = GeoDataFrame(sensor_df, geometry=sensor_df["geometry"])
if handle_tz_after_resample:
# handle the timezone if we didn't already
sensor_df["datetime"] = sensor_df["datetime"].apply(
self._handle_df_tz
)
# set index so joining works
sensor_df.set_index("datetime", inplace=True)
sensor_df.index = sensor_df.index.tz_convert(self.desired_tzinfo)
sensor_df = sensor_df.filter(final_columns)
sensor_df = sensor_df.loc[pd.notna(sensor_df[sensor.name])]
return sensor_df
def _get_data_fallback(self, params, duration):
"""
        Request the data for the given duration. Falling back to sample
        durations other than the one requested is not yet implemented.
Args:
params: request params with or without dur_code
duration: daily ("dv") or instantaneous ("iv")
Returns:
response_data: dict of url response
duration: daily ("dv") or instantaneous ("iv")
"""
response_data = self._data_request(params, duration)
return response_data, duration
def _get_data(
self,
start_date: datetime,
end_date: datetime,
variables: List[SensorDescription],
duration_list: List[str],
resample_duration=None
):
"""
Args:
start_date: datetime object for start of data collection period
end_date: datetime object for end of data collection period
variables: List of metloom.variables.SensorDescription object
from self.ALLOWED_VARIABLES
duration_list: list of USGS duration code, "dv" or "iv"
resample_duration: optional if we need to resample the data
Returns:
GeoDataFrame of data, indexed on datetime, site
"""
if not isinstance(duration_list, list):
raise ValueError("duration list must be a list")
start_date, end_date = self._check_dates(start_date, end_date)
params = {
'startDT': start_date.isoformat(),
'endDT': end_date.isoformat(),
'sites': self.id,
'format': 'json',
'siteStatus': 'all'
}
df = None
final_columns = ["geometry", "site"]
for sensor in variables:
params["parameterCd"] = sensor.code
response_data = None
for duration in duration_list:
response_data, response_duration = self._get_data_fallback(
params, duration
)
# Break if we got data
if response_data:
break
# format the data if we have any
if response_data:
sensor_df = self._sensor_response_to_df(
response_data, sensor, final_columns, self.id,
resample_duration=resample_duration
)
df = merge_df(df, sensor_df)
df[sensor.name] = df[sensor.name].astype(float)
if df is not None:
if len(df.index) > 0:
# Set the datasource
df["datasource"] = [self.DATASOURCE] * len(df.index)
df.reset_index(inplace=True)
df.set_index(keys=["datetime", "site"], inplace=True)
df.index.set_names(["datetime", "site"], inplace=True)
else:
df = None
self.validate_sensor_df(df)
return df
    def get_daily_data(
self,
start_date: datetime,
end_date: datetime,
variables: List[SensorDescription],
):
"""
See docstring for PointData.get_daily_data
"""
return self._get_data(
start_date, end_date, variables, ["dv", "iv"],
resample_duration="24H"
)
    def get_hourly_data(
self,
start_date: datetime,
end_date: datetime,
variables: List[SensorDescription],
):
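        """
        See docstring for PointData.get_hourly_data
        """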
return self._get_data(
start_date, end_date, variables, ["iv"], resample_duration="H"
)
    def get_instantaneous_data(
self,
start_date: datetime,
end_date: datetime,
variables: List[SensorDescription],
):
"""
        USGS 'instantaneous' data, generally reported at 15-minute intervals.
"""
return self._get_data(start_date, end_date, variables, ["iv"])
@staticmethod
def _get_url_response(url, params=None, parse='text'):
"""
Get url response from USGS
Args:
url: url with formed query parameters
params: optional dict of additional url call parameters
parse: parsing format, either 'text' or 'json'
Returns:
            result: parsed response as text (str) or JSON (dict); [] on error
"""
result = []
if params:
resp = requests.get(url, params)
else:
resp = requests.get(url)
if resp.status_code != 200:
if resp.status_code == 404:
error_msg = "Response 404"
soup = BeautifulSoup(resp.content, 'html.parser')
status = soup.find('h1')
if status is not None:
error_msg = status.get_text()
elif resp.status_code == 400:
error_msg = "Response 400"
else:
resp.raise_for_status()
error_msg = "Error with request url"
LOG.error(f"No data: {error_msg}")
else:
if parse == 'json':
result = resp.json()
elif parse == 'text':
result = resp.text
else:
raise ValueError(f"Format '{parse}' not supported")
return result
@classmethod
def _station_sensor_search(
cls, meta_url, bounds, sensor: SensorDescription, dur="dv,iv", buffer=0.0
):
"""
Search for USGS stations within a bounding box for the given sensor description.
Args:
meta_url: base url for metadata
bounds: dictionary of lat/long bounds with keys minx, miny, maxx, maxy
sensor: SensorDescription object
            dur: duration code(s), e.g. "dv", "iv", or the default "dv,iv"
buffer: float of lat/lon degrees, buffer for bounding box
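
        Example request (an illustrative sketch of the url built below, with
        placeholder bounds and parameter code):

            {meta_url}?format=rdb&bBox=-120.0%2C37.0%2C-119.0%2C38.0
            &siteStatus=active&hasDataTypeCd=dv,iv&parameterCd=00060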
"""
result = []
bounds = bounds.round(decimals=5)
minx = f"{bounds['minx'] - buffer}"
miny = f"{bounds['miny'] - buffer}"
maxx = f"{bounds['maxx'] + buffer}"
maxy = f"{bounds['maxy'] + buffer}"
url = (
f'{meta_url}?format=rdb&bBox={minx}%2C{miny}%2C{maxx}%2C'
            f'{maxy}&siteStatus=active&hasDataTypeCd={dur}&parameterCd={sensor.code}'
)
response = cls._get_url_response(url, parse='text')
if len(response):
result = pd.read_csv(
StringIO(response), delimiter="\t", skip_blank_lines=True, comment="#"
)
result = result[result['agency_cd'] == "USGS"]
return result
    @classmethod
def points_from_geometry(
cls,
geometry: gpd.GeoDataFrame,
variables: List[SensorDescription],
**kwargs
):
"""
See docstring for PointData.points_from_geometry
Args:
geometry: GeoDataFrame for shapefile from gpd.read_file
variables: List of SensorDescription
Returns:
PointDataCollection
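
        Example (a sketch; the shapefile path and DISCHARGE variable are
        illustrative assumptions):

            area = gpd.read_file("watershed.shp")
            points = USGSPointData.points_from_geometry(
                area, [USGSPointData.ALLOWED_VARIABLES.DISCHARGE]
            )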
"""
# assign defaults
kwargs = cls._add_default_kwargs(kwargs)
# Assume station search result is in 4326
projected_geom = geometry.to_crs(4326)
bounds = projected_geom.bounds.iloc[0]
search_df = None
station_search_kwargs = {}
for variable in variables:
result_df = cls._station_sensor_search(
cls.META_URL, bounds, variable, buffer=kwargs["buffer"],
**station_search_kwargs
)
if len(result_df):
result_df["index_id"] = result_df["site_no"]
result_df.set_index("index_id", inplace=True)
search_df = append_df(
search_df, result_df
).drop_duplicates(subset=['site_no'])
# return empty collection if we didn't find any points
if search_df is None:
return cls.ITERATOR_CLASS([])
gdf = gpd.GeoDataFrame(
search_df,
geometry=gpd.points_from_xy(
search_df["dec_long_va"],
search_df["dec_lat_va"],
z=search_df["alt_va"],
),
)
if not (search_df["dec_coord_datum_cd"] == "NAD83").all():
LOG.error("Projection assumption for USGS is incorrect."
" Not all points are NAD 83")
# convert results from NAD83 to WGS 84
gdf = gdf.set_crs("EPSG:4269").to_crs("EPSG:4326")
# filter to points within shapefile
if kwargs['within_geometry']:
filtered_gdf = gdf[gdf.within(projected_geom.iloc[0]["geometry"])]
else:
filtered_gdf = gdf
points = [
cls(row[0], row[1], metadata=row[2])
for row in zip(
filtered_gdf.index,
filtered_gdf["station_nm"],
filtered_gdf["geometry"],
)
]
        return cls.ITERATOR_CLASS(points)