import os
import time
import webbrowser
from dataclasses import asdict
from pathlib import Path
from typing import Any, Optional, Union
from uuid import UUID
import pandas as pd
import yaml
from avatar_yaml import (
AvatarizationDPParameters,
AvatarizationParameters,
PrivacyMetricsParameters,
)
from avatar_yaml import Config as Config
from avatar_yaml.models.advice import AdviceType
from avatar_yaml.models.parameters import (
AlignmentMethod,
ExcludeVariablesMethod,
ImputeMethod,
ProjectionType,
)
from avatar_yaml.models.schema import ColumnType, LinkMethod
from IPython.display import HTML, display
from avatars import __version__
from avatars.client import ApiClient
from avatars.constants import (
DEFAULT_RETRY_INTERVAL,
DEFAULT_TYPE,
ERROR_STATUSES,
JOB_EXECUTION_ORDER,
MATCHERS,
RESULTS_TO_STORE,
VOLUME_NAME,
PlotKind,
Results,
TypeResults,
)
from avatars.crash_handler import register_runner
from avatars.file_downloader import FileDownloader
from avatars.models import JobCreateRequest, JobCreateResponse, JobKind, JobResponse
from avatars.results_organizer import ResultsOrganizer
class Runner:
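    """Orchestrate an avatarization workflow against the server.
    A Runner accumulates the configuration (tables, links, parameters), uploads it,
    launches jobs, and retrieves their results.
    """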
def __init__(
self,
api_client: ApiClient,
display_name: str,
seed: int | None = None,
max_distribution_plots: int | None = None,
) -> None:
self.client = api_client
self.display_name = display_name
self.set_name: str | None = None
self.config = Config(
set_name=self.display_name, seed=seed, max_distribution_plots=max_distribution_plots
)
self.jobs: dict[JobKind | str, JobCreateResponse] = {}
self.results_urls: dict[JobKind, dict[str, list[str]]] = {}
self.file_downloader = FileDownloader(api_client)
self.results: ResultsOrganizer = ResultsOrganizer()
annotations = {
"client_type": "python",
"client_version": __version__,
}
self.config.create_metadata(annotations)
# Register this Runner instance for crash reporting
register_runner(self)
def add_annotations(self, annotations: dict[str, str]) -> None:
if self.config.avatar_metadata is None:
self.config.create_metadata(annotations)
return
previous_annotations = self.config.avatar_metadata.annotations
previous_annotations.update(annotations)
self.config.create_metadata(previous_annotations)
def add_table(
self,
table_name: str,
data: str | pd.DataFrame,
primary_key: str | None = None,
foreign_keys: list | None = None,
time_series_time: str | None = None,
types: dict[str, ColumnType] = {},
individual_level: bool | None = None,
avatar_data: str | pd.DataFrame | None = None,
):
"""Add a table to the config and upload the data in the server.
Parameters
----------
table_name
The name of the table.
data
The data to add to the table. Can be a path to a file or a pandas DataFrame.
primary_key
The primary key of the table.
foreign_keys
Foreign keys of the table.
time_series_time
            Name of the time column in the table (time series case).
types
A dictionary of column types with the column name as the key and the type as the value.
individual_level
            Whether the table is at individual level. An individual-level table is a
            table where each row corresponds to an individual (e.g. a patient or customer).
avatar_data
The avatar table if there is one. Can be a path to a file or a pandas DataFrame.
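
        Examples
        --------
        A minimal sketch, assuming ``runner`` is a configured ``Runner``; the table
        and column names are illustrative::

            >>> import pandas as pd
            >>> df = pd.DataFrame({"patient_id": [1, 2], "age": [34, 51]})
            >>> runner.add_table(
            ...     "patients", data=df, primary_key="patient_id", individual_level=True
            ... )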
"""
file, avatar_file = self.upload_file(table_name, data, avatar_data)
if isinstance(data, pd.DataFrame):
types = self._get_types(data, types)
self.config.create_table(
table_name=table_name,
original_volume=VOLUME_NAME,
original_file=file,
avatar_volume=VOLUME_NAME if avatar_data is not None else None,
avatar_file=avatar_file,
primary_key=primary_key,
foreign_keys=foreign_keys,
time_series_time=time_series_time,
types=types,
individual_level=individual_level,
)
def advise_parameters(self, table_name: str | None = None) -> None:
"""Fill the parameters set with the server recommendation.
Parameters
----------
table_name
The name of the table. If None, all tables will be used.
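
        Examples
        --------
        A sketch, assuming tables were already added with ``add_table``::

            >>> runner.advise_parameters()            # advise every table
            >>> runner.advise_parameters("patients")  # advise one (illustrative) table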
"""
self._setup_advice_config()
if table_name:
tables = [table_name]
else:
tables = list(self.config.tables.keys())
self._create_advice_jobs(tables)
self._apply_advice_parameters(tables)
def _setup_advice_config(self) -> None:
"""Create advice config and upload resources to server."""
self.config.create_advice(advisor_type=[AdviceType.PARAMETERS])
        yaml_string = self.config.get_yaml()
        resource_response = self.client.resources.put_resources(
            display_name=self.display_name,
            yaml_string=yaml_string,
        )
# Update set_name with the actual UUID returned by the backend
self.set_name = str(resource_response.set_name)
def _create_advice_jobs(self, tables: list[str]) -> None:
"""Download advice results for the specified tables."""
for table_name in tables:
if self.results.advice.get(table_name) is None:
name = self.config.get_parameters_advice_name(
name="advice", advisor_type=[AdviceType.PARAMETERS]
)
created_job = self._create_job(parameters_name=name)
self.jobs[JobKind.advice] = created_job
self._download_specific_result(JobKind.advice, Results.ADVICE)
def _apply_advice_parameters(self, tables: list[str]) -> None:
"""Apply the downloaded advice parameters to each table."""
for table_name in tables:
advise_parameters = self.results.get_results(
table_name, Results.ADVICE, JobKind.advice
)
if not isinstance(advise_parameters, dict):
raise ValueError("Expected advice parameters to be a dictionary")
imputation_data: dict[str, Any] = {}
if advise_parameters.get("imputation") is not None:
imputation_result = advise_parameters.get("imputation")
if isinstance(imputation_result, dict):
imputation_data = imputation_result
imputation_method = None
if imputation_data.get("method") is not None:
imputation_method = ImputeMethod(imputation_data.get("method"))
self.set_parameters(
k=advise_parameters.get("k"),
use_categorical_reduction=advise_parameters.get("use_categorical_reduction"),
ncp=advise_parameters.get("ncp"),
imputation_method=imputation_method,
imputation_k=imputation_data.get("k"),
imputation_training_fraction=imputation_data.get("training_fraction"),
imputation_return_data_imputed=imputation_data.get("return_data_imputed"),
table_name=table_name,
)
def upload_file(
self,
table_name: str,
data: str | pd.DataFrame,
avatar_data: str | pd.DataFrame | None = None,
):
"""Upload a file to the server.
Parameters
----------
        table_name
            The name of the table, used to derive the file name on the server.
        data
            The data to upload. Can be a path to a file or a pandas DataFrame.
        avatar_data
            The avatar data to upload alongside the original data, if any.
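
        Examples
        --------
        A sketch; the table and file names are illustrative::

            >>> file, avatar_file = runner.upload_file("patients", data="patients.csv")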
"""
extension = ".csv" if isinstance(data, pd.DataFrame) else Path(data).suffix
file = table_name + extension
self.client.upload_file(data=data, key=file)
avatar_file = None
if avatar_data is not None:
avatar_file = table_name + "_avatars" + extension
self.client.upload_file(data=avatar_data, key=avatar_file)
return file, avatar_file
def _get_types(
self, data: pd.DataFrame, types: dict[str, ColumnType] = {}
) -> dict[str, ColumnType]:
dtypes = {}
for column_name, _type in data.dtypes.items():
column_name_str = str(column_name)
if column_name_str in types:
dtypes[column_name_str] = types[column_name_str]
else:
dtypes[column_name_str] = self._get_type_from_pandas(str(_type))
return dtypes
def _get_type_from_pandas(self, value: str) -> ColumnType:
"""Return our data type from pandas type."""
for matcher, our_type in MATCHERS.items():
if matcher.search(value):
return our_type
return DEFAULT_TYPE
def add_link(
self,
parent_table_name: str,
parent_field: str,
child_table_name: str,
child_field: str,
method: LinkMethod = LinkMethod.LINEAR_SUM_ASSIGNMENT,
):
"""Add a table link to the config.
Parameters
----------
parent_table_name
The name of the parent table.
child_table_name
The name of the child table.
parent_field
The parent link key field (primary key) in the parent table.
child_field
            The child link key field (foreign key) in the child table.
method
The method to use for linking the tables. Defaults to "linear_sum_assignment".
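
        Examples
        --------
        A sketch linking two illustrative tables on their shared key::

            >>> runner.add_link(
            ...     parent_table_name="patients",
            ...     parent_field="patient_id",
            ...     child_table_name="visits",
            ...     child_field="patient_id",
            ... )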
"""
self.config.create_link(
parent_table_name, child_table_name, parent_field, child_field, method.value
)
def set_parameters(
self,
table_name: str,
k: int | None = None,
ncp: int | None = None,
use_categorical_reduction: bool | None = None,
column_weights: dict[str, float] | None = None,
exclude_variable_names: list[str] | None = None,
exclude_replacement_strategy: ExcludeVariablesMethod | None = None,
imputation_method: ImputeMethod | None = None,
imputation_k: int | None = None,
imputation_training_fraction: float | None = None,
imputation_return_data_imputed: bool | None = None,
dp_epsilon: float | None = None,
dp_preprocess_budget_ratio: float | None = None,
time_series_nf: int | None = None,
time_series_projection_type: ProjectionType | None = None,
time_series_nb_points: int | None = None,
time_series_method: AlignmentMethod | None = None,
known_variables: list[str] | None = None,
target: str | None = None,
quantile_threshold: int | None = None,
categorical_hidden_rate_variables: list[str] | None = None,
):
"""Set the parameters for a given table.
This will overwrite any existing parameters for the table, including parameters set using
        `advise_parameters()`.
Parameters
----------
table_name
The name of the table.
k
Number of nearest neighbors to consider for KNN-based methods.
ncp
Number of dimensions to consider for the KNN algorithm.
use_categorical_reduction
Whether to transform categorical variables into a
latent numerical space before projection.
column_weights
Dictionary mapping column names to their respective weights,
indicating the importance of each variable during the projection process.
exclude_variable_names
List of variable names to exclude from the projection.
        exclude_replacement_strategy
            Strategy for replacing excluded variables. Options:
            ``ExcludeVariablesMethod.ROW_ORDER``, ``ExcludeVariablesMethod.COORDINATE_SIMILARITY``.
imputation_method
Method for imputing missing values. Options: ``ImputeMethod.KNN``,
``ImputeMethod.MODE``, ``ImputeMethod.MEDIAN``, ``ImputeMethod.MEAN``,
``ImputeMethod.FAST_KNN``.
imputation_k
Number of neighbors to use for imputation if the method is KNN or FAST_KNN.
imputation_training_fraction
Fraction of the dataset to use for training the imputation model
when using KNN or FAST_KNN.
        imputation_return_data_imputed
Whether to return the data with imputed values.
dp_epsilon
Epsilon value for differential privacy.
dp_preprocess_budget_ratio
            Budget ratio to allocate to preprocessing when using differential privacy avatarization.
time_series_nf
In time series context, number of degrees of freedom to
retain in time series projections.
time_series_projection_type
In time series context, type of projection for time series. Options:
``ProjectionType.FCPA`` (default) or ``ProjectionType.FLATTEN``.
time_series_method
In time series context, method for aligning series. Options:
``AlignmentMethod.SPECIFIED``, ``AlignmentMethod.MAX``,
``AlignmentMethod.MIN``, ``AlignmentMethod.MEAN``.
time_series_nb_points
In time series context, number of points to generate for time series.
known_variables
List of known variables to be used for privacy metrics.
These are variables that could be easily known by an attacker.
target
            Target variable to predict, used for signal metrics.
        quantile_threshold
            Quantile threshold used when computing quantile-based privacy metrics.
        categorical_hidden_rate_variables
            Categorical variables on which to compute the hidden-rate privacy metric.
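
        Examples
        --------
        Two sketches with illustrative values; ``k`` and ``dp_epsilon`` are mutually
        exclusive::

            >>> runner.set_parameters("patients", k=10, ncp=5)
            >>> runner.set_parameters("patients", dp_epsilon=1.0)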
"""
imputation = imputation_method.value if imputation_method else None
replacement_strategy = (
exclude_replacement_strategy.value if exclude_replacement_strategy else None
)
if k and dp_epsilon:
raise ValueError(
"Expected either k or dp_epsilon to be set, not both. "
"If you want to use differential privacy, set dp_epsilon and remove k."
)
# reset the parameters if they were already set
if self.config.avatarization and self.config.avatarization.get(table_name):
del self.config.avatarization[table_name]
if self.config.avatarization_dp and self.config.avatarization_dp.get(table_name):
del self.config.avatarization_dp[table_name]
if self.config.privacy_metrics and self.config.privacy_metrics.get(table_name):
del self.config.privacy_metrics[table_name]
if self.config.signal_metrics and self.config.signal_metrics.get(table_name):
del self.config.signal_metrics[table_name]
if self.config.time_series and self.config.time_series.get(table_name):
del self.config.time_series[table_name]
if k:
            # Avatarization with the avatar method
self.config.create_avatarization_parameters(
table_name=table_name,
k=k,
ncp=ncp,
use_categorical_reduction=use_categorical_reduction,
imputation_method=imputation,
imputation_k=imputation_k,
imputation_training_fraction=imputation_training_fraction,
imputation_return_data_imputed=imputation_return_data_imputed,
column_weights=column_weights,
exclude_variable_names=exclude_variable_names,
exclude_replacement_strategy=replacement_strategy,
)
elif dp_epsilon:
# use dp in avatarization
self.config.create_avatarization_dp_parameters(
table_name=table_name,
epsilon=dp_epsilon,
ncp=ncp,
preprocess_budget_ratio=dp_preprocess_budget_ratio,
use_categorical_reduction=use_categorical_reduction,
imputation_method=imputation,
imputation_k=imputation_k,
imputation_training_fraction=imputation_training_fraction,
column_weights=column_weights,
exclude_variable_names=exclude_variable_names,
exclude_replacement_strategy=replacement_strategy,
)
if (
time_series_nf
or time_series_projection_type
or time_series_nb_points
or time_series_method
):
method = time_series_method.value if time_series_method else None
projection_type = (
time_series_projection_type.value if time_series_projection_type else None
)
self.config.create_time_series_parameters(
table_name=table_name,
nf=time_series_nf,
projection_type=projection_type,
nb_points=time_series_nb_points,
method=method,
)
self.config.create_privacy_metrics_parameters(
table_name=table_name,
ncp=ncp,
use_categorical_reduction=use_categorical_reduction,
imputation_method=imputation,
imputation_k=imputation_k,
imputation_training_fraction=imputation_training_fraction,
known_variables=known_variables,
target=target,
quantile_threshold=quantile_threshold,
categorical_hidden_rate_variables=categorical_hidden_rate_variables,
imputation_return_data_imputed=imputation_return_data_imputed,
)
self.config.create_signal_metrics_parameters(
table_name=table_name,
ncp=ncp,
use_categorical_reduction=use_categorical_reduction,
imputation_method=imputation,
imputation_k=imputation_k,
imputation_training_fraction=imputation_training_fraction,
imputation_return_data_imputed=imputation_return_data_imputed,
)
def update_parameters(self, table_name: str, **kwargs) -> None:
"""
Update specific parameters for the table while preserving other existing parameters.
Only updates the parameters that are provided, keeping existing values for others.
Parameters
----------
table_name
The name of the table.
**kwargs
The parameters to update. Only parameters that are provided will be updated.
See set_parameters for the full list of available parameters.
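
        Examples
        --------
        A sketch: only ``k`` changes, parameters set earlier are preserved::

            >>> runner.set_parameters("patients", k=10, ncp=5)
            >>> runner.update_parameters("patients", k=20)  # ncp=5 is kept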
"""
# Get current parameters for this table
current_params = self._extract_current_parameters(table_name)
# Update only the parameters that were provided
for param_name, param_value in kwargs.items():
current_params[param_name] = param_value
# Apply all parameters back using set_parameters
self.set_parameters(table_name=table_name, **current_params)
def _extract_current_parameters(self, table_name: str) -> dict:
"""Extract the current parameters for a given table.
Parameters
----------
table_name
The name of the table.
Returns
-------
dict
A dictionary containing the current parameters for
the table as it is used in set_parameters.
"""
current_params: dict[str, Any] = {}
# Extract avatarization parameters
if (
self.config.avatarization is not None
and table_name in self.config.avatarization.keys()
):
# Standard avatarization parameters
params: Optional[
Union[AvatarizationParameters, AvatarizationDPParameters, PrivacyMetricsParameters]
] = self.config.avatarization[table_name]
if isinstance(params, AvatarizationParameters):
current_params.update(
{
"k": params.k,
"column_weights": params.column_weights,
"use_categorical_reduction": params.use_categorical_reduction,
"ncp": params.ncp,
}
)
current_params.update(self._extract_exclude_parameters(params))
elif (
self.config.avatarization_dp is not None
and table_name in self.config.avatarization_dp.keys()
):
# DP avatarization parameters
params = self.config.avatarization_dp[table_name]
if isinstance(params, AvatarizationDPParameters):
current_params.update(
{
"dp_epsilon": params.epsilon if params.epsilon else None,
"dp_preprocess_budget_ratio": params.preprocess_budget_ratio
if params.preprocess_budget_ratio
else None,
"column_weights": params.column_weights,
"use_categorical_reduction": params.use_categorical_reduction,
"ncp": params.ncp,
}
)
current_params.update(self._extract_exclude_parameters(params))
elif (
self.config.privacy_metrics is not None
and self.config.privacy_metrics.get(table_name) is not None
):
params = self.config.privacy_metrics[table_name]
current_params.update(
{
"use_categorical_reduction": params.use_categorical_reduction,
"ncp": params.ncp,
}
)
else:
            params = None  # No parameters have been preset
# Extract imputation parameters
if params and params.imputation:
current_params.update(
{
"imputation_method": ImputeMethod(params.imputation["method"])
if params.imputation["method"]
else None,
"imputation_k": params.imputation["k"] if params.imputation["k"] else None,
"imputation_training_fraction": params.imputation["training_fraction"]
if params.imputation["training_fraction"]
else None,
"imputation_return_data_imputed": params.imputation["return_data_imputed"]
if params.imputation["return_data_imputed"]
else None,
}
)
# Extract time series parameters
if self.config.time_series and table_name in self.config.time_series.keys():
ts_params = self.config.time_series[table_name]
# Projection parameters
if ts_params.projection:
current_params.update(
{
"time_series_nf": ts_params.projection["nf"]
if ts_params.projection["nf"]
else None,
"time_series_projection_type": ProjectionType(
ts_params.projection["projection_type"]
)
if ts_params.projection["projection_type"]
else None,
}
)
# Alignment parameters
if ts_params.alignment:
current_params.update(
{
"time_series_nb_points": ts_params.alignment["nb_points"]
if ts_params.alignment["nb_points"]
else None,
"time_series_method": AlignmentMethod(ts_params.alignment["method"])
if ts_params.alignment["method"]
else None,
}
)
# Extract privacy metrics parameters
if (
self.config.privacy_metrics is not None
and table_name in self.config.privacy_metrics.keys()
):
pm_params = self.config.privacy_metrics[table_name]
to_update = {
"known_variables": pm_params.known_variables,
"target": pm_params.target,
"quantile_threshold": pm_params.quantile_threshold,
"categorical_hidden_rate_variables": pm_params.categorical_hidden_rate_variables,
}
current_params.update(to_update)
return current_params
def _extract_exclude_parameters(self, params) -> dict:
"""Extract exclude variables parameters from parameter object.
Parameters
----------
params:
The parameters object that contains exclude_variables information.
Returns
-------
A dictionary containing exclude_variable_names and exclude_replacement_strategy parameters.
"""
result = {}
if params.exclude_variables:
result["exclude_variable_names"] = (
params.exclude_variables["variable_names"]
if params.exclude_variables["variable_names"]
else None
)
result["exclude_replacement_strategy"] = (
ExcludeVariablesMethod(params.exclude_variables["replacement_strategy"])
if params.exclude_variables["replacement_strategy"]
else None
)
return result
def delete_parameters(self, table_name: str, parameters_names: list[str] | None = None):
"""Delete parameters from the config.
Parameters
----------
table_name
The name of the table.
parameters_names
The names of the parameters to delete. If None, all parameters will be deleted.
"""
self.config.delete_parameters(table_name, parameters_names)
def delete_link(self, parent_table_name: str, child_table_name: str):
"""Delete a link from the config.
Parameters
----------
parent_table_name
The name of the parent table.
child_table_name
The name of the child table.
"""
self.config.delete_link(parent_table_name, child_table_name)
def delete_table(self, table_name: str):
"""Delete a table from the config.
Parameters
----------
table_name
The name of the table.
"""
self.config.delete_table(table_name)
def get_yaml(self, path: str | None = None):
"""Get the yaml config.
Parameters
----------
        path
            The path to write the YAML file to. If None, the YAML config is returned
            as a string.
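
        Examples
        --------
        A sketch, assuming ``Config.get_yaml`` writes to the given path; the file
        name is illustrative::

            >>> yaml_string = runner.get_yaml()  # return the YAML as a string
            >>> runner.get_yaml("config.yaml")   # write the YAML to a file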
"""
return self.config.get_yaml(path)
def run(self, jobs_to_run: list[JobKind] = JOB_EXECUTION_ORDER):
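        """Upload the current config to the server and launch the requested jobs.
        Jobs are created in ``JOB_EXECUTION_ORDER``, wiring dependencies between the
        standard, metrics, and report jobs.
        Parameters
        ----------
        jobs_to_run
            The jobs to run. Defaults to the full pipeline.
        Returns
        -------
        dict
            The created jobs, keyed by job kind.

        Examples
        --------
        A sketch running the default pipeline::

            >>> jobs = runner.run()
        """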
if JobKind.report in jobs_to_run:
self.config.create_report()
yaml = self.get_yaml()
resource_response = self.client.resources.put_resources(
display_name=self.display_name,
yaml_string=yaml,
)
# Update set_name with the actual UUID returned by the backend
self.set_name = str(resource_response.set_name)
jobs_to_run = sorted(jobs_to_run, key=lambda job: JOB_EXECUTION_ORDER.index(job))
for parameters_name in jobs_to_run:
depends_on = []
if parameters_name == JobKind.standard:
if not self.config.avatarization and not self.config.avatarization_dp:
                    raise ValueError(
                        "Expected avatarization parameters to be set to run the standard job; "
                        "set a k or epsilon parameter using `runner.set_parameters()`."
                    )
if (
parameters_name == JobKind.signal_metrics
or parameters_name == JobKind.privacy_metrics
):
if JobKind.standard in self.jobs.keys():
depends_on = [self.jobs[JobKind.standard].Location]
else:
for avatar_table in self.config.avatar_tables.values():
if (
avatar_table.avatars_data is None
or avatar_table.avatars_data.file is None
):
raise ValueError(
"Expected Avatar tables to be set to run signal/privacy metrics "
"job, you have to set avatar_data using `runner.add_table()`."
)
elif parameters_name == JobKind.report:
if not self.jobs.get(JobKind.privacy_metrics) or not self.jobs.get(
JobKind.signal_metrics
):
                    raise ValueError(
                        "Expected privacy and signal metrics jobs to be created "
                        f"before running the report, got {jobs_to_run}"
                    )
depends_on = [
self.jobs[JobKind.privacy_metrics].Location,
self.jobs[JobKind.signal_metrics].Location,
]
created_job = self._create_job(
parameters_name=parameters_name.value,
depends_on=depends_on,
)
self.jobs[parameters_name] = created_job
return self.jobs
def _create_job(
self,
parameters_name: str,
depends_on: list[str] = [],
):
# FIXME: use the create_job method from the client instead of creating the request manually
# the create_job method doesn't return the right object for now.
print(f"Creating {parameters_name} job") # noqa: T201
request = JobCreateRequest(
set_name=UUID(self.set_name), parameters_name=parameters_name, depends_on=depends_on
)
kwargs: dict[str, Any] = {
"method": "post",
"url": f"/jobs", # noqa: F541
"json_data": request,
}
created_job = JobCreateResponse(**self.client.send_request(**kwargs))
return created_job
def get_job(self, job_name: JobKind) -> JobResponse:
"""
Get a job by name.
Parameters
----------
job_name
The name of the job to get.
Returns
-------
        JobResponse
The job object.
"""
if job_name not in self.jobs.keys():
raise ValueError(f"Expected job '{job_name}' to be created. Try running it first.")
return self.client.jobs.get_job_status(self.jobs[job_name].name)
def get_status(self, job_name: JobKind):
"""
Get the status of a job by name.
Parameters
----------
job_name
The name of the job to get.
"""
return self.get_job(job_name).status
def _retrieve_job_result_urls(self, job_name: JobKind):
"""
Get the result of a job by name.
Parameters
----------
job_name
The name of the job to get.
"""
job = self.get_job(job_name)
if job.status in ERROR_STATUSES:
if job.exception:
raise ValueError(f"Job {job_name} failed with exception: {job.exception}")
raise ValueError("internal error")
while not job.done:
time.sleep(DEFAULT_RETRY_INTERVAL)
job = self.get_job(job_name)
if job.status in ERROR_STATUSES:
if job.exception:
raise ValueError(f"Job {job_name} failed with exception: {job.exception}")
raise ValueError("internal error")
if job.status == "finished":
break
self.results_urls[job_name] = self.client.results.get_results(self.jobs[job_name].name)
def get_specific_result_urls(
self,
job_name: JobKind,
result: Results = Results.SHUFFLED,
) -> list[str]:
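        """Get the result URLs for a job, retrieving them from the server if needed.
        Parameters
        ----------
        job_name
            The name of the job to get result URLs for.
        result
            The kind of result to return URLs for. Defaults to ``Results.SHUFFLED``.
        Returns
        -------
        list[str]
            The URLs of the requested result files.
        """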
if job_name not in self.jobs.keys():
raise ValueError(f"Expected job '{job_name}' to be created. Try running it first.")
if job_name not in self.results_urls:
self._retrieve_job_result_urls(job_name)
return self.results_urls[job_name][result]
def _download_all_files(self):
for job_name in self.jobs.keys():
if not self.results_urls or job_name not in self.results_urls.keys():
self._retrieve_job_result_urls(job_name)
for result in self.results_urls[job_name].keys():
for table_name in self.config.tables.keys():
if result in Results:
if Results(result) in RESULTS_TO_STORE:
self.get_specific_result(table_name, job_name, Results(result))
def _download_specific_result(
self,
job_name: JobKind,
result_name: Results,
) -> None:
urls = self.get_specific_result_urls(job_name=job_name, result=result_name)
# Use batch download to get all file credentials at once
downloaded_results = self.file_downloader.download_files_batch(urls)
for url, result in downloaded_results.items():
metadata = self._get_metadata(url, result_name, job_name)
table_name = self.results.get_table_name(result_name, url, result, metadata)
if table_name is not None:
self.results.set_results(
table_name=table_name,
result=result,
result_name=result_name,
metadata=metadata,
)
def _get_metadata(
self, url: str, result_name: Results, job_name: JobKind
) -> dict[str, Any] | None:
match result_name:
case Results.FIGURES:
return self._get_figure_metadata(url)
case Results.METADATA:
return {"kind": job_name}
case _:
return None
def _get_figure_metadata(self, url: str) -> dict[str, Any] | None:
figures_metadatas = self.file_downloader.download_file(
self.results_urls[JobKind.standard][Results.FIGURES_METADATA][0]
)
if isinstance(figures_metadatas, list):
return self.results.find_figure_metadata(figures_metadatas, url)
else:
raise TypeError(
f"Expected a list, got {type(figures_metadatas)} instead for {url} metadata."
)
def get_specific_result(
self,
table_name: str,
job_name: JobKind,
result: Results = Results.SHUFFLED,
) -> TypeResults:
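        """Get a specific result for a table, downloading it first if necessary.
        Parameters
        ----------
        table_name
            The name of the table to get the result for.
        job_name
            The name of the job that produced the result.
        result
            The kind of result to fetch. Defaults to ``Results.SHUFFLED``.
        Returns
        -------
        TypeResults
            The requested result.
        """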
if table_name not in self.config.tables.keys():
raise ValueError(f"Expected table '{table_name}' to be created.")
if job_name not in self.jobs.keys():
raise ValueError(f"Expected job '{job_name}' to be created. Try running it first.")
if self.results.get_results(table_name, result, job_name) is None:
self._download_specific_result(job_name, result)
return self.results.get_results(table_name, result, job_name)
def get_all_results(self):
"""
Get all results.
Returns
-------
dict
A dictionary with the results of each job on every table.
Each job is a dictionary with the table name as key and the results as value.
The results are a dictionary with the result name as key and the data as value.
The data can be a pandas DataFrame or a dictionary depending on the result type.
"""
self._download_all_files()
return self.results
def download_report(self, path: str | None = None):
"""
Download the report.
Parameters
----------
path
The path to save the report.
"""
if self.results_urls.get(JobKind.report) is None:
self._retrieve_job_result_urls(JobKind.report)
report = self.results_urls[JobKind.report][Results.REPORT][0]
self.file_downloader.download_file(report, path=path)
def print_parameters(self, table_name: str | None = None) -> None:
"""Print the parameters for a table.
Parameters
----------
table_name
The name of the table.
If None, all parameters will be printed.
"""
if table_name is None:
for table_name in self.config.tables.keys():
self.print_parameters(table_name)
return
if table_name not in self.config.tables.keys():
            raise ValueError(f"Expected table '{table_name}' to be created with `add_table()` first.")
print(f"--- Avatarization parameters for {table_name}: ---") # noqa: T201
print(asdict(self.config.avatarization[table_name])) # noqa: T201
print("\n") # noqa: T201
print(f"--- Privacy metrics for {table_name}: ---") # noqa: T201
print(asdict(self.config.privacy_metrics[table_name])) # noqa: T201
print("\n") # noqa: T201
print(f"--- Signal metrics for {table_name}: ---") # noqa: T201
print(asdict(self.config.signal_metrics[table_name])) # noqa: T201
def kill(self):
"""Method not implemented yet."""
pass
def shuffled(self, table_name: str) -> pd.DataFrame:
"""
Get the shuffled data.
Parameters
----------
table_name
The name of the table to get the shuffled data from.
Returns
-------
pd.DataFrame
The shuffled data as a pandas DataFrame.
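
        Examples
        --------
        A sketch, assuming the standard job has already run for the (illustrative)
        table::

            >>> avatars_df = runner.shuffled("patients")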
"""
shuffled = self.get_specific_result(table_name, JobKind.standard, Results.SHUFFLED)
if not isinstance(shuffled, pd.DataFrame):
raise TypeError(f"Expected a pd.DataFrame, got {type(shuffled)} instead.")
return shuffled
def sensitive_unshuffled(self, table_name: str) -> pd.DataFrame:
"""
Get the unshuffled data.
This is sensitive data and should be used with caution.
Parameters
----------
table_name
The name of the table to get the unshuffled data from.
Returns
-------
pd.DataFrame
The unshuffled data as a pandas DataFrame.
"""
unshuffled = self.get_specific_result(table_name, JobKind.standard, Results.UNSHUFFLED)
if not isinstance(unshuffled, pd.DataFrame):
raise TypeError(f"Expected a pd.DataFrame, got {type(unshuffled)} instead.")
return unshuffled
def privacy_metrics(self, table_name: str) -> list[dict]:
"""
Get the privacy metrics.
Parameters
----------
table_name
The name of the table to get the privacy metrics from.
Returns
-------
        list[dict]
            The privacy metrics as a list of dictionaries.
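
        Examples
        --------
        A sketch, assuming the privacy metrics job has already run; the table name
        is illustrative::

            >>> metrics = runner.privacy_metrics("patients")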
"""
results = self.get_specific_result(
table_name, JobKind.privacy_metrics, Results.PRIVACY_METRICS
)
if not isinstance(results, list):
raise TypeError(f"Expected a list, got {type(results)} instead.")
return results
def signal_metrics(self, table_name: str) -> list[dict]:
"""
Get the signal metrics.
Parameters
----------
table_name
The name of the table to get the signal metrics from.
Returns
-------
        list[dict]
            The signal metrics as a list of dictionaries.
"""
results = self.get_specific_result(
table_name, JobKind.signal_metrics, Results.SIGNAL_METRICS
)
if not isinstance(results, list):
raise TypeError(f"Expected a list, got {type(results)} instead.")
return results
def render_plot(self, table_name: str, plot_kind: PlotKind, open_in_browser: bool = False):
"""
Render a plot for a given table.
The different plot kinds are defined in the PlotKind enum.
Parameters
----------
table_name
The name of the table to get the plot from.
plot_kind
The kind of plot to render.
open_in_browser
Whether to save the plot to a file and open it in a browser.
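
        Examples
        --------
        A sketch; the table name and ``PlotKind`` member are illustrative, check the
        ``PlotKind`` enum for the kinds available in your version::

            >>> runner.render_plot("patients", PlotKind.PROJECTION_2D, open_in_browser=True)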
"""
results = self.get_specific_result(table_name, JobKind.standard, Results.FIGURES)
if not isinstance(results, dict):
raise TypeError(f"Expected a dict, got {type(results)} instead.")
if plot_kind not in results:
raise ValueError(f"No {plot_kind} found for table '{table_name}'.")
plots = results[plot_kind]
for idx, plot in enumerate(plots):
filename = None
if open_in_browser:
filename = f"{table_name}_{plot_kind.value}_{idx}.html"
self._save_file(plot, filename=filename)
self._open_plot(plot, filename=filename)
def projections(self, table_name: str) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Get the projections.
Parameters
----------
table_name
The name of the table to get the projections from.
Returns
-------
        tuple[pd.DataFrame, pd.DataFrame]
            The original and avatar projections as pandas DataFrames.
"""
original_coordinates = self.get_specific_result(
table_name, JobKind.standard, Results.PROJECTIONS_ORIGINAL
)
avatars_coordinates = self.get_specific_result(
table_name, JobKind.standard, Results.PROJECTIONS_AVATARS
)
if not (
isinstance(avatars_coordinates, pd.DataFrame)
and isinstance(original_coordinates, pd.DataFrame)
):
raise TypeError(
"Expected a pd.DataFrame, "
f"got {type(original_coordinates)} and {type(avatars_coordinates)} instead."
)
return original_coordinates, avatars_coordinates
def table_summary(self, table_name: str) -> pd.DataFrame:
"""
Get the table summary.
Parameters
----------
table_name
The name of the table to get the summary from.
Returns
-------
pd.DataFrame
The table summary as a dataframe.
"""
results = self.get_specific_result(table_name, JobKind.advice, Results.ADVICE)
if not isinstance(results, dict) or "summary" not in results.keys():
raise ValueError(f"Summary not found in results for table '{table_name}'")
summary_data = results["summary"]
if "stats" not in summary_data.keys():
raise ValueError(f"Summary not found in results for table '{table_name}'")
return pd.DataFrame(summary_data["stats"])
def from_yaml(self, yaml_path: str) -> None:
"""Create a Runner object from a YAML configuration.
Parameters
----------
yaml
The path to the yaml to transform.
Returns
-------
Runner
A Runner object configured based on the YAML content.
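
        Examples
        --------
        A sketch; the path is illustrative::

            >>> runner.from_yaml("my_config.yaml")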
"""
list_config = self._load_yaml_config(yaml_path)
self._process_yaml_config(list_config)
def _load_yaml_config(self, yaml_path: str) -> list[dict]:
"""Load the YAML configuration from a file."""
with open(yaml_path, "r") as file:
config = yaml.safe_load_all(file)
return list(config) # Convert generator to list
def _process_yaml_config(self, list_config: list[dict]) -> None:
"""Process the YAML configuration into parameters and links."""
parameters: dict[str, dict] = {}
links: dict[str, dict] = {}
for item in list_config:
kind = item.get("kind")
metadata = item.get("metadata", {})
spec = item.get("spec", {})
if kind == "AvatarSchema":
self._process_avatar_schema(spec, metadata, links)
elif kind in {
"AvatarParameters",
"AvatarSignalMetricsParameters",
"AvatarPrivacyMetricsParameters",
}:
self._process_parameters(spec, parameters)
self._apply_links(links)
self._apply_parameters(parameters)
def _process_avatar_schema(self, spec: dict, metadata: dict, links: dict):
"""Process AvatarSchema kind from the YAML configuration."""
if metadata["name"].endswith("_avatarized"):
return
for table in spec.get("tables", []):
self._process_table(table, links)
def _process_table(self, table: dict, links: dict):
"""Process a single table from the AvatarSchema."""
try:
base = self.client.results.get_upload_url()
user_specific_path = base + f"/{table['name']}"
access_url = f"{self.client.base_url}/access?url=" + user_specific_path
self.file_downloader.download_file(url=access_url)
original_volume = table["data"]["volume"]
except FileNotFoundError:
print(f"Error downloading file {table['data']['file']}") # noqa: T201
print( # noqa: T201
f"File is not available in the server, upload it with runner.upload_file(table_name='{table['name']}', data='{table['data']['file']}')" # noqa: E501
)
original_volume = VOLUME_NAME
primary_key = None
foreign_keys = []
time_series_time = None
types: dict[str, ColumnType] = {}
if table.get("columns"):
primary_key = next(
(col["field"] for col in table["columns"] if col.get("primary_key")),
None,
)
foreign_keys = [
column["field"]
for column in table["columns"]
if column.get("identifier") and not column.get("primary_key")
]
time_series_time = next(
(col["field"] for col in table["columns"] if col.get("time_series_time")),
None,
)
types = {
col["field"]: ColumnType(col["type"]) for col in table["columns"] if col["type"]
}
self.config.create_table(
table_name=table["name"],
original_volume=original_volume,
original_file=table["data"]["file"],
avatar_volume=table["avatars_data"]["volume"] if "avatars_data" in table else None,
            avatar_file=table["avatars_data"]["file"] if "avatars_data" in table else None,
primary_key=primary_key,
foreign_keys=foreign_keys,
time_series_time=time_series_time,
types=types,
individual_level=table.get("individual_level"),
)
if table.get("links", []):
for link in table["links"]:
links[table["name"]] = link
def _process_parameters(self, spec: dict, parameters: dict):
"""Process parameters from the YAML configuration."""
for param_type in [
"avatarization",
"time_series",
"privacy_metrics",
"signal_metrics",
]:
for table_name, params in spec.get(param_type, {}).items():
parameters.setdefault(table_name, {}).update({param_type: params})
def _apply_links(self, links: dict):
"""Apply links to the configuration."""
for table_name, link in links.items():
self.add_link(
parent_table_name=table_name,
parent_field=link["field"],
child_table_name=link["to"]["table"],
child_field=link["to"]["field"],
)
def _apply_parameters(self, parameters: dict):
"""Apply parameters to the configuration."""
for table_name, params in parameters.items():
avatarization = params.get("avatarization", {})
time_series = params.get("time_series", {})
privacy_metrics = params.get("privacy_metrics", {})
exclude_replacement_strategy, exclude_variable_names = self._process_exclude_variables(
avatarization
)
(
imputation_method,
imputation_k,
imputation_training_fraction,
imputation_return_data_imputed,
) = self._process_imputation(avatarization)
time_series_projection_type, time_series_nf = self._process_time_series_projection(
time_series
)
time_series_method, time_series_nb_points = self._process_time_series_alignment(
time_series
)
self.set_parameters(
table_name=table_name,
k=avatarization.get("k"),
ncp=avatarization.get("ncp"),
use_categorical_reduction=avatarization.get("use_categorical_reduction"),
column_weights=avatarization.get("column_weights"),
exclude_variable_names=exclude_variable_names,
exclude_replacement_strategy=exclude_replacement_strategy,
imputation_method=imputation_method,
imputation_k=imputation_k,
imputation_training_fraction=imputation_training_fraction,
imputation_return_data_imputed=imputation_return_data_imputed,
time_series_nf=time_series_nf,
time_series_projection_type=time_series_projection_type,
time_series_nb_points=time_series_nb_points,
time_series_method=time_series_method,
known_variables=privacy_metrics.get("known_variables"),
target=privacy_metrics.get("target"),
)
def _process_exclude_variables(self, avatarization: dict):
"""Process exclude variables from avatarization parameters."""
exclude_replacement_strategy = None
exclude_variable_names = None
if exclude_vars := avatarization.get("exclude_variables", {}):
exclude_replacement_strategy = self._get_enum_value(
ExcludeVariablesMethod, exclude_vars.get("replacement_strategy")
)
exclude_variable_names = exclude_vars.get("variable_names")
return exclude_replacement_strategy, exclude_variable_names
def _process_imputation(self, avatarization: dict):
"""Process imputation parameters."""
        imputation_method = None
        imputation_k = None
        imputation_training_fraction = None
        imputation_return_data_imputed = None
if imputation := avatarization.get("imputation"):
imputation_method = self._get_enum_value(ImputeMethod, imputation.get("method"))
imputation_k = imputation.get("k")
imputation_training_fraction = imputation.get("training_fraction")
imputation_return_data_imputed = imputation.get("return_data_imputed", False)
return (
imputation_method,
imputation_k,
imputation_training_fraction,
imputation_return_data_imputed,
)
def _process_time_series_projection(self, time_series: dict):
"""Process time series projection parameters."""
time_series_projection_type = None
time_series_nf = None
if projection := time_series.get("projection"):
time_series_projection_type = self._get_enum_value(
ProjectionType, projection.get("type")
)
time_series_nf = projection.get("nf")
return time_series_projection_type, time_series_nf
def _process_time_series_alignment(self, time_series: dict):
"""Process time series alignment parameters."""
time_series_method = None
time_series_nb_points = None
if alignment := time_series.get("alignment"):
time_series_method = self._get_enum_value(AlignmentMethod, alignment.get("method"))
time_series_nb_points = alignment.get("nb_points")
return time_series_method, time_series_nb_points
def _get_enum_value(self, enum_class, value: str | None):
if value is None:
return None
try:
return enum_class(value)
except ValueError:
return None
def _open_plot(self, plot_html: HTML, filename: str | None = None):
"""Render a plot, optionally saving it and opening it in a browser."""
if filename:
file_path = os.path.abspath(filename)
webbrowser.open(f"file://{file_path}")
else:
display(plot_html)
def _save_file(self, file_content: HTML, filename: str | None = None):
"""Save the HTML file content to a specified path."""
if filename is None:
return None
with open(filename, "w", encoding="utf-8") as file:
file.write(file_content.data)