
    &`i&                     l   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ  d dl!m"Z" d d	l#m$Z$ d d
l%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z:m;Z;m<Z< erd dl=m>Z> d dl?m@Z@ dZAedeg df         f         ZB ejC        eD          ZEdZFde3 ZGde3 ZH e<d           G d deI                      ZJdeKded          d!eKfd"ZLe; G d# d e jM                              ZNdS )$    N)partial)Path)TYPE_CHECKINGAnyCallableDictListOptionalTypeUnion)	usage_lib)deep_update)usage)*ensure_only_allowed_dataclass_keys_updated)AirEntrypoint)	RunConfigScalingConfig)Result)
Checkpoint)get_session)StorageContext_exists_at_fs_pathget_fs_and_path)V2_MIGRATION_GUIDE_MESSAGE_v2_migration_warnings_enabled)!_GET_METADATA_DEPRECATION_MESSAGE)_log_deprecation_warning)
DeprecatedDeveloperAPI	PublicAPI)Dataset)	Trainableztrainer.pklr!   a  The `preprocessor` argument to Trainers is deprecated as of Ray 2.7. Instead, use the Preprocessor `fit` and `transform` APIs directly on the Ray Dataset. For any state that needs to be saved to the trained checkpoint, pass it in using the `metadata` argument of the `Trainer`. For a full example, see https://docs.ray.io/en/master/train/user-guides/data-loading-preprocessing.html#preprocessing-structured-data z]The `restore` and `can_restore` APIs are deprecated and will be removed in a future release. zS`resume_from_checkpoint` is deprecated and will be removed in an upcoming release. beta	stabilityc                       e Zd ZdZdZdZdS )TrainingFailedErrorz-An error indicating that training has failed.a`  The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = {trainer_cls_name}.restore("{path}")`.zTo start a new run that will retry on training failures, set `train.RunConfig(failure_config=train.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.N)__name__
__module____qualname____doc___RESTORE_MSG_FAILURE_CONFIG_MSG     j/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/base_trainer.pyr'   r'   G   s*        77	< 	! r/   r'   configtrainer_clsBaseTrainermetadatac                 B   |
J |            |t                      _         |di | }t          j                                        }|r||_        d |j                                        D             |_        |                                 |	                                 dS )a  This is the function that defines the logic of the Ray Train coordinator.
    This is responsible for setting up a remote instance of the `trainer_cls`
    (a different instance than the one calling `trainer.fit` on the driver!)
    and running the training loop.
    Nc                 L    i | ]!\  }}|t          |          r
 |            n|"S r.   )callable).0kds      r0   
<dictcomp>z)_train_coordinator_fn.<locals>.<dictcomp>v   s@       )-A(1++$113331  r/   r.   )
r   r4   raytuneget_checkpointstarting_checkpointdatasetsitemssetuptraining_loop)r1   r2   r4   trainer
checkpoints        r0   _train_coordinator_fnrF   \   s     %KMM k##F##G ((**J 1 '1#
 181A1G1G1I1I  G MMOOOr/   c                   L    e Zd ZU dZdgZee         ed<   dZe	ed<   dZ
e	ed<   g Zddddddd	ee         d
ee         deeeef                  deeeef                  dee         f
dZe ee          	 	 	 d)ded          deeej        f         deej        j                 deeeef                  d	ee         dd fd                        Ze ee e                      	 d*ded          deeej        f         deej        j                 de	fd                        Z d Z! fdZ"d Z#d Z$ed	edefd            Z%d+dZ&d+dZ'e(j)        d+d            Z* e+d           de,fd!            Z-d"ej        j        d#efd$Z.defd%Z/ded&         fd'Z0ded&         fd(Z1 xZ2S ),r3   a  Defines interface for distributed training on Ray.

    Note: The base ``BaseTrainer`` class cannot be instantiated directly. Only
    one of its subclasses can be used.

    Note to developers: If a new trainer is added, please update
    `air/_internal/usage.py`.

    **How does a trainer work?**

    - First, initialize the Trainer. The initialization runs locally,
      so heavyweight setup should not be done in ``__init__``.
    - Then, when you call ``trainer.fit()``, the Trainer is serialized
      and copied to a remote Ray actor. The following methods are then
      called in sequence on the remote actor.
    - ``trainer.setup()``: Any heavyweight Trainer setup should be
      specified here.
    - ``trainer.training_loop()``: Executes the main training logic.
    - Calling ``trainer.fit()`` will return a ``ray.result.Result``
      object where you can access metrics from your training run, as well
      as any checkpoints that may have been saved.

    **How do I create a new Trainer?**

    Subclass ``ray.train.trainer.BaseTrainer``, and override the ``training_loop``
    method, and optionally ``setup``.

    .. testcode::
        :skipif: True

        import torch

        from ray.train.trainer import BaseTrainer
        from ray import train, tune


        class MyPytorchTrainer(BaseTrainer):
            def setup(self):
                self.model = torch.nn.Linear(1, 1)
                self.optimizer = torch.optim.SGD(
                    self.model.parameters(), lr=0.1)

            def training_loop(self):
                # You can access any Trainer attributes directly in this method.
                # self.datasets["train"] has already been
                dataset = self.datasets["train"]

                torch_ds = dataset.iter_torch_batches(dtypes=torch.float)
                loss_fn = torch.nn.MSELoss()

                for epoch_idx in range(10):
                    loss = 0
                    num_batches = 0
                    torch_ds = dataset.iter_torch_batches(
                        dtypes=torch.float, batch_size=2
                    )
                    for batch in torch_ds:
                        X = torch.unsqueeze(batch["x"], 1)
                        y = torch.unsqueeze(batch["y"], 1)
                        # Compute prediction error
                        pred = self.model(X)
                        batch_loss = loss_fn(pred, y)

                        # Backpropagation
                        self.optimizer.zero_grad()
                        batch_loss.backward()
                        self.optimizer.step()

                        loss += batch_loss.item()
                        num_batches += 1
                    loss /= num_batches

                    # Use Tune functions to report intermediate
                    # results.
                    train.report({"loss": loss, "epoch": epoch_idx})


        # Initialize the Trainer, and call Trainer.fit()
        import ray
        train_dataset = ray.data.from_items(
            [{"x": i, "y": i} for i in range(10)])
        my_trainer = MyPytorchTrainer(datasets={"train": train_dataset})
        result = my_trainer.fit()

    Args:
        scaling_config: Configuration for how to scale training.
        run_config: Configuration for the execution of the training run.
        datasets: Any Datasets to use for training. Use the key "train"
            to denote which dataset is the training dataset.
        metadata: Dict that should be made available via
            `train.get_context().get_metadata()` and in `checkpoint.get_metadata()`
            for checkpoints saved from this Trainer. Must be JSON-serializable.
        resume_from_checkpoint: A checkpoint to resume training from.
    trainer_resources_scaling_config_allowed_keysF_handles_checkpoint_freq_handles_checkpoint_at_endN)scaling_config
run_configr@   r4   resume_from_checkpointrL   rM   r@   r4   rN   c                   ||nt                      | _        |t          j        |          nt                      | _        || _        ||ni | _        || _        t                      r,|t          t                     |t          t                     d | _        d | _        |                                  t          j        d           t#          j        |            d S )Ntrain)r   rL   copyr   rM   r4   r@   r?   r   r   r   +_RESUME_FROM_CHECKPOINT_DEPRECATION_WARNING_restore_path_restore_storage_filesystem_validate_attributesr   record_library_usage	air_usagetag_air_trainer)selfrL   rM   r@   r4   rN   s         r0   __init__zBaseTrainer.__init__   s     -8NNmoo 	 &0%;DIj!!! 	 !$,$8b#9 )++ 	V#()JKKK%1()TUUU "+/(!!###&w///!$'''''r/   )messageclspathstorage_filesystemreturnc           	         t                      rt          t                     |                     ||          st	          d| d          t          ||          \  }}t          |t                                                    }|	                    |          5 }	t          j        |	                                          \  }
}ddd           n# 1 swxY w Y   |
| ur(t          j        d|
 d| j         d|
j         d           |                    di           }|r<|s:t	          d	t#          |                                           d
| j         d          |pi }t'          |          t'          |          k    rSt	          dt#          |                                           dt#          |                                                     ||d<   |r||d<   |                                D ]\  }}||||<   	  | di |}n+# t*          $ r}t	          d| j         d          |d}~ww xY w||_        ||_        |S )a  Restores a Train experiment from a previously interrupted/failed run.

        Restore should be used for experiment-level fault tolerance in the event
        that the head node crashes (e.g., OOM or some other runtime error) or the
        entire cluster goes down (e.g., network error affecting all nodes).

        A run that has already completed successfully will not be resumed from this API.
        To continue training from a successful run, launch a new run with the
        ``<Framework>Trainer(resume_from_checkpoint)`` API instead, passing in a
        checkpoint from the previous run to start with.

        .. note::

            Restoring an experiment from a path that's pointing to a *different*
            location than the original experiment path is supported. However, Ray Train
            assumes that the full experiment directory is available
            (including checkpoints) so that it's possible to resume trials from their
            latest state.

            For example, if the original experiment path was run locally, then the
            results are uploaded to cloud storage, Ray Train expects the full contents
            to be available in cloud storage if attempting to resume
            via ``<Framework>Trainer.restore("s3://...")``. The restored run will
            continue writing results to the same cloud storage location.

        The following example can be paired with implementing job retry using
        :ref:`Ray Jobs <jobs-overview>` to produce a Train experiment that will
        attempt to resume on both experiment-level and trial-level failures:

        .. testcode::
            :skipif: True

            import os
            import ray
            from ray import train
            from ray.train.trainer import BaseTrainer

            experiment_name = "unique_experiment_name"
            storage_path = os.path.expanduser("~/ray_results")
            experiment_dir = os.path.join(storage_path, experiment_name)

            # Define some dummy inputs for demonstration purposes
            datasets = {"train": ray.data.from_items([{"a": i} for i in range(10)])}

            class CustomTrainer(BaseTrainer):
                def training_loop(self):
                    pass

            if CustomTrainer.can_restore(experiment_dir):
                trainer = CustomTrainer.restore(
                    experiment_dir, datasets=datasets
                )
            else:
                trainer = CustomTrainer(
                    datasets=datasets,
                    run_config=train.RunConfig(
                        name=experiment_name,
                        storage_path=storage_path,
                        # Tip: You can also enable retries on failure for
                        # worker-level fault tolerance
                        failure_config=train.FailureConfig(max_failures=3),
                    ),
                )

            result = trainer.fit()

        Args:
            path: The path to the experiment directory of the training run to restore.
                This can be a local path or a remote URI if the experiment was
                uploaded to the cloud.
            storage_filesystem: Custom ``pyarrow.fs.FileSystem``
                corresponding to the ``path``. This may be necessary if the original
                experiment passed in a custom filesystem.
            datasets: Re-specified datasets used in the original training run.
                This must include all the datasets that were passed in the
                original trainer constructor.
            scaling_config: Optionally re-specified scaling config. This can be
                modified to be different from the original spec.
            **kwargs: Other optionally re-specified arguments, passed in by subclasses.

        Raises:
            ValueError: If all datasets were not re-supplied on restore.

        Returns:
            BaseTrainer: A restored instance of the class that is calling this method.
        zInvalid restore path: zn. Make sure that this path exists and is the experiment directory that results from a call to `trainer.fit()`.NzFInvalid trainer type. You are attempting to restore a trainer of type z with `z-.restore`, which will most likely fail. Use `z.restore` instead.r@   z=The following datasets need to be provided again on restore: z
Use z^.restore(..., datasets=datasets) with the datasets that were provided to the original trainer.z_The provided datasets don't match the original dataset keys.
  Expected datasets for the keys: z
  Actual datasets provided: rL   zxTrainer restoration failed (see above for the stack trace). Make sure that you use the right trainer class to restore: `z
.restore`
r.   )r   r   $_TRAINER_RESTORE_DEPRECATION_WARNINGcan_restore
ValueErrorr   r   _TRAINER_PKLas_posixopen_input_filepickleloadsreadallwarningswarnr(   poplistkeyssetrA   	ExceptionrS   rT   )r\   r]   r^   r@   rL   kwargsfsfs_pathtrainer_pkl_pathfr2   
param_dictoriginal_datasets
param_namevalrD   es                    r0   restorezBaseTrainer.restore  sL   @ *++ 	K$%IJJJt%788 	# # # #  
 &d,>??G66??AA 011 	@Q&,l199;;&?&?#K	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ c!!MAA A(+A A $,A A A   'NN:r:: 	X 	P)..0011P P|P P P   >r !!S]]22G59:K:P:P:R:R5S5SG G/3HMMOO/D/DG G  
 "*
: 	:+9J'(%||~~ 	- 	-OJ),
:&	c''J''GG 	 	 	.L. . .  		 !%.@+s*   *CCCH
 

H2H--H2)r[   warningc                     t                      rt          t                     t          ||          \  }}t	          |t
                                                    }t          ||          S )a  Checks whether a given directory contains a restorable Train experiment.

        Args:
            path: The path to the experiment directory of the Train experiment.
                This can be either a local directory (e.g., ~/ray_results/exp_name)
                or a remote URI (e.g., s3://bucket/exp_name).

        Returns:
            bool: Whether this path exists and contains the trainer state to resume from
        )r   r   ra   r   r   rd   re   r   )r\   r]   r^   rr   rs   rt   s         r0   rb   zBaseTrainer.can_restore  sc    ( *++ 	K$%IJJJ%d,>??G66??AA!"&6777r/   c                 H   t                      t                      i d d}g }|                                D ]5\  }}t          | |          }||k    r|                    | d|           6|r&d| j        j         dd                    |           dS d| j        j         dS )N)rL   rM   r@   r?   =< >)r   r   rA   getattrappend	__class__r(   join)rY   default_valuesnon_default_arguments	parameterdefault_valuevalues         r0   __repr__zBaseTrainer.__repr__  s     ,oo#++#'	*
 *
 !#(6(<(<(>(> 	G 	G$I}D),,E%%%,,	-E-EE-E-EFFF  	TSt~.SS:O1P1PSSSS-4>*----r/   c                 :   t          t          |                               |           }t          j        | j                  j        }t          |                                          }|dd          }t          t          ||                    }i |||_        |S )N   )superr3   __new__inspect	signaturerZ   
parametersrm   rn   dictzip_param_dict)r\   argsrq   rD   r   arg_dictr   s         r0   r   zBaseTrainer.__new__  s    S))11#66&s|44?
*//++,,
^
J--..44V4r/   c                    t          | j        t                    s-t          dt	          | j                   d| j         d          t          | j        t                    s-t          dt	          | j                   d| j         d          t          | j        t                    s-t          dt	          | j                   d| j         d          | j        	                                D ]I\  }}t          |t          j        j                  s%t          |          st          d| d| d          J| j        pi | _        t          | j        t                    s%t          d	t	          | j                   d
          	 t!          j        t!          j        | j                            | _        n,# t&          $ r}t          d| j         d|           d}~ww xY w| j        Gt          | j        t*                    s-t          dt	          | j                   d| j         d          |                                  dS )z4Called on __init()__ to validate trainer attributes.zC`run_config` should be an instance of `ray.train.RunConfig`, found z with value `z`.zA`scaling_config` should be an instance of `ScalingConfig`, found zW`datasets` should be a dict mapping from a string to `ray.data.Dataset` objects, found zThe Dataset under 'z,' key is not a `ray.data.Dataset`. Received z	 instead.z*The provided metadata must be a dict, was .z1The provided metadata must be JSON-serializable: z: NzP`resume_from_checkpoint` should be an instance of `ray.train.Checkpoint`, found )
isinstancerM   r   rc   typerL   r   r@   r   rA   r<   datar!   r7   r4   	TypeErrorjsonrh   dumpsrp   r?   r   _log_v2_deprecation_warnings)rY   keydatasetrz   s       r0   rU   z BaseTrainer._validate_attributes  s    $/955 	Qdo..Q Q=A_Q Q Q  
 $-}== 	Yd122Y YAEATY Y Y  
 $-.. 	Mdm,,M M;?=M M M   !% 3 3 5 5  W!'38+;<< XgEVEV $7c 7 7$+7 7 7   +$-.. 	ST$-=P=PSSS  	 Jtz$-'@'@AADMM 	 	 	(=( ($%( (  	 #/
$j9
 9
/ <15d6N1O1O< <#7< < <   	))+++++s   0F8 8
G!GG!c                    ddl m}m}  |            r t          d| j        j         d| d          t                      sdS ddlm}m	}m
}m}m}m}m}	m}
 | j        j        t%          |	           | j        j        j        rt%          |           | j        j        t%          |
           | j        j        rt%          |           | j        j        t%          |           | j        j        t%          |           | j        j        t%          |           | j        j        t8          j                                        k    rt%          |           dS dS )a  Logs deprecation warnings for v2 migration.

        Log them here in the Ray Train case rather than in the configuration
        constructors to avoid logging incorrect deprecation warnings when
        `ray.train.RunConfig` is passed to Ray Tune.
        r   )V2_ENABLED_ENV_VARis_v2_enabledz2Detected use of a deprecated Trainer import from `a  `. This Trainer class is not compatible with Ray Train V2.
To fix this:
  - Update to use the new import path. For example, `from ray.train.torch.torch_trainer import TorchTrainer` -> `from ray.train.torch import TorchTrainer`
  - Or, explicitly disable V2 by setting: zS=0
See this issue for more context: https://github.com/ray-project/ray/issues/49454N)CALLBACKS_DEPRECATION_MESSAGEFAIL_FAST_DEPRECATION_MESSAGELOG_TO_FILE_DEPRECATION_MESSAGE%PROGRESS_REPORTER_DEPRECATION_MESSAGESTOP_DEPRECATION_MESSAGESYNC_CONFIG_DEPRECATION_MESSAGE%TRAINER_RESOURCES_DEPRECATION_MESSAGEVERBOSE_DEPRECATION_MESSAGE) ray.train.v2._internal.constantsr   r   DeprecationWarningr   r)   r   &ray.train.v2._internal.migration_utilsr   r   r   r   r   r   r   r   rL   rH   r   rM   failure_config	fail_fast_verboselog_to_filestop	callbacksprogress_reportersync_configr<   rP   
SyncConfig)rY   r   r   r   r   r   r   r   r   r   r   s              r0   r   z(BaseTrainer._log_v2_deprecation_warnings  s    	WVVVVVVV=?? 	$BT^E^ B B >PB B B
 
 
 .// 	F		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 		
 0<$%JKKK ?)3 	D$%BCCC ?#/$%@AAA?& 	F$%DEEE?+$%=>>>?$0$%BCCC?,8$%JKKK?&#)*>*>*@*@@@$%DEEEEE A@r/   c                 2    t          || j                   |S )z?Returns scaling config dataclass after validating updated keys.)	dataclassallowed_keys)r   rI   r\   rL   s     r0   _validate_scaling_configz$BaseTrainer._validate_scaling_configV  s,     	3$9	
 	
 	
 	
 r/   c                     dS )ao  Called during fit() to perform initial setup on the Trainer.

        .. note:: This method is run on a remote process.

        This method will not be called on the driver, so any expensive setup
        operations should be placed here and not in ``__init__``.

        This method is called prior to ``preprocess_datasets`` and
        ``training_loop``.
        Nr.   rY   s    r0   rB   zBaseTrainer.setup_  s	     	r/   c                 0    t          dt                     )zDeprecated.za`preprocess_datasets` is no longer used, since preprocessors are no longer accepted by Trainers.
)r    PREPROCESSOR_DEPRECATION_MESSAGEr   s    r0   preprocess_datasetszBaseTrainer.preprocess_datasetsl  s)     W4TW W
 
 	
r/   c                     t           )a  Loop called by fit() to run training and report results to Tune.

        .. note:: This method runs on a remote process.

        ``self.datasets`` have already been evaluated if they were wrapped in a factory.

        You can use the :ref:`Ray Train utilities <train-loop-api>`
        (:func:`train.report() <ray.train.report>` and
        :func:`train.get_checkpoint() <ray.train.get_checkpoint>`) inside
        this training loop.

        Example:

        .. testcode::

            from ray.train.trainer import BaseTrainer
            from ray import train

            class MyTrainer(BaseTrainer):
                def training_loop(self):
                    for epoch_idx in range(5):
                        ...
                        train.report({"epoch": epoch_idx})

        )NotImplementedErrorr   s    r0   rC   zBaseTrainer.training_loops  s
    6 "!r/   r#   r$   c           
         ddl m}m} ddlm} |                                 }|                                 }| j        j        pt          j
        |          | j        _        t          | j        j        | j        j        | j        j                  }| j        rO|                    | j        || ||j        j        |j        j        |j        j                  | j                  }n |||| j        t$          j                  }|                     |j        |j                   t,          j                            | j        j        t7          |j                            }	 |                                }	n&# |$ r}
|
j        p|
}t-          |          |d	}
~
ww xY wt=          |	          d
k    sJ |	d         }|j        r4t-          d                     |t,          j!        g                    |j        |S )a"  Runs training.

        Returns:
            A Result object containing the training result.

        Raises:
            ray.train.base_trainer.TrainingFailedError: If any failures during the execution
                of ``self.as_trainable()``, or during the Tune execution loop.
        r   )ResumeConfig	TuneError)Tuner)storage_pathexperiment_dir_namer^   )finished
unfinishederrored)r]   	trainableparam_space_resume_configr^   )r   r   rM   _entrypoint)trainer_cls_namer]   Nr   
)"ray.tuner   r   ray.tune.tunerr   as_trainable%_extract_fields_for_tuner_param_spacerM   namer   get_experiment_dir_namer   r^   rS   r{   
ResumeTypeRESUMErT   r   TRAINER_saveexperiment_fs_pathr'   r,   formatr   r(   strfit	__cause__lenerrorr   r-   )rY   r   r   r   r   r   storagetunerrestore_msgresult_gridrz   parent_errorresults                r0   r   zBaseTrainer.fit  sB    	54444444((((((%%''	@@BB O UN$J9$U$U 	 !5 $ 4#A
 
 
  	MM'#'+|)4;+6=(3:     
 $(#C " 
 
EE E#'?)1	  E 	

7-w/IJJJ)6==!^4W/00 > 
 

	E))++KK 	E 	E 	E ;+!L &k22D	E ;1$$$$Q< 	  &		;(;(OPQQ <  s   #E8 8F=FFrr   experiment_pathc                    | j                                         }|                    di           }d |rfd|D             |d<   | j        |f}|                    |           |                    t          |t                                                              5 }|	                    t          j        |                     ddd           dS # 1 swxY w Y   dS )a  Saves the current trainer's class along with the `param_dict` of
        parameters passed to this trainer's constructor.

        This is used to recreate the trainer on restore.
        Unless a parameter is re-specified during restoration (only a subset
        of parameters can be passed in again), that parameter will be loaded
        from the saved copy.

        Datasets should not be saved as part of the state. Instead, we save the
        keys and replace the dataset values with dummy functions that will
        raise an error if invoked. The error only serves as a guardrail for
        misuse (e.g., manually unpickling and constructing the Trainer again)
        and is not typically surfaced, since datasets must be re-specified
        upon restoration.
        r@   c                      t           N)RuntimeErrorr.   r/   r0   raise_fnz#BaseTrainer._save.<locals>.raise_fn  s    r/   c                     i | ]}|S r.   r.   )r8   dataset_namer   s     r0   r;   z%BaseTrainer._save.<locals>.<dictcomp>  s(     & & &+7h& & &r/   N)r   rQ   rl   r   
create_diropen_output_streamr   rd   re   writerg   r   )rY   rr   r   rv   r@   cls_and_param_dictru   r   s          @r0   r   zBaseTrainer._save  s<     %**,,
>>*b11	 	 	  	& & & &;C& & &Jz" #nj9
o&&&""4#F#F#O#O#Q#QRR 	6VWGGFL!344555	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6s   (CCCc                     i }| j         D ]?}|| j                                        v r"t          j        | j        |                   ||<   @|S )as  Extracts fields to be included in `Tuner.param_space`.

        This is needed to leverage the full logging/integration offerings from Tune.
        For example, `param_space` is logged automatically to wandb integration.

        Currently only done for `train_loop_config`.

        Returns:
            A dictionary that should be passed to Tuner.param_space.
        )_fields_for_tuner_param_spacer   rn   rQ   deepcopy)rY   r   r   s      r0   r   z1BaseTrainer._extract_fields_for_tuner_param_space  sX     5 	C 	CCd&++----"mD,<S,ABBsr/   r"   c           	      D  	
 ddl m ddlm} | j        | j        
| j        }t          t          |          }j	        |_	         ||          }t          | j                  		rddlm                                 nd G 	
fdd|          S )	zqGenerates the base Trainable class.

        Returns:
            A Trainable class to use for training.
        r   )PlacementGroupFactory)wrap_function)r2   r4   )DataContextNc                        e Zd ZdZj        Zj        Zedeffd            Zede	ffd            Z
 fdZde	de	ffdZ fdZefd	            Z xZS )
;BaseTrainer._generate_trainable_cls.<locals>.TrainTrainablez(Adds default resources to the Trainable.r_   c                     S )z2Whether a dataset is provided through the Trainer.r.   )r\   has_base_datasets    r0   r  zLBaseTrainer._generate_trainable_cls.<locals>.TrainTrainable.has_base_dataset4  s     ('r/   c                     S )zBReturns the unchanged scaling config provided through the Trainer.r.   r   s    r0   base_scaling_configzOBaseTrainer._generate_trainable_cls.<locals>.TrainTrainable.base_scaling_config9  s     &%r/   c                     t          |          }|                    dd           }t          || j        d          | _        || j        d<   | j                            dt                                }t          |t                     rt          di |}|                     |          | j        d<   | 	                                r
                    	           t          |                               |           d S )NrM   T)new_keys_allowedrL   r.   )r   rl   r   r1   _merged_configgetr   r   ._reconcile_scaling_config_with_trial_resourcesr  _set_currentr   rB   )
rY   r1   rq   base_configrM   merged_scaling_configr   TrainTrainabler   dataset_contexts
         r0   rB   zABaseTrainer._generate_trainable_cls.<locals>.TrainTrainable.setup>  s   "6ll )__\4@@
&1t' ' '# 5?#L1(,(;(?(?$moo) )% 3T:: S,9,R,R<Q,R,R) GG)  #$
 ((** >  ,,_===nd++11&99999r/   rL   c                     | j         }t          |          s|S | j         |                                k    r|S                     |           t	          j        |          S )z
                ResourceChangingScheduler workaround.

                Ensures that the scaling config matches trial resources.

                This should be replaced with RCS returning a ScalingConfig
                in the future.
                )trial_resourcesr   as_placement_group_factoryr   r   from_placement_group_factory)rY   rL   r  r   r2   s      r0   r
  zjBaseTrainer._generate_trainable_cls.<locals>.TrainTrainable._reconcile_scaling_config_with_trial_resources\  sm     #'"6!/3HII *)) '>+T+T+V+VVV))44^DDD$A/RRRr/   c                 T    t                                          | j                   d S r   )r   _trainable_funcr  )rY   r1   r   s     r0   r  zKBaseTrainer._generate_trainable_cls.<locals>.TrainTrainable._trainable_funcv  s&     ''(;<<<<<r/   c                     |                     d          }t          |t                    rt          di |}                    |          }|                                S )NrL   r.   )r	  r   r   r   r   r  )r\   r1   updated_scaling_configvalidated_scaling_configrL   r2   s       r0   default_resource_requestzTBaseTrainer._generate_trainable_cls.<locals>.TrainTrainable.default_resource_request{  sp     *04Dn)U)U&4d;; U-:-T-T=S-T-T*+6+O+O*, ,( 0JJLLLr/   )r(   r)   r*   r+   rJ   rK   classmethodboolr  r   r  rB   r
  r  r  __classcell__)r   r   r   r  r  r  rL   r2   s   @r0   r  r  .  sI       ::'2'K$)4)O&( ( ( ( ( ( [( &M & & & & & [&: : : : : : : :<S&3SS S S S S S S4= = = = =
 M M M M M [M M M M Mr/   r  )#ray.tune.execution.placement_groupsr   ray.tune.trainabler   r   rL   r4   r   rF   r(   r  r@   ray.data.contextr   get_current)rY   r   r4   train_coordinator_fntrainable_clsr   r   r  r  r  rL   r2   s        @@@@@@@r0   _generate_trainable_clsz#BaseTrainer._generate_trainable_cls  s=    	NMMMMM444444n,=&!{X 
  
  
 )4(<%%&:;;.. 	#444444)5577OO"O[	M [	M [	M [	M [	M [	M [	M [	M [	M [	M [	M [	M [	M] [	M [	M [	Mz r/   c                 `    ddl m} | j        }|                                 } |j        |fi |S )z,Converts self to a ``tune.Trainable`` class.r   )r=   )r<   r=   r   r#  with_parameters)rY   r=   r  r"  s       r0   r   zBaseTrainer.as_trainable  sJ    &4466 $t#MAA[AAAr/   )NNNr   )r_   N)3r(   r)   r*   r+   rI   r	   r   __annotations__rJ   r  rK   r   r
   r   r   r   
GenDatasetr   r   rZ   r  r   ra   r   r   osPathLikepyarrowrr   
FileSystemr{   r   rb   r   r   rU   r   r   rB   r   abcabstractmethodrC   r    r   r   r   r   r#  r   r  )r   s   @r0   r3   r3   ~   s        ] ]@ 	/ $s)    &+d***',,,, %'!
 37*.48-17; (  (  ( !/ ( Y'	 (
 4Z01 ( 4S>* ( !) 4 (  (  (  (D Z<=== ?C4826W W- WC$%W %WZ%:;W 4Z01	W
 !/W 
W W W >= [Wr Z4..00   ?C8 8- 8C$%8 %WZ%:;8 
	8 8 8	  [
8,. . .(	 	 	 	 	4, 4, 4,l>F >F >F@ m     [   
 
 
 
 	" " " "8 Y   JV J J J ! JX6
- 6 6 6 6 6Bt    "|k): | | | ||Bd;/ B B B B B B B Br/   )Or,  rQ   r   r   loggingr(  rj   	functoolsr   pathlibr   typingr   r   r   r   r	   r
   r   r   
pyarrow.fsr*  r<   ray.cloudpicklecloudpicklerg   ray._common.usager   ray._private.dictr   ray.air._internalr   rW   ray.air._internal.configr   ray.air._internal.usager   ray.air.configr   r   ray.air.resultr   	ray.trainr   ray.train._internal.sessionr   ray.train._internal.storager   r   r   ray.train.constantsr   r   ray.train.contextr   ray.train.utilsr   ray.util.annotationsr   r   r    ray.datar!   r   r"   rd   r'  	getLoggerr(   loggerr   ra   rR   r   r'   r   rF   ABCr3   r.   r/   r0   <module>rG     sy   



     				              R R R R R R R R R R R R R R R R R R R R     



             ' ' ' ' ' ' ) ) ) ) ) ) 0 0 0 0 0 0 O O O O O O 1 1 1 1 1 1 3 3 3 3 3 3 3 3 ! ! ! ! ! !             3 3 3 3 3 3         
        @ ? ? ? ? ? 4 4 4 4 4 4 D D D D D D D D D D #      """""" 9hr9}556
 
	8	$	$u !I,FI I %-*- - , V    ,   (#M2>B   D VB VB VB VB VB#' VB VB VB VB VBr/   