Source code for avatars.processors.inter_record_range_difference

import pandas as pd


[docs] class InterRecordRangeDifferenceProcessor: """Processor to express the values of two related variables relative to previous records. This processor can be used only on data where there are several records for each individual. By this transformation, variables such as `var_a` and `var_b` whose values are cumulative over successive records `t` in the following way: var_a_t <= var_b_t <= var_a_t+1 <= var_b_t+1 <= var_a_t+2 ... will be expressed as: - a variable containing the first value of `var_a`. - a variable containing the difference from the previous record (i.e. `var_a_t` - `var_b_t-1`) - a variable containing the range between the start and end variables (i.e. `var_b_t` - `var_a_t`) This processor can be used to express a quantity that varies at each event (variation defined by a start and an end) but also that varies across successive events. This processor is not suitable for data where any of the variables passed as args contain missing values. Keyword Arguments ----------------- id_variable: variable indicating which individual each row belongs to target_start_variable: variable representing the start of the range to transform target_end_variable: variable representing the end of the range to transform sort_by_variable: variable used to sort records for each id new_first_variable: name of the variable to be created to contain the first value of the target variable new_range_variable: name of the variable to be created to contain the range value new_difference_variable: name of the variable to be created to contain the difference value keep_record_order: If set to `True`, the postprocess will decode values respecting the record order given by `id_variable` and `sort_by_variable` from the source dataframe. This can only be set to `True` if the indices are the same between the source and dest dataframes passed as arguments to `postprocess`. Examples -------- >>> df = pd.DataFrame( ... { ... "id": [1, 2, 1, 1, 2, 2], ... "start": [7, 14, 6, 12, 10, 23], ... "end": [8, 18, 7, 15, 12, 24], ... } ... ) >>> processor = InterRecordRangeDifferenceProcessor( ... id_variable="id", ... target_start_variable='start', ... target_end_variable='end', ... sort_by_variable="start", ... new_first_variable="first_value", ... new_range_variable='range_value', ... new_difference_variable="value_difference", ... keep_record_order=True ... ) >>> processor.preprocess(df) id range_value first_value value_difference 0 1 1 6 0.0 1 2 4 10 2.0 2 1 1 6 0.0 3 1 3 6 4.0 4 2 2 10 0.0 5 2 1 10 5.0 The postprocess allows you to transform some preprocessed data back into its original format >>> preprocessed_df = pd.DataFrame( ... { ... "id": [1, 2, 1, 1, 2, 2], ... "range_value": [1, 4, 1, 3, 2, 1], ... "first_value": [6, 10, 6, 6, 10, 10], ... "value_difference": [0, 2, 0, 4, 0, 5], ... } ... ) >>> processor.postprocess(df, preprocessed_df) id start end 0 1 7.0 8.0 1 2 14.0 18.0 2 1 6.0 7.0 3 1 12.0 15.0 4 2 10.0 12.0 5 2 23.0 24.0 The postprocess can also be used on data where the number of records per individual is different than the original one. In such cases, the processor should be instantiated with the `keep_record_order` argument set to its default value `False`. In the example below, there is an extra record with the id 2. >>> processor = InterRecordRangeDifferenceProcessor( ... id_variable="id", ... target_start_variable='start', ... target_end_variable='end', ... sort_by_variable="start", ... new_first_variable="first_value", ... new_range_variable='range_value', ... new_difference_variable="value_difference", ... keep_record_order=False ... ) >>> preprocessed_df = pd.DataFrame( ... { ... "id": [1, 2, 1, 1, 2, 2, 2], ... "range_value": [1, 4, 1, 3, 2, 1, 2], ... "first_value": [6, 10, 6, 6, 10, 10, 10], ... "value_difference": [0, 2, 0, 4, 0, 5, 1], ... } ... ) >>> processor.postprocess(df, preprocessed_df) id start end 0 1 6.0 7.0 1 2 12.0 16.0 2 1 7.0 8.0 3 1 12.0 15.0 4 2 16.0 18.0 5 2 23.0 24.0 6 2 25.0 27.0 """ def __init__( self, *, id_variable: str, target_start_variable: str, target_end_variable: str, sort_by_variable: str, new_first_variable: str, new_range_variable: str, new_difference_variable: str, keep_record_order: bool = False, ): self.id_variable = id_variable self.target_start_variable = target_start_variable self.target_end_variable = target_end_variable self.sort_by_variable = sort_by_variable self.new_first_variable = new_first_variable self.new_range_variable = new_range_variable self.new_difference_variable = new_difference_variable self.keep_record_order = keep_record_order
[docs] def preprocess(self, df: pd.DataFrame) -> pd.DataFrame: variables_to_check = [ self.id_variable, self.target_start_variable, self.target_end_variable, self.sort_by_variable, ] if len(set(variables_to_check).difference(df.columns.values)) > 0: raise ValueError( "Expected valid variable names for `id_variable`, `target_start_variable`, " "`target_end_variable` and `sort_by_variable`, got " f"'{self.id_variable}', '{self.target_start_variable}', " f"'{self.target_end_variable}' and '{self.sort_by_variable}' instead" ) if df[variables_to_check].isnull().values.any(): raise ValueError( "Expected no missing values for `id_variable`, `target_start_variable`, " "`target_end_variable` and `sort_by_variable`, got columns with nulls instead" ) df = df.copy() # data needs to be sorted df = df.sort_values([self.id_variable, self.sort_by_variable]) # store range value (difference between end and start) df[self.new_range_variable] = df[self.target_end_variable] - df[self.target_start_variable] # store first value of target start variable df[self.new_first_variable] = df.groupby(self.id_variable)[ self.target_start_variable ].transform("first") # add a temp column with the value of the previous line df["__end_tmp__"] = df.groupby(self.id_variable)[self.target_end_variable].shift() # store the difference between current and previous value df[self.new_difference_variable] = df[self.target_start_variable] - df["__end_tmp__"] df[self.new_difference_variable] = df[self.new_difference_variable].fillna(0) # remove the tmp column and the original target variables df = df.drop( columns=[ self.target_start_variable, self.target_end_variable, "__end_tmp__", ] ) # sort by index to return processed data in the original order df = df.sort_index() return df
[docs] def postprocess(self, source: pd.DataFrame, dest: pd.DataFrame) -> pd.DataFrame: variables_to_check = [ self.id_variable, self.new_first_variable, self.new_range_variable, self.new_difference_variable, ] if len(set(variables_to_check).difference(dest.columns.values)) > 0: raise ValueError( "Expected valid variable names for `id_variable`, `new_first_variable`, " "`new_range_variable` and `new_difference_variable`, " f"got '{self.id_variable}', '{self.new_first_variable}', " f"'{self.new_range_variable}' and '{self.new_difference_variable}' instead" ) if self.sort_by_variable not in source.columns.values: raise ValueError( f"Expected a valid `sort_by_variable`, got '{self.sort_by_variable}' instead" ) if dest[variables_to_check].isnull().values.any(): raise ValueError( "Expected no missing values for `id_variable`, " "`new_first_variable`, `new_range_variable` and `new_difference_variable`, " "got columns with nulls instead" ) if source[self.sort_by_variable].isnull().sum() > 0: raise ValueError( "Expected no missing values for `sort_by_variable` in source, " "got column with nulls instead" ) if self.keep_record_order and len(set(source.index).symmetric_difference(dest.index)) > 0: raise ValueError( "Expected `keep_record_order` to be `True` only if source and dest " "have same indices, got source and dest with different indices" ) df = dest.copy() # sort values in the same way as they were ordered in preprocess if self.keep_record_order: ordered_indices = source.sort_values([self.id_variable, self.sort_by_variable]).index df = df.loc[ordered_indices] # calculate for each row the sum of ranges of all previous records df["__range_tmp__"] = df.groupby(self.id_variable)[self.new_range_variable].shift() df["__range_tmp__"] = df["__range_tmp__"].fillna(0) df["__totalizer_tmp__"] = df.groupby(self.id_variable)["__range_tmp__"].transform( pd.Series.cumsum ) + df.groupby(self.id_variable)[self.new_difference_variable].transform(pd.Series.cumsum) # calculate for each row the start value as the sum of the first value and the previously # calculated sum of ranges of all previous records (`__totalizer_tmp__`) df[self.target_start_variable] = df[self.new_first_variable] + df["__totalizer_tmp__"] # calculate end values from start value and range value df[self.target_end_variable] = df[self.target_start_variable] + df[self.new_range_variable] # remove all temp columns and columns added at preprocessing df = df.drop( columns=[ "__range_tmp__", "__totalizer_tmp__", self.new_difference_variable, self.new_range_variable, self.new_first_variable, ] ) # sort by index to return data in the original order df = df.sort_index() return df