Source code for fluxdataqaqc.data

# -*- coding: utf-8 -*-
"""
Read and manage time series and metadata within ``flux-data-qaqc`` module
"""

import configparser as cp
import numpy as np
import pandas as pd
import refet
from datetime import datetime
from pathlib import Path
from openpyxl import load_workbook
from .plot import Plot
from .util import Convert

class Data(Plot, Convert):
    """
    An object for interfacing ``flux-data-qaqc`` with input metadata (config)
    and time series input; it provides methods and attributes for parsing,
    temporal analysis, visualization, and filtering of data.

    A :obj:`Data` object is initialized from a config file (see
    :ref:`Setting up a config file`) with metadata for an eddy covariance
    tower or other dataset containing time series meteorological data. It
    serves as a starting point in the Python API of the energy balance
    closure analysis and data validation routines provided by
    ``flux-data-qaqc``. Manual pre-filtering of data based on user-defined
    quality is aided by the :meth:`Data.apply_qc_flags` method. Weighted or
    non-weighted means of variables with multiple sensors/recordings are
    calculated upon initialization if these options are declared in the
    config file. The ``Data`` class also includes the :attr:`Data.df`
    property which returns the time series data as a :obj:`pandas.DataFrame`
    for custom workflows. ``Data`` inherits line and scatter plot methods
    from :obj:`.Plot` which allow for the creation of interactive
    visualizations of input time series data.

    Attributes:
        climate_file (pathlib.Path): Absolute path to climate input file.
        config (:obj:`configparser.ConfigParser`): Config parser instance
            created from the data within the config.ini file.
        config_file (:obj:`pathlib.Path`): Absolute path to config.ini file
            used for initialization of the :obj:`Data` instance.
        header (:obj:`numpy.ndarray` or :obj:`pandas.DataFrame.index`):
            Header as found in the input climate file.
        elevation (float): Site elevation in meters as set in config.ini.
        inv_map (dict): Dictionary with input climate file names as keys and
            internal names as values. May only include pairs when they differ.
        latitude (float): Site latitude in decimal degrees, set in config.
        longitude (float): Site longitude in decimal degrees, set in config.
        out_dir (pathlib.Path): Default directory to save output of
            :meth:`QaQc.write` or :meth:`QaQc.plot` methods.
        plot_file (pathlib.Path or None): Path to plot file once it is
            created/saved by :meth:`Data.plot`.
        site_id (str): Site ID as found in the site_id config.ini entry.
        soil_var_weight_pairs (dict): Dictionary with names and weights for
            weighted averaging of soil heat flux or soil moisture variables.
        qc_var_pairs (dict): Dictionary with variable names as keys and QC
            value columns (numeric or characters) as values.
        units (dict): Dictionary with internal variable names as keys and
            units as found in config as values.
        variables (dict): Dictionary with internal names for variables as
            keys and names as found in the input data as values.
        variable_names_dict (dict): Dictionary with internal variable names
            as keys and keys in the config.ini file as values.
        xl_parser (str or None): Engine for reading Excel files with pandas.
            If :obj:`None` use 'openpyxl'.
    """

    # maps internal names to config names for variables
    # updated with user's column names in instance attribute "variables"
    variable_names_dict = {
        'date' : 'datestring_col',
        'Rn' : 'net_radiation_col',
        'G' : 'ground_flux_col',
        'LE' : 'latent_heat_flux_col',
        'LE_user_corr' : 'latent_heat_flux_corrected_col',
        'H' : 'sensible_heat_flux_col',
        'H_user_corr' : 'sensible_heat_flux_corrected_col',
        'sw_in' : 'shortwave_in_col',
        'sw_out' : 'shortwave_out_col',
        'sw_pot' : 'shortwave_pot_col',
        'lw_in' : 'longwave_in_col',
        'lw_out' : 'longwave_out_col',
        'rh' : 'rel_humidity_col',
        'vp' : 'vap_press_col',
        'vpd' : 'vap_press_def_col',
        't_avg' : 'avg_temp_col',
        'ppt' : 'precip_col',
        'wd' : 'wind_dir_col',
        'ws' : 'wind_spd_col'
    }

    def __init__(self, config):
        self.config_file = Path(config).absolute()
        self.config = self._load_config(self.config_file)
        self.variables = self._get_config_vars()
        self.units = self._get_config_units()
        self.na_val = self.config.get(
            'METADATA', 'missing_data_value', fallback=None
        )
        # try to parse na_val as numeric
        try:
            self.na_val = float(self.na_val)
        except (TypeError, ValueError):
            pass
        self.elevation = float(self.config.get('METADATA','station_elevation'))
        self.latitude = float(self.config.get('METADATA','station_latitude'))
        self.longitude = float(self.config.get('METADATA','station_longitude'))
        self.climate_file = self._get_climate_file()
        self.xl_parser = 'openpyxl'
        self.header = self._get_header(self.climate_file)
        self.soil_var_weight_pairs = self._get_soil_var_avg_weights()
        self.qc_var_pairs = self._get_qc_flags()
        self.site_id = self.config.get('METADATA', 'site_id')
        # get optionally defined QC thresholds (numeric) or flags (string)
        # for filtering bad data
        if 'qc_threshold' in dict(self.config.items('METADATA')):
            self.qc_threshold = float(
                self.config.get('METADATA', 'qc_threshold')
            )
        else:
            self.qc_threshold = None
        # optional comma separated string of QC flags
        if 'qc_flag' in dict(self.config.items('METADATA')):
            self.qc_flag = [
                el.strip() for el in self.config.get(
                    'METADATA', 'qc_flag'
                ).split(',')
            ]
        else:
            self.qc_flag = None
        # output dir will be in directory of config file
        self.out_dir = self.config_file.parent / 'output'
        self._df = None
        self.plot_file = None
    def hourly_ASCE_refET(self, reference='short', anemometer_height=None):
        """
        Calculate hourly ASCE standardized short (ETo) or tall (ETr)
        reference ET from input data and wind measurement height.

        If the input data's temporal frequency is finer than hourly, the
        input will be resampled to hourly and the output reference ET time
        series will be returned as a datetime-indexed :obj:`pandas.Series`
        object; if the input data is already hourly the resulting time
        series will automatically be merged into the :attr:`.Data.df`
        dataframe, named "ASCE_ETo" or "ASCE_ETr" respectively.

        Keyword Arguments:
            reference (str): default "short", calculate short or tall ASCE
                reference ET.
            anemometer_height (float or None): wind measurement height in
                meters, default :obj:`None`. If :obj:`None` then look for
                the "anemometer_height" entry in the **METADATA** section of
                the config.ini; if not found there, print a warning and use
                2 meters.

        Returns:
            :obj:`None` or :obj:`pandas.Series`

        Hint:
            The input variables needed to run this method are: vapor
            pressure, wind speed, incoming shortwave radiation, and average
            air temperature. If vapor pressure deficit and average air
            temperature exist, the actual vapor pressure will automatically
            be calculated.
        """
        # accessing the df property triggers the vp/vpd calculation
        self.df.head()
        df = self.df.rename(columns=self.inv_map)
        req_vars = ['vp', 'ws', 'sw_in', 't_avg']
        if not set(req_vars).issubset(df.columns):
            print('Missing one or more required variables, cannot compute')
            return
        second_day = df.index.date[2]
        n_samples_per_day = len(df.loc[str(second_day)].index)
        if n_samples_per_day < 24:
            print('Temporal frequency coarser than hourly, cannot compute.')
            return
        if anemometer_height is None:
            anemometer_height = self.config.get(
                'METADATA', 'anemometer_height', fallback=None
            )
            if anemometer_height is None:
                print(
                    'WARNING: anemometer height was not given and not found '
                    "in the config file's metadata, proceeding with a "
                    'height of 2 m'
                )
                anemometer_height = 2
        # config values are read as strings, ensure the height is numeric
        anemometer_height = float(anemometer_height)

        for v, u in self.units.items():
            # only convert variables needed for reference ET
            if not v in req_vars:
                continue
            if not v in Convert.required_units.keys():
                # variable is not required to have any particular unit, skip
                continue
            elif not u in Convert.allowable_units[v]:
                print('ERROR: {} units are not recognizable for var: {}\n'
                    'allowable input units are: {}\nNot converting.'.format(
                        u, v, ','.join(Convert.allowable_units[v])
                    )
                )
            elif not u == Convert.required_units[v]:
                # do conversion, update units
                # pass variable, initial unit, unit to convert to, and df
                df = Convert.convert(v, u, Convert.required_units[v], df)
                self.units[v] = Convert.required_units[v]

        # RefET will convert shortwave radiation to MJ-m2-hr
        input_units = {
            'rs': 'w/m2'
        }

        if n_samples_per_day == 24:
            length = len(df.t_avg)
            tmean = df.t_avg
            rs = df.sw_in
            ea = df.vp
            uz = df.ws
            zw = anemometer_height
            lat = np.full(length, self.latitude)
            lon = np.full(length, self.longitude)
            doy = df.index.dayofyear
            elev = np.full(length, self.elevation)
            time = df.index.hour
        elif n_samples_per_day > 24:
            print(
                'Resampling ASCE reference ET input variables to hourly means'
            )
            tmean = df.t_avg.resample('H').mean()
            length = len(tmean)
            rs = df.sw_in.resample('H').mean()
            ea = df.vp.resample('H').mean()
            uz = df.ws.resample('H').mean()
            zw = anemometer_height
            lat = np.full(length, self.latitude)
            lon = np.full(length, self.longitude)
            doy = tmean.index.dayofyear
            elev = np.full(length, self.elevation)
            time = tmean.index.hour

        REF = refet.Hourly(
            tmean, ea, rs, uz, zw, elev, lat, lon, doy, time,
            method='asce', input_units=input_units,
        )
        if reference == 'short':
            ret = REF.eto()
            name = 'ASCE_ETo'
        elif reference == 'tall':
            ret = REF.etr()
            name = 'ASCE_ETr'

        if n_samples_per_day == 24:
            # can add directly into Data.df
            df[name] = ret
            self._df = df.rename(columns=self.variables)
            self.variables[name] = name
            self.units[name] = 'mm'
        else:
            print(
                'WARNING: cannot merge {} into the dataframe because the '
                'input temporal frequency is finer than hourly, returning '
                'it as an hourly, datetime-indexed Pandas Series'.format(name)
            )
            return pd.Series(index=tmean.index, data=ret, name=name)
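    # A usage sketch, assuming half-hourly input data and a hypothetical
    # 3 m anemometer height; the input is resampled to hourly and an hourly
    # Series is returned, while for already-hourly input the result would
    # instead be merged into Data.df as "ASCE_ETo":
    #
    #   >>> eto = d.hourly_ASCE_refET(reference='short', anemometer_height=3)
    #   >>> eto.name
    #   'ASCE_ETo'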
    def _calc_rn(self, df):
        """
        If short and longwave radiation inputs exist but Rn is not given,
        calculate it.
        """
        df = df.rename(columns=self.inv_map)
        # if Rn exists skip
        if 'Rn' in df.columns and not df.Rn.isna().all():
            return
        rad_vars = ['sw_in', 'sw_out', 'lw_in', 'lw_out']
        has_rad_vars = set(rad_vars).issubset(df.columns)
        for v in rad_vars:
            u = self.units.get(v)
            if u:
                self.units[v] = u = u.lower()
            if u and not u in Data.allowable_units[v]:
                print('ERROR: {} units are not recognizable for var: {}\n'
                    'allowable input units are: {}\nNot converting.'.format(
                        u, v, ','.join(Data.allowable_units[v])
                    )
                )
            elif u and not u == Data.required_units[v]:
                # do conversion, update units
                df = Convert.convert(v, u, Data.required_units[v], df)
                self.units[v] = Data.required_units[v]

        units_correct = (
            self.units.get('sw_in') == 'w/m2' and
            self.units.get('sw_out') == 'w/m2' and
            self.units.get('lw_in') == 'w/m2' and
            self.units.get('lw_out') == 'w/m2'
        )
        if has_rad_vars and units_correct:
            print('Calculating net radiation from components.')
            df['Rn'] = df.sw_in + df.lw_in - df.sw_out - df.lw_out
            self.variables['Rn'] = 'Rn'
            self.units['Rn'] = 'w/m2'

        self._df = df

    def _calc_vpd_or_vp(self, df):
        """
        Based on ASCE standardized reference ET eqn. 37; air temperature
        must be in Celsius and actual vapor pressure in kPa. Can also
        calculate VP from VPD and air temperature.
        """
        df = df.rename(columns=self.inv_map)
        # make sure day intervals are hourly or finer, if not skip
        second_day = df.index.date[2]
        third_day = second_day + pd.Timedelta(1, unit='D')
        # both days start at 00:00:00, don't duplicate
        times_in_day = len(df.loc[str(third_day)].index)
        if times_in_day < 24:
            print(
                'Temporal frequency of data is coarser than hourly, cannot '
                'calculate VP/VPD'
            )
            return
        for v in ['vp', 'vpd', 't_avg']:
            u = self.units.get(v)
            if u:
                self.units[v] = u = u.lower()
            if u and not u in Data.allowable_units[v]:
                print('ERROR: {} units are not recognizable for var: {}\n'
                    'allowable input units are: {}\nNot converting.'.format(
                        u, v, ','.join(Data.allowable_units[v])
                    )
                )
            elif u and not u == Data.required_units[v]:
                # do conversion, update units
                # pass variable, initial unit, unit to convert to, and df
                df = Convert.convert(v, u, Data.required_units[v], df)
                self.units[v] = Data.required_units[v]

        # calculate vpd from actual vapor pressure and temp
        # check if needed variables exist and units are correct
        has_vpd_vars = set(['vp', 't_avg']).issubset(df.columns)
        units_correct = (
            self.units.get('vp') == 'kpa' and self.units.get('t_avg') == 'c'
        )
        if has_vpd_vars and units_correct:
            print(
                'Calculating vapor pressure deficit from vapor pressure and '
                'air temperature'
            )
            # saturation vapor pressure (es)
            es = 0.6108 * np.exp(17.27 * df.t_avg / (df.t_avg + 237.3))
            df['vpd'] = es - df.vp
            df['es'] = es
            self.variables['vpd'] = 'vpd'
            self.units['vpd'] = 'kpa'
            self.variables['es'] = 'es'
            self.units['es'] = 'kpa'

        # likewise, calculate actual vapor pressure from VPD and temp
        has_vp_vars = set(['vpd', 't_avg']).issubset(df.columns)
        units_correct = (
            self.units.get('vpd') == 'kpa' and self.units.get('t_avg') == 'c'
        )
        if has_vp_vars and units_correct:
            print(
                'Calculating vapor pressure from vapor pressure deficit and '
                'air temperature'
            )
            # saturation vapor pressure (es)
            es = 0.6108 * np.exp(17.27 * df.t_avg / (df.t_avg + 237.3))
            df['vp'] = es - df.vpd
            df['es'] = es
            self.variables['vp'] = 'vp'
            self.units['vp'] = 'kpa'
            self.variables['es'] = 'es'
            self.units['es'] = 'kpa'

        if (not 'rh' in self.variables
                and {'vp', 'es'}.issubset(self.variables)
                and self.units.get('vp') == 'kpa'):
            print(
                'Calculating relative humidity from actual and saturation '
                'vapor pressure and air temperature'
            )
            df['rh'] = 100 * (df.vp / df.es)
            self.variables['rh'] = 'rh'
            self.units['rh'] = '%'

        if 'vp' in self.variables and self.units.get('vp') == 'kpa':
            print(
                'Calculating dew point temperature from vapor pressure'
            )
            df['t_dew'] = (
                -1 / ((np.log(df.vp / .611) / 5423) - (1 / 273))
            ) - 273.15
            self.variables['t_dew'] = 't_dew'
            self.units['t_dew'] = 'c'

        self._df = df
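    # A worked example of the humidity math above (values rounded, not from
    # the source): at t_avg = 25 C the saturation vapor pressure is
    # es = 0.6108 * exp(17.27 * 25 / (25 + 237.3)) ~= 3.17 kPa, so with a
    # measured vp of 2.0 kPa: vpd ~= 1.17 kPa, rh ~= 63 %, and the dew point
    # inversion above gives t_dew ~= 17.2 C.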
    def plot(self, ncols=1, output_type='save', out_file=None, suptitle='',
            plot_width=1000, plot_height=450, sizing_mode='fixed',
            merge_tools=False, link_x=True, **kwargs):
        """
        Creates a series of interactive diagnostic line and scatter plots of
        input data in whichever temporal frequency it is in.

        The main interactive features of the plots include: pan, selection
        and scroll zoom, a hover tool that shows paired variable values
        including date, and linked x-axes that pan/zoom together for daily
        and monthly time series plots.

        It is possible to change the format of the output plots including
        adjusting the dimensions of subplots, defining the number of columns
        of subplots, setting a super title that accepts HTML, and other
        options. If variables are not present for a plot it will not be
        created and a warning message will be printed. There are two options
        for output: open a temporary file for viewing or save a copy to
        :attr:`QaQc.out_dir`.

        A list of all potential time series plots created:

        * energy balance components
        * radiation components
        * multiple soil heat flux measurements
        * air temperature
        * vapor pressure and vapor pressure deficit
        * wind speed
        * precipitation
        * latent energy
        * multiple soil moisture measurements

        Keyword Arguments:
            ncols (int): default 1. Number of columns of subplots.
            output_type (str): default "save". How to output plots: "save",
                "show" in browser, "notebook" for Jupyter notebooks,
                "return_figs" to return a list of Bokeh
                :obj:`bokeh.plotting.figure.Figure` objects, or
                "return_grid" to return the :obj:`bokeh.layouts.gridplot`.
            out_file (str or None): default :obj:`None`. Path to save the
                output file; if :obj:`None` save output to
                :attr:`Data.out_dir` with the name
                [site_id]_input_plots.html where [site_id] is
                :attr:`Data.site_id`.
            suptitle (str): default "". Super title to go above plots,
                accepts HTML/CSS syntax.
            plot_width (int): default 1000. Width of subplots in pixels.
            plot_height (int): default 450. Height of subplots in pixels;
                note for scatter subplots the height will be forced to the
                same value as ``plot_width``.
            sizing_mode (str): default "fixed". Bokeh option to scale
                dimensions of :obj:`bokeh.layouts.gridplot`.
            merge_tools (bool): default False. If True, merge all subplot
                toolbars into a single location.
            link_x (bool): default True. If True, link the x-axes of daily
                time series plots and monthly time series plots so that
                zooming or panning on one plot adjusts them all; the axes
                will also be of the same length.

        Example:
            Starting from a correctly formatted config.ini and climate time
            series file, this example shows how to read in the data and
            produce a series of plots of input data as it is found in the
            input data file (unlike :meth:`.QaQc.plot` which produces plots
            at daily and monthly temporal frequency). This example also
            shows how to display a title at the top of the plot with the
            site's location and site ID.

            >>> from fluxdataqaqc import Data
            >>> d = Data('path/to/config.ini')
            >>> # create plot title from site ID and location in N. America
            >>> title = "<b>Site:</b> {}; <b>Lat:</b> {}N; <b>Long:</b> {}W".format(
            >>>     d.site_id, d.latitude, d.longitude
            >>> )
            >>> d.plot(
            >>>     ncols=2, output_type='show', plot_width=500, suptitle=title
            >>> )

            Note, we specified the width of plots to be smaller than the
            default because we want both columns of subplots to be viewable
            on the screen.

        Tip:
            To reset all subplots at once, refresh the page with your web
            browser.

        Note:
            Additional keyword arguments that are recognized by
            :obj:`bokeh.layouts.gridplot` are also accepted by
            :meth:`Data.plot`.

        See Also:
            :meth:`.QaQc.plot`
        """
        # handle file save for accessing from instance variable
        if out_file is None and output_type == 'save':
            out_file = Path(self.out_dir) / '{}_input_plots.html'.format(
                self.site_id
            )
            out_dir = out_file.parent
            # to allow making any subdir that does not yet exist
            if not out_dir.is_dir():
                out_dir.mkdir(parents=True, exist_ok=True)
        # if out_file is to a non-existent directory create parents
        elif out_file is not None and output_type == 'save':
            out_dir = Path(out_file).parent
            if not out_dir.is_dir():
                out_dir.mkdir(parents=True, exist_ok=True)

        # create aggregated plot structure from fluxdataqaqc.Plot._plot()
        ret = self._plot(
            self, ncols=ncols, output_type=output_type, out_file=out_file,
            suptitle=suptitle, plot_width=plot_width,
            plot_height=plot_height, sizing_mode=sizing_mode,
            merge_tools=merge_tools, link_x=link_x, **kwargs
        )
        self.plot_file = out_file
        if ret:
            return ret
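    # A sketch of embedding the returned grid in a custom Bokeh workflow,
    # assuming "d" is a Data instance; ``bokeh.io.show`` is standard Bokeh:
    #
    #   >>> from bokeh.io import show
    #   >>> grid = d.plot(output_type='return_grid', merge_tools=True)
    #   >>> show(grid)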
    def _load_config(self, config_file):
        if not config_file.is_file():
            raise FileNotFoundError('ERROR: config file not found')
        config = cp.ConfigParser(interpolation=None)
        config.read(config_file)
        return config

    def _get_climate_file(self):
        """
        Read config file and return absolute path to climate time series
        file.
        """
        file_path = self.config['METADATA']['climate_file_path']
        climate_file = self.config_file.parent.joinpath(file_path)
        if not climate_file.is_file():
            err_msg = 'ERROR: climate file:{} does not exist'.format(
                climate_file)
            raise FileNotFoundError(err_msg)
        return climate_file

    def _get_header(self, climate_file):
        """
        Read only the top line of the climate time series file and return
        the header names.
        """
        if climate_file.suffix in ('.xlsx', '.xls'):
            try:
                # using xlrd
                workbook = pd.ExcelFile(climate_file)
                rows = workbook.book.sheet_by_index(0).nrows
                header = pd.read_excel(workbook, skipfooter=(rows - 1))
                header = header.columns
            except Exception:
                # fallback to openpyxl, slower; note this reads the header
                # from the worksheet's shared strings (a private attribute)
                wb = load_workbook(climate_file, read_only=True)
                sheet = wb.worksheets[0]
                header = sheet._shared_strings
                self.xl_parser = 'openpyxl'
        else:
            # assume CSV
            skiprows = None
            if 'skiprows' in dict(self.config.items('METADATA')):
                val = self.config.get('METADATA', 'skiprows')
                if val and val.startswith('[') and val.endswith(']'):
                    skiprows = val.replace(' ', '')
                    skiprows = [
                        int(el) for el in skiprows.strip('][').split(',')
                    ]
                else:
                    skiprows = int(val)
            header = pd.read_csv(
                climate_file, skiprows=skiprows, nrows=0
            ).columns

        return header

    def _get_soil_var_avg_weights(self):
        """
        Read config and variable attribute to match user names for multiple
        soil moisture/heat flux variables with user-defined weights used for
        calculating weighted/non-weighted average values in :attr:`Data.df`.
        """
        soil_var_weight_pairs = {}
        all_keys = dict(self.config.items('DATA')).keys()
        for k, v in dict(self.config.items('DATA')).items():
            weight_name = '{}_weight'.format(k)
            var_name = self.variables.get(k)
            if k in self.variables.keys() and weight_name in all_keys:
                weight = self.config.get('DATA', weight_name)
                tmp = {'name': var_name, 'weight': weight}
                soil_var_weight_pairs[k] = tmp
            # update dict with weights if not specified in config, they are
            # normalized later in df
            elif (k.startswith('g_') or k.startswith('theta_')) and not \
                    weight_name in all_keys and not k.endswith('_units') and \
                    not k.endswith('_weight'):
                tmp = {'name': var_name, 'weight': 1}
                soil_var_weight_pairs[k] = tmp

        return soil_var_weight_pairs

    def _get_config_vars(self):
        """
        Read config DATA section and get names of all variables, pair them
        with internal variable names in a dictionary.

        Also parses the config file for optionally added multiple soil heat
        flux and soil moisture variables if given following the naming
        convention explained in ``flux-data-qaqc``.

        Arguments:
            None

        Returns:
            variables (dict): dictionary with internal climate variable
                names as keys and user's names as found in config as values.
        """
        variables = {}
        # get all variables found in Data.variable_names_dict
        for k, v in Data.variable_names_dict.items():
            if self.config.has_option('DATA', v):
                variables[k] = self.config.get('DATA', v)

        # get multiple G flux/soil moisture variables
        all_keys = dict(self.config.items('DATA')).keys()
        # should be named 'g_' or 'theta_'; what comes after is not yet strict
        added_g_keys = []
        added_theta_keys = []
        for k in all_keys:
            if k.startswith('g_') and not k.endswith('_units') \
                    and not k.endswith('_weight'):
                added_g_keys.append(k)
            if k.startswith('theta_') and not k.endswith('_units') \
                    and not k.endswith('_weight'):
                added_theta_keys.append(k)

        if added_g_keys:
            for el in added_g_keys:
                variables[el] = self.config.get('DATA', el)
        if added_theta_keys:
            for el in added_theta_keys:
                variables[el] = self.config.get('DATA', el)

        # update all None values in config with 'na' in variables dict
        for k, v in variables.items():
            if v is None:
                variables[k] = 'na'

        return variables

    def _get_config_units(self):
        """
        Get units from the config file, pair them with variable names, and
        store them in a dictionary.

        Keys are ``flux-data-qaqc`` variable names, i.e. the same names used
        by :attr:`Data.variables`, and values are strings assigned in the
        config file.

        Arguments:
            None

        Returns:
            units (dict): dictionary with ``flux-data-qaqc`` variable names
                as keys and user's units for each as values.

        Note:
            Parsing of correct units and conversion (if needed) is performed
            in the :obj:`.QaQc` class. If units are not given in the config
            file a warning message is printed and the units are not
            included; they will either need to be added manually later, e.g.
            in Python by adding to :attr:`Data.units`, or by adding them to
            the config and recreating the :obj:`Data` object, otherwise the
            units will remain unknown and cannot be converted later.
        """
        no_unit_vars = ('datestring_col', 'year_col', 'month_col', 'day_col')
        config_dict = dict(self.config.items('DATA'))
        # dictionary that maps config unit keys to units
        units_config = {
            v.replace('_col', '_units'): None
            for k, v in self.variable_names_dict.items() if (
                not v in no_unit_vars and k in self.variables
            )
        }
        # add user multiple g or soil moisture var units config names
        for k, v in self.variables.items():
            # if a multiple g entry uses the same var assigned to
            # ground_flux_col the added G var will not be included/duplicated
            if k.startswith('g_') and not self.variables[k] == 'G':
                units_config['{}_units'.format(k)] = None
            if k.startswith('theta_'):
                units_config['{}_units'.format(k)] = None

        for k in units_config:
            if k in config_dict:
                units_config[k] = config_dict[k]
            # user corrected versions of LE, H, etc. are in
            # variable_names_dict but may not be in the input config/data
            elif '_corrected' in k:
                continue
            else:
                print(
                    'WARNING: units for var {} missing from the config file'\
                    .format(k.replace('_units', ''))
                )

        inv_names = {v: k for k, v in self.variable_names_dict.items()}
        # dictionary that maps fluxdataqaqc var names to units
        units = dict()
        for k in units_config:
            k_col = k.replace('_units', '_col')
            if k_col in self.variable_names_dict.values():
                units[inv_names[k_col]] = units_config.get(k)
            else:
                # for multiple g and theta remove the _units suffix
                name = k.replace('_units', '')
                units[name] = units_config.get(k)

        return units

    def _get_qc_flags(self):
        """
        Process any existing QC flags for variables in the config; also add
        key/value pairs to the variables attribute if QC flags are not
        specified in the config but follow the naming convention
        [var_name]_QC in the header of the data file.
        """
        qc_var_pairs = {}
        tmp = {}
        no_qc_vars = ('datestring_col',)
        # dictionary that maps config QC values to keys for main variables;
        # other variables like multiple g or theta (with unknown names) are
        # searched for in the loop below
        qc_config = {
            v.replace('_col', '_qc'): k
            for k, v in self.variable_names_dict.items()
            if not v in no_qc_vars
        }
        # first look if specified in config
        for k, v in self.config.items('DATA'):
            if k in qc_config:
                # internal name for the variable (e.g. LE or Rn)
                var_name = qc_config[k]
                user_var_name = self.variables.get(var_name)
            # keys are internal names for multiple G and theta
            elif k.startswith(('g_', 'theta_')) and k.endswith('_qc'):
                var_name = k.replace('_qc', '')
                user_var_name = self.variables.get(var_name)
            # key is not for a QC flag header name
            else:
                continue
            if not v in self.header:
                print('WARNING: {} quality control name specified in the '
                    'config file for variable: {} does not exist in the '
                    'input file, it will not be used.'.format(
                        v, user_var_name
                    )
                )
                continue
            internal_name = '{}_qc_flag'.format(var_name)
            tmp[internal_name] = v
            qc_var_pairs[user_var_name] = v

        # also look in the header (currently this always runs as well):
        # find variable names that have a matching name with a '_QC' suffix;
        # if found here but given a different name in the config, use the
        # name in the config
        for k, v in self.variables.items():
            qc_var = '{}_QC'.format(v)
            if qc_var in self.header:
                internal_name = '{}_qc_flag'.format(k)
                if internal_name in tmp:
                    continue
                tmp[internal_name] = qc_var
                qc_var_pairs[v] = qc_var

        self.variables.update(tmp)
        return qc_var_pairs
    def apply_qc_flags(self, threshold=None, flag=None,
            threshold_inequality='lt'):
        """
        Apply user-provided QC values or flags for climate variables to
        filter out poor-quality data by converting it to null values;
        updates :attr:`Data.df`.

        Specifically, where the QC value is < ``threshold`` change the
        variable's value for that date-time to null. The other option is to
        use a column of flags, e.g. 'x' for data values to be filtered out.
        The threshold value or flag may be specified in the config file's
        **METADATA** section, otherwise they should be assigned as keyword
        arguments here. Which QC columns (flag or numeric threshold) are
        applied to which variables is set in the **DATA** section of the
        config file. For datasets with QC value columns whose names are
        identical to the variable they correspond to plus the suffix "_QC",
        the QC column names do not need to be specified in the config file.

        Keyword Arguments:
            threshold (float): default :obj:`None`. Threshold for QC values;
                if the QC value is below the threshold replace that
                variable's value with null.
            flag (str, list, or tuple): default :obj:`None`. Character flag
                signifying data to filter out. Can be a list or tuple of
                multiple flags.
            threshold_inequality (str): default 'lt'. 'lt' to filter values
                whose QC value is less than ``threshold``, 'gt' to filter
                those that are greater.

        Returns:
            :obj:`None`

        Example:
            If the input time series file has a column of numeric quality
            values named "LE_QC" which signify the data quality for latent
            energy measurements, then in the config.ini file's **DATA**
            section the following must be specified::

                [DATA]
                latent_heat_flux_qc = LE_QC
                ...

            Now you must specify the threshold of this column at which to
            filter when using :meth:`Data.apply_qc_flags`. For example, if
            you want to remove all latent energy entries where the "LE_QC"
            value is below 5, then the threshold value would be 5. The
            threshold can either be set in the config file or passed as an
            argument. If it is set in the config file, i.e.::

                [METADATA]
                qc_threshold = 0.5

            then you would simply call the method and this threshold would
            be applied to all *QC* columns specified in the config file,

            >>> from fluxdataqaqc import Data
            >>> d = Data('path/to/config.ini')
            >>> d.apply_qc_flags()

            Alternatively, if the threshold is not defined in the config
            file or you would like to use a different value, pass it in,

            >>> d.apply_qc_flags(threshold=2.5)

            Lastly, this method can also filter based on a single character
            flag or a list of them, e.g. "x" or "bad", given that the column
            containing them is specified in the config file for whichever
            variables they are to be applied to. For example, if a flag
            column contains multiple flags signifying different data quality
            control info and two in particular signify poor-quality data,
            say "b" and "a", then apply them either in the config file::

                [METADATA]
                qc_flag = b,a

            or within Python,

            >>> d.apply_qc_flags(flag=['b', 'a'])

            For more explanation and examples see the "Configuration
            Options" section of the online documentation.
        """
        # if QC threshold or flags not passed use config values if they exist
        if not threshold:
            threshold = self.qc_threshold
        if not flag:
            flag = self.qc_flag
        if threshold and threshold_inequality not in ('lt', 'gt'):
            err_msg = ('ERROR: threshold_inequality must be "lt" or "gt", '
                'but {} was given.'.format(threshold_inequality))
            raise ValueError(err_msg)
        # load dataframe if not yet accessed
        df = self.df
        # infer each column's datatype to avoid applying thresholds to strings
        infer_type = lambda x: pd.api.types.infer_dtype(x, skipna=True)
        df_types = pd.DataFrame(
            df.apply(infer_type, axis=0)
        ).rename(columns={0: 'type'})
        # loop over each variable with a provided QC value and set nulls
        # where QC value < threshold; qc_var_pairs maps var names to QC names
        if threshold:
            for var, qc in self.qc_var_pairs.items():
                if df_types.loc[qc, 'type'] != 'string':
                    if threshold_inequality == 'lt':
                        df.loc[
                            (df[qc] < threshold) & (df[qc].notnull()), var
                        ] = np.nan
                    else:
                        df.loc[
                            (df[qc] > threshold) & (df[qc].notnull()), var
                        ] = np.nan
        # set values to null where flag matches a given string
        if flag:
            if isinstance(flag, str):
                for var, qc in self.qc_var_pairs.items():
                    if df_types.loc[qc, 'type'] == 'string':
                        df.loc[
                            (df[qc] == flag) & (df[qc].notnull()), var
                        ] = np.nan
            # apply multiple character flags
            elif isinstance(flag, (list, tuple)):
                for f in flag:
                    for var, qc in self.qc_var_pairs.items():
                        if df_types.loc[qc, 'type'] == 'string':
                            df.loc[
                                (df[qc] == f) & (df[qc].notnull()), var
                            ] = np.nan

        self._df = df
    @property
    def df(self):
        """
        Pull variables out of the config and climate time series files and
        load them into a datetime-indexed :obj:`pandas.DataFrame`.

        Metadata about the input time series file format,
        "missing_data_value", "skiprows", and "date_parser", are utilized
        when first loading the ``df`` into memory. Also, weighted and
        non-weighted averaging of multiple measurements of the same climatic
        variable occurs on the first call of :attr:`Data.df`, if these
        options are declared in the config file. For more details and
        example uses of these config options please see the "Configuration
        Options" section of the online documentation.

        Returns:
            df (:obj:`pandas.DataFrame`)

        Examples:
            You can utilize the df property as with any
            :obj:`pandas.DataFrame` object. However, if you would like to
            make changes to the data you must first make a copy, make the
            changes, and then reassign it to :attr:`Data.df`, e.g. if you
            wanted to add 5 degrees to air temperature,

            >>> from fluxdataqaqc import Data
            >>> d = Data('path_to_config.ini')
            >>> df = d.df.copy()
            >>> df['air_temp_column'] = df['air_temp_column'] + 5
            >>> d.df = df

            The functionality shown above allows for user-controlled
            preprocessing and modification of any time series data in the
            initial dataset. It also allows for adding new columns, but if
            they are variables used by ``flux-data-qaqc``, e.g. Rn or other
            energy balance variables, be sure to also update
            :attr:`Data.variables` and :attr:`Data.units` with the
            appropriate entries. New or modified values will be used in any
            further analysis/plotting routines within ``flux-data-qaqc``.

            By default the names of variables as found within the input data
            are retained in :attr:`QaQc.df`; however, you can use the
            internal naming scheme of ``flux-data-qaqc`` (viewable in
            :attr:`Data.variable_names_dict`) via the :attr:`Data.inv_map`
            dictionary, which maps user-defined names to internal names (as
            opposed to :attr:`Data.variables`, which maps internal names to
            user-defined ones). For example, if your input data had the
            following names for LE, H, Rn, and G set in your config::

                [DATA]
                net_radiation_col = Net radiation, W/m2
                ground_flux_col = Soil-heat flux, W/m2
                latent_heat_flux_col = Latent-heat flux, W/m2
                sensible_heat_flux_col = Sensible-heat flux, W/m2

            then :attr:`Data.df` will utilize the same names, e.g.

            >>> # d is a Data instance
            >>> d.df.head()

            produces:

            ============== =================== ====================== ======================== ====================
            date           Net radiation, W/m2 Latent-heat flux, W/m2 Sensible-heat flux, W/m2 Soil-heat flux, W/m2
            ============== =================== ====================== ======================== ====================
            10/1/2009 0:00 -54.02421778        0.70761                0.95511                  -40.42365926
            10/1/2009 0:30 -51.07744708        0.04837                -1.24935                 -33.35383253
            10/1/2009 1:00 -50.99438925        0.68862                1.91101                  -43.17900525
            10/1/2009 1:30 -51.35032377        -1.85829               -15.4944                 -40.86201497
            10/1/2009 2:00 -51.06604228        -1.80485               -19.1357                 -39.80936855
            ============== =================== ====================== ======================== ====================

            Here is how you could rename your dataframe using the
            ``flux-data-qaqc`` internal names,

            >>> d.df.rename(columns=d.inv_map).head()

            ============== =================== ====================== ======================== ====================
            date           Rn                  LE                     H                        G
            ============== =================== ====================== ======================== ====================
            10/1/2009 0:00 -54.02421778        0.70761                0.95511                  -40.42365926
            10/1/2009 0:30 -51.07744708        0.04837                -1.24935                 -33.35383253
            10/1/2009 1:00 -50.99438925        0.68862                1.91101                  -43.17900525
            10/1/2009 1:30 -51.35032377        -1.85829               -15.4944                 -40.86201497
            10/1/2009 2:00 -51.06604228        -1.80485               -19.1357                 -39.80936855
            ============== =================== ====================== ======================== ====================

            A minor note on variable naming: if your input data variables
            use exactly the same names used by ``flux-data-qaqc``, they will
            be renamed by adding the prefix "input\_", e.g. "G" becomes
            "input_G" the first time the data is read from disk, i.e. the
            first time :attr:`Data.df` is accessed.

        Note:
            The temporal frequency of the input data is retained, unlike
            :attr:`.QaQc.df` which automatically resamples time series data
            to daily frequency.
        """
""" # avoid overwriting pre-assigned data if isinstance(self._df, pd.DataFrame): return self._df.rename(columns=self.variables) variables = self.variables # remove variable entries that were given as 'na' in config vars_notnull = dict((k, v) for k, v in variables.items() if v != 'na') self.variables = vars_notnull cols = list(vars_notnull.values()) # if multiple columns were assign to a variable parse them now to # make sure they all exist, below calculate mean and rename to _mean # if no var_name_delim in metadata then assume one column per var delim = None if 'var_name_delim' in dict(self.config.items('METADATA')): delim = self.config.get('METADATA','var_name_delim') cols = [x.split(delim) if delim in x else x for x in cols] cols_flat = [] for el in cols: if isinstance(el,list): cols_flat += el else: cols_flat.append(el) cols = cols_flat missing_cols = None if not set(cols).issubset(self.header): missing_cols = set(cols) - set(self.header) err_msg = ('WARNING: the following config variables are missing ' 'in the input climate file:\n{}\nThey will be filled with ' 'NaN values'.format(' '.join(missing_cols))) print(err_msg) cols = set(cols).intersection(self.header) kwargs = {} if 'missing_data_value' in dict(self.config.items('METADATA')): self.na_val = self.config.get('METADATA', 'missing_data_value') # try to parse na_val as numeric try: self.na_val = float(self.na_val) except: pass kwargs['na_values'] = [self.na_val] if 'skiprows' in dict(self.config.items('METADATA')): val = self.config.get('METADATA','skiprows') if val and val.startswith('[') and val.endswith(']'): skiprows = val.replace(' ','') kwargs['skiprows'] = [ int(el) for el in skiprows.strip('][').split(',') ] else: kwargs['skiprows'] = int(val) if 'date_parser' in dict(self.config.items('METADATA')): date_parse_str = self.config.get('METADATA','date_parser') date_parser = lambda x: datetime.strptime(x, date_parse_str) kwargs['date_parser'] = date_parser if 'load_all_vars' in dict(self.config.items('METADATA')): # if this option is listed (with any value) read all columns into df cols = self.header # load data file depending on file format if self.climate_file.suffix in ('.xlsx', '.xls'): # find indices of headers we want, only read those, excel needs ints ix=[i for i, e in enumerate(self.header) if e in set(cols)] df = pd.read_excel( self.climate_file, parse_dates = [variables.get('date')], usecols = ix, engine = self.xl_parser, **kwargs ) else: df = pd.read_csv( self.climate_file, parse_dates = [variables.get('date')], usecols = cols, **kwargs ) if 'missing_data_value' in dict(self.config.items('METADATA')): # force na_val because sometimes with read_excel it doesn't work... df[df == self.na_val] = np.nan # force values to numeric in case of bad characters mixed in non_numeric_cols =\ [self.variables.get('date')]+list(self.qc_var_pairs.values()) numeric_cols = list(set(cols).difference(non_numeric_cols)) df[numeric_cols] =\ df[numeric_cols].apply(pd.to_numeric, errors='coerce') if missing_cols: df = df.reindex(columns=list(cols)+list(missing_cols)) def calc_weight_avg(d, pref, df): """ Helper function to reduce redundant code for calculating weighted average currently only for multiple soil heat flux and moisture variables. d is soil_var_weight_pairs dict pref is variable prefix str, i.e. 
g_ or theta_ df is the dataframe """ # list of multiple variables with prefix if d: vs = [d.get(el) for el in d if el.startswith(pref)] else: vs = [] # soil heat flux weighted average if len(vs) > 1: # if 1 or less no average weights = [float(el.get('weight')) for el in vs] total_weights = np.sum(weights) # if multiple Gs specified and same num multiple Gs specifed # as the main G var and no weights assigned do not duplicate # mean that is calculated below from comma separated list if delim and pref == 'g_' and self.variables.get('G') is not \ None and delim in self.variables.get('G'): n_Gs = len(self.variables.get('G').split(delim)) if len(weights) == total_weights and len(weights) == n_Gs: return # same for multiple theta, this also avoids issue with multiple # null columns and weighting the remaining columns less... if pref == 'theta_' and len(weights) == total_weights: # update variables var_name = 'theta_mean' theta_names = [el.get('name') for el in vs] print('Calculating mean for var: THETA from ' 'columns: {}'.format(theta_names) ) tmp_df = df[theta_names].copy() df[var_name] = tmp_df.mean(axis=1) self.variables[var_name] = var_name return # if weights are not normalized update them elif not np.isclose(total_weights, 1.0): print( "{} weights not given or don't sum to one, normalizing"\ .format(pref.replace('_','')) ) for k,v in d.items(): if k.startswith(pref): nw = float(v.get('weight')) / total_weights d[k] = {'name': v.get('name'), 'weight': nw} msg = ', '.join([ '{}:{:.2f}'.format( v.get('name'),float(v.get('weight')) ) for k,v in d.items() if k.startswith(pref) ] ) print('Here are the new weights:\n', msg) # calculate average, use updated weights if calculated vs = [d.get(el) for el in d if el.startswith(pref)] tmp_df = df[[el.get('name') for el in vs]].copy() for pair in vs: tmp_df[pair.get('name')] *= float(pair.get('weight')) # update variables key = '{}mean'.format(pref) val = '{}mean'.format(pref) if key == 'g_mean': key = 'G' # calculate mean (sum weighted values) df[val] = tmp_df.sum(axis=1) df.loc[df[tmp_df.columns].isnull().all(1), val] = np.nan self.variables[key] = val elif len(vs) == 1: # only print this message if 1 weighted avg var is assigned... print( 'WARNING: Insufficient data to calculate mean for multiple ' '{} measurements'.format(pref.replace('_','').upper()) ) # remove from var name dict if G to avoid inv_map overwrite # i.e. 
only one G recording but listed twice in config if pref == 'g_': to_drop = [i for i in d.keys() if i.startswith('g_')][0] self.variables.pop(to_drop, None) self.units.pop(to_drop, None) # check if any of multiple soil vars time series are all null and remove del_keys = [] for k, v in self.soil_var_weight_pairs.items(): var_name = v.get('name') if df[var_name].isna().all(): del_keys.append(k) for k in del_keys: self.soil_var_weight_pairs.pop(k) # calculate weighted average soil vars if they exist d = self.soil_var_weight_pairs calc_weight_avg(d, 'g_', df) calc_weight_avg(d, 'theta_', df) # currently calc non weighted means for vars other than G and theta # later may change so all vars are handled the same way, this is for # multiple listed (delim sep) variables for a single var in the config for k,v in variables.items(): if delim and delim in v: tmp_cols = v.split(delim) print('Calculating mean for var: {}\n'.format(k), 'from columns: {}\n'.format(tmp_cols) ) # calc mean and update variables dict names var_name = k+'_mean' df[var_name] = df[tmp_cols].mean(axis=1) self.variables[k] = var_name # check each variable for multiple criteria, all null, naming, etc. del_keys = [] for k,v in self.variables.items(): if v not in df.columns: del_keys.append(k) # the all zero issue is a weird bug with pandas mean or nans that # may depend on a dependency of pandas called bottleneck elif df[v].isnull().all() or (df[v] == 0).all(): print( 'WARNING: {} variable in column {} is missing all data ' 'it will be removed'.format(k, v) ) df.drop(v, axis=1, inplace=True) del_keys.append(k) # also rename input columns that may cause naming issue elif k in df.columns and k in Data.variable_names_dict.keys() and \ not k == 'date': new_name = 'input_{}'.format(k) print('WARNING: renaming column {} to {}'.format(k, new_name)) df.rename(columns={k:new_name}, inplace=True) if not v == k+'_mean': self.variables[k] = new_name if k in self.qc_var_pairs: self.qc_var_pairs[new_name] = self.qc_var_pairs[k] self.qc_var_pairs.pop(k, None) # also remove entry from var dict to avoid issues later for k in del_keys: self.variables.pop(k, None) self.units.pop(k, None) # update renaming dict with any newly created mean variables/removed self.inv_map = { v: k for k, v in self.variables.items() if not k == v } # the only column that is always renamed is the datestring_col df.rename(columns={variables['date']: 'date'}, inplace=True) # date index df.index = df.date df = df[df.index.notnull()] df.index = pd.to_datetime(df.index) # ensure datetime df.drop('date', axis=1, inplace=True) self._df = df # vpd calc uses attribute # calc vapor pressure or vapor pressure deficit if hourly or less # also converts units if needed for vp, vpd, t_avg self._calc_vpd_or_vp(df) self._calc_rn(df) return df @df.setter def df(self, data_frame): if not isinstance(data_frame, pd.DataFrame): raise TypeError("Must assign a pandas.DataFrame object") self._df = data_frame
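
# A minimal end-to-end sketch tying the pieces above together, assuming a
# hypothetical config.ini; paths and column names are placeholders:
#
#   >>> from fluxdataqaqc import Data
#   >>> d = Data('path/to/config.ini')
#   >>> df = d.df.rename(columns=d.inv_map)   # use internal variable names
#   >>> d.apply_qc_flags(threshold=0.5)       # null out low-quality values
#   >>> d.plot(output_type='show')            # interactive diagnostic plots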