
    &`iE                        U d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
 d dlmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z& d dl'm(Z(m)Z) d dl*m+Z+m,Z, er$d dl-m.Z.m/Z/m0Z0 d dl1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7 d dl8m9Z9  e j:        e;          Z<dZ= ed           G d d                      Z> ed           G d d                      Z? ed           G d d                      Z@e G d d                      ZAdaBeeA         eCd <    ejD                    ZEd!eAfd"ZFd$d#ZGdS )%    N)ThreadPoolExecutor)	dataclassfield)Queue)TYPE_CHECKINGAnyCallableDictListOptional)retry)ActorHandle)DataIteratorDataset)AWS_RETRYABLE_TOKENS)SynchronizationActor)StorageContextdelete_fs_path)_TrainingReport_ValidationSpec)'construct_user_exception_with_tracebackinvoke_context_managers)	RunConfigScalingConfig)CheckpointConsistencyModeCheckpointUploadMode)BackendConfig
Checkpoint
DataConfig)DatasetShardMetadataDatasetShardProvider)TrainContextCallback)ThreadRunner)ReportedCheckpoint   T)frozenc                       e Zd ZU dZ edd           Zeed<   eed<   e	e
eef                  ed<   eed<   d	ed
<   e
eef         ed<   ded<   defdZdS )TrainRunContextz<Holds the metadata and context for the current training run.Fc                  2    t          j                    j        S N)uuiduuid4hex     |/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/v2/_internal/execution/context.py<lambda>zTrainRunContext.<lambda>6   s    DJLL<L r/   )initdefault_factoryrun_id
run_configtrain_loop_configscaling_configr   backend_configdatasetsr   dataset_configreturnc                     | j         S )z3Returns the run config of the current training run.)r5   selfs    r0   get_run_configzTrainRunContext.get_run_configJ   s
    r/   N)__name__
__module____qualname____doc__r   r4   str__annotations__r   r   r
   r   r   r   r?   r.   r/   r0   r(   r(   1   s         FF %U4L4LMMMFCMMM   S#X//// "!!! $### 3<     !   	      r/   r(   c                   B    e Zd ZU eed<   eed<   eed<   eed<   eed<   dS )DistributedContext
world_rank
world_size
local_ranklocal_world_size	node_rankN)r@   rA   rB   intrE   r.   r/   r0   rG   rG   O   s@         OOOOOOOOONNNNNr/   rG   c                   H    e Zd ZU dZeed<   eed<   ded<   ed         ed<   dS )	ExecutionContextzHolds the execution context for the current worker process.

    Every worker process has a single execution context accessed via the
    `TrainContext`, which includes the training thread that is actually
    running the user code.
    synchronization_actorresult_queuer#   training_thread_runnerr"   train_context_callbacksN)r@   rA   rB   rC   r   rE   r   r   r.   r/   r0   rO   rO   X   sZ           0///  +*** ""8999999r/   rO   c                   (   e Zd ZU eed<   eed<   eed<   eed<   eed<   ded<   dZ	e
d	         ed
<   dZeed<   dZeed<    ej                    Zej        ed<    ee          Zeed<   defdZdefdZdefdZdefdZdefdZdefdZd Zd Zd Zd Zej         fdede!d         fdZ"dd de#fd!Z$de!d"         fd#Z%	 d5d$e
e         defd%Z& e'd&d'e((          	 	 	 	 d6d$ed*e)ee*f         d
e
d	         d+e+d,e
e,d	egd	f                  d-e
e-         de.fd.            Z/d/e.defd0Z0dde1j2        ddddfd*e)ee*f         d
e
d	         d$e
e         d1e1d+e
e+         d,e
e,d	egd	f                  d2e
e,d	e
e)         ge)f                  d3e
e)         ddfd4Z3dS )7TrainContexttrain_run_contextdistributed_contextexecution_contextstorage_contextcontroller_actorr!   dataset_shard_providerNr   
checkpointr   current_report_indexreport_call_indexreport_order_condition)max_workerscheckpoint_upload_threadpoolr;   c                 $    | j         j        j        S r*   )rV   r5   namer=   s    r0   get_experiment_namez TrainContext.get_experiment_name   s    %055r/   c                     | j         j        S r*   )rW   rI   r=   s    r0   get_world_sizezTrainContext.get_world_size       '22r/   c                     | j         j        S r*   )rW   rH   r=   s    r0   get_world_rankzTrainContext.get_world_rank   rg   r/   c                     | j         j        S r*   )rW   rJ   r=   s    r0   get_local_rankzTrainContext.get_local_rank   rg   r/   c                     | j         j        S r*   )rW   rK   r=   s    r0   get_local_world_sizez!TrainContext.get_local_world_size   s    '88r/   c                     | j         j        S r*   )rW   rL   r=   s    r0   get_node_rankzTrainContext.get_node_rank   s    '11r/   c                     | j         S r*   )rY   r=   s    r0   get_storagezTrainContext.get_storage   s    ##r/   c                     | j         j        S r*   )rX   rQ   r=   s    r0   get_result_queuezTrainContext.get_result_queue   s    %22r/   c                     | j         j        S r*   )rX   rP   r=   s    r0   get_synchronization_actorz&TrainContext.get_synchronization_actor   s    %;;r/   c                 R    | j         5  | j        cd d d            S # 1 swxY w Y   d S r*   )r_   r\   r=   s    r0   get_checkpointzTrainContext.get_checkpoint   st    ( 	# 	#?	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	#s     consistency_moder$   c                 p    t          j        | j        j                            | j        |                    S r*   )raygetrZ   get_all_reported_checkpointsremoter^   )r>   rx   s     r0   r|   z)TrainContext.get_all_reported_checkpoints   s:     w!>EE&  
 
 	
r/   dataset_infor    c                 6    | j                             |          S )a2  Returns the :class:`ray.data.DataIterator` shard for this worker.

        Call :meth:`~ray.data.DataIterator.iter_torch_batches` or
        :meth:`~ray.data.DataIterator.to_tf` on this shard to convert it to the
        appropriate framework-specific data type.

        Args:
            dataset_info: The shard metadata, including the dataset name and worker rank.
        Returns:
            The ``DataIterator`` shard with the given name for this worker.
        Raises:
            KeyError: If the dataset shard with the given name is not found.
        )r[   get_dataset_shard)r>   r~   s     r0   r   zTrainContext.get_dataset_shard   s     *<<\JJJr/   r"   c                     | j         j        S r*   )rX   rS   r=   s    r0   get_context_callbacksz"TrainContext.get_context_callbacks   s    %==r/   checkpoint_dir_namec                     |p| j                                         }|                                 }t          j        |j                            | j        j        | j        j	        |d                    S )zSync the checkpoint dir name across ranks.

        Args:
            checkpoint_dir_name: The checkpoint dir name to sync.

        Returns:
            The synced checkpoint dir name.
        zray.train.report)rH   rI   datacaller_method_name)
rY    make_default_checkpoint_dir_nameru   rz   r{   broadcast_from_rank_zeror}   rW   rH   rI   )r>   r   
sync_actors      r0   &_sync_checkpoint_dir_name_across_ranksz3TrainContext._sync_checkpoint_dir_name_across_ranks   s~       G#DDFF 	 3355
w/663>3>(#5	 7  
 
 	
r/   zupload checkpoint   )descriptionmax_attemptsmatchFmetrics$delete_local_checkpoint_after_uploadcheckpoint_upload_fnvalidation_specc                    |st          d|d          S 	 |r= |||          }|t          |t          j        j                  st          d          n| j                            ||          }n-# t          $ r  t          
                    d| d            w xY w|rI	 t          |j        |j                   n-# t          $ r  t          
                    d|            Y nw xY wt          |||          S )a  Save the checkpoint to remote storage.

        Args:
            checkpoint_dir_name: The checkpoint dir to persist to.
            metrics: The metrics to report.
            checkpoint: The checkpoint to report.
            delete_local_checkpoint_after_upload: Whether to delete the checkpoint after it is uploaded.
            checkpoint_upload_fn: A user defined function that will be called with the
                checkpoint to upload it. If not provided, defaults to using the `pyarrow.fs.copy_files`
                utility for copying to the destination `storage_path`.
            validation_spec: The validation specification.

        Returns:
            The training result object containing the persisted checkpoint.
        Nr\   r   r   z:checkpoint_upload_fn must return a `ray.train.Checkpoint`.z Failed to find local checkpoint z when attempting to upload it. This could be caused by multiple workers on a node attempting to upload the same directory, and then one of the workers deletes the directory before the others finish.zAFailed to delete the local checkpoint after a successful upload: )r   
isinstancerz   trainr   
ValueErrorrY   persist_current_checkpointFileNotFoundErrorlogger	exceptionr   
filesystempath	Exception)r>   r   r   r\   r   r   r   persisted_checkpoints           r0   _upload_checkpointzTrainContext._upload_checkpoint   s   8  	"$   
	# ';'; 3( ($ (/z(#)*>8 8/ %T   0 (,';'V'V 3( ($ ! 	 	 	!: ! ! !   	 0 	z4joFFFF     dXbdd    
 ++
 
 
 	
s   AA1 1*B!B< <'C&%C&training_reportc                      j         5   j                              fd           t                              d d|            |j        r|j         _                                                             |            xj        dz  c_         j                                          ddd           dS # 1 swxY w Y   dS )a  Thread waits for its turn before reporting training result to result queue.

        It does this in order to guarantee the FIFO processing of checkpoints.

        The queue size is set to 1 to avoid accumulating unprocessed results.
        If the queue is full, the put operation blocks until a result is consumed.

        TODO: Add a metric to track the blocking time waiting for the
        training result to be consumed by the controller.
        c                       j          dz
  k    S )Nr%   )r]   )r^   r>   s   r0   r1   z0TrainContext._wait_then_report.<locals>.<lambda>5  s    15F5JJ r/   zReporting training result z: r%   N)	r_   wait_forr   infor\   rs   putr]   
notify_all)r>   r   r^   s   ` `r0   _wait_then_reportzTrainContext._wait_then_report&  s)    ( 	5 	5'00JJJJJ   KKS->SS/SS   ) ="1"<!!##''888%%*%%'22444	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5 	5s   B"B99B= B=checkpoint_upload_modevalidate_fnvalidate_configc	                 Z    dt           j        v r ddlm}	  |	|          rt	          d          t          d  j        j        D                       5  |rt          ||          nd xj	        dz  c_	         j	        }
 
                    |          }|t          j        k    r1                     |||          }                     ||
           n|t          j        k    r)t!          ||	          }                     ||
           n|t          j        k    r^d
t$          dt&          t$          t(          f         dt*          d         dt,          ddf
 fd} j                            |||||
           nt	          d|           ddd           dS # 1 swxY w Y   dS )a  
        Upload checkpoint to remote storage and put a training
        result on the result queue of this worker process.

        TODO: the report function should be implemented in the worker instead
        of in the train context. The train context should only keep the train
        related information and not the worker related actions. This refactor
        would also require the `TrainContextCallback` to be updated as well.
        torchr   )contains_tensora  Passing objects containg Torch tensors as metrics is not supported as it will throw an exception on deserialization. You can either convert the tensors to Python objects (ex: `.numpy()`, `.item()`, etc.) or save tensors as part of the checkpoint files instead.c                     g | ]	}|j         
S r.   )	on_report).0callbacks     r0   
<listcomp>z'TrainContext.report.<locals>.<listcomp>d  s+        "  r/   )r   r   Nr%   r   r   r   r\   r   r^   r;   c                 J   	                      | ||	          }                    ||           d S # t          $ rc}t                              d           j        j                                                            t          |                     Y d }~d S d }~ww xY w)NzCheckpoint upload failed in the background thread. Raising eagerly to avoid training in a corrupted state with more potential progress lost due to checkpointing failures.)
r   r   r   r   r   rX   rR   get_exception_queuer   r   )
r   r   r\   r^   r   er   r   r>   r   s
         r0   _upload_checkpoint_and_reportz:TrainContext.report.<locals>._upload_checkpoint_and_report  s    *.*A*A/#&@0++ + ..@QRRRRR$ 	 	 	((B  
 .EYY[[__CAFF        	s   05 
B"ABB"z Invalid checkpoint upload mode: )sysmodulesray.air._internal.torch_utilsr   r   r   rX   rS   r   r^   r   r   SYNCr   r   	NO_UPLOADr   ASYNCrD   r
   r   r   rM   ra   submit)r>   r   r\   r   r   r   r   r   r   r   r^   r   r   r   s   `    ``      @r0   reportzTrainContext.reportA  s   , ck!!EEEEEEw''  O   %  $ 6 N  
 
 P	 P	  '"1 +$3# # #
 #'""a'"" $ 6 #'"M"M## #
 &)=)BBB"&"9"9'8(## # &&8IJJJJ'+?+III"1)#$3# # #
 &&8IJJJJ'+?+EEE),!#s(^ !) 6 (+	
         6 1881'%    !O7MOO  ]P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	s   D=F  F$'F$r*   )NFNN)4r@   rA   rB   r(   rE   rG   rO   r   r   r\   r   r]   rM   r^   	threading	Conditionr_   r   MAX_CHECKPOINT_UPLOAD_THREADSra   rD   rd   rf   ri   rk   rm   ro   rq   rs   ru   rw   r   	VALIDATEDr   r|   r   r   r   r   r   r   r
   r   boolr	   r   r   r   r   r   r   r   r.   r/   r0   rU   rU   o   sD        &&&&++++''''####!!!!2222 *.J&--- !#!!!s2E)2E2G2GI/GGG7I7I18 8 8 "4   6S 6 6 6 63 3 3 3 33 3 3 3 33 3 3 3 39c 9 9 9 92s 2 2 2 2$ $ $3 3 3< < <# # # 7P6Y	
 	
3	
 
"	#	
 	
 	
 	
K.D K K K K K >t,B'C > > > > 48
 
#+C=
	
 
 
 
< U*BVWWW
 .25: 59E
 E
 E
 c3hE
 \*	E

 /3E
 'lC(,67
E
 "/2E
 
E
 E
 E
 XWE
N5.5CF5 5 5 5< .2-17K7P?C PT*.r rc3hr \*r &c]	r
 !5r /7tnr 'lC(,67
r hhtn'Et'KLMr "$r 
r r r r r rr/   rU   _train_contextr;   c                  ~    t           5  t          t          d          t          cddd           S # 1 swxY w Y   dS )ab  Get the internal train context.

    Note:
        This should not be used directly by user-facing APIs. User-facing APIs should
        call :class:`~ray.train.v2._internal.execution.train_fn_utils.TrainFnUtils`
        or use :class:`~ray.train.v2.api.context.TrainContext` instead.

    Returns:
        The internal TrainContext for this worker.
    Nz&TrainContext has not been initialized.)_context_lockr   RuntimeErrorr.   r/   r0   get_train_contextr     s     
  !GHHH                 s   266c                 J    t           5  | ad d d            d S # 1 swxY w Y   d S r*   )r   r   )contexts    r0   set_train_contextr     sq    	 ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! ! !s   )r;   N)Hloggingr   r   r+   concurrent.futuresr   dataclassesr   r   queuer   typingr   r   r	   r
   r   r   rz   ray._common.retryr   	ray.actorr   ray.datar   r    ray.train.v2._internal.constantsr   6ray.train.v2._internal.execution.checkpoint.sync_actorr   (ray.train.v2._internal.execution.storager   r   0ray.train.v2._internal.execution.training_reportr   r   ray.train.v2._internal.utilr   r   ray.train.v2.api.configr   r   ray.train.v2.api.report_configr   r   	ray.trainr   r   r   2ray.train.v2._internal.data_integration.interfacesr    r!   )ray.train.v2._internal.execution.callbackr"   ;ray.train.v2._internal.execution.worker_group.thread_runnerr#   $ray.train.v2.api.reported_checkpointr$   	getLogger__file__r   r   r(   rG   rO   rU   r   rE   Lockr   r   r   r.   r/   r0   <module>r      s    



      1 1 1 1 1 1 ( ( ( ( ( ( ( (       E E E E E E E E E E E E E E E E 



 # # # # # # ! ! ! ! ! ! * * * * * * * * A A A A A A W W W W W W S S S S S S S S               = < < < < < < <       
  H??????????        ONNNNNXXXXXXGGGGGG 
	8	$	$ !"  $       : $        $: : : : : : : :, C C C C C C C CN
 *.& - - - 	  <    "! ! ! ! ! !r/   