
    &`i)                        d dl Z d dlmZmZ d dlmZ d dlmZmZmZ d dl	m
Z
 d dlZdZ ed eD                       Zdd	hZ ed
d                    e                    Z edd                    d eD                                 Z G d de
          Z G d d          Zd Zd Ze j        dd            Z G d de          ZdS )    N)defaultdict
namedtuple)datetime)AnyListOptional)Callback)ray_presubmitray_postsubmitray_pretaskray_posttaskray_postsubmit_all
ray_finishc              #       K   | ]	}d |z   V  
dS )_N .0fields     k/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/dask/callbacks.py	<genexpr>r      s&      //%#+//////    r   r   RayCallback RayCallbacksc                     g | ]}|d z   S )_cbsr   r   s     r   
<listcomp>r       s    3T3T3TuEFN3T3T3Tr   c                        e Zd ZdZ e            Z fdZed             Z fdZ	 fdZ
 fdZ fdZdee         fd	Zd
ej        fdZdeej                 fdZd Zdeej                 fdZd Z xZS )RayDaskCallbacka  
    Extends Dask's `Callback` class with Ray-specific hooks. When instantiating
    or subclassing this class, both the normal Dask hooks (e.g. pretask,
    posttask, etc.) and the Ray-specific hooks can be provided.

    See `dask.callbacks.Callback` for usage.

    Caveats: Any Dask-Ray scheduler must bring the Ray-specific callbacks into
    context using the `local_ray_callbacks` context manager, since the built-in
    `local_callbacks` context manager provided by Dask isn't aware of this
    class.
    c                     t           D ].}|                    |d           }|t          | d|z   |           / t                      j        di | d S )Nr   r   )CBSpopsetattrsuper__init__)selfkwargscbcb_func	__class__s       r   r&   zRayDaskCallback.__init__4   se     	1 	1BjjT**G"cBh000""6"""""r   c                 8     t           fdt          D              S )Nc                 2    g | ]}t          |d           S N)getattr)r   r   r'   s     r   r   z1RayDaskCallback._ray_callback.<locals>.<listcomp>>   s%    OOOEWT5$77OOOr   )r   	CB_FIELDSr'   s   `r   _ray_callbackzRayDaskCallback._ray_callback<   s#    OOOOYOOOPPr   c                     t          |           | _        | j                                         t                                                       | S r.   )add_ray_callbacks_ray_cm	__enter__r%   r'   r+   s    r   r6   zRayDaskCallback.__enter__@   sA    (..   r   c                 R     t                      j        |   | j        j        |  d S r.   )r%   __exit__r5   )r'   argsr+   s     r   r9   zRayDaskCallback.__exit__F   s/    $t$$$$r   c                     t          |           j                            | j                   t	                                                       d S r.   )type
ray_activeaddr2   r%   registerr7   s    r   r?   zRayDaskCallback.registerJ   s?    T

!!$"4555r   c                     t          |           j                            | j                   t	                                                       d S r.   )r<   r=   remover2   r%   
unregisterr7   s    r   rB   zRayDaskCallback.unregisterN   s?    T

$$T%7888r   returnc                     dS )at  Run before submitting a Ray task.

        If this callback returns a non-`None` value, Ray does _not_ create
        a task and uses this value as the would-be task's result value.

        Args:
            task: A Dask task, where the first tuple item is
                the task function, and the remaining tuple items are
                the task arguments, which are either the actual argument values,
                or Dask keys into the deps dictionary whose
                corresponding values are the argument values.
            key: The Dask graph key for the given task.
            deps: The dependencies of this task.

        Returns:
            Either None, in which case Ray submits a task, or
            a non-None value, in which case Ray task doesn't submit
            a task and uses this return value as the
            would-be task result value.
        Nr   )r'   taskkeydepss       r   _ray_presubmitzRayDaskCallback._ray_presubmitR   s	    * 	r   
object_refc                     dS )aR  Run after submitting a Ray task.

        Args:
            task: A Dask task, where the first tuple item is
                the task function, and the remaining tuple items are
                the task arguments, which are either the actual argument values,
                or Dask keys into the deps dictionary whose
                corresponding values are the argument values.
            key: The Dask graph key for the given task.
            deps: The dependencies of this task.
            object_ref: The object reference for the
                return value of the Ray task.

        Nr   r'   rE   rF   rG   rI   s        r   _ray_postsubmitzRayDaskCallback._ray_postsubmiti   s	     	r   object_refsc                     dS )a"  Run before executing a Dask task within a Ray task.

        This method executes after Ray submits the task within a Ray
        worker. Ray passes the return value of this task to the
        _ray_posttask callback, if provided.

        Args:
            key: The Dask graph key for the Dask task.
            object_refs: The object references
                for the arguments of the Ray task.

        Returns:
            A value that Ray passes to the corresponding
            _ray_posttask callback, if the callback is defined.
        Nr   r'   rF   rM   s      r   _ray_pretaskzRayDaskCallback._ray_pretaskz   s	      	r   c                     dS )a  Run after executing a Dask task within a Ray task.

        This method executes within a Ray worker. This callback receives the
        return value of the _ray_pretask callback, if provided.

        Args:
            key: The Dask graph key for the Dask task.
            result: The task result value.
            pre_state: The return value of the corresponding
                _ray_pretask callback, if said callback is defined.
        Nr   r'   rF   result	pre_states       r   _ray_posttaskzRayDaskCallback._ray_posttask   s	     	r   c                     dS )zRun after Ray submits all tasks.

        Args:
            object_refs: The object references
                for the output (leaf) Ray tasks of the task graph.
            dsk: The Dask graph.
        Nr   )r'   rM   dsks      r   _ray_postsubmit_allz#RayDaskCallback._ray_postsubmit_all   s	     	r   c                     dS )a   Run after Ray finishes executing all Ray tasks and returns the final
        result.

        Args:
          result: The final result (output) of the Dask
              computation, before any repackaging is done by
              Dask collection-specific post-compute callbacks.
        Nr   r'   rS   s     r   _ray_finishzRayDaskCallback._ray_finish   s	     	r   )__name__
__module____qualname____doc__setr=   r&   propertyr2   r6   r9   r?   rB   r   r   rH   ray	ObjectRefrL   r   rP   rU   rX   r[   __classcell__)r+   s   @r   r    r    #   so         J# # # # # Q Q XQ    % % % % %        #    .3=    "T#--@    $  tCM/B    	 	 	 	 	 	 	r   r    c                        e Zd Zd Zd Zd ZdS )r4   c                 p    d |D             | _         t          j                            | j                    d S )Nc                 ,    g | ]}t          |          S r   )normalize_ray_callback)r   cs     r   r   z.add_ray_callbacks.__init__.<locals>.<listcomp>   s!    GGG033GGGr   )	callbacksr    r=   update)r'   rj   s     r   r&   zadd_ray_callbacks.__init__   s6    GGYGGG"))$.99999r   c                     | S r.   r   r1   s    r   r6   zadd_ray_callbacks.__enter__   s    r   c                 X    | j         D ]!}t          j                            |           "d S r.   )rj   r    r=   discard)r'   r:   ri   s      r   r9   zadd_ray_callbacks.__exit__   s7     	2 	2A&..q1111	2 	2r   N)r\   r]   r^   r&   r6   r9   r   r   r   r4   r4      sA        : : :  2 2 2 2 2r   r4   c                     t          | t                    r| j        S t          | t                    r| S t	          d          )NzFCallbacks must be either 'RayDaskCallback' or 'RayCallback' namedtuple)
isinstancer    r2   r   	TypeError)r)   s    r   rh   rh      sI    "o&& 
	B	$	$ 
	T
 
 	
r   c                     | r't          d t          t          |            D              S t          dgt          t                    z   S )z>Take an iterable of callbacks, return a list of each callback.c              3   >   K   | ]\  }fd |D             pdV  dS )c                 @    g | ]}|st                    t          v |S r   )r"   CBS_DONT_DROP)r   r)   idxs     r   r   z2unpack_ray_callbacks.<locals>.<genexpr>.<listcomp>   s,    FFFbFCH,E,E,E,E,Er   Nr   )r   cbs_rv   s     @r   r   z'unpack_ray_callbacks.<locals>.<genexpr>   sU        C GFFFdFFFN$     r   r   )r   	enumerateziplenr"   )cbss    r   unpack_ray_callbacksr|      sX    
 	0 !*39!5!5  
 	
 rdSXXo//r   c              #      K   | du }|r%t           j        t                      c} t           _        	 | pdV  |r| t           _        dS dS # |r| t           _        w xY w)z
    Allows Dask-Ray callbacks to work with nested schedulers.

    Callbacks will only be used by the first started scheduler they encounter.
    This means that only the outermost scheduler will use global callbacks.
    Nr   )r    r=   r`   )rj   global_callbackss     r   local_ray_callbacksr      s       !D( T1@1KSUU-	?-3o2 	3)2O&&&	3 	3 	3)2O&2222s   A Ac                   ,    e Zd Zd Zd Zd Zd Zd ZdS )ProgressBarCallbackc                    t           j         G d d                      }	 t          j        d          | _        t          j        | j        j                                                   d S # t          $ ra |                    d                                          | _        t          j        | j        j                                                   Y d S w xY w)Nc                   D    e Zd Zd Zd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
S )6ProgressBarCallback.__init__.<locals>.ProgressBarActorc                 .    |                                   d S r.   _initr1   s    r   r&   z?ProgressBarCallback.__init__.<locals>.ProgressBarActor.__init__       

r   c                     |                                 D ]"}| j        |                             |           #|| j        |<   | j                            ||f           d S r.   )keysrG   r>   	submittedsubmission_queueappend)r'   rF   rG   nowdeps        r   submitz=ProgressBarCallback.__init__.<locals>.ProgressBarActor.submit   sc    99;; , ,CIcN&&s++++&)s#%,,c3Z88888r   c                     || j         |<   d S r.   )	scheduledr'   rF   r   s      r   task_scheduledzEProgressBarCallback.__init__.<locals>.ProgressBarActor.task_scheduled   s    &)s###r   c                     || j         |<   d S r.   )finishedr   s      r   finishz=ProgressBarCallback.__init__.<locals>.ProgressBarActor.finish   s    %(c"""r   c                 R    t          | j                  t          | j                  fS r.   )rz   r   r   r1   s    r   rS   z=ProgressBarCallback.__init__.<locals>.ProgressBarActor.result   s!    4>**C,>,>>>r   c                 4   t          t                    }| j                                        D ]_\  }}| j        |         }| j        |         }||z
                                  ||         d<   ||z
                                  ||         d<   `| j        |d<   |S )Nexecution_timescheduling_timesubmission_order)r   dictr   itemsr   r   total_secondsr   )r'   rS   rF   r   r   r   s         r   reportz=ProgressBarCallback.__init__.<locals>.ProgressBarActor.report   s    $T**%)]%8%8%:%: & &MC $s 3I $s 3I !9,#moo 3K 01 "I-#moo 3K 122 .2-B)*r   c                     d S r.   r   r1   s    r   readyz<ProgressBarCallback.__init__.<locals>.ProgressBarActor.ready  s    r   c                 .    |                                   d S r.   r   r1   s    r   resetz<ProgressBarCallback.__init__.<locals>.ProgressBarActor.reset  r   r   c                     g | _         t          d           | _        t          d           | _        t          d           | _        t          t
                    | _        d S r.   )r   r   r   r   r   r`   rG   r1   s    r   r   z<ProgressBarCallback.__init__.<locals>.ProgressBarActor._init  sJ    (*%!,T!2!2!,T!2!2 +D 1 1',,			r   N)r\   r]   r^   r&   r   r   r   rS   r   r   r   r   r   r   r   ProgressBarActorr      s          9 9 9* * *) ) )? ? ?  (    - - - - -r   r   _dask_on_ray_pb)name)	rb   remote	get_actorpbgetr   
ValueErroroptionsr   )r'   r   s     r   r&   zProgressBarCallback.__init__   s    	2	- 2	- 2	- 2	- 2	- 2	- 2	- 
2	-h	,m$566DGGDGM((**+++++ 	, 	, 	,&..4E.FFMMOODGGDGM((**++++++	,s   A	A, ,A'CCc                 j    | j         j                            ||t          j                               d S r.   )r   r   r   r   r   rK   s        r   rL   z#ProgressBarCallback._ray_postsubmit&  s*    c488888r   c                 h    | j         j                            |t          j                               d S r.   )r   r   r   r   r   rO   s      r   rP   z ProgressBarCallback._ray_pretask*  s)    %%c8<>>:::::r   c                 h    | j         j                            |t          j                               d S r.   )r   r   r   r   r   rR   s       r   rU   z!ProgressBarCallback._ray_posttask-  s(    c8<>>22222r   c                 $    t          d           d S )NzAll tasks are completed.)printrZ   s     r   r[   zProgressBarCallback._ray_finish1  s    ()))))r   N)r\   r]   r^   r&   rL   rP   rU   r[   r   r   r   r   r      s`        ;, ;, ;,z9 9 9; ; ;3 3 3* * * * *r   r   r.   )
contextlibcollectionsr   r   r   typingr   r   r   dask.callbacksr	   rb   r"   tupler0   ru   joinr   r   r    r4   rh   r|   contextmanagerr   r   r   r   r   <module>r      s       / / / / / / / /       & & & & & & & & & & # # # # # # 




 E//3/////	 / j66 z.#((3T3TPS3T3T3T*U*UVVJ J J J Jh J J JZ
2 
2 
2 
2 
2 
2 
2 
2
 
 
0 0 0 3 3 3 3"J* J* J* J* J*/ J* J* J* J* J*r   