Source code for avatars.processors.inter_record_bounded_range_difference

from typing import Optional

import numpy as np
import pandas as pd


class InterRecordBoundedRangeDifferenceProcessor:
    """Processor to express two related bounded variables relative to previous records.

    This processor can be used only on data where there are several records for
    each individual. By this transformation, variables such as `var_a` and `var_b`
    whose values are cumulative over successive records `t` in the following way:

        var_a_t <= var_b_t <= var_a_t+1 <= var_b_t+1 <= var_a_t+2 ...

    will be expressed as:

    - a variable containing the first value of `var_a`.
    - a variable containing the difference from the previous record
    - a variable containing the range between the start and end variables

    Difference and range variables are expressed as proportion of possible change
    between the value and the bound (upper or lower). The bounds are the smallest
    and largest values found across both target variables.

    For example, for a variable whose value only spreads from 10 (lower bound) to
    100 (upper bound), if the previous records value is 60 and the new value is 30,
    the proportion will be calculated as (30 - 60) / (60 - 10) = -0.6

    This processor is not suitable for data where any of the variables passed as
    args contain missing values.

    Keyword Arguments
    -----------------
        id_variable:
            variable indicating which individual each row belongs to
        target_start_variable:
            variable representing the start of the range to transform
        target_end_variable:
            variable representing the end of the range to transform
        sort_by_variable:
            variable used to sort records for each id
        new_first_variable_name:
            name of the variable to be created to contain the first value of the
            target variable
        new_range_variable:
            name of the variable to be created to contain the range value
        new_difference_variable_name:
            name of the variable to be created to contain the difference value
        should_round_output:
            set to True to force post-processed values to be integer (values are
            rounded to the nearest integer).

    Examples
    --------
    >>> df = pd.DataFrame(
    ...     {
    ...         'quantity_start': [30, 100, 80, 70, 40, 70],
    ...         'quantity_end': [10, 80, 70, 60, 30, 5],
    ...         'b': [4, 3, 0, 0, 2, 4],
    ...         'id': [1, 1, 1, 2, 2, 2]
    ...     }
    ... )
    >>> processor = InterRecordBoundedRangeDifferenceProcessor(
    ...     id_variable='id',
    ...     target_start_variable='quantity_start',
    ...     target_end_variable='quantity_end',
    ...     new_first_variable_name='quantity_s_first_val',
    ...     new_difference_variable_name="quantity_diff_to_bound",
    ...     new_range_variable="quantity_range",
    ...     should_round_output=True
    ... )
    >>> preprocessed_df = processor.preprocess(df)
    >>> print(preprocessed_df)
       b  id  quantity_range  quantity_s_first_val  quantity_diff_to_bound
    0  4   1       -0.800000                    30                0.000000
    1  3   1       -0.210526                    30                1.000000
    2  0   1       -0.133333                    30                0.000000
    3  0   2       -0.153846                    70                0.000000
    4  2   2       -0.285714                    70               -0.363636
    5  4   2       -1.000000                    70                0.571429

    The postprocess allows you to transform some preprocessed data back into its
    original format.

    >>> processor.postprocess(df, preprocessed_df)
       quantity_start  quantity_end  b  id
    0              30            10  4   1
    1             100            80  3   1
    2              80            70  0   1
    3              70            60  0   2
    4              40            30  2   2
    5              70             5  4   2
    """

    def __init__(
        self,
        *,
        id_variable: str,
        target_start_variable: str,
        target_end_variable: str,
        new_first_variable_name: str,
        new_range_variable: str,
        new_difference_variable_name: str,
        sort_by_variable: Optional[str] = None,
        should_round_output: bool = True,
    ):
        self.id_variable = id_variable
        self.target_start_variable = target_start_variable
        self.target_end_variable = target_end_variable
        self.new_first_variable_name = new_first_variable_name
        self.new_range_variable = new_range_variable
        self.new_difference_variable_name = new_difference_variable_name
        self.sort_by_variable = sort_by_variable
        self.should_round_output = should_round_output

    def preprocess(self, df: pd.DataFrame) -> pd.DataFrame:
        """Replace the two target variables by first/range/difference variables.

        The returned frame has the rows and index of `df` in their input order,
        without the two target columns and with three columns appended:
        `new_range_variable`, `new_first_variable_name` and
        `new_difference_variable_name` (in that order). `df` is not modified.

        Raises
        ------
        ValueError
            If one of the configured variables is not a column of `df`, or if
            any of those columns contains missing values.
        """
        variables_to_check = [
            self.id_variable,
            self.target_start_variable,
            self.target_end_variable,
        ]
        sort_keys = [self.id_variable]
        if self.sort_by_variable:
            variables_to_check.append(self.sort_by_variable)
            sort_keys.append(self.sort_by_variable)

        if set(variables_to_check).difference(df.columns):
            msg = "Expected valid variable names for `id_variable`, `target_start_variable`, "
            msg += f"`target_end_variable` and `sort_by_variable`, got '{self.id_variable}',"
            msg += f" '{self.target_start_variable}', '{self.target_end_variable}' and "
            msg += f"'{self.sort_by_variable}' instead"
            raise ValueError(msg)

        if df[variables_to_check].isnull().values.any():
            msg = "Expected no missing values for `id_variable`, `target_start_variable`, "
            msg += "`target_end_variable` and `sort_by_variable`, "
            msg += "got columns with nulls instead"
            raise ValueError(msg)

        # Work on positional labels (0..n-1): they are unique, so they let us
        # restore the input order after sorting whatever the caller's index is
        # (named, duplicated, unsorted, ...).
        original_index = df.index
        working = df.reset_index(drop=True)

        # Sort records for each individual. A stable algorithm is required so that
        # rows sharing the same key keep their input order.
        working = working.sort_values(sort_keys, kind="mergesort")

        start = working[self.target_start_variable]
        end = working[self.target_end_variable]

        # Bounds are shared by both target variables.
        lb = min(start.min(), end.min())
        ub = max(start.max(), end.max())

        # Range: change from start to end within the same record.
        working[self.new_range_variable] = self._relative_change(end, start, lb, ub)

        # First start value of each individual.
        working[self.new_first_variable_name] = working.groupby(self.id_variable)[
            self.target_start_variable
        ].transform("first")

        # Difference: change from the previous record's end to this record's start.
        previous_end = working.groupby(self.id_variable)[self.target_end_variable].shift()
        # For the first record of an individual there is no previous record: use
        # the record's own start so that the difference is 0 instead of NaN.
        previous_end = previous_end.where(previous_end.notna(), start)
        working[self.new_difference_variable_name] = self._relative_change(
            start, previous_end, lb, ub
        )

        working = working.drop(
            columns=[self.target_start_variable, self.target_end_variable]
        )

        # Re-order data as it was in the input - required because of earlier sorting step
        working = working.sort_index()
        working.index = original_index
        return working

    def postprocess(self, source: pd.DataFrame, dest: pd.DataFrame) -> pd.DataFrame:
        """Rebuild the two target variables from preprocessed data.

        Parameters
        ----------
        source
            Data in the original format. It is used to determine the lower and
            upper bounds and the order of the output columns.
        dest
            Preprocessed data containing `id_variable` and the three variables
            created by `preprocess`. It is not modified.

        Returns
        -------
        A copy of `dest`, rows in the same order, where the three created
        variables are replaced by the two target variables. Columns shared with
        `source` come first, in the order of `source`.
        """
        lb = min(
            source[self.target_start_variable].min(),
            source[self.target_end_variable].min(),
        )
        ub = max(
            source[self.target_start_variable].max(),
            source[self.target_end_variable].max(),
        )

        # Records must be chained in the same order as in `preprocess`. The sort
        # variable is only used when `dest` still carries it.
        sort_keys = [self.id_variable]
        if self.sort_by_variable and self.sort_by_variable in dest.columns:
            sort_keys.append(self.sort_by_variable)
        ordered = dest.reset_index(drop=True).sort_values(by=sort_keys, kind="mergesort")

        # After reset_index, the labels of `ordered` are row positions in `dest`.
        positions = ordered.index.to_numpy()
        ids = ordered[self.id_variable].to_numpy()
        first_values = ordered[self.new_first_variable_name].to_numpy()
        differences = ordered[self.new_difference_variable_name].to_numpy(dtype=float)
        ranges = ordered[self.new_range_variable].to_numpy(dtype=float)

        # Rows are grouped by id after sorting: a new individual starts wherever
        # the id differs from the one of the row above.
        starts_individual = np.ones(len(ids), dtype=bool)
        starts_individual[1:] = ids[1:] != ids[:-1]

        start_values = np.empty(len(ids), dtype=float)
        end_values = np.empty(len(ids), dtype=float)

        # Each value depends on the value rebuilt for the previous record, so the
        # computation is done iteratively.
        previous_end = None
        for position, is_first_record, first_value, difference, relative_range in zip(
            positions, starts_individual, first_values, differences, ranges
        ):
            if is_first_record:
                previous_end = first_value
            start = self._apply_relative_change(previous_end, difference, lb, ub)
            end = self._apply_relative_change(start, relative_range, lb, ub)
            # Writing at `position` puts the value back on its row in `dest`.
            start_values[position] = start
            end_values[position] = end
            previous_end = end

        working = dest.copy()
        working[self.target_start_variable] = start_values
        working[self.target_end_variable] = end_values

        # optional rounding step
        if self.should_round_output:
            # Round before casting: a bare cast truncates, which turns a value
            # such as 0.9999999999999999 into 0 instead of 1.
            working[self.target_start_variable] = (
                working[self.target_start_variable].round().astype(int)
            )
            working[self.target_end_variable] = (
                working[self.target_end_variable].round().astype(int)
            )

        working = working.drop(
            columns=[
                self.new_difference_variable_name,
                self.new_first_variable_name,
                self.new_range_variable,
            ]
        )

        # order columns
        common_cols = [c for c in source.columns if c in working.columns]
        other_cols = [c for c in working.columns if c not in source.columns]
        return working[common_cols + other_cols]

    @staticmethod
    def _relative_change(
        value: pd.Series, base: pd.Series, lb: float, ub: float
    ) -> pd.Series:
        """Express the change from `base` to `value` as a proportion of the room to a bound.

        An increase (`value >= base`) is divided by the distance from `base` to
        `ub` and is positive; a decrease is divided by the distance from `base`
        to `lb` and is negative.
        """
        non_finite = [np.inf, -np.inf, np.nan]
        magnitude = (value - base).abs()
        # When `base` sits on a bound the division yields inf or NaN; those are
        # replaced by 0.
        toward_lb = (-magnitude / (base - lb).abs()).replace(non_finite, 0)
        toward_ub = (magnitude / (base - ub).abs()).replace(non_finite, 0)
        return (value < base) * toward_lb + (value >= base) * toward_ub

    @staticmethod
    def _apply_relative_change(base: float, change: float, lb: float, ub: float) -> float:
        """Invert `_relative_change` for one value: move `base` by the proportion `change`."""
        if change >= 0:
            return base + change * abs(base - ub)
        return base - abs(change) * abs(base - lb)