Source code for metloom.pointdata.norway

import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List

import geopandas as gpd
import pandas as pd
import requests
from geopandas import GeoDataFrame

from metloom.dataframe_utils import append_df, merge_df, resample_whole_df
from metloom.pointdata.base import PointData
from metloom.variables import SensorDescription, MetNorwayVariables


LOG = logging.getLogger(__name__)


[docs] class MetNorwayPointData(PointData): """ Class for the Norway Frost API https://frost.met.no/index.html To create a user, go here https://frost.met.no/auth/requestCredentials.html Data is provided by MET Norway, see license for details https://www.met.no/en/free-meteorological-data Element (variable) information can be found here https://frost.met.no/elementtable Observations/AvailableTimeSeries/ can be used to find out what types elements are available for a station or time range - we can use this to filter The Sources endpoint returns metadata. It can be used to filter based on geometry and variables For this class we will use default levels and timeoffsets. See more info here https://frost.met.no/concepts2.html#level-offset-filter It is important to note that this class does NOT implement all of the functionality of the frost API. The frost documentation is extensive and worth looking through. Read more about data quality codes here https://frost.met.no/dataclarifications.html Read more about general concepts: https://frost.met.no/concepts2.html """ DATASOURCE = "MET Norway" ALLOWED_VARIABLES = MetNorwayVariables URL = "https://frost.met.no/" POINTS_FROM_GEOM_DEFAULTS = { 'within_geometry': True, 'token_json': "~/.frost_token.json", 'buffer': 0.0 } def __init__( self, station_id, name, token_json="~/.frost_token.json", metadata=None, ): """ Args: station_id: id of station name: name of station token_json: path to file with authentication information metadata: optional metadata for the station """ super(MetNorwayPointData, self).__init__( station_id, name, metadata=metadata ) self._token_path = token_json # track how long the token is valid self._token_expires = None self._auth_header = None # default UTC time self._tzinfo = timezone(timedelta(hours=0)) @classmethod def _get_token(cls, token_json): """ Get token for authorization Args: token_json: path to json with credentials Returns: (auth headers, timestamp of expire) """ # read in credentials token_json = Path(token_json).expanduser().absolute() with open(token_json, "r") as fp: obj = json.load(fp) _client_id = obj["client_id"] _client_secret = obj["client_secret"] url = cls.URL + "auth/accessToken" params = { "client_id": _client_id, "client_secret": _client_secret, "grant_type": "client_credentials" } resp = requests.post(url, data=params) resp.raise_for_status() result = resp.json() # get the token token = result["access_token"] # set the time when it expires _token_expires = datetime.now() + timedelta( seconds=result["expires_in"] ) return { "Authorization": f"Bearer {token}" }, _token_expires def _token_is_valid(self): """ function to check if token is valid """ if self._token_expires is None: return False else: return datetime.now() >= self._token_expires @property def auth_header(self): """ Get the auth header and set the expiration time """ # get a new header if we need to if self._auth_header is None or not self._token_is_valid(): token, expires = self._get_token(self._token_path) # Save when the token expires self._token_expires = expires # set auth header self._auth_header = token return self._auth_header @classmethod def _get_sources( cls, token_json="~/.frost_token.json", ids=None, types="SensorSystem", elements=None, geometry=None, validtime=None, name=None, ): """ Get metadata for the source entitites defined in the Frost API. Use the query parameters to filter the set of sources returned. Leave the query parameters blank to select all sources. Args: token_json: path to json file with credentials ids: The Frost API source ID(s) that you want metadata for. Enter a comma-separated list to select multiple sources. For sources of type SensorSystem or RegionDataset, the source ID must be of the form <prefix><int> where <prefix> is SN for SensorSystem and TR, NR, GR, or GF for RegionDataset. The integer following the prefix may contain wildcards, e.g. SN18*7* matches both SN18700 and SN18007. types: The type of Frost API source that you want metadata for. [SensorSystem, InterpolatedDataset, RegionalDataset] elements: If specified, only sources for which observations are available for all of these elements may be included in the result. Enter a comma-separated list of search filters. geometry: Get Frost API sources defined by a specified geometry. Geometries are specified as either nearest(POINT(...)) or POLYGON(...) using WKT; see the reference section on the Geometry Specification for documentation and examples. If the nearest() function is specified, the output will include the distance in kilometers from the reference point. validtime: If specified, only sources that have been, or still are, valid/applicable during some part of this interval may be included in the result. Specify <date>/<date>, <date>/now, <date>, or now, where <date> is of the form YYYY-MM-DD, e.g. 2017-03-06. The default is 'now', i.e. only currently valid/applicable sources are included. name: If specified, only sources whose 'name' attribute matches this search filter may be included in the result. """ url = cls.URL + "sources/v0.jsonld" if geometry is not None: geo_info = str(geometry.iloc[0].geometry) else: geo_info = None params = dict( ids=ids, types=types, elements=elements, geometry=geo_info, validtime=validtime, name=name ) auth_header, _ = cls._get_token(token_json) resp = requests.get(url, params=params, headers=auth_header) if resp.status_code == 404: if ids: raise RuntimeError(f"Could not find metadata for {ids}") else: # No point were found result = None else: resp.raise_for_status() result = resp.json()["data"] return result def _get_all_metadata(self): """ Get all metadata from the API for one point """ result = self._get_sources( ids=[self.id], token_json=self._token_path ) if len(result) != 1: raise RuntimeError("No metadata returned") return result[0] def _get_metadata(self): """ See docstring for PointData._get_metadata """ data = self._get_all_metadata() location_data = data["geometry"]["coordinates"] return gpd.points_from_xy( [location_data[0]], [location_data[1]], )[0] def _get_observations(self, ids, start_date, end_date, variables_names): """ Args: ids: list of station ids start_date: datetime start date end_date: datetime end date variables_names: list of element names """ url = self.URL + "observations/v0.jsonld" params = { "sources": ids, "referencetime": f"{start_date.isoformat()}/{end_date.isoformat()}", "elements": variables_names, # Defaults https://frost.met.no/concepts2.html#level-offset-filter "timeoffsets": "default", "levels": "default" } resp = requests.get(url, params=params, headers=self.auth_header) # 412 means there was no data found if resp.status_code == 412: # we could use /observations/availableTimeSeries/v0 # to check this first LOG.debug(f"No data found for {ids}, {variables_names}") result = None else: resp.raise_for_status() result = resp.json()["data"] return result @staticmethod def _time_info_to_observation_time( reference_time, time_offset, time_resolution, timeseries_id ): """ Get the observation time from the time info of an observation https://frost.met.no/concepts2.html#relationshipreftime Args: reference_time: string reference time time_offset: string time offset (PT1H) time_resolution: string time resolution (PT12H) Returns: observation_time """ reference_time = pd.to_datetime(reference_time) time_offset = pd.to_timedelta(time_offset) time_resolution = pd.to_timedelta(time_resolution) observation_time = ( reference_time + time_offset + time_resolution * timeseries_id ) return observation_time def _sensor_response_to_df( self, response_data, sensor, final_columns, resample_duration=None ): """ Process the response from the API into a dataframe for 1 sensor Args: response_data: list of entries from the API sensor: single variable object final_columns: expected columns resample_duration: if a resample is desired, a duration that can be parsed by pandas Returns Geodataframe of data """ records = [] for obs in response_data: ref_time = obs["referenceTime"] # filter to the relevant responses relevant_obs = [ o for o in obs["observations"] if o["elementId"] == sensor.code ] if len(relevant_obs) == 0: # skip no data continue if len(relevant_obs) > 1: raise RuntimeError("This case is not implemented") else: # this is our winner o = relevant_obs[0] observation_time = self._time_info_to_observation_time( ref_time, o["timeOffset"], o["timeResolution"], o["timeSeriesId"] ) records.append({ "datetime": observation_time, "site": self.id, sensor.name: o["value"], f"{sensor.name}_units": o["unit"], "quality_code": o["qualityCode"] }) # return None for no data if not records: return None # keep the column names final_columns += [ sensor.name, f"{sensor.name}_units", "quality_code" ] # create df sensor_df = pd.DataFrame.from_records(records) frequency = pd.infer_freq(pd.DatetimeIndex(sensor_df["datetime"])) sensor_df = GeoDataFrame( sensor_df, geometry=[self.metadata] * len(sensor_df) ).set_index("datetime") # resample to the desired duration if frequency != resample_duration and resample_duration is not None: sensor_df = resample_whole_df( sensor_df, sensor, interval=resample_duration ) # Overwrite quality code if we resampled it sensor_df["quality_code"] = ["resampled"] * len(sensor_df) sensor_df = GeoDataFrame(sensor_df, geometry=sensor_df["geometry"]) # double check utc conversion sensor_df = sensor_df.tz_convert(self.desired_tzinfo) # set index so joining works sensor_df = sensor_df.filter(final_columns) sensor_df = sensor_df.loc[pd.notna(sensor_df[sensor.name])] return sensor_df def _get_data( self, start_date: datetime, end_date: datetime, variables: List[SensorDescription], desired_duration=None, ): """ Args: start_date: datetime object for start of data collection period end_date: datetime object for end of data collection period variables: List of metloom.variables.SensorDescription object from self.ALLOWED_VARIABLES Returns: GeoDataFrame of data, indexed on datetime, site """ df = None final_columns = ["geometry", "site"] # Get data from the API response_data = self._get_observations( [self.id], start_date, end_date, [v.code for v in variables] ) if response_data: # Parse data for each variable for sensor in variables: sensor_df = self._sensor_response_to_df( response_data, sensor, final_columns, resample_duration=desired_duration ) df = merge_df(df, sensor_df) if df is not None: if len(df.index) > 0: # Set the datasource df["datasource"] = [self.DATASOURCE] * len(df.index) df.reset_index(inplace=True) df.set_index(keys=["datetime", "site"], inplace=True) df.index.set_names(["datetime", "site"], inplace=True) else: df = None self.validate_sensor_df(df) return df
[docs] def get_daily_data( self, start_date: datetime, end_date: datetime, variables: List[SensorDescription], ): """ See docstring for PointData.get_daily_data """ return self._get_data( start_date, end_date, variables, desired_duration="D" )
[docs] def get_hourly_data( self, start_date: datetime, end_date: datetime, variables: List[SensorDescription], ): """ See docstring for PointData.get_hourly_data """ return self._get_data( start_date, end_date, variables, desired_duration="h" )
[docs] def get_event_data( self, start_date: datetime, end_date: datetime, variables: List[SensorDescription], ): """ Get data in original frequency from API """ return self._get_data( start_date, end_date, variables, desired_duration=None )
[docs] @classmethod def points_from_geometry( cls, geometry: gpd.GeoDataFrame, variables: List[SensorDescription], **kwargs ): """ See docstring for PointData.points_from_geometry Args: geometry: GeoDataFrame for shapefile from gpd.read_file variables: List of SensorDescription within_geometry: filter the points to within the shapefile instead of just the extents. Default True buffer: buffer added to search box token_json: Path to the public token for the mesowest api default = "~/.frost_token.json" Returns: PointDataCollection """ kwargs = cls._add_default_kwargs(kwargs) token_json = kwargs['token_json'] projected_geom = geometry.to_crs("EPSG:4326") # buffer the geometry buffer = kwargs["buffer"] projected_geom = gpd.GeoDataFrame( geometry=projected_geom.dissolve().buffer(buffer) ) # Take the outer bounds if we are not within the geometry if not kwargs["within_geometry"]: projected_geom = gpd.GeoDataFrame(geometry=projected_geom.envelope) # Loop over each variable and create a set of points. # _get_sources returns only points that have ALL variables passed # in, so looping over allows us to not exclude any points points_df = pd.DataFrame() for v in variables: source_info = cls._get_sources( token_json=token_json, geometry=projected_geom, elements=[v.code] ) if source_info is not None: df = pd.DataFrame.from_records(source_info) else: df = None # build our final list points_df = append_df( points_df, df ).drop_duplicates(subset=['id']) if len(points_df) > 0: gdf = gpd.GeoDataFrame( points_df, # We don't get elevation back geometry=gpd.points_from_xy( [p['coordinates'][0] for p in points_df["geometry"].values], [p['coordinates'][1] for p in points_df["geometry"].values], ), ) points = [ cls( row[0], row[1], metadata=row[2] ) for row in zip( gdf["id"], gdf["name"], gdf["geometry"], ) ] else: points = [] # return a points iterator return cls.ITERATOR_CLASS(points)