Source code for fluxdataqaqc.util

# -*- coding: utf-8 -*-
"""
Collection of utility objects and functions for the :mod:`fluxdataqaqc`
module.
"""

import numpy as np
import pandas as pd
from pathlib import Path

class Convert(object):
    """
    Tools for unit conversions for ``flux-data-qaqc`` module.

    The class holds three lookup tables as class attributes
    (:attr:`allowable_units`, :attr:`pretty_unit_names`,
    :attr:`required_units`) and a per-instance map from a conversion key of
    the form ``'<initial>_to_<desired>'`` to the private method that
    performs that conversion on a :obj:`pandas.DataFrame` column.
    """
    # this is a work in progress, add more as needed/conversions are handled
    # input unit strings are not case sensitive, they will be forced to lower
    # NOTE(review): the lower-casing is not done in this class; presumably
    # the caller does it before calling ``convert`` -- confirm.
    allowable_units = {
        'LE': ['w/m2', 'mj/m2'],
        'H': ['w/m2', 'mj/m2'],
        'Rn': ['w/m2', 'mj/m2'],
        'G': ['w/m2', 'mj/m2'],
        'lw_in': ['w/m2', 'mj/m2'],
        'lw_out': ['w/m2', 'mj/m2'],
        'sw_in': ['w/m2'],
        'sw_out': ['w/m2', 'mj/m2'],
        'ppt': ['mm', 'in', 'm'],
        'vp': ['kpa', 'hpa', 'pa'],
        'vpd': ['kpa', 'hpa', 'pa'],
        't_avg': ['c', 'f', 'k'],
        't_min': ['c', 'f', 'k'],
        't_max': ['c', 'f', 'k'],
        'ws': ['m/s', 'mph']
    }

    # for printing and plotting purposes
    pretty_unit_names = {
        'pa': 'Pa',
        'hpa': 'hPa',
        'kpa': 'kPa',
        'c': 'C',
        'f': 'F',
        'k': 'K'
    }

    # some variables need to be in specified units for internal calculations
    # they will be attempted to be converted upon initialization of a QaQc obj
    # allowable initial units are listed in ``allowable_units`` above
    required_units = {
        'LE': 'w/m2',
        'H': 'w/m2',
        'Rn': 'w/m2',
        'G': 'w/m2',
        'lw_in': 'w/m2',
        'lw_out': 'w/m2',
        'sw_in': 'w/m2',
        'sw_out': 'w/m2',
        'ppt': 'mm',
        'vp': 'kpa',
        'vpd': 'kpa',
        't_avg': 'c',
        't_min': 'c',
        't_max': 'c',
        'ws': 'm/s'
    }

    def __init__(self):
        # keys follow the pattern '<initial unit>_to_<desired unit>'
        self._conversion_map = {
            'k_to_c': self._k_to_c,
            'hpa_to_kpa': self._hpa_to_kpa,
            'pa_to_kpa': self._pa_to_kpa,
            'in_to_mm': self._in_to_mm,
            'm_to_mm': self._m_to_mm,
            'f_to_c': self._f_to_c,
            'mj/m2_to_w/m2': self._mj_per_m2_to_watts_per_m2,
            'mph_to_m/s': self._mph_to_m_per_s  # miles/hr to meters/sec
        }

    @classmethod
    def convert(cls, var_name, initial_unit, desired_unit, df):
        """
        Given a valid initial and desired variable dimension for a variable
        within a :obj:`pandas.DataFrame`, make the conversion and return the
        updated :obj:`pandas.DataFrame`.

        For a list of variables that require certain units within
        ``flux-data-qaqc`` see :attr:`Convert.allowable_units` (names of
        allowable options of input variable dimensions) and
        :attr:`Convert.required_units` (for the mandatory dimensions of
        certain variables before running QaQc calculations).

        Arguments:
            var_name (str): name of variable to convert in ``df``.
            initial_unit (str): name of initial unit of variable, must be
                valid from :attr:`Convert.allowable_units`.
            desired_unit (str): name of units to convert to, also must be
                valid.
            df (:obj:`pandas.DataFrame`): :obj:`pandas.DataFrame` containing
                variable to be converted, i.e. with ``var_name`` in columns.

        Returns:
            df (:obj:`pandas.DataFrame`): updated dataframe with specified
                variable's units converted. The column is modified on the
                dataframe that was passed in and that same object is
                returned.

        Raises:
            KeyError: if there is no conversion registered for the
                ``initial_unit`` / ``desired_unit`` pair.

        Note:
            Many potential dimensions may not be provided for automatic
            conversion, if so you may need to update your variable dimensions
            manually, e.g. within a :attr:`.Data.df` before creating a
            :obj:`.QaQc` instance. Unit conversions are required for
            variables that can potentially be used in calculations within
            :obj:`.Data` or :obj:`.QaQc`.
        """
        conv = cls()
        convert_key = '{}_to_{}'.format(initial_unit, desired_unit)
        try:
            convert_func = conv._conversion_map[convert_key]
        except KeyError:
            # same exception type as a plain dict lookup, clearer message
            raise KeyError(
                'no automatic conversion from {} to {} is available '
                '(variable {}), available conversions: {}'.format(
                    initial_unit, desired_unit, var_name,
                    ', '.join(sorted(conv._conversion_map))
                )
            ) from None

        print(
            'Converting {} from {} to {}'.format(
                var_name, initial_unit, desired_unit
            )
        )
        df = convert_func(df, var_name)

        return df

    def _in_to_mm(self, df, var_name):
        # inches to millimeters
        df[var_name] *= 25.4
        return df

    def _m_to_mm(self, df, var_name):
        # meters to millimeters
        df[var_name] *= 1000
        return df

    def _f_to_c(self, df, var_name):
        # Fahrenheit to Celsius: subtract the 32 degree offset first, then
        # scale by 5/9
        df[var_name] = (df[var_name] - 32) * (5/9)
        return df

    def _k_to_c(self, df, var_name):
        # Kelvin to Celsius
        df[var_name] -= 273.15
        return df

    def _hpa_to_kpa(self, df, var_name):
        # hectopascal to kilopascal
        df[var_name] /= 10
        return df

    def _pa_to_kpa(self, df, var_name):
        # pascal to kilopascal
        df[var_name] /= 1000
        return df

    def _mph_to_m_per_s(self, df, var_name):
        # miles per hour to meters per second
        df[var_name] *= 0.44704
        return df

    def _mj_per_m2_to_watts_per_m2(self, df, var_name):
        # assumes average mj per day is correct- only valid daily
        # because shortwave rad may be used in data (before daily) it is
        # not covered for automatic conversion because time period is unknown
        # factor is 1e6 J / 86400 s, i.e. MJ per day expressed as mean watts
        df[var_name] *= 11.574074074074074
        return df
def monthly_resample(df, cols, agg_str, thresh=0.75):
    """
    Resample dataframe to monthly frequency while excluding months missing
    more than a specified percentage of days of the month.

    Arguments:
        df (:obj:`pandas.DataFrame`): datetime indexed DataFrame instance
        cols (list): list of columns in `df` to resample to monthly frequency
        agg_str (str): resample function as string, e.g. 'mean' or 'sum'

    Keyword Arguments:
        thresh (float): threshold (decimal fraction) of how many days in a
            month must exist for it to be temporally resampled, otherwise
            the monthly value for the month will be null. A month whose
            count of non-null days is exactly ``thresh`` times the number
            of days in the month is kept.

    Returns:
        ret (:obj:`pandas.DataFrame`): datetime indexed DataFrame that has
            been resampled to monthly time frequency.

    Note:
        If taking monthly totals (`agg_str` = 'sum') missing days will be
        filled with the months daily mean before summation.
    """
    taking_sum = agg_str == 'sum'
    # the monthly mean is only needed to gap fill monthly totals
    agg_funcs = [agg_str, 'count']
    if taking_sum:
        agg_funcs.append('mean')

    numeric = df.loc[:, cols].apply(pd.to_numeric)
    try:
        # month-end alias used by recent pandas releases
        resampler = numeric.resample('ME')
    except ValueError:
        # older pandas releases only know the 'M' alias for month end
        resampler = numeric.resample('M')
    mdf = resampler.agg(agg_funcs)
    days_in_month = mdf.index.days_in_month

    ret = pd.DataFrame()
    for c in cols:
        n_valid = mdf.loc[:, (c, 'count')]
        # strictly fewer valid days than required -> month is nulled; a
        # month that exactly meets the threshold is retained
        bad_months = n_valid < thresh * days_in_month

        if taking_sum:
            days_missing = days_in_month - n_valid
            # fill each missing day with the month's daily mean
            ret[c] = mdf.loc[:, (c, agg_str)] +\
                (days_missing * mdf.loc[:, (c, 'mean')])
        else:
            ret[c] = mdf.loc[:, (c, agg_str)]

        ret.loc[bad_months, c] = np.nan

    return ret
def write_configs(meta_df, data_dict, out_dir=None):
    """
    Write one ``flux-data-qaqc`` config file per site from a table of site
    metadata plus a shared dictionary of data (variable) entries.

    Handy for batches of flux stations whose input files share the same
    naming conventions and formatting.

    Arguments:
        meta_df (:obj:`pandas.DataFrame`): one row per site. Must contain the
            columns 'site_id', 'climate_file_path', 'station_longitude',
            'station_elevation', 'station_latitude', and
            'missing_data_value'. Elevation should be in meters and latitude
            in decimal degrees. Any additional columns (e.g. 'QC_flag',
            'anemometer_height') are also written to the metadata section of
            each site's config file when the value is not null.
        data_dict (dict): maps `flux-data-qaqc` config names to the user's
            column names or values, e.g. {'net_radiation_col': 'netrad',
            'net_radiation_units': 'w/m2'}. Every key/value pair is written
            to the "DATA" section of every config file.

    Keyword Arguments:
        out_dir (str or None): default None. Directory to save config files,
            if None then save to the current working directory. The
            directory is created when it does not exist.

    Returns:
        configs (list): list of :obj:`pathlib.Path` objects of full paths to
            each config file written.

    Raises:
        Exception: if one of the mandatory metadata columns does not exist
            in `meta_df`.

    Note:
        Sites with a null value in any mandatory column are skipped with a
        printed warning; they do not raise.
    """
    mandatory_meta_vars = [
        'site_id', 'climate_file_path', 'station_longitude',
        'station_elevation', 'station_latitude', 'missing_data_value'
    ]

    missing_cols = set(mandatory_meta_vars) - set(meta_df.columns)
    if missing_cols:
        raise Exception(
            'ERROR: the following columns is missing from the '
            'metadata dataframe:\n{}'.format(', '.join(missing_cols))
        )

    # fall back to the current working directory when no folder is given
    out_dir = Path.cwd() if out_dir is None else Path(out_dir)
    if not out_dir.is_dir():
        print(
            '{} does not exist, creating directory'.format(out_dir.absolute())
        )
        out_dir.mkdir(parents=True, exist_ok=True)

    banner = '#' * 79 + '\n'
    config_head = (
        banner + '# this file was generated by flux-data-qaqc\n' + banner
        + '\n' + '[METADATA]\n' + '#' * 10 + '\n'
    )
    data_head = '\n[DATA]\n' + '#' * 6 + '\n'

    # paths of the written config files, e.g. for later running qaqc
    configs = []
    for _, site in meta_df.iterrows():
        # a site needs every mandatory value to be non-null, else skip it
        if site[mandatory_meta_vars].isna().any():
            print('WARNING: site {} is missing mandatory metadata, skipping'
                  .format(site.site_id))
            continue

        out_file = out_dir / '{}_config.ini'.format(site.site_id)
        configs.append(out_file.absolute())

        with open(out_file, 'w') as outf:
            outf.write(config_head)
            outf.writelines(
                '{} = {}\n'.format(meta_var, site[meta_var])
                for meta_var in site.index
                if not pd.isnull(site[meta_var])
            )
            outf.write(data_head)
            outf.writelines(
                '{} = {}\n'.format(key, value)
                for key, value in data_dict.items()
            )

    print(
        '\nSuccessfully wrote {} config files out of {} sites to folder:\n{}'
        .format(len(configs), len(meta_df), out_dir)
    )

    return configs