
    &`i                        d dl Z d dlZd dlmZmZ d dlZd dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZmZ d d
lmZ d dlmZmZmZ d dlmZ  ej        e           Z! G d d          Z" G d dee          Z#dS )    N)DictList)ActorHandle)DataIterator)StreamSplitDataIterator)DataContext)GetTimeoutError)DatasetShardMetadataDatasetShardProvider)ControllerCallbackWorkerGroupCallback)TrainRunContext)WorkerWorkerGroupWorkerGroupContext)	ObjectRefc                   >    e Zd ZdZdeeef         fdZdedefdZ	dS )RayDatasetShardProviderzOA shard provider that Train workers use to access a DataIterator for a dataset.ds_iteratorsc                     || _         d S N)_dataset_iterators)selfr   s     }/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/v2/_internal/callbacks/datasets.py__init__z RayDatasetShardProvider.__init__"   s    ".    dataset_inforeturnc                 r    |j         | j        vrt          d|j          d          | j        |j                  S )NzDataset shard for 'z^' not found. Please ensure that the dataset is passed through the Trainer `datasets` argument.)dataset_namer   KeyError)r   r   s     r   get_dataset_shardz)RayDatasetShardProvider.get_dataset_shard&   sQ    $D,CCCl&?      &|'@AAr   N)
__name__
__module____qualname____doc__r   strr   r   r
   r"    r   r   r   r      sj        YY/T#|*;%< / / / /B.B B| B B B B B Br   r   c                       e Zd ZdZdefdZdej        j        de	e
ef         fdZdee	e
ef                  dee         fdZd	 Zd
ee         de	e
ee         f         fdZdefdZdeddfdZdeddfdZd ZdS )DatasetsSetupCallbackz:A callback for managing Ray Datasets for the worker group.train_run_contextc                     |j         | _        t          j        |j                  | _        |j        | _        g | _        g | _	        t          j        t          j                              | _        d S r   )datasets	_datasetscopydeepcopydataset_config_data_configscaling_config_scaling_config_coordinator_actors_shutdown_refsr   get_current_data_context)r   r+   s     r   r   zDatasetsSetupCallback.__init__4   s`    *3 M*;*JKK0?68 /1 "];+B+D+DEEr   r3   r   c                     |j         S )zReturn the resources reserved for training, so that Data can exclude
        these resources logically from its available pool.)total_resources)r   r3   s     r   get_train_total_resourcesz/DatasetsSetupCallback.get_train_total_resourcesE   s    
 --r   ds_iterators_per_rankc                 R    |d         }d |                                 D             }|S )z
        Returns a list of each unique SplitCoordinator actor handle given the iterators per rank.
        These handles will later be used to call shutdown on the actors.
        r   c                 F    g | ]}t          |t                    |j        S r(   )
isinstancer   _coord_actor).0iterators     r   
<listcomp>zADatasetsSetupCallback._get_coordinator_actors.<locals>.<listcomp>X   s<     
 
 
($;<<
!
 
 
r   )values)r   r<   rank_0_iteratorscoord_actorss       r   _get_coordinator_actorsz-DatasetsSetupCallback._get_coordinator_actorsL   sA     13
 
,3355
 
 

 r   c                 2    d | j         D             | _        dS )zDEagerly shutdown the data executors of the split coordinator actors.c                 @    g | ]}|j                                         S r(   )shutdown_executorremote)rA   coords     r   rC   zBDatasetsSetupCallback._shutdown_data_executors.<locals>.<listcomp>a   s4     
 
 
16E#**,,
 
 
r   N)r5   r6   r   s    r   _shutdown_data_executorsz.DatasetsSetupCallback._shutdown_data_executors_   s,    
 
:>:R
 
 
r   workersc                    t          |          }d |D             }|                     | j                  }| j                            |                    dd          |                    dd                     d | j                                        D             }| j                            ||d |          t                    |k    sJ | 	                              | _
        fdt          |          D             }d|iS )	Nc                 &    g | ]}|j         j        S r(   )metadatanode_id)rA   workers     r   rC   zCDatasetsSetupCallback.before_init_train_context.<locals>.<listcomp>m   s    IIIv6?2IIIr   CPUr   GPUc                 L    i | ]!\  }}|t          |          r
 |            n|"S r(   )callable)rA   kvs      r   
<dictcomp>zCDatasetsSetupCallback.before_init_train_context.<locals>.<dictcomp>u   s3    TTTTQAhqkk0qqsssqTTTr   )r-   
world_sizeworker_handlesworker_node_idsc                 <    g | ]}t          |                    S ))r   )r   )rA   rankr<   s     r   rC   zCDatasetsSetupCallback.before_init_train_context.<locals>.<listcomp>   s:     $
 $
 $
 $1Ft1LMMM$
 $
 $
r   dataset_shard_provider)lenr;   r4   r2   set_train_total_resourcesgetr.   items	configurerG   r5   range)r   rO   r\   r^   total_train_resourcesr-   shard_providers_per_rankr<   s          @r   before_init_train_contextz/DatasetsSetupCallback.before_init_train_contexti   s>    \\
IIIII !% > >t?S T T33!%%eQ//1F1J1J5RS1T1T	
 	
 	
 UTT^=Q=Q=S=STTT $ 1 ; ;!+	 !< !
 !
 ())Z7777#'#?#?@U#V#V $
 $
 $
 $
j))$
 $
 $
  )*BCCr   worker_groupc                 R    dt           fd}|                    || j                   d S )Nctxc                 .    t          j        |            d S r   )r   _set_current)rm   s    r   _propagate_data_contextzODatasetsSetupCallback.after_worker_group_start.<locals>._propagate_data_context   s    $S)))))r   )r   executer8   )r   rk   rp   s      r   after_worker_group_startz.DatasetsSetupCallback.after_worker_group_start   sJ    	* 	* 	* 	* 	* 	#	
 	
 	
 	
 	
r   worker_group_contextNc                 .    |                                   d S r   rN   r   rs   s     r   after_worker_group_shutdownz1DatasetsSetupCallback.after_worker_group_shutdown        	%%'''''r   c                 .    |                                   d S r   ru   rv   s     r   after_worker_group_abortz.DatasetsSetupCallback.after_worker_group_abort   rx   r   c                     	 t          j        | j        d           d S # t          $ r t                              d           Y d S t          $ r t                              d           Y d S w xY w)N   )timeoutz:Ray Data executor shutdown task timed out after 5 seconds.z2Failed to gracefully terminate Ray Data executors.)rayrd   r6   r	   loggererror	Exception	exceptionrM   s    r   before_controller_shutdownz0DatasetsSetupCallback.before_controller_shutdown   s    	SGD'333333 	W 	W 	WLLUVVVVVV 	S 	S 	SQRRRRRR	Ss    $A-#A-,A-)r#   r$   r%   r&   r   r   r~   trainScalingConfigr   r'   floatr;   r   r   r   rG   rN   r   r   rj   r   rr   r   rw   rz   r   r(   r   r   r*   r*   1   sh       DDF/ F F F F".!i5.	c5j	. . . .%)$sL/@*A%B	k	   &
 
 
DF|D	c4,--	.D D D D:
[ 
 
 
 
($6(	( ( ( (
($6(	( ( ( (S S S S Sr   r*   )$r/   loggingtypingr   r   r~   	ray.train	ray.actorr   ray.datar   1ray.data._internal.iterator.stream_split_iteratorr   ray.data.contextr   ray.exceptionsr	   2ray.train.v2._internal.data_integration.interfacesr
   r   )ray.train.v2._internal.execution.callbackr   r   (ray.train.v2._internal.execution.contextr   :ray.train.v2._internal.execution.worker_group.worker_groupr   r   r   	ray.typesr   	getLoggerr#   r   r   r*   r(   r   r   <module>r      s             



     ! ! ! ! ! ! ! ! ! ! ! ! U U U U U U ( ( ( ( ( ( * * * * * *               E D D D D D         
       		8	$	$B B B B B B B B$sS sS sS sS sS/1C sS sS sS sS sSr   