Source code for fluxdataqaqc.qaqc

# -*- coding: utf-8 -*-
""" 
Tools for correcting energy-balance components to improve energy balance
closure and other data management, validation and scientific analysis tools. 
"""

from pathlib import Path

import xarray
import numpy as np
import pandas as pd
from sklearn import linear_model
from sklearn.metrics import mean_squared_error
from refet.calcs import _ra_daily, _rso_simple
import refet 

from .data import Data
from .plot import Plot
from .util import monthly_resample, Convert

class QaQc(Plot, Convert):
    """
    Numerical routines for correcting daily energy balance closure for eddy
    covariance data and other data analysis tools.

    Two routines are provided for improving energy balance closure by
    adjusting the turbulent fluxes, latent energy and sensible heat: the
    Energy Balance Ratio method (modified from `FLUXNET
    <https://fluxnet.fluxdata.org/data/fluxnet2015-dataset/data-processing/>`__)
    and the Bowen Ratio method. The :obj:`QaQc` object also has multiple
    tools for temporal frequency aggregation and resampling, estimation of
    climatic and statistical variables (e.g. ET and potential shortwave
    radiation), downloading gridMET reference ET, managing data and metadata,
    interactive validation plots, and managing a structure for input and
    output data files. Input data is expected to be a :obj:`.Data` instance
    or a :obj:`pandas.DataFrame`.

    Keyword Arguments:
        data (:obj:`.Data`): :obj:`.Data` instance used to create the
            :obj:`QaQc` instance.
        drop_gaps (bool): default :obj:`True`. If :obj:`True`, automatically
            filter variables on days where the fraction of sub-daily data is
            below ``daily_frac``.
        daily_frac (float): default 1.00. Fraction of sub-daily data required
            per day, otherwise the daily value will be filtered out if
            ``drop_gaps`` is :obj:`True`. E.g. if ``daily_frac = 0.5`` and
            the input data is hourly, then data on days with less than 12
            hours of data will be forced to null within :attr:`QaQc.df`.
            This is important because systematic diurnal gaps will affect the
            automatic resampling that occurs when creating a :obj:`QaQc`
            instance, and the daily data is used in closure corrections,
            other calculations, and plots. If sub-daily linear interpolation
            is applied to energy balance variables the gaps are counted
            *after* the interpolation.
        max_interp_hours (None or float): default 2. Length of the largest
            gap to fill with linear interpolation in energy balance variables
            if the input data's temporal frequency is less than daily. This
            value will be used to fill gaps when :math:`Rn > 0` or :math:`Rn`
            is missing during each day.
        max_interp_hours_night (None or float): default 4. Length of the
            largest gap to fill with linear interpolation in energy balance
            variables if the input data's temporal frequency is less than
            daily when :math:`Rn < 0` within 12:00PM-12:00PM daily intervals.

    Attributes:
        agg_dict (dict): Dictionary with internal variable names as keys and
            method of temporal resampling (e.g. "mean" or "sum") as values.
        config (:obj:`configparser.ConfigParser`): Config parser instance
            created from the data within the config.ini file.
        config_file (:obj:`pathlib.Path`): Absolute path to the config.ini
            file used for initialization of the :obj:`fluxdataqaqc.Data`
            instance used to create the :obj:`QaQc` instance.
        corrected (bool): False until an energy balance closure correction
            has been run by calling :meth:`QaQc.correct_data`.
        corr_methods (tuple): List of energy balance closure correction
            routines usable by :meth:`QaQc.correct_data`.
        corr_meth (str or None): Name of the most recently applied energy
            balance closure correction.
        elevation (float): Site elevation in meters.
        gridMET_exists (bool): True if the path to a matching gridMET time
            series file exists on disk, has time series for reference ET and
            precipitation, and the dates for these fully overlap with the
            energy balance variables, i.e. the date index of :attr:`QaQc.df`.
        gridMET_meta (dict): Dictionary with information for gridMET
            variables that may be downloaded using
            :meth:`QaQc.download_gridMET`.
        inv_map (dict): Dictionary with input climate file names as keys and
            internal names as values. May only include pairs when they
            differ.
        latitude (float): Site latitude in decimal degrees.
        longitude (float): Site longitude in decimal degrees.
        out_dir (pathlib.Path): Default directory to save output of
            :meth:`QaQc.write` or :meth:`QaQc.plot` methods.
        n_samples_per_day (int): If the initial time series temporal
            frequency is less than daily this value will be updated to the
            number of samples detected per day, useful for post-processing
            based on the count of sub-daily gaps in energy balance variables,
            e.g. "LE_subday_gaps".
        plot_file (pathlib.Path or None): Path to plot file once it is
            created/saved by :meth:`QaQc.plot`.
        site_id (str): Site ID.
        temporal_freq (str): Temporal frequency of the initial data (as found
            in the input climate file) as determined by
            :func:`pandas.infer_freq`.
        units (dict): Dictionary with internal variable names as keys and
            units as found in the config as values.
        variables (dict): Dictionary with internal variable names as keys and
            names as found in the input data as values.

    Note:
        Upon initialization of a :obj:`QaQc` instance the temporal frequency
        of the input data is checked using :func:`pandas.infer_freq`, which
        does not always correctly parse datetime indices. If it is not able
        to correctly determine the temporal frequency the time series will be
        resampled to daily frequency, but if it is in fact already at daily
        frequency the data will be unchanged. In this case
        :attr:`QaQc.temporal_freq` will be set to "na".
    """

    # dictionary used for temporally aggregating variables
    agg_dict = {
        'ASCE_ETo': 'sum',
        'ASCE_ETr': 'sum',
        'energy': 'mean',
        'flux': 'mean',
        'flux_corr': 'mean',
        'br': 'mean',
        'ET': 'sum',
        'ET_corr': 'sum',
        'ET_gap': 'sum',
        'ET_fill': 'sum',
        'ET_fill_val': 'sum',
        'ET_user_corr': 'sum',
        'ebr': 'mean',
        'ebr_corr': 'mean',
        'ebr_user_corr': 'mean',
        'ebr_5day_clim': 'mean',
        'gridMET_ETr': 'sum',
        'gridMET_ETo': 'sum',
        'gridMET_prcp': 'sum',
        'lw_in': 'mean',
        't_avg': 'mean',
        't_max': 'mean',
        't_min': 'mean',
        't_dew': 'mean',
        'rso': 'mean',
        'sw_pot': 'mean',
        'sw_in': 'mean',
        'vp': 'mean',
        'vpd': 'mean',
        'ppt': 'sum',
        'ppt_corr': 'sum',
        'ws': 'mean',
        'Rn': 'mean',
        'Rn_subday_gaps': 'sum',
        'rh': 'mean',
        'sw_out': 'mean',
        'lw_out': 'mean',
        'G': 'mean',
        'G_subday_gaps': 'sum',
        'LE': 'mean',
        'LE_corr': 'mean',
        'LE_subday_gaps': 'sum',
        'LE_user_corr': 'mean',
        'H': 'mean',
        'H_corr': 'mean',
        'H_subday_gaps': 'sum',
        'H_user_corr': 'mean',
    }

    # EBR correction methods available
    corr_methods = (
        'ebr',
        'br',
        'lin_regress'
    )

    # gridMET dict, keys are names which can be passed to download_gridMET
    gridMET_meta = {
        'ETr': {
            'nc_suffix': 'agg_met_etr_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_mean_reference_evapotranspiration_alfalfa',
            'rename': 'gridMET_ETr',
            'units': 'mm'
        },
        'pr': {
            'nc_suffix': 'agg_met_pr_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'precipitation_amount',
            'rename': 'gridMET_prcp',
            'units': 'mm'
        },
        'pet': {
            'nc_suffix': 'agg_met_pet_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_mean_reference_evapotranspiration_grass',
            'rename': 'gridMET_ETo',
            'units': 'mm'
        },
        'sph': {
            'nc_suffix': 'agg_met_sph_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_mean_specific_humidity',
            'rename': 'gridMET_q',
            'units': 'kg/kg'
        },
        'srad': {
            'nc_suffix': 'agg_met_srad_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_mean_shortwave_radiation_at_surface',
            'rename': 'gridMET_srad',
            'units': 'w/m2'
        },
        'vs': {
            'nc_suffix': 'agg_met_vs_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_mean_wind_speed',
            'rename': 'gridMET_u10',
            'units': 'm/s'
        },
        'tmmx': {
            'nc_suffix': 'agg_met_tmmx_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_maximum_temperature',
            'rename': 'gridMET_tmax',
            'units': 'K'
        },
        'tmmn': {
            'nc_suffix': 'agg_met_tmmn_1979_CurrentYear_CONUS.nc#fillmismatch',
            'name': 'daily_minimum_temperature',
            'rename': 'gridMET_tmin',
            'units': 'K'
        },
    }

    # all potentially calculated variables for energy balance corrections
    _eb_calc_vars = (
        'br',
        'br_user_corr',
        'energy',
        'energy_corr',
        'ebr',
        'ebr_corr',
        'ebr_user_corr',
        'ebc_cf',
        'ebr_5day_clim',
        'flux',
        'flux_corr',
        'flux_user_corr',
        'G_corr',
        'H_corr',
        'LE_corr',
        'Rn_corr'
    )

    # potentially calculated variables for ET
    _et_calc_vars = (
        'ET',
        'ET_corr',
        'ET_user_corr'
    )

    # potentially calculated ET gap fill variables
    _et_gap_fill_vars = (
        'ET_gap',
        'ET_fill',
        'ET_fill_val',
        'ETrF',
        'ETrF_filtered',
        'EToF',
        'EToF_filtered'
    )

    def __init__(self, data=None, drop_gaps=True, daily_frac=1.00,
                 max_interp_hours=2, max_interp_hours_night=4):

        if isinstance(data, Data):
            self.config_file = data.config_file
            self.config = data.config
            # need to access df to potentially calc vp/vpd
            data.df.head()
            self._df = data.df
            self.variables = data.variables
            self.units = data.units
            self.elevation = data.elevation
            self.latitude = data.latitude
            self.longitude = data.longitude
            self.out_dir = data.out_dir
            self.site_id = data.site_id
            # flip variable naming dict for internal use
            self.inv_map = {
                v: k for k, v in self.variables.items() if (
                    not k in self._df.columns
                )
            }
            # using 'G' in multiple g plot may overwrite G name internally
            if not 'G' in self.inv_map.values():
                user_G_name = self.variables.get('G')
                if user_G_name:
                    self.inv_map[user_G_name] = 'G'
            # data will be loaded if it has not yet via Data.df
            self.temporal_freq = self._check_daily_freq(
                drop_gaps, daily_frac, max_interp_hours,
                max_interp_hours_night
            )
            # check units, convert if possible for energy balance, ppt, Rs, vp
            self._check_convert_units()
            self._check_gridMET()
            # assume energy balance vars exist, will be validated upon corr
            self._has_eb_vars = True

        elif data is not None:
            print('{} is not a valid input type'.format(type(data)))
            raise TypeError("Must assign a fluxdataqaqc.data.Data object")

        else:
            self._df = None

        self.corrected = False
        self.corr_meth = None
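    # Example (illustrative sketch): creating a QaQc instance from a Data
    # instance; the config path below is hypothetical.
    #
    #   from fluxdataqaqc import Data, QaQc
    #   d = Data('path/to/config.ini')
    #   # relax the gap filter to require 75% of sub-daily samples per day
    #   q = QaQc(d, drop_gaps=True, daily_frac=0.75, max_interp_hours=2)
    #   q.temporal_freq   # e.g. 'H' for hourly input, 'na' if undetermined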
    def daily_ASCE_refET(self, reference='short', anemometer_height=None):
        """
        Calculate daily ASCE standardized short (ETo) or tall (ETr) reference
        ET from input data and wind measurement height. The resulting time
        series will automatically be merged into the :attr:`.Data.df`
        dataframe, named "ASCE_ETo" or "ASCE_ETr" respectively.

        Keyword Arguments:
            reference (str): default "short", calculate tall or short ASCE
                reference ET.
            anemometer_height (float or None): wind measurement height in
                meters, default :obj:`None`. If :obj:`None` then look for the
                "anemometer_height" entry in the **METADATA** section of the
                config.ini, if not there then print a warning and use 2
                meters.

        Returns:
            :obj:`None`

        Note:
            If the hourly ASCE variables were previously calculated from a
            :obj:`.Data` instance they will be overwritten as they are saved
            with the same names.
        """
        df = self.df.rename(columns=self.inv_map)
        req_vars = ['vp', 'ws', 'sw_in', 't_min', 't_max']
        if not set(req_vars).issubset(df.columns):
            print('Missing one or more required variables, cannot compute')
            return

        if anemometer_height is None:
            anemometer_height = self.config.get(
                'METADATA', 'anemometer_height', fallback=None
            )
            if anemometer_height is None:
                print(
                    'WARNING: anemometer height was not given and not found '
                    'in the config file metadata, proceeding with height of '
                    '2 m'
                )
                anemometer_height = 2

        # RefET will convert to MJ-m2-hr
        input_units = {
            'rs': 'w/m2'
        }

        length = len(df.t_min)
        tmin = df.t_min
        tmax = df.t_max
        rs = df.sw_in
        ea = df.vp
        uz = df.ws
        zw = np.full(length, anemometer_height)
        lat = np.full(length, self.latitude)
        doy = df.index.dayofyear
        elev = np.full(length, self.elevation)

        REF = refet.Daily(
            tmin, tmax, ea, rs, uz, zw, elev, lat, doy, method='asce',
            input_units=input_units,
        )

        if reference == 'short':
            ret = REF.eto()
            name = 'ASCE_ETo'
        elif reference == 'tall':
            ret = REF.etr()
            name = 'ASCE_ETr'

        # can add directly into QaQc.df
        df[name] = ret
        self._df = df.rename(columns=self.variables)
        self.variables[name] = name
        self.units[name] = 'mm'
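    # Example (illustrative sketch): computing both reference ET variants on
    # an existing QaQc instance ``q``; the 3 m anemometer height is
    # hypothetical.
    #
    #   q.daily_ASCE_refET(reference='short', anemometer_height=3)
    #   q.daily_ASCE_refET(reference='tall', anemometer_height=3)
    #   q.df[['ASCE_ETo', 'ASCE_ETr']].head()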
    def _check_convert_units(self):
        """
        Verify that units are recognized for variables in
        QaQc.allowable_units, then verify that they have the required units
        as in QaQc.required_units; if not, convert them.

        Conversions are handled by the util.Convert.convert class method.
        """
        # force all input units to lower case
        for k, v in self.units.items():
            self.units[k] = v.lower()
        # can add check/rename unit aliases, e.g. C or c or celsius, etc...
        df = self._df.rename(columns=self.inv_map)

        for v, u in self.units.items():
            if not v in QaQc.required_units.keys():
                # variable is not required to have any particular unit, skip
                continue
            elif not u in QaQc.allowable_units[v]:
                print('ERROR: {} units are not recognizable for var: {}\n'
                    'allowable input units are: {}\nNot converting.'.format(
                        u, v, ','.join(QaQc.allowable_units[v])
                    )
                )
            elif not u == QaQc.required_units[v]:
                # do conversion, update units
                # pass variable, initial unit, unit to be converted to, df
                df = Convert.convert(v, u, QaQc.required_units[v], df)
                self.units[v] = QaQc.required_units[v]

        self._df = df

    def _check_gridMET(self):
        """
        Check if gridMET has been downloaded (file path in config); if so,
        also check if its dates fully intersect those of the station data.

        If both conditions are not met, set :attr:`gridMET_exists` to False,
        otherwise True.

        Arguments:
            None

        Returns:
            None
        """
        gridfile = self.config.get(
            'METADATA', 'gridMET_file_path', fallback=None
        )
        if gridfile is None:
            self.gridMET_exists = False
        else:
            try:
                grid_df = pd.read_csv(
                    gridfile, parse_dates=True, index_col='date'
                )
                gridMET_dates = grid_df.index
                station_dates = self.df.index
                # add var names and units to attributes
                for val in grid_df.columns:
                    meta = [
                        v for k, v in QaQc.gridMET_meta.items() if
                        v['rename'] == val
                    ][0]
                    self.variables[meta['rename']] = meta['rename']
                    self.units[meta['rename']] = meta['units']
                # flag False if ETr was not downloaded for our purposes
                if not {'gridMET_ETr', 'gridMET_ETo'}.issubset(
                        grid_df.columns):
                    self.gridMET_exists = False
                elif station_dates.isin(gridMET_dates).all():
                    self.gridMET_exists = True
                # some gridMET exists but needs to be updated for coverage
                else:
                    self.gridMET_exists = False
            except:
                print('WARNING: unable to find/read gridMET file\n {}'.format(
                    gridfile)
                )
                self.gridMET_exists = False
    def download_gridMET(self, variables=None):
        """
        Download reference ET (alfalfa and grass) and precipitation from
        gridMET for all days in the flux station time series by default.

        Other specific gridMET variables may also be downloaded by passing a
        list of gridMET variable names; possible variables and their long
        form can be found in :attr:`QaQc.gridMET_meta`.

        Upon download, the gridMET time series for the nearest gridMET cell
        will be merged into the instance's dataframe attribute
        :attr:`QaQc.df` and all gridMET variable names will have the prefix
        "gridMET\_" for identification. The gridMET time series file will be
        saved to a subdirectory called "gridMET_data" within the directory
        that contains the config file for the current :obj:`QaQc` instance
        and named with the site ID and gridMET cell centroid lat and long
        coordinates in decimal degrees.

        Arguments:
            variables (None, str, list, or tuple): default None. List of
                gridMET variable names to download; if None download ETr and
                precipitation. See the keys of the :attr:`QaQc.gridMET_meta`
                dictionary for a list of all variables that can be downloaded
                by this method.

        Returns:
            :obj:`None`

        Note:
            Any previously downloaded gridMET time series will be overwritten
            when calling this method; however, if using the gap filling
            method of the "ebr" correction routine the download will not
            overwrite existing data so long as gridMET reference ET and
            precipitation are on disk and their path is properly set in the
            config file.
        """
        # opendap thredds server
        server_prefix = \
            'http://thredds.northwestknowledge.net:8080/thredds/dodsC/'

        if variables is None:
            variables = ['ETr', 'pet', 'pr']
        elif not isinstance(variables, (str, list, tuple)):
            print(
                'ERROR: {} is not a valid gridMET variable '
                'or list of variable names, valid options:'
                '\n{}'.format(
                    variables, ', '.join([v for v in QaQc.gridMET_meta])
                )
            )
            return

        if isinstance(variables, str):
            # wrap a single variable name in a list
            variables = [variables]

        station_dates = self.df.index
        grid_dfs = []

        for i, v in enumerate(variables):
            if not v in QaQc.gridMET_meta:
                print(
                    'ERROR: {} is not a valid gridMET variable, '
                    'valid options: {}'.format(
                        v, ', '.join([v for v in QaQc.gridMET_meta])
                    )
                )
                continue
            meta = QaQc.gridMET_meta[v]
            self.variables[meta['rename']] = meta['rename']
            self.units[meta['rename']] = meta['units']
            print('Downloading gridMET var: {}\n'.format(meta['name']))
            netcdf = '{}{}'.format(server_prefix, meta['nc_suffix'])
            ds = xarray.open_dataset(netcdf).sel(
                lon=self.longitude, lat=self.latitude, method='nearest'
            ).drop('crs')
            df = ds.to_dataframe().loc[station_dates].rename(
                columns={meta['name']: meta['rename']}
            )
            df.index.name = 'date'  # ensure date col name is 'date'
            # on first variable (if multiple) grab gridcell centroid coords
            if i == 0:
                lat_centroid = df.lat[0]
                lon_centroid = df.lon[0]
            df.drop(['lat', 'lon'], axis=1, inplace=True)
            grid_dfs.append(df)

        # combine data
        df = pd.concat(grid_dfs, axis=1)
        # save gridMET time series to CSV in subdirectory with config file
        gridMET_file = self.config_file.parent.joinpath(
            'gridMET_data'
        ).joinpath('{}_{:.4f}N_{:.4f}W.csv'.format(
            self.site_id, lat_centroid, lon_centroid
            )
        )
        gridMET_file.parent.mkdir(parents=True, exist_ok=True)
        self.config.set(
            'METADATA', 'gridMET_file_path', value=str(gridMET_file)
        )
        df.to_csv(gridMET_file)
        # rewrite config with updated gridMET file path
        with open(str(self.config_file), 'w') as outf:
            self.config.write(outf)
        # drop previously calced vars for replacement, no join duplicates
        self._df = _drop_cols(self._df, variables)
        self._df = self._df.join(df)
        self.gridMET_exists = True
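    # Example (illustrative sketch): downloading the default gridMET
    # variables plus daily min/max air temperature for an existing instance
    # ``q`` (requires network access to the THREDDS server).
    #
    #   q.download_gridMET()                  # ETr, ETo (pet), and precip
    #   q.download_gridMET(['tmmn', 'tmmx'])  # extras by gridMET name
    #   q.df[['gridMET_ETr', 'gridMET_prcp']].describe()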
    def _check_daily_freq(self, drop_gaps, daily_frac, max_interp_hours,
                          max_interp_hours_night):
        """
        Check temporal frequency of input Data, resample to daily if not
        already.

        Note:
            If one or more sub-daily values are missing for a day the entire
            day will be replaced with a null (:obj:`numpy.nan`).

            If user QC values for filtering data are present they will also
            be resampled to daily means; however, this should not be an issue
            as the filtering step occurs in a :obj:`fluxdataqaqc.Data`
            object.
        """
        # rename columns to internal names
        df = self._df.rename(columns=self.inv_map)

        if not isinstance(df, pd.DataFrame):
            return

        freq = pd.infer_freq(df.index)
        second_day = df.index.date[2]
        third_day = second_day + pd.Timedelta(1, unit='D')

        # pd.infer_freq does not always work and may return None
        if freq and freq > 'D':
            pass
        elif freq and freq < 'D':
            print('\nThe input data temporal frequency appears to be less '
                  'than daily.')
        # slice is transposed if only one date entry
        elif len(df.loc[str(third_day)].index) == len(df.columns) and \
                (df.loc[str(third_day)].index == df.columns).all():
            print('\nInput temporal frequency is already daily.')
            freq = 'D'
            # add missing dates (if exist) for full time series records/plots
            idx = pd.date_range(df.index.min(), df.index.max())
            df = df.reindex(idx)
            df.index.name = 'date'
        elif freq is None:
            freq = 'na'

        if not freq == 'D':
            # find frequency manually, optionally drop days w/ subdaily gaps;
            # see if two adj. dates exist, skip first day in case not full
            max_times_in_day = len(df.loc[str(third_day)].index)
            self.n_samples_per_day = max_times_in_day
            downsample = False

            if daily_frac > 1:
                print('ERROR: daily_frac must be between 0 and 1, using 1')
                daily_frac = 1
            elif daily_frac < 0:
                print('ERROR: daily_frac must be between 0 and 1, using 0')
                daily_frac = 0

            if not str(third_day) in df.index and drop_gaps:
                print('WARNING: it looks like the input temporal frequency',
                    'is greater than daily, downsampling, proceed with',
                    'caution!\n')
                downsample = True

            energy_bal_vars = ['LE', 'H', 'Rn', 'G']
            asce_std_interp_vars = ['t_avg', 'sw_in', 'ws', 'vp']
            # for interpolation of energy balance variables if they exist
            energy_bal_vars = list(
                set(energy_bal_vars).intersection(df.columns)
            )
            asce_std_interp_vars = list(
                set(asce_std_interp_vars).intersection(df.columns)
            )
            interp_vars = asce_std_interp_vars + energy_bal_vars

            # add subday gap col for energy balance comps, sum daily gaps
            for v in energy_bal_vars:
                df['{}_subday_gaps'.format(v)] = False
                df.loc[df[v].isna(), '{}_subday_gaps'.format(v)] = True

            print('Data is being resampled to daily temporal frequency.')
            sum_cols = [k for k, v in self.agg_dict.items() if v == 'sum']
            sum_cols = list(set(sum_cols).intersection(df.columns))
            mean_cols = list(set(df.columns) - set(sum_cols))
            means = df.loc[:, mean_cols].apply(
                pd.to_numeric, errors='coerce').resample('D').mean().copy()
            # issue with resample sum of nans, need to drop first else 0
            sums = df.loc[:, sum_cols].dropna().apply(
                pd.to_numeric, errors='coerce').resample('D').sum()

            if max_interp_hours is not None:
                # linearly interpolate energy balance components only
                max_gap = int(
                    round(max_interp_hours / 24 * max_times_in_day))
                max_night_gap = int(
                    round(max_interp_hours_night / 24 * max_times_in_day)
                )
                print(
                    'Linearly interpolating gaps in energy balance '
                    'components up to {} hours when Rn < 0 and up to {} '
                    'hours when Rn >= 0.'.format(
                        max_interp_hours_night, max_interp_hours
                    )
                )
                tmp = df[interp_vars].apply(pd.to_numeric, errors='coerce')

                if 'Rn' in energy_bal_vars:
                    grped_night = tmp.loc[
                        (tmp.Rn < 0) & (tmp.Rn.notna())
                    ].copy()
                    grped_night.drop_duplicates(inplace=True)
                    grped_night = grped_night.groupby(
                        pd.Grouper(freq='24H', offset='12H'),
                        group_keys=True).apply(
                            lambda x: x.interpolate(
                                method='linear', limit=max_night_gap,
                                limit_direction='both', limit_area='inside'
                            )
                        )
                    grped_day = tmp.loc[
                        (tmp.Rn >= 0) | (tmp.Rn.isna())
                    ].copy()
                    grped_day.drop_duplicates(inplace=True)
                    grped_day = grped_day.groupby(
                        pd.Grouper(freq='24H'), group_keys=True).apply(
                            lambda x: x.interpolate(
                                method='linear', limit=max_gap,
                                limit_direction='both', limit_area='inside'
                            )
                        )
                else:
                    grped_night = tmp.copy()
                    grped_night.drop_duplicates(inplace=True)
                    grped_night = grped_night.groupby(
                        pd.Grouper(freq='24H', offset='12H'),
                        group_keys=True).apply(
                            lambda x: x.interpolate(
                                method='linear', limit=max_night_gap,
                                limit_direction='both', limit_area='inside'
                            )
                        )
                    grped_day = tmp.copy()
                    grped_day.drop_duplicates(inplace=True)
                    grped_day = grped_day.groupby(
                        pd.Grouper(freq='24H'), group_keys=True).apply(
                            lambda x: x.interpolate(
                                method='linear', limit=max_gap,
                                limit_direction='both', limit_area='inside'
                            )
                        )

                # get full datetime index from grouper operation
                if type(grped_night.index) is pd.MultiIndex:
                    grped_night.index = grped_night.index.get_level_values(1)
                if type(grped_day.index) is pd.MultiIndex:
                    grped_day.index = grped_day.index.get_level_values(1)

                interped = pd.concat([grped_day, grped_night])
                if interped.index.duplicated().any():
                    interped = interped.loc[
                        ~interped.index.duplicated(keep='first')
                    ]
                # overwrite non-interpolated daily means
                means[interp_vars] = \
                    interped[interp_vars].resample('D').mean().copy()

                if 't_avg' in interp_vars:
                    means['t_min'] = interped.t_avg.resample('D').min()
                    means['t_max'] = interped.t_avg.resample('D').max()
                    self.variables['t_min'] = 't_min'
                    self.units['t_min'] = self.units['t_avg']
                    self.variables['t_max'] = 't_max'
                    self.units['t_max'] = self.units['t_avg']
                    interp_vars = interp_vars + ['t_min', 't_max']
                    interped[['t_min', 't_max']] = \
                        means[['t_min', 't_max']]

            if drop_gaps:
                # make sure rounding errors do not affect this value
                n_vals_needed = int(round(max_times_in_day * daily_frac))
                # don't overwrite QC flag columns
                data_cols = [
                    c for c in df.columns if not c.endswith('_qc_flag')
                ]
                # interpolate first before counting gaps
                if max_interp_hours:
                    df[interp_vars] = interped[interp_vars].copy()
                days_with_gaps = df[data_cols].groupby(
                    df.index.date).count() < n_vals_needed

            df = means.join(sums)

            # drop days based on fraction of missing subday samples
            if not downsample and drop_gaps:
                print(
                    'Filtering days with less than {}% or {}/{} sub-daily '
                    'measurements'.format(
                        daily_frac * 100, n_vals_needed, max_times_in_day
                    )
                )
                df[days_with_gaps] = np.nan
        else:
            self.n_samples_per_day = 1

        self._df = df.rename(columns=self.variables)
        return freq

    @property
    def df(self):
        """
        See :attr:`fluxdataqaqc.Data.df`; the only difference is that
        :attr:`QaQc.df` is first resampled to daily frequency.
        """
        # avoid overwriting pre-assigned data
        if isinstance(self._df, pd.DataFrame):
            return self._df.rename(columns=self.variables)

    @df.setter
    def df(self, data_frame):
        if not isinstance(data_frame, pd.DataFrame):
            raise TypeError("Must assign a Pandas.DataFrame object")
        self._df = data_frame

    @property
    def monthly_df(self):
        """
        Temporally resample time series data to monthly frequency using
        monthly means or sums based on :attr:`QaQc.agg_dict`, provides data
        as a :obj:`pandas.DataFrame`.

        Note that monthly means or sums are forced to null values if more
        than 20 percent of a month's days are missing in the daily data
        (:attr:`QaQc.df`). Also, for variables that are summed (e.g. ET or
        precipitation) missing days (if less than 20 percent of the month)
        will be filled with the month's daily mean value before summation.

        If a :obj:`QaQc` instance has not yet run an energy balance
        correction, i.e. :attr:`QaQc.corrected` = :obj:`False`, before
        accessing :attr:`monthly_df` then the default routine of data
        correction (energy balance ratio method) will be conducted.

        Utilize the :attr:`QaQc.monthly_df` property the same way as
        :attr:`fluxdataqaqc.Data.df`, see its API documentation for examples.

        Tip:
            If you have additional variables in :attr:`QaQc.df` or would like
            to change the aggregation method for the monthly time series,
            adjust the instance attribute :attr:`QaQc.agg_dict` before
            accessing :attr:`QaQc.monthly_df`.
        """
        if not self.corrected and self._has_eb_vars:
            self.correct_data()
        # rename columns to internal names
        df = self._df.rename(columns=self.inv_map).copy()
        # avoid including string QC flags due to float forcing on resample
        numeric_cols = [c for c in df.columns if not '_qc_flag' in c]
        sum_cols = [k for k, v in self.agg_dict.items() if v == 'sum']
        # to avoid warning/error of missing columns
        sum_cols = list(set(sum_cols).intersection(df.columns))
        mean_cols = list(set(numeric_cols) - set(sum_cols))
        # if data type has changed to 'obj' resample skips...
        # make sure data exists
        if len(mean_cols) >= 1:
            means = monthly_resample(df[mean_cols], mean_cols, 'mean', 0.8)
        else:
            means = None
        if len(sum_cols) >= 1:
            sums = monthly_resample(df[sum_cols], sum_cols, 'sum', 0.8)
        else:
            sums = None
        if isinstance(means, pd.DataFrame) and isinstance(sums, pd.DataFrame):
            df = means.join(sums)
        elif isinstance(means, pd.DataFrame):
            df = means
        elif isinstance(sums, pd.DataFrame):
            df = sums
        # use monthly sums for ebr columns, not means of the daily ratio
        if set(['LE', 'H', 'Rn', 'G']).issubset(df.columns):
            df.ebr = (df.H + df.LE) / (df.Rn - df.G)
        if set(['LE_corr', 'H_corr', 'Rn', 'G']).issubset(df.columns):
            df.ebr_corr = (df.H_corr + df.LE_corr) / (df.Rn - df.G)
        if set(['LE_user_corr', 'H_user_corr', 'Rn', 'G']).issubset(
                df.columns):
            df['ebr_user_corr'] = \
                (df.H_user_corr + df.LE_user_corr) / (df.Rn - df.G)

        df.replace([np.inf, -np.inf], np.nan, inplace=True)

        return df.rename(columns=self.variables)
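    # Example (illustrative sketch): changing the monthly aggregation method
    # for a user-added column before accessing ``monthly_df``; the column
    # name "my_soil_moisture" is hypothetical.
    #
    #   q.agg_dict['my_soil_moisture'] = 'mean'
    #   monthly = q.monthly_df   # runs the default EBR correction if needed
    #   monthly[['ET_corr', 'ET_gap']].head()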
    def write(self, out_dir=None, use_input_names=False):
        """
        Save daily and monthly time series of initial and "corrected" data in
        CSV format.

        Note, if the energy balance closure correction
        (:attr:`QaQc.correct_data`) has not been run, this method will run it
        with default options before saving time series files to disk.

        The default location for saving output time series files is within an
        "output" subdirectory of the parent directory containing the
        config.ini file that was used to create the :obj:`fluxdataqaqc.Data`
        and :obj:`QaQc` objects; the names of the files will start with the
        site_id and have either the "daily_data" or "monthly_data" suffix.

        Keyword Arguments:
            out_dir (str or :obj:`None`): default :obj:`None`. Directory to
                save CSVs, if :obj:`None` save to :attr:`out_dir` instance
                variable (typically "output" directory where config.ini file
                exists).
            use_input_names (bool): default :obj:`False`. If :obj:`False` use
                ``flux-data-qaqc`` variable names in the output file header,
                or if :obj:`True` use the user's input variable names where
                possible (for variables that were read in and not modified or
                calculated by ``flux-data-qaqc``).

        Returns:
            :obj:`None`

        Example:
            Starting from a config.ini file,

            >>> from fluxdataqaqc import Data, QaQc
            >>> d = Data('path/to/config.ini')
            >>> q = QaQc(d)
            >>> # note no energy balance closure correction has been run
            >>> q.corrected
            False
            >>> q.write()
            >>> q.corrected
            True

        Note:
            To save data created by multiple correction routines, be sure to
            run the correction and then save to different output directories,
            otherwise output files will be overwritten by the most recently
            used correction option.
        """
        if out_dir is None:
            out_dir = self.out_dir
        else:
            out_dir = Path(out_dir)
            self.out_dir = out_dir.absolute()

        if not out_dir.is_dir():
            print(
                '{} does not exist, creating directory'.format(
                    out_dir.absolute()
                )
            )
            out_dir.mkdir(parents=True, exist_ok=True)

        if not self.corrected and self._has_eb_vars:
            print(
                'WARNING: energy balance closure corrections have not yet '
                'been run. Using default options before writing output time '
                'series.'
            )
            self.correct_data()

        daily_outf = out_dir / '{}_daily_data.csv'.format(self.site_id)
        monthly_outf = out_dir / '{}_monthly_data.csv'.format(self.site_id)

        if use_input_names:
            self.df.to_csv(daily_outf)
            self.monthly_df.to_csv(monthly_outf)
        else:
            self.df.rename(columns=self.inv_map).to_csv(daily_outf)
            self.monthly_df.rename(columns=self.inv_map).to_csv(monthly_outf)
    @classmethod
    def from_dataframe(cls, df, site_id, elev_m, lat_dec_deg, var_dict,
                       drop_gaps=True, daily_frac=1.00, max_interp_hours=2,
                       max_interp_hours_night=4):
        """
        Create a :obj:`QaQc` object from a :obj:`pandas.DataFrame` object.

        Arguments:
            df (:obj:`pandas.DataFrame`): DataFrame of climate variables with
                datetime index named 'date'
            site_id (str): site identifier such as station name
            elev_m (int or float): elevation of site in meters
            lat_dec_deg (float): latitude of site in decimal degrees
            var_dict (dict): dictionary that maps `flux-data-qaqc` variable
                names to user's columns in `df` e.g. {'Rn': 'netrad', ...}
                see :attr:`fluxdataqaqc.Data.variable_names_dict` for list of
                `flux-data-qaqc` variable names

        Returns:
            :obj:`QaQc`

        Note:
            When using this method, any output files (CSVs, plots) will be
            saved to a directory named "output" in the current working
            directory.
        """
        qaqc = cls()
        # use property setter, make sure it is a dataframe object
        qaqc.df = df
        qaqc.site_id = site_id
        qaqc.latitude = lat_dec_deg
        qaqc.elevation = elev_m
        qaqc.out_dir = Path('output').absolute()
        qaqc.variables = var_dict
        # TODO handle assigned units
        qaqc.inv_map = {v: k for k, v in var_dict.items()}
        qaqc.temporal_freq = qaqc._check_daily_freq(
            drop_gaps, daily_frac, max_interp_hours, max_interp_hours_night
        )
        qaqc.corrected = False
        qaqc.corr_meth = None
        qaqc._has_eb_vars = True

        return qaqc
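    # Example (illustrative sketch): building a QaQc instance directly from
    # a DataFrame; the column names, coordinates, and values below are
    # hypothetical.
    #
    #   import numpy as np
    #   import pandas as pd
    #   idx = pd.date_range('2020-01-01', periods=10, freq='D', name='date')
    #   df = pd.DataFrame({
    #       'netrad': np.linspace(90, 120, 10),
    #       'le': np.linspace(50, 70, 10),
    #       'h': np.linspace(20, 30, 10),
    #       'g': np.linspace(4, 6, 10)}, index=idx)
    #   var_dict = {'Rn': 'netrad', 'LE': 'le', 'H': 'h', 'G': 'g'}
    #   q = QaQc.from_dataframe(df, 'a_site', 1100, 40.5, var_dict)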
    def correct_data(self, meth='ebr', et_gap_fill=True, y='Rn',
                     refET='ETr', x=['G', 'LE', 'H'], fit_intercept=False):
        """
        Correct turbulent fluxes to improve energy balance closure using an
        Energy Balance Ratio method modified from `FLUXNET
        <https://fluxnet.fluxdata.org/data/fluxnet2015-dataset/data-processing/>`__.

        Currently three correction options are available: 'ebr' (Energy
        Balance Ratio), 'br' (Bowen Ratio), and 'lin_regress' (least squares
        linear regression). If you use one method followed by another, the
        corrected versions of LE, H, ET, ebr, etc. will be overwritten with
        the most recently used approach.

        This method also computes potential clear sky radiation (saved as
        "rso") using an ASCE approach based on station elevation and
        latitude. ET is calculated from raw and corrected LE using daily air
        temperature to correct the latent heat of vaporization; if air temp.
        is not available in the input data it is assumed to be 20 degrees
        Celsius.

        Corrected or otherwise newly calculated variables are named using the
        following suffixes to distinguish them:

        .. code-block:: text

            uncorrected LE, H, etc. from input data have no suffix
            _corr uses adjusted LE, H, etc. from the correction method used
            _user_corr uses corrected LE, H, etc. found in data file (if provided)

        Arguments:
            y (str): name of dependent variable for regression, must be in
                :attr:`QaQc.variables` keys, or a user-added variable. Only
                used if ``meth='lin_regress'``.
            x (str or list): name or list of independent variables for
                regression, names must be in :attr:`QaQc.variables` keys, or
                a user-added variable. Only used if ``meth='lin_regress'``.

        Keyword Arguments:
            meth (str): default 'ebr'. Method to correct energy balance.
            et_gap_fill (bool): default True. If True fill any remaining gaps
                in corrected ET with ETr * ETrF, where ETr is alfalfa
                reference ET from gridMET and ETrF is the filtered, smoothed
                (7 day moving avg., min. 2 days) and linearly interpolated
                crop coefficient. The number of days in each month that
                corrected ET was filled is provided in
                :attr:`QaQc.monthly_df` as the column "ET_gap".
            refET (str): default "ETr". Which gridMET reference product to
                use for ET gap filling, "ETr" or "ETo" are valid options.
            fit_intercept (bool): default False. Fit intercept for regression
                or set to zero if False. Only used if ``meth='lin_regress'``.

        Returns:
            :obj:`None`

        Example:
            Starting from a correctly formatted config.ini and climate time
            series file, this example shows how to read in the data and apply
            the energy balance ratio correction without gap-filling with
            gridMET ETr x ETrF.

            >>> from fluxdataqaqc import Data, QaQc
            >>> d = Data('path/to/config.ini')
            >>> q = QaQc(d)
            >>> q.corrected
            False

            Now apply the energy balance closure correction

            >>> q.correct_data(meth='ebr', et_gap_fill=False)
            >>> q.corrected
            True

        Note:
            If ``et_gap_fill`` is set to :obj:`True` (default) the gap filled
            days of corrected ET will be used to recalculate LE_corr for
            those days, i.e. LE_corr will also be gap-filled.

        Note:
            The *ebr_corr* variable, or energy balance closure ratio, is
            calculated from the corrected versions of LE and H independent of
            the method. When using the 'ebr' method the energy balance
            correction factor (what is applied to the raw H and LE) is left
            as calculated (inverse of ebr) and saved as *ebc_cf*.

        See Also:
            For explanation of the linear regression method see the
            :meth:`QaQc.lin_regress` method; calling that method with the
            keyword argument ``apply_coefs=True`` and :math:`Rn` as the y
            variable and the other energy balance components as the x
            variables will give the same result as the default inputs to this
            function when ``meth='lin_regress'``.
        """
        # in case starting in Python and no df assigned yet
        if not isinstance(self._df, pd.DataFrame):
            print('Please assign a dataframe of acceptable data first!')
            return

        if meth not in self.corr_methods:
            err_msg = ('ERROR: {} is not a valid correction option, please '
                'use one of the following: {}'.format(meth, ', '.join(
                    [el for el in self.corr_methods]))
            )
            raise ValueError(err_msg)

        # calculate clear sky radiation if not already computed
        self._calc_rso()

        # energy balance corrections
        eb_vars = {'Rn', 'LE', 'H', 'G'}
        if not eb_vars.issubset(self.variables.keys()) or \
                not eb_vars.issubset(
                    self.df.rename(columns=self.inv_map).columns):
            print(
                '\nMissing one or more energy balance variables, cannot '
                'perform energy balance correction.'
            )
            self._has_eb_vars = False
            # calculate raw (from input LE) ET
            self._calc_et()
            # fill gaps of raw ET with ET from smoothed, gap filled ETrF*ETr
            if et_gap_fill:
                self._ET_gap_fill(et_name='ET', refET=refET)
            return

        if meth == 'ebr':
            self._ebr_correction()
        elif meth == 'br':
            self._bowen_ratio_correction()
        elif meth == 'lin_regress':
            if y not in eb_vars or len(set(x).difference(eb_vars)) > 0:
                print(
                    'WARNING: correcting energy balance variables using '
                    'dependent or independent variables that are not '
                    'energy balance components will cause undesired results!'
                    '\nIt is recommended to use only Rn, G, LE, and H for '
                    'dependent (y) and independent (x) variables.'
                )
            self.lin_regress(
                y=y, x=x, fit_intercept=fit_intercept, apply_coefs=True
            )

        self.corr_meth = meth
        # calculate raw, corrected ET
        self._calc_et()
        # fill gaps of corr ET with ET from smoothed, gap filled ETrF*ETr
        if et_gap_fill:
            self._ET_gap_fill(et_name='ET_corr', refET=refET)

        # update inv map for naming
        self.inv_map = {
            v: k for k, v in self.variables.items() if (
                not v.replace('_mean', '') == k or not k in self.df.columns)
        }
        # using 'G' in multiple g plot may overwrite G name internally
        if not 'G' in self.inv_map.values():
            user_G_name = self.variables.get('G')
            self.inv_map[user_G_name] = 'G'
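    # Example (illustrative sketch): applying the Bowen Ratio correction and
    # gap filling corrected ET with gridMET grass reference ET instead of
    # alfalfa.
    #
    #   q.correct_data(meth='br', et_gap_fill=True, refET='ETo')
    #   q.corr_meth                     # 'br'
    #   q.monthly_df['ET_gap'].head()   # days gap filled per month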
    def _calc_vpd_from_vp(self):
        """
        Based on ASCE standardized ref. ET eqn. 37; air temperature must be
        in Celsius and actual vapor pressure in kPa.

        Can also calculate VP from VPD and air temperature, and relative
        humidity from es and VP.
        """
        df = self.df.rename(columns=self.inv_map)

        # calculate vpd from actual vapor pressure and temp;
        # check if needed variables exist and units are correct
        has_vpd_vars = set(['vp', 't_avg']).issubset(df.columns)
        units_correct = (
            self.units.get('vp') == 'kpa' and self.units.get('t_avg') == 'c'
        )
        if has_vpd_vars and units_correct:
            # saturation vapor pressure (es)
            es = 0.6108 * np.exp(17.27 * df.t_avg / (df.t_avg + 237.3))
            df['vpd'] = es - df.vp
            df['es'] = es
            self.variables['vpd'] = 'vpd'
            self.units['vpd'] = 'kpa'
            self.variables['es'] = 'es'
            self.units['es'] = 'kpa'

        # same, calc actual vapor pressure from vapor pressure deficit, temp
        has_vp_vars = set(['vpd', 't_avg']).issubset(df.columns)
        units_correct = (
            self.units.get('vpd') == 'kpa' and self.units.get('t_avg') == 'c'
        )
        if has_vp_vars and units_correct:
            # saturation vapor pressure (es)
            es = 0.6108 * np.exp(17.27 * df.t_avg / (df.t_avg + 237.3))
            df['vp'] = es - df.vpd
            df['es'] = es
            self.variables['vp'] = 'vp'
            self.units['vp'] = 'kpa'
            self.variables['es'] = 'es'
            self.units['es'] = 'kpa'

        if not 'rh' in self.variables and {'vp', 'es'}.issubset(
                self.variables):
            if not self.units.get('vp') == 'kpa':
                pass
            else:
                print(
                    'Calculating relative humidity from actual and '
                    'saturation vapor pressure and air temperature'
                )
                df['rh'] = 100 * (df.vp / df.es)
                self.variables['rh'] = 'rh'
                self.units['rh'] = '%'

        self._df = df

    def _ET_gap_fill(self, et_name='ET_corr', refET='ETr'):
        """
        Use gridMET reference ET to calculate ET from ETrF/EToF, smooth and
        gap fill the calculated ET, and then use it to fill gaps in corrected
        ET. Keeps tabs on the number of days in ET that were filled in each
        month.

        Keyword Arguments:
            et_name (str): default "ET_corr". Name of ET variable to use when
                calculating the crop coefficient and to fill gaps with
                calculated ET.

        Returns:
            :obj:`None`
        """
        # drop relevant calculated variables if they exist
        self._df = _drop_cols(self._df, self._et_gap_fill_vars)
        # get ETr if not on disk
        if not self.gridMET_exists:
            self.download_gridMET()
        else:
            # gridMET file has been verified, has all needed dates, just load
            gridfile = self.config.get(
                'METADATA', 'gridMET_file_path', fallback=None
            )
            print(
                'gridMET reference ET already downloaded for station at:\n'
                '{}\nnot redownloading.'.format(gridfile)
            )
            grid_df = pd.read_csv(
                gridfile, parse_dates=True, index_col='date'
            )
            # drop previously calced vars for replacement, no join duplicates
            self._df = _drop_cols(self._df, list(grid_df.columns))
            self._df = self._df.join(grid_df)
            for c in grid_df.columns:
                self.variables[c] = c

        df = self.df.rename(columns=self.inv_map)

        if not et_name in df.columns:
            print(
                'ERROR: {} not found in data, cannot gap-fill'.format(
                    et_name)
            )
            return

        # calc ETrF 7 day moving avg, min vals in window = 2, linear interp
        print(
            f'Gap filling {et_name} with filtered {refET}F x {refET} '
            '(gridMET)'
        )
        if refET == 'ETr':
            df['ETrF'] = \
                df[et_name].astype(float) / df.gridMET_ETr.astype(float)
            df['ETrF_filtered'] = df['ETrF']
            # filter out extremes of ETrF
            Q1 = df['ETrF_filtered'].quantile(0.25)
            Q3 = df['ETrF_filtered'].quantile(0.75)
            IQR = Q3 - Q1
            to_filter = df.query(
                'ETrF_filtered<(@Q1-1.5*@IQR) or ETrF_filtered>(@Q3+1.5*@IQR)'
            )
            df.loc[to_filter.index, 'ETrF_filtered'] = np.nan
            df['ETrF_filtered'] = df.ETrF_filtered.rolling(
                7, min_periods=2, center=True
            ).mean()
            df.ETrF_filtered = df.ETrF_filtered.interpolate(method='linear')
            # calc ET from ETrF_filtered and ETr
            df['ET_fill'] = df.gridMET_ETr * df.ETrF_filtered
            # flag missing days in corrected ET, summed for each month
            df['ET_gap'] = False
            df.loc[(df[et_name].isna() & df.ET_fill.notna()), 'ET_gap'] = True
            df.loc[df.ET_gap, et_name] = df.loc[df.ET_gap, 'ET_fill']
            self.units['ETrF'] = 'unitless'
            self.units['ETrF_filtered'] = 'unitless'

        elif refET == 'ETo':
            df['EToF'] = \
                df[et_name].astype(float) / df.gridMET_ETo.astype(float)
            df['EToF_filtered'] = df['EToF']
            # filter out extremes of EToF
            Q1 = df['EToF_filtered'].quantile(0.25)
            Q3 = df['EToF_filtered'].quantile(0.75)
            IQR = Q3 - Q1
            to_filter = df.query(
                'EToF_filtered<(@Q1-1.5*@IQR) or EToF_filtered>(@Q3+1.5*@IQR)'
            )
            df.loc[to_filter.index, 'EToF_filtered'] = np.nan
            df['EToF_filtered'] = df.EToF_filtered.rolling(
                7, min_periods=2, center=True
            ).mean()
            df.EToF_filtered = df.EToF_filtered.interpolate(method='linear')
            # calc ET from EToF_filtered and ETo
            df['ET_fill'] = df.gridMET_ETo * df.EToF_filtered
            # flag missing days in corrected ET, summed for each month
            df['ET_gap'] = False
            df.loc[(df[et_name].isna() & df.ET_fill.notna()), 'ET_gap'] = True
            df.loc[df.ET_gap, et_name] = df.loc[df.ET_gap, 'ET_fill']
            self.units['EToF'] = 'unitless'
            self.units['EToF_filtered'] = 'unitless'

        # backcalculate LE_corr and flux_corr with gap filled ET_corr
        if et_name == 'ET_corr' and 't_avg' in self.variables:
            df['LE_corr'] = \
                (df.ET_corr * (2501000 - 2361 * df.t_avg.fillna(20))) / 86400
        elif et_name == 'ET_corr' and not 't_avg' in self.variables:
            # assume 20 degrees C
            df['LE_corr'] = \
                (df.ET_corr * (2501000 - 2361 * 20)) / 86400

        # ET fill values only where they were used to gap fill, for plotting
        df['ET_fill_val'] = np.nan
        df.loc[df.ET_gap, 'ET_fill_val'] = df.ET_fill

        # update variables attribute with new variables (may vary)
        new_cols = set(df.columns) - set(self.variables)
        for el in new_cols:
            self.variables[el] = el

        self.units['ET_fill'] = 'mm'
        self.units['ET_fill_val'] = 'mm'
        self.units['ET_gap'] = 'unitless'

        self._df = df

    def _calc_et(self):
        """
        Calculate daily ET (mm) from raw and corrected LE (w/m2); if air
        temperature is available use it to correct the latent heat of
        vaporization.

        Currently computes on the :attr:`df` dataframe attribute in place.
        Can be called before or after energy balance closure corrections; if
        before, only raw ET will be calculated.

        Arguments:
            None

        Returns:
            :obj:`None`
        """
        # drop relevant calculated variables if they exist
        self._df = _drop_cols(self._df, self._et_calc_vars)
        df = self.df.rename(columns=self.inv_map)

        if not set(['LE', 'LE_corr', 'LE_user_corr']).intersection(
                df.columns):
            print(
                'ERROR: no LE variables found in data, cannot calculate ET'
            )
            return

        # latent heat of vaporization from L.P. Harrison (1963)
        if 't_avg' in df.columns:
            df['ET'] = 86400 * (
                df.LE / (2501000 - (2361 * df.t_avg.fillna(20))))
            if 'LE_corr' in df.columns:
                df['ET_corr'] = 86400 * (
                    df.LE_corr / (2501000 - (2361 * df.t_avg.fillna(20))))
            if 'LE_user_corr' in df.columns:
                df['ET_user_corr'] = 86400 * (
                    df.LE_user_corr /
                    (2501000 - (2361 * df.t_avg.fillna(20))))
        # otherwise assume air temp = 20 degrees C
        else:
            df['ET'] = 86400 * (df.LE / (2501000 - (2361 * 20)))
            if 'LE_corr' in df.columns:
                df['ET_corr'] = 86400 * (
                    df.LE_corr / (2501000 - (2361 * 20)))
            if 'LE_user_corr' in df.columns:
                df['ET_user_corr'] = 86400 * (
                    df.LE_user_corr / (2501000 - (2361 * 20)))

        # update variables attribute with new variables (may vary)
        new_cols = set(df.columns) - set(self.variables)
        for el in new_cols:
            self.variables[el] = el
            self.units[el] = 'mm'

        # join data back into df attr
        self._df = df.rename(columns=self.variables)

    def _calc_rso(self):
        """
        Calculate clear sky potential solar radiation using station latitude,
        elevation, and day of year using the simple method from :mod:`refet`.

        Arguments:
            None

        Returns:
            :obj:`None`

        Note:
            Does not overwrite an existing calculation for Rso, i.e. once any
            correction method has been run using :func:`correct_data` it will
            not be recalculated on subsequent calls since it is independent
            of climate variables.
        """
        # avoid recalculating
        if 'rso' in self.df.columns:
            return
        doy = self.df.index.dayofyear
        # obtain extraterrestrial radiation from doy and latitude, then
        # calculate clear sky radiation
        latitude_rads = self.latitude * (np.pi / 180)
        ra_mj_m2 = _ra_daily(latitude_rads, doy, method='asce')
        rso_a_mj_m2 = _rso_simple(ra_mj_m2, self.elevation)
        # convert MJ m-2 d-1 to w/m2
        self._df['rso'] = rso_a_mj_m2 * 11.574
        self.variables.update(
            rso='rso'
        )
        self.units['rso'] = 'w/m2'
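    # The LE -> ET conversion used in _calc_et, as a standalone sketch
    # (assumes LE in w/m2 and air temperature in C, returns ET in mm/day;
    # "le" and "t_c" are hypothetical names):
    #
    #   def le_to_et_mm_day(le, t_c=20.0):
    #       # latent heat of vaporization (J/kg), Harrison (1963)
    #       lam = 2501000 - 2361 * t_c
    #       # w/m2 (J s-1 m-2) * 86400 s/day / (J/kg) -> kg m-2 day-1 = mm/day
    #       return 86400 * le / lam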
    def lin_regress(self, y, x, fit_intercept=False, apply_coefs=False):
        """
        Least squares linear regression on a single or multiple independent
        variables.

        For example, if the dependent variable (``y``) is :math:`Rn` and the
        independent variables (``x``) are :math:`LE` and :math:`H`, then the
        linear regression will solve for the best fit coefficients of
        :math:`Rn = c_0 + c_1 LE + c_2 H`. Any number of variables in
        :attr:`QaQc.variables` can be used for ``x`` and one for ``y``.

        If the variables chosen for regression are part of the energy balance
        components, i.e. :math:`Rn, G, LE, H`, and ``apply_coefs=True``, then
        the best fit coefficients will be applied to their respective
        variables with consideration of the energy balance equation, i.e. the
        signs of the coefficients will be corrected according to
        :math:`Rn - G = LE + H`. For example, if ``y='H'`` and
        ``x=['Rn','G','LE']``, i.e. solving
        :math:`H = c_0 + c_1 Rn + c_2 G + c_3 LE`, then the coefficients
        :math:`c_2` and :math:`c_3` will be multiplied by -1 before applying
        them to correct :math:`G` and :math:`LE` according to the energy
        balance equation.

        This method returns a :obj:`pandas.DataFrame` containing results of
        the linear regression including the coefficient values, the number of
        data pairs used in the regression, the root-mean-square-error, and
        the coefficient of determination. This table can also be retrieved
        from the :attr:`QaQc.lin_regress_results` instance attribute.

        Arguments:
            y (str): name of dependent variable for regression, must be in
                :attr:`QaQc.variables` keys, or a user-added variable.
            x (str or list): name or list of independent variables for
                regression, names must be in :attr:`QaQc.variables` keys, or
                a user-added variable.

        Keyword Arguments:
            fit_intercept (bool): default False. Fit intercept for regression
                or set to zero if False.
            apply_coefs (bool): default False. If :obj:`True` then apply
                fitted coefficients to their respective variables, renaming
                the variables with the suffix "_corr".

        Returns:
            :obj:`pandas.DataFrame`

        Example:
            Let's say we wanted to compute the linear relationship of net
            radiation to the other energy balance components, which may be
            useful if we have strong confidence in net radiation
            measurements, for example. The resulting coefficients of
            regression would give us an idea of whether the other components
            were "under-measured" or "over-measured". Then, starting with a
            :obj:`.Data` instance:

            >>> Q = QaQc(Data_instance)
            >>> Q.lin_regress(y='Rn', x=['G','H','LE'], fit_intercept=True)
            >>> Q.lin_regress_results

            This would produce something like the following,

            ======= ================== ================ ============== =============== ============== =========== =============== ================
            SITE_ID Y (dependent var.) c0 (intercept)   c1 (coef on G) c2 (coef on LE) c3 (coef on H) RMSE (w/m2) r2 (coef. det.) n (sample count)
            ======= ================== ================ ============== =============== ============== =========== =============== ================
            a_site  Rn                 6.99350781229883 1.552          1.054           0.943          18.25       0.91            3386
            ======= ================== ================ ============== =============== ============== =========== =============== ================

            In this case the intercept is telling us that there may be a
            systematic or constant error in the independent variables and
            that :math:`G` is "under-measured" at the sensor by over 50
            percent, etc., if we assume daily :math:`Rn` is accurate as
            measured.

        Tip:
            You may also use multiple linear regression to correct energy
            balance components using the :meth:`QaQc.correct_data` method by
            passing the ``meth='lin_regress'`` keyword argument.
        """
        # drop relevant calculated variables if they exist
        self._df = _drop_cols(self.df, self._eb_calc_vars)
        df = self._df.rename(columns=self.inv_map)

        if not y in df.columns:
            print(
                'ERROR: the dependent variable ({}) was not '
                'found in the dataframe.\nHere are all available variable '
                'names:\n{}\n'.format(y, ', '.join(df.columns))
            )
            return
        if not isinstance(x, list) and not x in df.columns:
            print(
                'ERROR: the independent variable ({}) was not '
                'found in the dataframe.\nHere are all available variable '
                'names:\n{}\n'.format(x, ', '.join(df.columns))
            )
            return

        # get n X vars, names if more than 1
        n_x = 1
        if isinstance(x, list):
            n_x = len(x)
            if not set(x).issubset(df.columns):
                print(
                    'ERROR: one or more independent variables ({}) were not '
                    'found in the dataframe.\nHere are all available '
                    'variable names:\n{}'.format(
                        ','.join(x), ', '.join(df.columns))
                )
                return

        if n_x > 1:
            cols = x + [y]
            tmp = df[cols].copy()
        if n_x == 1:
            tmp = df[[x, y]].copy()

        # drop timestamps with any missing variables
        tmp = tmp.dropna()
        X = tmp[x]
        Y = tmp[y]
        # create model, fit, and predict Y for RMSE/R2 calcs
        model = linear_model.LinearRegression(fit_intercept=fit_intercept)
        model.fit(X, Y)
        pred = model.predict(X)
        r2 = model.score(X, Y).round(2)
        rmse = (np.sqrt(mean_squared_error(Y, pred))).round(2)

        eb_vars = ['LE', 'H', 'Rn', 'G']

        if apply_coefs and set(tmp.columns).intersection(eb_vars):
            # calc initial energy balance if regression applied to EB vars
            df['ebr'] = (df.H + df.LE) / (df.Rn - df.G)
            df['flux'] = df.H + df.LE
            df['energy'] = df.Rn - df.G

        # dataframe of results in nice/readable format
        results = pd.Series()
        results.loc['Y (dependent var.)'] = y
        results.loc['c0 (intercept)'] = model.intercept_
        for i, c in enumerate(X.columns):
            results.loc['c{} (coef on {})'.format(i + 1, c)] = \
                model.coef_[i].round(3)
            if apply_coefs:
                new_var = '{}_corr'.format(c)
                # ensure correct sign of coef. if applied to EB vars
                if y in ['G', 'H', 'LE'] and c in ['G', 'H', 'LE']:
                    coef = -1 * model.coef_[i]
                else:
                    coef = model.coef_[i]
                print(
                    'Applying correction factor ({}) to '
                    'variable: {} (renamed as {})'.format(
                        coef.round(3), c, new_var
                    )
                )
                df[new_var] = df[c] * coef
                self.variables[new_var] = new_var
                # already have units
                self.units[new_var] = self.units.get(c)

        results.loc[
            'RMSE ({})'.format(self.units.get(y, 'na'))
        ] = rmse
        results.loc['r2 (coef. det.)'] = r2
        results.loc['n (sample count)'] = len(Y)
        results = results.to_frame().T
        results.index = [self.site_id]
        results.index.name = 'SITE_ID'
        # add as instance variable
        self.lin_regress_results = results

        # depending on independent vars, calc. corrected flux, ebr, energy
        if apply_coefs and set(X.columns).intersection(eb_vars):
            corr = pd.DataFrame()
            for v in eb_vars:
                cv = '{}_corr'.format(v)
                if cv in df:
                    corr[v] = df[cv]
                else:
                    # not all vars are necessarily different than initial
                    corr[v] = df[v]
            df['ebr_corr'] = (corr.H + corr.LE) / (corr.Rn - corr.G)
            df['flux_corr'] = corr.H + corr.LE
            df['energy_corr'] = corr.Rn - corr.G
            del corr
            self.variables.update(
                energy='energy',
                flux='flux',
                flux_corr='flux_corr',
                energy_corr='energy_corr',
                ebr='ebr',
                ebr_corr='ebr_corr'
            )

        self.corrected = True
        self.corr_meth = 'lin_regress'
        self._df = df.rename(columns=self.variables)

        return results
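    # The regression above, reduced to its sklearn core (illustrative
    # sketch; "eb" is a hypothetical DataFrame with Rn, G, LE, H columns):
    #
    #   from sklearn import linear_model
    #   pairs = eb[['G', 'LE', 'H', 'Rn']].dropna()
    #   m = linear_model.LinearRegression(fit_intercept=False)
    #   m.fit(pairs[['G', 'LE', 'H']], pairs['Rn'])
    #   m.coef_   # c1 (G), c2 (LE), c3 (H) as in QaQc.lin_regress_results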
    def _ebr_correction(self):
        """
        Energy balance ratio correction method for daily LE, H, EBR, and ET.

        Correct turbulent fluxes to close the energy balance using methods
        described by FLUXNET for daily H and LE.

        Updates :attr:`QaQc.df` and :attr:`QaQc.variables` attributes with
        new variables related to the corrections, e.g. LE_corr, ebr_corr,
        etc.

        Arguments:
            None

        Returns:
            :obj:`None`
        """
        # moving windows for FLUXNET methods 1, 2 and 3
        window_1 = 15
        window_2 = 11
        half_win_1 = window_1 // 2
        half_win_2 = window_2 // 2

        # drop relevant calculated variables if they exist
        self._df = _drop_cols(self.df, self._eb_calc_vars)
        df = self._df.rename(columns=self.inv_map)

        # check if mean G, or heat storage measurements exist
        vars_to_use = ['LE', 'H', 'Rn', 'G']
        for el in ['SH', 'SLE', 'SG']:
            if el in df.columns:
                vars_to_use.append(el)
        df = df[vars_to_use].astype(float).copy()

        # make copy of original data for later
        orig_df = df[['LE', 'H', 'Rn', 'G']].astype(float).copy()
        orig_df['ebr'] = \
            (orig_df.H + orig_df.LE) / (orig_df.Rn - orig_df.G)

        # compute IQR to filter out extreme ebrs
        df['ebr'] = (df.H + df.LE) / (df.Rn - df.G)
        Q1 = df['ebr'].quantile(0.25)
        Q3 = df['ebr'].quantile(0.75)
        IQR = Q3 - Q1
        # keep values between Q1-1.5IQR and Q3+1.5IQR
        filtered = df.query(
            '(@Q1 - 1.5 * @IQR) <= ebr <= (@Q3 + 1.5 * @IQR)')
        # apply filter
        filtered_mask = filtered.index
        removed_mask = set(df.index) - set(filtered_mask)
        removed_mask = pd.to_datetime(list(removed_mask))
        df.loc[removed_mask] = np.nan

        # FLUXNET methods 1 and 2 for filtering/smoothing ebr
        ebr = df.ebr.values
        df['ebr_corr'] = np.nan
        for i in range(len(ebr)):
            win_arr1 = ebr[i - half_win_1:i + half_win_1 + 1]
            win_arr2 = ebr[i - half_win_2:i + half_win_2 + 1]
            count = np.count_nonzero(~np.isnan(win_arr1))
            # get median of daily window1 if half window2 or more days exist
            if count >= half_win_2:
                val = np.nanpercentile(win_arr1, 50, axis=None)
                if abs(1 / val) >= 2 or abs(1 / val) <= 0.5:
                    val = np.nan
            # if at least one day exists in window2 take its median
            elif np.count_nonzero(~np.isnan(win_arr2)) > 0:
                val = np.nanmedian(win_arr2)
                if abs(1 / val) >= 2 or abs(1 / val) <= 0.5:
                    val = np.nan
            else:
                # assign nan for now, update with 5 day climatology
                val = np.nan
            # assign values if they were found in methods 1 or 2
            df.iloc[i, df.columns.get_loc('ebr_corr')] = val

        # make 5 day climatology of ebr for method 3;
        # the centered window skips the first and last 5 DOYs,
        # so prepend and append the first and last 5 days and loop
        doy_ebr_mean = \
            df['ebr_corr'].groupby(df.index.dayofyear).mean().copy()
        l5days = pd.Series(
            index=np.arange(-4, 1), data=doy_ebr_mean.iloc[-5:].values)
        f5days = pd.Series(
            index=np.arange(367, 372), data=doy_ebr_mean.iloc[:5].values)
        doy_ebr_mean = pd.concat([doy_ebr_mean, f5days])
        doy_ebr_mean = pd.concat([l5days, doy_ebr_mean])

        ebr_5day_clim = pd.DataFrame(
            index=np.arange(1, 367), columns=['ebr_5day_clim'])

        doy_ebr_mean = doy_ebr_mean.values
        for i in range(len(doy_ebr_mean)):
            # i = 0 starts at the prepended 5 days, shift window up
            win = doy_ebr_mean[i:i + 2 * half_win_2 + 1]
            count = np.count_nonzero(~np.isnan(win))
            # get 11 day moving window mean
            if i in ebr_5day_clim.index and count > 0:
                ebr_5day_clim.iloc[
                    i - 1, ebr_5day_clim.columns.get_loc('ebr_5day_clim')
                ] = np.nanmean(win)

        ebr_5day_clim['DOY'] = ebr_5day_clim.index
        ebr_5day_clim.index.name = 'date'

        # fill gaps of 11 or more days in filtered EBR with 5 day clim
        df['DOY'] = df.index.dayofyear
        # datetime indices of all remaining null elements
        null_dates = df.loc[df.ebr_corr.isnull(), 'ebr_corr'].index
        merged = pd.merge(
            df, ebr_5day_clim, left_on='DOY', right_index=True
        )
        # assign 5 day climatology of EBR
        merged.loc[null_dates, 'ebr_corr'] = \
            merged.loc[null_dates, 'ebr_5day_clim']
        # replace raw variables with unfiltered dataframe copy
        merged.LE = orig_df.LE
        merged.H = orig_df.H
        merged.Rn = orig_df.Rn
        merged.G = orig_df.G
        merged.ebr = orig_df.ebr
        # calculate corrected EBR to assign to ebr_corr (not EBC_CF),
        # save CFs as defined by the FLUXNET method, i.e. inverse of EBR
        merged['ebc_cf'] = 1 / merged.ebr_corr
        # filter out CFs that are >= 2 or <= 0.5 (absolute)
        merged.loc[
            (abs(merged.ebc_cf) >= 2) | (abs(merged.ebc_cf) <= 0.5), 'ebc_cf'
        ] = np.nan
        # apply corrections to LE and H, multiply by 1/EBR
        merged['LE_corr'] = merged.LE * merged.ebc_cf
        merged['H_corr'] = merged.H * merged.ebc_cf
        # filter out any corrected LE that is <= -100 or >= 850 w/m2,
        # also removing corrected H, EBR, and EBC_CF
        merged.loc[
            (merged.LE_corr >= 850) | (merged.LE_corr <= -100), (
                'LE_corr', 'H_corr', 'ebr_corr', 'ebc_cf'
            )
        ] = np.nan
        # compute corrected total turbulent flux
        merged['flux_corr'] = merged['LE_corr'] + merged['H_corr']

        df = self._df.rename(columns=self.inv_map)
        # other variables needed for plots using raw data
        df['flux'] = merged.LE + merged.H
        df['energy'] = merged.Rn - merged.G

        # corrected turbulent flux if given from input data
        if set(['LE_user_corr', 'H_user_corr']).issubset(df.columns):
            df['flux_user_corr'] = df.LE_user_corr + df.H_user_corr
            df['ebr_user_corr'] = \
                (df.H_user_corr + df.LE_user_corr) / (df.Rn - df.G)
            df.ebr_user_corr = \
                df.ebr_user_corr.replace([np.inf, -np.inf], np.nan)
            self.variables.update(
                flux_user_corr='flux_user_corr',
                ebr_user_corr='ebr_user_corr'
            )

        # grab select columns to merge into main dataframe
        cols = list(set(merged.columns).difference(df.columns))
        # join calculated data in
        merged = df.join(merged[cols], how='outer')
        merged.drop('DOY', axis=1, inplace=True)

        self.variables.update(
            energy='energy',
            flux='flux',
            LE_corr='LE_corr',
            H_corr='H_corr',
            flux_corr='flux_corr',
            ebr='ebr',
            ebr_corr='ebr_corr',
            ebc_cf='ebc_cf',
            ebr_5day_clim='ebr_5day_clim'
        )
        # revert column names to user's
        self._df = merged.rename(columns=self.variables)
        # update flag for other methods
        self.corrected = True
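    # The IQR filter used above, as a standalone sketch (illustrative; "s"
    # is a hypothetical pandas Series such as a daily EBR time series):
    #
    #   def iqr_filter(s):
    #       q1, q3 = s.quantile(0.25), s.quantile(0.75)
    #       iqr = q3 - q1
    #       # null out values beyond 1.5 * IQR from the quartiles
    #       return s.where((s >= q1 - 1.5 * iqr) & (s <= q3 + 1.5 * iqr))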
    def _bowen_ratio_correction(self):
        """
        Create corrected/adjusted latent energy and sensible heat flux to
        close the surface energy balance.

        Compute adjusted turbulent fluxes when :math:`Rn > 0` and the Bowen
        ratio is < 0.05, taking the average of days i-1 and i+1, instead of
        forcing closure, since the Bowen ratio is often < -0.8 or beyond
        this threshold when :math:`Rn < 0`. If closure is forced by
        partitioning the error equally between LE and H, LE is drastically
        increased during periods of possibly "bad" data, usually while the
        measured LE is going down.

        Updates :attr:`QaQc.df` and :attr:`QaQc.variables` attributes with
        new variables used for closing energy balance.

        Arguments:
            None

        Returns:
            :obj:`None`
        """
        # drop relevant calculated variables if they exist
        self._df = _drop_cols(self.df, self._eb_calc_vars)
        df = self._df.rename(columns=self.inv_map)
        # apply correction
        df['br'] = df.H / df.LE
        df['LE_corr'] = (df.Rn - df.G) / (1 + df.br)
        df['H_corr'] = df.LE_corr * df.br
        df['flux_corr'] = df.LE_corr + df.H_corr
        # add EBR and other vars to dataframe using raw and corrected LE, H;
        # if user corrected fluxes were provided add their energy and flux
        if set(['LE_user_corr','H_user_corr']).issubset(df.columns):
            df['flux_user_corr'] = df.LE_user_corr + df.H_user_corr
            df['br_user_corr'] = df.H_user_corr / df.LE_user_corr
            df['ebr_user_corr'] = \
                (df.H_user_corr + df.LE_user_corr) / (df.Rn - df.G)
            df.ebr_user_corr = df.ebr_user_corr.replace(
                [np.inf, -np.inf], np.nan)
            self.variables.update(
                flux_user_corr = 'flux_user_corr',
                ebr_user_corr = 'ebr_user_corr',
                br_user_corr = 'br_user_corr'
            )

        df['ebr'] = (df.H + df.LE) / (df.Rn - df.G)
        df['ebr_corr'] = (df.H_corr + df.LE_corr) / (df.Rn - df.G)
        df['energy'] = df.Rn - df.G
        df['flux'] = df.LE + df.H

        self.variables.update(
            br = 'br',
            energy = 'energy',
            flux = 'flux',
            LE_corr = 'LE_corr',
            H_corr = 'H_corr',
            flux_corr = 'flux_corr',
            ebr = 'ebr',
            ebr_corr = 'ebr_corr'
        )
        # replace undefined/infinity with nans in all EBR columns
        df.ebr = df.ebr.replace([np.inf, -np.inf], np.nan)
        df.ebr_corr = df.ebr_corr.replace([np.inf, -np.inf], np.nan)
        # revert column names to user's
        self._df = df.rename(columns=self.variables)
        # update flag for other methods
        self.corrected = True
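    # Illustrative sketch only: this hypothetical method is NOT part of the
    # package API. It shows why the Bowen ratio approach above closes the
    # energy balance exactly: with B = H/LE,
    # LE_corr + H_corr = (Rn - G)/(1 + B) + (Rn - G)*B/(1 + B) = Rn - G.
    # The numbers are made-up daily means in W/m2.
    def _bowen_ratio_sketch(self):
        Rn, G, LE, H = 150.0, 15.0, 80.0, 40.0
        br = H / LE                      # Bowen ratio, 0.5
        LE_corr = (Rn - G) / (1 + br)    # 90.0 W/m2
        H_corr = LE_corr * br            # 45.0 W/m2
        # closure is exact by construction
        assert abs((LE_corr + H_corr) - (Rn - G)) < 1e-9
        return LE_corr, H_corr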
[docs]
    def plot(self, ncols=1, output_type='save', out_file=None, suptitle='',
            plot_width=1000, plot_height=450, sizing_mode='fixed',
            merge_tools=False, link_x=True, **kwargs):
        """
        Creates a series of interactive diagnostic line and scatter plots of
        input and computed daily and monthly aggregated data.

        The main interactive features of the plots include: pan, selection
        and scroll zoom, a hover tool that shows paired variable values
        including date, and linked x-axes that pan/zoom together for daily
        and monthly time series plots.

        It is possible to change the format of the output plots including
        adjusting the dimensions of subplots, defining the number of columns
        of subplots, setting a super title that accepts HTML, and other
        options. If variables are not present for plots they will not be
        created and a warning message will be printed. There are two options
        for output: open a temporary file for viewing or save a copy to
        :attr:`QaQc.out_dir`.

        A list of all potential time series plots created:

        * energy balance components
        * radiation components
        * incoming shortwave radiation with ASCE potential clear sky
          (daily only)
        * multiple soil heat flux measurements
        * air temperature
        * vapor pressure and vapor pressure deficit
        * wind speed
        * station precipitation and gridMET precipitation
        * initial and corrected latent energy
        * initial, corrected, gap filled, and reference evapotranspiration
        * crop coefficient and smoothed and interpolated crop coefficient
        * initial and corrected energy balance ratio
        * multiple soil moisture measurements

        A list of all potential scatter plots created:

        * radiative versus turbulent fluxes, initial and corrected
        * initial versus corrected latent energy
        * initial versus corrected evapotranspiration

        Keyword Arguments:
            ncols (int): default 1. Number of columns of subplots.
            output_type (str): default "save". How to output plots, "save",
                "show" in browser, "notebook" for Jupyter Notebooks,
                "return_figs" to return a list of Bokeh
                :obj:`bokeh.plotting.figure.Figure` objects, or
                "return_grid" to return the :obj:`bokeh.layouts.gridplot`.
            out_file (str or None): default :obj:`None`. Path to save output
                file, if :obj:`None` save output to :attr:`QaQc.out_dir`
                with the name [site_id]_plots.html where [site_id] is
                :attr:`QaQc.site_id`.
            suptitle (str or None): default :obj:`None`. Super title to go
                above plots, accepts HTML/CSS syntax.
            plot_width (int): default 1000. Width of subplots in pixels.
            plot_height (int): default 450. Height of subplots in pixels,
                note for subplots the height will be forced to be the same
                as ``plot_width``.
            sizing_mode (str): default "fixed". Bokeh option to scale
                dimensions of :obj:`bokeh.layouts.gridplot`.
            merge_tools (bool): default False. Merges all subplots toolbars
                into a single location if True.
            link_x (bool): default True. If True link x axes of daily time
                series plots and monthly time series plots so that when
                zooming or panning on one plot they all zoom accordingly,
                and the axes will also be of the same length.

        Example:
            Starting from a correctly formatted config.ini and climate time
            series file, this example shows how to read in the data and
            produce the default series of plots for viewing, with the
            addition of text at the top of the plot that states the site's
            location and ID.

            >>> from fluxdataqaqc import Data, QaQc
            >>> d = Data('path/to/config.ini')
            >>> q = QaQc(d)
            >>> q.correct_data()
            >>> # create plot title from site ID and location in N. America
            >>> title = "<b>Site:</b> {}; <b>Lat:</b> {}N; <b>Long:</b> {}W".format(
            >>>     q.site_id, q.latitude, q.longitude
            >>> )
            >>> q.plot(
            >>>     ncols=2, output_type='show', plot_width=500, suptitle=title
            >>> )

            Note, we specified the width of plots to be smaller than the
            default because we want both columns of subplots to be viewable
            on one page.

        Tip:
            To reset all subplots at once, refresh the page with your web
            browser.

        Note:
            Additional keyword arguments that are recognized by
            :obj:`bokeh.layouts.gridplot` are also accepted by
            :meth:`QaQc.plot`.

        """
        # handle file save for accessing from instance variable
        if out_file is None and output_type == 'save':
            out_file = Path(self.out_dir)/'{}_plots.html'.format(self.site_id)
            out_dir = out_file.parent
            if not out_dir.is_dir():
                out_dir.mkdir(parents=True, exist_ok=True)
        # if out_file is to a non-existent directory create parents
        elif out_file is not None and output_type == 'save':
            out_dir = Path(out_file).parent
            if not out_dir.is_dir():
                out_dir.mkdir(parents=True, exist_ok=True)

        # create aggregated plot structure from fluxdataqaqc.Plot._plot()
        ret = self._plot(
            self, ncols=ncols, output_type=output_type, out_file=out_file,
            suptitle=suptitle, plot_width=plot_width,
            plot_height=plot_height, sizing_mode=sizing_mode,
            merge_tools=merge_tools, link_x=link_x, **kwargs
        )
        self.plot_file = out_file
        if ret:
            return ret
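    # Usage sketch (illustrative, not part of the module): with
    # ``output_type='return_figs'`` the returned Bokeh figures can be
    # restyled with the standard Bokeh API before rendering; ``q`` is
    # assumed to be a corrected :obj:`QaQc` instance as in the docstring
    # example above.
    #
    # >>> figs = q.plot(output_type='return_figs')
    # >>> from bokeh.io import show
    # >>> from bokeh.layouts import gridplot
    # >>> for f in figs:
    # ...     f.title.text_font_size = '12pt'
    # >>> show(gridplot(figs, ncols=2))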
def _drop_cols(df, cols):
    """Drop columns from a dataframe if they exist."""
    for c in cols:
        if c in df.columns:
            df.drop(c, axis=1, inplace=True)

    return df
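# Example of the helper above (illustrative): columns are dropped in place
# only if present, missing names are silently skipped.
# >>> tmp = pd.DataFrame({'LE': [100.0], 'LE_corr': [120.0]})
# >>> _drop_cols(tmp, ['LE_corr', 'ebr_corr']).columns.tolist()
# ['LE']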