
    &`i                     ~   U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlmZmZ d dlmZmZmZmZmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3mZ4 d dl5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z=m>Z> erd dlm?Z? d dl@mAZA  ejB        eC          ZDe
 G d d                      ZE G d d          ZF G d d          ZGe6 G d d                      ZH eI            ZJeeK         eLd<   daMeeH         eLd<   deeNeOf         d ee>         fd!ZPdKd#ZQd"eeH         fd$ZRd% ZSd& ZTd'ee#         d"e#fd(ZUd)e#d"dfd*ZVdLd+efd,ZW e7d-.           eW            ddd/d0ed1ee!         d2eeN         d"dfd3                        ZX e7d-.           eW            d"ee!         fd4                        ZY e7d5.           eW            d"eeNef         fd6                        ZZ e7d5.           eW            d"eNfd7                        Z[ e7d5.           eW            d"eNfd8                        Z\ e7d5.           eW            d"eNfd9                        Z] e7d:.           eW            d"eNfd;                        Z^ e7d5.           eW            dMd=                        Z_ e7d5.           eW            d"eNfd>                        Z` e7d5.           eWd?@          d"eafdA                        Zb e7d5.           eWd @          d"eafdB                        Zc e7d5.           eWd @          d"eafdC                        Zd e7d5.           eWd @          d"eafdD                        Ze e7d5.           eWd @          d"eafdE                        Zf e7d-.           eW            	 dLdFeeN         d"edG         fdH                        Zge6 eW            d"e%fdI                        Zhd"eifdJZjdS )N    N)	dataclass)datetime)TYPE_CHECKINGAnyCallableDictOptionalSetType)RunnerThreadStartTraceback)_ERROR_FETCH_TIMEOUT_RESULT_FETCH_TIMEOUTSESSION_MISUSE_LOG_ONCE_KEYTIME_THIS_ITER_S	TIMESTAMP)Dataset)
Checkpoint)Accelerator)StorageContext)CHECKPOINT_DIR_NAMEDETAILED_AUTOFILLED_KEYSRAY_CHDIR_TO_TRIAL_DIRTIME_TOTAL_SWORKER_HOSTNAMEWORKER_NODE_IP
WORKER_PID_v2_migration_warnings_enabledSessionMisuseError)_log_deprecation_warning)queue)DeveloperAPI	PublicAPI)log_once)_valid_resource_shape) PlacementGroupSchedulingStrategySchedulingStrategyT)DataIterator)PlacementGroupFactoryc                       e Zd ZU dZeed<   eed<   eeef         ed<   eed<   eed<   eed<   dZe	e         ed	<   dZ
e	e         ed
<   dS )	TrialInfoz3The trial information to propagate to TrainSession.nameid	resourceslogdir	driver_ipdriver_node_idNexperiment_namerun_id)__name__
__module____qualname____doc__str__annotations__r   floatr3   r	   r4        o/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/_internal/session.pyr,   r,   8   s         ==
IIIGGGCJKKKNNN%)OXc]))) FHSM     r=   r,   c                   F    e Zd ZdZdej        fdZd
deded         fdZ	d	S )_FutureTrainingResultzA future that will be resolved to a `_TrainingResult`.

    This is needed for specific schedulers such as PBT that schedule saves.

    This wrapper should be removed after refactoring PBT to not schedule saves anymore.
    futurec                     || _         d S N)rA   )selfrA   s     r>   __init__z_FutureTrainingResult.__init__N   s    r=   Tblockreturn_TrainingResultc                     |rd}nd}	 t          j        | j        |          S # t          $ r Y dS t          $ r(}t
                              d|            Y d}~dS d}~ww xY w)zResolve into ``_TrainingResult``.

        This will return None for function trainables if no checkpoint has been
        saved before.
        Ng&.>timeoutzError resolving result: )raygetrA   TimeoutError	Exceptionloggererror)rD   rF   rK   excs       r>   resolvez_FutureTrainingResult.resolveQ   s      	GGG	;74;8888 	 	 	DD 	; 	; 	;LL9C99:::::::::	;s   $ 
A"	A"AA"N)T)
r5   r6   r7   r8   rL   	ObjectRefrE   boolr	   rS   r<   r=   r>   r@   r@   F   sh         s}    ; ;T ;X6G-H ; ; ; ; ; ;r=   r@   c                   J    e Zd ZdZdee         deeef         fdZ	defdZ
dS )rH   z4A (checkpoint, metrics) result reported by the user.
checkpointmetricsc                 "    || _         || _        d S rC   rW   rX   )rD   rW   rX   s      r>   rE   z_TrainingResult.__init__g   s    $r=   rG   c                 (    d| j          d| j         dS )NzTrainingResult(checkpoint=z
, metrics=)rZ   rD   s    r>   __repr__z_TrainingResult.__repr__k   s    VDOVVt|VVVVr=   N)r5   r6   r7   r8   r	   r   r   r9   r   rE   r^   r<   r=   r>   rH   rH   d   sk        >>8J#7 $sCx.    W# W W W W W Wr=   rH   c                      e Zd ZdZ	 	 	 	 	 	 	 d3dedee         dee         dee         dee         d	ee         d
ee         deee	e
f                  dee	ef         dee         dedee         defdZde	defdZde	defdZd Zd Z	 d4ded
edefdZd Zd4dee         dee         fdZdee         fdZd Zdedee         fd Zd!edefd"Zd!edefd#Zd5d$Zd%eddfd&Z d4d'edee         ddfd(Z!e"de	fd)            Z#e"de	fd*            Z$e"de	fd+            Z%e"de	fd,            Z&e"d6d.            Z'e"de	fd/            Z(	 d4d0ee	         ded1         fd2Z)dS )7_TrainSessionz.Holds information for training on each worker.NFtraining_func
world_rank
local_rank	node_ranklocal_world_size
world_size
trial_infodataset_shardmetadatarW   detailed_autofilled_metricsstoragesynchronous_result_reportingc                    || _         || _        |	| _        || _        || _        || _        || _        || _        |sJ t          	                    d| d|            | 
                    ||||
           || _        t          j                    | _        d| _        d| _        |                                 | _        d | _        i | _        d S )Nz StorageContext on SESSION (rank=z):
)ra   rg   rk   loaded_checkpointr   g        )rl   rh   ri   rb   rc   rd   re   rf   rP   debugresetrj   timelast_report_time	iteration
time_totalget_current_iplocal_ipaccelerator_state)rD   ra   rb   rc   rd   re   rf   rg   rh   ri   rW   rj   rk   rl   s                 r>   rE   z_TrainSession.__init__t   s    6 -I) + $$" 0$wQ
QQQQRRR 	

'!(	 	 	
 	
 	
 ,G( $	++--r=   keyrG   c                 6    | j                             |          S rC   )rx   rM   )rD   ry   s     r>   	get_statez_TrainSession.get_state   s    {s###r=   valuec                     || j         |<   d S rC   )rx   )rD   ry   r|   s      r>   	set_statez_TrainSession.set_state   s     Cr=   c                 V    t           j                                        | _        | j        S rC   )rL   utilget_node_ip_addressrv   r]   s    r>   ru   z_TrainSession.get_current_ip   s    4466}r=   c                 F    d| _         | j                                         dS )zStarts the training thread.TN)training_startedtraining_threadstartr]   s    r>   r   z_TrainSession.start   s%     $""$$$$$r=   c                    t          j        d          | _        t          j                    | _        t          j        d          | _        d | _        t          j        d          | _	        t          |d| j	                  | _        || _        || _        || _        i | _        d| _        d| _        d| _        t'          j        |j        d           t-          t/          t&          j                            t4          d                              r=t6                              d|j                    t'          j        |j                   d S d S )	Nr      T)targetdaemonerror_queueF)exist_ok1z#Changing the working directory to: )	threading	Semaphorecontinue_lockEvent
stop_eventr"   Queueresult_queue_inter_actor_queuer   r   r   rg   rk   rn   rx   ignore_reportr   _first_reportosmakedirstrial_working_directoryrU   intenvironrM   r   rP   ro   chdir)rD   ra   rg   rk   rn   s        r>   rp   z_TrainSession.reset   s@    '033 $/++ "KNN DH
 !;q>>  , 4;K 
  
  

 %!2 " %! 	G3dCCCCBJNN#93??@@AA 	6LLWg6UWW   HW455555		6 	6r=   c                     d| _         dS )z-Ignore all future ``session.report()`` calls.TN)r   r]   s    r>   pause_reportingz_TrainSession.pause_reporting   s    !r=   rK   c                     | j                                          | j                                         | j                            d           d}| j        r| j                            |          }|S )zSFinishes the training thread.

        Raises any Exception from training.
        TforceNrJ   )	r   setr   releaserk   persist_artifactsr   r   join)rD   rK   outputs      r>   finishz_TrainSession.finish   s{     	 	""$$$ 	&&T&222
   	@)..w.??Fr=   c                 2   | j         st          d          | j        r'| j        s| j                                         d| _        d}|J| j                                        r1|                     d          }|| j                                        1||                     d          }|| 	                    d           n3| j
                                        st                              d           | j        s| j                                         |S )zGets the next ``_TrainingResult`` from the result queue.

        If the result queue is empty, then this function returns ``None``.
        z*Please call start before calling get_next.FNTrF   zVRunner error waiting to be raised in main thread. Logging all available results first.)r   RuntimeErrorrl   r   r   r   r   is_alive_get_result_from_queues_report_thread_runner_errorr   emptyrP   ro   )rD   results     r>   get_nextz_TrainSession.get_next  s@   
 $ 	MKLLL, 	' % - "**,,,!&Dn!5!>!>!@!@n111==F n!5!>!>!@!@n > 111>>F >,,4,8888#))++ ?   0 	) &&((( r=   c                 X    | j         t          j        dddi          | _         | j         S )z$Get or create the inter-actor queue.Nr   num_cpusr   )actor_options)r   	ray_queuer   r]   s    r>    _get_or_create_inter_actor_queuez._TrainSession._get_or_create_inter_actor_queueB  s0    "*&/oa
TU&W&W&WD#&&r=   rF   c                 X   d}| j         h	 | j                             |t                    }|r.| j                                         |                     |           n# t          j        $ r Y nw xY w	 | j                            |t                    }n# t          j        $ r Y nw xY w|S )zUGet result from result queue. Pass result from training actor result queue if needed.NrF   rK   )
r   rM   r   r   r   reportr   Emptyr   r"   )rD   rF   r   inter_actor_items       r>   r   z%_TrainSession._get_result_from_queuesH  s    ".	#'#:#>#>)> $? $ $  $ 2&..000KK 0111?   	&**@U*VVFF{ 	 	 	D	s$   AA A/.A/3!B B'&B'r   c           
         t          j                     }t          j                    }t          |v r|t                   }n
|| j        z
  }| xj        dz  c_        | xj        |z  c_        || _        t          t          t          j	        |
                                                    t          | j        t          t          j                    t          t!          j                    t$          | j        i}| j        sd |                                D             }|                                }|                    |           |S )-Add autofilled metrics and update attributes.r   c                 ,    i | ]\  }}|t           v||S r<   )r   .0kvs      r>   
<dictcomp>z4_TrainSession._auto_fill_metrics.<locals>.<dictcomp>q  s4     # # #Aq444 1444r=   )rq   r   nowr   rr   rs   rt   r   r   mktime	timetupler   r   r   getpidr   platformnoder   rv   rj   itemscopyupdate)rD   r   current_timecurrent_datetimetime_this_iterauto_filled_metricss         r>   _auto_fill_metricsz _TrainSession._auto_fill_metrics\  s   y{{#<>>v%%#$45NN)D,AAN!>) , s4;'7'A'A'C'CDDEE$/	X]__DM
 / 	# #/5577# # # )***r=   c                     t          j                    }t          t          t	          j        |                                                    i}|                                }|                    |           |S )r   )	r   r   r   r   rq   r   r   r   r   )rD   r   r   r   s       r>   _auto_fill_checkpoint_metricsz+_TrainSession._auto_fill_checkpoint_metrics{  sc    #<>> s4;'7'A'A'C'CDDEE
 )***r=   c                     	 | j                             |t                    }t          |# t          j        $ r Y d S w xY w)Nr   )r   rM   r   r   r"   r   )rD   rF   es      r>   r   z)_TrainSession._report_thread_runner_error  sO    	 $$5:N$OOA a'{ 	 	 	DD	s   )+ >>training_resultc                 &   |j         r|j         | _        | j                            |d           | j                                         | j                                        r/| j                                         t          j
        d           dS dS )a  Place a training result on the result queue for the main thread to process,
        then block until the main thread signals that training should continue.

        NOTE: This is used internally to report results from Train to Tune
        without persisting checkpoints to storage 2 times.
        `report` is the public API that directly persists to storage, which
        should only be called by user code.
        Tr   r   N)rW   rn   r   putr   acquirer   is_setclearsysexit)rD   r   s     r>   _report_training_resultz%_TrainSession._report_training_result  s     % 	@%4%?D" 	oT::: 	""$$$ ?!!## 	O!!###HQKKKKK	 	r=   rX   c                    dt           j        v r ddlm}  ||          rt	          d          | j        rd S |                     |          }d }|rI| j                            |           | j        	                    |          }| j        j
        |t          <   n
d |t          <   |o| j        j        j        }| j                            |           |rX| j        rQ|                                }| j                                        D ]\  }}||vr|||<   |                    |           t'          ||          }	|                     |	           d S )Ntorchr   )contains_tensorzPassing objects containg Torch tensors as metrics is not supported as it will throw an exception on deserialization. You can either convert the tensors to Python objects or report a `train.Checkpoint` with `ray.train.report` to store your Torch objects.r   rZ   )r   modulesray.air._internal.torch_utilsr   
ValueErrorr   r   rk   _update_checkpoint_indexpersist_current_checkpointcheckpoint_dir_namer   sync_configsync_artifacts_on_checkpointr   ri   get_metadatar   set_metadatarH   r   )
rD   rX   rW   r   persisted_checkpointforce_artifact_syncuser_metadatar   r   r   s
             r>   r   z_TrainSession.report  s   ck!!EEEEEEw''  K    	F))'22# 	0L11'::: $(<#J#J:#V#V +/<+KG'((+/G'( ! F(E 	 	&&-@&AAA   	=DM 	=0==??M++-- ) )1 M))'(M!$ --m<<< ,@'RRR$$V,,,,,r=   c                     | j         j        S rC   )rg   r3   r]   s    r>   r3   z_TrainSession.experiment_name  s    ..r=   c                     | j         j        S rC   )rg   r-   r]   s    r>   
trial_namez_TrainSession.trial_name  s    ##r=   c                     | j         j        S rC   )rg   r.   r]   s    r>   trial_idz_TrainSession.trial_id  s    !!r=   c                     | j         j        S rC   )rg   r4   r]   s    r>   r4   z_TrainSession.run_id      %%r=   r*   c                     | j         j        S rC   )rg   r/   r]   s    r>   trial_resourcesz_TrainSession.trial_resources  s    ((r=   c                     | j         j        S rC   )rg   r0   r]   s    r>   	trial_dirz_TrainSession.trial_dir  r   r=   dataset_namer)   c                     | j         }|t          j        d           n;t          |t                    r&|st          d          |                    |          S |S )NziNo dataset passed in. Returning None. Make sure to pass in a Dataset to Trainer.run to use this function.zMultiple datasets were passed into ``Trainer``, but no ``dataset_name`` is passed into ``get_dataset_shard``. Please specify which dataset shard to retrieve.)rh   warningswarn
isinstancedictr   rM   )rD   r   shards      r>   get_dataset_shardz_TrainSession.get_dataset_shard  s{     "=M   
 t$$ 	+ "1   99\***r=   )NNNNFNFrC   )FrG   r*   )*r5   r6   r7   r8   r   r	   r   r,   r   r9   r   r   r   rU   r   rE   r{   r~   ru   r   rp   r   r;   r   rH   r   r   r   r   r   r   r   r   r   propertyr3   r   r   r4   r   r   r   r<   r=   r>   r`   r`   p   s       88 +/6:#'+/,1,0-2< << SM< SM	<
 C=< #3-< SM< Y'<  S'\ 23< sCx.< Z(< &*< .)< '+< < < <|$S $S $ $ $ $!S ! ! ! ! !  % % % /6 /6/6 /6  	/6 /6 /6 /6b" " " huo #    .3(?3 3 3 3 3j' ' 'T h6O    ( $    >	D 	T 	 	 	 	    4    81- 1-d 1-0D 1-PT 1- 1- 1- 1-f / / / / X/ $C $ $ $ X$ "# " " " X" & & & & X& ) ) ) X) &3 & & & X&
 '+ sm 
.	!     r=   r`   _checked_resources_sessionr/   strategyc                    t          d |                                D                       }|r	|t          v rdS t          |t                    r|j        dS t          j                                        }|r|j        j	        |j	        k    rdS t          
                    |           t                      }|j        r|j        dd         }n|j        dd         }t          ||          rdS | j        r d}| j        dz   | j        z   dz   | j        z   }nd}| j        dz   | j        z   }|j        d         }	d |                                D             }t%          d	| d
| d|	 d| d| d          )zLaunch hook to catch nested tasks that can't fit in the placement group.

    This gives users a nice warning in case they launch a nested task in a Tune trial
    without reserving resources in the trial placement group to fit it.
    c                 (    h | ]\  }}|d k    ||fS r   r<   r   s      r>   	<setcomp>z3_tune_task_and_actor_launch_hook.<locals>.<setcomp>  s%    CCC1QUUaVUUUr=   Nr   r   actor.taskc                 @    i | ]\  }}|d k    |t          |          S r  )r;   r   s      r>   r   z4_tune_task_and_actor_launch_hook.<locals>.<dictcomp>D  s)    DDDAa!eeE!HHeeer=   z3No trial resources are available for launching the z `zg`. To resolve this, specify the Tune option:

>  resources_per_trial=tune.PlacementGroupFactory(
>    [z] + [zB] * N
>  )

Where `N` is the number of slots to reserve for trial zs. If you are using a Ray training library, there might be a utility function to set this automatically for you. For more information, refer to https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html)	frozensetr   r  r   r'   placement_grouprL   r   get_current_placement_groupr.   addget_trial_resourceshead_bundle_is_emptybundle_specsr&   
class_namemodule_namefunction_namer   )
fnr/   r  ry   cur_pgpgfavailable_bundles	submittedr-   main_resourcess
             r>    _tune_task_and_actor_launch_hookr    s    CC	(9(9CCC
D
DC #+++ x!ABB#+ X1133F X-0FI==3 

C
 4"/3"/3 Y(9:: 	} 7	~#bm3c9B<LL	~#b&66 (+NDD):):DDDI
	Ki 	K 	K4 	K 	K  	K 	K '0	K 	K
 BK	K 	K 	K
 
 
r=   rG   c                      t           rt          d          ddlm}m} dt
          j        vrt          |_        t          |_	        t          | i |a d S )NzIA Train session is already in use. Do not call `init_session()` manually.r   )r
  remote_functionTUNE_DISABLE_RESOURCE_CHECKS)r  r   rL   r
  r   r   r   r  _actor_launch_hook_task_launch_hookr`   )argskwargsr
  r   s       r>   init_sessionr&  S  sn     
)
 
 	
 +*******%RZ77#C ,L)d-f--HHHr=   c                      t           S rC   r  r<   r=   r>   get_sessionr)  e  s    Or=   c                  
    da dS )z#Shuts down the initialized session.Nr(  r<   r=   r>   shutdown_sessionr+  i  s     HHHr=   c                       t          d          )zKRaises a SessionMisuseError because a utility function was used improperly.zjprepare/accelerate utility functions should be called inside a training function executed by `Trainer.run`r   r<   r=   r>   !_raise_accelerator_session_misuser-  o  s    
	-  r=   default_accelerator_clsc                 x    t                      }|t                       |j         |             |_        |j        S )zThe accelerator for this training session.

    If an accelerator has not been set, then this method will construct an
    accelerator using the provided accelerator class.

    Raises:
        SessionMisuseError: if the session is uninitialized.
    )r)  r-  rw   )r.  sessions     r>   get_acceleratorr1  w  sA     mmG)+++"5577r=   rw   c                 |    t                      }|t                       |j        t          d          | |_        dS )a   Sets the accelerator for this training session.

    Args:
        accelerator: The accelerator to use for training.

    Raises:
        SessionMisuseError: if the session is unitialized.
        RuntimeError: if the accelerator has already been set.
    Nz#Cannot change accelerator once set.)r)  r-  rw   r   )rw   r0  s     r>   set_acceleratorr3    sD     mmG)+++&@AAA%Gr=   default_valuec                 "     dt           f fd}|S )zKWarns if fn is being used outside of session and returns ``default_value``.r  c                 \      j         t          j                    fd            }|S )Nc                      t                      }|s6t          t           d           rt          j        d d d           S  | i |S )N-`zb` is meant to only be called inside a function that is executed by a Tuner or Trainer. Returning `z`.)r)  r%   r   r   r   )r$  r%  r0  r4  r  fn_names      r>   wrapperz4_warn_session_misuse.<locals>.inner.<locals>.wrapper  s    !mmG %:FFWFFGG MEG E E3@E E E  
 %$2t&v&&&r=   )r5   	functoolswraps)r  r;  r:  r4  s   ` @r>   innerz#_warn_session_misuse.<locals>.inner  sK    +			
	' 
	' 
	' 
	' 
	' 
	' 
	
	' r=   )r   )r4  r>  s   ` r>   _warn_session_misuser?    s/    (      $ Lr=   stable)	stability)rW   r   rX   rW   r   c                   |t                               d           ddlm}  |            r=ddl}t                      rt          d           |j                            | |          S t                                          | |           dS )a  Report metrics and optionally save a checkpoint.

    If a checkpoint is provided, it will be
    :ref:`persisted to storage <persistent-storage-guide>`.

    If this is called in multiple distributed training workers:

    - Only the metrics reported by the rank 0 worker will be tracked by Ray Train.
      See :ref:`the metrics logging guide <train-monitoring-and-logging>`.
    - A checkpoint will be registered as long as one or more workers reports
      checkpoint that is not None.
      See the :ref:`checkpointing guide <train-dl-saving-checkpoints>`.
    - Checkpoints from multiple workers will be merged into one directory
      in persistent storage.
      See :ref:`the distributed checkpointing guide <train-distributed-checkpointing>`.

    .. note::

        Each invocation of this method will automatically increment the underlying
        ``training_iteration`` number. The physical meaning of this "iteration" is
        defined by user depending on how often they call ``report``.
        It does not necessarily map to one epoch.

    .. warning::

        All workers must call `ray.train.report` the same number of times
        so that Ray Train can properly synchronize the training state across
        workers. Otherwise, your training will hang.

    .. warning::

        This method does NOT act as a barrier for distributed training workers.
        Workers will upload their checkpoint, then continue training immediately.
        If you need to synchronize workers, you can use a framework-native barrier
        such as `torch.distributed.barrier()`.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...
                       # torch.save(...)

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

                        # Example: Only the rank 0 worker uploads the checkpoint.
                        if ray.train.get_context().get_world_rank() == 0:
                            train.report(metrics, checkpoint=checkpoint)
                        else:
                            train.report(metrics, checkpoint=None)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Args:
        metrics: The metrics you want to report.
        checkpoint: The optional checkpoint you want to report.
    Nz`checkpoint_dir_name` is only supported in the new Ray Train implementation, which can be enabled with `RAY_TRAIN_V2_ENABLED=1`. This argument will be ignored.r   _in_tune_sessionz`ray.train.report` should be switched to `ray.tune.report` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)rW   )
rP   warning%ray.tune.trainable.trainable_fn_utilsrD  ray.tuner   r!   tuner   r)  )rX   rW   r   rD  rL   s        r>   r   r     s    p &-	
 	
 	
 GFFFFF ?)++ 	$B   xw:>>>MMZ88888r=   c                      ddl m}   |             r:ddl}t                      rt	          d           |j                                        S t                      j        S )a  Access the latest reported checkpoint to resume from if one exists.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
                        train.report(metrics, checkpoint=checkpoint)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Returns:
        Checkpoint object if the session is currently being resumed.
            Otherwise, return None.
    r   rC  Nz`ray.train.get_checkpoint` should be switched to `ray.tune.get_checkpoint` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)	rF  rD  rG  r   r!   rH  get_checkpointr)  rn   )rD  rL   s     r>   rJ  rJ  $  s{    X GFFFFF ))++ 	$B   x&&(((==**r=   betac                  (    t                      j        S )z5User metadata dict passed to the Trainer constructor.)r)  ri   r<   r=   r>   r   r   b       ==!!r=   c                  (    t                      j        S )z,Experiment name for the corresponding trial.)r)  r3   r<   r=   r>   get_experiment_namerO  i       ==((r=   c                  (    t                      j        S )z'Trial name for the corresponding trial.)r)  r   r<   r=   r>   get_trial_namerR  p  s     ==##r=   c                  (    t                      j        S )z%Trial id for the corresponding trial.)r)  r   r<   r=   r>   get_trial_idrT  w  rM  r=   alphac                  (    t                      j        S )z0Unique Train Run id for the corresponding trial.)r)  r4   r<   r=   r>   
get_run_idrW  ~  s     ==r=   r*   c                  (    t                      j        S )z,Trial resources for the corresponding trial.)r)  r   r<   r=   r>   r  r    rP  r=   c                  (    t                      j        S )a  Log directory corresponding to the trial directory for a Tune session.
    If calling from a Train session, this will give the trial directory of its parent
    Tune session.

    .. testcode::

        import ray.tune

        def train_func(config):
            print(ray.tune.get_context().get_trial_dir())

        tuner = ray.tune.Tuner(train_func)
        tuner.fit()

    .. testoutput::
        :options: +MOCK

        /Users/root/ray_results/train_func_2023-07-19_15-01-37/train_func_d620c_00000_0_2023-07-19_15-01-40
    )r)  r   r<   r=   r>   get_trial_dirrZ    s    , ==""r=   r   )r4  c                  j    t                      } t          | d          st          d          | j        S )a  Get the current world size (i.e. total number of workers) for this run.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        NUM_WORKERS = 2

        def train_loop_per_worker(config):
            assert train.get_context().get_world_size() == NUM_WORKERS

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=NUM_WORKERS),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rf   z`get_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r)  hasattrr   rf   r0  s    r>   get_world_sizer^    sA    < mmG7L)) 
9
 
 	

 r=   c                  j    t                      } t          | d          st          d          | j        S )a  Get the world rank of this worker.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        def train_loop_per_worker(config):
            if train.get_context().get_world_rank() == 0:
                print("Worker 0")

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rb   z`get_world_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r)  r\  r   rb   r]  s    r>   get_world_rankr`    sA    : mmG7L)) 
9
 
 	

 r=   c                  j    t                      } t          | d          st          d          | j        S )a  Get the local rank of this worker (rank of the worker on its node).

    .. testcode::

        import torch

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            if torch.cuda.is_available():
                torch.cuda.set_device(train.get_context().get_local_rank())
            ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rc   z`get_local_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r)  r\  r   rc   r]  s    r>   get_local_rankrb    sB    @ mmG7L)) 
9
 
 	

 r=   c                  j    t                      } t          | d          st          d          | j        S )a  Get the local world size of this node (i.e. number of workers on this node).

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_local_world_size())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    re   z`get_local_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r)  r\  r   re   r]  s    r>   get_local_world_sizerd    sB    : mmG7.// 
9
 
 	

 ##r=   c                  j    t                      } t          | d          st          d          | j        S )a  Get the rank of this node.

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_node_rank())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    rd   z`get_node_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r)  r\  r   rd   r]  s    r>   get_node_rankrf  E  sA    : mmG7K(( 
9
 
 	

 r=   r   r)   c                     t                      }t          |d          st          d          |                    |           S )a?  Returns the :class:`ray.data.DataIterator` shard for this worker.

    Call :meth:`~ray.data.DataIterator.iter_torch_batches` or
    :meth:`~ray.data.DataIterator.to_tf` on this shard to convert it to the
    appropriate framework-specific data type.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            ...
            for epoch in range(2):
                # Trainer will automatically handle sharding.
                data_shard = train.get_dataset_shard("train")
                for batch in data_shard.iter_torch_batches():
                    ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...

    Args:
        dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then
            specifies which dataset shard to return.

    Returns:
        The ``DataIterator`` shard to use for this worker.
        If no dataset is passed into Trainer, then return None.
    r   z`get_dataset_shard` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r)  r\  r   r   )r   r0  s     r>   r   r   l  sN    ^ mmG7/00 
9
 
 	

 $$\222r=   c                  (    t                      j        S )a2  Returns the :class:`~ray.train._internal.storage.StorageContext` storage
    context which gives advanced access to the filesystem and paths
    configured through `RunConfig`.

    NOTE: This is a developer API, and the `StorageContext` interface may change
    without notice between minor versions.
    )r)  rk   r<   r=   r>   get_storageri    s     ==  r=   c                  b    t          t                                ot                      j        duS )z6Check if the current process is a Ray Train V1 worker.N)rU   r)  rb   r<   r=   r>   _in_ray_train_workerrk    s%    G;==#;4#GGr=   )rG   NrC   r  )kr<  loggingr   r   r"   r   r   rq   r   dataclassesr   r   typingr   r   r   r   r	   r
   r   rL   ray.air._internal.utilr   r   ray.air.constantsr   r   r   r   r   ray.datar   	ray.trainr   ray.train._internal.acceleratorr   ray.train._internal.storager   ray.train.constantsr   r   r   r   r   r   r   r   ray.train.errorr    ray.train.utilsr!   ray.utilr   ray.util.annotationsr#   r$   ray.util.debugr%   ray.util.placement_groupr&   ray.util.scheduling_strategiesr'   r(   r)   #ray.tune.execution.placement_groupsr*   	getLoggerr5   rP   r,   r@   rH   r`   r   r  r  r:   r  r9   r;   r  r&  r)  r+  r-  r1  r3  r?  r   rJ  r   rO  rR  rT  rW  r  rZ  r   r^  r`  rb  rd  rf  r   ri  rU   rk  r<   r=   r>   <module>r     s	         				   



       ! ! ! ! ! !       J J J J J J J J J J J J J J J J J J 



 ? ? ? ? ? ? ? ?                                7 7 7 7 7 7 6 6 6 6 6 6	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 / . . . . . 4 4 4 4 4 4 ' ' ' ' ' ' 8 8 8 8 8 8 8 8 # # # # # # : : : : : :       
  J%%%%%%IIIIII 
	8	$	$ 
! 
! 
! 
! 
! 
! 
! 
!; ; ; ; ; ; ; ;<W W W W W W W W W W W W W W W Wv &)SUU C	N * * * %)(=
! ( ( (=U
#=/78K/L= = = =@. . . .$Xm,        T+-> ;    "& & & & & &$     0 X (,)-	m9 m9 m9m9 $m9 "#	m9
 
m9 m9 m9  m9` X9+, 9+ 9+ 9+  9+x V"d38n " " "  "
 V)S ) ) )  )
 V$ $ $ $  $
 V"c " " "  "
 W C         
 V) ) )  )
 V#s # # #  #. VA&&&# # # # '& #L VA&&&" " " " '& "J VA&&&% % % % '& %P VA&&&"$c "$ "$ "$ '& "$J VA&&&"s " " " '& "J X"&43 433-43n43 43 43  43n !^ ! ! !  !Hd H H H H H Hr=   