Source code for renate.benchmark.experimentation

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

import pandas as pd
import torch
from pytorch_lightning import seed_everything

import renate
import renate.defaults as defaults
from renate.cli.parsing_functions import (
    get_data_module_fn_kwargs,
    get_metrics_fn_kwargs,
    get_model_fn_kwargs,
    get_scheduler_kwargs,
    get_transforms_kwargs,
)
from renate.evaluation.metrics.classification import (
    average_accuracy,
    backward_transfer,
    forgetting,
    forward_transfer,
    micro_average_accuracy,
)
from renate.training import run_training_job
from renate.training.training import submit_remote_job
from renate.utils.file import (
    copy_to_uri,
    is_s3_uri,
    move_to_uri,
    save_pandas_df_to_csv,
)
from renate.utils.module import (
    evaluate_and_record_results,
    get_and_prepare_data_module,
    get_metrics,
    get_model,
    import_module,
)

logger = logging.getLogger(__name__)


def experiment_config_file():
    return str(Path(renate.__path__[0]) / "benchmark" / "experiment_config.py")


def create_cumulative_metrics() -> List[Tuple[str, Callable]]:
    """Gets the cumulative metrics for a given task along with a name of the metric to include
    in any potential results table.
    """
    return [
        ("Average Accuracy", average_accuracy),
        ("Micro Average Accuracy", micro_average_accuracy),
        ("Forgetting", forgetting),
        ("Forward Transfer", forward_transfer),
        ("Backward Transfer", backward_transfer),
    ]


def cumulative_metrics_summary(
    results: Dict[str, List[List[float]]],
    cumulative_metrics: List[Tuple[str, Callable]],
    num_tasks: int,
    num_instances: List[int],
) -> pd.DataFrame:
    """Creates a pandas DataFrame summary with respect to the observed tasks, specified by
    `num_tasks`.

    Args:
        results: The results dictionary holding all the results with respect to all recorded
            metrics.
        cumulative_metrics: The list of (name, metric) tuples.
        num_tasks: The total number of tasks.
        num_instances: Count of test data points for each task.
    """
    data = []
    for task_id in range(num_tasks):
        row = [task_id + 1]
        for _, metric in cumulative_metrics:
            row.append(metric(results, task_id, num_instances))
        data.append(row)
    column_names = ["Task ID"] + [name for name, _ in cumulative_metrics]
    df = pd.DataFrame(data, columns=column_names)
    return df


def individual_metrics_summary(
    results: Dict[str, List[List[float]]],
    current_task: int,
    num_tasks: int,
) -> pd.DataFrame:
    """Creates a pandas DataFrame summary for all individual metrics with respect to all
    observed tasks.

    Args:
        results: The results dictionary holding all the results with respect to all recorded
            metrics.
        current_task: The current task ID.
        num_tasks: The total number of tasks.
    """
    data = []
    metric_columns = [k for k in results.keys() if "_init" not in k]
    for task_id in range(current_task):
        row = [task_id + 1]
        for key in metric_columns:
            value = results[key]
            for v in value[task_id]:
                row.append(v)
        data.append(row)
    sub_columns = [f"Task {i}" for i in range(1, num_tasks + 1)]
    mux = pd.MultiIndex.from_product([metric_columns, sub_columns])
    mux = mux.insert(0, "Task ID")
    df = pd.DataFrame(data, columns=mux)
    return df


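# Illustrative usage sketch: a minimal, hypothetical example of how the summary helpers above
# fit together. The toy `results` dict is an assumption that mimics the structure produced by
# `evaluate_and_record_results` (metric name -> one list of per-task values per update), and a
# simple mean-accuracy callable stands in for `create_cumulative_metrics()` so the sketch stays
# self-contained.
def _example_metric_summaries() -> None:
    results = {
        "accuracy": [
            [0.90, 0.50],  # accuracies on tasks 1 and 2 after update 1
            [0.85, 0.80],  # accuracies on tasks 1 and 2 after update 2
        ],
    }
    num_tasks = 2
    num_instances = [1000, 500]  # hypothetical test-set sizes per task

    # Per-update table: one row per update, one column per (metric, task) pair.
    print(individual_metrics_summary(results, current_task=num_tasks, num_tasks=num_tasks))

    # Cumulative table: any (name, callable) pair with the signature
    # (results, task_id, num_instances) can be used.
    def mean_accuracy(res, task_id, _num_instances):
        return sum(res["accuracy"][task_id]) / len(res["accuracy"][task_id])

    print(
        cumulative_metrics_summary(
            results, [("Mean Accuracy", mean_accuracy)], num_tasks, num_instances
        )
    )

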
def execute_experiment_job(
    backend: defaults.SUPPORTED_BACKEND_TYPE,
    config_file: str,
    config_space: Dict[str, Any],
    experiment_outputs_url: str,
    mode: defaults.SUPPORTED_TUNING_MODE_TYPE,
    metric: str,
    num_updates: int,
    working_directory: Optional[str] = defaults.WORKING_DIRECTORY,
    requirements_file: Optional[str] = None,
    dependencies: Optional[List[str]] = None,
    role: Optional[str] = None,
    instance_type: str = defaults.INSTANCE_TYPE,
    instance_count: int = defaults.INSTANCE_COUNT,
    instance_max_time: float = defaults.INSTANCE_MAX_TIME,
    max_time: Optional[float] = None,
    max_num_trials_started: Optional[int] = None,
    max_num_trials_completed: Optional[int] = None,
    max_num_trials_finished: Optional[int] = None,
    n_workers: int = defaults.N_WORKERS,
    seed: int = defaults.SEED,
    accelerator: defaults.SUPPORTED_ACCELERATORS_TYPE = defaults.ACCELERATOR,
    devices: int = defaults.DEVICES,
    deterministic_trainer: bool = True,
    gradient_clip_val: Optional[float] = defaults.GRADIENT_CLIP_VAL,
    gradient_clip_algorithm: Optional[str] = defaults.GRADIENT_CLIP_ALGORITHM,
    job_name: str = defaults.JOB_NAME,
    strategy: str = defaults.DISTRIBUTED_STRATEGY,
    precision: str = defaults.PRECISION,
    save_state: bool = defaults.SAVE_BENCHMARK_STATE,
) -> None:
    """Executes the experiment job.

    Args:
        backend: Backend of the experiment job.
        config_file: Path to the Renate config file.
        config_space: Details for defining your own search space are provided in the
            `Syne Tune Documentation
            <https://github.com/awslabs/syne-tune/blob/main/docs/search_space.md>`_.
        experiment_outputs_url: Path to the experiment outputs.
        mode: Whether to minimize or maximize the metric.
        metric: Metric of the experiment job.
        num_updates: Number of updates of the experiment job.
        working_directory: Path to the working directory.
        requirements_file: Path to the requirements file.
        dependencies: (SageMaker backend only) List of strings containing absolute or relative
            paths to files and directories that will be uploaded as part of the SageMaker
            training job.
        role: Role of the experiment job.
        instance_type: Instance type of the experiment job.
        instance_count: Instance count of the experiment job.
        instance_max_time: Instance max time of the experiment job.
        max_time: Max time of the experiment job.
        max_num_trials_started: Max number of trials started of the experiment job.
        max_num_trials_completed: Max number of trials completed of the experiment job.
        max_num_trials_finished: Max number of trials finished of the experiment job.
        n_workers: Number of workers of the experiment job.
        seed: Seed of the experiment job.
        accelerator: Type of accelerator to use.
        devices: Number of devices to use.
        deterministic_trainer: When true, the Trainer adopts deterministic behaviour also on
            GPU. In this function this parameter is set to True by default.
        gradient_clip_val: The value at which to clip gradients. Passing None disables it.
            `More details <https://lightning.ai/docs/pytorch/stable/common/trainer.html#init>`__
        gradient_clip_algorithm: The gradient clipping algorithm to use. Can be norm or value.
            `More details <https://lightning.ai/docs/pytorch/stable/common/trainer.html#init>`__
        job_name: Name of the experiment job.
        strategy: Name of the distributed training strategy to use.
            `More details <https://lightning.ai/docs/pytorch/stable/extensions/strategy.html>`__
        precision: Type of bit precision to use.
            `More details <https://lightning.ai/docs/pytorch/stable/common/precision_basic.html>`__
        save_state: Flag to retain models and buffer states of each update step.
            Disable to save storage.
    """
    assert (
        mode in defaults.SUPPORTED_TUNING_MODE
    ), f"Mode {mode} is not in {defaults.SUPPORTED_TUNING_MODE}."
    assert (
        backend in defaults.SUPPORTED_BACKEND
    ), f"Backend {backend} is not in {defaults.SUPPORTED_BACKEND}."
    if backend == "local":
        return _execute_experiment_job_locally(
            config_file=config_file,
            experiment_outputs_url=experiment_outputs_url,
            mode=mode,
            config_space=config_space,
            metric=metric,
            working_directory=working_directory,
            num_updates=num_updates,
            max_time=max_time,
            max_num_trials_started=max_num_trials_started,
            max_num_trials_completed=max_num_trials_completed,
            max_num_trials_finished=max_num_trials_finished,
            n_workers=n_workers,
            accelerator=accelerator,
            devices=devices,
            deterministic_trainer=deterministic_trainer,
            seed=seed,
            strategy=strategy,
            precision=precision,
            save_state=save_state,
            gradient_clip_val=gradient_clip_val,
            gradient_clip_algorithm=gradient_clip_algorithm,
        )
    _execute_experiment_job_remotely(
        job_name=job_name,
        config_file=config_file,
        experiment_outputs_url=experiment_outputs_url,
        mode=mode,
        metric=metric,
        num_updates=num_updates,
        working_directory=working_directory,
        dependencies=dependencies or [],
        config_space=config_space,
        max_time=max_time,
        max_num_trials_started=max_num_trials_started,
        max_num_trials_completed=max_num_trials_completed,
        max_num_trials_finished=max_num_trials_finished,
        n_workers=n_workers,
        accelerator=accelerator,
        devices=devices,
        deterministic_trainer=deterministic_trainer,
        gradient_clip_val=gradient_clip_val,
        gradient_clip_algorithm=gradient_clip_algorithm,
        seed=seed,
        requirements_file=requirements_file,
        role=role,
        instance_type=instance_type,
        instance_count=instance_count,
        instance_max_time=instance_max_time,
        strategy=strategy,
        precision=precision,
        save_state=save_state,
    )


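# Illustrative usage sketch: a minimal, hypothetical call of `execute_experiment_job` with the
# local backend. Only "updater" and "max_epochs" are read directly by
# `_execute_experiment_job_locally`; the remaining config-space keys and the metric name are
# placeholders, since the required entries depend on the functions defined in the chosen config
# file, and `num_updates` must match the number of data chunks that config file produces.
def _example_local_experiment() -> None:
    execute_experiment_job(
        backend="local",
        config_file=experiment_config_file(),  # packaged benchmark config
        config_space={
            "updater": "ER",  # assumed updater name
            "max_epochs": 5,
            "learning_rate": 0.03,  # hypothetical hyperparameter expected by the config file
        },
        experiment_outputs_url="./experiment_outputs/",
        mode="max",
        metric="val_accuracy",  # assumed name of the validation metric
        num_updates=5,
        working_directory="./renate_working_dir/",
    )

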
def _execute_experiment_job_locally(
    config_file: str,
    experiment_outputs_url: str,
    num_updates: int,
    mode: defaults.SUPPORTED_TUNING_MODE_TYPE,
    config_space: Dict[str, Any],
    metric: str,
    working_directory: str,
    seed: int,
    accelerator: defaults.SUPPORTED_ACCELERATORS_TYPE,
    devices: int,
    max_time: float,
    max_num_trials_started: int,
    max_num_trials_completed: int,
    max_num_trials_finished: int,
    n_workers: int,
    deterministic_trainer: bool,
    strategy: str,
    precision: str,
    save_state: bool,
    gradient_clip_val: Optional[float],
    gradient_clip_algorithm: Optional[str],
) -> None:
    """Runs an experiment, combining hyperparameter tuning and model updates over multiple
    data chunks.

    See renate.benchmark.experimentation.execute_experiment_job for more details.
    """
    logger.info("Start experiment.")
    seed_everything(seed, True)

    input_state_url = defaults.input_state_folder(working_directory)
    output_state_url = defaults.output_state_folder(working_directory)
    data_url = defaults.data_folder(working_directory)
    model_url = defaults.model_file(input_state_url)
    logs_url = defaults.logs_folder(working_directory)
    for url in [input_state_url, output_state_url, logs_url]:
        if os.path.exists(url):
            shutil.rmtree(url)
        Path(url).mkdir(parents=True, exist_ok=True)

    config_module = import_module("config_module", config_file)
    scheduler, scheduler_kwargs = get_scheduler_kwargs(config_module)
    model_fn_kwargs = get_model_fn_kwargs(config_module, config_space)
    logger.info(f"Loading model {model_fn_kwargs.get('model_name', '')}")
    model = get_model(config_module, **model_fn_kwargs)
    data_module_fn_kwargs = get_data_module_fn_kwargs(config_module, config_space)
    logger.info(f"Prepare dataset {data_module_fn_kwargs.get('dataset_name', '')}")
    data_module = get_and_prepare_data_module(
        config_module,
        data_path=data_url,
        chunk_id=0,
        seed=seed,
        **data_module_fn_kwargs,
    )
    data_module.setup()
    assert num_updates == len(
        data_module.test_data()
    ), f"The dataset has {len(data_module.test_data())} chunks, expected {num_updates}."
    num_instances = [len(data_chunk) for data_chunk in data_module.test_data()]
    transforms = get_transforms_kwargs(config_module, config_space)
    metrics_fn_kwargs = get_metrics_fn_kwargs(config_module, config_space)
    metrics = get_metrics(config_module, **metrics_fn_kwargs)
    torch.save(
        model.state_dict(),
        model_url,
    )
    # TODO: evaluate's trainer has to use devices=1:
    # See https://github.com/Lightning-AI/lightning/issues/2537
    # The fix is to launch evaluation in a separate process like training.
    results = evaluate_and_record_results(
        {},
        model=model,
        data_module=data_module,
        transform=transforms.get("test_transform"),
        target_transform=transforms.get("target_test_transform"),
        logged_metrics=metrics,
        metric_postfix="_init",
        accelerator=accelerator,
        devices=1,
        strategy=strategy,
        precision=precision,
    )

    for update_id in range(num_updates):
        logger.info(f"Starting Update {update_id + 1}/{num_updates}.")
        update_url = os.path.join(experiment_outputs_url, f"update_{update_id}")
        run_training_job(
            mode=mode,
            config_space=config_space,
            metric=metric,
            backend="local",
            updater=config_space["updater"],
            max_epochs=config_space["max_epochs"],
            chunk_id=update_id,
            input_state_url=input_state_url,
            output_state_url=output_state_url,
            working_directory=working_directory,
            config_file=config_file,
            max_time=max_time,
            max_num_trials_started=max_num_trials_started,
            max_num_trials_completed=max_num_trials_completed,
            max_num_trials_finished=max_num_trials_finished,
            n_workers=n_workers,
            scheduler=scheduler,
            scheduler_kwargs=scheduler_kwargs,
            seed=seed,
            accelerator=accelerator,
            devices=devices,
            precision=precision,
            strategy=strategy,
            deterministic_trainer=deterministic_trainer,
            gradient_clip_algorithm=gradient_clip_algorithm,
            gradient_clip_val=gradient_clip_val,
        )
        move_to_uri(output_state_url, input_state_url)
        if save_state:
            copy_to_uri(input_state_url, update_url)
        model = get_model(
            config_module,
            model_state_url=model_url,
            **get_model_fn_kwargs(config_module, config_space),
        )
        results = evaluate_and_record_results(
            results,
            model=model,
            data_module=data_module,
            transform=transforms.get("test_transform"),
            target_transform=transforms.get("target_test_transform"),
            logged_metrics=metrics,
            accelerator=accelerator,
            devices=1,
        )
        df = individual_metrics_summary(results, update_id + 1, num_updates)
        save_pandas_df_to_csv(
            df, defaults.metric_summary_file(logs_url, special_str=f"_update_{update_id}")
        )
        logger.info(f"### Results after update {update_id + 1}: ###")
        logger.info(df)

    cumulative_metrics = create_cumulative_metrics()
    df = cumulative_metrics_summary(results, cumulative_metrics, num_updates, num_instances)
    save_pandas_df_to_csv(df, defaults.metric_summary_file(logs_url))
    logger.info("### Cumulative results: ###")
    logger.info(df)

    if not save_state:
        move_to_uri(defaults.hpo_file(input_state_url), str(experiment_outputs_url))
    move_to_uri(logs_url, defaults.logs_folder(experiment_outputs_url))
    shutil.rmtree(working_directory)
    logger.info("Experiment completed successfully.")


def _execute_experiment_job_remotely(experiment_outputs_url: str, **job_kwargs: Any) -> str:
    """Executes the experiment job on SageMaker.

    See renate.benchmark.experimentation.execute_experiment_job for more details.
    """
    assert is_s3_uri(
        experiment_outputs_url
    ), f"experiment_outputs_url {experiment_outputs_url} is not on S3."
    return submit_remote_job(
        experiment_outputs_url=experiment_outputs_url,
        optional_dependencies="benchmark",
        **job_kwargs,
    )