
    &`iX                     b   d dl Z d dlZd dlmZmZmZmZmZmZm	Z	 d dl
Z
d dlmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z'm(Z(m)Z) d dl*m+Z+m,Z, d dl-m.Z. d dl/m0Z0  e j1        e2          Z3e, G d de'                      Z4dS )    N)AnyCallableDictListOptionalTypeUnion)env_integer)tabulate)	RunConfigScalingConfig)BackendConfig
Checkpoint)session)BackendExecutor	TrialInfo)
DataConfig)_TrainingResultget_session)construct_train_funccount_required_parameters)$_TRAINER_RESTORE_DEPRECATION_WARNING)RAY_TRAIN_ENABLE_STATE_TRACKING)BaseTrainer
GenDatasetTrainingIterator)
DeprecatedDeveloperAPI)Template)repr_with_fallbackc                       e Zd ZU dZeZee         ed<   eZ	ee         ed<   e
j        g dz   ZdZe
j        dgz   Zddddddddddeeg df         eegdf         f         dee         d	ee         d
ee         dee         dee         deeeef                  deeeef                  dee         f fdZe ee          	 	 d%dedeeeg df         eegdf         f                  dee         f fd                        Z fdZdededdfdZ ed
edef fd            Z!deddfdZ"de#e$         fdZ%d&dZ&defdZ' e(dd g          d!             Z)defd"Z*defd#Z+defd$Z, xZ-S )'DataParallelTrainera  A Trainer for data parallel training.

    You should subclass this Trainer if your Trainer follows SPMD (single program,
    multiple data) programming paradigm - you want multiple processes to run the same
    function, but on different data.

    This Trainer runs the function ``train_loop_per_worker`` on multiple Ray
    Actors.

    The ``train_loop_per_worker`` function is expected to take in either 0 or 1
    arguments:

    .. testcode::

        def train_loop_per_worker():
            ...

    .. testcode::

        def train_loop_per_worker(config: Dict):
            ...

    If ``train_loop_per_worker`` accepts an argument, then
    ``train_loop_config`` will be passed in as the argument. This is useful if you
    want to tune the values in ``train_loop_config`` as hyperparameters.

    If the ``datasets`` dict contains a training dataset (denoted by
    the "train" key), then it will be split into multiple dataset
    shards that can then be accessed by ``train.get_dataset_shard("train")`` inside
    ``train_loop_per_worker``. All the other datasets will not be split and
    ``train.get_dataset_shard(...)`` will return the entire Dataset.

    Inside the ``train_loop_per_worker`` function, you can use any of the
    :ref:`Ray Train loop methods <train-loop-api>`.

    .. testcode::

        from ray import train

        def train_loop_per_worker():
            # Report intermediate results for callbacks or logging and
            # checkpoint data.
            train.report(...)

            # Returns dict of last saved checkpoint.
            train.get_checkpoint()

            # Returns the Dataset shard for the given key.
            train.get_dataset_shard("my_dataset")

            # Returns the total number of workers executing training.
            train.get_context().get_world_size()

            # Returns the rank of this worker.
            train.get_context().get_world_rank()

            # Returns the rank of the worker on the current node.
            train.get_context().get_local_rank()

    Any returns from the ``train_loop_per_worker`` will be discarded and not
    used or persisted anywhere.

    **How do I use DataParallelTrainer or any of its subclasses?**

    Example:

    .. testcode::
        :skipif: True

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.data_parallel_trainer import DataParallelTrainer

        def train_loop_for_worker():
            dataset_shard_for_this_worker = train.get_dataset_shard("train")

            # 3 items for 3 workers, each worker gets 1 item
            batches = list(dataset_shard_for_this_worker.iter_batches(batch_size=1))
            assert len(batches) == 1

        train_dataset = ray.data.from_items([1, 2, 3])
        assert train_dataset.count() == 3
        trainer = DataParallelTrainer(
            train_loop_for_worker,
            scaling_config=ScalingConfig(num_workers=3),
            datasets={"train": train_dataset},
        )
        result = trainer.fit()

    **How do I develop on top of DataParallelTrainer?**

    In many cases, using DataParallelTrainer directly is sufficient to execute
    functions on multiple actors.

    However, you may want to subclass ``DataParallelTrainer`` and create a custom
    Trainer for the following 2 use cases:

      - **Use Case 1:** You want to do data parallel training, but want to have
        a predefined ``training_loop_per_worker``.

      - **Use Case 2:** You want to implement a custom
        :py:class:`~ray.train.backend.Backend` that automatically handles
        additional setup or teardown logic on each actor, so that the users of this
        new trainer do not have to implement this logic. For example, a
        ``TensorflowTrainer`` can be built on top of ``DataParallelTrainer``
        that automatically handles setting the proper environment variables for
        distributed Tensorflow on each actor.

    For 1, you can set a predefined training loop in __init__

    .. testcode::

        from ray.train.data_parallel_trainer import DataParallelTrainer

        class MyDataParallelTrainer(DataParallelTrainer):
            def __init__(self, *args, **kwargs):
                predefined_train_loop_per_worker = lambda: 1
                super().__init__(predefined_train_loop_per_worker, *args, **kwargs)


    For 2, you can implement the ``ray.train.Backend`` and ``ray.train.BackendConfig``
    interfaces.

    .. testcode::

        from dataclasses import dataclass
        from ray.train.backend import Backend, BackendConfig

        class MyBackend(Backend):
            def on_start(self, worker_group, backend_config):
                def set_env_var(env_var_value):
                    import os
                    os.environ["MY_ENV_VAR"] = env_var_value

                worker_group.execute(set_env_var, backend_config.env_var)

        @dataclass
        class MyBackendConfig(BackendConfig):
            env_var: str = "default_value"

            def backend_cls(self):
                return MyBackend

        class MyTrainer(DataParallelTrainer):
            def __init__(self, train_loop_per_worker, my_backend_config:
                MyBackendConfig, **kwargs):

                super().__init__(
                    train_loop_per_worker,
                    backend_config=my_backend_config, **kwargs)

    Args:
        train_loop_per_worker: The training function to execute.
            This can either take in no arguments or a ``config`` dict.
        train_loop_config: Configurations to pass into
            ``train_loop_per_worker`` if it accepts an argument.
        backend_config: Configuration for setting up a Backend (e.g. Torch,
            Tensorflow, Horovod) on each worker to enable distributed
            communication. If no Backend should be set up, then set this to None.
        scaling_config: Configuration for how to scale data parallel training.
        dataset_config: Configuration for dataset ingest. This is merged with the
            default dataset config for the given trainer (`cls._dataset_config`).
        run_config: Configuration for the execution of the training run.
        datasets: Ray Datasets to use for training and evaluation.
            This is a dict where the key is the name of the dataset, which
            can be accessed from within the ``train_loop_per_worker`` by calling
            ``train.get_dataset_shard(dataset_key)``.
            By default, all datasets are sharded equally across workers.
            This can be configured via ``dataset_config``.
        metadata: Dict that should be made available via
            `train.get_context().get_metadata()` and in `checkpoint.get_metadata()`
            for checkpoints saved from this Trainer. Must be JSON-serializable.
        resume_from_checkpoint: A checkpoint to resume training from.
    _backend_executor_cls_training_iterator_cls)num_workersresources_per_workeruse_gpuplacement_strategyaccelerator_typeNtrain_loop_config)r*   backend_configscaling_configdataset_config
run_configdatasetsmetadataresume_from_checkpointtrain_loop_per_workerr+   r,   r-   r.   r/   r0   r1   c                   || _         || _        |t                      }t          |t                    st	          d|           || _        ||nt                      }|| _        t          t          |           
                    |||||	           | j        j        }
| j                            |
                    dd          |
                    dd                     t          t           d          rddlm}  |             d S d S )NzC`dataset_config` must be an instance of ray.train.DataConfig, was: )r,   r.   r/   r0   r1   CPUr   GPU)get_or_create_state_actor)_train_loop_per_worker_train_loop_configr   
isinstance
ValueError_data_configr   _backend_configsuperr"   __init__r,   total_resourcesset_train_total_resourcesgetr
   r   %ray.train._internal.state.state_actorr6   )selfr2   r*   r+   r,   r-   r.   r/   r0   r1   train_total_resourcesr6   	__class__s               s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/data_parallel_trainer.pyr>   zDataParallelTrainer.__init__   sQ    '<#"3!'\\N.*55 	)&) )   + -8NNmoo 	  .!4((11)!#9 	2 	
 	
 	
 !% 3 C33!%%eQ//!%%eQ//	
 	
 	

 6:: 	(WWWWWW%%'''''	( 	(    )messagepathc                 L     t          t          |           j        d|||d|S )a  Restores a DataParallelTrainer from a previously interrupted/failed run.

        Args:
            train_loop_per_worker: Optionally re-specified train loop function.
                This should be used to re-specify a function that is not
                restorable in a new Ray cluster (e.g., it holds onto outdated
                object references). This should be the same training loop
                that was passed to the original trainer constructor.
            train_loop_config: Optionally re-specified train config.
                This should similarly be used if the original `train_loop_config`
                contained outdated object references, and it should not be modified
                from what was originally passed in.

        See :meth:`BaseTrainer.restore() <ray.train.trainer.BaseTrainer.restore>`
        for descriptions of the other arguments.

        Returns a restored instance of the `DataParallelTrainer`.
        )rI   r2   r*    )r=   r"   restore)clsrI   r2   r*   kwargsrE   s        rF   rL   zDataParallelTrainer.restore  sD    : 7u(#..6 
"7/
 
 	
 
 	
rG   c                 ~    t                                                       |                     | j        d           d S )Nr2   )r=   _validate_attributes_validate_train_loop_per_workerr7   )rC   rE   s    rF   rP   z(DataParallelTrainer._validate_attributes5  sD    $$&&&,,')@	
 	
 	
 	
 	
rG   fn_namereturnc                 Z    t          |          }|dk    rt          | d| d          d S )N   z1 should take in 0 or 1 arguments, but it accepts z arguments instead.)r   r:   )rC   r2   rR   num_required_paramss       rF   rQ   z3DataParallelTrainer._validate_train_loop_per_worker<  s\     88MNN"" K K"5K K K   #"rG   c                 F   t          t          |                               |          }|j        s/dt	          j                    v rt                              d           |j        t          d| j
         d          |j        dk    rt          d|j                   |S )Nr5   zGPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.zQYou must specify the 'num_workers' in `scaling_config` as either an argument of `zR` or through the `param_space` of a `Tuner` (if performing hyperparameter tuning).r   zG'num_workers' in `scaling_config` must be a positive integer. Received )r=   r"   _validate_scaling_configr'   rayavailable_resourcesloggerinfor%   r:   __name__)rM   r,   rE   s     rF   rX   z,DataParallelTrainer._validate_scaling_configF  s    2C88QQ
 
 % 	%33J3L3L*L*LKK*   %-A #A A A   %**B%3%?B B  
 rG   training_iteratorc                 :    |D ]}|                      |           dS )a  This method loops over the `TrainingIterator`:
        The actual iteration (for ... in ...) waits for the training function
        on each worker to report a result and supplies it as a list of results.
        Afterwards (in the body of the loop), it will report the result
        to the Tune session.
        The iterator ends after the training function on each worker has finished.
        N)_propagate_results)rC   r^   training_resultss      rF   _run_trainingz!DataParallelTrainer._run_traininge  s6     !2 	6 	6##$45555	6 	6rG   ra   c                     |d         }t          d |D                       sJ t                      d |D             }t          |          dk    }|rj                            |j                   t          fd|D                       sJ |r%t          j        j        j        j                  nd }t          ||j                  }t                              d|j         d|j                                        |           d S )	Nr   c              3   @   K   | ]}t          |t                    V  d S N)r9   r   .0results     rF   	<genexpr>z9DataParallelTrainer._propagate_results.<locals>.<genexpr>s  s,      VV6:fo66VVVVVVrG   c                 *    g | ]}|j         	|j         S re   )
checkpointrf   s     rF   
<listcomp>z:DataParallelTrainer._propagate_results.<locals>.<listcomp>~  s.     
 
 
 , ,,,rG   c              3   B   K   | ]}|j         j        j        k    V  d S re   )rI   storagecheckpoint_fs_path)rg   rk   tune_sessions     rF   ri   z9DataParallelTrainer._propagate_results.<locals>.<genexpr>  sD       
 
 O|3FF
 
 
 
 
 
rG   )
filesystemrI   )rk   metricsz<Report (metrics, checkpoint) to the Tune session:
  metrics=z
  checkpoint=)allr   lenrn   _update_checkpoint_indexrr   r   storage_filesystemro   r   r[   debugrk   _report_training_result)rC   ra   first_worker_resultworker_checkpoints at_least_one_reported_checkpointrk   tracked_training_resultrp   s          @rF   r`   z&DataParallelTrainer._propagate_resultsq  s   .q1VVEUVVVVVVVV"}}
 
*
 
 

 ,//A+B+BQ+F(+ 	W  99:M:UVVV  
 
 
 
0
 
 
 
 
 	
 	
 
 0	J'/B!)<   
  	 #2!'/#
 #
 #

 	A08A A3>A A	
 	
 	
 	,,-DEEEEErG   c                 B   |                      | j                  }t          | j        | j        | j        j        dd          }t          t          j	                    t          j
                    t          j                    t          j                    t          j                                        t          j                                                    t          j                    t'          j                    j                  }|                     | j        ||j        |j        d          }|                                 |                     || j        || j        | j        | j        | j                  }|                     |           |                                  d S )Nr2   T)train_func_contextfn_arg_namediscard_returns)nameid	resourceslogdir	driver_ipdriver_node_idexperiment_namerun_idr   )r+   
trial_infor%   r&   max_retries)backend_executorr+   
train_funcr/   r0   data_configrk   )!rX   r,   r   r7   r8   r<   r~   r   r   get_trial_nameget_trial_idget_trial_resourcesget_trial_dirrY   utilget_node_ip_addressget_runtime_contextget_node_idget_experiment_nameuuiduuid4hexr#   r%   _resources_per_worker_not_nonestartr$   r/   r0   r;   starting_checkpointrb   shutdown)rC   r,   r2   r   r   r^   s         rF   training_loopz!DataParallelTrainer.training_loop  s   66t7JKK 4'##3F/ !
 !
 !
 '))#%%133(**h2244244@@BB#799:<<#	
 	
 	

  55/!&2!/!N 6 
 
 	    77-/,]])/ 8 
 
 	,--- 	!!#####rG   c                     | j         S )zReturns a copy of this Trainer's final dataset configs.

        Returns:
            The merged default + user-supplied dataset config.
        )r;   )rC   s    rF   get_dataset_configz&DataParallelTrainer.get_dataset_config  s       rG   
ipywidgets8c                    ddl m}m}m}m}  |d| j        j         d          }g }g }| j        r|                    | 	                                           |                    d           |                     || 
                                                     |                    d           | j        rE|                     ||                                                      |                    d           | j        rJ|                     || j                                                             |                    d           | j        rJ|                     || j                                                             |                    d	           | j        rJ|                     || j                                                             |                    d
            |||          }	 |||	g |d                    }
 |
j        di |}|                    dt'          |           i           |S )a,  Returns a mimebundle with an ipywidget repr and a simple text repr.

        Depending on the frontend where the data is being displayed,
        different mimetypes will be used from this bundle.
        See https://ipython.readthedocs.io/en/stable/config/integrating.html
        for information about this method, and
        https://ipywidgets.readthedocs.io/en/latest/embedding.html
        for more information about the jupyter widget mimetype.

        Returns:
            A mimebundle containing an ipywidget repr and a simple text repr.
        r   )HTMLLayoutTabVBoxz<h2>z</h2>DatasetszData ConfigTrain Loop ConfigzScaling Configz
Run ConfigzBackend Config)titles100%widthlayoutz
text/plainrK   )r   r   r   r   r   rE   r]   r/   append_datasets_repr__data_config_repr_html_r8   _train_loop_config_repr_html_r,   _repr_html_r.   r<   _repr_mimebundle_updaterepr)rC   rN   r   r   r   r   titlechildrenr   tabwidgetbundles               rF   r   z%DataParallelTrainer._repr_mimebundle_  sb    	766666666666:DN3:::;;= 	)OOD0022333MM*%%%OODD!=!=!?!?@@AAAMM-(((" 	/OODD!C!C!E!EFFGGGMM-... 	,OODD!4!@!@!B!BCCDDDMM*+++? 	(OODD!<!<!>!>??@@@MM,''' 	,OODD!5!A!A!C!CDDEEEMM*+++c(6***ucl66+?+?+?@@@))33F33d4jj	
 	
 	

 rG   c                    | j         ri }| j                                         D ]{\  }}t          |t                    s!t          |                                          r|||<   At          |d          r|                                ||<   it          |          ||<   |t          d                              dt          d                              t          |                                ddgdd	          d
                    S dS )Nr   title_data.html.j2r   zscrollableTable.html.j2SettingValueF
unsafehtml)headers	showindextablefmtnone)table
max_heightr   data )
r8   itemsr9   str	isnumerichasattrr   r   renderr   )rC   
table_datakvs       rF   r   z1DataParallelTrainer._train_loop_config_repr_html_  s&   " 	J/5577 + +1a%% +Q)9)9);); +$%JqMMQ.. +$%MMOOJqMM$'FFJqMM01188)788??""((**!*G 4"'!-	    & @   9    2rG   c                 r    t          | j                  g}t          d                              |          S )Nzrendered_html_common.html.j2)content)r   r;   r   r   )rC   r   s     rF   r   z+DataParallelTrainer._data_config_repr_html_0  s5    t())*677>>w>OOOrG   c           
         ddl m}m}m} g }| j        r| j                                        D ]\  }}|                                }|rk|                     |t          d          	                    d| dd                                |                    |                                            || |d          	          S )
Nr   )r   r   r   r   zDataset - <code>z</code>r   r   r   r   )
r   r   r   r   r/   r   
_tab_repr_r   r   r   )rC   r   r   r   r   r   configr   s           rF   r   z#DataParallelTrainer._datasets_repr_5  s   1111111111= 	8 $ 3 3 5 5 
8 
8f'')) 8NN$%9::AA&F&F&F&FT B      NN6#4#4#6#6777tGFF$8$8$89999rG   )NN)rS   N).r]   
__module____qualname____doc__r   r#   r   __annotations__r   r$   r   _scaling_config_allowed_keys_dataset_config_fields_for_tuner_param_spacer	   r   r   r   r   r   r   r   r   r   r   r   r>   classmethodr   r   rL   rP   rQ   rX   rb   r   r   r`   r   r   r    r   r   r   r   __classcell__)rE   s   @rF   r"   r"      s        n nd 4C40BBB5ED!12EEE#.#K O O O $  O$/$MQ %! -12626/3*.48-17;0( 0( 0($Xb$h%74&$,9O%OP0( $D>	0(
 !/0( !/0( !,0( Y'0( 4Z010( 4S>*0( !) 40( 0( 0( 0( 0( 0(d Z<=== ,0 
  
 
  ((2t8$hvt|&<<= 
 
 $D> 
  
  
  
  
 >= [ 
D
 
 
 
 
%-8;	    m       [<
6/? 
6D 
6 
6 
6 
64F43H 4F 4F 4F 4Fl.$ .$ .$ .$`!J ! ! ! ! s+,,3 3 -,3js    4P P P P P
: : : : : : : : :rG   r"   )5loggingr   typingr   r   r   r   r   r   r	   rY   ray._private.ray_constantsr
   )ray._private.thirdparty.tabulate.tabulater   ray.air.configr   r   	ray.trainr   r   ray.train._internalr   $ray.train._internal.backend_executorr   r   ray.train._internal.data_configr   ray.train._internal.sessionr   r   ray.train._internal.utilsr   r   ray.train.base_trainerr   ray.train.constantsr   ray.train.trainerr   r   r   ray.util.annotationsr   r   ray.widgetsr   ray.widgets.utilr    	getLoggerr]   r[   r"   rK   rG   rF   <module>r      s     C C C C C C C C C C C C C C C C C C 



 2 2 2 2 2 2 > > > > > > 3 3 3 3 3 3 3 3 / / / / / / / / ' ' ' ' ' ' K K K K K K K K 6 6 6 6 6 6 D D D D D D D D U U U U U U U U G G G G G G ? ? ? ? ? ? G G G G G G G G G G 9 9 9 9 9 9 9 9             / / / / / /		8	$	$ l: l: l: l: l:+ l: l: l: l: l:rG   