Source code for metloom.pointdata.cdec

from datetime import datetime, timezone, timedelta
from typing import List

import geopandas as gpd
import numpy as np
import pandas as pd
import requests
import logging

from geopandas import GeoDataFrame

from .base import PointData
from ..variables import CdecStationVariables, SensorDescription
from ..dataframe_utils import append_df, merge_df, resample_whole_df

LOG = logging.getLogger("metloom.pointdata.cdec")


class CDECPointData(PointData):
    """
    Implement PointData methods for CDEC data source
    API documentation here https://cdec.water.ca.gov/dynamicapp/
    """
    ALLOWED_VARIABLES = CdecStationVariables
    CDEC_URL = "http://cdec.water.ca.gov/dynamicapp/req/JSONDataServlet"
    META_URL = "https://cdec.water.ca.gov/dynamicapp/staMeta"
    DATASOURCE = "CDEC"

    def __init__(self, station_id, name, metadata=None):
        """
        See docstring for PointData.__init__
        """
        super(CDECPointData, self).__init__(
            station_id, name, metadata=metadata
        )
        self._raw_metadata = None
        # CDEC has datetimes that aren't found in US/Pacific. Use this instead
        self._tzinfo = timezone(timedelta(hours=-8.0))

    def _parse_sensor_table(self, df):
        expected_cols = [
            "Sensor Description", "Sensor Number", "Duration", "Plot",
            "Data Collection", "Data Available"
        ]
        df_sensors = None
        if len(df.columns) == len(expected_cols):
            df_sensors = pd.DataFrame(df.values, columns=expected_cols)
            df_sensors["Duration"] = df_sensors["Duration"].map(
                lambda x: x.replace("(", "").replace(")", "").strip()
            )
            duration_values = df_sensors["Duration"].values
            # If we didn't find any valid durations, we have the wrong table
            if not any(
                [k in duration_values
                 for k in ["monthly", "daily", "hourly", "event"]]
            ):
                df_sensors = None
        return df_sensors

    def _parse_meta_page(self, df):
        result = {}
        # restructure the dataframes into a usable format
        df_loc = df[0]
        df1 = df_loc.iloc[:, :2].transpose()
        df2 = df_loc.iloc[:, 2:].transpose()
        result["location"] = pd.DataFrame(
            df1.values[1:], columns=df1.iloc[0]
        ).join(
            pd.DataFrame(df2.values[1:], columns=df2.iloc[0])
        )
        # Make sure we read the expected table
        if "Longitude" not in result["location"].columns.values:
            LOG.error(result["location"])
            raise RuntimeError(f"Failed parsing metadata for {self.id}")

        # parse and clean up the sensor info
        df_sensors = self._parse_sensor_table(df[1])
        if df_sensors is None:
            df_sensors = self._parse_sensor_table(df[2])
        if df_sensors is None:
            LOG.error(f"Failed to find sensor info for {self.id}")
            raise RuntimeError(f"Failed to find sensor info for {self.id}")
        result["sensors"] = df_sensors
        return result

    def _get_all_metadata(self):
        """
        Get all the raw metadata for a station

        Returns:
            A dictionary with "location" and "sensors" dataframes
            describing the station
        """
        if self._raw_metadata is None:
            url = self.META_URL + f"?station_id={self.id}"
            df = pd.read_html(url)
            self._raw_metadata = self._parse_meta_page(df)
        return self._raw_metadata

    def _check_snowcourse_sensors(self):
        """
        Returns a list of booleans, one per snow sensor, indicating
        whether that sensor is monthly in cadence
        """
        snow_sensors = [18, 3, 82]
        data = self._get_all_metadata()
        sensors = data["sensors"]
        manual_check = []
        for d, num in zip(sensors["Duration"], sensors["Sensor Number"]):
            if int(num) in snow_sensors:
                manual_check.append(d == "monthly")
        return manual_check
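    # Illustrative sketch of what _check_snowcourse_sensors returns, assuming
    # a hypothetical station whose sensor table lists SWE (sensor 3) monthly,
    # snow depth (sensor 18) daily, and a non-snow sensor 4 hourly; the
    # values are made up, not fetched from CDEC:
    #
    #   sensors = pd.DataFrame({
    #       "Sensor Number": ["3", "18", "4"],
    #       "Duration": ["monthly", "daily", "hourly"],
    #   })
    #   # Sensor 4 is skipped (not in [18, 3, 82]); the result would be
    #   # [True, False] -> partly, but not only, a snow course.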
    def is_only_snow_course(self, variables: List[SensorDescription]):
        """
        Determine if a station only has snow course measurements
        """
        manual_check = self._check_snowcourse_sensors()
        result = False
        if len(manual_check) > 0 and all(manual_check):
            result = True
        if result and not self.is_only_monthly():
            # This would happen if all snow sensors have code "M"
            # but there are other hourly or daily sensors
            if any(
                [sensor not in [
                    self.ALLOWED_VARIABLES.SWE,
                    self.ALLOWED_VARIABLES.SNOWDEPTH
                ] for sensor in variables]
            ):
                # this is an acceptable scenario where we are looking for
                # more than just snow variables and the station has them
                result = False
            else:
                # We are only looking for snow variables and this only has
                # monthly snow variables so it is effectively only
                # a snow course
                result = True
        return result
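    # A minimal sketch of the distinction above, assuming a hypothetical
    # station where SWE is monthly but air temperature is hourly
    # (CdecStationVariables.TEMP is assumed here for illustration):
    #
    #   pt.is_only_snow_course([CdecStationVariables.SWE])  # True
    #   pt.is_only_snow_course(
    #       [CdecStationVariables.SWE, CdecStationVariables.TEMP]
    #   )  # False: a non-snow variable is served at a finer cadence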
    def is_partly_snow_course(self):
        """
        Determine if any of the snow sensors at a station are on
        a monthly interval

        Assumption: Monthly snow sensor measurements are snow courses
        """
        result = self._check_snowcourse_sensors()
        return any(result)
    def is_only_monthly(self):
        """
        Determine if all sensors for a station are on a monthly interval
        """
        data = self._get_all_metadata()
        duration = data["sensors"]["Duration"].values
        manual_check = [d == "monthly" for d in duration]
        if len(manual_check) > 0 and all(manual_check):
            return True
        return False
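    # Taken together, the three predicates above classify a station by its
    # sensor cadences (summary for illustration):
    #
    #   is_partly_snow_course()  -> any snow sensor (3, 18, 82) is monthly
    #   is_only_monthly()        -> every sensor is monthly
    #   is_only_snow_course(v)   -> all snow sensors are monthly AND the
    #                               requested variables v cannot be served
    #                               by a finer-cadence sensor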
    @staticmethod
    def _parse_str_deg(value):
        return float(value.replace("°", ""))

    @staticmethod
    def _parse_elevation(value):
        num, unit = value.split(" ")
        if unit.lower() not in ["ft", "feet"]:
            raise RuntimeError("Unexpected unit for elevation")
        return float(num)

    def _get_metadata(self):
        """
        See docstring for PointData._get_metadata
        """
        data = self._get_all_metadata()
        location_data = data["location"].iloc[0]
        return gpd.points_from_xy(
            [self._parse_str_deg(location_data["Longitude"])],
            [self._parse_str_deg(location_data["Latitude"])],
            z=[self._parse_elevation(location_data["Elevation"])],
        )[0]

    def _data_request(self, params):
        """
        Make GET request to CDEC and return JSON

        Args:
            params: dictionary of request parameters
        Returns:
            dictionary of response values
        """
        resp = requests.get(self.CDEC_URL, params=params)
        resp.raise_for_status()
        return resp.json()

    def _sensor_response_to_df(self, response_data, sensor, final_columns,
                               resample_duration=None):
        """
        Convert the response data from the API to a GeoDataFrame
        Format and map columns in the dataframe

        Args:
            response_data: JSON list response from CDEC API
            sensor: SensorDescription obj
            final_columns: List of columns used for filtering
            resample_duration: duration to resample to
        Returns:
            GeoDataFrame
        """
        sensor_df = gpd.GeoDataFrame.from_dict(
            response_data,
            geometry=[self.metadata] * len(response_data),
        )
        sensor_df.replace(-9999.0, np.nan, inplace=True)
        # this mapping is important. Sometimes obsDate is null
        column_map = {
            "date": "datetime",
            "value": sensor.name,
            "units": f"{sensor.name}_units",
            "stationId": "site",
        }
        if "measurementDate" in final_columns:
            column_map["obsDate"] = "measurementDate"
        sensor_df.rename(columns=column_map, inplace=True)
        final_columns += [sensor.name, f"{sensor.name}_units"]
        sensor_df["datetime"] = pd.to_datetime(sensor_df["datetime"])

        # resample if necessary
        if resample_duration:
            sensor_df = resample_whole_df(
                sensor_df.set_index("datetime"), sensor,
                interval=resample_duration
            ).reset_index()
            sensor_df = GeoDataFrame(sensor_df, geometry=sensor_df["geometry"])

        sensor_df["datetime"] = sensor_df["datetime"].apply(self._handle_df_tz)
        if "measurementDate" in sensor_df.columns:
            sensor_df["measurementDate"] = pd.to_datetime(
                sensor_df["measurementDate"]
            )
            sensor_df["measurementDate"] = sensor_df["measurementDate"].apply(
                self._handle_df_tz
            )
        # set index so joining works
        sensor_df.set_index("datetime", inplace=True)
        sensor_df = sensor_df.filter(final_columns)
        sensor_df = sensor_df.loc[pd.notna(sensor_df[sensor.name])]
        return sensor_df

    def _get_data_fallback(self, params, duration_list):
        """
        Allow for fallback on finer resolution API durations with resample
        if the desired duration does not return data

        Args:
            params: request params with or without dur_code
            duration_list: list of durations to try.
                First index is the desired duration
        """
        if len(duration_list) < 1:
            raise ValueError("Duration list cannot be empty")
        response_data = []
        df_duration = duration_list[0]
        for duration in duration_list:
            params["dur_code"] = duration
            response_data = self._data_request(params)
            if response_data:
                df_duration = duration
                break
        return response_data, df_duration

    def _get_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
        duration_list: List[str],
        include_measurement_date=False
    ):
        """
        Args:
            start_date: datetime object for start of data collection period
            end_date: datetime object for end of data collection period
            variables: List of metloom.variables.SensorDescription object
                from self.ALLOWED_VARIABLES
            duration_list: CDEC duration code and fallbacks ['D', 'H', 'E']
            include_measurement_date: boolean for including the
                'measurementDate' column in the resulting dataframe.
                This column is only relevant for snow courses
        Returns:
            GeoDataFrame of data, indexed on datetime, site
        """
        params = {
            "Stations": self.id,
            "Start": start_date.isoformat(),
            "End": end_date.isoformat(),
        }
        df = None
        final_columns = ["geometry", "site"]
        desired_duration = duration_list[0]
        if include_measurement_date:
            final_columns += ["measurementDate"]
        for sensor in variables:
            params["SensorNums"] = sensor.code
            response_data, response_duration = self._get_data_fallback(
                params, duration_list
            )
            if response_data:
                # don't resample if we have the desired duration
                if response_duration == desired_duration:
                    resample_duration = None
                else:
                    resample_duration = desired_duration
                sensor_df = self._sensor_response_to_df(
                    response_data, sensor, final_columns,
                    resample_duration=resample_duration
                )
                df = merge_df(df, sensor_df)
        if df is not None:
            if len(df.index) > 0:
                # Set the datasource
                df["datasource"] = [self.DATASOURCE] * len(df.index)
                df.reset_index(inplace=True)
                df.set_index(keys=["datetime", "site"], inplace=True)
                df.index.set_names(["datetime", "site"], inplace=True)
            else:
                df = None
        self.validate_sensor_df(df)
        return df
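    # Sketch of the fallback/resample flow above, assuming a hypothetical
    # station that only reports hourly: a daily request with
    # duration_list=["D", "H", "E"] finds no "D" data, falls back to "H",
    # and _sensor_response_to_df then resamples the hourly frame back to
    # the desired "D" duration before merging.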
    def get_event_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        return self._get_data(start_date, end_date, variables, ["E"])
    def get_daily_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        """
        See docstring for PointData.get_daily_data

        Example query:
        https://cdec.water.ca.gov/dynamicapp/req/JSONDataServlet?
        Stations=TNY&SensorNums=3&dur_code=D&Start=2021-05-16&End=2021-05-16
        """
        return self._get_data(start_date, end_date, variables, ["D", "H", "E"])
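    # Hedged usage sketch mirroring the example query above (station "TNY",
    # sensor 3, 2021-05-16); requires network access to CDEC:
    #
    #   from datetime import datetime
    #   pt = CDECPointData("TNY", "Tenaya Lake")
    #   df = pt.get_daily_data(
    #       datetime(2021, 5, 16), datetime(2021, 5, 16),
    #       [CdecStationVariables.SWE],
    #   )
    #
    # "Tenaya Lake" is an assumed display name; the request is keyed on
    # the station ID.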
    def get_hourly_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        """
        See docstring for PointData.get_hourly_data
        """
        return self._get_data(start_date, end_date, variables, ["H", "E"])
    def get_snow_course_data(
        self,
        start_date: datetime,
        end_date: datetime,
        variables: List[SensorDescription],
    ):
        """
        See docstring for PointData.get_snow_course_data
        """
        if not self.is_partly_snow_course():
            raise ValueError(f"{self.id} is not a snow course")
        return self._get_data(
            start_date, end_date, variables, ["M"],
            include_measurement_date=True
        )
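    # Hedged sketch, assuming a station for which is_partly_snow_course()
    # is True; the extra measurementDate column records when the course was
    # actually surveyed, which can differ from the reported monthly datetime:
    #
    #   df = pt.get_snow_course_data(
    #       datetime(2021, 1, 1), datetime(2021, 6, 1),
    #       [CdecStationVariables.SWE],
    #   )
    #   # df includes a "measurementDate" column alongside the SWE values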
    @staticmethod
    def _station_sensor_search(
        bounds, sensor: SensorDescription, dur=None, collect=None, buffer=0.0
    ):
        """
        Search for stations using the CDEC station search utility
        Station search form https://cdec.water.ca.gov/dynamicapp/staSearch

        Args:
            bounds: dictionary of Longitude and Latitude bounds with keys
                minx, maxx, miny, maxy
            sensor: SensorDescription object
            dur: optional CDEC duration code ['M', 'H', 'D']
            collect: optional CDEC collection type string i.e. 'MANUAL+ENTRY'
            buffer: buffer added to the search box
        Returns:
            Pandas Dataframe of table result or None if no table found
        """
        dur_str = f"&dur_chk=on&dur={dur}" if dur else "&dur="
        collect_str = (
            f"&collect_chk=on&collect={collect}"
            if collect else "&collect=NONE+SPECIFIED"
        )
        url = (
            f"https://cdec.water.ca.gov/dynamicapp/staSearch?sta="
            f"&sensor_chk=on&sensor={sensor.code}"
            f"{collect_str}"
            f"{dur_str}"
            f"&active_chk=on&active=Y"
            f"&loc_chk=on"
            f"&lon1={bounds['minx']-buffer}&lon2={bounds['maxx']+buffer}"
            f"&lat1={bounds['miny']-buffer}&lat2={bounds['maxy']+buffer}"
            f"&elev1=-5&elev2=99000&nearby=&basin=NONE+SPECIFIED"
            f"&hydro=NONE+SPECIFIED&county=NONE+SPECIFIED&agency_num=160"
            f"&display=sta"
        )
        try:
            return pd.read_html(url)[0]
        except ValueError:
            LOG.error(f"No tables for {url}")
            return None
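    # Example of the bounds dict this helper expects (hypothetical box over
    # the central Sierra Nevada; pd.read_html performs the actual request):
    #
    #   bounds = {"minx": -120.0, "maxx": -119.0, "miny": 37.5, "maxy": 38.5}
    #   df = CDECPointData._station_sensor_search(
    #       bounds, CdecStationVariables.SWE, dur="M", collect="MANUAL+ENTRY"
    #   )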
    @classmethod
    def points_from_geometry(
        cls,
        geometry: gpd.GeoDataFrame,
        variables: List[SensorDescription],
        **kwargs
    ):
        """
        See docstring for PointData.points_from_geometry

        Args:
            geometry: GeoDataFrame for shapefile from gpd.read_file
            variables: List of SensorDescription
            snow_courses: Boolean for including only snow course data
                or no snow course data
            within_geometry: filter the points to within the shapefile
                instead of just the extents. Default True
            buffer: buffer added to search box
        Returns:
            PointDataCollection
        """
        # assign defaults
        kwargs = cls._add_default_kwargs(kwargs)

        # Assume station search result is in 4326
        projected_geom = geometry.to_crs(4326)
        bounds = projected_geom.bounds.iloc[0]
        search_df = None
        station_search_kwargs = {}

        # Filter to manual, monthly measurements if looking for snow courses
        if kwargs['snow_courses']:
            station_search_kwargs["dur"] = "M"
            station_search_kwargs["collect"] = "MANUAL+ENTRY"

        for variable in variables:
            result_df = cls._station_sensor_search(
                bounds, variable, buffer=kwargs["buffer"],
                **station_search_kwargs
            )
            if result_df is not None:
                result_df["index_id"] = result_df["ID"]
                result_df.set_index("index_id", inplace=True)
                search_df = append_df(
                    search_df, result_df
                ).drop_duplicates(subset=['ID'])

        # return empty collection if we didn't find any points
        if search_df is None:
            return cls.ITERATOR_CLASS([])

        clms = search_df.columns.values
        if "ElevationFeet" in clms:
            elev_key = "ElevationFeet"
        elif "Elevation Feet" in clms:
            elev_key = "Elevation Feet"
        else:
            raise RuntimeError("No key for elevation")

        gdf = gpd.GeoDataFrame(
            search_df,
            geometry=gpd.points_from_xy(
                search_df["Longitude"],
                search_df["Latitude"],
                z=search_df[elev_key],
            ),
        )
        # filter to points within shapefile
        if kwargs['within_geometry']:
            filtered_gdf = gdf[gdf.within(projected_geom.iloc[0]["geometry"])]
        else:
            filtered_gdf = gdf

        points = [
            cls(row[0], row[1], metadata=row[2])
            for row in zip(
                filtered_gdf.index,
                filtered_gdf["Station Name"],
                filtered_gdf["geometry"],
            )
        ]
        # filter to snow courses or non snow courses depending on desired result
        if kwargs['snow_courses']:
            return cls.ITERATOR_CLASS(
                [p for p in points if p.is_partly_snow_course()]
            )
        else:
            return cls.ITERATOR_CLASS(
                [p for p in points if not p.is_only_snow_course(variables)]
            )
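
# End-to-end usage sketch for points_from_geometry, assuming a local
# shapefile "yosemite.shp" (hypothetical path) and network access to CDEC:
#
#   import geopandas as gpd
#   from metloom.pointdata import CDECPointData
#   from metloom.variables import CdecStationVariables
#
#   shp = gpd.read_file("yosemite.shp")
#   points = CDECPointData.points_from_geometry(
#       shp, [CdecStationVariables.SWE], snow_courses=True
#   )
#   for point in points:
#       print(point.id, point.name)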