Source code for avatars.runner

import os
import time
import warnings
import webbrowser
from dataclasses import asdict
from logging import getLogger
from pathlib import Path
from typing import Any, Optional, Union

import pandas as pd
import tenacity
from avatar_yaml import (
    AvatarizationFastDPParameters,
    AvatarizationOpenDPParameters,
    AvatarizationParameters,
    PrivacyMetricsParameters,
)
from avatar_yaml import Config as Config
from avatar_yaml.models.advice import AdviceType
from avatar_yaml.models.avatar_metadata import (
    DataRecipient,
    DataSubject,
    DataType,
    SensitivityLevel,
)
from avatar_yaml.models.parameters import (
    AlignmentMethod,
    AugmentationStrategy,
    AvatarizationProcessorParameters,
    ExcludeVariablesMethod,
    ImputeMethod,
    ProjectionType,
    ReportLanguage,
    ReportType,
)
from avatar_yaml.models.schema import ColumnType, LinkMethod, PseudonymizationColumnConfig
from IPython.display import HTML, display

from avatars import __version__
from avatars.client import ApiClient
from avatars.constants import (
    DEFAULT_DELAY_BETWEEN_CONSECUTIVE_JOBS,
    DEFAULT_POLL_INTERVAL,
    DEFAULT_TYPE,
    ERROR_STATUSES,
    JOB_EXECUTION_ORDER,
    MATCHERS,
    RESULTS_TO_STORE,
    VOLUME_NAME,
    PlotKind,
    Results,
    TypeResults,
)
from avatars.crash_handler import register_runner
from avatars.file_downloader import FileDownloader
from avatars.job_launcher import JobLauncher
from avatars.metrics_summary import build_metrics_summary_df
from avatars.models import BulkDeleteRequest, BulkDeleteResponse, JobKind, JobResponse
from avatars.results_organizer import ResultsOrganizer

# Module-level logger, named after this module's import path.
logger = getLogger(__name__)


class Runner:
    """Build a job configuration and launch/inspect jobs through an ``ApiClient``."""

    def __init__(
        self,
        api_client: ApiClient,
        display_name: str,
        seed: int | None = None,
        max_distribution_plots: int | None = None,
        pia_data_recipient: DataRecipient = DataRecipient.UNKNOWN,
        pia_data_type: DataType = DataType.UNKNOWN,
        pia_data_subject: DataSubject = DataSubject.UNKNOWN,
        pia_sensitivity_level: SensitivityLevel = SensitivityLevel.UNDEFINED,
        report_language: ReportLanguage = ReportLanguage.EN,
    ) -> None:
        """Create a runner bound to ``api_client``.

        Parameters
        ----------
        api_client
            Client used for every server call made by this runner.
        display_name
            Name given to the configuration (used as the config ``set_name``
            and as ``display_name`` when resources are uploaded).
        seed
            Forwarded to ``Config``.
        max_distribution_plots
            Forwarded to ``Config``.
        pia_data_recipient, pia_data_type, pia_data_subject, pia_sensitivity_level
            PIA values stored in the config metadata.
        report_language
            Language passed to ``Config.create_report`` when reports are created.
        """
        # Plain attributes first: none of these depend on each other.
        self.client = api_client
        self.display_name = display_name
        self.report_language = report_language
        # Stays None until the backend returns the set name of uploaded resources.
        self.set_name: str | None = None
        self.results_urls: dict[str, dict[str, Any]] = {}
        self._has_results_missing_warning_issued: bool = False

        # Collaborators; the job launcher shares this runner's config object.
        self.config = Config(
            set_name=display_name,
            seed=seed,
            max_distribution_plots=max_distribution_plots,
        )
        self.file_downloader = FileDownloader(api_client)
        self.results: ResultsOrganizer = ResultsOrganizer()
        self.jobs = JobLauncher(api_client, self.config)

        # Tag the config with the client kind/version alongside the PIA values.
        self.config.create_metadata(
            {"client_type": "python", "client_version": __version__},
            pia_datarecipient=pia_data_recipient,
            pia_datatype=pia_data_type,
            pia_datasubject=pia_data_subject,
            pia_sensitivitylevel=pia_sensitivity_level,
        )

        # Register this Runner instance for crash reporting
        register_runner(self)
def add_annotations(self, annotations: dict[str, str]) -> None:
    """Add metadata annotations to the config.

    When metadata already exists, its current annotations are updated with
    ``annotations`` (new values win on key clashes) before the metadata is
    re-created; otherwise the metadata is created from ``annotations`` alone.

    Parameters
    ----------
    annotations
        A dictionary of annotations to add to the metadata.
    """
    metadata = self.config.avatar_metadata
    if metadata is None:
        merged = annotations
    else:
        # The existing mapping (if any) is updated in place, then re-submitted.
        merged = metadata.annotations or {}
        merged.update(annotations)
    self.config.create_metadata(merged)
def add_table(
    self,
    table_name: str,
    data: str | pd.DataFrame,
    primary_key: str | None = None,
    foreign_keys: list | None = None,
    time_series_time: str | None = None,
    types: dict[str, ColumnType] | None = None,
    individual_level: bool | None = None,
    avatar_data: str | pd.DataFrame | None = None,
):
    """Add a table to the config and upload the data in the server.

    Parameters
    ----------
    table_name
        The name of the table.
    data
        The data to add to the table. Can be a path to a file or a pandas DataFrame.
    primary_key
        The primary key of the table.
    foreign_keys
        Foreign keys of the table. ``[None]`` is treated as "no foreign keys".
    time_series_time
        Name of the time column in the table (time series case).
    types
        A dictionary of column types with the column name as the key and the type
        as the value. ``None`` (the default) means no explicit types.
    individual_level
        A boolean as true if the table is at individual level or not. An individual
        level table is a table where each row corresponds to an individual
        (ex: patient, customer, etc.). Default behavior is True.
    avatar_data
        The avatar table if there is one. Can be a path to a file or a pandas
        DataFrame.
    """
    # A fresh dict per call: a ``{}`` default would be shared between calls.
    column_types: dict[str, ColumnType] = {} if types is None else types

    file, avatar_file = self.upload_file(table_name, data, avatar_data)

    # Column types can only be inferred when the data is available in memory.
    if isinstance(data, pd.DataFrame):
        column_types = self._get_types(data, column_types)

    if foreign_keys == [None]:
        foreign_keys = None

    self.config.create_table(
        table_name=table_name,
        original_volume=VOLUME_NAME,
        original_file=file,
        avatar_volume=VOLUME_NAME if avatar_data is not None else None,
        avatar_file=avatar_file,
        primary_key=primary_key,
        foreign_keys=foreign_keys,
        time_series_time=time_series_time,
        types=column_types,
        individual_level=individual_level,
    )
[docs] def advise_parameters(self, table_name: str | None = None) -> None: """Fill the parameters set with the server recommendation. Parameters ---------- table_name The name of the table. If None, all tables will be used. """ self._setup_advice_config() if table_name: tables = [table_name] else: tables = list(self.config.tables.keys()) self._create_advice_jobs(tables) self._apply_advice_parameters(tables)
def _setup_advice_config(self) -> None:
    """Create the advice config and upload the resources to the server."""
    self.config.create_advice(advisor_type=[AdviceType.PARAMETERS])
    response = self.client.resources.put_resources(
        display_name=self.display_name,
        yaml_string=self.config.get_yaml(),
    )
    # Update set_name with the actual UUID returned by the backend
    self.set_name = str(response.set_name)


def _create_advice_jobs(self, tables: list[str]) -> None:
    """Launch the advice job and download its result for tables lacking advice.

    Raises
    ------
    ValueError
        If a table needs advice but ``set_name`` has not been set.
    """
    for table_name in tables:
        # Tables that already hold advice do not trigger a new job.
        if self.results.advice.get(table_name) is not None:
            continue
        if self.set_name is None:
            raise ValueError("Set name is not set. Cannot launch advice job.")
        self.jobs.launch_job(JobKind.advice, self.set_name)
        advice_job_name = self.jobs.get_parameters_name(JobKind.advice)
        self._download_specific_result(advice_job_name, Results.ADVICE)


def _apply_advice_parameters(self, tables: list[str]) -> None:
    """Apply the downloaded advice parameters to each table.

    Raises
    ------
    ValueError
        If the stored advice for a table is not a dictionary.
    """
    advice_job_name = self.jobs.get_parameters_name(JobKind.advice)
    for table_name in tables:
        advice = self.results.get_results(table_name, Results.ADVICE, advice_job_name)
        if not isinstance(advice, dict):
            raise ValueError("Expected advice parameters to be a dictionary")

        # Only a dict-shaped "imputation" entry is used; anything else is ignored.
        advised_imputation = advice.get("imputation")
        imputation_data: dict[str, Any] = (
            advised_imputation if isinstance(advised_imputation, dict) else {}
        )
        method_name = imputation_data.get("method")
        imputation_method = ImputeMethod(method_name) if method_name is not None else None

        self.set_parameters(
            table_name=table_name,
            k=advice.get("k"),
            ncp=advice.get("ncp"),
            use_categorical_reduction=advice.get("use_categorical_reduction"),
            imputation_method=imputation_method,
            imputation_k=imputation_data.get("k"),
            imputation_training_fraction=imputation_data.get("training_fraction"),
            imputation_return_data_imputed=imputation_data.get("return_data_imputed", False),
        )
[docs] def upload_file( self, table_name: str, data: str | pd.DataFrame, avatar_data: str | pd.DataFrame | None = None, ): """Upload a file to the server. Parameters ---------- data The data to upload. Can be a path to a file or a pandas DataFrame. file_name The name of the file. """ extension = ".csv" if isinstance(data, pd.DataFrame) else Path(data).suffix file = table_name + extension self.client.upload_file(data=data, key=file) avatar_file = None if avatar_data is not None: avatar_file = table_name + "_avatars" + extension self.client.upload_file(data=avatar_data, key=avatar_file) return file, avatar_file
def _get_types(
    self, data: pd.DataFrame, types: dict[str, ColumnType] | None = None
) -> dict[str, ColumnType]:
    """Return a column type for every column of ``data``.

    Parameters
    ----------
    data
        The DataFrame whose columns are typed.
    types
        Explicit column types, keyed by column name. They take precedence over
        the type inferred from the pandas dtype. ``None`` means no overrides.

    Returns
    -------
    dict
        Column name (as ``str``) mapped to its column type.
    """
    # ``None`` sentinel instead of a shared ``{}`` default.
    overrides = {} if types is None else types
    resolved: dict[str, ColumnType] = {}
    for column_name, dtype in data.dtypes.items():
        name = str(column_name)
        if name in overrides:
            resolved[name] = overrides[name]
        else:
            resolved[name] = self._get_type_from_pandas(str(dtype))
    return resolved


def _get_type_from_pandas(self, value: str) -> ColumnType:
    """Return our data type from pandas type.

    The first entry of ``MATCHERS`` whose pattern is found in ``value`` wins;
    ``DEFAULT_TYPE`` is returned when none matches.
    """
    matches = (our_type for matcher, our_type in MATCHERS.items() if matcher.search(value))
    return next(matches, DEFAULT_TYPE)
def set_parameters(
    self,
    table_name: str,
    k: int | None = None,
    ncp: int | None = None,
    use_categorical_reduction: bool | None = None,
    column_weights: dict[str, float] | None = None,
    exclude_variable_names: list[str] | None = None,
    exclude_replacement_strategy: ExcludeVariablesMethod | None = None,  # DEPRECATED
    exclude_variable_method: ExcludeVariablesMethod | None = None,
    imputation_method: ImputeMethod | None = None,
    imputation_k: int | None = None,
    imputation_training_fraction: float | None = None,
    imputation_return_data_imputed: bool | None = None,
    open_dp_epsilon: float | None = None,
    fast_dp_epsilon: float | None = None,
    time_series_nf: int | None = None,
    time_series_projection_type: ProjectionType | None = None,
    time_series_nb_points: int | None = None,
    time_series_method: AlignmentMethod | None = None,
    known_variables: list[str] | None = None,
    target: str | None = None,
    quantile_threshold: int | None = None,
    data_augmentation_strategy: float | AugmentationStrategy | dict[str, float] | None = None,
    data_augmentation_target_column: str | None = None,
    data_augmentation_should_anonymize_original_table: bool | None = None,
    processors: list[AvatarizationProcessorParameters] | None = None,
    pseudonymized_columns: dict[str, PseudonymizationColumnConfig] | None = None,
    use_excluded_variables_in_metrics: bool = False,
):
    """Set the parameters for a given table.

    This will overwrite any existing parameters for the table, including
    parameters set using ``advise_parameters()``.

    Parameters
    ----------
    table_name
        The name of the table.
    k
        Number of nearest neighbors to consider for KNN-based methods.
    ncp
        Number of dimensions to consider for the KNN algorithm.
    use_categorical_reduction
        Whether to transform categorical variables into a latent numerical space
        before projection.
    column_weights
        Dictionary mapping column names to their respective weights, indicating
        the importance of each variable during the projection process.
    exclude_variable_names
        List of variable names to exclude from the projection.
    exclude_replacement_strategy
        DEPRECATED: use ``exclude_variable_method`` instead.
    exclude_variable_method
        Strategy for replacing excluded variables.
        Options: ROW_ORDER, COORDINATE_SIMILARITY.
    imputation_method
        Method for imputing missing values. Options: ``ImputeMethod.KNN``,
        ``ImputeMethod.MODE``, ``ImputeMethod.MEDIAN``, ``ImputeMethod.MEAN``,
        ``ImputeMethod.FAST_KNN``.
    imputation_k
        Number of neighbors to use for imputation if the method is KNN or FAST_KNN.
    imputation_training_fraction
        Fraction of the dataset to use for training the imputation model when
        using KNN or FAST_KNN.
    imputation_return_data_imputed
        Whether to return the data with imputed values.
    open_dp_epsilon
        Epsilon value for differential privacy using OpenDP implementation.
    fast_dp_epsilon
        Epsilon value for fastDP avatarization.
    time_series_nf
        In time series context, number of degrees of freedom to retain in time
        series projections.
    time_series_projection_type
        In time series context, type of projection for time series.
        Options: ``ProjectionType.FCPA`` (default) or ``ProjectionType.FLATTEN``.
    time_series_nb_points
        In time series context, number of points to generate for time series.
    time_series_method
        In time series context, method for aligning series.
        Options: ``AlignmentMethod.SPECIFIED``, ``AlignmentMethod.MAX``,
        ``AlignmentMethod.MIN``, ``AlignmentMethod.MEAN``.
    known_variables
        List of known variables to be used for privacy metrics. These are
        variables that could be easily known by an attacker.
    target
        Target variable to predict, used for signal metrics.
    quantile_threshold
        Quantile threshold for privacy metrics calculations.
    data_augmentation_strategy
        Strategy for data augmentation. Can be a float representing the
        augmentation ratio, an AugmentationStrategy enum, or a dictionary mapping
        modality to their respective augmentation ratios.
    data_augmentation_target_column
        Target column for data augmentation when using a dictionary strategy or
        AugmentationStrategy.
    data_augmentation_should_anonymize_original_table
        SENSITIVE: Whether to anonymize the original table during data
        augmentation. Default is True.
    processors
        List of processor parameter objects (subclasses of
        AvatarizationProcessorParameters). Supports
        InterRecordRangeDifferenceParameters and RelativeDifferenceParameters.
        The order of the list determines the order in which processors are
        applied during preprocessing (and reversed during postprocessing). The
        transformations are transparent to the user - input and output have the
        same column structure at the end.
    pseudonymized_columns
        A mapping of column name to ``PseudonymizationColumnConfig`` describing
        the PII type and pseudonymization strategy to apply to each column. Only
        columns that should be pseudonymized need to be listed. Foreign key
        columns in child tables automatically inherit the pseudonymization
        mapping of the referenced parent primary key — no explicit configuration
        is needed on child FK columns.
    use_excluded_variables_in_metrics
        When True, excluded variables are NOT passed to metrics parameters,
        allowing privacy and signal metrics to include them in calculations.
        When False (default), excluded variables are also excluded from metrics
        calculations.

    Raises
    ------
    ValueError
        If more than one of ``k``, ``open_dp_epsilon`` and ``fast_dp_epsilon``
        is set.
    """
    imputation = imputation_method.value if imputation_method else None

    # The new argument wins; the deprecated one is still honoured with a warning.
    replacement_strategy = None
    if exclude_variable_method:
        replacement_strategy = exclude_variable_method.value
    elif exclude_replacement_strategy:
        warnings.warn(
            "The 'exclude_replacement_strategy' parameter is deprecated and will be removed "
            "in a future release. Please use 'exclude_variable_method' instead."
        )
        replacement_strategy = exclude_replacement_strategy.value

    # Exactly one avatarization flavour may be selected.
    if k and open_dp_epsilon:
        raise ValueError(
            "Expected either k or open_dp_epsilon to be set, not both. "
            "If you want to use OpenDP avatarization, set open_dp_epsilon and remove k."
        )
    if k and fast_dp_epsilon:
        raise ValueError(
            "Expected either k or fast_dp_epsilon to be set, not both. "
            "If you want to use fastDP avatarization, set fast_dp_epsilon and remove k."
        )
    if open_dp_epsilon and fast_dp_epsilon:
        raise ValueError(
            "Expected either open_dp_epsilon or fast_dp_epsilon to be set, not both. "
            "Choose either OpenDP (open_dp_epsilon) or FastDP (fast_dp_epsilon)."
        )
    if k == 2:
        warnings.warn(
            "You have set k = 2, which is the minimum allowed value. With such a low k, "
            "each synthesized record closely mirrors only 2 original records, "
            "significantly reducing privacy protection."
        )

    # reset the parameters if they were already set
    per_table_stores = (
        self.config.avatarization,
        getattr(self.config, "avatarization_open_dp", None),
        getattr(self.config, "avatarization_fast_dp", None),
        self.config.privacy_metrics,
        self.config.signal_metrics,
        self.config.time_series,
    )
    for store in per_table_stores:
        if store and store.get(table_name):
            del store[table_name]

    # Settings handed identically to whichever avatarization flavour is chosen.
    avatarization_settings = dict(
        table_name=table_name,
        ncp=ncp,
        use_categorical_reduction=use_categorical_reduction,
        imputation_method=imputation,
        imputation_k=imputation_k,
        imputation_training_fraction=imputation_training_fraction,
        imputation_return_data_imputed=imputation_return_data_imputed,
        column_weights=column_weights,
        exclude_variable_names=exclude_variable_names,
        exclude_variable_method=replacement_strategy,
        data_augmentation_strategy=data_augmentation_strategy,
        data_augmentation_target_column=data_augmentation_target_column,
        data_augmentation_should_anonymize_original_table=data_augmentation_should_anonymize_original_table,
        avatarization_processors_parameters=processors,
        pseudonymized_columns=pseudonymized_columns,
    )
    if k:
        # Avatarization with avatar method
        self.config.create_avatarization_parameters(k=k, **avatarization_settings)
    elif open_dp_epsilon:
        # OpenDP avatarization (MST method)
        self.config.create_avatarization_open_dp_parameters(
            epsilon=open_dp_epsilon, **avatarization_settings
        )
    elif fast_dp_epsilon:
        # FastDP avatarization (DP mechanisms in latent space)
        self.config.create_avatarization_fast_dp_parameters(
            epsilon=fast_dp_epsilon, **avatarization_settings
        )

    # Time series parameters are only created when at least one of them is set.
    if any(
        (
            time_series_nf,
            time_series_projection_type,
            time_series_nb_points,
            time_series_method,
        )
    ):
        self.config.create_time_series_parameters(
            table_name=table_name,
            nf=time_series_nf,
            projection_type=(
                time_series_projection_type.value if time_series_projection_type else None
            ),
            nb_points=time_series_nb_points,
            method=time_series_method.value if time_series_method else None,
        )

    # Metrics see the exclusions unless the caller asked to keep those variables.
    if use_excluded_variables_in_metrics:
        metrics_exclude_variable_names = None
        metrics_replacement_strategy = None
    else:
        metrics_exclude_variable_names = exclude_variable_names
        metrics_replacement_strategy = replacement_strategy

    # Settings common to privacy and signal metrics.
    metrics_settings = dict(
        table_name=table_name,
        ncp=ncp,
        use_categorical_reduction=use_categorical_reduction,
        imputation_method=imputation,
        imputation_k=imputation_k,
        imputation_training_fraction=imputation_training_fraction,
        imputation_return_data_imputed=imputation_return_data_imputed,
        exclude_variable_names=metrics_exclude_variable_names,
        exclude_variable_method=metrics_replacement_strategy,
        column_weights=column_weights,
    )
    self.config.create_privacy_metrics_parameters(
        known_variables=known_variables,
        target=target,
        quantile_threshold=quantile_threshold,
        **metrics_settings,
    )
    self.config.create_signal_metrics_parameters(**metrics_settings)
def update_parameters(self, table_name: str, **kwargs) -> None:
    """Update specific parameters for the table, keeping the other existing ones.

    The current parameters are read back from the config, overlaid with the
    values given in ``kwargs``, and re-applied through ``set_parameters``.

    Parameters
    ----------
    table_name
        The name of the table.
    **kwargs
        The parameters to update. Only parameters that are provided will be
        updated. See set_parameters for the full list of available parameters.

    Raises
    ------
    ValueError
        If the table has no avatarization parameters of any kind yet.
    """
    candidate_stores = (
        getattr(self.config, "avatarization", None),
        getattr(self.config, "avatarization_open_dp", None),
        getattr(self.config, "avatarization_fast_dp", None),
    )
    already_configured = any(
        store and store.get(table_name) is not None for store in candidate_stores
    )
    if not already_configured:
        raise ValueError(
            f"No existing parameters found for table '{table_name}'. "
            "Use set_parameters to create new parameters."
        )

    # Start from what is configured today, then let the caller's values win.
    merged_params = self._extract_current_parameters(table_name)
    merged_params.update(kwargs)

    # Apply all parameters back using set_parameters
    self.set_parameters(table_name=table_name, **merged_params)
def _extract_current_parameters(self, table_name: str) -> dict:
    """Extract the current parameters for a given table.

    Parameters
    ----------
    table_name
        The name of the table.

    Returns
    -------
    dict
        A dictionary containing the current parameters for the table as it is
        used in set_parameters. Empty when nothing is configured for the table.
    """
    # NOTE(review): data augmentation, processors and pseudonymized columns are
    # not read back here, so a set_parameters() call fed with this dictionary
    # re-creates the table parameters without them — confirm this is intended.
    current_params: dict[str, Any] = {}
    config = self.config

    avatarization = config.avatarization
    open_dp = getattr(config, "avatarization_open_dp", None)
    fast_dp = getattr(config, "avatarization_fast_dp", None)
    privacy_metrics = config.privacy_metrics

    def _projection_settings(source) -> dict:
        # Settings shared by the three avatarization flavours.
        settings = {
            "column_weights": source.column_weights,
            "use_categorical_reduction": source.use_categorical_reduction,
            "ncp": source.ncp,
        }
        settings.update(self._extract_exclude_parameters(source))
        return settings

    # ``params`` ends up being the object the imputation settings are read from.
    params: Optional[
        Union[
            AvatarizationParameters,
            AvatarizationOpenDPParameters,
            AvatarizationFastDPParameters,
            PrivacyMetricsParameters,
        ]
    ] = None

    if avatarization is not None and table_name in avatarization:
        # Standard avatarization parameters
        params = avatarization[table_name]
        if isinstance(params, AvatarizationParameters):
            current_params["k"] = params.k
            current_params.update(_projection_settings(params))
    elif open_dp is not None and table_name in open_dp:
        # OpenDP avatarization parameters (end-to-end DP)
        params = open_dp[table_name]
        if isinstance(params, AvatarizationOpenDPParameters):
            current_params["open_dp_epsilon"] = params.epsilon or None
            current_params.update(_projection_settings(params))
    elif fast_dp is not None and table_name in fast_dp:
        # FastDP avatarization parameters
        # Only epsilon is exposed; mechanism-specific params use defaults
        params = fast_dp[table_name]
        if isinstance(params, AvatarizationFastDPParameters):
            current_params["fast_dp_epsilon"] = params.epsilon or None
            current_params.update(_projection_settings(params))
    elif privacy_metrics is not None and privacy_metrics.get(table_name) is not None:
        # Last resort only: privacy metrics never replace avatarization
        # parameters found by one of the branches above.
        params = privacy_metrics[table_name]
        current_params.update(
            {
                "use_categorical_reduction": params.use_categorical_reduction,
                "ncp": params.ncp,
            }
        )
    # else: no parameters have been preset, ``params`` stays None.

    # Extract imputation parameters (falsy stored values are reported as None)
    if params and params.imputation:
        imputation = params.imputation
        current_params.update(
            {
                "imputation_method": (
                    ImputeMethod(imputation["method"]) if imputation["method"] else None
                ),
                "imputation_k": imputation["k"] or None,
                "imputation_training_fraction": imputation["training_fraction"] or None,
                "imputation_return_data_imputed": imputation["return_data_imputed"],
            }
        )

    # Extract time series parameters
    if config.time_series and table_name in config.time_series:
        ts_params = config.time_series[table_name]

        # Projection parameters
        if ts_params.projection:
            projection = ts_params.projection
            current_params.update(
                {
                    "time_series_nf": projection["nf"] or None,
                    "time_series_projection_type": (
                        ProjectionType(projection["projection_type"])
                        if projection["projection_type"]
                        else None
                    ),
                }
            )

        # Alignment parameters
        if ts_params.alignment:
            alignment = ts_params.alignment
            current_params.update(
                {
                    "time_series_nb_points": alignment["nb_points"] or None,
                    "time_series_method": (
                        AlignmentMethod(alignment["method"]) if alignment["method"] else None
                    ),
                }
            )

    # Extract privacy metrics parameters
    if privacy_metrics is not None and table_name in privacy_metrics:
        pm_params = privacy_metrics[table_name]
        current_params.update(
            {
                "known_variables": pm_params.known_variables,
                "target": pm_params.target,
                "quantile_threshold": pm_params.quantile_threshold,
            }
        )

        # Detect use_excluded_variables_in_metrics: True when avatarization has
        # exclude_variables but privacy metrics does not.
        avatarization_has_excluded = bool(current_params.get("exclude_variable_names"))
        metrics_has_excluded = bool(
            pm_params.exclude_variables and pm_params.exclude_variables.get("variable_names")
        )
        if avatarization_has_excluded and not metrics_has_excluded:
            current_params["use_excluded_variables_in_metrics"] = True

    return current_params


def _extract_exclude_parameters(self, params) -> dict:
    """Extract exclude variables parameters from parameter object.

    Parameters
    ----------
    params
        The parameters object that contains exclude_variables information.

    Returns
    -------
    dict
        A dictionary containing exclude_variable_names and
        exclude_variable_method parameters; empty when ``params`` has no
        exclude_variables. Falsy stored values are reported as ``None``.
    """
    excluded = params.exclude_variables
    if not excluded:
        return {}
    variable_names = excluded["variable_names"]
    strategy = excluded["replacement_strategy"]
    return {
        "exclude_variable_names": variable_names or None,
        "exclude_variable_method": ExcludeVariablesMethod(strategy) if strategy else None,
    }
def delete_parameters(self, table_name: str, parameters_names: list[str] | None = None) -> None:
    """Delete parameters from the config.

    Thin wrapper: the call is forwarded unchanged to ``Config.delete_parameters``.

    Parameters
    ----------
    table_name
        The name of the table.
    parameters_names
        The names of the parameters to delete. If None, all parameters will be
        deleted.
    """
    self.config.delete_parameters(table_name, parameters_names)
def delete_table(self, table_name: str) -> None:
    """Delete a table from the config.

    Thin wrapper around ``Config.delete_table``; only the local config is
    touched by this method itself.

    Parameters
    ----------
    table_name
        The name of the table.
    """
    self.config.delete_table(table_name)
def get_yaml(self, path: str | None = None):
    """Get the yaml config.

    Returns whatever ``Config.get_yaml(path)`` returns; ``run`` uses the result
    as the ``yaml_string`` uploaded to the server.

    Parameters
    ----------
    path
        The path to the yaml file. If None, the default config will be returned.
        NOTE(review): how ``path`` is used is decided by ``Config.get_yaml`` —
        confirm whether it is a read or a write location.
    """
    return self.config.get_yaml(path)
def _handle_existing_results(self, ignore_warnings: bool = False) -> None:
    """Warn and clean up local state when re-running a runner that already has results.

    Advice-only state does not count as a previous run: a runner is considered
    to have run before when it holds results, or result URLs / launched jobs
    other than the advice job.

    Parameters
    ----------
    ignore_warnings
        When True, the ``UserWarning`` is skipped; local state is still cleared.
    """
    advice_job_name = self.jobs.get_parameters_name(JobKind.advice)
    has_run_before = (
        not self.results.is_results_empty()
        or any(key != advice_job_name for key in self.results_urls)
        or any(job != advice_job_name for job in self.jobs.get_launched_jobs())
    )
    if not has_run_before:
        return

    if not ignore_warnings:
        # Captured before the reset so the user can still recover the old run.
        set_name = self.set_name
        warnings.warn(
            f"\nThis runner already has results for set_name '{set_name}'.\n"
            "Re-running will create a new set_name — previous results will no longer "
            "be accessible from this runner.\n"
            f"To access previous results later:\n"
            f"regenerated_runner = manager.create_runner_from_id('{set_name}')\n"
            "regenerated_runner.shuffled(...)",
            UserWarning,
        )

    # Forget everything known locally about the previous run.
    self.results_urls.clear()
    self.results = ResultsOrganizer()
    self.jobs.jobs.clear()
def run(self, jobs_to_run: list[JobKind] = JOB_EXECUTION_ORDER, ignore_warnings: bool = False):
    """Run avatarization jobs.

    This method creates resources and launches the specified jobs in the
    correct order.

    If this runner has existing results from a previous run, a ``UserWarning``
    is emitted with the old ``set_name`` so you can recover previous results if
    needed. Local state is then cleared and a new run starts.

    Parameters
    ----------
    jobs_to_run : list[JobKind]
        List of job types to run. Defaults to all jobs in execution order.
    ignore_warnings : bool
        Whether to ignore warnings about existing results when re-running a
        runner. Defaults to False.

    Examples
    --------
    >>> runner = manager.create_runner("my_dataset")
    >>> runner.add_table("customers", data=df)
    >>> runner.run()
    >>>
    >>> # Running again emits a UserWarning with the old set_name, then proceeds
    >>> runner.run()
    """
    self._handle_existing_results(ignore_warnings=ignore_warnings)

    def _lacks_report(report_type) -> bool:
        # Re-read the config each time: creating one report changes it.
        return self.config.report is None or report_type not in self.config.report

    # Create report configurations if report job is requested and report configurations
    # are not already set (could happen when creating a runner from an existing config)
    if JobKind.report in jobs_to_run:
        if _lacks_report(ReportType.BASIC):
            self.config.create_report(language=self.report_language)
        if _lacks_report(ReportType.PIA):
            self.config.create_report(ReportType.PIA, language=self.report_language)

    resource_response = self.client.resources.put_resources(
        display_name=self.display_name,
        yaml_string=self.get_yaml(),
    )
    # Update set_name with the actual UUID returned by the backend
    self.set_name = str(resource_response.set_name)
    self.jobs.set_name = self.set_name

    # Execute jobs in order
    ordered_jobs = sorted(jobs_to_run, key=lambda job: JOB_EXECUTION_ORDER.index(job))
    is_first_job = True
    for job_kind in ordered_jobs:
        # Add small delay between job creations to avoid bursts in api calls
        # (no delay before the first job)
        if not is_first_job:
            time.sleep(DEFAULT_DELAY_BETWEEN_CONSECUTIVE_JOBS)
        is_first_job = False
        self.jobs.launch_job(job_kind, self.set_name)
def get_job(self, job_name: JobKind | str) -> JobResponse:
    """Get the job by name.

    Thin wrapper around ``JobLauncher.get_job_status``.

    Parameters
    ----------
    job_name
        The name of the job to get.

    Returns
    -------
    JobResponse
        The response describing the job, as returned by the job launcher.
    """
    return self.jobs.get_job_status(job_name)
def get_status(self, job_name: JobKind | str):
    """Get the status of a job by name.

    Parameters
    ----------
    job_name
        The name of the job to get. It is passed straight to ``get_job``.

    Returns
    -------
    The ``status`` attribute of the job returned by ``get_job``.
    """
    return self.get_job(job_name).status
def delete(
    self, job_names: JobKind | str | list[JobKind | str] | None = None
) -> BulkDeleteResponse:
    """Delete one or more jobs launched by this runner.

    When called without arguments, every job launched by this runner is deleted.

    Parameters
    ----------
    job_names
        A single job kind/name, a list of job kinds/names, or ``None`` to delete
        all launched jobs.

    Returns
    -------
    BulkDeleteResponse
        Response containing deleted and failed jobs.

    Examples
    --------
    >>> runner.delete()  # all jobs
    >>> runner.delete(JobKind.standard)  # single job
    >>> runner.delete([JobKind.standard, JobKind.privacy_metrics])  # multiple jobs
    """
    targets: list[JobKind | str]
    if job_names is None:
        targets = list(self.jobs.get_launched_jobs())
    elif isinstance(job_names, list):
        targets = job_names
    else:
        # A lone kind/name is handled like a one-element list.
        targets = [job_names]

    # The bulk endpoint is given job ids, resolved through the job launcher.
    resolved_ids = [self.jobs.get_job_id(target) for target in targets]
    request = BulkDeleteRequest(job_names=resolved_ids)
    return self.client.jobs.bulk_delete_jobs(request)
def _retrieve_job_result_urls(self, job_name: str) -> None:
    """Wait until a job is done, then cache the URLs of its results.

    The job is polled until it reports ``done``; its result listing is then
    stored in ``self.results_urls[job_name]``.

    Parameters
    ----------
    job_name
        The name of the job whose results are wanted.

    Raises
    ------
    ValueError
        If the job status is one of ``ERROR_STATUSES``.
    """

    def check_job_status() -> JobResponse:
        """Return the job when done; raise on error, or ``TryAgain`` to poll again."""
        job = self.get_job(job_name)
        if job.status in ERROR_STATUSES:
            if job.exception:
                raise ValueError(f"Job {job_name} failed with exception: {job.exception}")
            # No exception detail is available: at least say which job failed
            # and in which status, instead of a bare "internal error".
            raise ValueError(
                f"Job {job_name} failed with an internal error (status: {job.status})"
            )
        if not job.done:
            # Raise to trigger retry
            raise tenacity.TryAgain
        return job

    # Poll with exponential backoff: the wait starts at DEFAULT_POLL_INTERVAL,
    # grows by 20% on every attempt and is capped at 4 * DEFAULT_POLL_INTERVAL.
    # ``multiplier`` must be set explicitly: tenacity computes
    # ``multiplier * exp_base ** (attempt - 1)`` and only then clamps to
    # [min, max], so with the default multiplier of 1 the wait would sit at
    # ``min`` for many attempts before starting to grow.
    for attempt in tenacity.Retrying(
        wait=tenacity.wait_exponential(
            multiplier=DEFAULT_POLL_INTERVAL,
            min=DEFAULT_POLL_INTERVAL,
            max=4 * DEFAULT_POLL_INTERVAL,
            exp_base=1.2,
        ),
        retry=tenacity.retry_if_exception_type(tenacity.TryAgain),
        reraise=True,
    ):
        with attempt:
            check_job_status()

    job_id = self.jobs.get_job_id(job_name)
    self.results_urls[job_name] = self.client.results.get_results(job_id)


def _populate_results_from_existing_jobs(self) -> None:
    """Fetch and populate results URLs from all completed jobs for this set_name.

    This method is used when reconstructing a Runner from an existing set_name.
    It lists the jobs known to the client, keeps those that belong to
    ``self.set_name``, are done and carry no exception, and for each of them
    stores the results listing in ``self.results_urls`` (keyed by the job's
    ``parameters_name``) and registers the job with the job launcher so it can
    be accessed via the normal result methods.

    If a job's results are no longer available on the server (404 error), a
    warning is issued once and the method continues with the other jobs. Any
    other error is re-raised.
    """
    if not self.set_name:
        return

    all_jobs = self.client.jobs.get_jobs()

    for job in all_jobs.jobs:
        # Only completed, error-free jobs of this set are of interest.
        if str(job.set_name) != self.set_name or not job.done or job.exception:
            continue

        job_name = job.name
        parameters_name = job.parameters_name
        try:
            self.results_urls[parameters_name] = self.client.results.get_results(job_name)
            self.jobs.register_existing_job(parameters_name, job_name, f"/jobs/{job_name}")
        except Exception as error:
            # Broad on purpose: the error is recognised by its message, and
            # anything that is not a missing-results 404 is re-raised below.
            error_message = str(error).lower()
            is_missing_results_404 = (
                "error status 404" in error_message
                and "results file for job" in error_message
            )
            if not is_missing_results_404:
                raise

            if not self._has_results_missing_warning_issued:
                tables_list = list(self.config.tables.keys())
                warnings.warn(
                    f"Results for job '{job_name}' are no longer "
                    "available on the server. "
                    "Input data has also been cleaned up. "
                    f"You need to reupload the following tables: {tables_list}. "
                    "You can relaunch the job with the same configuration by calling "
                    "runner.run() after re-uploading your data with "
                    "runner.upload_file().",
                )
                # Warn only once, even if several jobs are missing their results.
                self._has_results_missing_warning_issued = True
def get_specific_result_urls(
    self,
    job_name: str,
    result: Results = Results.SHUFFLED,
) -> list[str]:
    """Return the URLs stored for one result kind of a job.

    The job's result listing is retrieved first when it is not cached yet.

    Parameters
    ----------
    job_name
        The name of a job known to the job launcher.
    result
        The result kind to look up in the job's result listing.

    Returns
    -------
    list[str]
        The URLs stored under ``result``, or an empty list when the job has
        no entry for it.

    Raises
    ------
    ValueError
        If the job launcher does not know ``job_name``.
    """
    if not self.jobs.has_job(job_name):
        raise ValueError(f"Expected job '{job_name}' to be created. Try running it first.")

    if job_name not in self.results_urls:
        self._retrieve_job_result_urls(job_name)

    job_urls = self.results_urls[job_name]
    return job_urls[result] if result in job_urls else []
def _download_all_files(self):
    """Fetch every storable result of every launched job, for every table."""
    for job_name in self.jobs.get_launched_jobs():
        # Covers both "nothing cached yet" and "this job is not cached yet".
        if job_name not in self.results_urls:
            self._retrieve_job_result_urls(job_name)

        for result in self.results_urls[job_name].keys():
            for table_name in self.config.tables.keys():
                # Only known result kinds that are flagged for storage are fetched.
                if result in Results and Results(result) in RESULTS_TO_STORE:
                    self.get_specific_result(table_name, job_name, Results(result))


def _download_specific_result(
    self,
    job_name: str,
    result_name: Results,
) -> None:
    """Download one result kind of a job and store each file in ``self.results``.

    Files for which the results organizer returns no table name are skipped.
    """
    urls = self.get_specific_result_urls(job_name=job_name, result=result_name)
    # A single batch call fetches all the files at once.
    downloaded = self.file_downloader.download_files_batch(urls)

    for url, content in downloaded.items():
        metadata = self._get_metadata(url, result_name, job_name)
        table_name = self.results.get_table_name(result_name, url, content, metadata)
        if table_name is None:
            continue
        self.results.set_results(
            table_name=table_name,
            result=content,
            result_name=result_name,
            metadata=metadata,
        )


def _get_metadata(
    self, url: str, result_name: Results, job_name: str
) -> dict[str, Any] | None:
    """Return the metadata attached to a result file, or ``None`` when it has none."""
    if result_name == Results.FIGURES:
        return self._get_figure_metadata(url, job_name)
    if result_name == Results.METADATA:
        return {"kind": job_name}
    return None


def _get_figure_metadata(self, url: str, job_name: str) -> dict[str, Any] | None:
    """Look up the metadata of the figure at ``url`` in the job's figures-metadata file.

    Raises
    ------
    TypeError
        If the downloaded figures-metadata file is not a list.
    """
    metadata_url = self.results_urls[job_name][Results.FIGURES_METADATA][0]
    figures_metadatas = self.file_downloader.download_file(metadata_url)
    if not isinstance(figures_metadatas, list):
        raise TypeError(
            f"Expected a list, got {type(figures_metadatas)} instead for {url} metadata."
        )
    return self.results.find_figure_metadata(figures_metadatas, url)
def get_specific_result(
    self,
    table_name: str,
    job_name: JobKind,
    result: Results = Results.SHUFFLED,
) -> TypeResults:
    """Return one result of a job for a table, downloading it on first access.

    Parameters
    ----------
    table_name
        A table present in the runner configuration.
    job_name
        The job kind the result belongs to.
    result
        The result kind to return.

    Raises
    ------
    ValueError
        If the table is not in the configuration, or the job is not known to
        the job launcher.
    """
    parameters_name = self.jobs.get_parameters_name(job_name)

    if table_name not in self.config.tables.keys():
        raise ValueError(f"Expected table '{table_name}' to be created.")
    if not self.jobs.has_job(parameters_name):
        raise ValueError(f"Expected job '{job_name}' to be created. Try running it first.")

    lookup = (table_name, result, parameters_name)
    # Nothing stored yet for this lookup: download before reading it back.
    if self.results.get_results(*lookup) is None:
        self._download_specific_result(parameters_name, result)
    return self.results.get_results(*lookup)
def get_all_results(self):
    """Download every available result, then return the results organizer.

    Returns
    -------
    dict
        A dictionary with the results of each job on every table.
        Each job is a dictionary with the table name as key and the results as value.
        The results are a dictionary with the result name as key and the data as value.
        The data can be a pandas DataFrame or a dictionary depending on the result type.
    """
    self._download_all_files()
    organizer = self.results
    return organizer
def download_report(self, path: str | None = None, report_type: ReportType = ReportType.BASIC):
    """Download the report file(s) produced by the report job.

    Parameters
    ----------
    path
        Where to save the report. With a single report it is used as-is. When
        several reports are returned, an index prefix is added to the filename
        only: ``dir/0_report.pdf``, ``dir/1_report.pdf``, etc. When omitted,
        each report is saved under the last path component of its URL.
    report_type
        Which report to download. Defaults to the basic report.
    """
    job_name = self.jobs.get_parameters_name(
        JobKind.report, pia_report=report_type == ReportType.PIA
    )
    if self.results_urls.get(job_name) is None:
        self._retrieve_job_result_urls(job_name)

    urls = self.results_urls[job_name][Results.REPORT]
    several_reports = len(urls) > 1

    for index, url in enumerate(urls):
        if path is None:
            destination = Path(url).name
        elif several_reports:
            # Prefix the filename, not the directory, to keep reports side by side.
            target = Path(path)
            destination = str(target.parent / f"{index}_{target.name}")
        else:
            destination = path
        self.file_downloader.download_file(url, path=destination)
def print_parameters(self, table_name: str | None = None) -> None:
    """Print the avatarization, privacy and signal parameters of a table.

    Parameters
    ----------
    table_name
        The name of the table. If None, the parameters of every table in the
        configuration are printed.

    Raises
    ------
    ValueError
        If ``table_name`` is not a table of the configuration.
    """
    config = self.config

    if table_name is None:
        for name in config.tables.keys():
            self.print_parameters(name)
        return

    if table_name not in config.tables.keys():
        raise ValueError(f"Expected table '{table_name}' to be created. Try running it first.")

    # Avatarization parameters: the first flavour that has an entry for this
    # table wins. The DP flavours are read with getattr and a None default.
    avatarization_sections = (
        ("Avatarization parameters", config.avatarization),
        ("Avatarization OpenDP parameters", getattr(config, "avatarization_open_dp", None)),
        ("Avatarization FastDP parameters", getattr(config, "avatarization_fast_dp", None)),
    )
    for label, params_by_table in avatarization_sections:
        if params_by_table and table_name in params_by_table:
            print(f"--- {label} for {table_name}: ---")  # noqa: T201
            print(asdict(params_by_table[table_name]))  # noqa: T201
            print("\n")  # noqa: T201
            break
    else:
        print(f"--- No avatarization parameters set for {table_name} ---")  # noqa: T201
        print("\n")  # noqa: T201

    # Privacy metrics parameters
    privacy_by_table = config.privacy_metrics
    if privacy_by_table and table_name in privacy_by_table:
        print(f"--- Privacy metrics for {table_name}: ---")  # noqa: T201
        print(asdict(privacy_by_table[table_name]))  # noqa: T201
    else:
        print(f"--- No privacy metrics parameters set for {table_name} ---")  # noqa: T201
    print("\n")  # noqa: T201

    # Signal metrics parameters (last section: no trailing blank lines)
    signal_by_table = config.signal_metrics
    if signal_by_table and table_name in signal_by_table:
        print(f"--- Signal metrics for {table_name}: ---")  # noqa: T201
        print(asdict(signal_by_table[table_name]))  # noqa: T201
    else:
        print(f"--- No signal metrics parameters set for {table_name} ---")  # noqa: T201
def kill(self):
    """Placeholder: this method is not implemented yet and does nothing."""
    return None
def shuffled(self, table_name: str) -> pd.DataFrame:
    """Return the shuffled data produced by the standard job for a table.

    Parameters
    ----------
    table_name
        The name of the table to get the shuffled data from.

    Returns
    -------
    pd.DataFrame
        The shuffled data as a pandas DataFrame.

    Raises
    ------
    TypeError
        If the stored result is not a DataFrame.
    """
    data = self.get_specific_result(table_name, JobKind.standard, Results.SHUFFLED)
    if isinstance(data, pd.DataFrame):
        return data
    raise TypeError(f"Expected a pd.DataFrame, got {type(data)} instead.")
def sensitive_unshuffled(self, table_name: str) -> pd.DataFrame:
    """Return the unshuffled data produced by the standard job for a table.

    This is sensitive data and should be used with caution.

    Parameters
    ----------
    table_name
        The name of the table to get the unshuffled data from.

    Returns
    -------
    pd.DataFrame
        The unshuffled data as a pandas DataFrame.

    Raises
    ------
    TypeError
        If the stored result is not a DataFrame.
    """
    data = self.get_specific_result(table_name, JobKind.standard, Results.UNSHUFFLED)
    if isinstance(data, pd.DataFrame):
        return data
    raise TypeError(f"Expected a pd.DataFrame, got {type(data)} instead.")
def privacy_metrics(self, table_name: str) -> list[dict]:
    """Return the privacy metrics computed for a table.

    Parameters
    ----------
    table_name
        The name of the table to get the privacy metrics from.

    Returns
    -------
    list[dict]
        The privacy metrics as a list of dictionaries.

    Raises
    ------
    TypeError
        If the stored result is not a list.
    """
    metrics = self.get_specific_result(
        table_name, JobKind.privacy_metrics, Results.PRIVACY_METRICS
    )
    if isinstance(metrics, list):
        return metrics
    raise TypeError(f"Expected a list, got {type(metrics)} instead.")
def signal_metrics(self, table_name: str) -> list[dict]:
    """Return the signal metrics computed for a table.

    Parameters
    ----------
    table_name
        The name of the table to get the signal metrics from.

    Returns
    -------
    list[dict]
        The signal metrics as a list of dictionaries.

    Raises
    ------
    TypeError
        If the stored result is not a list.
    """
    metrics = self.get_specific_result(
        table_name, JobKind.signal_metrics, Results.SIGNAL_METRICS
    )
    if isinstance(metrics, list):
        return metrics
    raise TypeError(f"Expected a list, got {type(metrics)} instead.")
def render_privacy_metrics_summary(
    self,
    open_in_browser: bool = False,  # DEPRECATED
) -> dict:
    """Return the privacy metrics summary aggregated across tables.

    Only available for multi-table jobs.

    Parameters
    ----------
    open_in_browser
        Deprecated and unused: the summary is now returned as a dict instead of
        being saved to an HTML file and opened in a browser. Will be removed in
        the future.

    Returns
    -------
    dict
        A nested dict ``{table_name: {reference: meta_metric}}``.

    Raises
    ------
    ValueError
        If the configuration holds one table or fewer.
    TypeError
        If the stored summary is not a dict.
    """
    if len(self.config.tables) <= 1:
        raise ValueError(
            "render_privacy_metrics_summary is only available for multi-table jobs."
        )

    lookup = ("summary", Results.PRIVACY_METRICS_SUMMARY, "privacy_metrics")
    # Download the summary only when it has not been stored yet.
    if self.results.get_results(*lookup) is None:
        self._download_specific_result("privacy_metrics", Results.PRIVACY_METRICS_SUMMARY)

    summary = self.results.get_results(*lookup)
    if isinstance(summary, dict):
        return summary
    raise TypeError(f"Expected a dict, got {type(summary)} instead.")
def render_signal_metrics_summary(self) -> dict:
    """Return the signal metrics summary aggregated across tables.

    Only available for multi-table jobs.

    Returns
    -------
    dict
        A nested dict ``{table_name: {reference: meta_metric}}``.

    Raises
    ------
    ValueError
        If the configuration holds one table or fewer.
    TypeError
        If the stored summary is not a dict.
    """
    if len(self.config.tables) <= 1:
        raise ValueError(
            "render_signal_metrics_summary is only available for multi-table jobs."
        )

    lookup = ("summary", Results.SIGNAL_METRICS_SUMMARY, "")
    # Download the summary only when it has not been stored yet.
    if self.results.get_results(*lookup) is None:
        self._download_specific_result("signal_metrics", Results.SIGNAL_METRICS_SUMMARY)

    summary = self.results.get_results(*lookup)
    if isinstance(summary, dict):
        return summary
    raise TypeError(f"Expected a dict, got {type(summary)} instead.")
def metrics_summary(self) -> pd.DataFrame:
    """Return the privacy and signal metrics summaries combined into one DataFrame.

    Only available for multi-table jobs.

    Returns
    -------
    pd.DataFrame
        A DataFrame indexed by ``table_name`` with MultiIndex columns
        ``(reference, metric)`` where ``reference`` is the top level and
        ``metric`` is ``privacy`` or ``signal``, combining both meta-metrics
        for each table/reference pair.
    """
    privacy_summary = self.render_privacy_metrics_summary()
    signal_summary = self.render_signal_metrics_summary()
    return build_metrics_summary_df(privacy=privacy_summary, signal=signal_summary)
def render_plot(self, table_name: str, plot_kind: PlotKind, open_in_browser: bool = False):
    """Render every plot of a given kind for a table.

    The different plot kinds are defined in the PlotKind enum.

    Parameters
    ----------
    table_name
        The name of the table to get the plot from.
    plot_kind
        The kind of plot to render.
    open_in_browser
        Whether to save each plot to a file and open it in a browser.

    Raises
    ------
    TypeError
        If the stored figures are not a dict.
    ValueError
        If the figures hold no entry for ``plot_kind``.
    """
    figures = self.get_specific_result(table_name, JobKind.standard, Results.FIGURES)
    if not isinstance(figures, dict):
        raise TypeError(f"Expected a dict, got {type(figures)} instead.")
    if plot_kind not in figures:
        raise ValueError(f"No {plot_kind} found for table '{table_name}'.")

    for idx, plot in enumerate(figures[plot_kind]):
        if open_in_browser:
            # One file per plot, so several plots of a kind do not overwrite each other.
            filename = f"{table_name}_{plot_kind.value}_{idx}.html"
            self._save_file(plot, filename=filename)
        else:
            filename = None
        self._open_plot(plot, filename=filename)
def projections(self, table_name: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return the projections of the original data and of the avatars for a table.

    Parameters
    ----------
    table_name
        The name of the table to get the projections from.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        The original projections first, then the avatars projections.

    Raises
    ------
    TypeError
        If either stored result is not a DataFrame.
    """
    original_coordinates = self.get_specific_result(
        table_name, JobKind.standard, Results.PROJECTIONS_ORIGINAL
    )
    avatars_coordinates = self.get_specific_result(
        table_name, JobKind.standard, Results.PROJECTIONS_AVATARS
    )

    both_are_frames = isinstance(avatars_coordinates, pd.DataFrame) and isinstance(
        original_coordinates, pd.DataFrame
    )
    if both_are_frames:
        return original_coordinates, avatars_coordinates
    raise TypeError(
        "Expected a pd.DataFrame, "
        f"got {type(original_coordinates)} and {type(avatars_coordinates)} instead."
    )
def table_summary(self, table_name: str) -> pd.DataFrame:
    """Return the summary statistics found in the advice results of a table.

    Parameters
    ----------
    table_name
        The name of the table to get the summary from.

    Returns
    -------
    pd.DataFrame
        The table summary as a dataframe.

    Raises
    ------
    ValueError
        If the advice results hold no ``summary`` entry, or the summary holds
        no ``stats`` entry.
    """
    not_found = f"Summary not found in results for table '{table_name}'"

    advice = self.get_specific_result(table_name, JobKind.advice, Results.ADVICE)
    if not isinstance(advice, dict) or "summary" not in advice:
        raise ValueError(not_found)

    summary_data = advice["summary"]
    if "stats" not in summary_data.keys():
        raise ValueError(not_found)

    return pd.DataFrame(summary_data["stats"])
def from_yaml(self, yaml_path: str) -> None:
    """Load configuration from a YAML file.

    This replaces the current runner's configuration with the one from the file.

    **Note**: Table data files are not automatically loaded. Use ``upload_file()``
    to provide data before running jobs.

    Parameters
    ----------
    yaml_path : str
        The path to the YAML configuration file. The file is read as UTF-8.
    """
    # Pin the encoding: the platform default is not UTF-8 everywhere, and a
    # configuration with non-ASCII content would otherwise be decoded
    # differently (or fail) depending on the machine.
    with open(yaml_path, encoding="utf-8") as file:
        yaml_string = file.read()

    # NOTE(review): ``self.jobs`` was built in ``__init__`` with the previous
    # Config object; confirm the job launcher does not need to be handed the
    # new one after this assignment.
    self.config = Config.from_yaml(yaml_string)
def _open_plot(self, plot_html: HTML, filename: str | None = None):
    """Show a plot: open ``filename`` in a browser if given, else display it inline.

    Parameters
    ----------
    plot_html
        The plot to display when no filename is given.
    filename
        Path of an already saved HTML file to open in the web browser.
    """
    if filename:
        # Build the URL with ``as_uri`` rather than by string concatenation:
        # it yields a well-formed ``file:///`` URL for Windows drive paths and
        # percent-encodes spaces and other special characters.
        file_url = Path(os.path.abspath(filename)).as_uri()
        webbrowser.open(file_url)
    else:
        display(plot_html)


def _save_file(self, file_content: HTML, filename: str | None = None):
    """Write the ``data`` of an HTML object to ``filename`` as UTF-8 text.

    Does nothing when ``filename`` is None.
    """
    if filename is None:
        return None
    with open(filename, "w", encoding="utf-8") as file:
        file.write(file_content.data)