Source code for avatars.client

# This file has been generated - DO NOT MODIFY
# API Version : 2.33.0

import warnings
from dataclasses import dataclass
from typing import Optional
from uuid import UUID

import httpx
from structlog import get_logger

from avatars import __version__
from avatars.base_client import BaseClient
from avatars.client_config import ClientConfig
from avatars.constants import DEFAULT_TIMEOUT
from avatars.models import (
    ForgottenPasswordRequest,
    Login,
    LoginResponse,
    ResetPasswordRequest,
)

MAX_FILE_LENGTH = 1024 * 1024 * 1024  # 1 GB

logger = get_logger(__name__)


[docs] @dataclass class AuthTokens: access: str refresh: Optional[str] = None
[docs] def update(self, resp: LoginResponse) -> None: self.access = resp.access_token if resp.refresh_token: self.refresh = resp.refresh_token
[docs] class ApiClient(BaseClient): def __init__( self, base_url: str | None = None, *, timeout: int | None = None, should_verify_ssl: bool | None = None, verify_auth: bool = True, http_client: Optional[httpx.Client] = None, api_key: Optional[str] = None, config: ClientConfig | None = None, ) -> None: """Client to communicate with the Avatar API. Parameters ---------- base_url: url of the API (required if config not provided) timeout: timeout in seconds, by default None (uses DEFAULT_TIMEOUT) should_verify_ssl : optional whether to verify SSL certificates on the server. By default None (uses True) http_client : optional allow passing in custom httpx.Client instance, by default None verify_auth : optional Bypass client-side authentication verification, by default True api_key : optional API key for authentication using api-key-v1 scheme. If provided, authenticate() should not be called. By default None config : optional ClientConfig object containing all configuration. Mutually exclusive individual parameters (except http_client). By default None """ if config is not None: conflicting_params = [] if base_url is not None: conflicting_params.append("base_url") if timeout is not None: conflicting_params.append("timeout") if should_verify_ssl is not None: conflicting_params.append("should_verify_ssl") if api_key is not None: conflicting_params.append("api_key") if conflicting_params: params_str = ", ".join(conflicting_params) raise ValueError( f"Cannot provide both 'config' and other parameters ({params_str}). " "Either pass a ClientConfig object or individual parameters, not both. " "Note: 'http_client' is allowed alongside 'config' for testing." ) else: # Create ClientConfig from individual parameters if not base_url: raise ValueError("base_url must be provided when creating an ApiClient") if '"' in base_url: raise ValueError( f"Expected base_url not to contain quotes. Got {base_url} instead" ) # Try to derive from base_url (replace /api with /storage) if not base_url.endswith("/api"): raise ValueError("base_url must end with '/api' to derive storage_endpoint_url") final_storage_url = base_url.replace("/api", "/storage") config = ClientConfig( base_api_url=base_url, timeout=timeout, should_verify_ssl=(should_verify_ssl if should_verify_ssl is not None else True), storage_endpoint_url=final_storage_url, api_key=api_key, ) if config.base_api_url is None: raise ValueError("base_api_url must be set in ClientConfig") final_timeout = config.timeout if config.timeout is not None else DEFAULT_TIMEOUT super().__init__( base_url=str(config.base_api_url), timeout=final_timeout, should_verify_ssl=config.should_verify_ssl, verify_auth=verify_auth, on_auth_refresh=self._refresh_auth if not config.api_key else None, http_client=http_client, headers={"User-Agent": f"avatar-python/{__version__}"}, api_key=config.api_key, ) # Importing here to prevent circular import from avatars.api import ( ApiKeys, Auth, Compatibility, Health, Jobs, Openapi, Resources, Results, Users, ) from avatars.data_upload import DataUploader self.api_keys = ApiKeys(self) self.auth = Auth(self) self.compatibility = Compatibility(self) self.health = Health(self) self.jobs = Jobs(self) self.openapi = Openapi(self) self.resources = Resources(self) self.results = Results(self) self.users = Users(self) data_uploader = DataUploader( self, should_verify_ssl=config.should_verify_ssl, storage_endpoint_url=str(config.storage_endpoint_url), ) self.data_uploader = data_uploader self.upload_file = data_uploader.upload_file self.download_file = data_uploader.download_file self.auth_tokens: Optional[AuthTokens] = None logger.debug("ApiClient initialized", base_api_url=str(config.base_api_url))
[docs] def authenticate(self, username: str, password: str, timeout: Optional[int] = None) -> None: if self._api_key: raise ValueError( "Cannot call authenticate() when api_key is set. " "API key authentication is already active. " "To use username/password authentication, create a new ApiClient without api_key." ) resp = self.auth.login( Login(username=username, password=password), timeout=timeout or self.timeout, ) self._update_auth_tokens(resp)
[docs] def forgotten_password(self, email: str, timeout: Optional[int] = None) -> None: self.auth.forgotten_password( ForgottenPasswordRequest(email=email), timeout=timeout or self.timeout )
[docs] def reset_password( self, email: str, new_password: str, new_password_repeated: str, token: UUID, timeout: Optional[int] = None, ) -> None: self.auth.reset_password( ResetPasswordRequest( email=email, new_password=new_password, new_password_repeated=new_password_repeated, token=token, ), timeout=timeout or self.timeout, )
def __str__(self) -> str: return ", ".join( f"ApiClient(base_url={self.base_url}" f"timeout={self.timeout}" f"should_verify_ssl={self.should_verify_ssl}" f"verify_auth={self.verify_auth})" ) def _enable_refresh_auth(self, enable: bool = True) -> None: self.on_auth_refresh(self._refresh_auth if enable else None) def _refresh_auth(self) -> dict[str, str]: new_headers: dict[str, str] = {} # API keys don't expire, no refresh needed if self._api_key: return new_headers if self.auth_tokens: if self.auth_tokens.refresh: resp = self.auth.refresh(self.auth_tokens.refresh) self._update_auth_tokens(resp, headers=new_headers) else: warnings.warn("Cannot refresh auth with refresh token") else: warnings.warn("Client is not authenticated, cannot refresh auth") return new_headers def _set_auth_bearer(self, token: str, *, headers: Optional[dict[str, str]] = None) -> None: self.set_header("Authorization", f"Bearer {token}") if headers is not None: headers["Authorization"] = f"Bearer {token}" def _update_auth_tokens( self, resp: LoginResponse, *, headers: Optional[dict[str, str]] = None ) -> None: if not self.auth_tokens: self.auth_tokens = AuthTokens(access=resp.access_token, refresh=resp.refresh_token) else: self.auth_tokens.update(resp) self._set_auth_bearer(self.auth_tokens.access, headers=headers)